2 """Script to profile execution times for multiple bulk insertion methods 4 Findings from this script align to the conclusions made in the following 5 tutorial: https://trvrm.github.io/bulk-psycopg2-inserts.html 8 Using insertion script with select + unnest option is the most efficient way 9 of bulk insertion. Using unnest to load multiple rows simultaneously has the 12 - It is significantly faster than a regular insert loop, especially when 13 inserting thousands of rows. Also aster with psycopg2.extra. 14 execute_values and inserting list of values with a normal single insertion 15 script with cursor.mogrify. 16 - The benefits of using unnest() increase at least up to 50,000 rows 17 - It still allows us to write straightforward parameterised and safe SQL 18 with no (injection prone) string concatenations - we use psycopg2.sql 19 module to generate safe sql scripts. 21 But using `unnest` requires explicit type conversion incase of NULL values. 22 A little less efficient (but still order of magnitude faster than interative 23 approach) but more pythonic option is to use `execute_values`. `execute_values` 24 require a template of placeholders to be specified beforehand which is why 25 we should explicitly specify default values for the missing columns during 26 the source to destionation table mappings. 28 psycopg2 API resources: http://initd.org/psycopg/docs/extras.html#fast-exec 33 import ody_migr_db_handler
as pg_handler
37 import psycopg2.extras
38 from psycopg2.extras
import execute_values, execute_batch
40 from psycopg2
import sql
@contextlib.contextmanager
def timer(name="duration"):
    '''Utility context manager for timing execution.

    Prints "<name>: <seconds> second(s)" when the managed block exits.

    Fixes vs. original: uses time.perf_counter() (monotonic, highest
    available resolution) instead of time.time(), which can jump on
    wall-clock adjustments; the duration is printed in a finally clause
    so a failing benchmark still reports its elapsed time.
    '''
    start = time.perf_counter()
    try:
        yield
    finally:
        duration = time.perf_counter() - start
        print("{}: {:.5f} second(s)".format(name, duration))
# Connection settings come from the environment; a missing variable
# resolves to None and will surface as an error when connecting.
DATABASE_HOST = os.environ.get("DATABASE_HOST")
DATABASE_PORT = os.environ.get("DATABASE_PORT")
DATABASE_NAME = os.environ.get("DATABASE_NAME")
DATABASE_USER = os.environ.get("DATABASE_USER")
DATABASE_PASSWORD = os.environ.get("DATABASE_PASSWORD")
# Name of the scratch table used for all timing experiments.
TEST_TABLE_NAME_STR = "psycopg2_insert_time_test"

# Table (re)creation script.  Fix vs. original: the table name is now
# injected with sql.Identifier, which always quotes it safely; the
# original used sql.SQL(...), which pastes the string in as raw SQL.
# NOTE(review): several column definitions were reconstructed from the
# fixture column list -- confirm against the original schema.
SETUP_SQL = sql.SQL("""
    DROP TABLE IF EXISTS {};
    CREATE TABLE {} (
        id serial primary key,
        first_name char(50) NOT NULL,
        last_name char(50) NOT NULL,
        email char(50) NOT NULL,
        gender char(50) NOT NULL,
        ip_address char(50) NOT NULL,
        ssn char(50) NOT NULL,
        company_name char(50),
        race char(50) NOT NULL,
        department char(50)
    );
    GRANT ALL ON {} TO odyssey;
""").format(
    sql.Identifier(TEST_TABLE_NAME_STR),
    sql.Identifier(TEST_TABLE_NAME_STR),
    sql.Identifier(TEST_TABLE_NAME_STR)
)

# Drops the scratch table once the experiments are done.
CLEANUP_SQL = sql.SQL("""
    DROP TABLE IF EXISTS {};
