Odyssey
ody_migr_admin.py
1 #!/usr/bin/env python
2 """Main module to migrate admin data."""
3 
4 
5 import logging
6 import csv
7 import os
8 # import only necessary variables for settings
9 from ody_migr_config import (INSERT,
10  INSERT_ONE,
11  SELECT_RETURN,
12  DELETE,
13  SUMMARY,
14  TABLE_EXISTS,
15  DATA_OPT_ADMIN,
16  ACTION_OPT_CLEAN,
17  ACTION_OPT_MIGRATE,
18  ACTION_OPT_SUMMARY,
19  ADMIN_CUSURVEY_ID_MAP_CSV_FILE,
20  CUADMINPROGS_PERMISSIONS_MAPPING)
21 
22 from ody_migr_transaction import PgTransaction
23 from ody_migr_transaction import pg_crsr_hndlr_decrtr
24 import ody_migr_db_handler as pg_handler
25 from ody_migr_mmth_endpoint import MammothMigration
26 from ody_migr_utils import (get_valid_json,
27  get_strip_value_dict,
28  file_exc_decorator,
29  log_progress)
30 
31 LOGGER = logging.getLogger(__name__)
32 
33 
@pg_crsr_hndlr_decrtr
def move_admin(admin_transaction,
               _mmth_admin_dict,
               _verbose,
               _cu):
    """Migrate admin data in a single transaction.

    Walks the Mammoth admin response and issues the inserts for the admin
    user tables, the survey tables, the tables that are copied as-is,
    cualertmsgs, cucontact, <cu>extkey and cualtroute. This function never
    calls commit itself; every statement goes through `admin_transaction`.

    As a side effect, a CSV file mapping old Mammoth survey ids to the newly
    generated Odyssey survey ids is written to the path obtained from
    ADMIN_CUSURVEY_ID_MAP_CSV_FILE formatted with the lowercase cu code.

    Note: the records inside `_mmth_admin_dict` are modified in place
    (keys are renamed, removed and lowercased) before they are inserted.

    Args:
        admin_transaction: transaction to commit all insertions for admin
        _mmth_admin_dict: response dictionary from Mammoth for admin; only
                          its "data" entry is read
        _verbose: verbosity flag (not used in this function)
        _cu: current credit union code

    Raises:
        AssertionError: if "cuadminusers" is neither an empty list nor a
                        dict, if an audit record belongs to another credit
                        union, if a survey record has no "surveyid", or if
                        an extkey record has no account number
        KeyError: if an expected key is missing from the Mammoth response or
                  a survey record references an unknown survey id
        psycopg2.Error, psycopg2.Warning on db operation errors
    """
    mmth_admin_data = _mmth_admin_dict["data"]

    progress_dict = {"completed": 0, "total": 16}

    # First populate admin user specific tables
    for adminuser_dict in mmth_admin_data["adminusers"]:
        cuadminusers_coll = adminuser_dict["cuadminusers"]

        # no content for cuadminusers arrives as an empty list, actual
        # content arrives as a dict
        if isinstance(cuadminusers_coll, list):
            assert len(cuadminusers_coll) == 0
            cuadminusers_coll = {}
        else:
            assert isinstance(cuadminusers_coll, dict)

        # Prepare cuadminusers.mfaquest field using cuadmquestselect
        # records from Mammoth. Do not populate cuadmquestselect
        # table: this table is not used in Odyssey. Eventually, will
        # be deleted from Odyssey.
        cuadmquestselect_coll = adminuser_dict["cuadmquestselect"]

        # populate cuadminusers table
        if cuadminusers_coll:
            mfaquest_dict = {"answers": {}}
            for cuquest_record in cuadmquestselect_coll:
                qid = cuquest_record["quest_id"].strip()
                ans = cuquest_record["answer"].strip()
                mfaquest_dict["answers"][qid] = ans

            cuadminusers_coll["mfaquest"] = get_valid_json(mfaquest_dict)
            # Make cuadminusers.user_name lowercase
            cuadminusers_coll["user_name"] = cuadminusers_coll[
                "user_name"].lower()
            admin_transaction(
                INSERT_ONE,
                "cuadminusers",
                list(cuadminusers_coll.keys()),
                list(cuadminusers_coll.values())
            )

        # populate cuadminallow, cuadminexclude tables
        for tbl_allow_exclude_progs in ("cuadminallow", "cuadminexclude"):
            cuadmin_allow_exclude_data = adminuser_dict[
                tbl_allow_exclude_progs]

            for allow_exclude_rec in cuadmin_allow_exclude_data:
                program = get_strip_value_dict("program", allow_exclude_rec)

                # Make {cuadminallow, cuadminexclude}.user_name lowercase
                allow_exclude_rec["user_name"] = \
                    allow_exclude_rec["user_name"].lower()

                # translate Mammoth program names that have a mapped
                # permission name; unmapped programs are kept as they are
                if program in CUADMINPROGS_PERMISSIONS_MAPPING:
                    allow_exclude_rec["program"] = \
                        CUADMINPROGS_PERMISSIONS_MAPPING[program]

            admin_transaction(
                INSERT,
                tbl_allow_exclude_progs,
                collection=cuadmin_allow_exclude_data
            )

        # populate cuauditadmin table: records to be populated are already
        # preprocessed on Mammoth side
        cuauditadmin_coll = adminuser_dict["cuauditadmin"]

        for cuaudadm_rec in cuauditadmin_coll:
            # Make cuauditadmin.user_name lowercase first, so that
            # cuauditadmin.auditsrcuser_name (copied from it below) is
            # lowercase as well
            cuaudadm_rec["user_name"] = cuaudadm_rec["user_name"].lower()

            mmth_cu = get_strip_value_dict("cu", cuaudadm_rec)
            assert mmth_cu == _cu
            del cuaudadm_rec["admuser"]
            cuaudadm_rec["auditdate"] = get_strip_value_dict(
                "chdate", cuaudadm_rec, pop=True)
            cuaudadm_rec["auditaction"] = get_strip_value_dict(
                "action", cuaudadm_rec, pop=True)
            cuaudadm_rec["auditsrctype"] = "A"
            cuaudadm_rec["auditsrcuser_name"] = get_strip_value_dict(
                "user_name", cuaudadm_rec)
            cuaudadm_rec["auditsrcemail"] = get_strip_value_dict(
                "email", cuaudadm_rec, pop=True, default="a@b.cde")
            cuaudadm_rec["auditsrcip"] = "127.0.0.1"

        admin_transaction(
            INSERT,
            "cuauditadmin",
            collection=cuauditadmin_coll
        )

    progress_dict["completed"] += 4
    log_progress(progress_dict)

    # populate cusurveymaster one record at a time: the old surveyid is
    # dropped and the id returned by the insert is remembered so that the
    # dependent survey tables can be re-pointed to it
    cusurvey_surveyid_map = {}
    for cusurvey_rec in mmth_admin_data["cusurveymaster"]:
        old_id = cusurvey_rec["surveyid"].strip()
        del cusurvey_rec["surveyid"]
        admin_transaction(
            INSERT_ONE,
            "cusurveymaster",
            list(cusurvey_rec.keys()),
            list(cusurvey_rec.values()),
            returning_col="surveyid"
        )
        new_id = admin_transaction.cur.fetchone()[0]
        cusurvey_surveyid_map[old_id] = new_id

    # replace the old surveyid with the new one in the dependent tables
    for tbl_cusurvey in ("cusurveyquest", "cusurveydetail"):
        tbl_coll = mmth_admin_data[tbl_cusurvey]

        for cusurvey_other_rec in tbl_coll:
            assert "surveyid" in cusurvey_other_rec
            cusurvey_other_rec["surveyid"] = cusurvey_surveyid_map[
                cusurvey_other_rec["surveyid"].strip()]

        admin_transaction(
            INSERT,
            tbl_cusurvey,
            collection=tbl_coll
        )

    cu_lower = _cu.lower()

    # persist the old -> new surveyid mapping
    with open(ADMIN_CUSURVEY_ID_MAP_CSV_FILE.format(cu_lower),
              'w',
              newline='') as cusurvey_csvfile:
        cusurvey_csvwriter = csv.writer(
            cusurvey_csvfile, delimiter=',',
            quotechar='|',
            quoting=csv.QUOTE_MINIMAL
        )
        # header is written as two separate fields so that it lines up
        # with the two-column data rows below
        cusurvey_csvwriter.writerow(["cusurveymaster_surveyid_old",
                                     "cusurveymaster_surveyid_new"])
        for survey_oldid, survey_newid in cusurvey_surveyid_map.items():
            cusurvey_csvwriter.writerow([survey_oldid, survey_newid])

    # apply identity transformation and insert as is to the following
    # tables with no schema change on Mammoth and Odyssey
    adm_tbls_no_schema_chng = [
        "cuadmnotify",
        "cuhavetrans",
        "cusurveyintro",
        "cutrusteddetail"
    ]

    for tbl in adm_tbls_no_schema_chng:
        admin_transaction(
            INSERT,
            tbl,
            collection=mmth_admin_data[tbl]
        )

    # populate cualertmsgs -- reset alertid (unique index key constraint)
    mmth_admin_cualertmsgs_coll = mmth_admin_data["cualertmsgs"]
    for cualertmsgs_record in mmth_admin_cualertmsgs_coll:
        # reset cualertmsgs.alertid
        del cualertmsgs_record["alertid"]

    admin_transaction(
        INSERT,
        "cualertmsgs",
        collection=mmth_admin_cualertmsgs_coll
    )

    progress_dict["completed"] += 9
    log_progress(progress_dict)

    # populate cucontact table
    admin_transaction(
        INSERT,
        "cucontact",
        collection=mmth_admin_data["cucontact"]
    )

    progress_dict["completed"] += 1
    log_progress(progress_dict)

    # populate <cu>extkey table
    extkey_coll = mmth_admin_data["extkey"]
    for extkey_rec in extkey_coll:
        extk_accnum = get_strip_value_dict("accountnumber", extkey_rec)
        assert extk_accnum is not None
        primary_user_rows = admin_transaction(
            SELECT_RETURN,
            "{}memberacct".format(cu_lower),
            where_conditions={"accountnumber": extk_accnum},
            select_columns=["primary_user"]
        )
        if len(primary_user_rows) == 1:
            user_id = primary_user_rows[0][0]
        else:
            # used just for cudrop test case (sometimes this script might be
            # executed with no user_id in cuuser table)
            user_id = "1000"
        extkey_rec["user_id"] = user_id

    admin_transaction(
        INSERT,
        "{}extkey".format(cu_lower),
        collection=extkey_coll
    )
    progress_dict["completed"] += 1
    log_progress(progress_dict)

    # populate cualtroute
    admin_transaction(
        INSERT,
        "cualtroute",
        collection=mmth_admin_data["cualtroute"]
    )

    progress_dict["completed"] += 1
    log_progress(progress_dict)
