Guest Jose_Manuel_Jurado
Posted August 19, 2022

Today we have been working on a service request in which our customer wanted to improve the performance of a bulk insert process. I would like to share my experience working on it. Our customer mentioned that inserting data (100,000 rows) was taking 14 seconds in a database on the Business Critical tier. I was able to reproduce that time using a single thread against a table with 20 columns. To improve this Python code, I suggested running the bulk insert in parallel with a batch size of 10,000 rows, and I also followed these best practices to reduce the execution time of the process:

Client virtual machine level:
- Enable accelerated networking.
- Size the CPU/vCores according to how many parallel processes are needed; in this case, 10 vCores.
- Place the virtual machine in the same region as the database.

Database level:
- Create a table with 20 columns.
- As the primary key is a sequential key, include OPTIMIZE_FOR_SEQUENTIAL_KEY = ON in the clustered index definition.
- Configure the same number of vCores as the maximum number of parallel processes you want to run; in this case, 10 vCores.
- Depending on the amount of data, use Business Critical to reduce storage latency.

Python code level:
- Use the executemany method to reduce network round trips, sending only the parameter values (see the sketch below).
- Run in batches (1,000 to 10,000 rows) instead of a single process.
- Use SET NOCOUNT ON to suppress the rows-affected messages returned for every insert.
- In the connection, set autocommit=False so each batch is committed as a single transaction.
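Before the full script, here is a minimal sketch of those Python-level settings. The server, database, credentials and the dbo.demo_table table below are placeholders for illustration only, not objects from the customer scenario.

import pyodbc

# Placeholder connection details -- replace with your own server, database and credentials.
cnxn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=tcp:yourserver.database.windows.net,1433;"
    "DATABASE=yourdatabase;UID=youruser;PWD=yourpassword",
    autocommit=False)                # One explicit commit per batch instead of one per statement.

cursor = cnxn.cursor()
cursor.fast_executemany = True       # Send the whole parameter array in a single round trip.

# One batch of rows (for example, 10,000) as a list of parameter tuples.
rows = [(i, 'val') for i in range(1, 10001)]
cursor.executemany(
    "SET NOCOUNT ON; INSERT INTO dbo.demo_table ([Key], [Val]) VALUES (?, ?)",
    rows)                            # SET NOCOUNT ON suppresses the rows-affected messages.
cnxn.commit()                        # Commit the batch as a single transaction.

The full example that follows applies the same pattern, but reads the batches from a CSV file and runs each one on its own thread.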
Below is an example of the Python code, which you can also find here. This script reads a CSV file and, for every 10,000 rows, executes a bulk insert on a separate thread.

import csv
import pyodbc
import threading
import os
import datetime

class ThreadsOrder:
    # Class to run the insert process in parallel.
    def ExecuteSQL(self, a, s, n):
        TExecutor = threading.Thread(target=ExecuteSQL, args=(a, s, n,))
        TExecutor.start()

def SaveResults(Message, bSaveFile):
    # Save the details to the console and, optionally, to the log file.
    try:
        print(Message)
        if bSaveFile == True:
            file_object = open(filelog, "a")
            file_object.write(datetime.datetime.strftime(datetime.datetime.now(), '%d/%m/%y %H:%M:%S') + '-' + Message + '\n')
            file_object.close()
    except BaseException as e:
        print('An error occurred - ', format(e))

def ExecuteSQLcc(sTableName):
    # Drop and re-create the target table and its clustered index.
    try:
        cnxn1 = pyodbc.connect("DRIVER={ODBC Driver 17 for SQL Server};APP=Bulk Insert Test;SERVER=" + SQL_server + ";DATABASE=" + SQL_database + ";UID=" + SQL_user + ';PWD=' + SQL_password, autocommit=False, timeout=3600)
        cursor = cnxn1.cursor()
        cursor.execute("DROP TABLE IF EXISTS " + sTableName)
        cursor.commit()
        cursor.execute("CREATE TABLE " + sTableName + " ("
                       " [Key] [int] NOT NULL,"
                       " [Num_TEST] [int] NULL,"
                       " [TEST_01] [varchar](6) NULL, [TEST_02] [varchar](6) NULL, [TEST_03] [varchar](6) NULL, [TEST_04] [varchar](6) NULL,"
                       " [TEST_05] [varchar](6) NULL, [TEST_06] [varchar](6) NULL, [TEST_07] [varchar](6) NULL, [TEST_08] [varchar](6) NULL,"
                       " [TEST_09] [varchar](6) NULL, [TEST_10] [varchar](6) NULL, [TEST_11] [varchar](6) NULL, [TEST_12] [varchar](6) NULL,"
                       " [TEST_13] [varchar](6) NULL, [TEST_14] [varchar](6) NULL, [TEST_15] [varchar](6) NULL, [TEST_16] [varchar](6) NULL,"
                       " [TEST_17] [varchar](6) NULL, [TEST_18] [varchar](6) NULL, [TEST_19] [varchar](6) NULL, [TEST_20] [varchar](6) NULL)")
        cursor.commit()
        cursor.execute("CREATE CLUSTERED INDEX [ix_ms_example] ON " + sTableName + " ([Key] ASC)"
                       " WITH (STATISTICS_NORECOMPUTE = OFF, DROP_EXISTING = OFF, ONLINE = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = ON) ON [PRIMARY]")
        cursor.commit()
    except BaseException as e:
        SaveResults('Executing SQL - an error occurred - ' + format(e), True)

def ExecuteSQL(a, sTableName, n):
    # Insert one batch of rows using fast_executemany.
    try:
        Before = datetime.datetime.now()
        if n == -1:
            sTypeProcess = "NoAsync"
        else:
            sTypeProcess = "Async - Thread:" + str(n)
        SaveResults('Executing at ' + str(Before) + " Process Type: " + sTypeProcess, True)
        cnxn1 = pyodbc.connect("DRIVER={ODBC Driver 17 for SQL Server};APP=Bulk Insert Test;SERVER=" + SQL_server + ";DATABASE=" + SQL_database + ";UID=" + SQL_user + ';PWD=' + SQL_password, autocommit=False, timeout=3600)
        cursor = cnxn1.cursor()
        cursor.fast_executemany = True
        cursor.executemany("SET NOCOUNT ON;INSERT INTO " + sTableName + " ([Key], Num_TEST, TEST_01, TEST_02, TEST_03, TEST_04, TEST_05, TEST_06, TEST_07, TEST_08, TEST_09, TEST_10,"
                           " TEST_11, TEST_12, TEST_13, TEST_14, TEST_15, TEST_16, TEST_17, TEST_18, TEST_19, TEST_20)"
                           " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", a)
        cursor.commit()
        SaveResults('Time Difference INSERT process ' + str(datetime.datetime.now() - Before) + " " + sTypeProcess, True)
    except BaseException as e:
        SaveResults('Executing SQL - an error occurred - ' + format(e), True)

# Connectivity details.
SQL_server = 'tcp:servername.database.windows.net,1433'
SQL_database = 'databasename'
SQL_user = 'username'
SQL_password = 'password'

# File details to read.
filepath = 'c:\\k\\'                # Folder that contains the demo CSV file.
filelog = filepath + '\\Error.log'  # Log file.
chunksize = 10000                   # Transaction batch rows.
sTableName = "[test_data]"          # Table name (dummy).
pThreadOrder = ThreadsOrder()
nThread = 0                         # Number of threads -- right now, the number of threads is not limited.

ExecuteSQLcc(sTableName)
Before = datetime.datetime.now()
line_count = 0
for directory, subdirectories, files in os.walk(filepath):
    for file in files:
        name, ext = os.path.splitext(file)
        if ext == '.csv':
            a = []
            SaveResults('Reading the file ' + name, True)
            BeforeFile = datetime.datetime.now()
            with open(os.path.join(directory, file), mode='r') as csv_file:
                csv_reader = csv.reader(csv_file, delimiter=',')
                for row in csv_reader:
                    line_count += 1
                    if line_count > 1:  # Skip the header row.
                        a.append(row)
                    if (line_count % chunksize) == 0:
                        deltaFile = datetime.datetime.now() - BeforeFile
                        nThread = nThread + 1
                        SaveResults('Time Difference Reading file is ' + str(deltaFile) + ' for ' + str(line_count) + ' rows', True)
                        pThreadOrder.ExecuteSQL(a, sTableName, nThread)  # Open a new thread per transaction batch size.
                        # ExecuteSQL(a, sTableName, -1)
                        a = []
                        BeforeFile = datetime.datetime.now()
            if a:
                # Insert any remaining rows that did not fill a complete batch.
                nThread = nThread + 1
                pThreadOrder.ExecuteSQL(a, sTableName, nThread)
            SaveResults('Total Time Difference Reading file is ' + str(datetime.datetime.now() - Before) + ' for ' + str(line_count) + ' rows for the file: ' + name, True)
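Note that the script above starts a new thread for every batch, with no upper limit on concurrency. If you prefer to cap the number of simultaneous connections (for example, at the 10 vCores used in this scenario), one option is to submit the batches to a bounded thread pool instead. The following is only a sketch of that idea, reusing the ExecuteSQL function and table name from the script above; it is not part of the original example.

from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 10   # Assumption: limit concurrency to the 10 vCores mentioned earlier.

def insert_batches_bounded(batches, table_name):
    # batches is a list of row lists, one entry per transaction batch;
    # ExecuteSQL is the batch-insert function defined in the script above.
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        futures = [pool.submit(ExecuteSQL, batch, table_name, n)
                   for n, batch in enumerate(batches, start=1)]
        for future in futures:
            future.result()   # Wait for every batch and re-raise any worker exception.

With this approach, at most MAX_WORKERS inserts run at the same time, so the database never receives more concurrent sessions than it has vCores to schedule them.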
During the execution, if you need to check the connections, the number of rows inserted and the impact in terms of resources, you can use the following T-SQL:

SELECT SUBSTRING(REPLACE(REPLACE(SUBSTRING(ST.text, (req.statement_start_offset/2) + 1,
           ((CASE statement_end_offset WHEN -1 THEN DATALENGTH(ST.text)
             ELSE req.statement_end_offset END - req.statement_start_offset)/2) + 1),
           CHAR(10), ' '), CHAR(13), ' '), 1, 512) AS statement_text,
       req.database_id,
       program_name,
       req.session_id,
       req.cpu_time 'cpu_time_ms',
       req.status,
       wait_time,
       wait_resource,
       wait_type,
       last_wait_type,
       req.total_elapsed_time,
       total_scheduled_time,
       req.row_count AS [Row Count],
       command,
       scheduler_id,
       memory_usage,
       req.writes,
       req.reads,
       req.logical_reads,
       blocking_session_id
FROM sys.dm_exec_requests AS req
INNER JOIN sys.dm_exec_sessions AS sess ON sess.session_id = req.session_id
CROSS APPLY sys.dm_exec_sql_text(req.sql_handle) AS ST
WHERE req.session_id <> @@SPID

SELECT COUNT(*) FROM test_data

SELECT * FROM sys.dm_db_resource_stats ORDER BY end_time DESC

Enjoy!