Odyssey
ody_migr_members.py
1 #!/usr/bin/env python
2 """Main module to migrate members data and history."""
3 
4 
5 import os
6 import logging
7 import time
8 import math
9 import gzip
10 import csv
11 from psycopg2.extras import DictCursor
12 from copy import deepcopy
13 
14 # import only necessary variables for settings
15 from ody_migr_config import (INSERT,
16  INSERT_ONE,
17  SELECT,
18  SELECT_RETURN,
19  DELETE,
20  SUMMARY,
21  TABLE_EXISTS,
22  KEY_MEMBERS,
23  DATA_OPT_MEMBERS,
24  DATA_OPT_SWITCH_ACCT_MEMBERS,
25  DATA_OPT_MEMDATA,
26  DATA_OPT_SWITCH_ACCOUNTS,
27  MEMBERS_PAGE_SIZE,
28  USERID_CROSS_REF_CSV_FILE,
29  MAMMOTH_ORPHANED_MEMLIST_CSV_FILE,
30  MAMMOTH_NEVER_LOGGEDIN_MEMLIST_CSV_FILE,
31  SCHEDULED_TRANSFER_OUTLIERS_CSV_FILE,
32  ADMIN_CUSURVEY_ID_MAP_CSV_FILE,
33  SWITCH_ACC_BATCH_SIZE,
34  SCHEDULE_INTERVAL_MAPPING,
35  DATA_OPT_MEMHIST,
36  DATA_OPT_CREATE_HIST,
37  DATA_OPT_GET_HIST,
38  HIST_CREATE_WAIT_TIME,
39  MEMHIST_DOWNLOADED_FILE,
40  MEMHIST_ACCOUNT_HISTORY_FILENAME,
41  MEMHIST_LOAN_HISTORY_FILENAME,
42  MEMBER_ACC_RIGHTS,
43  DEF_MEMACCT_RIGHTS_OPTIONS,
44  ACTION_OPT_CLEAN,
45  ACTION_OPT_MIGRATE,
46  ACTION_OPT_SUMMARY,
47  CU2_SPEC18,
48  AUDITUSER_DESCMAP
49  )
50 
51 from ody_migr_transaction import PgTransaction
52 from ody_migr_transaction import pg_crsr_hndlr_decrtr
53 import ody_migr_db_handler as pg_handler
54 from ody_migr_mmth_endpoint import MammothMigration
55 from ody_migr_utils import (get_valid_json,
56  get_ith_batch,
57  get_strip_value_dict,
58  file_exc_decorator,
59  generate_mdx_hash)
60 from ody_migr_loans import cleanup_loanapps
61 
62 LOGGER = logging.getLogger(__name__)
63 
64 
@pg_crsr_hndlr_decrtr
def cuuser_main_handler(members_batch_transaction,
                        ody_uname,
                        mmth_uname_also_ody_mem_accnum,
                        verbose,
                        cuuser_acc_record,
                        cuquestselect_records,
                        forceremain_update_list,
                        _cu):
    """Populate cuusers table from Mammoth and populate related tables in Ody

    This method is a part of memdata data category.

    Args:
        members_batch_transaction: PgTransaction instance to prepare
                                   insertion transaction for the current
                                   batch of members
        ody_uname: odyssey user_name
        mmth_uname_also_ody_mem_accnum: mammoth user_name, and odysseys acc num
        verbose: verbosity flag
        cuuser_acc_record: cuusers response dict from Mammoth
                           1 insertion in <cu>user in odyssey also
                           includes 1 record insertion in
                           <cu>memberacct, <cu>user, <cu>group,
                           <cu>usercontact tables and 4 records
                           in <cu>memberaccrights table.
        cuquestselect_records: records from this table are used to
                               prepare mfaquest column in <cu>user
        forceremain_update_list: list to collect members whose
                                 forceremain is updated
        _cu: current cu code

    Returns:
        user_id: returning user_id from <cu>user
        member_acct_balance_attempt: to be populated in other related tables
        member_acct_balance_stamp: to be populated in other related tables

    Raises:
        psycopg2.Error, psycopg2.Warning on db operation errors

    """
    cu_lower = _cu.lower()
    # isinstance is the idiomatic type check (and tolerates subclasses,
    # e.g. psycopg2 DictRow-style containers)
    assert isinstance(cuquestselect_records, list)
    assert isinstance(cuuser_acc_record, dict)

    cuuser_returning_col = "user_id"

    # nothing to migrate for this member
    if not cuuser_acc_record:
        return

    mfaquest = get_valid_json({})  # default

    # build the MFA question/answer JSON from cuquestselect records
    mfaquest_dict = {"answers": {}}
    if cuquestselect_records:
        for cuquest_record in cuquestselect_records:
            qid = cuquest_record["quest_id"].strip()
            ans = cuquest_record["answer"].strip()
            mfaquest_dict["answers"][qid] = ans
        mfaquest = get_valid_json(mfaquest_dict)

    # table names related to <cu>user
    member_acct_tbl_name = "{}memberacct".format(cu_lower)
    member_acct_rights_tbl_name = "{}memberacctrights".format(cu_lower)
    cuuser_tbl_name = "{}user".format(cu_lower)
    group_table_name = "{}group".format(cu_lower)
    profile_tbl_name = "cu_profile"
    contact_tbl_name = "{}usercontact".format(cu_lower)

    # <cu>group columns
    cu_group_columns = ["group_name", "profile_id", "contact", "tax_id"]

    # new user_name in odyssey
    cuuser_acc_record["user_name"] = ody_uname
    # move pktattempt column to pkattempt
    cuuser_acc_record["pkattempt"] = get_strip_value_dict(
        "pktattempt", cuuser_acc_record, pop=True)

    # prepare <cu>memberacct record
    # values from mammoth's cuusers that go to the <cu>memberacct
    member_acct_balance_attempt = get_strip_value_dict(
        "pkattempt", cuuser_acc_record, default=0)
    member_acct_balance_stamp = get_strip_value_dict(
        "pktstamp", cuuser_acc_record, pop=True)
    member_acct_billpayid = get_strip_value_dict(
        "billpayid", cuuser_acc_record, pop=True)
    member_acct_estmt_flag = get_strip_value_dict(
        "estmt_flag", cuuser_acc_record, pop=True, default='N')
    member_acct_rdcsetting = int(float(get_strip_value_dict(
        "depositlimit", cuuser_acc_record, pop=True, default=0)))

    # get profile info
    profile_where_conditions = {
        "profile_code": "DEF",
        "cu": _cu
    }
    profile_select_columns = ["profile_id"]
    # get profile_id for this CU for "DEF" profile code
    cu_profile_records = members_batch_transaction(
        SELECT_RETURN,
        profile_tbl_name,
        where_conditions=profile_where_conditions,
        select_columns=profile_select_columns
    )

    # exactly one "DEF" profile must exist for the CU
    assert len(cu_profile_records) == 1
    profile_id = cu_profile_records[0][0]

    # <cu>group values to be inserted
    cu_group_values = [
        "g{}".format(ody_uname),
        profile_id,
        0,
        None
    ]

    # insert <cu>group and return group_id
    members_batch_transaction(
        INSERT_ONE,
        group_table_name,
        cu_group_columns,
        cu_group_values,
        returning_col="group_id"
    )
    # <cu>group.group_id [populate <cu>group and cu_profile tables first]
    group_id = members_batch_transaction.cur.fetchone()[0]

    # <cu>usercontact table columns
    contact_columns = [
        "address1",
        "address2",
        "city",
        "state",
        "zip",
        "phones"
    ]

    # <cu>usercontact record (intentionally empty placeholders)
    contact_values = ["", "", "", "", "", ""]

    # insert <cu>usercontact record and return contact_id
    members_batch_transaction(
        INSERT_ONE,
        contact_tbl_name,
        contact_columns,
        contact_values,
        returning_col="contact_id"
    )

    # empty record with <cu>usercontact and use contact_id
    contact = members_batch_transaction.cur.fetchone()[0]

    # finally prepare <cu>user record
    cuuser_acc_record["mfaquest"] = mfaquest
    cuuser_acc_record["group_id"] = group_id
    cuuser_acc_record["other_rights"] = None
    cuuser_acc_record["ip"] = None
    cuuser_acc_record["is_group_primary"] = True  # always true
    cuuser_acc_record["contact"] = contact

    cuuser_forceremain = get_strip_value_dict("forceremain", cuuser_acc_record)
    cuuser_forcechange = get_strip_value_dict("forcechange", cuuser_acc_record)

    # update forceremain to 5 to unlock users with
    # forcechange == 'N' and forceremain == 0
    if cuuser_forcechange is not None and cuuser_forceremain is not None:
        if cuuser_forcechange == 'N' and int(cuuser_forceremain) == 0:
            cuuser_acc_record["forceremain"] = 5
            forceremain_update_list.append(mmth_uname_also_ody_mem_accnum)

    # drop Mammoth-only columns that have no Odyssey counterpart
    del cuuser_acc_record["pktdate"]
    del cuuser_acc_record["cu"]
    del cuuser_acc_record["user_alias"]

    assert "user_alias" not in cuuser_acc_record
    assert "cu" not in cuuser_acc_record
    assert "pktdate" not in cuuser_acc_record
    assert "pktstamp" not in cuuser_acc_record
    assert "billpayid" not in cuuser_acc_record
    assert "estmt_flag" not in cuuser_acc_record

    columns = list(cuuser_acc_record.keys())
    values = list(cuuser_acc_record.values())
    values = [val.strip() if isinstance(val, str) else val for val in values]

    # populate <cu>user table and return user_id
    members_batch_transaction(
        INSERT_ONE,
        cuuser_tbl_name,
        columns,
        values,
        returning_col=cuuser_returning_col
    )

    user_id = members_batch_transaction.cur.fetchone()[0]

    # prepare <cu>memberaccount table record
    # <cu>memberaccount table columns
    member_accnt_columns = [
        "accountnumber",
        "estmnt_flag",
        "billpayid",
        "rdcsetting",
        "primary_user",
        "balance_stamp",
        "balance_attempt",
        "allowenroll"
    ]

    # <cu>memberaccount table record (order must match columns above)
    member_accnt_record = [
        mmth_uname_also_ody_mem_accnum,
        member_acct_estmt_flag,
        member_acct_billpayid,
        member_acct_rdcsetting,
        user_id,
        member_acct_balance_stamp,
        member_acct_balance_attempt,
        False
    ]

    # print currently inserted <cu>user record if verbose
    if verbose:
        members_batch_transaction(
            SELECT,
            cuuser_tbl_name,
            where_conditions={
                "user_id": user_id
            }
        )

    assert len(member_accnt_columns) == len(member_accnt_record)

    # populate <cu>memberaccount table
    members_batch_transaction(
        INSERT_ONE,
        member_acct_tbl_name,
        member_accnt_columns,
        member_accnt_record
    )

    # prepare <cu>memberaccountrights table
    memberacctrights_cols = [
        "user_id",
        "accountnumber",
        "whichright",
        "allowed",
        "platform"
    ]

    # prepare <cu>memberacctrights records (one per configured right)
    for memberacctright in MEMBER_ACC_RIGHTS:
        platform = get_valid_json(DEF_MEMACCT_RIGHTS_OPTIONS)
        # ACCESS right carries no platform options
        if memberacctright == "ACCESS":
            platform = None

        memberacct_rec = [
            user_id,
            mmth_uname_also_ody_mem_accnum,
            memberacctright,
            True,
            platform
        ]
        # populate <cu>memberaccountrights table
        members_batch_transaction(
            INSERT_ONE,
            member_acct_rights_tbl_name,
            memberacctrights_cols,
            memberacct_rec
        )

    # return user_id, balance_attempt and balance_stamp to be used in other
    # related tables of the same member in a same transaction
    return (user_id,
            member_acct_balance_attempt,
            member_acct_balance_stamp)