277 
278 
@file_exc_decorator
@pg_crsr_hndlr_decrtr
def cleanup_admin(del_admin_transaction, _cu, **kwargs):
    """Cleanup admin content for a credit union.

    Deletes the admin related records of the credit union from the admin
    tables, empties the <cu>extkey table and removes the survey id mapping
    CSV file (ADMIN_CUSURVEY_ID_MAP_CSV_FILE formatted with the lowercase
    cu code) if it exists. The file is removed regardless of the
    commit_later flag.

    If kwargs.get('commit_later') flag is True, we do not commit the
    transaction here, meaning that the cleanup may have been executed
    before the actual migration and we do not want to commit the database
    delete just yet. If commit_later flag is False, we commit the
    transaction here and log the admin tables summary.

    Args:
        del_admin_transaction: instance of PgTransaction to which this
                               cleanup process is part of
        _cu: current credit union code
        **kwargs: Optional parameters:
                    commit_later: flag to commit transaction later
                                  or now (default: False, commit now)

    Raises:
        SystemExit: if IOError, NameError or PermissionError is caught
                    (presumably by file_exc_decorator -- TODO confirm)
        psycopg2.Error, psycopg2.Warning on db operation errors
    """
    commit_later = kwargs.get("commit_later", False)

    where_condition_cu = {"cu": _cu}
    for tbl in ("cuauditadmin", "cuadminallow", "cuadminexclude"):
        del_admin_transaction(
            DELETE,
            tbl,
            where_conditions=where_condition_cu
        )

    # cucontact is filtered by user_name here, not by cu, so collect the
    # admin user names before the cuadminusers rows are deleted
    admin_unames = del_admin_transaction(
        SELECT_RETURN,
        "cuadminusers",
        select_columns=["user_name"],
        where_conditions=where_condition_cu
    )

    for admin_uname_row in admin_unames:
        # SELECT_RETURN yields one row per record (move_admin reads the
        # same kind of result as rows[0][0]); pass the selected column
        # value, not the whole row, as the filter value
        if isinstance(admin_uname_row, (list, tuple)):
            admin_uname = admin_uname_row[0]
        else:
            admin_uname = admin_uname_row

        del_admin_transaction(
            DELETE,
            "cucontact",
            where_conditions={"user_name": admin_uname}
        )

    del_admin_transaction(
        DELETE,
        "cuadminusers",
        where_conditions=where_condition_cu
    )

    # survey tables that reference cusurveymaster are listed before it
    adm_tbls_no_schema_chng = [
        "cusurveyquest",
        "cusurveydetail",
        "cusurveymaster",
        "cualertmsgs",
        "cuadmnotify",
        "cusurveyintro",
        "cuhavetrans",
        "cutrusteddetail",  # TODO, check parms encryption handling
        "cualtroute"
    ]
    for tbl in adm_tbls_no_schema_chng:
        del_admin_transaction(
            DELETE,
            tbl,
            where_conditions=where_condition_cu
        )

    # the per credit union extkey table is emptied completely
    del_admin_transaction(
        DELETE,
        "{}extkey".format(_cu.lower())
    )

    # drop the old -> new surveyid mapping written by a previous migration
    cusurvey_map_file = ADMIN_CUSURVEY_ID_MAP_CSV_FILE.format(_cu.lower())
    if os.path.exists(cusurvey_map_file):
        os.remove(cusurvey_map_file)

    if not commit_later:
        # commit all the table deletions here
        del_admin_transaction.commit()
        del_admin_transaction(
            SUMMARY,
            DATA_OPT_ADMIN,
            _cu
        )
