Odyssey
ody_migr_transaction.py
1 #!/usr/bin/env python
2 """Main module that handles all database operations.
3 
4 - Each database operation can take relevant optional parameters.
5 - Safe sql query generation is performed in ody_migr_db_handler.py script.
6 
7 Specific handlers such as `menu_insert_cu_featuremenu` must catch
8 psycopg2-specific database errors and re-raise them so that
9 execute_migration sees them and handles them appropriately. The decorator
10 `pg_crsr_hndlr_decrtr` is used for that purpose.
11 """
12 
13 
14 import logging
15 import psycopg2
16 from functools import wraps
17 import ody_migr_db_handler as pg_handler
18 from ody_migr_utils import (get_valid_json,
19  sanitize_insert_values,
20  sanitize_dict,
21  sanitize_insert_collection)
22 
23 # import only necessary variables for settings
24 from ody_migr_config import (CREATE,
25  INSERT,
26  INSERT_ONE,
27  SELECT,
28  SELECT_RETURN,
29  SELECT_COUNT,
30  DELETE,
31  SUMMARY,
32  TABLE_EXISTS,
33  KEY_CU_PREFIXED_TBLS,
34  KEY_COMMON_TBLS,
35  ODY_MUST_EXIST_TABLES,
36  FEATURE_NOT_FOUND)
37 
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Keyword names that the database operations accept through **kwargs.
# They are spelled once here so that every caller uses the same string.
PG_ARG_DEFAULT = "arg_default"
PGARG_COL_SCHMA = "column_schema"
PGARG_COLL = "collection"
PGARG_CLN_BFRE_INSRT = "cleanup_before_insert"
PGARG_RTRNG_COLMN = "returning_col"
PGARG_VRBSE = "verbose"
PGARG_WHR_CONDS = "where_conditions"
PGARG_RTRN_ONLY_CNT = "return_only_count"
PGARG_SLCT_COLS = "select_columns"
PGARG_HNDLR_SPECIFIC = "handler_specific_args"

# Fallback value for each keyword above; PgTransaction methods read it
# when the caller does not supply that keyword.
# NOTE: the list and dict values are single shared objects, so code that
# receives one of these defaults must not mutate it.
PGDEF_ARGS_DICT = {
    PGARG_COL_SCHMA: None,
    PGARG_COLL: [],
    PGARG_CLN_BFRE_INSRT: False,
    PGARG_RTRNG_COLMN: "",
    PGARG_VRBSE: False,
    PGARG_WHR_CONDS: {},
    PGARG_RTRN_ONLY_CNT: False,
    PGARG_SLCT_COLS: [],
    PGARG_HNDLR_SPECIFIC: {},
}
66 
67 
def pg_crsr_hndlr_decrtr(original_action_handler):
    """Decorate a handler so psycopg2 errors and warnings propagate.

    The wrapped handler is called with the arguments it was given and its
    return value is passed back untouched. A ``psycopg2.Error`` or
    ``psycopg2.Warning`` raised by the handler is caught and immediately
    re-raised unchanged, so the caller further up sees it.

    Args:
        original_action_handler: function to wrap

    Returns:
        the wrapping function, carrying the metadata of the original
    """
    @wraps(original_action_handler)
    def _guarded(*args, **kwargs):
        try:
            outcome = original_action_handler(*args, **kwargs)
        except (psycopg2.Error, psycopg2.Warning):
            # hand the database exception to the caller as-is
            raise
        else:
            return outcome

    return _guarded
