Odyssey
ody_migr_loans.py
1 #!/usr/bin/env python
2 """Main module to migrate loanapps data."""
3 
4 
5 import logging
6 import csv
7 import os
8 import json
9 # import only necessary variables for settings
10 from ody_migr_config import (INSERT,
11  INSERT_ONE,
12  SELECT_RETURN,
13  DELETE,
14  SUMMARY,
15  TABLE_EXISTS,
16  DATA_OPT_LOANAPP,
17  ACTION_OPT_CLEAN,
18  ACTION_OPT_MIGRATE,
19  ACTION_OPT_SUMMARY,
20  LOANAPP_SCHEMAMASTER_LOANID_MAP_CSV_FILE,
21  LOANAPP_USER_USERID_MAP_CSV_FILE)
22 
23 from ody_migr_transaction import PgTransaction
24 from ody_migr_transaction import pg_crsr_hndlr_decrtr
25 import ody_migr_db_handler as pg_handler
26 from ody_migr_mmth_endpoint import MammothMigration
27 from ody_migr_utils import (get_valid_json,
28  get_strip_value_dict,
29  file_exc_decorator,
30  log_progress)
31 
32 LOGGER = logging.getLogger(__name__)
33 
34 
def _remap_formfield_keys(respapplication, detailid_map):
    """Return a copy of a decoded respapplication keyed by new detailids.

    Args:
        respapplication: dictionary decoded from the respapplication
                         column; keys have the form 'formfield_DETAILID'
        detailid_map: mapping of old detailid (string) to the detailid
                      returned by the Odyssey insert

    Returns:
        New dictionary with the same values and rewritten keys.

    Raises:
        ValueError: if a key is not of the form 'formfield_DETAILID'
        KeyError: if the detailid in a key has no entry in detailid_map
    """
    remapped = {}
    for old_key, value in respapplication.items():
        # explicit check rather than `assert`, which is removed when
        # Python runs with -O
        if "formfield" not in old_key:
            raise ValueError(
                "Unexpected respapplication key `{}`; expected"
                " 'formfield_DETAILID'".format(old_key))
        # unpacking raises ValueError unless there is exactly one "_"
        ffield, old_detailid = old_key.strip().split("_")
        new_key = "{}_{}".format(ffield, detailid_map[old_detailid])
        remapped[new_key] = value
    return remapped