344 
345 
def cu_alert_table_transform(user_id, alrt_type_code, alrt_record):
    """Prepares a single cu_alerts record for all 4 alert type codes

    Update (inplace) alrt_record keys to match with the current schema of
    cu_alerts table in Odyssey.

    Args:
        user_id: returning user_id from <cu>user table
        alrt_type_code: {'C': checking, 'L': loan, 'T': transaction,
                         'B': balance}
        alrt_record: dictionary of a single record from one of the
                     {cualertcheck, cualertbal, cualertloan,
                     cualerttrans} tables from Mammoth

    """
    # Mammoth primary key has no meaning in Odyssey
    alrt_record.pop('id', None)

    assert "cu" in alrt_record
    assert "accountnumber" in alrt_record

    # loan alerts rename two columns for the Odyssey schema
    if alrt_type_code == 'L':
        if "loannumber" in alrt_record:
            alrt_record["accounttype"] = alrt_record.pop("loannumber")
        if "alert_days_prior" in alrt_record:
            alrt_record["notifyloandaysprior"] = alrt_record.pop(
                "alert_days_prior")

    alrt_record["user_id"] = user_id
    alrt_record["alerttype"] = alrt_type_code

    # ensure optional columns exist so every record has the same key set
    alrt_record["certnumber"] = alrt_record.get(
        "certnumber", None)  # 0

    alrt_record["incbal"] = alrt_record.get("incbal", None)  # 0
    alrt_record["incamt"] = alrt_record.get("incamt", None)  # 0
    alrt_record["inctransdesc"] = alrt_record.get(
        "inctransdesc", None)  # None

    # map Mammoth's 'Y'/'N' flags to Odyssey's 1/0 integers
    yes_no_map = {
        "Y": 1,
        "N": 0
    }

    for col in ["useavailbal", "notifyrange"]:
        y_or_n = get_strip_value_dict(
            col, alrt_record, default=None)

        if y_or_n in (None, ''):
            # absent or blank flag stays NULL
            alrt_record[col] = None
        else:
            assert y_or_n in yes_no_map
            alrt_record[col] = yes_no_map[y_or_n]
398 
399 
@pg_crsr_hndlr_decrtr
def cualerts_handler(members_batch_transaction,
                     _verbose,
                     _cualert_check_records,
                     _cualert_bal_records,
                     _cualert_loan_records,
                     _cualert_trans_records,
                     user_id):
    """Populate updated records in cu_alerts table in Odyssey

    Associated tables from Mammoth are cualertcheck, cualertbal, cualertloan
    and cualerttrans

    Args:
        members_batch_transaction: PgTransaction instance to prepare
                                   insertion transaction for the current
                                   batch of members
        _verbose: verbosity flag
        _cualert_check_records: list of dictionaries: cualertcheck records
        _cualert_bal_records: list of dictionaries: cualertbal records
        _cualert_loan_records: list of dictionaries: cualertloan records
        _cualert_trans_records: list of dictionaries: cualerttrans records
        user_id: returning user_id from <cu>user table

    Raises:
        psycopg2.Error, psycopg2.Warning on db operation errors
    """
    alert_types = {
        'C': _cualert_check_records,
        'L': _cualert_loan_records,
        'T': _cualert_trans_records,
        'B': _cualert_bal_records
    }
    # iterate pairs directly instead of keys + indexed lookup
    for alrt_type_code, clbt_alert_records in alert_types.items():
        # transform each record in place to the cu_alerts schema
        for alrt_record in clbt_alert_records:
            cu_alert_table_transform(user_id, alrt_type_code, alrt_record)

        # insert the whole C or L or B or T collection to the table
        members_batch_transaction(
            INSERT,
            "cu_alerts",
            collection=clbt_alert_records
        )