371 
372 
@pg_crsr_hndlr_decrtr
def migrate_admin(cu, server, action, user, passwd, verbose, reset):
    """Entry point to the admin migration.

    First verifies that the target tables for the admin data category
    exist, then performs the requested action: migrate, clean or summary.
    Any other action value results in no further work.

    Args:
        cu: current credit union code
        server: Mammoth endpoint server
        action: one of the migration options to be
                performed on admin data category
        user: Mammoth monitor username
        passwd: Mammoth monitor password
        verbose: level of verbosity
        reset: flag to cleanup tables before migration

    Raises:
        SystemExit: if any of the required target tables does not exist
        psycopg2.Error, psycopg2.Warning on db operation errors
    """
    # refuse to continue when a required table schema is missing
    with pg_handler.PGSession() as conn, conn.cursor() as cur:
        check_txn = PgTransaction(conn, cur)
        tables_exist_flag, missing_tables = check_txn(
            TABLE_EXISTS,
            DATA_OPT_ADMIN,
            cu.lower()
        )

        if not tables_exist_flag:
            template = ("Stopping migration. Following table schemas "
                        "do not exist: {}")
            error_msg = template.format(
                get_valid_json(", ".join(missing_tables)))
            LOGGER.error(error_msg)
            raise SystemExit(error_msg)

        LOGGER.info(
            "All required target tables exist! Continuing migration..")

    # --- migrate admin data -------------------------------------------
    if action == ACTION_OPT_MIGRATE:
        if reset:
            # standalone cleanup, committed on its own before the fetch
            with pg_handler.PGSession() as conn, conn.cursor() as cur:
                cleanup_admin(PgTransaction(conn, cur), cu)

        # fetch the admin content from Mammoth
        admin_migration = MammothMigration(
            cu,
            DATA_OPT_ADMIN,
            server,
            user,
            passwd
        )
        admin_migration.run()

        LOGGER.info("Initiating migrating response data to odyssey")

        # cleanup and inserts share one transaction, so the delete is
        # only committed together with the migrated admin records
        with pg_handler.PGSession() as conn, conn.cursor() as cur:
            admin_txn = PgTransaction(conn, cur)

            # record counts before touching anything
            admin_txn(
                SUMMARY,
                DATA_OPT_ADMIN,
                cu,
                msg_prefix="BEFORE"
            )

            cleanup_admin(admin_txn, cu, commit_later=True)

            # write the Mammoth response into Odyssey and commit
            move_admin(
                admin_txn,
                admin_migration.response,
                verbose,
                cu
            )
            admin_txn.commit()

            # record counts after the migration
            admin_txn(
                SUMMARY,
                DATA_OPT_ADMIN,
                cu,
                msg_prefix="AFTER"
            )

        LOGGER.info("Admin migration completed!")
        return

    # --- clean up admin data ------------------------------------------
    if action == ACTION_OPT_CLEAN:
        with pg_handler.PGSession() as conn, conn.cursor() as cur:
            cleanup_admin(PgTransaction(conn, cur), cu)

        LOGGER.info("Admin clean up completed!")
        return

    # --- log admin tables' records count summary ----------------------
    if action == ACTION_OPT_SUMMARY:
        with pg_handler.PGSession() as conn, conn.cursor() as cur:
            summary_txn = PgTransaction(conn, cur)
            summary_txn(
                SUMMARY,
                DATA_OPT_ADMIN,
                cu
            )
def move_admin(admin_transaction, _mmth_admin_dict, _verbose, _cu)
def cleanup_admin(del_admin_transaction, _cu, **kwargs)
def migrate_admin(cu, server, action, user, passwd, verbose, reset)