""").format(sql.Identifier(TEST_TABLE_NAME_STR))
# Fixture column names; these match both the JSON fixture keys and the
# destination table columns.
COL_FNAME = "first_name"
COL_LNAME = "last_name"
COL_EMAIL = "email"          # NOTE(review): reconstructed -- confirm
COL_GENDER = "gender"
COL_IP = "ip_address"
COL_SSN = "ssn"              # NOTE(review): reconstructed -- confirm
COL_CMPNYNAME = "company_name"
COL_RACE = "race"            # NOTE(review): reconstructed -- confirm
COL_DPMT = "department"

# Column order used by every insertion script below.
# NOTE(review): the original definition of this list was not visible in
# this chunk; order inferred from the docstring INSERT examples.
fixture_columns = [
    COL_FNAME, COL_LNAME, COL_EMAIL, COL_GENDER, COL_IP,
    COL_SSN, COL_CMPNYNAME, COL_RACE, COL_DPMT,
]
def pg_prepare_insert_script_cursor_mogrify(_tbl_name, _columns, _cursor,
                                            _values_list):
    """Prepare a single bulk INSERT script via cursor.mogrify.

    A little arduous but reasonably safe SQL: identifiers are quoted via
    psycopg2.sql and each value tuple is escaped by cursor.mogrify.

    Resulting script (shape):
    INSERT INTO psycopg2_insert_time_test ("first_name", "last_name", ...)
    VALUES ('254.70.235.138', ...), ('151.11.81.28', ...)

    :param _tbl_name: destination table name
    :param _columns: column names to insert
    :param _cursor: open cursor, used only for mogrify escaping
    :param _values_list: iterable of value tuples, one per row
    :returns: a psycopg2.sql composable ready for cursor.execute
    """
    col_names = sql.SQL(", ").join(map(sql.Identifier, _columns))
    # Bug fix: the placeholder count must track the _columns argument;
    # the original used the module-level fixture_columns list, breaking
    # any call with a different column set.
    placeholders = ",".join(["%s"] * len(_columns))
    args_str = ','.join(
        _cursor.mogrify("({})".format(placeholders), x).decode('utf-8')
        for x in _values_list)
    # NOTE(review): the .format(...) argument lines were missing from
    # this chunk; table/columns/values ordering assumed -- confirm.
    return sql.SQL("INSERT INTO {} ({}) VALUES {}").format(
        sql.Identifier(_tbl_name), col_names, sql.SQL(args_str))
def pg_prepare_insert_script_executemany_dict(_tbl_name, _columns):
    """Prepare an insertion script with named placeholders.

    Resulting script (shape):
    INSERT INTO psycopg2_insert_time_test ("first_name", ..., "department")
    VALUES (%(first_name)s, ..., %(department)s)

    The `values` argument later given to cursor.executemany() must be a
    list of dicts keyed by these column names.
    """
    return pg_handler.pg_prepare_insert_single_dict(_tbl_name, _columns)
def pg_prepare_insert_script_executemany_list(_tbl_name, _columns):
    """Prepare an insertion script with positional placeholders.

    Resulting script (shape):
    INSERT INTO "psycopg2_insert_time_test" ("first_name", ..., "department")
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)

    The `values` argument later given to cursor.executemany() must be a
    list of row sequences, each ordered exactly like `_columns`.
    """
    return pg_handler.pg_prepare_insert_single_list(_tbl_name, _columns)
def get_data(json_file=None):
    """Load fixture rows from a JSON file.

    Improvements vs. original: the fixture path is now a parameter
    (defaulting to the shared mock-data file, so existing callers are
    unaffected), and the empty-fixture check raises ValueError instead
    of using `assert`, which is stripped under ``python -O``.

    :param json_file: optional path to a JSON file holding a list of
        row dicts; defaults to the shared mock-data fixture.
    :returns: tuple (fixture_data, fixture_data_count)
    :raises ValueError: if the fixture contains no rows.
    """
    if json_file is None:
        json_file = ("smb://192.168.169.33/devops/"
                     "psycopg2_bulk/MOCK_DATA_FINAL.json")
    with open(json_file) as jf:
        fixture_data = json.load(jf)
    fixture_data_count = len(fixture_data)
    if fixture_data_count == 0:
        raise ValueError(
            "fixture file contains no rows: {}".format(json_file))
    return fixture_data, fixture_data_count
def connect():
    '''Open a new database connection from environment-sourced settings.

    NOTE(review): only the database/password keyword lines were visible
    in this chunk; host/port/user arguments were reconstructed from the
    surrounding constants -- confirm against the original.
    '''
    connection = psycopg2.connect(
        host=DATABASE_HOST,
        port=DATABASE_PORT,
        database=DATABASE_NAME,
        user=DATABASE_USER,
        password=DATABASE_PASSWORD
    )
    return connection
def execute(sql, params=None):
    '''Run a single statement on a fresh connection.

    Fix vs. original: the default was a mutable dict (``params={}``), a
    classic shared-mutable-default pitfall; default to None and
    substitute a fresh empty dict per call instead.

    :param sql: statement (string or psycopg2.sql composable)
    :param params: optional mapping/sequence of bind parameters
    '''
    if params is None:
        params = {}
    with connect() as connection:
        with connection.cursor() as cursor:
            cursor.execute(sql, params)
219 def __init__(self, count):
222 assert count > 0,
"experimental row count must be greater than zero!" 224 fixture_data, fixture_data_count = get_data()
226 self.
data = fixture_data
227 if fixture_data_count < count:
228 self.
count = fixture_data_count
234 '''Creates a new connection for each insertion''' 235 for row
in self.
data:
236 cols = list(row.keys())
237 vals = list(row.values())
239 insert_script = pg_handler.pg_prepare_insert_single_list(
240 TEST_TABLE_NAME_STR, cols)
241 execute(insert_script, vals)
245 Creates a new connection for each insertion 246 Insertion script incorporate dictionary values 248 for row
in self.
data:
250 single_insert = pg_handler.pg_prepare_insert_single_dict(
251 TEST_TABLE_NAME_STR, fixture_columns)
252 execute(single_insert, row)
256 One connection multiple queries (iterative). 257 Insertion script incorporate list/tuple values 259 with connect()
as connection:
260 with connection.cursor()
as cursor:
261 for row
in self.
data:
262 cols = list(row.keys())
263 vals = list(row.values())
264 insert_script = pg_handler.pg_prepare_insert_single_list(
265 TEST_TABLE_NAME_STR, cols)
266 cursor.execute(insert_script, vals)
270 One connection multiple queries (iterative). 271 Insertion script incorporate dictionary values 273 with connect()
as connection:
274 with connection.cursor()
as cursor:
275 for row
in self.
data:
277 single_insert = pg_handler.pg_prepare_insert_single_dict(
278 TEST_TABLE_NAME_STR, fixture_columns)
279 cursor.execute(single_insert, row)
283 One connection multiple queries. 284 Uses cursor.executemany() - list of values (dictionary) 287 with connect()
as connection:
288 with connection.cursor()
as cursor:
290 TEST_TABLE_NAME_STR, fixture_columns)
291 cursor.executemany(executemany_script, self.
data)
295 One connection multiple queries. 296 Uses cursor.executemany() - list of values (list) 299 for row
in self.
data:
301 row_vals = [row[col]
for col
in fixture_columns]
302 values.append(row_vals)
304 with connect()
as connection:
305 with connection.cursor()
as cursor:
307 TEST_TABLE_NAME_STR, fixture_columns)
309 cursor.executemany(executemany_script, values)
313 One connection, one query - multiple values insertions. 314 Uses psycopg2.extras.execute_values 317 template) = pg_handler.pg_prepare_insert_script_execute_values_dict(
318 TEST_TABLE_NAME_STR, fixture_columns)
320 with connect()
as connection:
321 with connection.cursor()
as cursor:
324 insert_query.as_string(connection),
326 template=template.as_string(connection),
332 One connection, one query - multiple values insertions. 333 Uses psycopg2.extras.execute_values 336 for row
in self.
data:
338 row_vals = [row[col]
for col
in fixture_columns]
339 values.append(row_vals)
342 template) = pg_handler.pg_prepare_insert_script_execute_values_list(
343 TEST_TABLE_NAME_STR, fixture_columns)
345 with connect()
as connection:
346 with connection.cursor()
as cursor:
347 execute_values(cursor,
348 insert_query.as_string(connection),
350 template=template.as_string(connection),
356 One connection, one query - multiple values insertions. 357 Uses psycopg2.extras.execute_batch() - list of dict 361 TEST_TABLE_NAME_STR, fixture_columns)
363 with connect()
as connection:
364 with connection.cursor()
as cursor:
365 execute_batch(cursor,
373 One connection, one query - multiple values insertions. 374 Uses psycopg2.extras.execute_batch() - list of list 378 for row
in self.
data:
380 row_vals = [row[col]
for col
in fixture_columns]
381 values.append(row_vals)
383 with connect()
as connection:
384 with connection.cursor()
as cursor:
387 TEST_TABLE_NAME_STR, fixture_columns)
388 execute_batch(cursor,
396 One connection, one query - multiple values insertions. 397 Uses cursor.mogrify to validate scripts 401 for row
in self.
data:
403 row_vals = [row[col]
for col
in fixture_columns]
404 values.append(row_vals)
406 with connect()
as connection:
407 with connection.cursor()
as cursor:
409 TEST_TABLE_NAME_STR, fixture_columns, cursor, values)
410 cursor.execute(insert_query)
414 One connection, one query - multiple values insertions. 415 Insertion script incorporate unnest + dictionary keys. 417 `values` can also be passed as: 418 first_name = [r['first_name'] for r in self.data] 419 last_name = [r['last_name'] for r in self.data] 420 email = [r['email'] for r in self.data] 421 gender = [r['gender'] for r in self.data] 422 ip_address = [r['ip_address'] for r in self.data] 423 ssn = [r['ssn'] for r in self.data] 424 company_name = [r['company_name'] for r in self.data] 425 race = [r['race'] for r in self.data] 426 department = [r['department'] for r in self.data] 429 insert_sql = pg_handler.pg_prepare_insert_script_unnest(
430 TEST_TABLE_NAME_STR, fixture_columns)
433 for col
in fixture_columns:
434 values[col] = [r[col]
for r
in self.
data]
436 with connect()
as connection:
437 with connection.cursor()
as cursor:
438 cursor.execute(insert_sql, values)
442 with timer(
'[slow] 1 transaction, 1 insertion ' 443 '(a record is list)'):
448 with timer(
'[slow] 1 transaction, 1 insertion ' 449 '(a record is a named dict)'):
454 with timer(
'[normal] Iterative insertion - 1 transaction, ' 455 'many insertions (a record is list)'):
460 with timer(
'[normal] Iterative insertion - 1 transaction, ' 461 'many insertions (a record is named dict)'):
466 with timer(
'[normal] cursor.executemany() (records are list of list)'):
471 with timer(
'[normal] cursor.executemany() ' 472 '(records are list of named dict)'):
477 with timer(
'[fast] psycopg2.extra' 478 '.execute_batch() (values are list of list)'):
483 with timer(
'[fast] psycopg2.extra' 484 '.execute_batch() (values are list of named dict)'):
489 with timer(
'[fast] cursor.mogrify and' 490 ' cursor.execute() (many records results to' 491 ' very large script)'):
496 with timer(
'[fast] psycopg2.extra' 497 '.execute_values() (values are list of list)'):
502 with timer(
'[fast] psycopg2.extra' 503 '.execute_values() (values are list of named dict)'):
508 with timer(
'[fast] unnest and cursor.execute()'):
512 print(
"****************************************************")
def fast_insertion_rate(count):
    '''Return rows/second for the unnest-based bulk insert of `count` rows.

    Fix vs. original: uses time.perf_counter() (monotonic) rather than
    time.time() for the elapsed measurement.
    '''
    # NOTE(review): the tester class definition/name was not visible in
    # this chunk -- confirm the constructor used here.
    tester = Tester(count)
    start = time.perf_counter()
    tester.fast_insert_unnest()
    duration = time.perf_counter() - start
    return count / duration
def normal_insertion_rate(count):
    '''Return rows/second for the iterative single-connection insert.

    Fix vs. original: uses time.perf_counter() (monotonic) rather than
    time.time() for the elapsed measurement.
    '''
    # NOTE(review): the tester class definition/name was not visible in
    # this chunk -- confirm the constructor used here.
    tester = Tester(count)
    start = time.perf_counter()
    tester.insert_iterative_single_conn_dict()
    duration = time.perf_counter() - start
    return count / duration
if __name__ == "__main__":
    experiment_row_counts = [
        1, 50, 100, 200, 500, 1000, 2000,
        5000, 10000, 20000, 35000, 50000
    ]

    # NOTE(review): table setup/teardown calls and the per-count test
    # driver lines were missing from this chunk; the tester construction
    # below was reconstructed -- confirm.
    for rec_count in experiment_row_counts:
        print("Stats for {} row insertions!".format(rec_count))
        tester = Tester(rec_count)
        print("Data count loaded from fixture: {}".format(tester.count))

    rates = [
        {
            'count': count,
            'rate of insertion (fast)': fast_insertion_rate(count),
            'rate of insertion (normal)': normal_insertion_rate(count)
        }
        for count in experiment_row_counts
    ]

    frame = pandas.DataFrame(rates).set_index('count')
    print(frame)
def insert_execute_batch_list(self)
def timer(name="duration")
def fast_insert_unnest(self)
def fast_insert_execute_values_list(self)
def insert_iterative_single_conn_dict(self)
def pg_prepare_insert_script_executemany_dict(_tbl_name, _columns)
def insert_executemany_list(self)
def slow_multiple_conns_list(self)
def fast_insert_execute_values_dict(self)
def slow_multiple_conns_dict(self)
def insert_executemany_dict(self)
def pg_prepare_insert_script_executemany_list(_tbl_name, _columns)
def fast_insert_many_mogrify(self)
def insert_execute_batch_dict(self)
def pg_prepare_insert_script_cursor_mogrify(_tbl_name, _columns, _cursor, _values_list)
def insert_iterative_single_conn_list(self)