444 
445 
447  user_id,
448  mmth_uname_also_ody_mem_accnum,
449  _mmth_accnum_to_ody_userid_ref_csv_writer,
450  mdx_record,
451  acc_or_loan,
452  mdx_cc_type18_flag=False):
453  """Prepare a MDX complying CSV record
454 
455  Applies to all associated subaccounts(loanbalance and accountbalance
456  records) of each member.
457  """
458  xref_record = []
459  # old_user_id
460  xref_record.append("{}:{}".format(_cu, mmth_uname_also_ody_mem_accnum))
461  # new_user_id
462  xref_record.append("{}-{}".format(_cu, user_id))
463  # new_member_id
464  xref_record.append("M-{}-{}".format(_cu, user_id))
465  # old_member_id
466  xref_record.append("M-{}:{}".format(_cu, mmth_uname_also_ody_mem_accnum))
467 
468  # 1. handle accountbalance useracconts i.e. acc_or_loan == 'A'
469  if (acc_or_loan == "A"):
470  deposittype = get_strip_value_dict("deposittype", mdx_record).upper()
471  certnum = get_strip_value_dict("certnumber", mdx_record)
472  accounttype = get_strip_value_dict("accounttype", mdx_record)
473 
474  # account_type
475  if deposittype == 'Y':
476  xref_record.append('CHECKING')
477  else:
478  xref_record.append('SAVINGS')
479 
480  # acctid
481  if certnum == 0:
482  xref_record.append(accounttype)
483  else:
484  xref_record.append("{}_{}".format(accounttype, certnum))
485 
486  # key
487  acc_key = "D|{}|{}|{}".format(mmth_uname_also_ody_mem_accnum,
488  accounttype,
489  certnum)
490  xref_record.append(acc_key)
491  # newid
492  xref_record.append(generate_mdx_hash(_cu, acc_key))
493  # add a csv record
494  _mmth_accnum_to_ody_userid_ref_csv_writer.writerow(xref_record)
495 
496  # 2. handle loanbalance useraccounts i.e. acc_or_loan == 'L'
497  else:
498  cbtype = mdx_record["cbtype"]
499  loannumber = get_strip_value_dict("loannumber", mdx_record)
500 
501  # if credit card type 18 handling flag is on, identify such record
502  # as CREDIT_CARD type
503  if mdx_cc_type18_flag:
504  if cbtype == '18':
505  cc_key = "C|{}|{}".format(mmth_uname_also_ody_mem_accnum,
506  loannumber)
507  xref_record.extend(
508  [
509  'CREDIT_CARD', # account_type
510  loannumber, # acctid
511  cc_key, # key
512  generate_mdx_hash(_cu, cc_key) # newid
513  ]
514  )
515  # add a csv record
516  _mmth_accnum_to_ody_userid_ref_csv_writer.writerow(xref_record)
517 
518  # include loans
519  else:
520  if cbtype != '18' or cbtype is None:
521  loan_key = "L|{}|{}".format(mmth_uname_also_ody_mem_accnum,
522  loannumber)
523  xref_record.extend(
524  [
525  'LOAN', # account_type
526  loannumber, # acctid
527  loan_key, # key
528  generate_mdx_hash(_cu, loan_key) # newid
529  ]
530  )
531  # add a csv record
532  _mmth_accnum_to_ody_userid_ref_csv_writer.writerow(xref_record)
533 
534 
536  """Update from/to format for overloaded accounts
537 
538  Args: from_or_to
539 
540  Examples: To:
541  "L|XXXXX|LN06:01@YYYYY" **overloaded
542  "L|XXXXX|LN06:01"
543  "L|XXXXX|LN06:01|0"
544 
545  Examples: From:
546  "D|XXXXX|SAV0:00@YYYYY" **overloaded
547  "D|XXXXX|SAV0:00"
548  "D|XXXXX|SAV0:00|0"
549 
550  Returns: updated value for from_or_to ("from" or "to" attribute
551  in cu_scheduledtxn.txn_data)
552  Examples
553  "L|XXXXX|LN06:01@YYYYY" ---> "L|YYYYY|LN06:01" **overloaded
554  "L|XXXXX|LN06:01" ---> "L|XXXXX|LN06:01"
555  "L|XXXXX|LN06:01|0" ---> "L|XXXXX|LN06:01|0"
556 
557  "D|XXXXX|SAV0:00@YYYYY" ---> "D|YYYYY|SAV0:00" **overloaded
558  "D|XXXXX|SAV0:00" ---> "D|XXXXX|SAV0:00"
559  "D|XXXXX|SAV0:00|0" ---> "D|XXXXX|SAV0:00|0"
560  """
561  elems = from_or_to.split("|")
562  assert len(elems) >= 3
563 
564  if "@" in elems[2]:
565  from_to_subaccount, overloaded_accountnumber = elems[2].split("@")
566  elems[1] = overloaded_accountnumber
567  elems[2] = from_to_subaccount
568 
569  return "|".join(elems)
570 
571 
572 @pg_crsr_hndlr_decrtr
573 def migrate_individual_memdata(members_batch_transaction,
574  member_data,
575  ody_uname,
576  mmth_uname_also_ody_mem_accnum,
577  _verbose,
578  _cu,
579  _mmth_accnum_to_ody_userid_ref_csv_writer,
580  _mmth_repeating_transfers_outliers,
581  forceremain_update_list,
582  cusurveymaster_surveyid_map,
583  money_desktop_credit_card_type18_flag):
584  """Migrate member data tables of individual member"""
585  cu_lower = _cu.lower()
586 
587  # handle cuusers, memberaccnt and related tables
588  (user_id,
589  memacct_balance_attempt,
590  member_acct_balance_stamp) = cuuser_main_handler(
591  members_batch_transaction,
592  ody_uname,
593  mmth_uname_also_ody_mem_accnum,
594  _verbose,
595  member_data["cuusers"],
596  member_data["cuquestselect"],
597  forceremain_update_list,
598  _cu
599  )
600 
601  # handle cualerts
603  members_batch_transaction,
604  _verbose,
605  member_data["cualertcheck"],
606  member_data["cualertbal"],
607  member_data["cualertloan"],
608  member_data["cualerttrans"],
609  user_id
610  )
611 
612  # handle tables with no schema change
613  mmth_tbls_no_schema_change = [
614  "holds",
615  "culogtrack"
616  ]
617  for mmth_key in mmth_tbls_no_schema_change:
618  mmth_data_nsc = member_data[mmth_key]
619 
620  if mmth_key == "holds":
621  ody_table_name = "{}{}".format(
622  cu_lower, mmth_key)
623  else:
624  ody_table_name = mmth_key
625 
626  members_batch_transaction(
627  INSERT,
628  ody_table_name,
629  collection=mmth_data_nsc
630  )
631 
632  # handle tables with odyssey's user_id as relation
633  # curepeattx also falls in this category if need be
634 
635  # handle cuadmeco separately: If we import two CU's from different produ-
636  # ction databases (eg. IDADIV from www6 and SNOCOPE from www3), there is
637  # a possibility that the message id's will be duplicated. We reassign
638  # messageid's and also update the parentid's for each record.
639  tbl_cuadmeco = "cuadmeco"
640  tbl_cuadmeco_mmth_data = member_data[tbl_cuadmeco]
641 
642  if type(tbl_cuadmeco_mmth_data) == dict:
643  for mmth_msg_id, cuadmeco_message_records in\
644  tbl_cuadmeco_mmth_data.items():
645  members_batch_transaction.cur.execute(
646  "select nextval('cuadmeco_messageid_seq')")
647  next_ody_message_id = members_batch_transaction.cur.fetchone()[0]
648  this_message_thread_collection = []
649  for ind, rec in enumerate(cuadmeco_message_records):
650  # first record is always a parent of the message thread
651  if ind == 0:
652  rec["messageid"] = next_ody_message_id
653  else:
654  # all other records are threads and we let psycopg2 pick
655  # the messageid itself
656  del rec["messageid"]
657  # for all records, we UPDATE the parentid
658  rec["parentid"] = next_ody_message_id
659 
660  rec["user_id"] = user_id
661  del rec["accountnumber"]
662  rec["admin"] = cu_lower
663  this_message_thread_collection.append(rec)
664  members_batch_transaction(
665  INSERT,
666  tbl_cuadmeco,
667  collection=this_message_thread_collection
668  )
669 
670  for cusurveysays_rec in member_data["cusurveysays"]:
671  assert "surveyid" in cusurveysays_rec, "cusurveysays: missing surveyid"
672  # invalid surveyid: not existing in cusurveymaster
673  if cusurveysays_rec["surveyid"].strip() \
674  not in cusurveymaster_surveyid_map:
675  LOGGER.warning("Invalid surveyid found: {}".format(
676  cusurveysays_rec["surveyid"].strip()))
677  # valid surveyid
678  else:
679  cusurveysays_rec["surveyid"] = cusurveymaster_surveyid_map[
680  cusurveysays_rec["surveyid"].strip()]
681  assert (cusurveysays_rec["accountnumber"].strip() ==
682  mmth_uname_also_ody_mem_accnum)
683  cusurveysays_rec["user_id"] = user_id
684  members_batch_transaction(
685  INSERT_ONE,
686  "cusurveysays",
687  list(cusurveysays_rec.keys()),
688  list(cusurveysays_rec.values())
689  )
690 
691  mmth_tbls_userid_relation = ["cucmsresponse", "userlogins"]
692 
693  for mmth_key in mmth_tbls_userid_relation:
694  mmth_data_with_userid = member_data[mmth_key]
695 
696  if mmth_key == "userlogins":
697  ody_table_name = "{}{}".format(
698  cu_lower, mmth_key)
699  else:
700  ody_table_name = mmth_key
701 
702  for mmth_rec in mmth_data_with_userid:
703  assert (mmth_rec["accountnumber"].strip() ==
704  mmth_uname_also_ody_mem_accnum)
705  mmth_rec["user_id"] = user_id
706 
707  if mmth_key == "userlogins":
708  del mmth_rec["accountnumber"]
709  del mmth_rec["userloginid"]
710  mmth_rec["user_name"] = ody_uname
711 
712  members_batch_transaction(
713  INSERT,
714  ody_table_name,
715  collection=mmth_data_with_userid
716  )
717 
718  # these columns and the following default values are being used to populate
719  # records in <cu>useraccounts table and are associated with the records in
720  # (a) cross account records coming from culivetx (b) <cu>accountbalance
721  # and (c) <cu>loanbalance tables.
722  # fields for <cu>useraccounts table
723  cu_useraccounts_cols = [
724  "user_id", "accountnumber",
725  "accounttype", "certnumber",
726  "recordtype", "display_name",
727  "view_balances",
728  "view_transactions",
729  "int_deposit",
730  "int_withdraw",
731  "ext_deposit",
732  "ext_withdraw",
733  "display_order"
734  ]
735  # (order of columns and values mentioned below must match)
736  # default values to be used for columns from the end of the list
737  # cu_useraccounts_cols i.e. ("view_balances", "view_transactions",
738  # "int_deposit", "int_withdraw", "ext_deposit", "ext_withdraw",
739  # "display_order") in order.
740  cuuseraccounts_rec_default_crossaccts = [
741  False, False, True, False, False, False, 0]
742 
743  culivetx_records_for_crossaccts = member_data["culivetx_x"]
744  # x signifies related to crossaccounts (and useraccounts) for records
745  # coming from culivetx
746  for mmth_culivetx_x in culivetx_records_for_crossaccts:
747  del mmth_culivetx_x["id"]
748  del mmth_culivetx_x["cu"]
749 
750  acc_type_x = get_strip_value_dict("accounttype", mmth_culivetx_x)
751  deposit_type_x = get_strip_value_dict("deposittype", mmth_culivetx_x)
752  accnum_x = get_strip_value_dict("accountnumber", mmth_culivetx_x)
753  tomember_x = get_strip_value_dict("tomember", mmth_culivetx_x)
754  description_x = get_strip_value_dict("description", mmth_culivetx_x)
755  # Now also prepare values to insert in <cu>useraccounts table as well
756  useraccounts_rec_from_culivetx = []
757  # following append order must align with the order of columns in
758  # cu_useraccounts_cols
759  # append user_id for useraccounts
760  useraccounts_rec_from_culivetx.append(user_id)
761  # append accountnumber for useraccounts
762  useraccounts_rec_from_culivetx.append(accnum_x)
763  # append accounttype for useraccounts
764  useraccounts_rec_from_culivetx.append("{}#{}".format(
765  acc_type_x, tomember_x))
766  # append certnumber for useraccounts
767  useraccounts_rec_from_culivetx.append(0)
768  # append recordtype for useraccounts
769  if deposit_type_x.upper() in ['S', 'Y', 'N', 'C']:
770  useraccounts_rec_from_culivetx.append("T")
771  else:
772  useraccounts_rec_from_culivetx.append("P")
773  # append display_name for useraccounts
774  useraccounts_rec_from_culivetx.append(description_x)
775  # append remaining column default values for useraccounts
776  useraccounts_rec_from_culivetx.extend(
777  cuuseraccounts_rec_default_crossaccts)
778  # insert a single useraccounts record based on culivetx crossaccount
779  # records
780  members_batch_transaction(
781  INSERT_ONE,
782  "{}useraccounts".format(cu_lower),
783  cu_useraccounts_cols,
784  useraccounts_rec_from_culivetx
785  )
786 
787  members_batch_transaction(
788  INSERT,
789  "{}crossaccounts".format(cu_lower),
790  collection=culivetx_records_for_crossaccts
791  )
792 
793  # add two columns to specify additional default values for records
794  # coming from <cu>accountbalance and <cu>loanbalance
795  cu_useraccounts_cols.append("display_qty")
796  cu_useraccounts_cols.append("display_qty_type")
797  # (order of columns and values mentioned below must match)
798  # default values to be used for columns from the end of the list
799  # cu_useraccounts_cols i.e. ("display_name", "view_balances",
800  # "view_transactions", "int_deposit", "int_withdraw", "ext_deposit",
801  # "ext_withdraw", "display_order", "display_qty", "display_qty_type")
802  # in order.
803  # Difference: Records (to be inserted as normal records to
804  # useraccounts) that come from loanbalance and accountbalance have
805  # extra info for display_name, display_qty and display_qty_type columns
806  # than the records that come from crossaccounts (records from culivetx
807  # in Mammoth with specific conditions).
808  cuuseraccounts_rec_default_ln_bal = [
809  None, True, True, True, True, True, True, 0, 30, "D"]
810 
811  # <cu>accountbalance table [similar schema in Mammoth and
812  # also in Odyssey for Live and Batch CUs]
813  account_balance_coll = member_data["accountbalance"]
814  for accbal_rec in account_balance_coll:
815  acc_type = get_strip_value_dict("accounttype", accbal_rec)
816  deposit_type = get_strip_value_dict("deposittype", accbal_rec)
817  accbal_accnum = get_strip_value_dict("accountnumber", accbal_rec)
818  accbal_certnum = get_strip_value_dict("certnumber", accbal_rec)
819 
820  assert acc_type is not None
821  assert accbal_accnum is not None
822  assert accbal_accnum == mmth_uname_also_ody_mem_accnum
823 
824  may_deposit = False
825  may_withdraw = False
826  if deposit_type.upper() in ['Y', 'N', 'S']:
827  may_deposit = True
828  may_withdraw = True
829  accbal_rec["interestrate"] = None
830  accbal_rec["maturitydate"] = None
831  accbal_rec["misc1"] = None
832  accbal_rec["regdcount"] = None
833  accbal_rec["may_deposit"] = may_deposit
834  accbal_rec["may_withdraw"] = may_withdraw
835  accbal_rec["balance_stamp"] = member_acct_balance_stamp
836  accbal_rec["history_stamp"] = member_acct_balance_stamp
837  accbal_rec["balance_attempt"] = memacct_balance_attempt
838  accbal_rec["history_attempt"] = memacct_balance_attempt
839  # Now also prepare values to insert in <cu>useraccounts table as well
840  useraccounts_rec_from_accbal = []
841  # following append order must align with the order of columns in
842  # cu_useraccounts_cols
843  # append user_id for <cu>useraccounts
844  useraccounts_rec_from_accbal.append(user_id)
845  # append accountnumber for <cu>useraccounts
846  useraccounts_rec_from_accbal.append(accbal_accnum)
847  # append accounttype for <cu>useraccounts
848  useraccounts_rec_from_accbal.append(acc_type)
849  # append certnumber for <cu>useraccounts
850  useraccounts_rec_from_accbal.append(accbal_certnum)
851  # append recordtype for <cu>useraccounts
852  useraccounts_rec_from_accbal.append("D")
853  useraccounts_rec_from_accbal.extend(cuuseraccounts_rec_default_ln_bal)
854 
855  # insert a single useraccounts record based on loanbalance
856  members_batch_transaction(
857  INSERT_ONE,
858  "{}useraccounts".format(cu_lower),
859  cu_useraccounts_cols,
860  useraccounts_rec_from_accbal
861  )
862 
863  # prepare a record to prepare MoneyDesktop cross reference document
864  # based on accountbalance records
866  user_id,
867  mmth_uname_also_ody_mem_accnum,
868  _mmth_accnum_to_ody_userid_ref_csv_writer,
869  accbal_rec,
870  "A")
871 
872  # insert a accountbalance collection
873  members_batch_transaction(
874  INSERT,
875  "{}accountbalance".format(cu_lower),
876  collection=account_balance_coll
877  )
878 
879  # <cu>loanbalance table [different schema in Mammoth
880  # for Live and Batch CUs]
881  loan_balance_coll = member_data["loanbalance"]
882  for lnbal_rec in loan_balance_coll:
883  lnbal_accnum = get_strip_value_dict("accountnumber",
884  lnbal_rec)
885  lnbal_lnnum = get_strip_value_dict("loannumber",
886  lnbal_rec)
887 
888  assert lnbal_accnum is not None
889  assert lnbal_accnum == mmth_uname_also_ody_mem_accnum
890 
891  credit_limit = get_strip_value_dict("creditlimit", lnbal_rec)
892  lnbal_rec["payoff"] = get_strip_value_dict(
893  "payoff", lnbal_rec, default=0)
894  lnbal_rec["currentdue"] = 0 # default, NOT NULL
895 
896  lnbal_rec["misc1"] = get_strip_value_dict(
897  "misc1", lnbal_rec, default=None)
898 
899  lnbal_rec["cbtype"] = get_strip_value_dict(
900  "type", lnbal_rec, pop=True)
901 
902  lnbal_rec["balance_stamp"] = member_acct_balance_stamp
903  lnbal_rec["history_stamp"] = member_acct_balance_stamp
904  lnbal_rec["balance_attempt"] = memacct_balance_attempt
905  lnbal_rec["history_attempt"] = memacct_balance_attempt
906 
907  if float(credit_limit) > 0:
908  lnbal_rec["may_addon"] = True
909  else:
910  lnbal_rec["may_addon"] = False
911 
912  lnbal_rec["may_payment"] = True
913 
914  # Columns present in MAMMOTH in Batch but not in Live CUs
915  # We take the appropriate value if the column exist,
916  # otherwise use the default values
917  lnbal_rec["unpaidinterest"] = get_strip_value_dict(
918  "unpaidinterest", lnbal_rec, default=None)
919 
920  lnbal_rec["frequencyperyear"] = get_strip_value_dict(
921  "frequencyperyear", lnbal_rec, default=None)
922 
923  lnbal_rec["originalamount"] = get_strip_value_dict(
924  "originalamount", lnbal_rec, default=None)
925 
926  lnbal_rec["originaldate"] = get_strip_value_dict(
927  "originaldate", lnbal_rec, default=None)
928 
929  lnbal_rec["term"] = get_strip_value_dict(
930  "term", lnbal_rec, default=None)
931 
932  lnbal_rec["creditdisability"] = get_strip_value_dict(
933  "cdi", lnbal_rec, pop=True, default=None)
934 
935  lnbal_rec["creditlife"] = get_strip_value_dict(
936  "cli", lnbal_rec, pop=True, default=None)
937 
938  # Now also prepare values to insert in <cu>useraccounts table as well
939  useraccounts_rec_from_lnbal = []
940  # following append order must align with the order of columns in
941  # cu_useraccounts_cols
942  # append user_id for <cu>useraccounts
943  useraccounts_rec_from_lnbal.append(user_id)
944  # append accountnumber for <cu>useraccounts
945  useraccounts_rec_from_lnbal.append(lnbal_accnum)
946  # append accounttype for <cu>useraccounts
947  useraccounts_rec_from_lnbal.append(lnbal_lnnum)
948  # append certtype for <cu>useraccounts
949  useraccounts_rec_from_lnbal.append(0)
950  # append recordtype for <cu>useraccounts
951  useraccounts_rec_from_lnbal.append("L")
952  useraccounts_rec_from_lnbal.extend(cuuseraccounts_rec_default_ln_bal)
953 
954  # insert a single useraccounts record based on loanbalance
955  members_batch_transaction(
956  INSERT_ONE,
957  "{}useraccounts".format(cu_lower),
958  cu_useraccounts_cols,
959  useraccounts_rec_from_lnbal
960  )
961 
962  # prepare a record to prepare MoneyDesktop cross reference document
963  # based on loanbalance records
965  user_id,
966  mmth_uname_also_ody_mem_accnum,
967  _mmth_accnum_to_ody_userid_ref_csv_writer,
968  lnbal_rec,
969  "L",
970  money_desktop_credit_card_type18_flag)
971 
972  # insert a loanbalance collection
973  members_batch_transaction(
974  INSERT,
975  "{}loanbalance".format(cu_lower),
976  collection=loan_balance_coll
977  )
978 
979  # transform curepeattx to cu_scheduledtxn
980  curepeattx_mammoth_records = member_data["curepeattx"]
981  for mmth_curepeattx_rec in curepeattx_mammoth_records:
982  mmth_curepeattx_rec_copy = deepcopy(mmth_curepeattx_rec)
983 
984  del mmth_curepeattx_rec["txid"]
985 
986  curepeattx_interval = get_strip_value_dict(
987  "interval",
988  mmth_curepeattx_rec,
989  pop=True
990  )
991 
992  mmth_curepeattx_rec["repeating_parameters"] = get_valid_json({
993  "interval": SCHEDULE_INTERVAL_MAPPING[curepeattx_interval]})
994 
995  mmth_curepeattx_rec["user_id"] = user_id
996 
997  mmth_curepeattx_rec["interval_count"] = get_strip_value_dict(
998  "intervalcount",
999  mmth_curepeattx_rec,
1000  pop=True
1001  )
1002 
1003  mmth_curepeattx_rec["next_trigger_date"] = get_strip_value_dict(
1004  "nexttriggerdate",
1005  mmth_curepeattx_rec,
1006  pop=True
1007  )
1008 
1009  mmth_curepeattx_rec["end_date"] = get_strip_value_dict(
1010  "stopdate",
1011  mmth_curepeattx_rec,
1012  pop=True
1013  )
1014 
1015  curepeattx_startdate = get_strip_value_dict(
1016  "startdate",
1017  mmth_curepeattx_rec,
1018  pop=True
1019  )
1020 
1021  mmth_curepeattx_rec["start_date"] = curepeattx_startdate
1022  mmth_curepeattx_rec["create_date"] = curepeattx_startdate
1023  mmth_curepeattx_rec["approved_date"] = curepeattx_startdate
1024  mmth_curepeattx_rec["last_edit_date"] = curepeattx_startdate
1025 
1026  mmth_curepeattx_rec["approved_by"] = user_id
1027  mmth_curepeattx_rec["last_edit_by"] = user_id
1028  mmth_curepeattx_rec["approved_status"] = 10
1029 
1030  mmth_curepeattx_rec["failure_count"] = 0
1031  mmth_curepeattx_rec["feature_code"] = "TRN"
1032 
1033  # prepare txn_data field
1034  curepeattx_transactioncode = get_strip_value_dict(
1035  "transactioncode",
1036  mmth_curepeattx_rec,
1037  pop=True
1038  )
1039 
1040  curepeattx_fromsuffix = get_strip_value_dict(
1041  "fromsuffix",
1042  mmth_curepeattx_rec,
1043  pop=True
1044  )
1045  curepeattx_tosuffix = get_strip_value_dict(
1046  "tosuffix",
1047  mmth_curepeattx_rec,
1048  pop=True
1049  )
1050  curepeattx_accountnumber = get_strip_value_dict(
1051  "accountnumber",
1052  mmth_curepeattx_rec,
1053  pop=True
1054  )
1055 
1056  curepeattx_tomember = get_strip_value_dict(
1057  "tomember",
1058  mmth_curepeattx_rec,
1059  pop=True
1060  )
1061 
1062  curepeattx_amount = get_strip_value_dict(
1063  "amount",
1064  mmth_curepeattx_rec,
1065  pop=True
1066  )
1067 
1068  from_type = ""
1069  to_type = ""
1070 
1071  if curepeattx_transactioncode == "AT":
1072  from_type = "D"
1073  to_type = "D"
1074  elif curepeattx_transactioncode in ["LP", "CP"]:
1075  from_type = "D"
1076  # cross-account: L
1077  if curepeattx_accountnumber != curepeattx_tomember:
1078  to_type = "L"
1079 
1080  else:
1081  # retrieve cbtype of the target account to identify whether the
1082  # target account is loan account or credit card
1083  for loanbalance_rec in loan_balance_coll:
1084  if curepeattx_tosuffix == loanbalance_rec["loannumber"]:
1085  if loanbalance_rec["cbtype"] == '18':
1086  to_type = "C"
1087  else:
1088  to_type = "L"
1089  break
1090 
1091  # target loan/credit card account does not exist; L
1092  # Note: when testing with SNOCOPE records; there were couple
1093  # instances of scheduled transfers (inactive) that include transfer
1094  # to non-existent credit card accounts. Log such instances
1095  # of scheduled transfers. However, these inactive records are
1096  # allowed to be updated in banking. As per discussion with Mark,
1097  # even if such instances are updated to be active from banking
1098  # side, these will be caught in a process of
1099  # execution of scheduled transfers; hence are to be handled
1100  # from there.
1101  if to_type == "":
1102  to_type = "L"
1103  _mmth_repeating_transfers_outliers.writerow(
1104  [("'{}' is not a valid loan/credit card account "
1105  "for member '{}'.").format(curepeattx_tosuffix,
1106  curepeattx_accountnumber),
1107  get_valid_json(mmth_curepeattx_rec_copy)])
1108 
1109  elif curepeattx_transactioncode in ["LA", "CA"]:
1110  from_type = "L"
1111  to_type = "D"
1112 
1113  if from_type == "" or to_type == "":
1114  error_msg = "cu_scheduledtxn from_type or to_type not populated"
1115  LOGGER.error(error_msg)
1116  # raise SystemExit(error_msg)
1117 
1118  # Loans with transaction code LA should not have the additional `|0|
1119  # appended to the "from".
1120  if curepeattx_transactioncode == "LA":
1121  from_txn = "{}|{}|{}".format(
1122  from_type,
1123  curepeattx_accountnumber,
1124  curepeattx_fromsuffix)
1125  else:
1126  from_txn = "{}|{}|{}|{}".format(
1127  from_type,
1128  curepeattx_accountnumber,
1129  curepeattx_fromsuffix,
1130  0)
1131 
1132  # Loans with transaction codes LP or CP should not have the additional
1133  # `|0| appended to the "to".
1134  if curepeattx_transactioncode in ["LP", "CP"]:
1135  to_txn = "{}|{}|{}".format(
1136  to_type,
1137  curepeattx_tomember,
1138  curepeattx_tosuffix)
1139  else:
1140  to_txn = "{}|{}|{}|{}".format(
1141  to_type,
1142  curepeattx_tomember,
1143  curepeattx_tosuffix,
1144  0)
1145 
1146  # After ISUCU migration, we noted problems for any scheduled transfer
1147  # that involved a cross-account. This should fix the problem where
1148  # accounts were reported as ‘no longer has rights’ for the future
1149  # migrating credit unions
1150  if curepeattx_accountnumber != curepeattx_tomember:
1151  if curepeattx_transactioncode == 'AT':
1152  curepeattx_transactioncode = 'XA'
1153  elif curepeattx_transactioncode == 'LP':
1154  curepeattx_transactioncode = 'XP'
1155 
1156  cuscheduled_txndata = {}
1157  cuscheduled_txndata["txn"] = {
1158  "from": handle_overloaded_accounts(from_txn),
1159  "frommember": curepeattx_accountnumber,
1160  "fromsuffix": curepeattx_fromsuffix,
1161  "fromtype": from_type,
1162  "to": handle_overloaded_accounts(to_txn),
1163  "tomember": curepeattx_tomember,
1164  "tosuffix": curepeattx_tosuffix,
1165  "totype": to_type,
1166  "amount": curepeattx_amount,
1167  "transactioncode": curepeattx_transactioncode,
1168  "deposittype": from_type,
1169  "memo": ""
1170  }
1171  mmth_curepeattx_rec["txn_data"] = get_valid_json(cuscheduled_txndata)
1172 
1173  # insert a transformed list of dictionary to cu_scheduledtxn
1174  members_batch_transaction(
1175  INSERT,
1176  "cu_scheduledtxn",
1177  collection=curepeattx_mammoth_records
1178  )
1179 
1180 
1181  # <cu>audituser table
1182  cuauditusers_coll = member_data["cuauditusers"]
1183  for cuaudituser_rec in cuauditusers_coll:
1184  mmth_cu = get_strip_value_dict(
1185  "cu", cuaudituser_rec, pop=True)
1186  assert mmth_cu == _cu
1187  del cuaudituser_rec["admuser"]
1188  cuaudituser_rec["user_id"] = user_id
1189  cuaudituser_rec["auditdate"] = get_strip_value_dict(
1190  "chdate", cuaudituser_rec, pop=True)
1191  cuaudituser_rec["auditaction"] = get_strip_value_dict(
1192  "action", cuaudituser_rec, pop=True)
1193  cuaudituser_rec["auditsrctype"] = "U"
1194  cuaudituser_rec["auditsrcuser_name"] = get_strip_value_dict(
1195  "user_name", cuaudituser_rec, pop=True)
1196  cuaudituser_rec["auditsrcemail"] = get_strip_value_dict(
1197  "email", cuaudituser_rec, pop=True, default="a@b.cde")
1198  cuaudituser_rec["auditsrcip"] = "127.0.0.1"
1199  cuaudituser_rec["accountnumber"] = mmth_uname_also_ody_mem_accnum
1200 
1201  # Give a description if it exists.
1202  # SHOULD be one of the following in AUDITUSER_DESCMAP.
1203  if cuaudituser_rec["auditaction"] in AUDITUSER_DESCMAP:
1204  val = cuaudituser_rec["auditaction"]
1205  cuaudituser_rec["auditfulldesc"] = AUDITUSER_DESCMAP[val]
1206 
1207  # insert a <cu>audituser collection
1208  members_batch_transaction(
1209  INSERT,
1210  "{}audituser".format(cu_lower),
1211  collection=cuauditusers_coll
1212  )
1213 
1214  # insert cuovermicr data
1215  cuovermicr_coll = member_data["cuovermicr"]
1216  members_batch_transaction(
1217  INSERT,
1218  "cuovermicr",
1219  collection=cuovermicr_coll
1220  )
1221 
1222 
def get_ody_user_ids(member):
    """Get Odyssey user_name and member account number

    Args:
        member: dictionary (with content <cuusers>.user_name and
                <cuusers>.user_alias from Mammoth)

    Returns:
        ody_uname (odyssey user_name): Mammoth's <cuusers>.user_alias if
            any, otherwise <cuusers>.user_name
        mmth_uname_also_ody_mem_accnum (odyssey member acc num):
            Mammoth's <cuusers>.user_name

    Raises:
        AssertionError: when Mammoth's <cuusers>.user_name is missing/empty
    """
    mmth_uname_also_ody_mem_accnum = member["user_name"]
    mmth_ualias = member["user_alias"]

    # values coming from Mammoth may be None or non-strings; strip only
    # when the value supports it
    if hasattr(mmth_uname_also_ody_mem_accnum, 'strip'):
        mmth_uname_also_ody_mem_accnum =\
            mmth_uname_also_ody_mem_accnum.strip()

    if hasattr(mmth_ualias, 'strip'):
        mmth_ualias = mmth_ualias.strip()

    # get appropriate odyssey username based on user_name and user_alias:
    # prefer the alias when one exists, otherwise fall back to user_name
    if mmth_ualias is not None and mmth_ualias != "":
        assert (mmth_uname_also_ody_mem_accnum is not None and
                mmth_uname_also_ody_mem_accnum != ""), (
            "Mammoth 'user_name' is empty!")
        ody_uname = mmth_ualias
    else:
        # BUGFIX: the two adjacent string literals previously concatenated
        # to "and'user_name'" -- a separating space was missing
        assert (mmth_uname_also_ody_mem_accnum is not None and
                mmth_uname_also_ody_mem_accnum != ""), (
            "Both 'user_alias' and 'user_name' from Mammoth empty")
        ody_uname = mmth_uname_also_ody_mem_accnum
    return ody_uname, mmth_uname_also_ody_mem_accnum
