Sunday, April 25, 2021

Handling SQL DB row-level errors in ADF (Azure Data Factory) Data Flows

If you are working with ADF (Azure Data Factory) data flows, you may have noticed a feature released in November 2020 that is useful for capturing errors while inserting/updating records in a SQL database.

Fig 1: Error row handling at sink database

For error handling there are two options to choose from:

1) Stop on first error (default)

2) Continue on error


Fig 2: Error row handling options

By default, the ADF pipeline will stop at the first error. However, the main purpose of this feature is the "Continue on error" option, which catches and logs the errors so that we can look at them later and take action accordingly.

Let's fill in the settings to catch error rows. The figures below show the settings, and each item is described following the numbering in figure 3.

1) Error row handling: Since we want to catch the errors, we have chosen "Continue on error" for Error row handling.

Fig 3: Settings Continue on error

2) Transaction Commit: Choose whether the data flow will be written in a single transaction or in batches. I have chosen Single, which means an error record is stored as soon as the failure occurs; with Batch, error records are stored only once the full batch has completed.

3) Output rejected data: You need to tick this checkbox to store the error rows. The whole point of error row handling is knowing which records failed, so please enable it. You can skip it, in which case the pipeline will still run, but if there are errors you will not know which records caused them.

4) Linked service: Select the linked service and test the connection.

5) Storage folder path: Mention the storage path here; it is the path where the error records will be stored in a file.

6) Report success on error: I leave this checkbox unticked since I want to know when there is a failure.


With these settings in place, when you run the pipeline any error rows in the dataset will be stored in the storage folder you provided in point 5 of the settings.


In general, when a failure happens while inserting records into the database, it takes some time to find out the reason. You may have to go through a large chunk of the dataset looking for data type mismatches, NULL values and so on to find the root cause. With this feature the error records are captured and stored in storage, so you can identify the reason for any error very quickly. And if you would like to ingest those error rows, you can fix the records and re-run the pipeline.
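As a simple illustration (the table below is hypothetical, not taken from the pipeline above), imagine a sink table like this; a source row arriving with a NULL CustomerName, or a CustomerId that cannot be converted to INT, would land in the rejected-rows file instead of failing the whole data flow:

CREATE TABLE dbo.Customer
(
    CustomerId   INT         NOT NULL,  -- a non-numeric source value would be rejected here
    CustomerName VARCHAR(50) NOT NULL   -- a NULL source value would be rejected here
);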

Sunday, March 28, 2021

Step-by-step guide to install Jupyter Notebook

Whether you work as a Data Engineer or a Data Scientist, a Jupyter Notebook is a helpful tool. One of the projects I was working on required a comparison of two parquet files. This is mainly a schema comparison, not a data comparison: though the two .parquet files were created from two different sources, the outcome should be identical schema-wise. At the beginning I was comparing them manually, then I thought there must be a tool for that. That's how I found that a Jupyter notebook can be useful for comparing two .parquet files' schemas.
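As a minimal sketch of the kind of check I ended up doing (assuming the pyarrow package is installed, e.g. via pip install pyarrow, and using made-up file names), the comparison can be as short as this:

import pyarrow.parquet as pq

# Read only the schema of each file; the data itself is not loaded
schema_a = pq.read_schema('file_a.parquet')
schema_b = pq.read_schema('file_b.parquet')

# equals() compares column names, data types and nesting
if schema_a.equals(schema_b):
    print('Schemas match')
else:
    print('Schemas differ')
    print(schema_a)
    print(schema_b)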

The Jupyter Notebook can be used for data cleaning and transformation, data visualization, machine learning, statistical modeling and much more. This post will describe the step-by-step installation process of the Jupyter Notebook.

Step 1: Install Python version 3.9

Python is a prerequisite for running a Jupyter notebook, so we need to install Python first. Please follow this URL and choose the right version to install: https://www.python.org/downloads/.

I have chosen 'Windows x86-64 executable installer' for my 64-bit Windows OS. Please choose the version that matches your computer's operating system.

Fig 1: Windows Executable

You can download the executable file and save it in any location on your computer.

The next step is to create a 'Python' folder under the C: drive; we will use this folder as the installation location in a later step.

Fig 2: Python folder under C

 

Locate the downloaded executable file; I saved it in the Downloads folder (shown in figure 3 below). Now double-click the executable file to initiate the installation process.

Fig 3: Python Execution file

Make sure to choose 'Customize installation' and tick 'Add Python 3.9 to PATH', as shown in figure 4. I followed the customization route to avoid setting up the environment variable manually.

Fig 4: Python Installation wizard

As shown in figure 5 below, customize the installation location and make sure you set it to the folder C:\Python\Python39. We created the 'Python' folder in the C: drive in an earlier step (Fig 2).

Fig 5: choose the location

Now hit the Install button. Installation will complete in a minute or two.

Let's test whether Python installed successfully: open a command prompt and type "python". If Python is installed correctly, you should be able to see the Python version number and a short help message, as shown below in Fig 6.

Fig 6: Python installed successfully.
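If you only want to confirm the version number without opening the interactive prompt, the command below should work as well:

>python --version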

Step 2: Install the Jupyter Notebook

Let's move to the next step, which is to install the Jupyter Notebook software. Open a command prompt and type the command below:

>pip install jupyter
 
Fig 7: Jupyter Notebook installation started

When the installation is complete, let's run the Jupyter Notebook web application. To do this, go to a command prompt and execute this command, as shown in figure 8 below:

>jupyter notebook
 
Fig 8: Opening Jupyter Notebook
 
As soon as you run the above command, it will open a browser with the Jupyter Notebook home page, as shown in figure 9.
 
Fig 9: Jupyter Notebook on browser

Now you can create a notebook by choosing 'New' and then 'Python 3', as shown in fig 10. This will open a new browser tab where you will write the code.

Fig 10: Open Notebook

Let's write a hello world program in the Jupyter notebook. The browser will look like figure 11 when you enter this code:

print('Hello world')

The output is shown after clicking the 'Run' button.

 
Fig 11: Hello world in Jupyter Notebook

Now you can write and run other notebooks.

In this article, we learned how to install Python and the Jupyter Notebook, and we also wrote a simple hello world program. There are different ways to install the Jupyter Notebook, but I followed this approach and found it simple.

Tuesday, February 23, 2021

How to add a local timestamp to the end of files in ADF?

This article will describe how to add your local timestamp at the end of each file in Azure Data Factory (ADF). In general, ADF gets a UTC timestamp, so we need to convert the timestamp from UTC to EST, since our local time zone is EST.

For example, if the input source file name in SFTP is "_source_Customer.csv", then the expected outcome will be "_source_Customer_2021-02-12T133751.csv". This means that the pipeline should add '_2021-02-12T133751' to the end of each file. This works dynamically, which means that any file you pass from the source will have the timestamp added to it by using an ADF expression.

Let's set up a simple pipeline and explain the scenario in a few steps. In this example, we receive files from an event-based trigger and hold the file name in a parameter. The main part of this article is how to append the current date and time to the end of the file name we received. Please note that event-based triggers will not be discussed here; if you would like to know more about how to create a trigger, please follow this link.

Step 1: Add Copy Activity

Create a simple pipeline with at least one Copy activity that connects a source and a sink, similar to what is shown in Fig 1.

 

                                           

Fig 1: Pipeline with Copy Activity
 

Step 2: Adding a parameter to receive the file name 

Before adding the expression, we need a parameter in the pipeline that will catch the filename from the trigger. For the time being, assume that the trigger gives us the file name and the parameter holds it.
 
                                                 
Fig 2: Adding parameter

Step 3: Prepare the sink dataset

In the Copy data activity there is a Sink dataset that needs a parameter. Click on the Sink dataset and, when it opens, you will see a view similar to Fig 3.

                                                                         
Fig 3: Adding parameter to the dataset
 
To add a parameter to the dataset, click New and add the parameter name. Select the type, which should be a string. Now you will see that the sink dataset looks like Fig 4. The value edit box is where you need to add the dynamic content.
                                             
Fig 4: Dynamic content to add the timestamp

 

Step 4: Setting up a dynamic expression

Now, let's create the dynamic expression. As soon as you hit 'Add dynamic content', shown in Figure 5, you will be able to write the expression that converts the UTC timestamp to EST and then appends it to the end of the file name.

                                         
Fig 5: Expression language
 
We apply a number of functions to the pTriggerFile parameter from Step 2. Let's have a closer look at the expression:
@concat(replace(pipeline().parameters.pTriggerFile,'.csv',''), '_', 
formatDateTime(convertTimeZone(utcnow(),'UTC','Eastern Standard Time'),'yyyy-MM-ddTHHmmss'), '.csv')

Here is an explanation of the above expression:

  1. First we get the filename from the parameter pTriggerFile. The value here will be: _source_Customer.csv
  2. Next we use replace() to remove the .csv extension: replace(pipeline().parameters.pTriggerFile,'.csv',''). In this case, we get: _source_Customer
  3. We need the timestamp. To get it, we convert utcnow() to EST with this function: convertTimeZone(utcnow(),'UTC','Eastern Standard Time')
  4. We format the date with: formatDateTime(convertTimeZone(utcnow(),'UTC','Eastern Standard Time'),'yyyy-MM-ddTHHmmss'), which will return a value like: 2021-02-12T133751
  5. Finally, we put it all together with concat(): the result of item 2, then '_', then the result of item 4, then '.csv', which returns _source_Customer_2021-02-12T133751.csv

We learned how to add a local timestamp to the end of any file; in this case the source file was a .csv. However, you can follow the same process for a .txt file, where you only need to change '.csv' to '.txt' in the expression, as shown below.
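For example, for a .txt source file the expression would presumably become:

@concat(replace(pipeline().parameters.pTriggerFile,'.txt',''), '_', 
formatDateTime(convertTimeZone(utcnow(),'UTC','Eastern Standard Time'),'yyyy-MM-ddTHHmmss'), '.txt')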

Saturday, January 30, 2021

How to work with SQL stored procedure output parameters in Azure Data Factory

To facilitate native SQL code, we have stored procedure support in Azure Data Factory (ADF). When we work with stored procedures, we mostly use input parameters; however, a stored procedure can also have output parameters. In that case, we need to deal with return values, similar to how a value is returned by a function. This article will describe how you can work with stored procedure output parameters in ADF.

ADF has the SQL Server Stored Procedure Activity, which is used for any stored procedure you have in a SQL Server database. ADF also has a Lookup activity in which you can use a stored procedure. The example will use the Lookup activity to execute a stored procedure in a database.

Let's start by creating a stored procedure (SP) with an output parameter:

CREATE PROCEDURE [ETL].[sp_testprocOutParm]
(
    @input  VARCHAR(10),
    @Iambit BIT OUTPUT
)
AS
BEGIN
    IF @input >= '1'
    BEGIN
        -- Set the output parameter; if @input is less than '1' it remains NULL
        SET @Iambit = 1;
        RETURN;
    END
END;

Let's see whether the SP returns the expected value. How do we execute the SP in SQL Server Management Studio (SSMS)? Use the syntax below to find the outcome.

DECLARE @Iambit bit 

EXEC ETL.sp_testprocOutParm '1', @Iambit OUTPUT
SELECT @Iambit Iambit

You should see an outcome like Fig 1 below.

Fig 1: Execution of output parameter in SSMS

The SP is created in the database and it has returned the expected outcome. Now we need to move to ADF.

This article assumes you know how to add a Lookup activity to an ADF pipeline. In your pipeline, add the Lookup activity as in figure 1.1. Now go to the Settings of the Lookup activity and choose Stored procedure under the Use query selection (as shown in Figure 2). You should then be able to see the stored procedure you just created in the database in the dropdown list. If you cannot see the stored procedure, it is most likely a problem with your access in ADF. Please remember that being able to execute the SP in SSMS does not mean you will have access to the stored procedure from ADF; you may need to talk to your portal admin to find out whether your user has been granted enough permissions to execute the stored procedure.

Fig 2: Connect stored procedure via Lookup in ADF

If you find the stored procedure in the list, you can continue to the next step, which is to import parameters by clicking the Import parameter button, as shown in Fig 3.

Fig 3: import parameter

Import parameter will load all the SP parameters, both input and output. In this case the single output parameter is shown. Having an output parameter means you want to return some value from the stored procedure and use it in ADF; in this example, the return value will control the next activities in the pipeline. Let's store the return value in a variable.

Drag and drop the Set variable activity and connect it with the Lookup, as shown below in Fig 4.

Fig 4: Add Set variable

We need a variable, as shown in Fig 5: enter the variable name and select the type. Since the variable is at pipeline scope, make sure you have selected the pipeline itself, not any of the activities in the pipeline.

Fig 5: Creating variable

While creating variables, you will find there are three types: String, Boolean and Array. We have chosen the Boolean type since our stored procedure returns a bit (Boolean).

Fig 6: Creating variable

Now let's go back to the pipeline and select the 'Set variable' activity (as shown in Fig 7). In the Set variable activity, click the Variables tab. This is where you will find the recently created variable under the Name dropdown; select it, as shown in Fig 7.

 

Fig 7: Choose the variable

To bind the value returned from the stored procedure to the variable, you need to write an expression under the 'Value' of the variable, as shown in Fig 8.

Fig 8: expression to hold return value

The expression to bind the return value from the stored procedure is:

@activity('Lookup1').output.firstRow.Iambit

Let's explain what the expression means. It takes the first row returned by the Lookup activity named Lookup1 and, from that row, the column returned by the stored procedure, which is 'Iambit'. Here, 'Iambit' will return the result 1 (a Boolean TRUE).
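To see why the expression is shaped this way: the output of the Lookup activity is a JSON object with a firstRow property, roughly like the sketch below (illustrative only; the exact representation of the bit value may differ):

{
    "firstRow": {
        "Iambit": 1
    }
}

So firstRow.Iambit simply picks the Iambit column out of the first returned row.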

Be cautious: though the variable is of type Boolean and the stored procedure returns the value as a bit, I got an error in ADF with the above expression. I had to modify the expression to explicitly convert the return value to Boolean, like the code below:

@bool(activity('Lookup1').output.firstRow.Iambit)

In summary, output parameters in stored procedures are a great facility when needed, and I am happy to see that they work in ADF. Please note that the example depicted in this article uses a very simple stored procedure; in a business scenario your stored procedure could be more complex. For example, you may have logging tables in the data warehouse and want an activity in ADF to execute only when the stored procedure, given a particular pipeline name as an input parameter, returns TRUE. A rough sketch of that idea follows.
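As a sketch of that scenario (the table and procedure names here are made up for illustration), such a stored procedure could look like this:

CREATE PROCEDURE [ETL].[sp_ShouldPipelineRun]
(
    @PipelineName VARCHAR(100),
    @CanRun BIT OUTPUT
)
AS
BEGIN
    -- Allow the pipeline to proceed only if a successful run is logged for it
    IF EXISTS (SELECT 1
               FROM ETL.PipelineLog
               WHERE PipelineName = @PipelineName
                 AND Status = 'Succeeded')
        SET @CanRun = 1;
    ELSE
        SET @CanRun = 0;
END;

The Lookup and Set variable steps in ADF would stay exactly the same; only the stored procedure name and parameters change.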

Wednesday, December 30, 2020

ADF data flow: can a particular date format generate NULL values?

I am going to share a recent finding in ADF data flow where my source data in .csv had correct dates; however, as soon as I did some transformation and saved the output as .parquet, those dates all became empty. This blog post will describe the issue and the resolution.


The source data in .csv has StartDate and CloseDate columns, as in figure 1.0 below, where the date format is MM/dd/yyyy.

Fig 1.0: Date in the source .csv



I used an ADF data flow to clean up/transform the files and saved them in .parquet format. However, in the .parquet file these two date columns, 'StartDate' and 'CloseDate', became empty.

 

Fig 1.1: Dates become empty in .parquet


After looking into the data flow, and specifically at the projection of the source, I found the auto-detected date format 'MM/dd/YYYY', which is the original source date format.


Fig 1.3: Date format auto detected in the data flow

And when I previewed the data, those dates were shown as NULL, which was kind of weird.

   

Fig 1.4: Date shown as NULL in the data preview

How to solve it? 

To fix this issue, go to the projection and change the date format to 'yyyy-MM-dd', as in figure 2.0 below.


Fig 2.0: Change date format

 

Now if you go and check the preview, it looks good.

 

Fig 2.1: After changing the date format preview looks perfect


 

Note that I tried other formats in the projection, such as yyyy/MM/dd and so on, but those did not resolve the issue.

Fig 2.2: Other date formats did not resolve the issue


Another solution?

You can also take another approach: change the format from 'date' to 'string' under the projection.

Fig 3.0: change data type from date to string


And then use a Derived Column transformation.

Fig 3.1: get 'derived column' in the data flow


Now, use an expression to convert the string into the correct date format: toDate(OpenDate,'yyyy-MM-dd')

Fig 3.2: expression to convert from string to date
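If the string column still holds the original MM/dd/yyyy values (for example 02/12/2021), the matching pattern would presumably be toDate(OpenDate,'MM/dd/yyyy') instead; the format string has to match the layout of the incoming text.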


In summary, if you find any discrepancy in the output file's date columns, look closely at the date format and preview the data in the ADF data flow; then either change the format in the source projection or use a derived column transformation to fix it.

Saturday, November 14, 2020

How to deal with NULL in ADF dataflow compared with SSIS?

When you are working with ETL/ELT, sometimes you need to transform NULL into a meaningful value. If you have worked with SSIS, you know how to handle that. This blog post will describe how we do it in SSIS and how the very same task can be done in an ADF data flow.

Consider a .csv file where the Name column has a NULL value for the 2nd record (figure 1.0).



Fig 1.0: Sample .csv file with NULL record




After connecting the .csv file through a Flat File Source in the SSIS data flow, we can debug and view the records through the data viewer, which will look like figure 1.1 below.

Fig 1.1: Result in SSIS data flow - data viewer


If you would like to replace the NULL value with a meaningful value, you need to use the Derived Column transformation and an expression.

The SSIS data flow expression language has the REPLACENULL function, which replaces NULL with the value that you want.

The expression: REPLACENULL(Name,"Unknown")

The above expression will return 'Unknown' when Name is NULL; otherwise it will return the original value.

Fig 1.2: Expression in SSIS Data flow to replace NULL with 'Unknown'


When it comes to ADF data flow, the expression language is similar to SSIS expressions; however, isNull only gives you true or false. The isNull function takes only one argument, e.g. in fig 2.0 below it takes the argument Name and returns True (✓) if the value is NULL.


Fig 2.0: ADF dataflow isNull function


Now, let's find out how to transform a NULL value into something meaningful in an ADF data flow. ADF doesn't have the REPLACENULL function used in SSIS; instead, there are two ways you can replace NULL values in an ADF data flow.


Approach 1: Combination of the iif and isNull functions


Expression: iif(isNull(Name), 'Unknown', Name)

The iif function checks the condition isNull(Name); if Name has a NULL value it returns 'Unknown', otherwise the original value is returned.


Fig 2.1:  using iif and isNull in ADF dataflow

Approach 2: Using the iifNull function

The neatest solution is to use iifNull, which returns exactly the same result we found via approach 1.

The expression iifNull(Name, 'Unknown') will return 'Unknown' if Name has a NULL value; otherwise it will return the original value.

Fig 2.2: iifNull function to replace NULL value

In summary, the expressions to replace NULL values are similar in SSIS and ADF data flow; however, the function you need to use is different in the two tools.

Sunday, October 18, 2020

How to handle a CASE statement in Azure Data Factory (ADF) compared to SSIS?

This post will describe how to use a CASE WHEN statement in Azure Data Factory (ADF). If you are coming from an SSIS background, you know a piece of SQL will do the task. Let's see how you do it in SSIS and how the very same thing can be achieved in ADF.

Problem statement:

For my simple scenario, if PortfolioTypeCode is either 'Mutual Fund' or 'Pooled Fund' it should return 1, else it should return 0.

 

How do you do it in SSIS?

In SSIS, under the data flow you will have an OLE DB source, as in fig 1 below:

Fig 1: SSIS OLEDB source

 

Open the OLE DB source, write a SQL command like the one below, and you are done:

SELECT Col1,
       CASE WHEN PortfolioCode IN ('Mutual fund','Pooled fund')
            THEN 1
            ELSE 0
       END AS IsFund,
       Col2
FROM Table1

Fig 2: CASE WHEN under SQL command in SSIS

 

How do you implement it in ADF?

In ADF, to achieve the same result you need to use expressions. ADF has the very same concept of a data flow as SSIS. In the data flow, after the source dataset is established, you can add a 'Derived Column' activity, as in Fig 3 below:

Fig 3: Adding derive column under data flow

 

Now you can give a new column name and then add the expression (Fig 4):

Fig 4: Derived column expression

 

Let's see how the case expression works: it takes three arguments (condition, true and false). However, it can also take alternating conditions and values, as described in figure 5:

Fig 5: Case in Expression

 

As stated above, if PortfolioTypeCode is either 'Mutual Fund' or 'Pooled Fund' it should return 1, else it should return 0.

Since you can't write CASE WHEN here, you use case as an expression instead; the code will look like this:

case(PortfolioTypeCode=='Mutual Fund', 1,
     PortfolioTypeCode=='Pooled Fund', 1, 0)
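If you prefer a single condition, the same logic can presumably also be written with the data flow or operator (||) inside iif:

iif(PortfolioTypeCode=='Mutual Fund' || PortfolioTypeCode=='Pooled Fund', 1, 0)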