@pg_crsr_hndlr_decrtr
def move_loanapps(loans_transaction,
                  mmth_loanapp_dict,
                  verbose,
                  cucode):
    """Migrate loanapps data in a single transaction

    Inserts, in order, the lnappschemamaster, lnappschemadetail, lnappuser,
    lnappuser_questselect and lnappuserresponse records found under
    mmth_loanapp_dict["data"], logging progress after each table. The old
    ids are removed from the master, detail and user records before insert
    and the ids returned by the insert are used to rewrite the references
    held by the dependent records. The old-to-new loanid and userid
    mappings are also written to csv files for bookkeeping.

    The records inside mmth_loanapp_dict are modified in place. Nothing is
    committed here; that is left to the caller.

    Args:
        loans_transaction: transaction to commit all insertions for loanapps
        mmth_loanapp_dict: response dictionary from Mammoth for loanapps
        verbose: verbosity flag (not read by this function)
        cucode: current credit union code

    Raises:
        psycopg2.Error, psycopg2.Warning on db operation errors
        KeyError: if a record references a loanid, userid or detailid that
                  has no entry in the corresponding old-to-new mapping
                  (lnappuserresponse records with an unknown userid are
                  skipped instead)
        ValueError: if a respapplication key is not 'formfield_DETAILID'
    """
    mmth_loanapp_data = mmth_loanapp_dict["data"]
    progress_dict = {"completed": 0, "total": 5}

    # 1. Populate lnappschemamaster table (also maintain mapping of
    # old and new loanid)
    mmth_lnappschemamaster_records = mmth_loanapp_data["lnappschemamaster"]
    # store old and new loanid mapping
    lnappschemamaster_loanid_map = {}

    # also write old loanid to new loanid mapping to a csv file for bookkeeping
    with open(LOANAPP_SCHEMAMASTER_LOANID_MAP_CSV_FILE.format(cucode.lower()),
              'w', newline='') as loanidmap_csvfile:
        csvwriter_loanid = csv.writer(loanidmap_csvfile, delimiter=",",
                                      quotechar='|',
                                      quoting=csv.QUOTE_MINIMAL)
        csvwriter_loanid.writerow(["mammoth_lnappschemamaster_loanid",
                                   "odyssey_lnappschemamaster_loanid"])

        for lnappschemamaster_rec in mmth_lnappschemamaster_records:
            # drop the old loanid from the record to allow creating a new one
            old_loanid = lnappschemamaster_rec.pop("loanid").strip()
            # insert and get back the new loanid
            loans_transaction(
                INSERT_ONE,
                "lnappschemamaster",
                list(lnappschemamaster_rec.keys()),
                list(lnappschemamaster_rec.values()),
                returning_col="loanid"
            )
            new_loanid = loans_transaction.cur.fetchone()[0]
            # maintain mapping of old loanid to new loanid
            lnappschemamaster_loanid_map[old_loanid] = new_loanid
            # write to a csv file
            csvwriter_loanid.writerow([old_loanid, new_loanid])

    progress_dict["completed"] += 1
    log_progress(progress_dict)

    # 2. Populate lnappschemadetail table
    lnappschemadetail_detailid_map = {}
    mmth_lnappschemadetail_records = mmth_loanapp_data["lnappschemadetail"]
    for lnappschemadetail_rec in mmth_lnappschemadetail_records:
        # drop the old detailid from the record to allow creating a new one
        old_detailid = lnappschemadetail_rec.pop("detailid").strip()

        # point the detail record at the newly created master record
        lnappschemadetail_rec["loanid"] = lnappschemamaster_loanid_map[
            lnappschemadetail_rec["loanid"].strip()]

        # insert and get back the new detailid
        loans_transaction(
            INSERT_ONE,
            "lnappschemadetail",
            list(lnappschemadetail_rec.keys()),
            list(lnappschemadetail_rec.values()),
            returning_col="detailid"
        )
        new_detailid = loans_transaction.cur.fetchone()[0]
        # maintain mapping of old detailid to new detailid to be used
        # for lnappuserresponse.respapplication formfield mapping
        lnappschemadetail_detailid_map[old_detailid] = new_detailid

    progress_dict["completed"] += 1
    log_progress(progress_dict)

    # 3. Populate lnappuser table
    mmth_lnappuser_records = mmth_loanapp_data["lnappuser"]
    # store old and new lnappuser userid mapping
    lnappuser_userid_map = {}
    mmth_userids_with_no_ody_banking_user_id = []

    # also write old userid to new userid mapping to a csv file for bookkeeping
    with open(LOANAPP_USER_USERID_MAP_CSV_FILE.format(cucode.lower()),
              'w', newline='') as useridmap_csvfile:
        csvwriter_userid = csv.writer(useridmap_csvfile, delimiter=",",
                                      quotechar='|',
                                      quoting=csv.QUOTE_MINIMAL)
        csvwriter_userid.writerow(["mammoth_lnappuser_userid",
                                   "odyssey_lnappuser_userid"])

        for lnappuser_rec in mmth_lnappuser_records:
            old_userid = get_strip_value_dict("userid", lnappuser_rec)
            # allow creating new userid
            del lnappuser_rec["userid"]

            # drop user_name, and populate session_account
            mmth_banking_user_name = lnappuser_rec.pop("user_name")
            lnappuser_rec["session_account"] = mmth_banking_user_name

            # Populate banking_user_id if associated Mammoth
            # lnappuser.user_name has corresponding Odyssey userid
            # (<cu>memberacct.primary_user). If not, remember the Mammoth
            # lnappuser.userid so it can be reported after the loop.
            if mmth_banking_user_name not in [None, ""]:
                ody_banking_user_id = loans_transaction(
                    SELECT_RETURN,
                    "{}memberacct".format(cucode.lower()),
                    where_conditions={
                        "accountnumber": mmth_banking_user_name},
                    select_columns=["primary_user"]
                )
                if ody_banking_user_id:
                    # Eg. [(xxxx,)]
                    lnappuser_rec["banking_user_id"] = (
                        ody_banking_user_id[0][0])
                else:
                    mmth_userids_with_no_ody_banking_user_id.append(
                        old_userid)

            # insert loan app user one by one
            loans_transaction(
                INSERT_ONE,
                "lnappuser",
                list(lnappuser_rec.keys()),
                list(lnappuser_rec.values()),
                returning_col="userid"
            )
            new_userid = loans_transaction.cur.fetchone()[0]
            # maintain mapping of old userid to new userid
            lnappuser_userid_map[old_userid] = new_userid
            # write to csv file
            csvwriter_userid.writerow([old_userid, new_userid])

    if mmth_userids_with_no_ody_banking_user_id:
        LOGGER.warning("`{}` Loanapps user(s) (lnappuser.userids) in Mammoth"
                       " were found to have no associated Odyssey userid"
                       " (<cu>memberacct.primary_user)."
                       " Developers can refer to the mapping file to look"
                       " at the Odyssey lnappuser.userids if needed."
                       " Mammoth lnappuser.userids : [{}]".format(
                           len(mmth_userids_with_no_ody_banking_user_id),
                           # str() keeps join() safe for non-string ids
                           ", ".join(
                               map(str,
                                   mmth_userids_with_no_ody_banking_user_id))))

    progress_dict["completed"] += 1
    log_progress(progress_dict)

    # 4. Populate lnappuser_questselect table
    mmth_lnappuser_questselect_records = mmth_loanapp_data[
        "lnappuser_questselect"]
    for lnappuser_questselect_rec in mmth_lnappuser_questselect_records:
        lnappuser_questselect_rec["userid"] = lnappuser_userid_map[
            lnappuser_questselect_rec["userid"].strip()]

    loans_transaction(
        INSERT,
        "lnappuser_questselect",
        collection=mmth_lnappuser_questselect_records
    )

    progress_dict["completed"] += 1
    log_progress(progress_dict)

    # 5. Populate lnappuserresponse table
    mmth_lnappuserresponse_records = mmth_loanapp_data["lnappuserresponse"]

    valid_responses = []
    for lnappuserresponse_rec in mmth_lnappuserresponse_records:
        # allow creating new respid
        del lnappuserresponse_rec["respid"]

        # responses whose userid has no associated userid in the
        # lnappuser table are left out of the insert
        resp_old_userid = lnappuserresponse_rec["userid"].strip()
        if resp_old_userid not in lnappuser_userid_map:
            continue

        # update userid and loanid with the ones created newly on Odyssey
        lnappuserresponse_rec["userid"] = lnappuser_userid_map[
            resp_old_userid]
        lnappuserresponse_rec["loanid"] = lnappschemamaster_loanid_map[
            lnappuserresponse_rec["loanid"].strip()]

        # exists in www{3,6}, does not exist in www4 and Odyssey
        lnappuserresponse_rec.pop("resploandesc", None)

        # respapplication field is json_encoded text; load as a dictionary
        respapplication = json.loads(lnappuserresponse_rec["respapplication"])

        # update old detailid with new detailid in the formfield keys;
        # only content in the json object format is rewritten, anything
        # else is inserted unchanged
        if isinstance(respapplication, dict):
            lnappuserresponse_rec["respapplication"] = get_valid_json(
                _remap_formfield_keys(respapplication,
                                      lnappschemadetail_detailid_map))

        valid_responses.append(lnappuserresponse_rec)

    skipped_responses = (len(mmth_lnappuserresponse_records)
                         - len(valid_responses))
    if skipped_responses:
        LOGGER.warning("`{}` lnappuserresponse record(s) from Mammoth were"
                       " skipped because their userid has no associated"
                       " lnappuser record.".format(skipped_responses))

    # remove the skipped records in place so the list held by the
    # Mammoth response dictionary matches what is inserted
    mmth_lnappuserresponse_records[:] = valid_responses

    loans_transaction(
        INSERT,
        "lnappuserresponse",
        collection=mmth_lnappuserresponse_records
    )

    progress_dict["completed"] += 1
    log_progress(progress_dict)