1263 
1264 
@file_exc_decorator
def copy_http_file_stream(_resp, _target_path, _cu):
    """Stream the HTTP response body into a local file on disk.

    Args:
        _resp: streaming HTTP response (presumably a `requests` response
               opened with ``stream=True`` -- it must expose
               ``iter_content``)
        _target_path: destination path for the downloaded gzip file
        _cu: current credit union code

    Raises:
        SystemExit: if IOError, NameError or PermissionError is caught
    """
    LOGGER.info("[gethistfile]: Downloading (stream download, decode gzip and "
                "deflate transfer-encodings) zipped sql file - `{}`"
                .format(_target_path))

    # Response.iter_content automatically DECODES (not decompresses) the
    # gzip and deflate transfer-encodings chunk by chunk; Response.raw would
    # instead yield the untouched wire bytes.
    # https://en.wikipedia.org/wiki/Chunked_transfer_encoding

    # 16 KiB chunks; empty chunks are skipped rather than written
    with open(_target_path, "wb") as out_file:
        for content_chunk in _resp.iter_content(chunk_size=16384):
            if content_chunk:
                out_file.write(content_chunk)
    LOGGER.info("[gethistfile]: Download complete.")
1288 
1289 
@pg_crsr_hndlr_decrtr
@file_exc_decorator
def execute_sql_script_from_file(_target_gzip, _cu):
    """Loads the COPY FROM command including file and execute a bulk
    insertion command to populate <cu>accounthistory and <cu>loanhistory
    tables.

    Args:
        _target_gzip: gzip file that contains COPY FROM script and csv
                      formatted content from Mammoth for <cu>accounthistory
                      and <cu>loanhistory tables

    Raises:
        SystemExit: if IOError, NameError or PermissionError is caught
        psycopg2.Error, psycopg2.Warning on db operation errors
    """
    LOGGER.info("[gethistfile]: Loading local memhist files and "
                "preparing SQL execution!")

    accounthistory_sql = "COPY {}accounthistory".format(_cu.lower())
    loanhistory_sql = "COPY {}loanhistory".format(_cu.lower())

    # the full COPY statements are taken verbatim from the dump's two
    # header lines once they are found
    accounthistory_complete_sql = ""
    loanhistory_complete_sql = ""

    # the split csv payloads are written next to the downloaded gzip
    zip_dir = os.path.dirname(_target_gzip)
    accounthistory_file = os.path.join(
        zip_dir, MEMHIST_ACCOUNT_HISTORY_FILENAME)
    loanhistory_file = os.path.join(zip_dir, MEMHIST_LOAN_HISTORY_FILENAME)

    decode_option = "utf-8"
    # byte-level markers: comparing bytes avoids decoding every data line
    # just to locate the two COPY header lines (the markers are ASCII, so
    # byte containment matches the previous decoded-string containment)
    accounthistory_marker = accounthistory_sql.encode(decode_option)
    loanhistory_marker = loanhistory_sql.encode(decode_option)

    write_loan = False
    write_account = False

    # separate accounthistory and loanhistory content in two separate files
    with open(accounthistory_file, "wb") as f_a, \
            open(loanhistory_file, "wb") as f_l:
        with gzip.open(_target_gzip, "rb") as f:
            # iterate the file object directly instead of readlines() so the
            # entire decompressed dump is never held in memory at once
            for line in f:
                if accounthistory_marker in line:
                    write_account = True
                    write_loan = False
                    accounthistory_complete_sql = line.decode(decode_option)
                    continue

                if loanhistory_marker in line:
                    write_account = False
                    write_loan = True
                    loanhistory_complete_sql = line.decode(decode_option)
                    continue

                # this order depends on which records are added first when
                # creating a gzip file on Mammoth side (as of 2018/09/21,
                # accounthistory records are written first)
                if write_account:
                    f_a.write(line)

                if write_loan:
                    f_l.write(line)

    # read the file-like objects from disk and run COPY to populate records
    # in the respective tables
    with pg_handler.PGSession() as conn:
        with conn.cursor() as cur:
            memhist_transaction = PgTransaction(conn, cur)
            # purge any partially-loaded history first; commit is deferred
            # so cleanup + reload happen in one transaction
            cleanup_memhist(memhist_transaction, _cu, commit_later=True)

            with open(accounthistory_file, "rb") as f_a, \
                    open(loanhistory_file, "rb") as f_l:

                memhist_transaction.cur.copy_expert(
                    accounthistory_complete_sql, f_a)

                memhist_transaction.cur.copy_expert(
                    loanhistory_complete_sql, f_l)

            # commit after all the tables are populated
            memhist_transaction.commit()
