2 """Main module to migrate members data and history.""" 11 from psycopg2.extras
import DictCursor
12 from copy
import deepcopy
15 from ody_migr_config
import (INSERT,
24 DATA_OPT_SWITCH_ACCT_MEMBERS,
26 DATA_OPT_SWITCH_ACCOUNTS,
28 USERID_CROSS_REF_CSV_FILE,
29 MAMMOTH_ORPHANED_MEMLIST_CSV_FILE,
30 MAMMOTH_NEVER_LOGGEDIN_MEMLIST_CSV_FILE,
31 SCHEDULED_TRANSFER_OUTLIERS_CSV_FILE,
32 ADMIN_CUSURVEY_ID_MAP_CSV_FILE,
33 SWITCH_ACC_BATCH_SIZE,
34 SCHEDULE_INTERVAL_MAPPING,
38 HIST_CREATE_WAIT_TIME,
39 MEMHIST_DOWNLOADED_FILE,
40 MEMHIST_ACCOUNT_HISTORY_FILENAME,
41 MEMHIST_LOAN_HISTORY_FILENAME,
43 DEF_MEMACCT_RIGHTS_OPTIONS,
51 from ody_migr_transaction
import PgTransaction
52 from ody_migr_transaction
import pg_crsr_hndlr_decrtr
53 import ody_migr_db_handler
as pg_handler
54 from ody_migr_mmth_endpoint
import MammothMigration
55 from ody_migr_utils
import (get_valid_json,
60 from ody_migr_loans
import cleanup_loanapps
62 LOGGER = logging.getLogger(__name__)
68 mmth_uname_also_ody_mem_accnum,
71 cuquestselect_records,
72 forceremain_update_list,
74 """Populate cuusers table from Mammoth and populate related tables in Ody 76 This method is a part of memdata data category. 79 members_batch_transaction: PgTransaction instance to prepare 80 insertion transaction for the current 82 ody_uname: odyssey user_name 83 mmth_uname_also_ody_mem_accnum: mammoth user_name, and odysseys acc num 84 verbose: verbosity flag 85 cuuser_acc_record: cuusers response dict from Mammoth 86 1 insertion in <cu>user in odyssey also 87 includes 1 record insertion in 88 <cu>memberacct, <cu>user, <cu>group, 89 <cu>usercontact tables and 4 records 90 in <cu>memberaccrights table. 91 cuquestselect_records: records from this table are used to 92 prepare mfaquest column in <cu>user 93 forceremain_update_list: list to collect memebrs whose 94 forceremain is updated 98 user_id: returning user_id from <cu>user 99 member_acct_balance_attempt: to be populated in other related tables 100 member_acct_balance_stamp: to be populated in other related tables 103 psycopg2.Error, psycopg2.Warning on db operation errors 106 cu_lower = _cu.lower()
107 assert type(cuquestselect_records)
is list
108 assert type(cuuser_acc_record)
is dict
110 cuuser_returning_col =
"user_id" 112 if len(cuuser_acc_record) == 0:
115 mfaquest = get_valid_json({})
117 mfaquest_dict = {
"answers": {}}
118 if len(cuquestselect_records) != 0:
119 for cuquest_record
in cuquestselect_records:
120 qid = cuquest_record[
"quest_id"].strip()
121 ans = cuquest_record[
"answer"].strip()
122 mfaquest_dict[
"answers"][qid] = ans
123 mfaquest = get_valid_json(mfaquest_dict)
126 member_acct_tbl_name =
"{}memberacct".format(cu_lower)
127 member_acct_rights_tbl_name =
"{}memberacctrights".format(cu_lower)
128 cuuser_tbl_name =
"{}user".format(cu_lower)
129 group_table_name =
"{}group".format(cu_lower)
130 profile_tbl_name =
"cu_profile" 131 contact_tbl_name =
"{}usercontact".format(cu_lower)
134 cu_group_columns = [
"group_name",
"profile_id",
"contact",
"tax_id"]
137 cuuser_acc_record[
"user_name"] = ody_uname
139 cuuser_acc_record[
"pkattempt"] = get_strip_value_dict(
140 "pktattempt", cuuser_acc_record, pop=
True)
144 member_acct_balance_attempt = get_strip_value_dict(
145 "pkattempt", cuuser_acc_record, default=0)
146 member_acct_balance_stamp = get_strip_value_dict(
147 "pktstamp", cuuser_acc_record, pop=
True)
148 member_acct_billpayid = get_strip_value_dict(
149 "billpayid", cuuser_acc_record, pop=
True)
150 member_acct_estmt_flag = get_strip_value_dict(
151 "estmt_flag", cuuser_acc_record, pop=
True, default=
'N')
152 member_acct_rdcsetting = int(float(get_strip_value_dict(
153 "depositlimit", cuuser_acc_record, pop=
True, default=0)))
156 profile_where_conditions = {
157 "profile_code":
"DEF",
160 profile_select_columns = [
"profile_id"]
162 cu_profile_records = members_batch_transaction(
165 where_conditions=profile_where_conditions,
166 select_columns=profile_select_columns
169 assert len(cu_profile_records) == 1
170 profile_id = cu_profile_records[0][0]
174 "g{}".format(ody_uname),
181 members_batch_transaction(
186 returning_col=
"group_id" 189 group_id = members_batch_transaction.cur.fetchone()[0]
202 contact_values = [
"",
"",
"",
"",
"",
""]
205 members_batch_transaction(
210 returning_col=
"contact_id" 214 contact = members_batch_transaction.cur.fetchone()[0]
217 cuuser_acc_record[
"mfaquest"] = mfaquest
218 cuuser_acc_record[
"group_id"] = group_id
220 cuuser_acc_record[
"other_rights"] =
None 221 cuuser_acc_record[
"ip"] =
None 222 cuuser_acc_record[
"is_group_primary"] =
True 223 cuuser_acc_record[
"contact"] = contact
225 cuuser_forceremain = get_strip_value_dict(
"forceremain", cuuser_acc_record)
226 cuuser_forcechange = get_strip_value_dict(
"forcechange", cuuser_acc_record)
230 if cuuser_forcechange
is not None and cuuser_forceremain
is not None:
231 if cuuser_forcechange ==
'N' and int(cuuser_forceremain) == 0:
232 cuuser_acc_record[
"forceremain"] = 5
233 forceremain_update_list.append(mmth_uname_also_ody_mem_accnum)
235 del cuuser_acc_record[
"pktdate"]
236 del cuuser_acc_record[
"cu"]
237 del cuuser_acc_record[
"user_alias"]
239 assert "user_alias" not in cuuser_acc_record
240 assert "cu" not in cuuser_acc_record
241 assert "pktdate" not in cuuser_acc_record
242 assert "pktstamp" not in cuuser_acc_record
243 assert "billpayid" not in cuuser_acc_record
244 assert "estmt_flag" not in cuuser_acc_record
246 columns = list(cuuser_acc_record.keys())
247 values = list(cuuser_acc_record.values())
248 values = [val.strip()
if type(val) == str
else val
for val
in values]
251 members_batch_transaction(
256 returning_col=cuuser_returning_col
259 user_id = members_batch_transaction.cur.fetchone()[0]
263 member_accnt_columns = [
276 member_accnt_record = [
277 mmth_uname_also_ody_mem_accnum,
278 member_acct_estmt_flag,
279 member_acct_billpayid,
280 member_acct_rdcsetting,
283 member_acct_balance_stamp,
284 member_acct_balance_attempt,
290 members_batch_transaction(
298 assert len(member_accnt_columns) == len(member_accnt_record)
301 members_batch_transaction(
303 member_acct_tbl_name,
304 member_accnt_columns,
309 memberacctrights_cols = [
318 for memberacctright
in MEMBER_ACC_RIGHTS:
320 platform = get_valid_json(DEF_MEMACCT_RIGHTS_OPTIONS)
321 if memberacctright ==
"ACCESS":
326 mmth_uname_also_ody_mem_accnum,
332 members_batch_transaction(
334 member_acct_rights_tbl_name,
335 memberacctrights_cols,
342 member_acct_balance_attempt,
343 member_acct_balance_stamp)
347 """Prepares a single cu_alerts record for all 4 alert type codes 349 Update (inplace) alrt_record keys to match with the current schema of 350 cu_alerts table in Odyssey. 353 user_id: returning user_id from <cu>user table 354 alert_type_code: {'C': checking, 'L': loan, 'T': transaction, 356 alrt_record: dictionary of a single record from one of the 357 {cualertcheck, cualertbal, cualertloan, 358 cualerttrans} tables from Mammoth 361 alrt_record.pop(
'id',
None)
363 assert "cu" in alrt_record
364 assert "accountnumber" in alrt_record
366 if alrt_type_code ==
'L':
367 if "loannumber" in alrt_record:
368 alrt_record[
"accounttype"] = alrt_record.pop(
"loannumber")
369 if "alert_days_prior" in alrt_record:
370 alrt_record[
"notifyloandaysprior"] = alrt_record.pop(
373 alrt_record[
"user_id"] = user_id
374 alrt_record[
"alerttype"] = alrt_type_code
376 alrt_record[
"certnumber"] = alrt_record.get(
379 alrt_record[
"incbal"] = alrt_record.get(
"incbal",
None)
380 alrt_record[
"incamt"] = alrt_record.get(
"incamt",
None)
381 alrt_record[
"inctransdesc"] = alrt_record.get(
382 "inctransdesc",
None)
389 for col
in [
"useavailbal",
"notifyrange"]:
390 Y_or_N = get_strip_value_dict(
391 col, alrt_record, default=
None)
393 if Y_or_N
in [
None,
'']:
394 alrt_record[col] =
None 396 assert Y_or_N
in YesNoMap
397 alrt_record[col] = YesNoMap[Y_or_N]
400 @pg_crsr_hndlr_decrtr
403 _cualert_check_records,
404 _cualert_bal_records,
405 _cualert_loan_records,
406 _cualert_trans_records,
408 """Populate updated records in cu_alerts table in Odyssey 410 Associated tables from Mammoth are cualertcheck, cualertbal, cualertloan 414 members_batch_transaction: PgTransaction instance to prepare 415 insertion transaction for the current 417 _verbose: verbosity flag 418 _cualert_check_records: list of dictionaries: cualertcheck records 419 _cualert_bal_records: list of dictionaries: cualertbal records 420 _cualert_loan_records: list of dictionaries: cualertloan records 421 _cualert_trans_records: list of dictionaries: cualerttrans records 422 user_id: returning user_id from <cu>user table 425 psycopg2.Error, psycopg2.Warning on db operation errors 428 'C': _cualert_check_records,
429 'L': _cualert_loan_records,
430 'T': _cualert_trans_records,
431 'B': _cualert_bal_records
433 for alrt_type_code
in alert_types.keys():
434 clbt_alert_records = alert_types[alrt_type_code]
435 for alrt_record
in clbt_alert_records:
439 members_batch_transaction(
442 collection=clbt_alert_records
448 mmth_uname_also_ody_mem_accnum,
449 _mmth_accnum_to_ody_userid_ref_csv_writer,
452 mdx_cc_type18_flag=False):
453 """Prepare a MDX complying CSV record 455 Applies to all associated subaccounts(loanbalance and accountbalance 456 records) of each member. 460 xref_record.append(
"{}:{}".format(_cu, mmth_uname_also_ody_mem_accnum))
462 xref_record.append(
"{}-{}".format(_cu, user_id))
464 xref_record.append(
"M-{}-{}".format(_cu, user_id))
466 xref_record.append(
"M-{}:{}".format(_cu, mmth_uname_also_ody_mem_accnum))
469 if (acc_or_loan ==
"A"):
470 deposittype = get_strip_value_dict(
"deposittype", mdx_record).upper()
471 certnum = get_strip_value_dict(
"certnumber", mdx_record)
472 accounttype = get_strip_value_dict(
"accounttype", mdx_record)
475 if deposittype ==
'Y':
476 xref_record.append(
'CHECKING')
478 xref_record.append(
'SAVINGS')
482 xref_record.append(accounttype)
484 xref_record.append(
"{}_{}".format(accounttype, certnum))
487 acc_key =
"D|{}|{}|{}".format(mmth_uname_also_ody_mem_accnum,
490 xref_record.append(acc_key)
492 xref_record.append(generate_mdx_hash(_cu, acc_key))
494 _mmth_accnum_to_ody_userid_ref_csv_writer.writerow(xref_record)
498 cbtype = mdx_record[
"cbtype"]
499 loannumber = get_strip_value_dict(
"loannumber", mdx_record)
503 if mdx_cc_type18_flag:
505 cc_key =
"C|{}|{}".format(mmth_uname_also_ody_mem_accnum,
512 generate_mdx_hash(_cu, cc_key)
516 _mmth_accnum_to_ody_userid_ref_csv_writer.writerow(xref_record)
520 if cbtype !=
'18' or cbtype
is None:
521 loan_key =
"L|{}|{}".format(mmth_uname_also_ody_mem_accnum,
528 generate_mdx_hash(_cu, loan_key)
532 _mmth_accnum_to_ody_userid_ref_csv_writer.writerow(xref_record)
536 """Update from/to format for overloaded accounts 541 "L|XXXXX|LN06:01@YYYYY" **overloaded 546 "D|XXXXX|SAV0:00@YYYYY" **overloaded 550 Returns: updated value for from_or_to ("from" or "to" attribute 551 in cu_scheduledtxn.txn_data) 553 "L|XXXXX|LN06:01@YYYYY" ---> "L|YYYYY|LN06:01" **overloaded 554 "L|XXXXX|LN06:01" ---> "L|XXXXX|LN06:01" 555 "L|XXXXX|LN06:01|0" ---> "L|XXXXX|LN06:01|0" 557 "D|XXXXX|SAV0:00@YYYYY" ---> "D|YYYYY|SAV0:00" **overloaded 558 "D|XXXXX|SAV0:00" ---> "D|XXXXX|SAV0:00" 559 "D|XXXXX|SAV0:00|0" ---> "D|XXXXX|SAV0:00|0" 561 elems = from_or_to.split(
"|")
562 assert len(elems) >= 3
565 from_to_subaccount, overloaded_accountnumber = elems[2].split(
"@")
566 elems[1] = overloaded_accountnumber
567 elems[2] = from_to_subaccount
569 return "|".join(elems)
572 @pg_crsr_hndlr_decrtr
576 mmth_uname_also_ody_mem_accnum,
579 _mmth_accnum_to_ody_userid_ref_csv_writer,
580 _mmth_repeating_transfers_outliers,
581 forceremain_update_list,
582 cusurveymaster_surveyid_map,
583 money_desktop_credit_card_type18_flag):
584 """Migrate member data tables of individual member""" 585 cu_lower = _cu.lower()
589 memacct_balance_attempt,
591 members_batch_transaction,
593 mmth_uname_also_ody_mem_accnum,
595 member_data[
"cuusers"],
596 member_data[
"cuquestselect"],
597 forceremain_update_list,
603 members_batch_transaction,
605 member_data[
"cualertcheck"],
606 member_data[
"cualertbal"],
607 member_data[
"cualertloan"],
608 member_data[
"cualerttrans"],
613 mmth_tbls_no_schema_change = [
617 for mmth_key
in mmth_tbls_no_schema_change:
618 mmth_data_nsc = member_data[mmth_key]
620 if mmth_key ==
"holds":
621 ody_table_name =
"{}{}".format(
624 ody_table_name = mmth_key
626 members_batch_transaction(
629 collection=mmth_data_nsc
639 tbl_cuadmeco =
"cuadmeco" 640 tbl_cuadmeco_mmth_data = member_data[tbl_cuadmeco]
642 if type(tbl_cuadmeco_mmth_data) == dict:
643 for mmth_msg_id, cuadmeco_message_records
in\
644 tbl_cuadmeco_mmth_data.items():
645 members_batch_transaction.cur.execute(
646 "select nextval('cuadmeco_messageid_seq')")
647 next_ody_message_id = members_batch_transaction.cur.fetchone()[0]
648 this_message_thread_collection = []
649 for ind, rec
in enumerate(cuadmeco_message_records):
652 rec[
"messageid"] = next_ody_message_id
658 rec[
"parentid"] = next_ody_message_id
660 rec[
"user_id"] = user_id
661 del rec[
"accountnumber"]
662 rec[
"admin"] = cu_lower
663 this_message_thread_collection.append(rec)
664 members_batch_transaction(
667 collection=this_message_thread_collection
670 for cusurveysays_rec
in member_data[
"cusurveysays"]:
671 assert "surveyid" in cusurveysays_rec,
"cusurveysays: missing surveyid" 673 if cusurveysays_rec[
"surveyid"].strip() \
674 not in cusurveymaster_surveyid_map:
675 LOGGER.warning(
"Invalid surveyid found: {}".format(
676 cusurveysays_rec[
"surveyid"].strip()))
679 cusurveysays_rec[
"surveyid"] = cusurveymaster_surveyid_map[
680 cusurveysays_rec[
"surveyid"].strip()]
681 assert (cusurveysays_rec[
"accountnumber"].strip() ==
682 mmth_uname_also_ody_mem_accnum)
683 cusurveysays_rec[
"user_id"] = user_id
684 members_batch_transaction(
687 list(cusurveysays_rec.keys()),
688 list(cusurveysays_rec.values())
691 mmth_tbls_userid_relation = [
"cucmsresponse",
"userlogins"]
693 for mmth_key
in mmth_tbls_userid_relation:
694 mmth_data_with_userid = member_data[mmth_key]
696 if mmth_key ==
"userlogins":
697 ody_table_name =
"{}{}".format(
700 ody_table_name = mmth_key
702 for mmth_rec
in mmth_data_with_userid:
703 assert (mmth_rec[
"accountnumber"].strip() ==
704 mmth_uname_also_ody_mem_accnum)
705 mmth_rec[
"user_id"] = user_id
707 if mmth_key ==
"userlogins":
708 del mmth_rec[
"accountnumber"]
709 del mmth_rec[
"userloginid"]
710 mmth_rec[
"user_name"] = ody_uname
712 members_batch_transaction(
715 collection=mmth_data_with_userid
723 cu_useraccounts_cols = [
724 "user_id",
"accountnumber",
725 "accounttype",
"certnumber",
726 "recordtype",
"display_name",
740 cuuseraccounts_rec_default_crossaccts = [
741 False,
False,
True,
False,
False,
False, 0]
743 culivetx_records_for_crossaccts = member_data[
"culivetx_x"]
746 for mmth_culivetx_x
in culivetx_records_for_crossaccts:
747 del mmth_culivetx_x[
"id"]
748 del mmth_culivetx_x[
"cu"]
750 acc_type_x = get_strip_value_dict(
"accounttype", mmth_culivetx_x)
751 deposit_type_x = get_strip_value_dict(
"deposittype", mmth_culivetx_x)
752 accnum_x = get_strip_value_dict(
"accountnumber", mmth_culivetx_x)
753 tomember_x = get_strip_value_dict(
"tomember", mmth_culivetx_x)
754 description_x = get_strip_value_dict(
"description", mmth_culivetx_x)
756 useraccounts_rec_from_culivetx = []
760 useraccounts_rec_from_culivetx.append(user_id)
762 useraccounts_rec_from_culivetx.append(accnum_x)
764 useraccounts_rec_from_culivetx.append(
"{}#{}".format(
765 acc_type_x, tomember_x))
767 useraccounts_rec_from_culivetx.append(0)
769 if deposit_type_x.upper()
in [
'S',
'Y',
'N',
'C']:
770 useraccounts_rec_from_culivetx.append(
"T")
772 useraccounts_rec_from_culivetx.append(
"P")
774 useraccounts_rec_from_culivetx.append(description_x)
776 useraccounts_rec_from_culivetx.extend(
777 cuuseraccounts_rec_default_crossaccts)
780 members_batch_transaction(
782 "{}useraccounts".format(cu_lower),
783 cu_useraccounts_cols,
784 useraccounts_rec_from_culivetx
787 members_batch_transaction(
789 "{}crossaccounts".format(cu_lower),
790 collection=culivetx_records_for_crossaccts
795 cu_useraccounts_cols.append(
"display_qty")
796 cu_useraccounts_cols.append(
"display_qty_type")
808 cuuseraccounts_rec_default_ln_bal = [
809 None,
True,
True,
True,
True,
True,
True, 0, 30,
"D"]
813 account_balance_coll = member_data[
"accountbalance"]
814 for accbal_rec
in account_balance_coll:
815 acc_type = get_strip_value_dict(
"accounttype", accbal_rec)
816 deposit_type = get_strip_value_dict(
"deposittype", accbal_rec)
817 accbal_accnum = get_strip_value_dict(
"accountnumber", accbal_rec)
818 accbal_certnum = get_strip_value_dict(
"certnumber", accbal_rec)
820 assert acc_type
is not None 821 assert accbal_accnum
is not None 822 assert accbal_accnum == mmth_uname_also_ody_mem_accnum
826 if deposit_type.upper()
in [
'Y',
'N',
'S']:
829 accbal_rec[
"interestrate"] =
None 830 accbal_rec[
"maturitydate"] =
None 831 accbal_rec[
"misc1"] =
None 832 accbal_rec[
"regdcount"] =
None 833 accbal_rec[
"may_deposit"] = may_deposit
834 accbal_rec[
"may_withdraw"] = may_withdraw
835 accbal_rec[
"balance_stamp"] = member_acct_balance_stamp
836 accbal_rec[
"history_stamp"] = member_acct_balance_stamp
837 accbal_rec[
"balance_attempt"] = memacct_balance_attempt
838 accbal_rec[
"history_attempt"] = memacct_balance_attempt
840 useraccounts_rec_from_accbal = []
844 useraccounts_rec_from_accbal.append(user_id)
846 useraccounts_rec_from_accbal.append(accbal_accnum)
848 useraccounts_rec_from_accbal.append(acc_type)
850 useraccounts_rec_from_accbal.append(accbal_certnum)
852 useraccounts_rec_from_accbal.append(
"D")
853 useraccounts_rec_from_accbal.extend(cuuseraccounts_rec_default_ln_bal)
856 members_batch_transaction(
858 "{}useraccounts".format(cu_lower),
859 cu_useraccounts_cols,
860 useraccounts_rec_from_accbal
867 mmth_uname_also_ody_mem_accnum,
868 _mmth_accnum_to_ody_userid_ref_csv_writer,
873 members_batch_transaction(
875 "{}accountbalance".format(cu_lower),
876 collection=account_balance_coll
881 loan_balance_coll = member_data[
"loanbalance"]
882 for lnbal_rec
in loan_balance_coll:
883 lnbal_accnum = get_strip_value_dict(
"accountnumber",
885 lnbal_lnnum = get_strip_value_dict(
"loannumber",
888 assert lnbal_accnum
is not None 889 assert lnbal_accnum == mmth_uname_also_ody_mem_accnum
891 credit_limit = get_strip_value_dict(
"creditlimit", lnbal_rec)
892 lnbal_rec[
"payoff"] = get_strip_value_dict(
893 "payoff", lnbal_rec, default=0)
894 lnbal_rec[
"currentdue"] = 0
896 lnbal_rec[
"misc1"] = get_strip_value_dict(
897 "misc1", lnbal_rec, default=
None)
899 lnbal_rec[
"cbtype"] = get_strip_value_dict(
900 "type", lnbal_rec, pop=
True)
902 lnbal_rec[
"balance_stamp"] = member_acct_balance_stamp
903 lnbal_rec[
"history_stamp"] = member_acct_balance_stamp
904 lnbal_rec[
"balance_attempt"] = memacct_balance_attempt
905 lnbal_rec[
"history_attempt"] = memacct_balance_attempt
907 if float(credit_limit) > 0:
908 lnbal_rec[
"may_addon"] =
True 910 lnbal_rec[
"may_addon"] =
False 912 lnbal_rec[
"may_payment"] =
True 917 lnbal_rec[
"unpaidinterest"] = get_strip_value_dict(
918 "unpaidinterest", lnbal_rec, default=
None)
920 lnbal_rec[
"frequencyperyear"] = get_strip_value_dict(
921 "frequencyperyear", lnbal_rec, default=
None)
923 lnbal_rec[
"originalamount"] = get_strip_value_dict(
924 "originalamount", lnbal_rec, default=
None)
926 lnbal_rec[
"originaldate"] = get_strip_value_dict(
927 "originaldate", lnbal_rec, default=
None)
929 lnbal_rec[
"term"] = get_strip_value_dict(
930 "term", lnbal_rec, default=
None)
932 lnbal_rec[
"creditdisability"] = get_strip_value_dict(
933 "cdi", lnbal_rec, pop=
True, default=
None)
935 lnbal_rec[
"creditlife"] = get_strip_value_dict(
936 "cli", lnbal_rec, pop=
True, default=
None)
939 useraccounts_rec_from_lnbal = []
943 useraccounts_rec_from_lnbal.append(user_id)
945 useraccounts_rec_from_lnbal.append(lnbal_accnum)
947 useraccounts_rec_from_lnbal.append(lnbal_lnnum)
949 useraccounts_rec_from_lnbal.append(0)
951 useraccounts_rec_from_lnbal.append(
"L")
952 useraccounts_rec_from_lnbal.extend(cuuseraccounts_rec_default_ln_bal)
955 members_batch_transaction(
957 "{}useraccounts".format(cu_lower),
958 cu_useraccounts_cols,
959 useraccounts_rec_from_lnbal
966 mmth_uname_also_ody_mem_accnum,
967 _mmth_accnum_to_ody_userid_ref_csv_writer,
970 money_desktop_credit_card_type18_flag)
973 members_batch_transaction(
975 "{}loanbalance".format(cu_lower),
976 collection=loan_balance_coll
980 curepeattx_mammoth_records = member_data[
"curepeattx"]
981 for mmth_curepeattx_rec
in curepeattx_mammoth_records:
982 mmth_curepeattx_rec_copy = deepcopy(mmth_curepeattx_rec)
984 del mmth_curepeattx_rec[
"txid"]
986 curepeattx_interval = get_strip_value_dict(
992 mmth_curepeattx_rec[
"repeating_parameters"] = get_valid_json({
993 "interval": SCHEDULE_INTERVAL_MAPPING[curepeattx_interval]})
995 mmth_curepeattx_rec[
"user_id"] = user_id
997 mmth_curepeattx_rec[
"interval_count"] = get_strip_value_dict(
1003 mmth_curepeattx_rec[
"next_trigger_date"] = get_strip_value_dict(
1005 mmth_curepeattx_rec,
1009 mmth_curepeattx_rec[
"end_date"] = get_strip_value_dict(
1011 mmth_curepeattx_rec,
1015 curepeattx_startdate = get_strip_value_dict(
1017 mmth_curepeattx_rec,
1021 mmth_curepeattx_rec[
"start_date"] = curepeattx_startdate
1022 mmth_curepeattx_rec[
"create_date"] = curepeattx_startdate
1023 mmth_curepeattx_rec[
"approved_date"] = curepeattx_startdate
1024 mmth_curepeattx_rec[
"last_edit_date"] = curepeattx_startdate
1026 mmth_curepeattx_rec[
"approved_by"] = user_id
1027 mmth_curepeattx_rec[
"last_edit_by"] = user_id
1028 mmth_curepeattx_rec[
"approved_status"] = 10
1030 mmth_curepeattx_rec[
"failure_count"] = 0
1031 mmth_curepeattx_rec[
"feature_code"] =
"TRN" 1034 curepeattx_transactioncode = get_strip_value_dict(
1036 mmth_curepeattx_rec,
1040 curepeattx_fromsuffix = get_strip_value_dict(
1042 mmth_curepeattx_rec,
1045 curepeattx_tosuffix = get_strip_value_dict(
1047 mmth_curepeattx_rec,
1050 curepeattx_accountnumber = get_strip_value_dict(
1052 mmth_curepeattx_rec,
1056 curepeattx_tomember = get_strip_value_dict(
1058 mmth_curepeattx_rec,
1062 curepeattx_amount = get_strip_value_dict(
1064 mmth_curepeattx_rec,
1071 if curepeattx_transactioncode ==
"AT":
1074 elif curepeattx_transactioncode
in [
"LP",
"CP"]:
1077 if curepeattx_accountnumber != curepeattx_tomember:
1083 for loanbalance_rec
in loan_balance_coll:
1084 if curepeattx_tosuffix == loanbalance_rec[
"loannumber"]:
1085 if loanbalance_rec[
"cbtype"] ==
'18':
1103 _mmth_repeating_transfers_outliers.writerow(
1104 [(
"'{}' is not a valid loan/credit card account " 1105 "for member '{}'.").format(curepeattx_tosuffix,
1106 curepeattx_accountnumber),
1107 get_valid_json(mmth_curepeattx_rec_copy)])
1109 elif curepeattx_transactioncode
in [
"LA",
"CA"]:
1113 if from_type ==
"" or to_type ==
"":
1114 error_msg =
"cu_scheduledtxn from_type or to_type not populated" 1115 LOGGER.error(error_msg)
1120 if curepeattx_transactioncode ==
"LA":
1121 from_txn =
"{}|{}|{}".format(
1123 curepeattx_accountnumber,
1124 curepeattx_fromsuffix)
1126 from_txn =
"{}|{}|{}|{}".format(
1128 curepeattx_accountnumber,
1129 curepeattx_fromsuffix,
1134 if curepeattx_transactioncode
in [
"LP",
"CP"]:
1135 to_txn =
"{}|{}|{}".format(
1137 curepeattx_tomember,
1138 curepeattx_tosuffix)
1140 to_txn =
"{}|{}|{}|{}".format(
1142 curepeattx_tomember,
1143 curepeattx_tosuffix,
1150 if curepeattx_accountnumber != curepeattx_tomember:
1151 if curepeattx_transactioncode ==
'AT':
1152 curepeattx_transactioncode =
'XA' 1153 elif curepeattx_transactioncode ==
'LP':
1154 curepeattx_transactioncode =
'XP' 1156 cuscheduled_txndata = {}
1157 cuscheduled_txndata[
"txn"] = {
1159 "frommember": curepeattx_accountnumber,
1160 "fromsuffix": curepeattx_fromsuffix,
1161 "fromtype": from_type,
1163 "tomember": curepeattx_tomember,
1164 "tosuffix": curepeattx_tosuffix,
1166 "amount": curepeattx_amount,
1167 "transactioncode": curepeattx_transactioncode,
1168 "deposittype": from_type,
1171 mmth_curepeattx_rec[
"txn_data"] = get_valid_json(cuscheduled_txndata)
1174 members_batch_transaction(
1177 collection=curepeattx_mammoth_records
1182 cuauditusers_coll = member_data[
"cuauditusers"]
1183 for cuaudituser_rec
in cuauditusers_coll:
1184 mmth_cu = get_strip_value_dict(
1185 "cu", cuaudituser_rec, pop=
True)
1186 assert mmth_cu == _cu
1187 del cuaudituser_rec[
"admuser"]
1188 cuaudituser_rec[
"user_id"] = user_id
1189 cuaudituser_rec[
"auditdate"] = get_strip_value_dict(
1190 "chdate", cuaudituser_rec, pop=
True)
1191 cuaudituser_rec[
"auditaction"] = get_strip_value_dict(
1192 "action", cuaudituser_rec, pop=
True)
1193 cuaudituser_rec[
"auditsrctype"] =
"U" 1194 cuaudituser_rec[
"auditsrcuser_name"] = get_strip_value_dict(
1195 "user_name", cuaudituser_rec, pop=
True)
1196 cuaudituser_rec[
"auditsrcemail"] = get_strip_value_dict(
1197 "email", cuaudituser_rec, pop=
True, default=
"a@b.cde")
1198 cuaudituser_rec[
"auditsrcip"] =
"127.0.0.1" 1199 cuaudituser_rec[
"accountnumber"] = mmth_uname_also_ody_mem_accnum
1203 if cuaudituser_rec[
"auditaction"]
in AUDITUSER_DESCMAP:
1204 val = cuaudituser_rec[
"auditaction"]
1205 cuaudituser_rec[
"auditfulldesc"] = AUDITUSER_DESCMAP[val]
1208 members_batch_transaction(
1210 "{}audituser".format(cu_lower),
1211 collection=cuauditusers_coll
1215 cuovermicr_coll = member_data[
"cuovermicr"]
1216 members_batch_transaction(
1219 collection=cuovermicr_coll
1224 """Get Odyssey user_name and member account number 1227 member: dictionary (with content <cuusers>.user_name and 1228 <cuusers>.user_alias from Mammoth) 1231 ody_uname (odyssey user_name): Mammoth's <cuusers>.user_alias if 1232 any, otherwise <cuusers>.user_name 1233 mmth_uname_also_ody_mem_accnum (odyssey member acc num): 1234 Mammoth's <cuusers>.user_name 1236 mmth_uname_also_ody_mem_accnum = member[
"user_name"]
1237 mmth_ualias = member[
"user_alias"]
1240 if hasattr(mmth_uname_also_ody_mem_accnum,
'strip'):
1241 mmth_uname_also_ody_mem_accnum =\
1242 mmth_uname_also_ody_mem_accnum.strip()
1244 if hasattr(mmth_ualias,
'strip'):
1245 mmth_ualias = mmth_ualias.strip()
1248 if mmth_ualias
is not None and mmth_ualias !=
"":
1249 assert (mmth_uname_also_ody_mem_accnum
is 1251 mmth_uname_also_ody_mem_accnum !=
1252 ""), (
"Mammoth 'user_name' is empty!")
1253 ody_uname = mmth_ualias
1256 assert (mmth_uname_also_ody_mem_accnum
is 1258 mmth_uname_also_ody_mem_accnum !=
1259 ""), (
"Both 'user_alias' and" 1260 "'user_name' from Mammoth empty")
1261 ody_uname = mmth_uname_also_ody_mem_accnum
1262 return ody_uname, mmth_uname_also_ody_mem_accnum
1267 """Stream download the http response content to a local file 1270 SystemExit: if IOError, NameError or PermissionError is caught 1272 LOGGER.info(
"[gethistfile]: Downloading (stream download, decode gzip and " 1273 "deflate transfer-encodings) zipped sql file - `{}`" 1274 .format(_target_path))
1283 with open(_target_path,
"wb")
as f:
1284 for chunk
in _resp.iter_content(chunk_size=8192 * 2):
1287 LOGGER.info(
"[gethistfile]: Download complete.")
1290 @pg_crsr_hndlr_decrtr
1293 """Loads the COPY FROM command including file and execute a bulk 1294 insertion command to populate <cu>accounthistory and <cu>loanhistory 1298 _target_gzip: gzip file that contains COPY FROM script and csv 1299 formatted content from Mammoth for <cu>accounthistory 1300 and <cu>loanhistory tables 1303 SystemExit: if IOError, NameError or PermissionError is caught 1304 psycopg2.Error, psycopg2.Warning on db operation errors 1306 LOGGER.info(
"[gethistfile]: Loading local memhist files and " 1307 "preparing SQL execution!")
1309 accounthistory_sql =
"COPY {}accounthistory".format(_cu.lower())
1310 loanhistory_sql =
"COPY {}loanhistory".format(_cu.lower())
1312 accounthistory_complete_sql =
"" 1313 loanhistory_complete_sql =
"" 1315 zip_dir = os.path.dirname(_target_gzip)
1316 accounthistory_file = os.path.join(
1317 zip_dir, MEMHIST_ACCOUNT_HISTORY_FILENAME)
1318 loanhistory_file = os.path.join(zip_dir, MEMHIST_LOAN_HISTORY_FILENAME)
1320 DECODE_OPTION =
"utf-8" 1322 write_account =
False 1325 with open(accounthistory_file,
"wb")
as f_a, \
1326 open(loanhistory_file,
"wb")
as f_l:
1327 with gzip.open(_target_gzip,
"rb")
as f:
1328 for line
in f.readlines():
1329 decoded_line = line.decode(DECODE_OPTION)
1331 if(accounthistory_sql
in decoded_line):
1332 write_account =
True 1334 accounthistory_complete_sql = decoded_line
1337 if(loanhistory_sql
in decoded_line):
1338 write_account =
False 1340 loanhistory_complete_sql = decoded_line
1354 with pg_handler.PGSession()
as conn:
1355 with conn.cursor()
as cur:
1359 with open(accounthistory_file,
"rb")
as f_a, \
1360 open(loanhistory_file,
"rb")
as f_l:
1362 memhist_transaction.cur.copy_expert(
1363 accounthistory_complete_sql, f_a)
1365 memhist_transaction.cur.copy_expert(
1366 loanhistory_complete_sql, f_l)
1369 memhist_transaction.commit()
1372 @pg_crsr_hndlr_decrtr
1375 """Move members history tables 1378 1. createhistfile: creates the gzip file including sql command on Mammoth 1379 2. gethistfile: downloads gzip file from Mammoth, saves as a local copy 1380 then execute the COPY command using the content from 1383 Migration involves reading the downloaded content, preparing the COPY FROM 1384 sql command for <cu>accounthistory and <cu>loanhistory tables and commiting 1388 _server: Mammoth endpoint server 1389 _verbose: verbose flag 1390 _cu: current credit union code 1391 _user: Mammoth monitor username 1392 _passwd: Mammoth monitor password 1395 SystemExit: if IOError, NameError or PermissionError is caught 1396 psycopg2.Error, psycopg2.Warning on db operation errors 1399 valid_history_gzip_import = MEMHIST_DOWNLOADED_FILE.format(_cu.lower())
1402 if os.path.exists(valid_history_gzip_import):
1403 LOGGER.warning(
"Local copy of SQL dump file exist! You might be " 1404 "loading outdated file! If you think the data might " 1405 " have been changed from last time, clean up and re-run" 1406 " to download the most recent data!")
1414 DATA_OPT_CREATE_HIST,
1418 params={
"restart":
"Y"}
1420 historyfile_create_request.run()
1421 hist_creation_resp = historyfile_create_request.response
1422 mmth_histfile_path = hist_creation_resp[
"file"]
1425 if historyfile_create_request.histstatus !=
"":
1426 LOGGER.info(
"[createhistfile] status: {}".format(
1427 historyfile_create_request.histstatus))
1429 LOGGER.info((
"[createhistfile] status: History file is being " 1430 "created on Mammoth location: `{}.gz`").format(
1431 mmth_histfile_path))
1433 mammoth_prep_done =
False 1434 hist_get_resp =
None 1436 while not mammoth_prep_done:
1437 LOGGER.info(
"Waiting for {} seconds for memhist file preparation " 1438 "on Mammoth side".format(HIST_CREATE_WAIT_TIME))
1441 time.sleep(HIST_CREATE_WAIT_TIME)
1452 historyfile_get_request.run()
1454 if historyfile_get_request.histstatus ==
"":
1455 LOGGER.info(
"[gethistfile] status: SQL dump file on Mammoth " 1456 "is ready for download.")
1457 hist_get_resp = historyfile_get_request.response
1458 mammoth_prep_done =
True 1460 LOGGER.info(
"[createhistfile] status: {}".format(
1461 historyfile_get_request.histstatus))
1463 assert hist_get_resp
is not None 1468 if os.path.exists(valid_history_gzip_import):
1473 error_msg =
"Something went wrong in {} phase".format(
1475 LOGGER.error(error_msg)
1476 raise SystemExit(error_msg)
1480 """Check invalid memberacct nums and exclude already imported members 1482 1. Identify non-integer member account numbers and exclude from the list 1483 2. Check locally on Odyssey what members have already been imported and 1484 exclude them from the import list 1487 cu_lower -- lower case CU code 1488 _mmth_members_list -- all members' list for this CU 1491 updated_uname_accnums_list -- list of to be imported member accounts 1492 non_integer_mem_ids -- invalid non integer member accounts for logging 1494 updated_uname_accnums_list = []
1495 LOGGER.info(
"Excluding already imported members from the import list...")
1500 non_integer_mem_ids = []
1502 with pg_handler.PGSession()
as conn:
1503 with conn.cursor()
as cur:
1509 for mem
in _mmth_members_list:
1512 ody_uname = un_accnums[0]
1513 memberacct_num = un_accnums[1]
1519 non_integer_mem_ids.append(un_accnums)
1522 cuuser_table =
"{}user".format(cu_lower)
1524 ody_cuusers_count = members_batch_transaction(
1527 return_only_count=
True,
1529 "user_name": ody_uname
1532 assert ody_cuusers_count <= 1
1534 if ody_cuusers_count == 1:
1539 updated_uname_accnums_list.append(un_accnums)
1541 return updated_uname_accnums_list, non_integer_mem_ids
1545 """Filter mammoth cuusers record on lastlogin and pktdate""" 1547 lastlogin_cuuser = cuuser_record_mmth[
1548 "lastlogin"].strip()
if hasattr(cuuser_record_mmth[
1549 "lastlogin"],
"strip")
else cuuser_record_mmth[
"lastlogin"]
1550 pktdate_cuuser = cuuser_record_mmth[
1551 "pktdate"].strip()
if hasattr(cuuser_record_mmth[
1552 "pktdate"],
"strip")
else cuuser_record_mmth[
"pktdate"]
1554 if lastlogin_cuuser
in [
None,
'']
and pktdate_cuuser
in [
None,
'']:
1561 @pg_crsr_hndlr_decrtr
1568 """Migrate memdata obtained from Mammoth to Odyssey. 1570 Migrate a batch of members at a time to reduce the Mammoth http endpoint 1574 _mmth_members_dict: members list dictionary of the 1575 response json dictionary from mammoth 1576 _server: Mammoth endpoint server 1577 _verbose: verbose flag 1578 _cu: current credit union code 1579 _uname: Mammoth monitor username 1580 _passwd: Mammoth monitor password 1583 psycopg2.Error, psycopg2.Warning on db operation errors 1586 cu_lower = _cu.lower()
1587 members_list = _mmth_members_dict[
"data"][
"list"]
1590 updated_uname_accnums_list, non_integer_mem_id_all =\
1597 orphaned_members_list = []
1599 forceremain_update_list = []
1602 never_loggedin_members_list = []
1605 total_members = len(updated_uname_accnums_list)
1608 total_batches = math.ceil(total_members / MEMBERS_PAGE_SIZE)
1610 LOGGER.info(
"Importing total of {} members' " 1611 "data over {} batch(es)...".format(
1612 total_members, total_batches))
1615 reference_file_exist_before_open = os.path.exists(
1616 USERID_CROSS_REF_CSV_FILE.format(cu_lower))
1619 mmth_orphaned_list_file_exist_before_open = os.path.exists(
1620 MAMMOTH_ORPHANED_MEMLIST_CSV_FILE.format(cu_lower))
1623 mmth_never_logged_in_list_file_exist_before_open = os.path.exists(
1624 MAMMOTH_NEVER_LOGGEDIN_MEMLIST_CSV_FILE.format(cu_lower))
1627 mmth_schedxfer_outlier_exist_before_open = os.path.exists(
1628 SCHEDULED_TRANSFER_OUTLIERS_CSV_FILE.format(cu_lower))
1631 cusurveymaster_surveyid_map = {}
1632 if os.path.exists(ADMIN_CUSURVEY_ID_MAP_CSV_FILE.format(cu_lower)):
1633 with open(ADMIN_CUSURVEY_ID_MAP_CSV_FILE.format(cu_lower),
1634 newline=
'')
as cusurveymap_csv:
1635 reader = csv.reader(cusurveymap_csv, delimiter=
',', quotechar=
'|')
1637 for [oldid, newid]
in reader:
1638 cusurveymaster_surveyid_map[oldid.strip()] = newid.strip()
1640 raise SystemExit(
"cusurveymaster.surveyid mappings `{}` not found!".format(
1641 ADMIN_CUSURVEY_ID_MAP_CSV_FILE.format(cu_lower)))
1645 with open(USERID_CROSS_REF_CSV_FILE.format(cu_lower),
1647 newline=
'')
as csvfile,\
1648 open(MAMMOTH_ORPHANED_MEMLIST_CSV_FILE.format(cu_lower),
1650 newline=
'')
as csvfile_orphaned,\
1651 open(MAMMOTH_NEVER_LOGGEDIN_MEMLIST_CSV_FILE.format(cu_lower),
1653 newline=
'')
as csvfile_never_loggedin,\
1654 open(SCHEDULED_TRANSFER_OUTLIERS_CSV_FILE.format(cu_lower),
1656 newline=
'')
as csvfile_outlier_schedxfer:
1658 _mmth_accnum_to_ody_userid_ref_csv_writer = csv.writer(
1659 csvfile, delimiter=
',',
1660 quoting=csv.QUOTE_MINIMAL
1662 _mmth_orphaned_memlist_csv_writer = csv.writer(
1666 quoting=csv.QUOTE_MINIMAL
1668 _mmth_never_loggedin_memlist_csv_writer = csv.writer(
1669 csvfile_never_loggedin,
1672 quoting=csv.QUOTE_MINIMAL
1674 _mmth_repeating_transfers_outliers = csv.writer(
1675 csvfile_outlier_schedxfer,
1678 quoting=csv.QUOTE_MINIMAL
1681 if not reference_file_exist_before_open:
1683 _mmth_accnum_to_ody_userid_ref_csv_writer.writerow(
1684 [
"old_user_id",
"new_user_id",
"new_member_id",
"old_member_id",
1685 "account_type",
"old_acctid",
"key",
"new_id"]
1688 if not mmth_orphaned_list_file_exist_before_open:
1690 _mmth_orphaned_memlist_csv_writer.writerow(
1691 [
"orphaned_member_account",
"user_alias",
'lastlogin',
'pktdate'])
1693 if not mmth_never_logged_in_list_file_exist_before_open:
1695 _mmth_never_loggedin_memlist_csv_writer.writerow(
1696 [
"never_loggedin_member_account",
1701 if not mmth_schedxfer_outlier_exist_before_open:
1703 _mmth_repeating_transfers_outliers.writerow(
1704 [
"description",
"original_mammoth_curepeattx_record"]
1710 with pg_handler.PGSession()
as conn:
1711 with conn.cursor()
as cur:
1714 select_cols_cuadmin = [
"flagset2"]
1715 cuadmin_where = {
"cu": _cu}
1716 cuadmin_rows = cu_admininfo_transaction(
1719 where_conditions=cuadmin_where,
1720 select_columns=select_cols_cuadmin
1722 if(len(cuadmin_rows) < 1):
1723 raise SystemExit(
"`cuadmin` record does not exist " 1724 "for this credit union: {}".format(cu_lower))
1726 flagset2 = cuadmin_rows[0][0]
1727 money_desktop_credit_card_type18_flag = CU2_SPEC18 & flagset2
1730 for batch_i
in range(total_batches):
1733 with pg_handler.PGSession()
as conn:
1734 with conn.cursor()
as cur:
1738 ody_uname_and_accnums_list = get_ith_batch(
1739 updated_uname_accnums_list,
1746 members_http_params = [un_accnum[1]
1748 ody_uname_and_accnums_list]
1750 if len(members_http_params) > 0:
1751 LOGGER.info(
"`{}` valid members are being imported " 1752 "in batch `{}`.".format(
1753 len(members_http_params),
1764 params={KEY_MEMBERS: get_valid_json(
1765 members_http_params)}
1767 this_batch_migration.run()
1768 this_batch_migration_data =\
1769 this_batch_migration.response[
"data"][
"account"]
1773 for ody_uname, mmth_uname_also_ody_mem_accnum
in\
1774 ody_uname_and_accnums_list:
1776 this_member_data = this_batch_migration_data[
1777 mmth_uname_also_ody_mem_accnum]
1780 accountbalance_records_count = len(
1781 this_member_data[
"accountbalance"])
1782 loanbalance_records_count = len(
1783 this_member_data[
"loanbalance"])
1784 cuuser_acc_record = this_member_data[
"cuusers"]
1785 assert type(cuuser_acc_record)
is dict
1786 mammoth_lastlogin = get_strip_value_dict(
1787 "lastlogin", cuuser_acc_record)
1789 mammoth_estmt_flag = get_strip_value_dict(
1790 "estmt_flag", cuuser_acc_record)
1792 mammoth_user_alias = get_strip_value_dict(
1793 "user_alias", cuuser_acc_record)
1796 if accountbalance_records_count == 0\
1797 and loanbalance_records_count == 0\
1798 and mammoth_estmt_flag !=
'Y':
1799 orphaned_members_list.append(
1800 mmth_uname_also_ody_mem_accnum)
1802 csv_record_orphan = [
1803 mmth_uname_also_ody_mem_accnum,
1806 cuuser_acc_record[
"pktdate"]
1810 _mmth_orphaned_memlist_csv_writer.writerow(
1818 never_loggedin_members_list.append(
1819 mmth_uname_also_ody_mem_accnum)
1821 _mmth_never_loggedin_memlist_csv_writer.writerow(
1822 [mmth_uname_also_ody_mem_accnum,
1825 cuuser_acc_record[
"pktdate"],
1826 mammoth_estmt_flag])
1834 members_batch_transaction,
1837 mmth_uname_also_ody_mem_accnum,
1840 _mmth_accnum_to_ody_userid_ref_csv_writer,
1841 _mmth_repeating_transfers_outliers,
1842 forceremain_update_list,
1843 cusurveymaster_surveyid_map,
1844 money_desktop_credit_card_type18_flag
1849 "No valid members to import in batch `{}`.".format(
1853 members_batch_transaction.commit()
1854 LOGGER.info(
"[PROGRESS] Members imported:"" {}/{} " 1855 "| Batch: {}/{}".format(
1856 min(MEMBERS_PAGE_SIZE * (batch_i + 1),
1865 if len(non_integer_mem_id_all) > 0:
1866 nimi_msg =
"`{}` non-integer members encountered: [{}]".format(
1867 len(non_integer_mem_id_all),
1868 ", ".join([i[1]
for i
in non_integer_mem_id_all]))
1869 LOGGER.warning(nimi_msg)
1872 if len(orphaned_members_list) > 0:
1873 LOGGER.warning(
"`{}` orphaned members were " 1874 "found. Following members were " 1875 "not imported: [{}]".format(
1876 len(orphaned_members_list),
1877 ", ".join(orphaned_members_list)))
1880 if len(forceremain_update_list) > 0:
1881 LOGGER.warning(
"`forceremain` was updated for `{}` " 1882 "members: [{}]".format(
1883 len(forceremain_update_list),
1884 ", ".join(forceremain_update_list)))
1887 if len(never_loggedin_members_list) > 0:
1888 LOGGER.warning(
"`{}` members that never logged in to Mammoth were " 1889 "found. Following members were not " 1890 "imported : [{}]".format(
1891 len(never_loggedin_members_list),
1892 ", ".join(never_loggedin_members_list)))
1899 non_existent_switch_accounts):
1900 """Populate switch accounts in useraccounts and memberacctrights tables 1902 Add additional records in useraccounts and memberacctrights tables 1903 for switch accounts. 1906 switch_acc_transaction: instance of PgTransaction 1907 cu_lower: lowercase CU code 1908 switch_acc_from: member account number for which we want 1909 to add switch account 1910 switch_acc_data: list of culivetx data for member accounts 1911 which we want to add as switch accounts 1912 non_existent_switch_accounts: list of invalid tomember switch accounts 1915 switch_acc_from = switch_acc_from.strip()
1916 from_accnum_where_conditions = {
1917 "accountnumber": switch_acc_from,
1919 select_columns = [
"primary_user"]
1922 user_from = switch_acc_transaction(
1924 "{}memberacct".format(cu_lower),
1925 where_conditions=from_accnum_where_conditions,
1926 select_columns=select_columns
1932 if len(user_from) == 0:
1933 invalid_switch_account_from = {
1934 "Invalid Switch Account From": switch_acc_from,
1935 "Switch Account To": [acc_to[
"tomember"].strip()
1936 for acc_to
in switch_acc_data]
1938 LOGGER.warning(
"Cannot switch from member account `{}`. It is either" 1940 " or the member has never been used in" 1941 " online banking in Mammoth." 1942 .format(switch_acc_from) + \
1943 invalid_switch_account_from.__str__())
1944 non_existent_switch_accounts.append(invalid_switch_account_from)
1949 user_from = user_from[0][0]
1951 for switch_acc_to
in switch_acc_data:
1952 to_member = switch_acc_to[
"tomember"].strip()
1953 to_accnum_where_conditions = {
1954 "accountnumber": to_member,
1957 user_to = switch_acc_transaction(
1959 "{}memberacct".format(cu_lower),
1960 where_conditions=to_accnum_where_conditions,
1961 select_columns=select_columns
1967 if len(user_to) == 0:
1968 invalid_switch_account_to = {
1969 "Switch Account From": switch_acc_from,
1970 "Invalid Switch Account To": to_member
1973 LOGGER.warning(
"Cannot switch to member account `{}`. It is either" 1975 " or the member has never been used in" 1976 " online banking in Mammoth." 1977 .format(to_member) + invalid_switch_account_to.__str__())
1979 non_existent_switch_accounts.append(invalid_switch_account_to)
1984 user_to = user_to[0][0]
1991 user_accounts_records = switch_acc_transaction(
1993 "{}useraccounts".format(cu_lower),
1994 where_conditions={
'user_id': user_to,
1995 'accountnumber': to_member}
1998 new_user_accounts_records = []
1999 for user_accounts_record
in user_accounts_records:
2000 user_accounts_record = dict(user_accounts_record)
2001 user_accounts_record[
"user_id"] = user_from
2002 new_user_accounts_records.append(user_accounts_record)
2004 switch_acc_transaction(
2006 "{}useraccounts".format(cu_lower),
2007 collection=new_user_accounts_records
2010 memberacctrights_records = switch_acc_transaction(
2012 "{}memberacctrights".format(cu_lower),
2015 'accountnumber': to_member
2019 new_memberacctrights_records = []
2020 for memberacctrights_record
in memberacctrights_records:
2021 memberacctrights_record = dict(memberacctrights_record)
2022 memberacctrights_record[
"user_id"] = user_from
2023 new_memberacctrights_records.append(memberacctrights_record)
2025 switch_acc_transaction(
2027 "{}memberacctrights".format(cu_lower),
2028 collection=new_memberacctrights_records
2032 @pg_crsr_hndlr_decrtr
2039 """Import all switch accounts for a Credit Union 2041 This operation is intended to be imported after all the member data 2042 is already migrated. 2045 _mmth_switch_accts_dict: Mammoth response data 2047 _server: Mammoth endpoint server 2048 _uname: Mammoth monitor username 2049 _passwd: Mammoth monitor password 2050 _verbose: level of verbosity 2053 cu_lower = _cu.lower()
2054 switch_acc_members_list = _mmth_switch_accts_dict[
"members"]
2056 total_members = len(switch_acc_members_list)
2058 total_batches = math.ceil(total_members / SWITCH_ACC_BATCH_SIZE)
2060 LOGGER.info(
"Importing total of {} members' switch account data!".format(
2062 non_existent_switch_accounts = []
2064 for batch_i
in range(total_batches):
2067 with pg_handler.PGSession()
as conn:
2068 with conn.cursor(cursor_factory=DictCursor)
as cur:
2074 batch_switch_acc_members = get_ith_batch(
2075 switch_acc_members_list,
2077 SWITCH_ACC_BATCH_SIZE
2083 DATA_OPT_SWITCH_ACCOUNTS,
2087 params={KEY_MEMBERS: get_valid_json(
2088 batch_switch_acc_members)}
2091 this_batch_migration.run()
2092 this_batch_migration_data = this_batch_migration.response
2094 for switch_acc_member, switch_acc_data
in \
2095 this_batch_migration_data[
"culivetx"].items():
2097 switch_acc_transaction,
2101 non_existent_switch_accounts
2105 switch_acc_transaction.commit()
2107 LOGGER.info(
"[PROGRESS] Switch accounts imported:" 2108 " {}/{} | Batch: {}/{}".format(
2109 min(SWITCH_ACC_BATCH_SIZE * (batch_i + 1),
2117 if len(non_existent_switch_accounts) > 0:
2118 unused_memberaccts_in_mmth_msg = (
"These `{}` member accounts out " 2119 "of `{}` total switch accounts are " 2120 "either locked or the members have " 2121 "never been used online banking in " 2122 "Mammoth. Please let the Credit " 2123 "Union know about it! ")
2125 LOGGER.warning(get_valid_json({unused_memberaccts_in_mmth_msg.format(
2126 len(non_existent_switch_accounts), total_members):
2127 non_existent_switch_accounts}))
2131 @pg_crsr_hndlr_decrtr
2133 """Cleanup memdata content 2135 If kwargs.get('commit_later') flag is True, we do not commit the 2136 transaction here (and only cleanup database tables, not the files), 2137 meaning that the the cleanup may have been executed before actual 2138 migration. If commit_later flag is False, we commit the transaction 2139 here and proceed with the file cleanup. 2142 del_memdata_trnscn: instance of PgTransaction to which this 2143 cleanup process is part of 2145 commit_later: flag to commit transaction later 2147 accountnumber: if we want to filter delete 2148 based on account number 2151 SystemExit: if IOError, NameError or PermissionError is caught 2152 psycopg2.Error, psycopg2.Warning on db operation errors 2154 commit_later = kwargs.get(
"commit_later",
False)
2155 accountnumber = kwargs.get(
"accountnumber",
None)
2156 cu_lower = _cu.lower()
2159 tbls_condn_cu_only = [
"cuadmeco",
"cu_scheduledtxn",
"cuovermicr"]
2168 tbls_condn_cu_accnum = [
2177 tbls_condn_accnum_only = [
2178 "{}accountbalance".format(cu_lower),
2179 "{}loanbalance".format(cu_lower),
2180 "{}useraccounts".format(cu_lower),
2181 "{}crossaccounts".format(cu_lower),
2182 "{}holds".format(cu_lower)
2188 "{}audituser".format(cu_lower),
2189 "{}userlogins".format(cu_lower),
2190 "{}memberacct".format(cu_lower),
2191 "{}memberacctrights".format(cu_lower),
2192 "{}group".format(cu_lower),
2193 "{}usercontact".format(cu_lower),
2194 "{}transdtl".format(cu_lower),
2195 "{}transhdr".format(cu_lower),
2196 "{}user".format(cu_lower)
2198 cu_and_accnum_where = {
2204 if accountnumber
is not None:
2205 assert type(accountnumber) == str
2206 cu_and_accnum_where[
"accountnumber"] = accountnumber
2207 accnum_where[
"accountnumber"] = accountnumber
2209 for tbl
in tbls_condn_cu_only:
2213 where_conditions=cu_where
2216 for tbl
in tbls_condn_cu_accnum:
2220 where_conditions=cu_and_accnum_where
2223 for tbl
in tbls_condn_accnum_only:
2227 where_conditions=accnum_where
2230 for tbl
in tbls_del_all:
2236 LOGGER.info(
"Cleaning up LOANAPPS records.")
2238 cleanup_loanapps(del_memdata_trnscn, _cu, commit_later=commit_later)
2240 if not commit_later:
2242 if os.path.exists(USERID_CROSS_REF_CSV_FILE.format(cu_lower)):
2243 os.remove(USERID_CROSS_REF_CSV_FILE.format(cu_lower))
2246 if os.path.exists(MAMMOTH_ORPHANED_MEMLIST_CSV_FILE.format(cu_lower)):
2247 os.remove(MAMMOTH_ORPHANED_MEMLIST_CSV_FILE.format(cu_lower))
2250 if os.path.exists(MAMMOTH_NEVER_LOGGEDIN_MEMLIST_CSV_FILE.format(cu_lower)):
2251 os.remove(MAMMOTH_NEVER_LOGGEDIN_MEMLIST_CSV_FILE.format(cu_lower))
2254 if os.path.exists(SCHEDULED_TRANSFER_OUTLIERS_CSV_FILE.format(cu_lower)):
2255 os.remove(SCHEDULED_TRANSFER_OUTLIERS_CSV_FILE.format(cu_lower))
2258 del_memdata_trnscn.commit()
2267 @pg_crsr_hndlr_decrtr
2269 """Cleanup memhist content 2271 If kwargs.get('commit_later') flag is True, we do not commit the 2272 transaction here (and only cleanup database tables, not the files), 2273 meaning that the the cleanup may have been executed before actual 2274 migration. If commit_later flag is False, we commit the transaction 2275 here and proceed with the file cleanup. 2278 del_memhist_transaction: instance of PgTransaction to which this 2279 cleanup process is part of 2281 commit_later: flag to commit transaction later 2283 accountnumber: if we want to filter delete 2284 based on account number 2287 SystemExit: if IOError, NameError or PermissionError is caught 2288 psycopg2.Error, psycopg2.Warning on db operation errors 2290 commit_later = kwargs.get(
"commit_later",
False)
2291 accountnumber = kwargs.get(
"accountnumber",
None)
2294 if accountnumber
is not None:
2295 assert type(accountnumber) == str
2296 accnum_where[
"accountnumber"] = accountnumber
2298 hist_tbls_to_clean = [
"{}{}".format(
2299 _cu.lower(), tbl_suffx)
for 2300 tbl_suffx
in [
"accounthistory",
"loanhistory"]]
2302 for tbl
in hist_tbls_to_clean:
2303 del_memhist_transaction(
2306 where_conditions=accnum_where
2309 if not commit_later:
2310 memhist_file = MEMHIST_DOWNLOADED_FILE.format(_cu.lower())
2311 memhist_dir = os.path.dirname(memhist_file)
2313 accounthistory_file = os.path.join(
2314 memhist_dir, MEMHIST_ACCOUNT_HISTORY_FILENAME)
2316 loanhistory_file = os.path.join(
2317 memhist_dir, MEMHIST_LOAN_HISTORY_FILENAME)
2320 for memhist_f
in [memhist_file, accounthistory_file, loanhistory_file]:
2321 if os.path.exists(memhist_f):
2322 os.remove(memhist_f)
2325 del_memhist_transaction.commit()
2326 del_memhist_transaction(
2333 MEM_HIST_DATA_CONFIG = {
2335 ACTION_OPT_CLEAN: cleanup_memdata,
2336 ACTION_OPT_MIGRATE: move_members_data,
2337 "description":
"Members Data",
2340 ACTION_OPT_CLEAN: cleanup_memhist,
2341 ACTION_OPT_MIGRATE: move_members_hist,
2342 "description":
"Members History",
2344 DATA_OPT_SWITCH_ACCOUNTS: {
2345 ACTION_OPT_MIGRATE: move_switch_accounts,
2346 "description":
"Members Switch Account",
2351 @pg_crsr_hndlr_decrtr
2360 """Entry point to the memdata AND history migration. 2363 cu: current credit union code 2364 server: Mammoth endpoint server 2365 action: one of the migration options to be 2366 performed on settings data category 2367 user: Mammoth monitor username 2368 passwd: Mammoth monitor password 2369 data_ctgry: memdata or memhist 2370 verbose: level of verbosity 2371 reset: flag to cleanup tables before migration 2374 psycopg2.Error, psycopg2.Warning on db operation errors 2377 with pg_handler.PGSession()
as conn:
2378 with conn.cursor()
as cur:
2382 do_not_exist_list) = tables_exist_transaction(
2388 if not tables_exist_flag:
2389 error_msg = (
"Stopping migration. Following table" 2390 " schemas do not exist: {}").format(
2391 get_valid_json(
", ".join(do_not_exist_list)))
2392 LOGGER.error(error_msg)
2393 raise SystemExit(error_msg)
2396 "All required target tables exist! Continuing migration..")
2399 if action == ACTION_OPT_MIGRATE:
2401 with pg_handler.PGSession()
as conn:
2402 with conn.cursor()
as cur:
2404 MEM_HIST_DATA_CONFIG[data_ctgry][ACTION_OPT_CLEAN](
2405 delete_transaction, cu)
2408 with pg_handler.PGSession()
as conn:
2409 with conn.cursor()
as cur:
2411 summary_transaction(
2419 if data_ctgry == DATA_OPT_MEMDATA:
2428 members_list_migration.run()
2430 all_members_list = members_list_migration.response
2433 MEM_HIST_DATA_CONFIG[data_ctgry][action](
2442 elif data_ctgry == DATA_OPT_SWITCH_ACCOUNTS:
2445 DATA_OPT_SWITCH_ACCT_MEMBERS,
2451 switch_acc_members_migration.run()
2453 switch_acc_members_list = switch_acc_members_migration.response
2456 MEM_HIST_DATA_CONFIG[data_ctgry][action](
2457 switch_acc_members_list,
2466 elif data_ctgry == DATA_OPT_MEMHIST:
2467 MEM_HIST_DATA_CONFIG[data_ctgry][action](
2476 with pg_handler.PGSession()
as conn:
2477 with conn.cursor()
as cur:
2479 summary_transaction(
2486 LOGGER.info(
"{} migration completed!" 2487 .format(MEM_HIST_DATA_CONFIG[data_ctgry][
"description"]))
2490 elif action == ACTION_OPT_CLEAN:
2491 if data_ctgry == DATA_OPT_SWITCH_ACCOUNTS:
2492 LOGGER.error(
"`{}` is not available for `{}`".format(
2493 ACTION_OPT_CLEAN, DATA_OPT_SWITCH_ACCOUNTS))
2495 with pg_handler.PGSession()
as conn:
2496 with conn.cursor()
as cur:
2498 MEM_HIST_DATA_CONFIG[data_ctgry][
2499 action](delete_transaction, cu)
2501 LOGGER.info(
"{} clean up completed!".format(
2502 MEM_HIST_DATA_CONFIG[data_ctgry][
"description"]))
2505 elif action == ACTION_OPT_SUMMARY:
2506 with pg_handler.PGSession()
as conn:
2507 with conn.cursor()
as cur:
2509 summary_transaction(
def copy_http_file_stream(_resp, _target_path, _cu)
def migrate_members(cu, server, action, user, passwd, verbose, data_ctgry, reset)
def cleanup_memdata(del_memdata_trnscn, _cu, **kwargs)
def move_members_hist(_server, _verbose, _cu, _user, _passwd)
def migrate_individual_memdata(members_batch_transaction, member_data, ody_uname, mmth_uname_also_ody_mem_accnum, _verbose, _cu, _mmth_accnum_to_ody_userid_ref_csv_writer, _mmth_repeating_transfers_outliers, forceremain_update_list, cusurveymaster_surveyid_map, money_desktop_credit_card_type18_flag)
def move_members_data(_mmth_members_dict, _server, _verbose, _cu, _uname, _passwd)
def prepare_money_desktop_xref(_cu, user_id, mmth_uname_also_ody_mem_accnum, _mmth_accnum_to_ody_userid_ref_csv_writer, mdx_record, acc_or_loan, mdx_cc_type18_flag=False)
def handle_overloaded_accounts(from_or_to)
def move_switch_accounts(_mmth_switch_accts_dict, _server, _verbose, _cu, _uname, _passwd)
def get_ody_user_ids(member)
def cualerts_handler(members_batch_transaction, _verbose, _cualert_check_records, _cualert_bal_records, _cualert_loan_records, _cualert_trans_records, user_id)
def cuuser_main_handler(members_batch_transaction, ody_uname, mmth_uname_also_ody_mem_accnum, verbose, cuuser_acc_record, cuquestselect_records, forceremain_update_list, _cu)
def skip_never_loggedin_cuuser(cuuser_record_mmth)
def populate_switch_accounts(switch_acc_transaction, cu_lower, switch_acc_from, switch_acc_data, non_existent_switch_accounts)
def execute_sql_script_from_file(_target_gzip, _cu)
def get_valid_to_be_imported_members(cu_lower, _mmth_members_list)
def cleanup_memhist(del_memhist_transaction, _cu, **kwargs)
def cu_alert_table_transform(user_id, alrt_type_code, alrt_record)