255 
256 
@file_exc_decorator
@pg_crsr_hndlr_decrtr
def cleanup_loanapps(del_loans_transaction, cucode, **kwargs):
    """Cleanup loanapps content

    Deletes the loanapps records that belong to `cucode` and removes the
    two old-id/new-id mapping csv files if they exist. The file removal
    happens whatever the value of `commit_later`.

    When kwargs.get('commit_later') is True the deletes are left
    uncommitted so that the caller can commit them together with a
    following migration. When it is False (the default) the transaction is
    committed here and a records summary is logged afterwards.

    Args:
        del_loans_transaction: instance of PgTransaction to which this
                               cleanup process is part of
        cucode: current credit union code
        **kwargs: Optional parameters:
                  commit_later: flag to commit transaction later
                                or now
                  accountnumber: listed by callers' convention; not read
                                 by this function

    Raises:
        SystemExit: if IOError, NameError or PermissionError is caught
        psycopg2.Error, psycopg2.Warning on db operation errors
    """
    defer_commit = kwargs.get("commit_later", False)
    cu_filter = {"cu": cucode}

    # ids of the parent records for this cucode; each row is a tuple
    # whose first element is the id
    loanid_rows = del_loans_transaction(
        SELECT_RETURN,
        "lnappschemamaster",
        select_columns=["loanid"],
        where_conditions=cu_filter
    )
    userid_rows = del_loans_transaction(
        SELECT_RETURN,
        "lnappuser",
        select_columns=["userid"],
        where_conditions=cu_filter
    )

    # dependent tables are cleared first, one delete per parent id:
    #   delete from <table> where <column> = <id>
    # lnappuserresponse is visited twice, once per parent key
    dependent_deletes = (
        ("lnappschemadetail", "loanid", loanid_rows),
        ("lnappuserresponse", "loanid", loanid_rows),
        ("lnappuser_questselect", "userid", userid_rows),
        ("lnappuserresponse", "userid", userid_rows),
    )
    for table_name, key_column, parent_rows in dependent_deletes:
        for parent_row in parent_rows:
            del_loans_transaction(
                DELETE,
                table_name,
                where_conditions={key_column: parent_row[0]}
            )

    # then the parent tables themselves:
    #   delete from {lnappschemamaster, lnappuser} where cu='CUCODE'
    for parent_table in ("lnappschemamaster", "lnappuser"):
        del_loans_transaction(
            DELETE,
            parent_table,
            where_conditions=cu_filter
        )

    # drop the bookkeeping csv files written by the migration, if present
    for map_file_template in (LOANAPP_SCHEMAMASTER_LOANID_MAP_CSV_FILE,
                              LOANAPP_USER_USERID_MAP_CSV_FILE):
        map_file = map_file_template.format(cucode.lower())
        if os.path.exists(map_file):
            os.remove(map_file)

    if defer_commit:
        # the caller commits the deletes later
        return

    # commit all the table deletions here
    del_loans_transaction.commit()
    del_loans_transaction(
        SUMMARY,
        DATA_OPT_LOANAPP,
        cucode
    )
