
Lesson Learned #234: Parallel vs Single Running a Bulk Insert with Python



Guest Jose_Manuel_Jurado
Posted

Today, we worked on a service request where our customer wanted to improve the performance of a bulk insert process. In the following, I would like to share my experience working on it.


Our customer mentioned that inserting data (100,000 rows) was taking 14 seconds in a database in the Business Critical tier. I was able to reproduce this time using a single thread and a table with 20 columns.


In order to improve this Python code, I suggested running this bulk insert in parallel with a batch size of 10,000 rows, and I also followed these best practices to reduce the execution time of the process (a minimal sketch of the core pattern follows the list):


  • Client virtual machine level:

    • Sized the number of CPUs/vCores to match the number of parallel processes I needed; in this case, 10 vCores.

    • Placed the virtual machine in the same region as the database.

  • Database level:

    • Created a table with 20 columns.

    • As the PK is a sequential key, I included the parameter OPTIMIZE_FOR_SEQUENTIAL_KEY = ON in the clustered index definition.

    • Configured the same number of CPUs/vCores as the maximum number of parallel processes that I would like to have; in this case, 10 vCores.

    • Depending on the amount of data, use Business Critical to reduce the storage latency.

  • Python code level:

    • Used the fast_executemany method in order to reduce the network round trips, sending only the values of the parameters.

    • Ran in batches (1,000 to 10,000 rows) instead of a single process.

    • Used SET NOCOUNT ON to reduce the response/rowset reporting how many rows were inserted.

    • In the connection string, used autocommit=False and committed every batch explicitly.
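As a reference, here is a minimal sketch of the core pyodbc pattern that those Python-level practices describe; the server, database, credentials, and the two-column INSERT are placeholders for illustration, not the customer's actual values:

import pyodbc

# Placeholders: point these at your own server, database, and credentials.
cnxn = pyodbc.connect("DRIVER={ODBC Driver 17 for SQL Server};"
                      "SERVER=tcp:servername.database.windows.net,1433;"
                      "DATABASE=databasename;UID=username;PWD=password",
                      autocommit=False)  # Commit every batch explicitly.
cursor = cnxn.cursor()
cursor.fast_executemany = True  # Send only parameter values; fewer network round trips.

rows = [(i, str(i)) for i in range(10000)]  # One 10,000-row batch.
cursor.executemany(
    "SET NOCOUNT ON;"  # Suppress the rows-affected message for the inserts.
    "INSERT INTO [test_data] ([Key], TEST_01) VALUES (?, ?)", rows)
cnxn.commit()  # One transaction per batch.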

 

Here is an example of the Python code. It reads a CSV file and, for every 10,000 rows, executes a bulk insert on a new thread.


import csv
import pyodbc
import threading
import os
import datetime


class ThreadsOrder:  # Class to run the process in parallel.
    def ExecuteSQL(self, a, s, n):
        TExecutor = threading.Thread(target=ExecuteSQL, args=(a, s, n,))
        TExecutor.start()


def SaveResults(Message, bSaveFile):  # Save the details to the log file.
    try:
        print(Message)
        if bSaveFile:
            file_object = open(filelog, "a")
            file_object.write(datetime.datetime.strftime(datetime.datetime.now(), '%d/%m/%y %H:%M:%S') + '-' + Message + '\n')
            file_object.close()
    except BaseException as e:
        print('An error occurred - ', format(e))


def ExecuteSQLcc(sTableName):  # Create (or recreate) the target table and its clustered index.
    try:
        cnxn1 = pyodbc.connect("DRIVER={ODBC Driver 17 for SQL Server};APP=Bulk Insert Test;SERVER=" + SQL_server + ";DATABASE=" + SQL_database + ";UID=" + SQL_user + ';PWD=' + SQL_password, autocommit=False, timeout=3600)
        cursor = cnxn1.cursor()
        cursor.execute("DROP TABLE IF EXISTS " + sTableName)
        cursor.commit()
        cursor.execute("CREATE TABLE " + sTableName + " ("
                       " [Key] [int] NOT NULL,"
                       " [Num_TEST] [int] NULL,"
                       " [TEST_01] [varchar](6) NULL,"
                       " [TEST_02] [varchar](6) NULL,"
                       " [TEST_03] [varchar](6) NULL,"
                       " [TEST_04] [varchar](6) NULL,"
                       " [TEST_05] [varchar](6) NULL,"
                       " [TEST_06] [varchar](6) NULL,"
                       " [TEST_07] [varchar](6) NULL,"
                       " [TEST_08] [varchar](6) NULL,"
                       " [TEST_09] [varchar](6) NULL,"
                       " [TEST_10] [varchar](6) NULL,"
                       " [TEST_11] [varchar](6) NULL,"
                       " [TEST_12] [varchar](6) NULL,"
                       " [TEST_13] [varchar](6) NULL,"
                       " [TEST_14] [varchar](6) NULL,"
                       " [TEST_15] [varchar](6) NULL,"
                       " [TEST_16] [varchar](6) NULL,"
                       " [TEST_17] [varchar](6) NULL,"
                       " [TEST_18] [varchar](6) NULL,"
                       " [TEST_19] [varchar](6) NULL,"
                       " [TEST_20] [varchar](6) NULL)")
        cursor.commit()
        cursor.execute("CREATE CLUSTERED INDEX [ix_ms_example] ON " + sTableName + " ([Key] ASC) WITH (STATISTICS_NORECOMPUTE = OFF, DROP_EXISTING = OFF, ONLINE = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = ON) ON [PRIMARY]")
        cursor.commit()
    except BaseException as e:
        SaveResults('Executing SQL - an error occurred - ' + format(e), True)


def ExecuteSQL(a, sTableName, n):  # Insert one batch of rows.
    try:
        Before = datetime.datetime.now()
        if n == -1:
            sTypeProcess = "NoAsync"
        else:
            sTypeProcess = "Async - Thread:" + str(n)
        SaveResults('Executing at ' + str(Before) + " Process Type: " + sTypeProcess, True)
        cnxn1 = pyodbc.connect("DRIVER={ODBC Driver 17 for SQL Server};APP=Bulk Insert Test;SERVER=" + SQL_server + ";DATABASE=" + SQL_database + ";UID=" + SQL_user + ';PWD=' + SQL_password, autocommit=False, timeout=3600)
        cursor = cnxn1.cursor()
        cursor.fast_executemany = True  # Send parameter values in bulk to reduce network round trips.
        cursor.executemany("SET NOCOUNT ON;INSERT INTO " + sTableName + " ([Key], Num_TEST, TEST_01, TEST_02, TEST_03, TEST_04, TEST_05, TEST_06, TEST_07, TEST_08, TEST_09, TEST_10, TEST_11, TEST_12, TEST_13, TEST_14, TEST_15, TEST_16, TEST_17, TEST_18, TEST_19, TEST_20) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", a)
        cursor.commit()
        SaveResults('Time Difference INSERT process ' + str(datetime.datetime.now() - Before) + " " + sTypeProcess, True)
    except BaseException as e:
        SaveResults('Executing SQL - an error occurred - ' + format(e), True)


# Connectivity details.
SQL_server = 'tcp:servername.database.windows.net,1433'
SQL_database = 'databasename'
SQL_user = 'username'
SQL_password = 'password'

# File details to read.
filepath = 'c:\\k\\'  # Folder that contains the demo file.
filelog = filepath + 'Error.log'  # Where the log is saved.

chunksize = 10000  # Transaction batch rows.
sTableName = "[test_data]"  # Table name (dummy).

pThreadOrder = ThreadsOrder()
nThread = 0  # Thread counter -- right now, the number of threads is unlimited.

ExecuteSQLcc(sTableName)

Before = datetime.datetime.now()
line_count = 0
for directory, subdirectories, files in os.walk(filepath):
    for file in files:
        name, ext = os.path.splitext(file)
        if ext == '.csv':
            a = []
            SaveResults('Reading the file ' + name, True)
            BeforeFile = datetime.datetime.now()
            with open(os.path.join(directory, file), mode='r') as csv_file:
                csv_reader = csv.reader(csv_file, delimiter=',')
                for row in csv_reader:
                    line_count += 1
                    if line_count > 1:  # Skip the header row.
                        a.append(row)
                    if (line_count % chunksize) == 0:
                        deltaFile = datetime.datetime.now() - BeforeFile
                        nThread = nThread + 1
                        SaveResults('Time Difference Reading file is ' + str(deltaFile) + ' for ' + str(line_count) + ' rows', True)
                        pThreadOrder.ExecuteSQL(a, sTableName, nThread)  # Open a new thread per transaction batch.
                        # ExecuteSQL(a, sTableName, -1)  # Single-threaded alternative.
                        a = []
                        BeforeFile = datetime.datetime.now()
            if a:  # Insert any leftover rows that did not fill a whole batch.
                nThread = nThread + 1
                pThreadOrder.ExecuteSQL(a, sTableName, nThread)
            SaveResults('Total Time Difference Reading file is ' + str(datetime.datetime.now() - Before) + ' for ' + str(line_count) + ' rows for the file: ' + name, True)
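One caveat about the script above: it starts an unbounded number of threads and never joins them, so the 'Total Time Difference' message only measures reading the file, not the inserts themselves. If you want to cap the concurrency at the 10 vCores sized earlier and wait for every batch to commit, a sketch using concurrent.futures could look like the following (insert_batch is a hypothetical stand-in for the ExecuteSQL function above):

from concurrent.futures import ThreadPoolExecutor, as_completed

MAX_WORKERS = 10  # Assumption: match the 10 vCores mentioned earlier.

def insert_batch(rows, table_name, batch_id):
    # Hypothetical stand-in for ExecuteSQL above: connect, set
    # fast_executemany, run executemany, commit.
    print('Batch %d: %d rows' % (batch_id, len(rows)))

batches = [[(i, 'x')] * 1000 for i in range(25)]  # Dummy batches for the demo.

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    futures = [pool.submit(insert_batch, batch, '[test_data]', i)
               for i, batch in enumerate(batches)]
    for future in as_completed(futures):
        future.result()  # Re-raise any exception from a worker thread.

# Leaving the with-block guarantees every batch has finished, so a timing
# taken here covers the INSERT work, not just the CSV read.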


During the execution, if you need to see the connections, the number of rows inserted, and the impact in terms of resources, run the following T-SQL:


SELECT SUBSTRING(REPLACE(REPLACE(SUBSTRING(ST.text, (req.statement_start_offset/2) + 1,
           ((CASE req.statement_end_offset WHEN -1 THEN DATALENGTH(ST.text) ELSE req.statement_end_offset END
             - req.statement_start_offset)/2) + 1), CHAR(10), ' '), CHAR(13), ' '), 1, 512) AS statement_text
     , req.database_id
     , program_name
     , req.session_id
     , req.cpu_time AS 'cpu_time_ms'
     , req.status
     , wait_time
     , wait_resource
     , wait_type
     , last_wait_type
     , req.total_elapsed_time
     , total_scheduled_time
     , req.row_count AS [Row Count]
     , command
     , scheduler_id
     , memory_usage
     , req.writes
     , req.reads
     , req.logical_reads
     , blocking_session_id
FROM sys.dm_exec_requests AS req
INNER JOIN sys.dm_exec_sessions AS sess ON sess.session_id = req.session_id
CROSS APPLY sys.dm_exec_sql_text(req.sql_handle) AS ST
WHERE req.session_id <> @@SPID

 

select count(*) from test_data

 

select * from sys.dm_db_resource_stats order by end_time desc


Enjoy!

 

Continue reading...
