In June, Databricks announced that it is open sourcing Delta Lake 2.0. Delta Lake is quickly becoming the format of choice in data science and data engineering.

To import Delta Lake data into a Synapse dedicated SQL pool, you would normally need Azure Data Factory/Synapse Pipelines or Spark to handle the Delta Lake files.

This is not ideal, because it adds extra overhead in complexity, time, and cost.

As an intellectual challenge, I wondered whether it was possible to import Delta Lake files directly into a dedicated SQL pool and support features like time travel. It turned out to be a great little project and a great way of learning about Delta Lake.

I have uploaded the script to the Azure Synapse Toolbox GitHub site. The Azure Synapse Toolbox is an open-source library of useful tools and scripts to use with Azure Synapse Analytics.

Here is a breakdown of how the script works (a sample transaction log layout follows the list).

  1. Read the _last_checkpoint file; it contains the highest checkpoint version number.
  2. Read all the checkpoint files; these contain the 'Add' and 'Remove' files that need to be included, along with their timestamps.
  3. Read the latest JSON transaction files (since the last checkpoint); these contain the most recent 'Add' and 'Remove' files that need to be included, along with their timestamps.
  4. Filter the 'Add' and 'Remove' files based on the 'modificationTime' and 'deletionTimestamp'.
  5. Return the data from all the 'Add' files, excluding the 'Remove' files (batching up the COPY INTO statements).
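
For reference, a Delta table's _delta_log folder looks something like this (hypothetical file names; the 20-digit prefix is the commit version):

demo/_delta_log/00000000000000000000.json
demo/_delta_log/00000000000000000001.json
...
demo/_delta_log/00000000000000000010.checkpoint.parquet
demo/_delta_log/00000000000000000011.json
demo/_delta_log/_last_checkpoint

Each JSON commit file holds 'Add' and 'Remove' entries roughly like the simplified line below, where modificationTime (and deletionTimestamp for removes) are milliseconds since the Unix epoch:

{"add":{"path":"part-00000-7c2deba3-1994-4fb8-8df5-a3c5b107a745-c000.snappy.parquet","partitionValues":{},"size":1024,"modificationTime":1657132620000,"dataChange":true}}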


/*

IF OBJECT_ID('mpmtest') IS NOT NULL

BEGIN;

DROP TABLE mpmtest

END

 

declare @path varchar(400), @dt datetime2, @credential varchar(500),@outputable varchar(500),@display int, @debug int;

set @path = 'https://storageaccount.blob.core.windows.net/container/delta/demo/'

--set @dt = convert(datetime2,'2022/07/06 18:37:00'); --getdate(); -- for time travel --

set @dt = getdate(); -- for time travel --

set @credential = 'IDENTITY= ''Shared Access Signature'', SECRET = ''___SECRET____''';

set @outputable = 'mpmtest' -- leave empty for temp table

set @display = 1; -- if 1 display results

set @debug = 1;

exec delta @path,@dt,@credential,@outputable,@display, @debug

 

--select count(*) from mpmtest;

*/
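
-- Example (a hypothetical call, not from the original post): load the latest version
-- into a permanent table using the default AAD authentication, by passing NULL for
-- both the credential and the datetime:
--
-- exec delta 'https://storageaccount.blob.core.windows.net/container/delta/demo/', NULL, NULL, 'mpmtest', 1, 0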

 

 

 

create PROC [dbo].[delta]

@folder [varchar](4000),

@dt [datetime2],

@credential [varchar](4000),

@dest_table [varchar](4000),

@display int, -- 0 don't display, 1 display

@debug int -- 0 don't debug, 1 debug mode

AS

begin

-- version info

-- 0.1 - Very basic, but sort of worked.

-- 0.2 - Added support for remove files.

-- 0.3 - Fixed bug in authentication

-- 0.4 - Improved performance in handling add/remove files

-- 0.5 - Added support for checkpoint files

-- 0.6 - Changed the json handling, added latest checkpoint handling

 

DECLARE @ts bigint, @tsmil bigint, @json varchar(8000), @tsql varchar(8000); -- @ts: milliseconds since the Unix epoch for @dt

DECLARE @json_remove varchar(8000), @tsql_checkpoint varchar(8000);

DECLARE @td_checkpoint1 datetime2, @td_checkpoint2 datetime2;

DECLARE @last_checkpoint varchar(25), @jsonversionchar varchar(25),@checkpointversion varchar(25) ;

DECLARE @output_sql varchar(max);

DECLARE @checkpoint_file_count bigint;

 

-- default: AAD authentication

IF (@credential IS NULL)

SET @credential = ''

 

IF (@credential <> '')

SET @credential = ' CREDENTIAL = (' + @credential + '),' -- trailing comma here, so an empty credential (AAD) still builds a valid WITH clause

 

-- default : display results

IF @display IS NULL

set @display = 1

 

-- default : debug is off

IF @debug IS NULL

SET @debug = 0

 

-- if the datetime is null, get the latest version

IF (@dt IS NULL)

set @dt = getdate()

 

-- number of milliseconds since 1970

-- Datediff doesn't return a bigint - so I need to do this in 2 parts

set @ts = datediff( s, '1970/1/1', convert(date,@dt) )

set @ts = @ts * 1000;

 

set @tsmil = datediff(ms, convert(date,@dt) , @dt)

set @ts = @ts + @tsmil;
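
-- Worked example: for @dt = '2022-07-06 18:37:00',
-- @ts = datediff(s,'1970/1/1','2022-07-06') * 1000 = 1657065600000 ms
-- @tsmil = 18*3600000 + 37*60000 = 67020000 ms
-- @ts = 1657065600000 + 67020000 = 1657132620000 - the same epoch-millisecond
-- format Delta uses for modificationTime and deletionTimestamp.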

 

if (@dest_table IS NULL)

set @dest_table = '#tmp_output'

 

---- Holds the raw json information -----------------------------

IF OBJECT_ID('tempdb..#last_checkpoint') IS NOT NULL

DROP TABLE #last_checkpoint

 

create table #last_checkpoint

(

info varchar(max)

) with (distribution=round_robin, heap)

 

------ read _last_checkpoint

-- This gives us the checkpoint version, if it exists, which is the starting point

-- for the json files

set @tsql = '

copy into #last_checkpoint (info)

from ''' + @folder + '_delta_log/_last_checkpoint''

with ( ' + @credential + ' FILE_TYPE = ''CSV''

,fieldterminator =''0x0b'' ,fieldquote = ''0x0b'' ,rowterminator = ''0x0d'' ) '
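
-- The odd terminators are deliberate: 0x0b (vertical tab) should never occur in the
-- JSON, so no field splitting or quoting happens and the file content loads unparsed
-- into the single varchar column.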

 

if @debug = 1

print @tsql;

 

set @td_checkpoint1 = getdate();

exec(@tsql);

set @td_checkpoint2 = getdate();

 

print 'Loading _last_checkpoint files took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms'

 

if @debug = 1

begin

select '_Last_checkpoint', * from #last_checkpoint

end

 

select @checkpointversion = ISNULL(JSON_VALUE(info,'$.version'),'00') from #last_checkpoint;

 

if NOT EXISTS(SELECT * FROM #last_checkpoint)

BEGIN

SET @checkpointversion = '00'

END

 

 

PRINT 'checkpointversion=' +@checkpointversion
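
-- Example (hypothetical content): if _last_checkpoint holds {"version":10,"size":20},
-- then @checkpointversion is '10'; with no checkpoint yet it falls back to '00'.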

 

set @jsonversionchar = '00000000000000000000';

 

IF OBJECT_ID('tempdb..#delta_checkpoint') IS NOT NULL

DROP TABLE #delta_checkpoint

 

create table #delta_checkpoint

(

txn varchar(max),

[add] varchar(max),

[remove] varchar(max),

metaData varchar(max),

protocol varchar(max)

) with (distribution=round_robin, heap)

 

--- Code to pull the add / remove files from the checkpoint files.

set @tsql_checkpoint = '

copy into #delta_checkpoint from ''' + @folder + '_delta_log/*.checkpoint.parquet''

with ( ' + @credential + ' FILE_TYPE = ''PARQUET'', AUTO_CREATE_TABLE = ''ON'' ) '

 

if @debug = 1

BEGIN

print @tsql_checkpoint;

END

 

set @td_checkpoint1 = getdate()

exec(@tsql_checkpoint);

set @td_checkpoint2 = getdate()

 

print 'Loading checkpoint files took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms'

 

SELECT @checkpoint_file_count = count(*) from #delta_checkpoint

 

if @debug = 1

BEGIN

print 'No of checkpoint files ' + convert(varchar(10),@checkpoint_file_count);

 

SELECT * FROM #delta_checkpoint order by [add],[remove]

END

-- Holds the information from the checkpoint files ---------

IF OBJECT_ID('tempdb..#delta_files_checkpoint') IS NOT NULL

BEGIN;

print 'Dropping table #delta_files_checkpoint'

DROP TABLE #delta_files_checkpoint

END

 

create table #delta_files_checkpoint

(

fname varchar(99),

ts bigint,

dt datetime2,

[action] char(1)

) with (distribution=round_robin, heap)

 

-- create a table of add/remove files with the datetimestamp

INSERT INTO #delta_files_checkpoint

select DISTINCT

CASE WHEN JSON_VALUE([add],'$.path') is NULL THEN

JSON_VALUE([remove],'$.path')

ELSE

JSON_VALUE([add],'$.path')

END fname,

CASE

WHEN JSON_VALUE([add],'$.path') is NULL THEN

convert(bigint,JSON_VALUE([remove],'$.deletionTimestamp'))

ELSE

convert(bigint,JSON_VALUE([add],'$.modificationTime'))

END updatetime,

CASE

WHEN JSON_VALUE([add],'$.path') is NULL THEN

dateadd(s, convert(bigint,JSON_VALUE([remove],'$.deletionTimestamp')) / 1000 , '1970/1/1')

ELSE

dateadd(s, convert(bigint,JSON_VALUE([add],'$.modificationTime')) / 1000 , '1970/1/1')

END updatetimedt,

CASE WHEN JSON_VALUE([add],'$.path') is NULL THEN

'R'

ELSE

'A'

END [action]

from #delta_checkpoint

 

DELETE from #delta_files_checkpoint where fname is null

 

if @debug = 1

SELECT 'checkpoint', *, @ts, @dt FROM #delta_files_checkpoint order by dt desc

 

-- remove files after the given time

DELETE FROM #delta_files_checkpoint WHERE ts > @ts

 

if @debug = 1

SELECT 'checkpoint filtered', *, @ts, @dt FROM #delta_files_checkpoint order by dt desc

 

---- Holds the raw json information -----------------------------

IF OBJECT_ID('tempdb..#delta_json') IS NOT NULL

BEGIN;

DROP TABLE #delta_json

END

 

create table #delta_json

(

jsoninput varchar(max)

) with (distribution=round_robin, heap)

----------------------------------------------------------------

 

set @jsonversionchar= left(substring(@jsonversionchar,0,len(@jsonversionchar)-len(@checkpointversion)+1) + @checkpointversion,len(@jsonversionchar)-1) + '*'
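
-- Worked example: with @checkpointversion = '10' the 20-zero mask becomes
-- '0000000000000000001*', matching 00000000000000000010.json through
-- 00000000000000000019.json - the commits since the last checkpoint. (Delta
-- writes a checkpoint every 10 commits by default, so one wildcard digit suffices.)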

 

-- Read all the delta transaction logs since the checkpoint; we can't filter by individual file name, only by wildcard.

set @tsql = '

copy into #delta_json (jsoninput)

from ''' + @folder + '_delta_log/'+ @jsonversionchar +'.json''

with ( ' + @credential + ' FILE_TYPE = ''CSV''

,fieldterminator =''0x0b'' ,fieldquote = ''0x0b'' ,rowterminator = ''0x0d'' ) '

 

if @debug = 1

print @tsql;

 

set @td_checkpoint1 = getdate();

exec(@tsql);

set @td_checkpoint2 = getdate();

 

print 'Loading json files took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms'

 

if @debug = 1

SELECT 'JSON Files', *, @ts, @dt FROM #delta_json

 

---- delta file details..

IF OBJECT_ID('tempdb..#delta_active_json') IS NOT NULL

BEGIN;

DROP TABLE #delta_active_json

END

 

create table #delta_active_json

(

jsoninput varchar(max) ,

ts bigint,

dt datetime2,

[action] char(1)

) with (distribution=round_robin, heap)

-----------------------------------------------------------

 

-- inserting into a temp table, so the date and time are associated with each row

insert into #delta_active_json

select

convert(varchar(max),jsoninput)

, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) as ts

, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1')

,''

from #delta_json

WHERE convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) < @ts

 

 

if @debug = 1

select 'json filtered by date', *, @ts, @dt from #delta_active_json order by dt desc

 

-- left in for debugging -- shows each version of the JSON

if @debug = 1

begin

select 'not filtered',

convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) as ts

, convert(varchar(8000),jsoninput)

, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1')

,ROW_NUMBER() OVER(PARTITION BY NULL ORDER BY JSON_VALUE(jsoninput,'$.commitInfo.timestamp') DESC) AS "Row Number"

,convert(varchar(8000),jsoninput)

, @ts, @dt

from #delta_json

order by convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) desc

end

 

 

---------------------------------------------------------

-- insert the JSON adds and removes to the existing list from checkpoint

insert into #delta_files_checkpoint

select substring(value,charindex('path',value)+7 ,67) [path]

, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts

, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') td

,'A' as [action]

from #delta_active_json

cross apply STRING_SPLIT(jsoninput,',')

where value like '%path%' and charindex('add',value) > 0

union all

select substring(value,charindex('path',value)+7 ,67)

, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts

, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') td

, 'R'

from #delta_active_json

cross apply STRING_SPLIT(jsoninput,',')

where value like '%path%' and charindex('remove',value) > 0
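
-- Note: substring(value, charindex('path',value)+7, 67) skips the 'path":"' prefix
-- (7 characters) and assumes the default Delta file naming
-- 'part-NNNNN-<36-char-uuid>-c000.snappy.parquet', which is exactly 67 characters.
-- Partitioned tables, whose relative paths are longer, would need a different extraction.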

 

 

if @debug = 1

begin

-- all the adds and removes from the json

select 'adds', substring(value,charindex('path',value)+7 ,67) [path]

, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts

, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') td

,'A' as [action], @ts, @dt

from #delta_active_json

cross apply STRING_SPLIT(jsoninput,',')

where value like '%path%' and charindex('add',value) > 0

 

select 'removes', substring(value,charindex('path',value)+7 ,67)

, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts

, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') td

, 'R', @ts, @dt

from #delta_active_json

cross apply STRING_SPLIT(jsoninput,',')

where value like '%path%' and charindex('remove',value) > 0

 

select 'New list with adds and removes', *, @ts, @dt from #delta_files_checkpoint order by ts desc;

 

select distinct 'distinct list ', fname, dt, [action], @ts, @dt from #delta_files_checkpoint order by dt desc;

end

 

 

---- Holds the raw json information -----------------------------

IF OBJECT_ID('tempdb..#delta_files') IS NOT NULL

BEGIN;

DROP TABLE #delta_files

END

 

create table #delta_files

(

filenames varchar(max), rownumber bigint

) with (distribution=round_robin, heap)

 

insert into #delta_files

select fname,

ROW_NUMBER() OVER(PARTITION BY NULL ORDER BY fname DESC)

from #delta_files_checkpoint where [action] = 'A' and

fname not in (select fname from #delta_files_checkpoint where [action] = 'R')
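
-- Net effect: keep every file with an 'A' action whose name never appears with an
-- 'R' action. A parquet file rewritten by an UPDATE shows up in both lists and is
-- excluded, so only the files active as of @dt are read.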

 

if @debug = 1

select '#delta_files', * from #delta_files;

 

if @debug = 1

select 'parquet to read ', fname, ts, @ts, dt, @dt from #delta_files_checkpoint where [action] = 'A' and

fname not in (select fname from #delta_files_checkpoint where [action] = 'R')

--where ts < @ts

 

--- This batching code might appear overkill, but STRING_AGG falls over when the string

-- is too long, so I am batching up the inserts.

 

DECLARE @flist varchar(max);

DECLARE @batchsize int = 100000;

DECLARE @iParquetFileCount int =0;

DECLARE @iParquetFilePos int =1;

select @iParquetFileCount = count(*) from #delta_files

 

IF OBJECT_ID('tempdb..#_files') IS NOT NULL

DROP TABLE #_files

 

create table #_files

( fname varchar(999) ) with (distribution=round_robin,heap)

 

---- creating batches of COPY INTO statements

while(@iParquetFilePos <= @iParquetFileCount)

BEGIN

INSERT INTO #_files

SELECT [filenames] FROM #delta_files where rownumber between @iParquetFilePos and @iParquetFilePos + @batchsize

 

-- Had issues with STRING_AGG if the string gets too long

SELECT @flist = 'COPY INTO ' + @dest_table + ' FROM '

+ STRING_AGG(CAST(''''+@folder+fname+'''' AS VARCHAR(MAX)), ',

') + ' with ( ' + @credential + '

FILE_TYPE = ''PARQUET'' , AUTO_CREATE_TABLE = ''ON'' )'

FROM #_files
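
-- The generated statement looks roughly like this (hypothetical paths):
--
-- COPY INTO mpmtest FROM 'https://storageaccount.blob.core.windows.net/container/delta/demo/part-00000-....snappy.parquet',
-- 'https://storageaccount.blob.core.windows.net/container/delta/demo/part-00001-....snappy.parquet'
-- with ( CREDENTIAL = (IDENTITY= 'Shared Access Signature', SECRET = '___SECRET____'),
-- FILE_TYPE = 'PARQUET' , AUTO_CREATE_TABLE = 'ON' )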

 

if @debug =1

select '_filelist', * from #_files;

 

if @debug =1 print @flist;

if @debug =1 print len(@flist);

 

set @td_checkpoint1 = getdate();

exec(@flist);

set @td_checkpoint2 = getdate();

 

print 'Loading batch of parquet ' + convert(varchar(10),@iParquetFilePos) + '->' + convert(varchar(10), @iParquetFilePos + @batchsize ) + ' took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms'

 

truncate table #_files;

set @iParquetFilePos = @iParquetFilePos + @batchsize +1;

END

 

-- if there is no file, there is no table and this causes a problem

if charindex('#',@dest_table) > 0

set @output_sql = 'IF OBJECT_ID(''tempdb..' + @dest_table + ''') IS NOT NULL SELECT * from ' + @dest_table + ';'

ELSE

set @output_sql = 'IF OBJECT_ID(''' + @dest_table + ''') IS NOT NULL SELECT * from ' + @dest_table + ';'

 

if @debug = 1

PRINT @output_sql;

 

if @display = 1

exec(@output_sql );

 

end

Our team publishes blogs each week; you can find them all here: Azure Synapse Analytics Blog

For a deeper understanding of Synapse implementation best practices, please refer to the Synapse Success by Design site.


Author(s): Mark Pryce-Maher is a Program Manager in the Azure Synapse Customer Success Engineering (CSE) team.