1370 
1371 
@pg_crsr_hndlr_decrtr
@file_exc_decorator
def move_members_hist(_server, _verbose, _cu, _user, _passwd):
    """Move members history tables

    Steps:
    1. createhistfile: creates the gzip file including sql command on Mammoth
    2. gethistfile: downloads gzip file from Mammoth, saves as a local copy
                    then execute the COPY command using the content from
                    downloaded file.

    Migration involves reading the downloaded content, preparing the COPY FROM
    sql command for <cu>accounthistory and <cu>loanhistory tables and commiting
    the changes.

    Args:
        _server: Mammoth endpoint server
        _verbose: verbose flag
        _cu: current credit union code
        _user: Mammoth monitor username
        _passwd: Mammoth monitor password

    Raises:
        SystemExit: if IOError, NameError or PermissionError is caught
        psycopg2.Error, psycopg2.Warning on db operation errors
    """
    # always deleted after successful migration and before clean startup
    valid_history_gzip_import = MEMHIST_DOWNLOADED_FILE.format(_cu.lower())
    # check if the file has already been pulled and something went
    # wrong on Odyssey side earlier; if so, reuse the local copy instead
    # of re-downloading
    if os.path.exists(valid_history_gzip_import):
        LOGGER.warning("Local copy of SQL dump file exist! You might be "
                       "loading outdated file! If you think the data might "
                       " have been changed from last time, clean up and re-run"
                       " to download the most recent data!")
        # local gzip file already exist, execute
        execute_sql_script_from_file(valid_history_gzip_import, _cu)

    else:
        # connect to Mammoth, create and get history file
        # ("restart": "Y" asks Mammoth to regenerate the dump from scratch)
        historyfile_create_request = MammothMigration(
            _cu,
            DATA_OPT_CREATE_HIST,
            _server,
            _user,
            _passwd,
            params={"restart": "Y"}
        )
        historyfile_create_request.run()
        hist_creation_resp = historyfile_create_request.response
        mmth_histfile_path = hist_creation_resp["file"]
        # histfile_create_status = hist_creation_resp["error"]

        # empty histstatus means creation kicked off without complaint
        if historyfile_create_request.histstatus != "":
            LOGGER.info("[createhistfile] status: {}".format(
                historyfile_create_request.histstatus))
        else:
            LOGGER.info(("[createhistfile] status: History file is being "
                         "created on Mammoth location: `{}.gz`").format(
                             mmth_histfile_path))

        mammoth_prep_done = False
        hist_get_resp = None

        # NOTE(review): this polling loop has no retry limit -- if Mammoth
        # never reports the dump as ready it will poll forever; confirm
        # that is intentional.
        while not mammoth_prep_done:
            LOGGER.info("Waiting for {} seconds for memhist file preparation "
                        "on Mammoth side".format(HIST_CREATE_WAIT_TIME))
            # wait for some time while the database records are being
            # extracted on Mammoth for accounthistory and accountloan tables
            time.sleep(HIST_CREATE_WAIT_TIME)

            # get the prepared gzipped file from Mammoth and stream download
            historyfile_get_request = MammothMigration(
                _cu,
                DATA_OPT_GET_HIST,
                _server,
                _user,
                _passwd,
                stream=True
            )
            historyfile_get_request.run()

            # empty histstatus signals the file is ready for download
            if historyfile_get_request.histstatus == "":
                LOGGER.info("[gethistfile] status: SQL dump file on Mammoth "
                            "is ready for download.")
                hist_get_resp = historyfile_get_request.response
                mammoth_prep_done = True
            else:
                LOGGER.info("[createhistfile] status: {}".format(
                    historyfile_get_request.histstatus))

        assert hist_get_resp is not None
        # first download/stream download data to local gzip file
        copy_http_file_stream(hist_get_resp, valid_history_gzip_import, _cu)

        # check if the content imported to the local gzip file, then execute
        if os.path.exists(valid_history_gzip_import):
            execute_sql_script_from_file(valid_history_gzip_import, _cu)

        # if file is not found, error
        else:
            error_msg = "Something went wrong in {} phase".format(
                DATA_OPT_GET_HIST)
            LOGGER.error(error_msg)
            raise SystemExit(error_msg)
1477 
1478 
def get_valid_to_be_imported_members(cu_lower, _mmth_members_list):
    """Check invalid memberacct nums and exclude already imported members

    Drops members whose account number is not an integer, then drops
    members that already have a row in the <cu>user table on Odyssey.

    Arguments:
        cu_lower -- lower case CU code
        _mmth_members_list -- all members' list for this CU

    Returns:
        updated_uname_accnums_list -- list of to be imported member accounts
        non_integer_mem_ids -- invalid non integer member accounts for logging
    """
    to_import = []
    # non-integer member account numbers show up on test servers
    # (cruisecu, scrubcu, ...); collect them for a warning upstream
    non_integer_mem_ids = []

    LOGGER.info("Excluding already imported members from the import list...")

    # the table name is the same for every member; build it once
    cuuser_table = "{}user".format(cu_lower)

    with pg_handler.PGSession() as conn:
        with conn.cursor() as cur:
            # short-lived transaction used only for count lookups
            lookup_transaction = PgTransaction(conn, cur)

            # iterate every Mammoth member and decide whether it still
            # needs to be migrated to Odyssey
            for member_record in _mmth_members_list:
                # (ody_uname, member account number) pair
                un_accnums = get_ody_user_ids(member_record)

                # discard members whose account number is not an integer
                try:
                    int(un_accnums[1])
                except ValueError:
                    non_integer_mem_ids.append(un_accnums)
                    continue

                existing_count = lookup_transaction(
                    SELECT_RETURN,
                    cuuser_table,
                    return_only_count=True,
                    where_conditions={
                        "user_name": un_accnums[0]
                    }
                )
                # expect at most one row per user_name
                assert existing_count <= 1

                # keep only members not already present in Odyssey
                if existing_count == 0:
                    to_import.append(un_accnums)

    return to_import, non_integer_mem_ids
