Odyssey
ody_migr_db_handler.py
1 #!/usr/bin/env python
"""Safe SQL generation helpers and a psycopg2 connection context manager.

This module contains the PGSession class for psycopg2 connection session
handling and database utility functions for SQL query generation.
The psycopg2.sql module is used to generate all SQL statements safely.
"""
8 
9 
10 import os
11 import psycopg2
12 import logging
13 # psycopg2's SQL composer against sql injection
14 from psycopg2 import sql
15 from ody_migr_utils import get_valid_json
16 
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Database connection settings, read once from the environment when the
# module is imported. Each value is None when its variable is not set.
PG_DB_HOST = os.getenv("DATABASE_HOST")
PG_DB_PORT = os.getenv("DATABASE_PORT")
PG_DB_NAME = os.getenv("DATABASE_NAME")
PG_DB_USER = os.getenv("DATABASE_USER")
PG_DB_PWD = os.getenv("DATABASE_PASSWORD")
25 
26 
class PGSession:
    """Context manager that opens and closes a psycopg2 connection.

    Connection settings come from the module-level ``PG_DB_*`` constants,
    which are read from the ``DATABASE_*`` environment variables.
    ``__enter__`` returns the psycopg2 connection itself; ``__exit__``
    closes it without committing, so callers must commit explicitly.

    Example:
        with PGSession() as conn:
            ...

    TODO: investigate connection pooling:
    http://initd.org/psycopg/docs/pool.html
    """

    def __init__(self):
        """Create the session; nothing is opened until ``__enter__``."""
        self.conn = None

    def __enter__(self):
        """Open the database connection and return it.

        Returns:
            the open psycopg2 connection

        Raises:
            SystemExit: if psycopg2 raises OperationalError while connecting
        """
        try:
            # Keyword arguments instead of a hand-formatted DSN string:
            # the configured port is now honoured, values containing
            # spaces or quotes (e.g. passwords) are escaped by psycopg2,
            # and unset (None) settings are left out rather than being
            # sent as the literal text "None".
            self.conn = psycopg2.connect(
                host=PG_DB_HOST,
                port=PG_DB_PORT,
                dbname=PG_DB_NAME,
                user=PG_DB_USER,
                password=PG_DB_PWD)
        except psycopg2.OperationalError as err:
            raise SystemExit("Unable to connect!{}".format(err))
        return self.conn

    def __exit__(self, exception_type, exception_value, _traceback):
        """Log any exception raised in the ``with`` body and close.

        Returns None, so an exception raised inside the ``with`` body
        still propagates to the caller after the connection is closed.
        """
        if exception_type:
            LOGGER.error(exception_value)
        if self.conn is not None:
            self.conn.close()
60 
61 
62 # DATABASE UTILITIES
63 
def _unnest_col(col_identifier):
    """Wrap a composable in an ``unnest( ... )`` SQL call.

    Args:
        col_identifier: psycopg2.sql composable to place inside the call

    Returns:
        composed sql of the form ``unnest( <col_identifier> )``
    """
    unnest_template = sql.SQL("unnest( {} )")
    return unnest_template.format(col_identifier)
67 
68 
def pg_prepare_insert_script_unnest(_tbl_name, _columns):
    """Prepare a bulk insertion script based on PostgreSQL ``unnest``.

    Uses SELECT unnest(...) to read column-wise values from a
    {col1: list of values, col2: list of values, ...} dictionary so that
    many rows are inserted with a single statement.

    Example script:
        INSERT INTO "psycopg2_insert_time_test"
        ("first_name", "last_name", "email", "gender", "ip_address",
        "ssn", "company_name", "race", "department")
        SELECT unnest( %(first_name)s ), unnest( %(last_name)s ),
        unnest( %(email)s ),
        unnest( %(gender)s ),
        unnest( %(ip_address)s ),
        unnest( %(ssn)s ),
        unnest( %(company_name)s ),
        unnest( %(race)s ),
        unnest( %(department)s )

    `values` in cursor.execute() would be a dictionary of following
    structure:
        {
            ...
            email: list of email values,
            gender: list of gender values,
            ...
        }
    All the values (lists) must have the same number of elements for each
    key (column). The first element of every list forms the first record
    to be inserted, the second element the second record, and so on.

    Args:
        _tbl_name: name of the table to insert into; quoted as an
            identifier, so it must be the exact (case-sensitive) name
        _columns: list/tuple of column names

    Returns:
        record insertion script with one named placeholder per column
    """
    column_identifiers = sql.SQL(', ').join(map(sql.Identifier, _columns))
    unnested_values = sql.SQL(", ").join(
        _unnest_col(sql.Placeholder(column)) for column in _columns)
    # sql.Identifier (not sql.SQL) so the table name is quoted/escaped like
    # the column names instead of being spliced into the statement verbatim.
    return sql.SQL("INSERT INTO {} ({}) SELECT {}").format(
        sql.Identifier(_tbl_name), column_identifiers, unnested_values)
