Odyssey
Classes | Functions | Variables
ody_migr_db_handler Namespace Reference

Classes

class  PGSession
 

Functions

def _unnest_col (col_identifier)
 
def pg_prepare_insert_script_unnest (_tbl_name, _columns)
 
def pg_prepare_insert_single_dict (_tbl_name, _columns, **kwargs)
 
def pg_prepare_insert_single_list (_tbl_name, _columns, **kwargs)
 
def pg_prepare_insert_script_execute_values_dict (_tbl_name, _columns)
 
def pg_prepare_insert_script_execute_values_list (_tbl_name, _columns)
 
def pg_prepare_select_script (_tbl_name, _where_conditions, select_columns, **kwargs)
 
def pg_check_table_exists (_tbl_name)
 
def pg_prepare_create_script (_tbl_name, _col_name_type_pair_str)
 
def pg_prepare_table_cleanup_script (_tbl_name, _where_conditions)
 
def pg_prepare_diagnostic_error (_diag)
 

Variables

 LOGGER = logging.getLogger(__name__)
 
 PG_DB_HOST = os.environ.get("DATABASE_HOST")
 
 PG_DB_PORT = os.environ.get("DATABASE_PORT")
 
 PG_DB_NAME = os.environ.get("DATABASE_NAME")
 
 PG_DB_USER = os.environ.get("DATABASE_USER")
 
 PG_DB_PWD = os.environ.get("DATABASE_PASSWORD")
 

Detailed Description

Module for safe SQL generation that also provides a psycopg2 context manager.

This module contains the PGSession class for psycopg2 connection session
handling and database utility functions for SQL query generation.
The psycopg2.sql module is used to generate all SQL statements safely.

Function Documentation

◆ _unnest_col()

def ody_migr_db_handler._unnest_col (   col_identifier)
private
Wraps a column placeholder in an unnest( ... ) call

Definition at line 64 of file ody_migr_db_handler.py.

def _unnest_col(col_identifier):
    """Wrap a composable in a PostgreSQL ``unnest( ... )`` call.

    Args:
        col_identifier: psycopg2 ``sql`` composable to place inside the
            parentheses.

    Returns:
        composed SQL fragment of the form ``unnest( <col_identifier> )``
    """
    unnest_template = sql.SQL("unnest( {} )")
    return unnest_template.format(col_identifier)
67 
68 

◆ pg_check_table_exists()

def ody_migr_db_handler.pg_check_table_exists (   _tbl_name)
Returns a sql script to check if a table exists in the database

Args:
    _tbl_name:  name of table to check existence of

Definition at line 307 of file ody_migr_db_handler.py.

def pg_check_table_exists(_tbl_name):
    """Return a sql script to check if a table exists in the database.

    The lookup only matches ordinary tables (``relkind='r'``) in the
    ``public`` schema.

    Args:
        _tbl_name: name of table to check existence of

    Returns:
        tuple ``(script, None)``: the composed script (a psycopg2 ``sql``
        composable accepted by ``cursor.execute()``) and ``None`` because
        the script has no placeholders left to bind.
    """
    # Embed both names as escaped SQL literals instead of splicing them in
    # with str.format(); a quote inside the table name can then no longer
    # terminate the string literal and alter the statement.
    query = sql.SQL("""
        SELECT EXISTS(
            SELECT 1
            FROM pg_catalog.pg_class c
            JOIN pg_catalog.pg_namespace n ON n.oid=c.relnamespace
            WHERE n.nspname={}
            AND c.relname={}
            AND c.relkind='r'
        )
    """).format(sql.Literal('public'), sql.Literal(_tbl_name))
    return query, None
324 
325 

◆ pg_prepare_create_script()

def ody_migr_db_handler.pg_prepare_create_script (   _tbl_name,
  _col_name_type_pair_str 
)
Prepare database table creation script

Args:
    _tbl_name: name of the table to be created
    _col_name_type_pair_str: name and type of the columns

Returns:
    table creation script, and values for placeholders, if any

Definition at line 326 of file ody_migr_db_handler.py.

def pg_prepare_create_script(_tbl_name, _col_name_type_pair_str):
    """Prepare database table creation script.

    Args:
        _tbl_name: name of the table to be created; quoted as an identifier
        _col_name_type_pair_str: name and type of the columns, either as a
            plain string or as a psycopg2 ``sql`` composable. The text is
            inserted verbatim between the parentheses, so it must come
            from trusted code.

    Returns:
        table creation script, and ``None`` since it has no placeholders
    """
    column_defs = _col_name_type_pair_str
    # sql.SQL.format() only accepts Composable arguments and raises
    # TypeError for a plain string, so wrap strings before formatting.
    if isinstance(column_defs, str):
        column_defs = sql.SQL(column_defs)

    script = sql.SQL("CREATE TABLE {} ({})").format(
        sql.Identifier(_tbl_name), column_defs)
    return script, None
