2 """ Module for safe sql generation and provides psycopg2 context manager. 4 This module contains class PGSession for psycopg2 connection session 5 handling and database utility functions for sql query generation. 6 psycopg2.sql module is being used to generate all, safe sql statements. 14 from psycopg2
import sql
15 from ody_migr_utils
import get_valid_json
LOGGER = logging.getLogger(__name__)

# Connection settings are read from the environment once, at import time.
# os.environ.get() yields None for any variable that is not set.
PG_DB_HOST = os.environ.get("DATABASE_HOST")
PG_DB_PORT = os.environ.get("DATABASE_PORT")
PG_DB_NAME = os.environ.get("DATABASE_NAME")
PG_DB_USER = os.environ.get("DATABASE_USER")
PG_DB_PWD = os.environ.get("DATABASE_PASSWORD")
28 """Context manager for PG database connection 30 TODO: investigate connection pooling: 31 http://initd.org/psycopg/docs/pool.html 35 """initialize database manager""" 39 """open database connection""" 41 self.
conn = psycopg2.connect((
"host={} dbname={} " 42 "user={} password={}")
51 except psycopg2.OperationalError
as err:
52 raise SystemExit(
"Unable to connect!{}".format(err))
54 def __exit__(self, exception_type, exception_value, _traceback):
55 """close database connection""" 57 LOGGER.error(exception_value)
58 if self.
conn is not None:
65 """unnest lambda operation""" 66 return sql.SQL(
"unnest( {} )").format(col_identifier)
70 """Prepare insertion script - efficient unnest operation 72 Uses SELECT unnest property to efficiently read column wise values one 73 at a time from the {col1: list of values, col2: list of values, 74 col3: list of values} formatted dictionary for efficient bulk insertion. 77 INSERT INTO psycopg2_insert_time_test 78 ("first_name", "last_name", "email", "gender", "ip_address", 79 "ssn", "company_name", "race", "department") 80 SELECT unnest( %(first_name)s ), unnest( %(last_name)s ), 83 unnest( %(ip_address)s ), 85 unnest( %(company_name)s ), 87 unnest( %(department)s ) 89 `values` in cursor.execute() would be a dictionary of following 93 email: list of email values, 94 gender: list of gender values, 97 All the values (list) must have same number of elements for each 99 For example, first element of each of the key in the dictionary is 100 first record to be inserted in the database table, and so on. 102 return sql.SQL(
"INSERT INTO {} ({}) SELECT {}").format(
104 sql.SQL(
', ').join(map(sql.Identifier, _columns)),
106 map(_unnest_col, map(sql.Placeholder, _columns))))
110 """Prepare insertion script - dictionary placeholders 112 Same for a single insertion and multiple insertion using executemany() 115 INSERT INTO psycopg2_insert_time_test ("first_name", "last_name", "email", 116 "gender", "ip_address", "ssn", "company_name", "race", "department") 117 VALUES (%(first_name)s, %(last_name)s, %(email)s, %(gender)s, 118 %(ip_address)s, %(ssn)s, %(company_name)s, %(race)s, %(department)s) 121 `values` for cursor.execute() must be a single record (dict of values) 122 `values` for cursor.executemany must be a list/tuple of many records 123 (list/tuple of named dict) 126 _tbl_name: name of the table to insert 127 _columns: list/tuple of column names 128 **kwargs: returning_col: (optional) specify column name 129 you want to return after the insert execution script 131 record insertion script with named placeholders 133 returning_col = kwargs.get(
"returning_col",
"")
135 insert_script = sql.SQL(
"INSERT INTO {} ({}) VALUES ({})").format(
137 sql.SQL(
', ').join(map(sql.Identifier, _columns)),
138 sql.SQL(
', ').join(map(sql.Placeholder, _columns)))
140 if returning_col ==
"":
143 script_returning = sql.SQL(
"RETURNING {}").format(
144 sql.Identifier(returning_col))
145 final_script = sql.SQL(
' ').join([insert_script, script_returning])
150 """Prepare database table insertion script - general placeholders 152 Same for a single insertion and multiple insertion using executemany() 155 INSERT INTO "psycopg2_insert_time_test" ("ssn", "race", "first_name", 156 "last_name", "email", "company_name", "gender", "department", "ip_address") 157 VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) 159 values` for cursor.execute() must be a single record (list/tuple of values) 160 `values` for cursor.executemany must be a list/tuple of many records 161 (list/tuple of list/tuple) 164 _tbl_name: name of the table to insert 165 _column_names: list/tuple of column names 166 **kwargs: returning_col: (optional) specify column name 167 you want to return after the insert execution script 169 record insertion script with general placeholders 171 returning_col = kwargs.get(
"returning_col",
"")
173 insert_script = sql.SQL(
"INSERT INTO {} ({}) VALUES ({})").format(
174 sql.Identifier(_tbl_name),
175 sql.SQL(
', ').join(map(sql.Identifier, _columns)),
176 sql.SQL(
', ').join(sql.Placeholder() * len(_columns))
179 if returning_col ==
"":
182 script_returning = sql.SQL(
"RETURNING {}").format(
183 sql.Identifier(returning_col))
184 final_script = sql.SQL(
' ').join([insert_script, script_returning])
189 """Prepare insertion script 191 Uses single placeholder for list of multiple records as required by 192 psycopg2.extra.execute_values() 195 INSERT INTO psycopg2_insert_time_test ("first_name", "last_name", 196 "email", "gender", "ip_address", "ssn", "company_name", "race", 197 "department") VALUES %s 199 Example template snippet: 200 ( %(first_name)s, %(last_name)s, %(email)s, %(gender)s, %(ip_address)s, 201 %(ssn)s, %(company_name)s, %(race)s, %(department)s ) 203 `argslist` for psycopg2.extra.execute_values is a sequence of dictionary 206 _tbl_name: name of the table to insert 207 _column_names: list/tuple of column names 210 record insertion script a single placeholders and template placeholder 213 template = sql.SQL(
'( {} )').format(
214 sql.SQL(
', ').join(map(sql.Placeholder, _columns)))
216 "INSERT INTO {} ({}) VALUES {}").format(
218 sql.SQL(
", ").join(map(sql.Identifier, _columns)),
220 return script, template
224 """Prepare insertion script 226 Uses single placeholder for list of multiple records as required by 227 psycopg2.extra.execute_values() 230 INSERT INTO psycopg2_insert_time_test ("first_name", "last_name", 231 "email", "gender", "ip_address", "ssn", "company_name", "race", 232 "department") VALUES %s 234 Example template snippet: 235 ( %s, %s, %s, %s, %s, %s, %s, %s, %s ) 237 `argslist` for psycopg2.extra.execute_values is a sequence of sequence 240 _tbl_name: name of the table to insert 241 _column_names: list/tuple of column names 244 record insertion script a single placeholders and template placeholder 247 template = sql.SQL(
'( {} )').format(
248 sql.SQL(
', ').join(sql.Placeholder() * len(_columns)))
251 "INSERT INTO {} ({}) VALUES {}").format(
253 sql.SQL(
", ").join(map(sql.Identifier, _columns)),
255 return script, template
262 """Prepare table selection script 265 _tbl_name: name of the table to be created 266 _where_conditions: dict of where clauses 267 select_columns: list of columns name to select 270 table selection script, and values for placeholders, if any 272 get_count = kwargs.get(
"count",
False)
273 assert type(_where_conditions) == dict
275 if select_columns == []:
277 select_sql = sql.SQL(
278 "SELECT count(*) FROM {}").format(sql.Identifier(_tbl_name))
280 select_sql = sql.SQL(
281 "SELECT * FROM {}").format(sql.Identifier(_tbl_name))
283 assert type(select_columns) == list
284 select_columns_str = sql.SQL(
', ').join(
285 [sql.Identifier(i)
for i
in select_columns])
286 select_sql = sql.SQL(
287 "SELECT {} FROM {}").format(select_columns_str,
288 sql.Identifier(_tbl_name))
290 where_values = []
if len(_where_conditions) > 0
else None 291 where_conditions_sequence = []
293 for column_cond, value_cond
in _where_conditions.items():
294 wh = sql.SQL(
"{}={}").format(
295 sql.Identifier(column_cond), sql.Placeholder())
296 where_conditions_sequence.append(wh)
297 where_values.append(value_cond)
299 where_sql = sql.SQL(
' AND ').join(where_conditions_sequence)
301 if where_sql != sql.Composed([]):
302 select_sql = sql.SQL(
' WHERE ').join([select_sql, where_sql])
304 return select_sql, where_values
308 """Returns a sql script to check if a table exist in database 311 _tbl_name: name of table to check existence of 316 FROM pg_catalog.pg_class c 317 JOIN pg_catalog.pg_namespace n ON n.oid=c.relnamespace 322 """.format(
'public', _tbl_name)
327 """Prepare database table creation script 330 _tbl_name: name of the table to be created 331 _col_name_type_pair_str: name and type of the columns 334 table creation script, and values for placeholders, if any 336 script = sql.SQL(
"CREATE TABLE {} ({})").format(
337 sql.Identifier(_tbl_name), _col_name_type_pair_str)
342 """Prepare delete record from table script. 345 _tbl_name: table name to delete the records from 346 _where_conditions: dictionary of where conditions 349 table cleanup script, and values for placeholders, if any 351 assert type(_where_conditions) == dict
353 delete_script = sql.SQL(
"DELETE FROM {}").format(
354 sql.Identifier(_tbl_name))
356 values = []
if len(_where_conditions) > 0
else None 357 where_conditions_sequence = []
359 for column_cond, value_cond
in _where_conditions.items():
360 wh = sql.SQL(
"{}={}").format(
361 sql.Identifier(column_cond), sql.Placeholder())
362 where_conditions_sequence.append(wh)
363 values.append(value_cond)
365 where_sql = sql.SQL(
' AND ').join(where_conditions_sequence)
367 if where_sql != sql.Composed([]):
368 delete_script = sql.SQL(
' WHERE ').join([delete_script, where_sql])
370 return delete_script, values
374 """Generate a detailed diagnostic report on database failure 377 _diag: Diagnostics object exposed by psycopg2.DatabaseError 380 detailed error message (compact json string) 383 "Severity": _diag.severity,
384 "Column name": _diag.column_name,
385 "Constraint name": _diag.constraint_name,
386 "Context": _diag.context,
387 "Datatype name": _diag.datatype_name,
388 "Internal position": _diag.internal_position,
389 "Internal query": _diag.internal_query,
390 "Message detail": _diag.message_detail,
391 "Message hint": _diag.message_hint,
392 "Message primary": _diag.message_primary,
393 "Schema name": _diag.schema_name,
394 "Source file": _diag.source_file,
395 "Source function": _diag.source_function,
396 "Source line": _diag.source_line,
397 "Sqlstate": _diag.sqlstate,
398 "Statement position": _diag.statement_position,
399 "Table name": _diag.table_name
401 return get_valid_json(diag_dict)
def __exit__(self, exception_type, exception_value, _traceback)
def pg_prepare_insert_script_unnest(_tbl_name, _columns)
def pg_prepare_insert_script_execute_values_list(_tbl_name, _columns)
def pg_prepare_table_cleanup_script(_tbl_name, _where_conditions)
def pg_prepare_diagnostic_error(_diag)
def pg_prepare_insert_single_dict(_tbl_name, _columns, **kwargs)
def pg_prepare_insert_script_execute_values_dict(_tbl_name, _columns)
def pg_prepare_select_script(_tbl_name, _where_conditions, select_columns, **kwargs)
def pg_prepare_create_script(_tbl_name, _col_name_type_pair_str)
def pg_check_table_exists(_tbl_name)
def pg_prepare_insert_single_list(_tbl_name, _columns, **kwargs)
def _unnest_col(col_identifier)