1542 
1543 
def skip_never_loggedin_cuuser(cuuser_record_mmth):
    """Filter mammoth cuusers record on lastlogin and pktdate

    Args:
        cuuser_record_mmth: Mammoth <cuusers> record dict containing at
            least the "lastlogin" and "pktdate" keys (values may be None
            or non-string).

    Returns:
        True when BOTH lastlogin and pktdate are empty/None -- i.e. the
        member appears to have never logged in and should be skipped;
        False otherwise.
    """
    def _blank(key):
        # strip string values before testing; non-strings (e.g. None)
        # are tested as-is
        value = cuuser_record_mmth[key]
        if hasattr(value, "strip"):
            value = value.strip()
        return value in [None, '']

    # skip only when there is neither a last login nor a packet date
    # (return the condition directly instead of if/else True/False)
    return _blank("lastlogin") and _blank("pktdate")
1558 
1559 
@file_exc_decorator
@pg_crsr_hndlr_decrtr
def move_members_data(_mmth_members_dict,
                      _server,
                      _verbose,
                      _cu,
                      _uname,
                      _passwd):
    """Migrate memdata obtained from Mammoth to Odyssey.

    Migrate a batch of members at a time to reduce the Mammoth http endpoint
    roundtrip.

    Side effects: appends to four per-CU CSV audit files (user-id cross
    reference, orphaned members, never-logged-in members, scheduled
    transfer outliers); headers are written only when a file did not
    already exist.

    Args:
        _mmth_members_dict: members list dictionary of the
                            response json dictionary from mammoth
        _server: Mammoth endpoint server
        _verbose: verbose flag
        _cu: current credit union code
        _uname: Mammoth monitor username
        _passwd: Mammoth monitor password

    Raises:
        psycopg2.Error, psycopg2.Warning on db operation errors
        SystemExit: if the cusurveymaster.surveyid mapping CSV is missing
    """

    cu_lower = _cu.lower()
    members_list = _mmth_members_dict["data"]["list"]

    # exclude already imported users in Odyssey from import list
    # NOTE(review): the callable on this assignment appears truncated in
    # this copy of the file -- from the argument list and the unpacked
    # return values it is presumably `get_valid_to_be_imported_members(`;
    # confirm against version control before running.
    updated_uname_accnums_list, non_integer_mem_id_all =\
        cu_lower,
        members_list
    )

    # collect orphaned users
    orphaned_members_list = []
    # collect members with forceremain updated to unlock
    # (appended to by the per-member migration handler below)
    forceremain_update_list = []
    # collect members with no prior login info
    # (cuusers.lastlogin and cuusers.pktdate is null)
    never_loggedin_members_list = []

    # total members that will be migrated
    total_members = len(updated_uname_accnums_list)

    # total batches will be migrated
    total_batches = math.ceil(total_members / MEMBERS_PAGE_SIZE)

    LOGGER.info("Importing total of {} members' "
                "data over {} batch(es)...".format(
                    total_members, total_batches))

    # flag to check if the member to user cross reference csv file exists
    # (files are opened in append mode later, so these pre-open flags
    # decide whether a header row must be written)
    reference_file_exist_before_open = os.path.exists(
        USERID_CROSS_REF_CSV_FILE.format(cu_lower))

    # flag to check if the csv file recording orphaned mammoth users exists
    mmth_orphaned_list_file_exist_before_open = os.path.exists(
        MAMMOTH_ORPHANED_MEMLIST_CSV_FILE.format(cu_lower))

    # flag to check if the csv file recording never logged in members
    mmth_never_logged_in_list_file_exist_before_open = os.path.exists(
        MAMMOTH_NEVER_LOGGEDIN_MEMLIST_CSV_FILE.format(cu_lower))

    # flag to check if the csv file logging repeating transfers outliers
    mmth_schedxfer_outlier_exist_before_open = os.path.exists(
        SCHEDULED_TRANSFER_OUTLIERS_CSV_FILE.format(cu_lower))

    # read the cusurveymaster.surveyid mapping to memory
    cusurveymaster_surveyid_map = {}
    if os.path.exists(ADMIN_CUSURVEY_ID_MAP_CSV_FILE.format(cu_lower)):
        with open(ADMIN_CUSURVEY_ID_MAP_CSV_FILE.format(cu_lower),
                  newline='') as cusurveymap_csv:
            reader = csv.reader(cusurveymap_csv, delimiter=',', quotechar='|')
            next(reader)  # skip header
            for [oldid, newid] in reader:
                cusurveymaster_surveyid_map[oldid.strip()] = newid.strip()
    else:
        raise SystemExit("cusurveymaster.surveyid mappings `{}` not found!".format(
            ADMIN_CUSURVEY_ID_MAP_CSV_FILE.format(cu_lower)))

    # open the user id cross reference csv file and orphaned member list csv
    # file in a context manager
    with open(USERID_CROSS_REF_CSV_FILE.format(cu_lower),
              'a',
              newline='') as csvfile,\
            open(MAMMOTH_ORPHANED_MEMLIST_CSV_FILE.format(cu_lower),
                 'a',
                 newline='') as csvfile_orphaned,\
            open(MAMMOTH_NEVER_LOGGEDIN_MEMLIST_CSV_FILE.format(cu_lower),
                 'a',
                 newline='') as csvfile_never_loggedin,\
            open(SCHEDULED_TRANSFER_OUTLIERS_CSV_FILE.format(cu_lower),
                 'a',
                 newline='') as csvfile_outlier_schedxfer:

        _mmth_accnum_to_ody_userid_ref_csv_writer = csv.writer(
            csvfile, delimiter=',',
            quoting=csv.QUOTE_MINIMAL
        )
        _mmth_orphaned_memlist_csv_writer = csv.writer(
            csvfile_orphaned,
            delimiter=',',
            quotechar='|',
            quoting=csv.QUOTE_MINIMAL
        )
        _mmth_never_loggedin_memlist_csv_writer = csv.writer(
            csvfile_never_loggedin,
            delimiter=',',
            quotechar='|',
            quoting=csv.QUOTE_MINIMAL
        )
        _mmth_repeating_transfers_outliers = csv.writer(
            csvfile_outlier_schedxfer,
            delimiter=',',
            quotechar='|',
            quoting=csv.QUOTE_MINIMAL
        )

        if not reference_file_exist_before_open:
            # then we need to write the header row
            _mmth_accnum_to_ody_userid_ref_csv_writer.writerow(
                ["old_user_id", "new_user_id", "new_member_id", "old_member_id",
                 "account_type", "old_acctid", "key", "new_id"]
            )

        if not mmth_orphaned_list_file_exist_before_open:
            # write the header row
            _mmth_orphaned_memlist_csv_writer.writerow(
                ["orphaned_member_account", "user_alias", 'lastlogin', 'pktdate'])

        if not mmth_never_logged_in_list_file_exist_before_open:
            # write the header row
            _mmth_never_loggedin_memlist_csv_writer.writerow(
                ["never_loggedin_member_account",
                 "user_alias",
                 'lastlogin',
                 'pktdate',
                 'estmt_flag'])
        if not mmth_schedxfer_outlier_exist_before_open:
            # write the header row
            _mmth_repeating_transfers_outliers.writerow(
                ["description", "original_mammoth_curepeattx_record"]
            )

        # replicating Miki's www4:/home/httpd/hcubin/MxConvXRef
        # script to prepare a MoneyDesktop mammoth member to
        # Odyssey accounts mapping
        with pg_handler.PGSession() as conn:
            with conn.cursor() as cur:
                cu_admininfo_transaction = PgTransaction(conn, cur)

                select_cols_cuadmin = ["flagset2"]
                cuadmin_where = {"cu": _cu}
                cuadmin_rows = cu_admininfo_transaction(
                    SELECT_RETURN,
                    "cuadmin",
                    where_conditions=cuadmin_where,
                    select_columns=select_cols_cuadmin
                )
                if(len(cuadmin_rows) < 1):
                    raise SystemExit("`cuadmin` record does not exist "
                                     "for this credit union: {}".format(cu_lower))

                flagset2 = cuadmin_rows[0][0]  # returns integer
                # bitmask test against the CU2_SPEC18 flag bit(s); nonzero
                # result is later passed to the per-member handler
                money_desktop_credit_card_type18_flag = CU2_SPEC18 & flagset2

        # iteratively, migrate a batch of members at a time
        for batch_i in range(total_batches):
            # Start a transaction to migrate memdata for a batch of members
            # that is not migrated yet to Odyssey
            with pg_handler.PGSession() as conn:
                with conn.cursor() as cur:

                    # instantiate batch transaction
                    members_batch_transaction = PgTransaction(conn, cur)
                    ody_uname_and_accnums_list = get_ith_batch(
                        updated_uname_accnums_list,
                        batch_i,
                        MEMBERS_PAGE_SIZE
                    )

                    # list of valid member account number to be queried
                    # with to Mammoth
                    members_http_params = [un_accnum[1]
                                           for un_accnum in
                                           ody_uname_and_accnums_list]

                    if len(members_http_params) > 0:
                        LOGGER.info("`{}` valid members are being imported "
                                    "in batch `{}`.".format(
                                        len(members_http_params),
                                        batch_i + 1))

                        # fetch data from Mammoth for this batch of
                        # members `memdata`
                        this_batch_migration = MammothMigration(
                            _cu,
                            DATA_OPT_MEMDATA,
                            _server,
                            _uname,
                            _passwd,
                            params={KEY_MEMBERS: get_valid_json(
                                members_http_params)}
                        )
                        this_batch_migration.run()
                        this_batch_migration_data =\
                            this_batch_migration.response["data"]["account"]

                        # now iterate each member of this batch and migrate
                        # contents obtained from Mammoth to Odyssey
                        for ody_uname, mmth_uname_also_ody_mem_accnum in\
                                ody_uname_and_accnums_list:

                            this_member_data = this_batch_migration_data[
                                mmth_uname_also_ody_mem_accnum]

                            # check for orphaned memberaccounts
                            accountbalance_records_count = len(
                                this_member_data["accountbalance"])
                            loanbalance_records_count = len(
                                this_member_data["loanbalance"])
                            cuuser_acc_record = this_member_data["cuusers"]
                            assert type(cuuser_acc_record) is dict
                            mammoth_lastlogin = get_strip_value_dict(
                                "lastlogin", cuuser_acc_record)

                            mammoth_estmt_flag = get_strip_value_dict(
                                "estmt_flag", cuuser_acc_record)

                            mammoth_user_alias = get_strip_value_dict(
                                "user_alias", cuuser_acc_record)

                            # Filter orphan member account (no deposit or
                            # loan balances and no e-statement flag):
                            # do not migrate
                            if accountbalance_records_count == 0\
                                    and loanbalance_records_count == 0\
                                    and mammoth_estmt_flag != 'Y':
                                orphaned_members_list.append(
                                    mmth_uname_also_ody_mem_accnum)

                                csv_record_orphan = [
                                    mmth_uname_also_ody_mem_accnum,
                                    mammoth_user_alias,
                                    mammoth_lastlogin,
                                    cuuser_acc_record["pktdate"]
                                ]

                                # record the member account in a csv file
                                _mmth_orphaned_memlist_csv_writer.writerow(
                                    csv_record_orphan)
                                # important
                                continue

                            # Filter non-orphan member who might have
                            # never logged in to the Mammoth: do not migrate
                            if skip_never_loggedin_cuuser(cuuser_acc_record):
                                never_loggedin_members_list.append(
                                    mmth_uname_also_ody_mem_accnum)
                                # record the member account in a csv file
                                _mmth_never_loggedin_memlist_csv_writer.writerow(
                                    [mmth_uname_also_ody_mem_accnum,
                                     mammoth_user_alias,
                                     mammoth_lastlogin,
                                     cuuser_acc_record["pktdate"],
                                     mammoth_estmt_flag])
                                continue

                            # At this point, member is not orphan and has
                            # logged in at least once: migrate this member
                            # account
                            else:
                                # migrate individual member data
                                # NOTE(review): the name of the function being
                                # called here appears truncated in this copy of
                                # the file (only the argument list survives);
                                # restore the per-member migration handler name
                                # from version control before running.
                                    members_batch_transaction,
                                    this_member_data,
                                    ody_uname,
                                    mmth_uname_also_ody_mem_accnum,
                                    _verbose,
                                    _cu,
                                    _mmth_accnum_to_ody_userid_ref_csv_writer,
                                    _mmth_repeating_transfers_outliers,
                                    forceremain_update_list,
                                    cusurveymaster_surveyid_map,
                                    money_desktop_credit_card_type18_flag
                                )

                    else:
                        LOGGER.info(
                            "No valid members to import in batch `{}`.".format(
                                batch_i + 1))

                    # commit after each batch of members
                    members_batch_transaction.commit()
                    LOGGER.info("[PROGRESS] Members imported:"" {}/{} "
                                "| Batch: {}/{}".format(
                                    min(MEMBERS_PAGE_SIZE * (batch_i + 1),
                                        total_members),
                                    total_members,
                                    batch_i + 1,
                                    total_batches)
                                )

    # log invalid members with non-integer member accounts (usually seen in
    # dev mode with cruisecu, scrubcu, etc.)
    if len(non_integer_mem_id_all) > 0:
        nimi_msg = "`{}` non-integer members encountered: [{}]".format(
            len(non_integer_mem_id_all),
            ", ".join([i[1] for i in non_integer_mem_id_all]))
        LOGGER.warning(nimi_msg)

    # log orphaned members list
    if len(orphaned_members_list) > 0:
        LOGGER.warning("`{}` orphaned members were "
                       "found. Following members were "
                       "not imported: [{}]".format(
                           len(orphaned_members_list),
                           ", ".join(orphaned_members_list)))

    # log members with forceremain updated
    if len(forceremain_update_list) > 0:
        LOGGER.warning("`forceremain` was updated for `{}` "
                       "members: [{}]".format(
                           len(forceremain_update_list),
                           ", ".join(forceremain_update_list)))

    # log never loggedin members
    if len(never_loggedin_members_list) > 0:
        LOGGER.warning("`{}` members that never logged in to Mammoth were "
                       "found. Following members were not "
                       "imported : [{}]".format(
                           len(never_loggedin_members_list),
                           ", ".join(never_loggedin_members_list)))