107 
108 
def pg_prepare_insert_single_dict(_tbl_name, _columns, **kwargs):
    """Prepare insertion script - dictionary (named) placeholders

    The same script serves a single insertion with cursor.execute() and
    multiple insertions with cursor.executemany().

    Example script:
        INSERT INTO "psycopg2_insert_time_test" ("first_name", "last_name",
        "email", "gender", "ip_address", "ssn", "company_name", "race",
        "department")
        VALUES (%(first_name)s, %(last_name)s, %(email)s, %(gender)s,
        %(ip_address)s, %(ssn)s, %(company_name)s, %(race)s, %(department)s)

    `values` for cursor.execute() must be a single record (dict of values)
    `values` for cursor.executemany() must be a list/tuple of many records
    (list/tuple of dicts)

    Args:
        _tbl_name: name of the table to insert into; quoted as an
            identifier, so it must be the exact (case-sensitive) name
        _columns: list/tuple of column names
        **kwargs: returning_col: (optional) name of the column to return
            after the insert; omitted, empty or None means no RETURNING
            clause

    Returns:
        record insertion script with named placeholders
    """
    returning_col = kwargs.get("returning_col", "")

    # sql.Identifier (not sql.SQL) so the table name is quoted/escaped like
    # the column names instead of being spliced into the statement verbatim.
    insert_script = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
        sql.Identifier(_tbl_name),
        sql.SQL(', ').join(map(sql.Identifier, _columns)),
        sql.SQL(', ').join(map(sql.Placeholder, _columns)))

    # Falsy check: an explicit returning_col=None means "no RETURNING"
    # rather than failing inside sql.Identifier.
    if not returning_col:
        return insert_script

    script_returning = sql.SQL("RETURNING {}").format(
        sql.Identifier(returning_col))
    return sql.SQL(' ').join([insert_script, script_returning])
147 
148 
def pg_prepare_insert_single_list(_tbl_name, _columns, **kwargs):
    """Prepare insertion script - general (positional) placeholders

    The same script serves a single insertion with cursor.execute() and
    multiple insertions with cursor.executemany().

    Example script:
        INSERT INTO "psycopg2_insert_time_test" ("ssn", "race", "first_name",
        "last_name", "email", "company_name", "gender", "department",
        "ip_address")
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)

    `values` for cursor.execute() must be a single record (list/tuple of
    values)
    `values` for cursor.executemany() must be a list/tuple of many records
    (list/tuple of list/tuple)

    Args:
        _tbl_name: name of the table to insert into (quoted as an
            identifier)
        _columns: list/tuple of column names
        **kwargs: returning_col: (optional) name of the column to return
            after the insert; omitted, empty or None means no RETURNING
            clause

    Returns:
        record insertion script with general placeholders
    """
    returning_col = kwargs.get("returning_col", "")

    insert_script = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
        sql.Identifier(_tbl_name),
        sql.SQL(', ').join(map(sql.Identifier, _columns)),
        # one positional %s per column
        sql.SQL(', ').join(sql.Placeholder() * len(_columns))
    )

    # Falsy check: an explicit returning_col=None means "no RETURNING"
    # rather than failing inside sql.Identifier.
    if not returning_col:
        return insert_script

    script_returning = sql.SQL("RETURNING {}").format(
        sql.Identifier(returning_col))
    return sql.SQL(' ').join([insert_script, script_returning])
