Odyssey
ody_migr_benchmarks.py
1 #!/usr/bin/env python
2 """Script to profile execution times for multiple bulk insertion methods
3 
4 Findings from this script align to the conclusions made in the following
5 tutorial: https://trvrm.github.io/bulk-psycopg2-inserts.html
6 Conclusion:
7 
8 Using insertion script with select + unnest option is the most efficient way
9 of bulk insertion. Using unnest to load multiple rows simultaneously has the
10 following advantages:
11 
12 - It is significantly faster than a regular insert loop, especially when
 inserting thousands of rows. It is also faster than psycopg2.extras.
14  execute_values and inserting list of values with a normal single insertion
15  script with cursor.mogrify.
16 - The benefits of using unnest() increase at least up to 50,000 rows
17 - It still allows us to write straightforward parameterised and safe SQL
18  with no (injection prone) string concatenations - we use psycopg2.sql
19  module to generate safe sql scripts.
20 
But using `unnest` requires explicit type conversion in case of NULL values.
A little less efficient (but still an order of magnitude faster than the iterative
23 approach) but more pythonic option is to use `execute_values`. `execute_values`
24 require a template of placeholders to be specified beforehand which is why
25 we should explicitly specify default values for the missing columns during
the source to destination table mappings.
27 
28 psycopg2 API resources: http://initd.org/psycopg/docs/extras.html#fast-exec
29 """
30 
31 
32 import os
33 import ody_migr_db_handler as pg_handler
34 
35 import pandas
36 import psycopg2
37 import psycopg2.extras
38 from psycopg2.extras import execute_values, execute_batch
39 # psycopg2's SQL composer against sql injection
40 from psycopg2 import sql
41 import time
42 import contextlib
43 import json
44 
45 
46 # decorator to compute execution time of the function being tested
47 @contextlib.contextmanager
48 def timer(name="duration"):
49  '''Utility function for timing execution'''
50  start = time.time()
51  yield
52  duration = time.time() - start
53  print("{}: {:.5f} second(s)".format(name, duration))
54 
55 
56 # import database environment variables
57 DATABASE_HOST = os.environ.get("DATABASE_HOST")
58 DATABASE_PORT = os.environ.get("DATABASE_PORT")
59 DATABASE_NAME = os.environ.get("DATABASE_NAME")
60 DATABASE_USER = os.environ.get("DATABASE_USER")
61 DATABASE_PASSWORD = os.environ.get("DATABASE_PASSWORD")
62 
63 TEST_TABLE_NAME_STR = "psycopg2_insert_time_test"
64 
65 # database table setup script
66 SETUP_SQL = sql.SQL("""
67  DROP TABLE IF EXISTS {};
68 
69  CREATE TABLE {}(
70  id serial primary key,
71  first_name char(50) NOT NULL,
72  last_name char(50) NOT NULL,
73  email char(50),
74  gender char(50) NOT NULL,
75  ip_address char(50),
76  ssn char(50) NOT NULL,
77  company_name char(50),
78  race char(50) NOT NULL,
79  department char(50)
80  );
81 
82  GRANT ALL ON {} TO odyssey;
83 """).format(
84  sql.SQL(TEST_TABLE_NAME_STR),
85  sql.SQL(TEST_TABLE_NAME_STR),
86  sql.SQL(TEST_TABLE_NAME_STR)
87 )
88 
89 # database cleanup script
90 CLEANUP_SQL = sql.SQL("""
91  DROP TABLE IF EXISTS {};
92 """).format(
93  sql.SQL(TEST_TABLE_NAME_STR)
94 )
95 
96 
97 COL_FNAME = "first_name"
98 COL_LNAME = "last_name"
99 COL_EMAIL = "email"
100 COL_GENDER = "gender"
101 COL_IP = "ip_address"
102 COL_SSN = "ssn"
103 COL_CMPNYNAME = "company_name"
104 COL_RACE = "race"
105 COL_DPMT = "department"
106 
107 fixture_columns = [
108  COL_FNAME,
109  COL_LNAME,
110  COL_EMAIL,
111  COL_GENDER,
112  COL_IP,
113  COL_SSN,
114  COL_CMPNYNAME,
115  COL_RACE,
116  COL_DPMT,
117 ]
118 
119 
121  _columns,
122  _cursor,
123  _values_list):
124  """Prepare insertion script
125 
126  A little arduous but reasonably safe sql with placeholders and
127  cursor.mogrify formatted columns and values for bulk insertion.
128 
129  Example script:
130  INSERT INTO psycopg2_insert_time_test ("first_name", "last_name",
131  "email", "gender", "ip_address", "ssn", "company_name", "race",
132  "department")
133  VALUES
134  ('254.70.235.138','Product Management','Noir','Marlyn','Livefish',
135  'Female','mnoir0@bizjournals.com','Guamanian','651-05-8691'),
136  ('151.11.81.28','Sales','Doxey','Berty','Skaboo','Male',
137  'bdoxey1@amazon.com','Malaysian','658-15-9923')
138  ...
139  """
140  col_names = sql.SQL(", ").join(map(sql.Identifier, _columns))
141  placeholders = ",".join(["%s"] * len(fixture_columns))
142 
143  args_str = ','.join(_cursor.mogrify("({})".format(
144  placeholders), x).decode('utf-8') for x in _values_list)
145 
146  return sql.SQL("INSERT INTO {} ({}) VALUES {}").format(
147  sql.SQL(_tbl_name),
148  col_names,
149  sql.SQL(args_str)
150  )
151 
152 
154  """Prepare insertion script
155 
156  Uses named placeholders to insert list of records (dictonary)
157 
158  Example script:
159  INSERT INTO psycopg2_insert_time_test ("first_name", "last_name", "email",
160  "gender", "ip_address", "ssn", "company_name", "race", "department")
161  VALUES (%(first_name)s, %(last_name)s, %(email)s, %(gender)s,
162  %(ip_address)s, %(ssn)s, %(company_name)s, %(race)s, %(department)s)
163 
164  `values` attribute of cursor.executemany() must be a list of named dict
165  """
166  return pg_handler.pg_prepare_insert_single_dict(_tbl_name, _columns)
167 
168 
170  """Prepare insertion script
171 
172  Uses named placeholders to insert list of records (list)
173 
174  Example script:
175  INSERT INTO "psycopg2_insert_time_test" ("first_name", "last_name",
176  "email", "gender", "ip_address", "ssn", "company_name", "race",
177  "department") VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
178 
179 
180  `values` attribute of cursor.executemany() must be a list of multiple
181  records - list of list (each record values must have same order
182  as _columns)
183  """
184  return pg_handler.pg_prepare_insert_single_list(_tbl_name, _columns)
185 
186 
187 def get_data():
188  # load the fixture data for testing purpose from disk
189  json_file = ("smb://192.168.169.33/devops/"
190  "psycopg2_bulk/MOCK_DATA_FINAL.json")
191  fixture_data = {}
192  fixture_data_count = 0
193  with open(json_file) as jf:
194  fixture_data = json.load(jf)
195  fixture_data_count = len(fixture_data)
196  assert fixture_data_count > 0
197  return fixture_data, fixture_data_count
198 
199 
200 # connection
201 def connect():
202  connection = psycopg2.connect(
203  host=DATABASE_HOST,
204  database=DATABASE_NAME,
205  user=DATABASE_USER,
206  password=DATABASE_PASSWORD
207  )
208  return connection
209 
210 
211 # execute a script in a transaction
212 def execute(sql, params={}):
213  with connect() as connection:
214  with connection.cursor() as cursor:
215  cursor.execute(sql, params)
216 
217 
219  def __init__(self, count):
220  # create a fresh table
221  execute(SETUP_SQL)
222  assert count > 0, "experimental row count must be greater than zero!"
223 
224  fixture_data, fixture_data_count = get_data()
225  # prepare data for experiment
226  self.data = fixture_data
227  if fixture_data_count < count:
228  self.count = fixture_data_count
229  else:
230  self.count = count
231  self.data = self.data[:count]
232 
234  '''Creates a new connection for each insertion'''
235  for row in self.data:
236  cols = list(row.keys())
237  vals = list(row.values())
238  # get insertion script to insert values in list/tuple form
239  insert_script = pg_handler.pg_prepare_insert_single_list(
240  TEST_TABLE_NAME_STR, cols)
241  execute(insert_script, vals)
242 
244  '''
245  Creates a new connection for each insertion
246  Insertion script incorporate dictionary values
247  '''
248  for row in self.data:
249  # row is a dictionary of columns and values
250  single_insert = pg_handler.pg_prepare_insert_single_dict(
251  TEST_TABLE_NAME_STR, fixture_columns)
252  execute(single_insert, row)
253 
255  '''
256  One connection multiple queries (iterative).
257  Insertion script incorporate list/tuple values
258  '''
259  with connect() as connection:
260  with connection.cursor() as cursor:
261  for row in self.data:
262  cols = list(row.keys())
263  vals = list(row.values())
264  insert_script = pg_handler.pg_prepare_insert_single_list(
265  TEST_TABLE_NAME_STR, cols)
266  cursor.execute(insert_script, vals)
267 
269  '''
270  One connection multiple queries (iterative).
271  Insertion script incorporate dictionary values
272  '''
273  with connect() as connection:
274  with connection.cursor() as cursor:
275  for row in self.data:
276  # row is a dictionary
277  single_insert = pg_handler.pg_prepare_insert_single_dict(
278  TEST_TABLE_NAME_STR, fixture_columns)
279  cursor.execute(single_insert, row)
280 
282  '''
283  One connection multiple queries.
284  Uses cursor.executemany() - list of values (dictionary)
285 
286  '''
287  with connect() as connection:
288  with connection.cursor() as cursor:
289  executemany_script = pg_prepare_insert_script_executemany_dict(
290  TEST_TABLE_NAME_STR, fixture_columns)
291  cursor.executemany(executemany_script, self.data)
292 
294  '''
295  One connection multiple queries.
296  Uses cursor.executemany() - list of values (list)
297  '''
298  values = []
299  for row in self.data:
300  # still linear, done to match the order of cols and values
301  row_vals = [row[col] for col in fixture_columns]
302  values.append(row_vals)
303 
304  with connect() as connection:
305  with connection.cursor() as cursor:
306  executemany_script = pg_prepare_insert_script_executemany_list(
307  TEST_TABLE_NAME_STR, fixture_columns)
308 
309  cursor.executemany(executemany_script, values)
310 
312  '''
313  One connection, one query - multiple values insertions.
314  Uses psycopg2.extras.execute_values
315  '''
316  (insert_query,
317  template) = pg_handler.pg_prepare_insert_script_execute_values_dict(
318  TEST_TABLE_NAME_STR, fixture_columns)
319 
320  with connect() as connection:
321  with connection.cursor() as cursor:
322  execute_values(
323  cursor,
324  insert_query.as_string(connection),
325  self.data,
326  template=template.as_string(connection),
327  page_size=self.count
328  )
329 
331  '''
332  One connection, one query - multiple values insertions.
333  Uses psycopg2.extras.execute_values
334  '''
335  values = []
336  for row in self.data:
337  # still linear, done to match the order of cols and values
338  row_vals = [row[col] for col in fixture_columns]
339  values.append(row_vals)
340 
341  (insert_query,
342  template) = pg_handler.pg_prepare_insert_script_execute_values_list(
343  TEST_TABLE_NAME_STR, fixture_columns)
344 
345  with connect() as connection:
346  with connection.cursor() as cursor:
347  execute_values(cursor,
348  insert_query.as_string(connection),
349  values,
350  template=template.as_string(connection),
351  page_size=self.count
352  )
353 
355  '''
356  One connection, one query - multiple values insertions.
357  Uses psycopg2.extras.execute_batch() - list of dict
358  '''
359  # query similar to executemany()
361  TEST_TABLE_NAME_STR, fixture_columns)
362 
363  with connect() as connection:
364  with connection.cursor() as cursor:
365  execute_batch(cursor,
366  insert_query,
367  self.data,
368  page_size=self.count
369  )
370 
372  '''
373  One connection, one query - multiple values insertions.
374  Uses psycopg2.extras.execute_batch() - list of list
375 
376  '''
377  values = []
378  for row in self.data:
379  # still linear, done to match the order of cols and values
380  row_vals = [row[col] for col in fixture_columns]
381  values.append(row_vals)
382 
383  with connect() as connection:
384  with connection.cursor() as cursor:
385  # query similar to executemany()
387  TEST_TABLE_NAME_STR, fixture_columns)
388  execute_batch(cursor,
389  insert_query,
390  values,
391  page_size=self.count
392  )
393 
395  '''
396  One connection, one query - multiple values insertions.
397  Uses cursor.mogrify to validate scripts
398 
399  '''
400  values = []
401  for row in self.data:
402  # still linear, done to match the order of cols and values
403  row_vals = [row[col] for col in fixture_columns]
404  values.append(row_vals)
405 
406  with connect() as connection:
407  with connection.cursor() as cursor:
409  TEST_TABLE_NAME_STR, fixture_columns, cursor, values)
410  cursor.execute(insert_query)
411 
413  """
414  One connection, one query - multiple values insertions.
415  Insertion script incorporate unnest + dictionary keys.
416 
417  `values` can also be passed as:
418  first_name = [r['first_name'] for r in self.data]
419  last_name = [r['last_name'] for r in self.data]
420  email = [r['email'] for r in self.data]
421  gender = [r['gender'] for r in self.data]
422  ip_address = [r['ip_address'] for r in self.data]
423  ssn = [r['ssn'] for r in self.data]
424  company_name = [r['company_name'] for r in self.data]
425  race = [r['race'] for r in self.data]
426  department = [r['department'] for r in self.data]
427  values = locals()
428  """
429  insert_sql = pg_handler.pg_prepare_insert_script_unnest(
430  TEST_TABLE_NAME_STR, fixture_columns)
431 
432  values = {}
433  for col in fixture_columns:
434  values[col] = [r[col] for r in self.data]
435 
436  with connect() as connection:
437  with connection.cursor() as cursor:
438  cursor.execute(insert_sql, values)
439 
440  def run(self):
441  print("")
442  with timer('[slow] 1 transaction, 1 insertion '
443  '(a record is list)'):
445 
446  print("")
447 
448  with timer('[slow] 1 transaction, 1 insertion '
449  '(a record is a named dict)'):
451 
452  print("")
453 
454  with timer('[normal] Iterative insertion - 1 transaction, '
455  'many insertions (a record is list)'):
457 
458  print("")
459 
460  with timer('[normal] Iterative insertion - 1 transaction, '
461  'many insertions (a record is named dict)'):
463 
464  print("")
465 
466  with timer('[normal] cursor.executemany() (records are list of list)'):
468 
469  print("")
470 
471  with timer('[normal] cursor.executemany() '
472  '(records are list of named dict)'):
474 
475  print("")
476 
477  with timer('[fast] psycopg2.extra'
478  '.execute_batch() (values are list of list)'):
480 
481  print("")
482 
483  with timer('[fast] psycopg2.extra'
484  '.execute_batch() (values are list of named dict)'):
486 
487  print("")
488 
489  with timer('[fast] cursor.mogrify and'
490  ' cursor.execute() (many records results to'
491  ' very large script)'):
493 
494  print("")
495 
496  with timer('[fast] psycopg2.extra'
497  '.execute_values() (values are list of list)'):
499 
500  print("")
501 
502  with timer('[fast] psycopg2.extra'
503  '.execute_values() (values are list of named dict)'):
505 
506  print("")
507 
508  with timer('[fast] unnest and cursor.execute()'):
509  self.fast_insert_unnest()
510 
511  print("")
512  print("****************************************************")
513  print("")
514 
515 
516 # insertion rate table
517 def fast_insertion_rate(count):
518  tester = BulkInsertionTester(count)
519  start = time.time()
520  tester.fast_insert_unnest()
521  duration = time.time() - start
522  return count / duration
523 
524 
525 def normal_insertion_rate(count):
526  tester = BulkInsertionTester(count)
527  start = time.time()
528  tester.insert_iterative_single_conn_dict()
529  duration = time.time() - start
530  return count / duration
531 
532 
533 if __name__ == "__main__":
534  experiment_row_counts = [
535  1, 50, 100, 200, 500, 1000, 2000,
536  5000, 10000, 20000, 35000, 50000
537  ]
538 
539  for rec_count in experiment_row_counts:
540  print("Stats for {} row insertions!".format(rec_count))
541  tester = BulkInsertionTester(rec_count)
542  print("Data count loaded from fixture: {}".format(tester.count))
543  tester.run()
544 
545  rates = [
546  {
547  "count": count,
548  'rate of insertion (fast)': fast_insertion_rate(count),
549  'rate of insertion (normal)': normal_insertion_rate(count)
550 
551  }
552  for count in experiment_row_counts
553  ]
554 
555  # display result table (following two lines require `pandas`)
556  frame = pandas.DataFrame(rates).set_index('count')
557  print(frame)
558 
559  # plot (requires matplotlib)
560  # frame.plot(logx=True)
561 
562  # CLEANUP: drop table
563  execute(CLEANUP_SQL)
def timer(name="duration")
def pg_prepare_insert_script_executemany_dict(_tbl_name, _columns)
def pg_prepare_insert_script_executemany_list(_tbl_name, _columns)
def pg_prepare_insert_script_cursor_mogrify(_tbl_name, _columns, _cursor, _values_list)