1893 
1894 
def populate_switch_accounts(switch_acc_transaction,
                             cu_lower,
                             switch_acc_from,
                             switch_acc_data,
                             non_existent_switch_accounts):
    """Populate switch accounts in useraccounts and memberacctrights tables

    Add additional records in useraccounts and memberacctrights tables
    for switch accounts.

    Arguments:
        switch_acc_transaction: instance of PgTransaction
        cu_lower: lowercase CU code
        switch_acc_from: member account number for which we want
                         to add switch account
        switch_acc_data: list of culivetx data for member accounts
                         which we want to add as switch accounts
        non_existent_switch_accounts: list of invalid tomember switch accounts
    """
    switch_acc_from = switch_acc_from.strip()
    select_columns = ["primary_user"]

    # 1. resolve the primary user id behind the `from` member account
    user_from_rows = switch_acc_transaction(
        SELECT_RETURN,
        "{}memberacct".format(cu_lower),
        where_conditions={"accountnumber": switch_acc_from},
        select_columns=select_columns
    )

    # Bail out early when the `from` account number is unknown to Odyssey:
    # the member account is either locked or the member has never been
    # used in online banking in Mammoth.
    if not user_from_rows:
        invalid_switch_account_from = {
            "Invalid Switch Account From": switch_acc_from,
            "Switch Account To": [acc_to["tomember"].strip()
                                  for acc_to in switch_acc_data]
        }
        LOGGER.warning("Cannot switch from member account `{}`. It is either"
                       " locked"
                       " or the member has never been used in"
                       " online banking in Mammoth."
                       .format(switch_acc_from) +
                       str(invalid_switch_account_from))
        non_existent_switch_accounts.append(invalid_switch_account_from)
        # important; do not proceed after this point
        return

    # from accountnumber is valid in Odyssey
    user_from = user_from_rows[0][0]

    for switch_acc_to in switch_acc_data:
        to_member = switch_acc_to["tomember"].strip()

        user_to_rows = switch_acc_transaction(
            SELECT_RETURN,
            "{}memberacct".format(cu_lower),
            where_conditions={"accountnumber": to_member},
            select_columns=select_columns
        )

        # 2. Verify that record exists for the `tomember` in memberacct
        # table. If not, the member account is either locked or the member
        # has never been used in online banking in Mammoth.
        if not user_to_rows:
            invalid_switch_account_to = {
                "Switch Account From": switch_acc_from,
                "Invalid Switch Account To": to_member
            }
            LOGGER.warning("Cannot switch to member account `{}`. It is either"
                           " locked"
                           " or the member has never been used in"
                           " online banking in Mammoth."
                           .format(to_member) +
                           str(invalid_switch_account_to))
            non_existent_switch_accounts.append(invalid_switch_account_to)
            continue

        # 3. tomember exists in <cu>memberacct: get its user id
        user_to = user_to_rows[0][0]

        # 4. For both useraccounts and memberacctrights (in that order),
        #    fetch every record for user_id_to + to_member and insert a
        #    copy of it re-pointed at user_id_from.
        for tbl_suffix in ("useraccounts", "memberacctrights"):
            existing_records = switch_acc_transaction(
                SELECT_RETURN,
                "{}{}".format(cu_lower, tbl_suffix),
                where_conditions={"user_id": user_to,
                                  "accountnumber": to_member}
            )

            clones_for_user_from = []
            for existing_record in existing_records:
                cloned_record = dict(existing_record)
                cloned_record["user_id"] = user_from
                clones_for_user_from.append(cloned_record)

            switch_acc_transaction(
                INSERT,
                "{}{}".format(cu_lower, tbl_suffix),
                collection=clones_for_user_from
            )
2030 
2031 
@pg_crsr_hndlr_decrtr
def move_switch_accounts(_mmth_switch_accts_dict,
                         _server,
                         _verbose,
                         _cu,
                         _uname,
                         _passwd):
    """Import all switch accounts for a Credit Union

    This operation is intended to be imported after all the member data
    is already migrated.

    Arguments:
        _mmth_switch_accts_dict: Mammoth response data
        _cu: CU Code
        _server: Mammoth endpoint server
        _uname: Mammoth monitor username
        _passwd: Mammoth monitor password
        _verbose: level of verbosity
    """
    cu_lower = _cu.lower()
    switch_acc_members_list = _mmth_switch_accts_dict["members"]
    # total members from response that we will be migrating
    total_members = len(switch_acc_members_list)
    # total batches we will be migrating
    total_batches = math.ceil(total_members / SWITCH_ACC_BATCH_SIZE)

    LOGGER.info("Importing total of {} members' switch account data!".format(
        total_members))
    non_existent_switch_accounts = []
    # iteratively, migrate a batch of members at a time
    for batch_i in range(total_batches):
        # Start a transaction to migrate memdata for a batch of members
        # that is not migrated yet to Odyssey
        with pg_handler.PGSession() as conn:
            with conn.cursor(cursor_factory=DictCursor) as cur:
                # batch transaction
                switch_acc_transaction = PgTransaction(conn, cur)

                batch_switch_acc_members = get_ith_batch(
                    switch_acc_members_list,
                    batch_i,
                    SWITCH_ACC_BATCH_SIZE
                )

                # fetch data from Mammoth for this batch of members
                this_batch_migration = MammothMigration(
                    _cu,
                    DATA_OPT_SWITCH_ACCOUNTS,
                    _server,
                    _uname,
                    _passwd,
                    params={KEY_MEMBERS: get_valid_json(
                        batch_switch_acc_members)}
                )

                this_batch_migration.run()
                this_batch_migration_data = this_batch_migration.response

                for switch_acc_member, switch_acc_data in \
                        this_batch_migration_data["culivetx"].items():
                    # FIX: the call importing each member's switch
                    # accounts was missing, leaving the loop body with
                    # orphaned call arguments (a syntax error); restored.
                    populate_switch_accounts(
                        switch_acc_transaction,
                        cu_lower,
                        switch_acc_member,
                        switch_acc_data,
                        non_existent_switch_accounts
                    )

                # commit after each batch of switch account related members
                switch_acc_transaction.commit()
                # print progress
                LOGGER.info("[PROGRESS] Switch accounts imported:"
                            " {}/{} | Batch: {}/{}".format(
                                min(SWITCH_ACC_BATCH_SIZE * (batch_i + 1),
                                    total_members),
                                total_members,
                                batch_i + 1,
                                total_batches)
                            )

    # Log any invalid tomember in Mammoth
    if len(non_existent_switch_accounts) > 0:
        unused_memberaccts_in_mmth_msg = ("These `{}` member accounts out "
                                          "of `{}` total switch accounts are "
                                          "either locked or the members have "
                                          "never been used online banking in "
                                          "Mammoth. Please let the Credit "
                                          "Union know about it! ")

        LOGGER.warning(get_valid_json({unused_memberaccts_in_mmth_msg.format(
            len(non_existent_switch_accounts), total_members):
            non_existent_switch_accounts}))
2128 
2129 
@file_exc_decorator
@pg_crsr_hndlr_decrtr
def cleanup_memdata(del_memdata_trnscn, _cu, **kwargs):
    """Cleanup memdata content

    If kwargs.get('commit_later') flag is True, we do not commit the
    transaction here (and only cleanup database tables, not the files),
    meaning that the cleanup may have been executed before actual
    migration. If commit_later flag is False, we commit the transaction
    here and proceed with the file cleanup.

    Args:
        del_memdata_trnscn: instance of PgTransaction to which this
                            cleanup process is part of
        **kwargs: Options:
            commit_later: flag to commit transaction later
                          or now
            accountnumber: if we want to filter delete
                           based on account number

    Raises:
        SystemExit: if IOError, NameError or PermissionError is caught
        psycopg2.Error, psycopg2.Warning on db operation errors
    """
    commit_later = kwargs.get("commit_later", False)
    accountnumber = kwargs.get("accountnumber", None)
    cu_lower = _cu.lower()

    # tables that can be conditioned only on CU for deletion
    tbls_condn_cu_only = ["cuadmeco", "cu_scheduledtxn", "cuovermicr"]
    cu_where = {
        "cu": _cu
    }

    # tables that can be conditioned both on CU and accountnum for deletion
    # accountnum is optional, but conditioning on CU is NECESSARY to make
    # sure we are deleting the content of the right CU
    # curepeattx also falls in this category, if need be
    tbls_condn_cu_accnum = [
        "cusurveysays",
        "cucmsresponse",
        "cu_alerts",
        "culogtrack"
    ]

    # tables that can be conditioned only on accountnumber for deletion
    # if need be, otherwise, delete altogether
    tbls_condn_accnum_only = [
        "{}accountbalance".format(cu_lower),
        "{}loanbalance".format(cu_lower),
        "{}useraccounts".format(cu_lower),
        "{}crossaccounts".format(cu_lower),
        "{}holds".format(cu_lower)
    ]
    accnum_where = {}

    # tables that can be deleted altogether
    tbls_del_all = [
        "{}audituser".format(cu_lower),
        "{}userlogins".format(cu_lower),
        "{}memberacct".format(cu_lower),
        "{}memberacctrights".format(cu_lower),
        "{}group".format(cu_lower),
        "{}usercontact".format(cu_lower),
        "{}transdtl".format(cu_lower),
        "{}transhdr".format(cu_lower),
        "{}user".format(cu_lower)  # put it as a last element
    ]
    cu_and_accnum_where = {
        "cu": _cu,
    }

    # add optional account number condition,
    # helpful during development
    if accountnumber is not None:
        # isinstance is the idiomatic type check (PEP 8), and also
        # accepts str subclasses; `type(x) == str` does not
        assert isinstance(accountnumber, str)
        cu_and_accnum_where["accountnumber"] = accountnumber
        accnum_where["accountnumber"] = accountnumber

    for tbl in tbls_condn_cu_only:
        del_memdata_trnscn(
            DELETE,
            tbl,
            where_conditions=cu_where
        )

    for tbl in tbls_condn_cu_accnum:
        del_memdata_trnscn(
            DELETE,
            tbl,
            where_conditions=cu_and_accnum_where
        )

    for tbl in tbls_condn_accnum_only:
        del_memdata_trnscn(
            DELETE,
            tbl,
            where_conditions=accnum_where
        )

    for tbl in tbls_del_all:
        del_memdata_trnscn(
            DELETE,
            tbl
        )

    LOGGER.info("Cleaning up LOANAPPS records.")
    # also cleanup loanapps
    cleanup_loanapps(del_memdata_trnscn, _cu, commit_later=commit_later)

    if not commit_later:
        # remove the memdata-related csv artifacts, if they exist:
        # user_id/accountnum cross-reference, orphaned members list,
        # never-logged-in members list and scheduled transfer outliers
        for migr_csv_tmpl in (USERID_CROSS_REF_CSV_FILE,
                              MAMMOTH_ORPHANED_MEMLIST_CSV_FILE,
                              MAMMOTH_NEVER_LOGGEDIN_MEMLIST_CSV_FILE,
                              SCHEDULED_TRANSFER_OUTLIERS_CSV_FILE):
            migr_csv_path = migr_csv_tmpl.format(cu_lower)
            if os.path.exists(migr_csv_path):
                os.remove(migr_csv_path)

        # commit all the table deletions here
        del_memdata_trnscn.commit()
        del_memdata_trnscn(
            SUMMARY,
            DATA_OPT_MEMDATA,
            _cu
        )