186 
187 
def pg_prepare_insert_script_execute_values_dict(_tbl_name, _columns):
    """Prepare insertion script and row template - named placeholders

    Uses a single placeholder for the list of records, as required by
    psycopg2.extras.execute_values().

    Example script:
        INSERT INTO "psycopg2_insert_time_test" ("first_name", "last_name",
        "email", "gender", "ip_address", "ssn", "company_name", "race",
        "department") VALUES %s

    Example template snippet:
        ( %(first_name)s, %(last_name)s, %(email)s, %(gender)s,
        %(ip_address)s, %(ssn)s, %(company_name)s, %(race)s,
        %(department)s )

    `argslist` for psycopg2.extras.execute_values() is a sequence of
    dictionaries.

    Args:
        _tbl_name: name of the table to insert into; quoted as an
            identifier, so it must be the exact (case-sensitive) name
        _columns: list/tuple of column names

    Returns:
        (script, template): insertion script with a single placeholder,
        and the per-row template with one named placeholder per column
    """
    template = sql.SQL('( {} )').format(
        sql.SQL(', ').join(map(sql.Placeholder, _columns)))
    # sql.Identifier (not sql.SQL) so the table name is quoted/escaped like
    # the column names instead of being spliced into the statement verbatim.
    script = sql.SQL(
        "INSERT INTO {} ({}) VALUES {}").format(
            sql.Identifier(_tbl_name),
            sql.SQL(", ").join(map(sql.Identifier, _columns)),
            sql.Placeholder())
    return script, template
221 
222 
def pg_prepare_insert_script_execute_values_list(_tbl_name, _columns):
    """Prepare insertion script and row template - positional placeholders

    Uses a single placeholder for the list of records, as required by
    psycopg2.extras.execute_values().

    Example script:
        INSERT INTO "psycopg2_insert_time_test" ("first_name", "last_name",
        "email", "gender", "ip_address", "ssn", "company_name", "race",
        "department") VALUES %s

    Example template snippet:
        ( %s, %s, %s, %s, %s, %s, %s, %s, %s )

    `argslist` for psycopg2.extras.execute_values() is a sequence of
    sequences.

    Args:
        _tbl_name: name of the table to insert into; quoted as an
            identifier, so it must be the exact (case-sensitive) name
        _columns: list/tuple of column names

    Returns:
        (script, template): insertion script with a single placeholder,
        and the per-row template with one positional placeholder per
        column
    """
    template = sql.SQL('( {} )').format(
        sql.SQL(', ').join(sql.Placeholder() * len(_columns)))

    # sql.Identifier (not sql.SQL) so the table name is quoted/escaped like
    # the column names instead of being spliced into the statement verbatim.
    script = sql.SQL(
        "INSERT INTO {} ({}) VALUES {}").format(
            sql.Identifier(_tbl_name),
            sql.SQL(", ").join(map(sql.Identifier, _columns)),
            sql.Placeholder())
    return script, template
256 
257 
def pg_prepare_select_script(_tbl_name,
                             _where_conditions,
                             select_columns,
                             **kwargs):
    """Prepare table selection script

    Args:
        _tbl_name: name of the table to select from
        _where_conditions: dict mapping column name to the value it must
            equal; the conditions are combined with AND
        select_columns: list of column names to select; an empty list
            selects every column
        **kwargs: count: (optional) when true and select_columns is an
            empty list, select count(*) instead of *; it has no effect
            when select_columns is non-empty

    Returns:
        table selection script, and the list of values for its
        placeholders (None when there are no where conditions)
    """
    want_count = kwargs.get("count", False)
    assert type(_where_conditions) == dict

    if select_columns == []:
        projection = (
            "SELECT count(*) FROM {}" if want_count else "SELECT * FROM {}")
        select_sql = sql.SQL(projection).format(sql.Identifier(_tbl_name))
    else:
        assert type(select_columns) == list
        column_list = sql.SQL(', ').join(
            map(sql.Identifier, select_columns))
        select_sql = sql.SQL("SELECT {} FROM {}").format(
            column_list, sql.Identifier(_tbl_name))

    # One "<column>=%s" predicate per condition; the values are collected
    # in the same (dict iteration) order as the predicates.
    predicates = [
        sql.SQL("{}={}").format(sql.Identifier(column), sql.Placeholder())
        for column in _where_conditions
    ]
    where_values = list(_where_conditions.values()) if predicates else None

    if not predicates:
        return select_sql, where_values

    where_sql = sql.SQL(' AND ').join(predicates)
    return sql.SQL(' WHERE ').join([select_sql, where_sql]), where_values