339 
340 

◆ pg_prepare_diagnostic_error()

def ody_migr_db_handler.pg_prepare_diagnostic_error (   _diag)
Generate a detailed diagnostic report on database failure

Args:
    _diag: Diagnostics object exposed by psycopg2.DatabaseError

Returns:
    detailed error message (compact json string)

Definition at line 373 of file ody_migr_db_handler.py.

def pg_prepare_diagnostic_error(_diag):
    """Collect the fields of a database diagnostics object into a report.

    Args:
        _diag: Diagnostics object exposed by psycopg2.DatabaseError

    Returns:
        the result of ``get_valid_json()`` for the collected report
        (documented as a compact json string; helper not shown here)
    """
    # (report label, attribute read from _diag), in report order.
    report_fields = (
        ("Severity", "severity"),
        ("Column name", "column_name"),
        ("Constraint name", "constraint_name"),
        ("Context", "context"),
        ("Datatype name", "datatype_name"),
        ("Internal position", "internal_position"),
        ("Internal query", "internal_query"),
        ("Message detail", "message_detail"),
        ("Message hint", "message_hint"),
        ("Message primary", "message_primary"),
        ("Schema name", "schema_name"),
        ("Source file", "source_file"),
        ("Source function", "source_function"),
        ("Source line", "source_line"),
        ("Sqlstate", "sqlstate"),
        ("Statement position", "statement_position"),
        ("Table name", "table_name"),
    )
    diag_dict = {
        label: getattr(_diag, attribute)
        for label, attribute in report_fields
    }
    return get_valid_json(diag_dict)

◆ pg_prepare_insert_script_execute_values_dict()

def ody_migr_db_handler.pg_prepare_insert_script_execute_values_dict (   _tbl_name,
  _columns 
)
Prepare insertion script

Uses a single placeholder for the list of multiple records, as required by
psycopg2.extras.execute_values()

Example script:
INSERT INTO psycopg2_insert_time_test ("first_name", "last_name",
"email", "gender", "ip_address", "ssn", "company_name", "race",
"department") VALUES %s

Example template snippet:
( %(first_name)s, %(last_name)s, %(email)s, %(gender)s, %(ip_address)s,
%(ssn)s, %(company_name)s, %(race)s, %(department)s )

`argslist` for psycopg2.extras.execute_values is a sequence of dictionaries

Args:
    _tbl_name:  name of the table to insert
    _columns:   list/tuple of column names

Returns:
    record insertion script with a single placeholder, and the template of
    placeholders for each row

Definition at line 188 of file ody_migr_db_handler.py.

def pg_prepare_insert_script_execute_values_dict(_tbl_name, _columns):
    """Build a bulk INSERT script plus a row template of named placeholders.

    Intended for psycopg2.extras.execute_values(): the script carries one
    unnamed placeholder after VALUES, and the template carries one named
    placeholder per column.

    Example script:
        INSERT INTO psycopg2_insert_time_test ("first_name", "last_name",
        "email", "gender", "ip_address", "ssn", "company_name", "race",
        "department") VALUES %s

    Example template snippet:
        ( %(first_name)s, %(last_name)s, %(email)s, %(gender)s,
        %(ip_address)s, %(ssn)s, %(company_name)s, %(race)s,
        %(department)s )

    `argslist` for psycopg2.extras.execute_values is a sequence of
    dictionaries.

    NOTE(review): the table name is inserted verbatim through sql.SQL and
    is not quoted as an identifier, so it must come from trusted code.

    Args:
        _tbl_name: name of the table to insert
        _columns: list/tuple of column names

    Returns:
        tuple (script, template) of psycopg2 sql composables
    """
    named_slots = [sql.Placeholder(column) for column in _columns]
    template = sql.SQL('( {} )').format(sql.SQL(', ').join(named_slots))

    insert_head = sql.SQL("INSERT INTO {} ({}) VALUES {}")
    script = insert_head.format(
        sql.SQL(_tbl_name),
        sql.SQL(", ").join([sql.Identifier(column) for column in _columns]),
        sql.Placeholder())
    return script, template
221 
222 

◆ pg_prepare_insert_script_execute_values_list()

def ody_migr_db_handler.pg_prepare_insert_script_execute_values_list (   _tbl_name,
  _columns 
)
Prepare insertion script

Uses a single placeholder for the list of multiple records, as required by
psycopg2.extras.execute_values()