2264 
2265 
@file_exc_decorator
@pg_crsr_hndlr_decrtr
def cleanup_memhist(del_memhist_transaction, _cu, **kwargs):
    """Cleanup memhist content

    If kwargs.get('commit_later') flag is True, we do not commit the
    transaction here (and only cleanup database tables, not the files),
    meaning that the cleanup may have been executed before actual
    migration. If commit_later flag is False, we commit the transaction
    here and proceed with the file cleanup.

    Args:
        del_memhist_transaction: instance of PgTransaction to which this
                                 cleanup process is part of
        **kwargs: Options:
            commit_later: flag to commit transaction later
                          or now
            accountnumber: if we want to filter delete
                           based on account number

    Raises:
        SystemExit: if IOError, NameError or PermissionError is caught
        psycopg2.Error, psycopg2.Warning on db operation errors
    """
    commit_later = kwargs.get("commit_later", False)
    accountnumber = kwargs.get("accountnumber", None)

    # optional accountnumber filter; empty dict means "delete all rows"
    accnum_where = {}
    if accountnumber is not None:
        # isinstance is the idiomatic type check (PEP 8), and also
        # accepts str subclasses; `type(x) == str` does not
        assert isinstance(accountnumber, str)
        accnum_where["accountnumber"] = accountnumber

    hist_tbls_to_clean = ["{}{}".format(
        _cu.lower(), tbl_suffx) for
        tbl_suffx in ["accounthistory", "loanhistory"]]

    for tbl in hist_tbls_to_clean:
        del_memhist_transaction(
            DELETE,
            tbl,
            where_conditions=accnum_where
        )

    if not commit_later:
        memhist_file = MEMHIST_DOWNLOADED_FILE.format(_cu.lower())
        memhist_dir = os.path.dirname(memhist_file)

        accounthistory_file = os.path.join(
            memhist_dir, MEMHIST_ACCOUNT_HISTORY_FILENAME)

        loanhistory_file = os.path.join(
            memhist_dir, MEMHIST_LOAN_HISTORY_FILENAME)

        # cleanup file based contents, first
        for memhist_f in [memhist_file, accounthistory_file, loanhistory_file]:
            if os.path.exists(memhist_f):
                os.remove(memhist_f)

        # commit all the table deletions here
        del_memhist_transaction.commit()
        del_memhist_transaction(
            SUMMARY,
            DATA_OPT_MEMHIST,
            _cu
        )
2331 
2332 
# Dispatch table used by migrate_members: maps each data category to its
# action handlers (clean / migrate) plus a human-readable "description"
# used in log messages.
# NOTE: DATA_OPT_SWITCH_ACCOUNTS deliberately has no ACTION_OPT_CLEAN
# entry — migrate_members logs an error if clean is requested for it.
MEM_HIST_DATA_CONFIG = {
    DATA_OPT_MEMDATA: {
        ACTION_OPT_CLEAN: cleanup_memdata,
        ACTION_OPT_MIGRATE: move_members_data,
        "description": "Members Data",
    },
    DATA_OPT_MEMHIST: {
        ACTION_OPT_CLEAN: cleanup_memhist,
        ACTION_OPT_MIGRATE: move_members_hist,
        "description": "Members History",
    },
    DATA_OPT_SWITCH_ACCOUNTS: {
        ACTION_OPT_MIGRATE: move_switch_accounts,
        "description": "Members Switch Account",
    },
}
2349 
2350 
@pg_crsr_hndlr_decrtr
# FIX: the `def migrate_members(cu, ...)` line was missing, leaving the
# decorator applied to nothing (a syntax error); restored.
def migrate_members(cu,
                    server,
                    action,
                    user,
                    passwd,
                    verbose,
                    data_ctgry,
                    reset):
    """Entry point to the memdata AND history migration.

    Args:
        cu: current credit union code
        server: Mammoth endpoint server
        action: one of the migration options to be
                performed on settings data category
        user: Mammoth monitor username
        passwd: Mammoth monitor password
        data_ctgry: memdata or memhist
        verbose: level of verbosity
        reset: flag to cleanup tables before migration

    Raises:
        psycopg2.Error, psycopg2.Warning on db operation errors
    """
    # check if the tables already exist
    with pg_handler.PGSession() as conn:
        with conn.cursor() as cur:
            tables_exist_transaction = PgTransaction(conn, cur)

            (tables_exist_flag,
             do_not_exist_list) = tables_exist_transaction(
                TABLE_EXISTS,
                data_ctgry,
                cu.lower()
            )

    if not tables_exist_flag:
        error_msg = ("Stopping migration. Following table"
                     " schemas do not exist: {}").format(
            get_valid_json(", ".join(do_not_exist_list)))
        LOGGER.error(error_msg)
        raise SystemExit(error_msg)
    else:
        LOGGER.info(
            "All required target tables exist! Continuing migration..")

    # migrate memdata, memhist or switch accounts
    # (comment previously said "clean up" here, which was misleading)
    if action == ACTION_OPT_MIGRATE:
        # optionally clean target tables before migrating
        if reset:
            with pg_handler.PGSession() as conn:
                with conn.cursor() as cur:
                    delete_transaction = PgTransaction(conn, cur)
                    MEM_HIST_DATA_CONFIG[data_ctgry][ACTION_OPT_CLEAN](
                        delete_transaction, cu)

        # log summary before migration
        with pg_handler.PGSession() as conn:
            with conn.cursor() as cur:
                summary_transaction = PgTransaction(conn, cur)
                summary_transaction(
                    SUMMARY,
                    data_ctgry,
                    cu,
                    msg_prefix="BEFORE"
                )

        # migrate member data from Mammoth to Odyssey
        if data_ctgry == DATA_OPT_MEMDATA:
            members_list_migration = MammothMigration(
                cu,
                DATA_OPT_MEMBERS,
                server,
                user,
                passwd
            )

            members_list_migration.run()
            # list for memdata
            all_members_list = members_list_migration.response

            # Migrate memdata to Odyssey
            MEM_HIST_DATA_CONFIG[data_ctgry][action](
                all_members_list,
                server,
                verbose,
                cu,
                user,
                passwd
            )

        elif data_ctgry == DATA_OPT_SWITCH_ACCOUNTS:
            switch_acc_members_migration = MammothMigration(
                cu,
                DATA_OPT_SWITCH_ACCT_MEMBERS,
                server,
                user,
                passwd
            )

            switch_acc_members_migration.run()
            # list of switch account related members
            switch_acc_members_list = switch_acc_members_migration.response

            # Migrate switch accounts to Odyssey
            MEM_HIST_DATA_CONFIG[data_ctgry][action](
                switch_acc_members_list,
                server,
                verbose,
                cu,
                user,
                passwd
            )

        # migrate member history from Mammoth to Odyssey
        elif data_ctgry == DATA_OPT_MEMHIST:
            MEM_HIST_DATA_CONFIG[data_ctgry][action](
                server,
                verbose,
                cu,
                user,
                passwd
            )

        # log summary after migration
        with pg_handler.PGSession() as conn:
            with conn.cursor() as cur:
                summary_transaction = PgTransaction(conn, cur)
                summary_transaction(
                    SUMMARY,
                    data_ctgry,
                    cu,
                    msg_prefix="AFTER"
                )

        LOGGER.info("{} migration completed!"
                    .format(MEM_HIST_DATA_CONFIG[data_ctgry]["description"]))

    # clean up memdata or memhist
    elif action == ACTION_OPT_CLEAN:
        if data_ctgry == DATA_OPT_SWITCH_ACCOUNTS:
            # switch accounts have no standalone clean handler
            LOGGER.error("`{}` is not available for `{}`".format(
                ACTION_OPT_CLEAN, DATA_OPT_SWITCH_ACCOUNTS))
        else:
            with pg_handler.PGSession() as conn:
                with conn.cursor() as cur:
                    delete_transaction = PgTransaction(conn, cur)
                    MEM_HIST_DATA_CONFIG[data_ctgry][
                        action](delete_transaction, cu)

            LOGGER.info("{} clean up completed!".format(
                MEM_HIST_DATA_CONFIG[data_ctgry]["description"]))

    # log summary of table records count
    elif action == ACTION_OPT_SUMMARY:
        with pg_handler.PGSession() as conn:
            with conn.cursor() as cur:
                summary_transaction = PgTransaction(conn, cur)
                summary_transaction(
                    SUMMARY,
                    data_ctgry,
                    cu
                )
def copy_http_file_stream(_resp, _target_path, _cu)
def migrate_members(cu, server, action, user, passwd, verbose, data_ctgry, reset)
def cleanup_memdata(del_memdata_trnscn, _cu, **kwargs)
def move_members_hist(_server, _verbose, _cu, _user, _passwd)
def migrate_individual_memdata(members_batch_transaction, member_data, ody_uname, mmth_uname_also_ody_mem_accnum, _verbose, _cu, _mmth_accnum_to_ody_userid_ref_csv_writer, _mmth_repeating_transfers_outliers, forceremain_update_list, cusurveymaster_surveyid_map, money_desktop_credit_card_type18_flag)
def move_members_data(_mmth_members_dict, _server, _verbose, _cu, _uname, _passwd)
def prepare_money_desktop_xref(_cu, user_id, mmth_uname_also_ody_mem_accnum, _mmth_accnum_to_ody_userid_ref_csv_writer, mdx_record, acc_or_loan, mdx_cc_type18_flag=False)
def handle_overloaded_accounts(from_or_to)
def move_switch_accounts(_mmth_switch_accts_dict, _server, _verbose, _cu, _uname, _passwd)
def get_ody_user_ids(member)
def cualerts_handler(members_batch_transaction, _verbose, _cualert_check_records, _cualert_bal_records, _cualert_loan_records, _cualert_trans_records, user_id)
def cuuser_main_handler(members_batch_transaction, ody_uname, mmth_uname_also_ody_mem_accnum, verbose, cuuser_acc_record, cuquestselect_records, forceremain_update_list, _cu)
def skip_never_loggedin_cuuser(cuuser_record_mmth)
def populate_switch_accounts(switch_acc_transaction, cu_lower, switch_acc_from, switch_acc_data, non_existent_switch_accounts)
def execute_sql_script_from_file(_target_gzip, _cu)
def get_valid_to_be_imported_members(cu_lower, _mmth_members_list)
def cleanup_memhist(del_memhist_transaction, _cu, **kwargs)
def cu_alert_table_transform(user_id, alrt_type_code, alrt_record)