305 
306 
def pg_check_table_exists(_tbl_name):
    """Return a sql script to check if a table exists in the database

    The schema ('public') and the table name are passed as query
    parameters instead of being formatted into the SQL text, so a table
    name containing quotes can neither break nor alter the statement.
    The returned values must be handed to cursor.execute() together with
    the script.

    Args:
        _tbl_name: name of table to check existence of

    Returns:
        existence check script, and the list of values for its two
        placeholders: ['public', _tbl_name]
    """
    query = """
        SELECT EXISTS(
            SELECT 1
            FROM pg_catalog.pg_class c
            JOIN pg_catalog.pg_namespace n ON n.oid=c.relnamespace
            WHERE n.nspname=%s
            AND c.relname=%s
            AND c.relkind='r'
        )
    """
    return query, ["public", _tbl_name]
324 
325 
def pg_prepare_create_script(_tbl_name, _col_name_type_pair_str):
    """Prepare database table creation script

    Args:
        _tbl_name: name of the table to be created (quoted as an
            identifier)
        _col_name_type_pair_str: column name/type definitions placed
            between the parentheses of CREATE TABLE
            NOTE(review): presumably this must be a psycopg2.sql
            composable rather than a plain str - confirm against callers

    Returns:
        table creation script, and None because the script has no
        placeholders
    """
    create_template = sql.SQL("CREATE TABLE {} ({})")
    table_identifier = sql.Identifier(_tbl_name)
    create_script = create_template.format(
        table_identifier, _col_name_type_pair_str)
    return create_script, None
339 
340 
def pg_prepare_table_cleanup_script(_tbl_name, _where_conditions):
    """Prepare a script that deletes records from a table.

    Args:
        _tbl_name: name of the table to delete the records from
        _where_conditions: dict mapping column name to the value it must
            equal; the conditions are combined with AND, and an empty
            dict produces a DELETE without a WHERE clause

    Returns:
        table cleanup script, and the list of values for its placeholders
        (None when there are no where conditions)
    """
    assert type(_where_conditions) == dict

    delete_script = sql.SQL("DELETE FROM {}").format(
        sql.Identifier(_tbl_name))

    # One "<column>=%s" predicate per condition; the values are collected
    # in the same (dict iteration) order as the predicates.
    predicates = [
        sql.SQL("{}={}").format(sql.Identifier(column), sql.Placeholder())
        for column in _where_conditions
    ]
    values = list(_where_conditions.values()) if predicates else None

    if not predicates:
        return delete_script, values

    where_sql = sql.SQL(' AND ').join(predicates)
    return sql.SQL(' WHERE ').join([delete_script, where_sql]), values
371 
372 
def pg_prepare_diagnostic_error(_diag):
    """Build a detailed diagnostic report for a database failure.

    Args:
        _diag: Diagnostics object exposed by psycopg2.DatabaseError

    Returns:
        the result of get_valid_json() applied to a dict that maps a
        readable label to each diagnostic attribute
    """
    # (report label, attribute name on the diagnostics object)
    labelled_attributes = (
        ("Severity", "severity"),
        ("Column name", "column_name"),
        ("Constraint name", "constraint_name"),
        ("Context", "context"),
        ("Datatype name", "datatype_name"),
        ("Internal position", "internal_position"),
        ("Internal query", "internal_query"),
        ("Message detail", "message_detail"),
        ("Message hint", "message_hint"),
        ("Message primary", "message_primary"),
        ("Schema name", "schema_name"),
        ("Source file", "source_file"),
        ("Source function", "source_function"),
        ("Source line", "source_line"),
        ("Sqlstate", "sqlstate"),
        ("Statement position", "statement_position"),
        ("Table name", "table_name"),
    )
    diag_dict = {}
    for label, attribute in labelled_attributes:
        diag_dict[label] = getattr(_diag, attribute)
    return get_valid_json(diag_dict)
def __exit__(self, exception_type, exception_value, _traceback)
def pg_prepare_insert_script_unnest(_tbl_name, _columns)
def pg_prepare_insert_script_execute_values_list(_tbl_name, _columns)
def pg_prepare_table_cleanup_script(_tbl_name, _where_conditions)
def pg_prepare_diagnostic_error(_diag)
def pg_prepare_insert_single_dict(_tbl_name, _columns, **kwargs)
def pg_prepare_insert_script_execute_values_dict(_tbl_name, _columns)
def pg_prepare_select_script(_tbl_name, _where_conditions, select_columns, **kwargs)
def pg_prepare_create_script(_tbl_name, _col_name_type_pair_str)
def pg_check_table_exists(_tbl_name)
def pg_prepare_insert_single_list(_tbl_name, _columns, **kwargs)
def _unnest_col(col_identifier)