Example script:
INSERT INTO psycopg2_insert_time_test ("first_name", "last_name",
"email", "gender", "ip_address", "ssn", "company_name", "race",
"department") VALUES %s

Example template snippet:
( %s, %s, %s, %s, %s, %s, %s, %s, %s )

`argslist` for psycopg2.extras.execute_values is a sequence of sequences

Args:
    _tbl_name:  name of the table to insert
    _columns:   list/tuple of column names

Returns:
    record insertion script with a single placeholder, and the template of
    placeholders for each row

Definition at line 223 of file ody_migr_db_handler.py.

def pg_prepare_insert_script_execute_values_list(_tbl_name, _columns):
    """Build a bulk INSERT script plus a row template of positional slots.

    Intended for psycopg2.extras.execute_values(): the script carries one
    unnamed placeholder after VALUES, and the template carries one unnamed
    placeholder per column.

    Example script:
        INSERT INTO psycopg2_insert_time_test ("first_name", "last_name",
        "email", "gender", "ip_address", "ssn", "company_name", "race",
        "department") VALUES %s

    Example template snippet:
        ( %s, %s, %s, %s, %s, %s, %s, %s, %s )

    `argslist` for psycopg2.extras.execute_values is a sequence of
    sequences.

    NOTE(review): the table name is inserted verbatim through sql.SQL and
    is not quoted as an identifier, so it must come from trusted code.

    Args:
        _tbl_name: name of the table to insert
        _columns: list/tuple of column names

    Returns:
        tuple (script, template) of psycopg2 sql composables
    """
    # One anonymous placeholder per column.
    positional_slots = [sql.Placeholder()] * len(_columns)
    template = sql.SQL('( {} )').format(sql.SQL(', ').join(positional_slots))

    insert_head = sql.SQL("INSERT INTO {} ({}) VALUES {}")
    script = insert_head.format(
        sql.SQL(_tbl_name),
        sql.SQL(", ").join([sql.Identifier(column) for column in _columns]),
        sql.Placeholder())
    return script, template
256 
257 

◆ pg_prepare_insert_script_unnest()

def ody_migr_db_handler.pg_prepare_insert_script_unnest (   _tbl_name,
  _columns 
)
Prepare insertion script - efficient unnest operation

Uses SELECT unnest property to efficiently read column wise values one
at a time from the {col1: list of values, col2: list of values,
col3: list of values} formatted dictionary for efficient bulk insertion.

Example script:
    INSERT INTO psycopg2_insert_time_test
        ("first_name", "last_name", "email", "gender", "ip_address",
        "ssn", "company_name", "race", "department")
    SELECT unnest( %(first_name)s ), unnest( %(last_name)s ),
        unnest( %(email)s ),
        unnest( %(gender)s ),
        unnest( %(ip_address)s ),
        unnest( %(ssn)s ),
        unnest( %(company_name)s ),
        unnest( %(race)s ),
        unnest( %(department)s )

`values` in cursor.execute() would be a dictionary of following
structure:
    {
        ...
        email: list of email values,
        gender: list of gender values,
        ...
    }
    All the values (list) must have same number of elements for each
    key (columns).
    For example, first element of each of the key in the dictionary is
    first record to be inserted in the database table, and so on.

Definition at line 69 of file ody_migr_db_handler.py.

def pg_prepare_insert_script_unnest(_tbl_name, _columns):
    """Build an ``INSERT ... SELECT unnest(...)`` bulk insertion script.

    Every column gets a named placeholder wrapped in unnest(), so the
    values are supplied column-wise as
    {col1: list of values, col2: list of values, ...}.

    Example script:
        INSERT INTO psycopg2_insert_time_test
            ("first_name", "last_name", "email", "gender", "ip_address",
            "ssn", "company_name", "race", "department")
        SELECT unnest( %(first_name)s ), unnest( %(last_name)s ),
            unnest( %(email)s ),
            unnest( %(gender)s ),
            unnest( %(ip_address)s ),
            unnest( %(ssn)s ),
            unnest( %(company_name)s ),
            unnest( %(race)s ),
            unnest( %(department)s )

    `values` in cursor.execute() would be a dictionary of the following
    structure:
        {
            ...
            email: list of email values,
            gender: list of gender values,
            ...
        }
        All the lists must have the same number of elements. The first
        element of every list forms the first record to insert, the
        second element of every list the second record, and so on.

    NOTE(review): the table name is inserted verbatim through sql.SQL and
    is not quoted as an identifier, so it must come from trusted code.

    Args:
        _tbl_name: name of the table to insert
        _columns: list/tuple of column names

    Returns:
        composed insertion script
    """
    table = sql.SQL(_tbl_name)
    quoted_columns = sql.SQL(', ').join(
        [sql.Identifier(column) for column in _columns])
    unnested_values = sql.SQL(", ").join(
        [_unnest_col(sql.Placeholder(column)) for column in _columns])
    return sql.SQL("INSERT INTO {} ({}) SELECT {}").format(
        table, quoted_columns, unnested_values)