352 
353 
@pg_crsr_hndlr_decrtr
def migrate_loanapps(cu, server, action, user, passwd, verbose, reset):
    """Entry point to the loanapps migration.

    First verifies that the required target tables exist, then performs
    the requested action:
        ACTION_OPT_MIGRATE: fetch loanapps data from Mammoth and insert it
                            into Odyssey (existing loanapps records for
                            the credit union are cleaned in the same
                            transaction before the insert)
        ACTION_OPT_CLEAN:   delete the loanapps records and commit
        ACTION_OPT_SUMMARY: log the records count summary
    Any other action value does nothing after the table check.

    Args:
        cu: current credit union code
        server: Mammoth endpoint server
        action: one of the migration options to be
                performed on loanapps data category
        user: Mammoth monitor username
        passwd: Mammoth monitor password
        verbose: level of verbosity
        reset: flag to cleanup tables (and commit that cleanup) before
               migration

    Raises:
        SystemExit: if any required target table does not exist
        psycopg2.Error, psycopg2.Warning on db operation errors
    """
    # check if the required tables exist
    with pg_handler.PGSession() as conn:
        with conn.cursor() as cur:
            tables_exist_transaction = PgTransaction(conn, cur)

            (tables_exist_flag,
             do_not_exist_list) = tables_exist_transaction(
                TABLE_EXISTS,
                DATA_OPT_LOANAPP,
                cu.lower()
            )

            if not tables_exist_flag:
                error_msg = ("Stopping migration. Following table"
                             " schemas do not exist: {}").format(
                    get_valid_json(", ".join(do_not_exist_list)))
                LOGGER.error(error_msg)
                raise SystemExit(error_msg)
            else:
                LOGGER.info(
                    "All required target tables exist! Continuing migration..")

    # migrate loanapps data
    if action == ACTION_OPT_MIGRATE:
        if reset:
            # clean up loanapps data; this cleanup commits on its own
            with pg_handler.PGSession() as conn:
                with conn.cursor() as cur:
                    delete_transaction = PgTransaction(conn, cur)
                    cleanup_loanapps(delete_transaction, cu)

        # Get loanapps from Mammoth
        loanapps_migration = MammothMigration(
            cu,
            DATA_OPT_LOANAPP,
            server,
            user,
            passwd
        )
        loanapps_migration.run()

        LOGGER.info("Initiating migrating response data to odyssey")

        # migrate to Odyssey
        # cleanup and inserts share one transaction so that they are
        # committed together
        with pg_handler.PGSession() as conn:
            with conn.cursor() as cur:
                loans_transaction = PgTransaction(conn, cur)
                # log summary before migration
                loans_transaction(
                    SUMMARY,
                    DATA_OPT_LOANAPP,
                    cu,
                    msg_prefix="BEFORE"
                )
                # we will only commit this cleanup later together with
                # all the migrations for loanapps
                cleanup_loanapps(loans_transaction, cu, commit_later=True)

                # migrate obtained response to Odyssey
                move_loanapps(
                    loans_transaction,
                    loanapps_migration.response,
                    verbose,
                    cu
                )
                loans_transaction.commit()

                # log summary after migration
                loans_transaction(
                    SUMMARY,
                    DATA_OPT_LOANAPP,
                    cu,
                    msg_prefix="AFTER"
                )

        LOGGER.info("Loan apps migration completed!")

    # clean up loanapps data
    elif action == ACTION_OPT_CLEAN:
        with pg_handler.PGSession() as conn:
            with conn.cursor() as cur:
                delete_transaction = PgTransaction(conn, cur)
                cleanup_loanapps(delete_transaction, cu)

        LOGGER.info("Loan apps records clean up completed!")

    # log display loanapps tables' records count summary
    elif action == ACTION_OPT_SUMMARY:
        with pg_handler.PGSession() as conn:
            with conn.cursor() as cur:
                summary_transaction = PgTransaction(conn, cur)
                summary_transaction(
                    SUMMARY,
                    DATA_OPT_LOANAPP,
                    cu
                )
def migrate_loanapps(cu, server, action, user, passwd, verbose, reset)
def cleanup_loanapps(del_loans_transaction, cucode, **kwargs)
def move_loanapps(loans_transaction, mmth_loanapp_dict, verbose, cucode)