2 """Main module that handles all database operations. 4 - Each database operation can take relevant optional parameters. 5 - Safe sql query generation is performed in ody_migr_db_handler.py script. 7 Specific handlers such as `menu_insert_cu_featuremenu` must catch and 8 re-raise psycopg2 specific database errors so that execute_migration 9 sees them and handles them appropriately. A decorator `pg_crsr_hndlr_decrtr` is 10 used for that purpose. 16 from functools
import wraps
17 import ody_migr_db_handler
as pg_handler
18 from ody_migr_utils
import (get_valid_json,
19 sanitize_insert_values,
21 sanitize_insert_collection)
24 from ody_migr_config
import (CREATE,
35 ODY_MUST_EXIST_TABLES,
38 LOGGER = logging.getLogger(__name__)
42 PG_ARG_DEFAULT =
"arg_default" 43 PGARG_COL_SCHMA =
"column_schema" 44 PGARG_COLL =
"collection" 45 PGARG_CLN_BFRE_INSRT =
"cleanup_before_insert" 46 PGARG_RTRNG_COLMN =
"returning_col" 47 PGARG_VRBSE =
"verbose" 48 PGARG_WHR_CONDS =
"where_conditions" 49 PGARG_RTRN_ONLY_CNT =
"return_only_count" 50 PGARG_SLCT_COLS =
"select_columns" 51 PGARG_HNDLR_SPECIFIC =
"handler_specific_args" 56 PGARG_COL_SCHMA:
None,
58 PGARG_CLN_BFRE_INSRT:
False,
59 PGARG_RTRNG_COLMN:
"",
62 PGARG_RTRN_ONLY_CNT:
False,
64 PGARG_HNDLR_SPECIFIC: {}
69 """Common psycopg2 exception handler decorator 71 Catches and raises psycopg2 related errors and warnings. 74 original_action_handler: function to wrap 76 @wraps(original_action_handler)
77 def wrapper(*args, **kwargs):
79 return original_action_handler(*args, **kwargs)
80 except (psycopg2.Error, psycopg2.Warning):
86 """Main Transaction class to enwrap multiple db operations into a 89 Currently, a transaction is on single connection and uses 90 a single cursor. This class is the central object to handle 91 all database/sql operations with the help of ody_migr_db_handler.py 92 module being used for safe sql generation. 96 """PgTransaction constructor 99 conn: psycopg2 connection object 100 cur: psycopg2 cursor object, must be a cursor of conn 105 def __call__(self, db_operation, arg0, *args, **kwargs):
106 """Central transaction executing calling function of PgTransaction 108 Performs appropriate method call based on the db_operation 111 db_operation: well defined database operation keyword 112 (CREATE, DELETE, etc) 113 arg0: table name for all db operations except TABLE_EXISTS 114 and SUMMARY, 115 in which case it is data_category 116 args: list of positional arguments to be forwarded to 117 a specified database operation handling method. 118 kwargs: list of keyword arguments to be forwarded to 119 a specified database operation handling method, mainly 120 used to generate sql scripts 125 insert_data_str = kwargs.pop(
"data_struct", list)
126 assert insert_data_str
in [list, dict]
130 if db_operation == TABLE_EXISTS:
132 cu_tbl_prefix = args[0]
133 return self.
table_exists(data_ctgry, cu_tbl_prefix, **kwargs)
137 elif db_operation == SUMMARY:
139 cu_tbl_prefix = args[0]
150 if db_operation == CREATE:
151 return self.
create(tbl_name, **kwargs)
154 elif db_operation == INSERT_ONE:
155 if insert_data_str == list:
156 assert len(args) == 2
157 assert type(args[0])
in [list, tuple]
158 assert type(args[1])
in [list, tuple]
168 assert len(args) == 1
169 assert args[0] == dict
178 elif db_operation == INSERT:
179 return self.
insert(tbl_name, **kwargs)
182 elif db_operation == SELECT:
186 elif db_operation == SELECT_RETURN:
190 elif db_operation == SELECT_COUNT:
194 elif db_operation == DELETE:
195 return self.
delete(tbl_name, **kwargs)
200 "'{}' is not a valid sql operator.".format(
204 except psycopg2.Error
as err:
206 "PSYCOPG_DETAIL": err.diag.message_primary,
207 "PSYCOPG_PGCODE": err.pgcode,
208 "PSYCOPG DIAGNOSTICS":
209 pg_handler.pg_prepare_diagnostic_error(err.diag),
210 "PSYCOPG_{}".format(err.diag.severity): err.pgerror
212 LOGGER.error(get_valid_json(err_dict))
218 "[Error] Rolling back transaction: {} ({})" 219 .format(tbl_name, db_operation))
222 except psycopg2.Warning
as warning:
223 LOGGER.warning(get_valid_json(warning))
227 "[Warning] Rolling back transaction: {} ({})" 228 .format(tbl_name, db_operation))
240 "Successfully executed sql operation: `{}` on table `{}`." 241 .format(tbl_name, db_operation))
244 """Check if a single table exists in the database 247 _tbl_name: name of the table to check the existence of 250 check_values) = pg_handler.pg_check_table_exists(_tbl_name)
251 self.
cur.execute(check_script, check_values)
252 return self.
cur.fetchone()[0]
255 """Check if required tables for each data category exist in the db 258 _data_category: one of {memdata, memhist, admin, settings} 259 cu_tbl_prefix: current credit union code 260 kwargs: optional keyword arguments 263 all_tbls_exist: flag to notify if all required tables exist 264 do_not_exist_list: list of tables that do not exist if 265 all_tbls_exist is False 267 all_tbls_exist =
True 268 do_not_exist_list = []
270 for cu_prefixed_tbl
in ODY_MUST_EXIST_TABLES[_data_category][
271 KEY_CU_PREFIXED_TBLS]:
272 tbl = cu_prefixed_tbl.format(cu_tbl_prefix)
274 do_not_exist_list.append(tbl)
275 all_tbls_exist =
False 277 for common_tbl
in ODY_MUST_EXIST_TABLES[_data_category][
280 do_not_exist_list.append(common_tbl)
281 all_tbls_exist =
False 282 return all_tbls_exist, do_not_exist_list
285 """Log tables' records count summary for each data category 288 data_ctgry: one of {memdata, memhist, admin, settings} 289 _cu: current credit union code 290 kwargs: optional keyword arguments 292 msg_prefix = kwargs.get(
"msg_prefix",
"")
293 settings_summary = {}
294 for tbl
in ODY_MUST_EXIST_TABLES[data_ctgry][KEY_CU_PREFIXED_TBLS]:
295 cu_tbl = tbl.format(_cu.lower())
297 settings_summary[cu_tbl] = this_count
299 for tbl
in ODY_MUST_EXIST_TABLES[data_ctgry][KEY_COMMON_TBLS]:
301 if tbl
in [
"cuadmquestselect",
"cucontact",
"lnappschemadetail",
302 "lnappuser_questselect",
"lnappuserresponse"]:
304 settings_summary[tbl] = this_count
306 this_count = self.
select_count(tbl, where_conditions={
"cu": _cu})
307 settings_summary[tbl] = this_count
310 msg =
"Summary of table records count:" 312 msg =
"[{}] Summary of table records count:".format(msg_prefix)
314 LOGGER.info(
"{} {}".format(msg, get_valid_json(settings_summary)))
317 """Handles table creation operation 320 _tbl_name: name of table to create 321 kwargs: optional keyword arguments 323 (table_exists_script,
324 table_exists_values) = pg_handler.pg_check_table_exists(_tbl_name)
325 column_schema = kwargs.get(
326 PGARG_COL_SCHMA, PGDEF_ARGS_DICT[PGARG_COL_SCHMA])
327 assert column_schema !=
"" 329 create_script, create_values = pg_handler.pg_prepare_create_script(
334 self.
cur.execute(create_script, create_values)
335 LOGGER.debug((
"(SQL) Create Script: {}").format(
339 """Handles single record insertion operation. 341 Note: a record is a dictionary and script uses named placeholder. 344 _tbl_name: name of table to insert a record into 345 _dict: record to insert 346 **kwargs: optional keyword arguments 348 _dict = sanitize_dict(_dict)
349 script = pg_handler.pg_prepare_insert_single_dict(
354 self.
cur.execute(script, _dict)
356 LOGGER.debug((
"(SQL) Insert A Record Script: {}").format(
360 """Handles single record insertion operation. 362 Note: columns and values are two lists in same order and script 363 uses general placeholder. 366 _tbl_name: name of table to insert a record into 367 _columns: column names 368 _values: values to insert in the same order as column names 369 **kwargs: optional keyword arguments 372 _values = sanitize_insert_values(_values)
373 insert_script = pg_handler.pg_prepare_insert_single_list(
379 self.
cur.execute(insert_script, _values)
381 LOGGER.debug((
"(SQL) Insert A Record Script: {}").format(
385 """Handles multiple record insertion operation 388 _tbl_name: name of table for insertion 389 kwargs: optional keyword arguments 393 collection = kwargs.get(PGARG_COLL, PGDEF_ARGS_DICT[PGARG_COLL])
395 assert type(collection) == list
396 if len(collection) <= 0:
398 "No records to insert in table `{}`.".format(_tbl_name))
400 assert type(collection[0]) == dict
402 LOGGER.debug((
"START INSERTING A COLLECTION of {}-records." 403 " into a table: `{}`").
404 format(len(collection), _tbl_name))
406 collection = sanitize_insert_collection(collection)
407 for row
in collection:
411 LOGGER.debug((
"FINISH INSERTING A COLLECTION of {}-records." 412 " into a table: `{}`").
413 format(len(collection), _tbl_name))
416 """Handles select and return count operation 419 _tbl_name: name of table 422 number of records in _tbl_name for the provided condition 424 where_conditions = kwargs.get(
425 PGARG_WHR_CONDS, PGDEF_ARGS_DICT[PGARG_WHR_CONDS])
427 select_script, select_values = pg_handler.pg_prepare_select_script(
433 self.
cur.execute(select_script, select_values)
434 LOGGER.debug((
"(SQL) Select and Return Script: {}.").format(
436 return self.
cur.fetchone()[0]
439 """Handles record selection and returns records or count 442 _tbl_name: involved table 443 kwargs: optional keyword arguments 446 list of records or count of records 449 return_only_count = kwargs.get(
450 PGARG_RTRN_ONLY_CNT, PGDEF_ARGS_DICT[PGARG_RTRN_ONLY_CNT])
451 select_columns = kwargs.get(
452 PGARG_SLCT_COLS, PGDEF_ARGS_DICT[PGARG_SLCT_COLS])
453 where_conditions = kwargs.get(
454 PGARG_WHR_CONDS, PGDEF_ARGS_DICT[PGARG_WHR_CONDS])
456 select_script, select_values = pg_handler.pg_prepare_select_script(
461 self.
cur.execute(select_script, select_values)
462 LOGGER.debug((
"(SQL) Select and Return Script: {}.").format(
466 if return_only_count:
467 select_result = self.
cur.rowcount
471 select_result = self.
cur.fetchall()
476 """Handles records selection from db table and print 479 _tbl_name: involved table 480 kwargs: optional keyword arguments 482 select_columns = kwargs.get(
483 PGARG_SLCT_COLS, PGDEF_ARGS_DICT[PGARG_SLCT_COLS])
484 where_conditions = kwargs.get(
485 PGARG_WHR_CONDS, PGDEF_ARGS_DICT[PGARG_WHR_CONDS])
487 select_script, select_values = pg_handler.pg_prepare_select_script(
492 self.
cur.execute(select_script, select_values)
493 LOGGER.debug((
"(SQL) Select and Print Script: {}.").format(
496 all_records = self.
cur.fetchall()
498 for i, record
in enumerate(all_records):
499 if (_tbl_name ==
"cu_featuremenu" and 500 FEATURE_NOT_FOUND
in str(record)):
501 LOGGER.warning(
"`{}` Code not found => {} => : " 503 FEATURE_NOT_FOUND) + str(record))
505 LOGGER.debug(
"`{}` => {}.".format(
506 _tbl_name, str(record)))
509 """Handles table record deletion operation 512 _tbl_name: involved table 513 kwargs: optional keyword arguments 515 where_conditions = kwargs.get(
516 PGARG_WHR_CONDS, PGDEF_ARGS_DICT[PGARG_WHR_CONDS])
519 delete_values) = pg_handler.pg_prepare_table_cleanup_script(
520 _tbl_name, where_conditions)
522 self.
cur.execute(delete_script, delete_values)
523 LOGGER.debug((
"(SQL) Delete Script: {}.").format(
527 """commit this transaction""" 531 """rollback this transaction""" def create(self, _tbl_name, **kwargs)
def insert_one_record_dict(self, _tbl_name, _dict, **kwargs)
def delete(self, _tbl_name, **kwargs)
def __call__(self, db_operation, arg0, *args, **kwargs)
def insert_one_record(self, _tbl_name, _columns, _values, **kwargs)
def select_count(self, _tbl_name, **kwargs)
def __init__(self, conn, cur)
def _single_table_exists(self, _tbl_name)
def select_and_return(self, _tbl_name, **kwargs)
def table_exists(self, _data_category, cu_tbl_prefix, **kwargs)
def display_summary(self, data_ctgry, _cu, **kwargs)
def select_and_print(self, _tbl_name, **kwargs)
def insert(self, _tbl_name, **kwargs)
def pg_crsr_hndlr_decrtr(original_action_handler)