107 
108 

◆ pg_prepare_insert_single_dict()

def ody_migr_db_handler.pg_prepare_insert_single_dict (   _tbl_name,
  _columns,
**  kwargs 
)
Prepare insertion script - dictionary placeholders

Same for a single insertion and multiple insertion using executemany()

Example script:
INSERT INTO psycopg2_insert_time_test ("first_name", "last_name", "email",
"gender", "ip_address", "ssn", "company_name", "race", "department")
VALUES (%(first_name)s, %(last_name)s, %(email)s, %(gender)s,
%(ip_address)s, %(ssn)s, %(company_name)s, %(race)s, %(department)s)


`values` for cursor.execute() must be a single record (dict of values)
`values` for cursor.executemany must be a list/tuple of many records
(list/tuple of named dict)

Args:
    _tbl_name:      name of the table to insert
    _columns:  list/tuple of column names
    **kwargs:       returning_col: (optional) specify column name
                    you want to return after the insert execution script
Returns:
    record insertion script with named placeholders

Definition at line 109 of file ody_migr_db_handler.py.

def pg_prepare_insert_single_dict(_tbl_name, _columns, **kwargs):
    """Build an INSERT script with one named placeholder per column.

    The same script serves a single insertion and multiple insertions
    through executemany().

    Example script:
        INSERT INTO psycopg2_insert_time_test ("first_name", "last_name",
        "email", "gender", "ip_address", "ssn", "company_name", "race",
        "department")
        VALUES (%(first_name)s, %(last_name)s, %(email)s, %(gender)s,
        %(ip_address)s, %(ssn)s, %(company_name)s, %(race)s,
        %(department)s)

    `values` for cursor.execute() must be a single record (dict of values)
    `values` for cursor.executemany must be a list/tuple of many records
    (list/tuple of named dict)

    NOTE(review): the table name is inserted verbatim through sql.SQL and
    is not quoted as an identifier, so it must come from trusted code.

    Args:
        _tbl_name: name of the table to insert
        _columns: list/tuple of column names
        **kwargs: returning_col: (optional) name of the column to return
            after the insert; omitted or "" means no RETURNING clause

    Returns:
        record insertion script with named placeholders
    """
    table = sql.SQL(_tbl_name)
    quoted_columns = sql.SQL(', ').join(
        [sql.Identifier(column) for column in _columns])
    named_slots = sql.SQL(', ').join(
        [sql.Placeholder(column) for column in _columns])
    insert_script = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
        table, quoted_columns, named_slots)

    returning_col = kwargs.get("returning_col", "")
    if returning_col == "":
        return insert_script

    returning_clause = sql.SQL("RETURNING {}").format(
        sql.Identifier(returning_col))
    return sql.SQL(' ').join([insert_script, returning_clause])
147 
148 

◆ pg_prepare_insert_single_list()

def ody_migr_db_handler.pg_prepare_insert_single_list (   _tbl_name,
  _columns,
**  kwargs 
)
Prepare database table insertion script - general placeholders

Same for a single insertion and multiple insertion using executemany()

Example script:
INSERT INTO "psycopg2_insert_time_test" ("ssn", "race", "first_name",
"last_name", "email", "company_name", "gender", "department", "ip_address")
 VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)

`values` for cursor.execute() must be a single record (list/tuple of values)
`values` for cursor.executemany must be a list/tuple of many records
(list/tuple of list/tuple)

Args:
    _tbl_name:      name of the table to insert
    _columns:       list/tuple of column names
    **kwargs:       returning_col: (optional) specify column name
                    you want to return after the insert execution script
Returns:
    record insertion script with general placeholders

Definition at line 149 of file ody_migr_db_handler.py.