83 
84 
class PgTransaction(object):
    """Wrap several database operations into one psycopg2 transaction.

    A transaction lives on a single connection and uses a single cursor.
    SQL text and bind values are produced by the ``ody_migr_db_handler``
    module (imported as ``pg_handler``); this class executes them on the
    cursor. Nothing is committed implicitly: the caller decides when to
    call ``commit`` or ``rollback``.
    """

    def __init__(self, conn, cur):
        """Store the connection and cursor used by every operation.

        Args:
            conn: psycopg2 connection object
            cur: psycopg2 cursor object, must be a cursor of conn
        """
        self.conn = conn
        self.cur = cur

    def __call__(self, db_operation, arg0, *args, **kwargs):
        """Run one database operation; roll back on any psycopg2 failure.

        Args:
            db_operation: operation keyword (CREATE, INSERT, INSERT_ONE,
                SELECT, SELECT_RETURN, SELECT_COUNT, DELETE, SUMMARY or
                TABLE_EXISTS)
            arg0: data category for TABLE_EXISTS and SUMMARY, table name
                for every other operation
            args: positional arguments forwarded to the handling method
            kwargs: keyword arguments forwarded to the handling method;
                ``data_struct`` (``list`` or ``dict``, default ``list``)
                is consumed here and selects the INSERT_ONE flavour

        Returns:
            whatever the handling method returns

        Raises:
            SystemExit: for an unknown operation, or after the transaction
                was rolled back because psycopg2 raised an error/warning
            AssertionError: when the arguments do not fit the operation
        """
        try:
            # The result is captured instead of returned from inside the
            # ``try`` so that the ``else`` clause below actually runs.
            result = self._dispatch(db_operation, arg0, args, kwargs)

        # some database error caught, rollback the transaction
        except psycopg2.Error as err:
            err_dict = {
                "PSYCOPG_DETAIL": err.diag.message_primary,
                "PSYCOPG_PGCODE": err.pgcode,
                "PSYCOPG DIAGNOSTICS":
                    pg_handler.pg_prepare_diagnostic_error(err.diag),
                "PSYCOPG_{}".format(err.diag.severity): err.pgerror
            }
            LOGGER.error(get_valid_json(err_dict))
            # rollback on any error while building a transaction and exit
            self.conn.rollback()
            # any failure that causes a rollback halts the script
            raise SystemExit(
                "[Error] Rolling back transaction: {} ({})"
                .format(arg0, db_operation))

        # postgres warning, possibly due to data truncation on insertion
        except psycopg2.Warning as warning:
            # NOTE(review): get_valid_json receives the exception object
            # itself here - confirm that helper can serialise it.
            LOGGER.warning(get_valid_json(warning))
            # a warning also rolls the transaction back and exits
            self.conn.rollback()
            raise SystemExit(
                "[Warning] Rolling back transaction: {} ({})"
                .format(arg0, db_operation))

        else:
            # This would be the typical place to commit, but committing is
            # left to an explicit commit() call so that the caller keeps
            # control over the transaction.
            LOGGER.info(
                "Successfully executed sql operation: `{}` on table `{}`."
                .format(db_operation, arg0))
            return result

    def _dispatch(self, db_operation, arg0, args, kwargs):
        """Route db_operation to the method that implements it.

        Args:
            db_operation: operation keyword, see __call__
            arg0: data category or table name, see __call__
            args: tuple of extra positional arguments given to __call__
            kwargs: dict of keyword arguments given to __call__;
                ``data_struct`` is removed from it here

        Returns:
            the return value of the selected method
        """
        # default data structure for a single-record insertion is list
        insert_data_str = kwargs.pop("data_struct", list)
        assert insert_data_str in [list, dict]

        # TABLE_EXISTS and SUMMARY work on a data category, and take the
        # credit union code as their first extra positional argument
        if db_operation == TABLE_EXISTS:
            return self.table_exists(arg0, args[0], **kwargs)

        if db_operation == SUMMARY:
            return self.display_summary(arg0, args[0], **kwargs)

        # every remaining operation works on a single table
        tbl_name = arg0

        if db_operation == CREATE:
            return self.create(tbl_name, **kwargs)

        if db_operation == INSERT_ONE:
            if insert_data_str is list:
                # two parallel sequences: column names and their values
                assert len(args) == 2
                assert isinstance(args[0], (list, tuple))
                assert isinstance(args[1], (list, tuple))
                return self.insert_one_record(
                    tbl_name, args[0], args[1], **kwargs)
            # one mapping of column name -> value
            assert len(args) == 1
            assert isinstance(args[0], dict)
            return self.insert_one_record_dict(
                tbl_name, args[0], **kwargs)

        if db_operation == INSERT:
            return self.insert(tbl_name, **kwargs)

        if db_operation == SELECT:
            return self.select_and_print(tbl_name, **kwargs)

        if db_operation == SELECT_RETURN:
            return self.select_and_return(tbl_name, **kwargs)

        if db_operation == SELECT_COUNT:
            return self.select_count(tbl_name, **kwargs)

        if db_operation == DELETE:
            return self.delete(tbl_name, **kwargs)

        # exit if not a valid db operation
        raise SystemExit(
            "'{}' is not a valid sql operator.".format(db_operation))

    def _single_table_exists(self, _tbl_name):
        """Check if a single table exists in the database.

        Args:
            _tbl_name: name of the table to check the existence of

        Returns:
            first column of the first row produced by the check query
        """
        (check_script,
         check_values) = pg_handler.pg_check_table_exists(_tbl_name)
        self.cur.execute(check_script, check_values)
        return self.cur.fetchone()[0]

    def table_exists(self, _data_category, cu_tbl_prefix, **kwargs):
        """Check that every table required by a data category exists.

        Args:
            _data_category: key into ODY_MUST_EXIST_TABLES
            cu_tbl_prefix: credit union code substituted into the names
                of the credit-union-prefixed tables
            kwargs: optional keyword arguments (not used here)

        Returns:
            all_tbls_exist: True when no required table is missing
            do_not_exist_list: names of the tables that do not exist
        """
        required = ODY_MUST_EXIST_TABLES[_data_category]

        # prefixed tables first, then the common ones
        tbl_names = [tbl.format(cu_tbl_prefix)
                     for tbl in required[KEY_CU_PREFIXED_TBLS]]
        tbl_names.extend(required[KEY_COMMON_TBLS])

        do_not_exist_list = [tbl for tbl in tbl_names
                             if not self._single_table_exists(tbl)]
        all_tbls_exist = not do_not_exist_list
        return all_tbls_exist, do_not_exist_list

    def display_summary(self, data_ctgry, _cu, **kwargs):
        """Log the record count of every table of a data category.

        Args:
            data_ctgry: key into ODY_MUST_EXIST_TABLES
            _cu: current credit union code
            kwargs: optional keyword arguments; ``msg_prefix`` is put in
                square brackets in front of the logged summary
        """
        msg_prefix = kwargs.get("msg_prefix", "")
        # common tables that are counted without a condition on <cu>
        no_cu_condition_tbls = (
            "cuadmquestselect", "cucontact", "lnappschemadetail",
            "lnappuser_questselect", "lnappuserresponse")

        settings_summary = {}
        for tbl in ODY_MUST_EXIST_TABLES[data_ctgry][KEY_CU_PREFIXED_TBLS]:
            cu_tbl = tbl.format(_cu.lower())
            settings_summary[cu_tbl] = self.select_count(cu_tbl)

        for tbl in ODY_MUST_EXIST_TABLES[data_ctgry][KEY_COMMON_TBLS]:
            if tbl in no_cu_condition_tbls:
                settings_summary[tbl] = self.select_count(tbl)
            else:
                settings_summary[tbl] = self.select_count(
                    tbl, where_conditions={"cu": _cu})

        if msg_prefix == "":
            msg = "Summary of table records count:"
        else:
            msg = "[{}] Summary of table records count:".format(msg_prefix)

        LOGGER.info("{} {}".format(msg, get_valid_json(settings_summary)))

    def create(self, _tbl_name, **kwargs):
        """Handle the table creation operation.

        Args:
            _tbl_name: name of table to create
            kwargs: optional keyword arguments; ``column_schema`` is
                handed to the create-script generator
        """
        column_schema = kwargs.get(
            PGARG_COL_SCHMA, PGDEF_ARGS_DICT[PGARG_COL_SCHMA])
        assert column_schema != ""

        create_script, create_values = pg_handler.pg_prepare_create_script(
            _tbl_name,
            column_schema)

        # execute table creation script
        self.cur.execute(create_script, create_values)
        LOGGER.debug(("(SQL) Create Script: {}").format(
            self.cur.query))

    def insert_one_record_dict(self, _tbl_name, _dict, **kwargs):
        """Insert a single record given as a dictionary.

        The generated script uses named placeholders, so the sanitized
        dictionary itself is passed as the bind values.

        Args:
            _tbl_name: name of table to insert a record into
            _dict: record to insert, column name -> value
            **kwargs: optional keyword arguments (not used here)
        """
        _dict = sanitize_dict(_dict)
        script = pg_handler.pg_prepare_insert_single_dict(
            _tbl_name,
            list(_dict.keys())
        )
        # execute insertion script
        self.cur.execute(script, _dict)

        LOGGER.debug(("(SQL) Insert A Record Script: {}").format(
            self.cur.query))

    def insert_one_record(self, _tbl_name, _columns, _values, **kwargs):
        """Insert a single record given as parallel column/value lists.

        Args:
            _tbl_name: name of table to insert a record into
            _columns: column names
            _values: values to insert, in the same order as _columns
            **kwargs: optional keyword arguments, forwarded to the
                insert-script generator
        """
        _values = sanitize_insert_values(_values)
        insert_script = pg_handler.pg_prepare_insert_single_list(
            _tbl_name,
            _columns,
            **kwargs
        )
        # execute insertion script
        self.cur.execute(insert_script, _values)

        LOGGER.debug(("(SQL) Insert A Record Script: {}").format(
            self.cur.query))

    def insert(self, _tbl_name, **kwargs):
        """Insert a collection of records, one statement per record.

        Args:
            _tbl_name: name of table for insertion
            kwargs: optional keyword arguments; ``collection`` is the
                list of record dictionaries to insert
        """
        collection = kwargs.get(PGARG_COLL, PGDEF_ARGS_DICT[PGARG_COLL])

        assert isinstance(collection, list)
        if not collection:
            LOGGER.debug(
                "No records to insert in table `{}`.".format(_tbl_name))
            return
        assert isinstance(collection[0], dict)

        LOGGER.debug(("START INSERTING A COLLECTION of {}-records."
                      " into a table: `{}`").
                     format(len(collection), _tbl_name))

        collection = sanitize_insert_collection(collection)
        for row in collection:
            # row is a dictionary
            self.insert_one_record_dict(_tbl_name, row, **kwargs)

        LOGGER.debug(("FINISH INSERTING A COLLECTION of {}-records."
                      " into a table: `{}`").
                     format(len(collection), _tbl_name))

    def select_count(self, _tbl_name, **kwargs):
        """Count the records of a table.

        Args:
            _tbl_name: name of table
            kwargs: optional keyword arguments; ``where_conditions`` is
                handed to the select-script generator

        Returns:
            first column of the first row produced by the count query
        """
        where_conditions = kwargs.get(
            PGARG_WHR_CONDS, PGDEF_ARGS_DICT[PGARG_WHR_CONDS])

        select_script, select_values = pg_handler.pg_prepare_select_script(
            _tbl_name,
            where_conditions,
            [],
            count=True
        )
        self.cur.execute(select_script, select_values)
        LOGGER.debug(("(SQL) Select and Return Script: {}.").format(
            self.cur.query))
        return self.cur.fetchone()[0]

    def select_and_return(self, _tbl_name, **kwargs):
        """Select records and return them, or only their number.

        Args:
            _tbl_name: involved table
            kwargs: optional keyword arguments; ``select_columns`` and
                ``where_conditions`` are handed to the select-script
                generator, ``return_only_count`` picks the return value

        Returns:
            the cursor's rowcount when ``return_only_count`` is truthy,
            otherwise the list of all fetched records
        """
        return_only_count = kwargs.get(
            PGARG_RTRN_ONLY_CNT, PGDEF_ARGS_DICT[PGARG_RTRN_ONLY_CNT])
        select_columns = kwargs.get(
            PGARG_SLCT_COLS, PGDEF_ARGS_DICT[PGARG_SLCT_COLS])
        where_conditions = kwargs.get(
            PGARG_WHR_CONDS, PGDEF_ARGS_DICT[PGARG_WHR_CONDS])

        select_script, select_values = pg_handler.pg_prepare_select_script(
            _tbl_name,
            where_conditions,
            select_columns
        )
        self.cur.execute(select_script, select_values)
        LOGGER.debug(("(SQL) Select and Return Script: {}.").format(
            self.cur.query))

        if return_only_count:
            return self.cur.rowcount
        return self.cur.fetchall()

    def select_and_print(self, _tbl_name, **kwargs):
        """Select records from a table and log each of them.

        Records of ``cu_featuremenu`` whose text contains
        FEATURE_NOT_FOUND are logged as warnings; every other record is
        logged at debug level.

        Args:
            _tbl_name: involved table
            kwargs: optional keyword arguments; ``select_columns`` and
                ``where_conditions`` are handed to the select-script
                generator
        """
        select_columns = kwargs.get(
            PGARG_SLCT_COLS, PGDEF_ARGS_DICT[PGARG_SLCT_COLS])
        where_conditions = kwargs.get(
            PGARG_WHR_CONDS, PGDEF_ARGS_DICT[PGARG_WHR_CONDS])

        select_script, select_values = pg_handler.pg_prepare_select_script(
            _tbl_name,
            where_conditions,
            select_columns
        )
        self.cur.execute(select_script, select_values)
        LOGGER.debug(("(SQL) Select and Print Script: {}.").format(
            self.cur.query))

        for record in self.cur.fetchall():
            if (_tbl_name == "cu_featuremenu" and
                    FEATURE_NOT_FOUND in str(record)):
                LOGGER.warning("`{}` Code not found => {} => : "
                               .format(_tbl_name,
                                       FEATURE_NOT_FOUND) + str(record))
            else:
                LOGGER.debug("`{}` => {}.".format(
                    _tbl_name, str(record)))

    def delete(self, _tbl_name, **kwargs):
        """Handle the table record deletion operation.

        Args:
            _tbl_name: involved table
            kwargs: optional keyword arguments; ``where_conditions`` is
                handed to the cleanup-script generator
        """
        where_conditions = kwargs.get(
            PGARG_WHR_CONDS, PGDEF_ARGS_DICT[PGARG_WHR_CONDS])

        (delete_script,
         delete_values) = pg_handler.pg_prepare_table_cleanup_script(
            _tbl_name, where_conditions)

        self.cur.execute(delete_script, delete_values)
        LOGGER.debug(("(SQL) Delete Script: {}.").format(
            self.cur.query))

    def commit(self):
        """Commit this transaction on the underlying connection."""
        self.conn.commit()

    def rollback(self):
        """Roll back this transaction on the underlying connection."""
        self.conn.rollback()
def create(self, _tbl_name, **kwargs)
def insert_one_record_dict(self, _tbl_name, _dict, **kwargs)
def delete(self, _tbl_name, **kwargs)
def __call__(self, db_operation, arg0, *args, **kwargs)
def insert_one_record(self, _tbl_name, _columns, _values, **kwargs)
def select_count(self, _tbl_name, **kwargs)
def select_and_return(self, _tbl_name, **kwargs)
def table_exists(self, _data_category, cu_tbl_prefix, **kwargs)
def display_summary(self, data_ctgry, _cu, **kwargs)
def select_and_print(self, _tbl_name, **kwargs)
def insert(self, _tbl_name, **kwargs)
def pg_crsr_hndlr_decrtr(original_action_handler)