def pg_prepare_insert_single_list(_tbl_name, _columns, **kwargs):
    """Build an INSERT script with one positional placeholder per column.

    The same script serves a single insertion and multiple insertions
    through executemany().

    Example script:
        INSERT INTO "psycopg2_insert_time_test" ("ssn", "race",
        "first_name", "last_name", "email", "company_name", "gender",
        "department", "ip_address")
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)

    `values` for cursor.execute() must be a single record (list/tuple of
    values)
    `values` for cursor.executemany must be a list/tuple of many records
    (list/tuple of list/tuple)

    Args:
        _tbl_name: name of the table to insert; quoted as an identifier
        _columns: list/tuple of column names
        **kwargs: returning_col: (optional) name of the column to return
            after the insert; omitted or "" means no RETURNING clause

    Returns:
        record insertion script with general placeholders
    """
    table = sql.Identifier(_tbl_name)
    quoted_columns = sql.SQL(', ').join(
        [sql.Identifier(column) for column in _columns])
    # One anonymous placeholder per column.
    positional_slots = sql.SQL(', ').join(
        [sql.Placeholder()] * len(_columns))
    insert_script = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
        table, quoted_columns, positional_slots)

    returning_col = kwargs.get("returning_col", "")
    if returning_col == "":
        return insert_script

    returning_clause = sql.SQL("RETURNING {}").format(
        sql.Identifier(returning_col))
    return sql.SQL(' ').join([insert_script, returning_clause])
186 
187 

◆ pg_prepare_select_script()

def ody_migr_db_handler.pg_prepare_select_script (   _tbl_name,
  _where_conditions,
  select_columns,
**  kwargs 
)
Prepare table selection script

Args:
    _tbl_name:          name of the table to select from
    _where_conditions:  dict of where clauses
    select_columns:     list of columns name to select

Returns:
    table selection script, and values for placeholders, if any

Definition at line 258 of file ody_migr_db_handler.py.

def pg_prepare_select_script(_tbl_name,
                             _where_conditions,
                             select_columns,
                             **kwargs):
    """Build a SELECT script with optional equality filters.

    Args:
        _tbl_name: name of the table to select from
        _where_conditions: dict mapping column name to the value it must
            equal; the conditions are combined with AND
        select_columns: list of column names to select; an empty list
            selects ``*`` (or ``count(*)``, see ``count``)
        **kwargs: count: (optional) when truthy and ``select_columns`` is
            an empty list, select ``count(*)``; not consulted when
            columns are listed

    Returns:
        table selection script, and the list of values for its
        placeholders (``None`` when there are no where conditions)
    """
    want_count = kwargs.get("count", False)
    assert type(_where_conditions) == dict

    if select_columns == []:
        head = "SELECT count(*) FROM {}" if want_count else "SELECT * FROM {}"
        select_sql = sql.SQL(head).format(sql.Identifier(_tbl_name))
    else:
        assert type(select_columns) == list
        projection = sql.SQL(', ').join(map(sql.Identifier, select_columns))
        select_sql = sql.SQL("SELECT {} FROM {}").format(
            projection, sql.Identifier(_tbl_name))

    # Without conditions there is no WHERE clause and nothing to bind.
    if not _where_conditions:
        return select_sql, None

    predicates = [
        sql.SQL("{}={}").format(sql.Identifier(column), sql.Placeholder())
        for column in _where_conditions
    ]
    where_sql = sql.SQL(' AND ').join(predicates)
    filtered_sql = sql.SQL(' WHERE ').join([select_sql, where_sql])
    # Values are listed in the same order as the predicates above.
    return filtered_sql, list(_where_conditions.values())
305 
306 

◆ pg_prepare_table_cleanup_script()

def ody_migr_db_handler.pg_prepare_table_cleanup_script (   _tbl_name,
  _where_conditions 
)
Prepare delete record from table script.

Args:
    _tbl_name:      table name to delete the records from
    _where_conditions: dictionary of where conditions

Returns:
    table cleanup script, and values for placeholders, if any

Definition at line 341 of file ody_migr_db_handler.py.

def pg_prepare_table_cleanup_script(_tbl_name, _where_conditions):
    """Build a DELETE script with optional equality filters.

    Args:
        _tbl_name: table name to delete the records from
        _where_conditions: dictionary mapping column name to the value it
            must equal; the conditions are combined with AND. An empty
            dictionary produces a DELETE without a WHERE clause.

    Returns:
        table cleanup script, and the list of values for its placeholders
        (``None`` when there are no where conditions)
    """
    assert type(_where_conditions) == dict

    delete_script = sql.SQL("DELETE FROM {}").format(
        sql.Identifier(_tbl_name))

    # Without conditions there is no WHERE clause and nothing to bind.
    if not _where_conditions:
        return delete_script, None

    predicates = [
        sql.SQL("{}={}").format(sql.Identifier(column), sql.Placeholder())
        for column in _where_conditions
    ]
    where_sql = sql.SQL(' AND ').join(predicates)
    filtered_script = sql.SQL(' WHERE ').join([delete_script, where_sql])
    # Values are listed in the same order as the predicates above.
    return filtered_script, list(_where_conditions.values())
371 
372