Saturday, 15 May 2021

ETL Pipeline Orchestration on Apache Airflow

 

Hello everyone! Today we will discuss one of the most important and interesting activities in the data engineering space. If you are a data engineer, or you work closely with data, you must have heard the term ETL. If you have not, there is no need to worry; in this article we are going to discuss it in detail.


Before getting into the ETL process itself, we should ask why this activity has become a necessity, and what we gain from it. We typically run many applications, and every application generates or consumes data, but there is no guarantee that each application will provide data in the format you need. In that case you have to put in extra effort to convert the data into the shape you want.


ETL stands for Extract, Transform and Load. ETL operations are often performed by fit-for-purpose tools that have been on the market for a long time, and sometimes by custom in-house programs. An ETL process:


  • Extracts data from homogeneous or heterogeneous data sources.

  • Transforms the data into a proper format or structure for querying and analysis.

  • Loads it into the final target (a database; more specifically an operational data store, data mart, or data warehouse).

Let's start with the big data platform used for this demo.


Here I am using a three-node CentOS cluster, on which I have deployed Hadoop along with Apache Spark, Apache Hive, Apache Airflow and Jupyter Notebook.

The high-level ETL architecture has four phases: source, staging, transform and publish. Let's discuss each of these phases.





1) Source: In this phase we have a flat file. It could come from any application, but in my case it is available on my local Linux machine, and we are going to copy it to the table's HDFS location.


2) Staging (RAW layer database): In this phase, we create the table in Hive and define every column's data type as string.



3) Transform (transform layer database): In this phase, we convert the data types of the raw layer table to match the data dictionary. The data dictionary is defined by the business, so we need to cast the table's columns to the data types it specifies.



4) Publish: This is the final database layer, which is made available for analysis.

 

 

For all the phases above, we will create the workflow in Apache Airflow.

Below is the Apache Airflow DAG code (etl.py). Each phase is a BashOperator task: one copies the CSV file into HDFS, and the rest run the HiveQL scripts for each layer through beeline. The tasks are chained so they run strictly in order, and the DAG (etl_demo) is scheduled daily at midnight ('0 0 * * *').

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'Ravi',
    'start_date': days_ago(2)
}

# One DAG that runs the whole pipeline once a day at midnight.
dag = DAG(
    dag_id='etl_demo',
    default_args=default_args,
    schedule_interval='0 0 * * *',
    catchup=False
)

# Create the raw (staging) layer database and table in Hive.
create_raw_ddl = BashOperator(
    task_id='create_raw_ddl',
    bash_command='beeline -u jdbc:hive2://10.128.0.14:10000 root kskks -f /root/airflow/dags/raw_ddl.hql',
    dag=dag
)

# Copy the source CSV from the local filesystem into the raw table's HDFS location.
bucket_to_hdfs = BashOperator(
    task_id='bucket_to_hdfs',
    bash_command='sudo -u hdfs hdfs dfs -chmod 777 /warehouse/tablespace/external/hive/;'
                 'sudo -u hdfs hdfs dfs -chown -R root /warehouse/tablespace/external/hive/;'
                 'hdfs dfs -put -f /root/airflow/dags/OnlineRetail.csv /warehouse/tablespace/external/hive/raw.db/retail/',
    dag=dag
)

# Refresh the raw table metadata so Hive sees the newly copied file.
repair_raw_table = BashOperator(
    task_id='repair_raw_table',
    bash_command='beeline -u jdbc:hive2://10.128.0.14:10000 root kskks -f /root/airflow/dags/raw_msck.hql ',
    dag=dag
)

# Create the transform layer table with the data types from the data dictionary.
create_transform_ddl = BashOperator(
    task_id='create_transform_ddl',
    bash_command='beeline -u jdbc:hive2://10.128.0.14:10000 root kskks -f /root/airflow/dags/transform_ddl.hql ',
    dag=dag
)

# Cast the raw data and load it into the transform layer.
load_transform = BashOperator(
    task_id='load_transform',
    bash_command='beeline -u jdbc:hive2://10.128.0.14:10000 root kskks -f /root/airflow/dags/load_transform.hql ',
    dag=dag
)

# Create the publish layer table.
publish_ddl = BashOperator(
    task_id='publish_ddl',
    bash_command='beeline -u jdbc:hive2://10.128.0.14:10000 root kskks -f /root/airflow/dags/publish_ddl.hql ',
    dag=dag
)

# Load the transformed data into the publish layer.
load_publish = BashOperator(
    task_id='load_publish',
    bash_command='beeline -u jdbc:hive2://10.128.0.14:10000 root kskks -f /root/airflow/dags/load_publish.hql ',
    dag=dag
)

# The tasks run strictly in sequence, one phase after another.
create_raw_ddl >> bucket_to_hdfs >> repair_raw_table >> create_transform_ddl >> load_transform >> publish_ddl >> load_publish

if __name__ == "__main__":
    dag.cli()


Step 1: Create the table in the raw database (raw_ddl.hql).


-- raw_ddl.hql: recreate the raw (staging) database and table; every column stays a string at this layer.
drop database if exists raw cascade;
create database if not exists raw;
drop table if exists raw.retail;
CREATE EXTERNAL TABLE raw.retail(
    invoiceno string, stockcode string, description string, quantity string,
    invoicedate string, unitprice string, customerid string, country string)
row format delimited fields terminated by ',' lines terminated by '\n'
stored as textfile
tblproperties('serialization.null.format'='','skip.header.line.count'='1');
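
If you want to confirm the DDL before moving on, an optional sanity check from beeline could look like this:

-- Optional check: the raw table should exist with all eight string columns.
SHOW TABLES IN raw;
DESCRIBE FORMATTED raw.retail;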


Step 2: Transfer the dataset into the HDFS directory of the raw database table. The command for this step is in the bucket_to_hdfs task of the Airflow etl.py file above.

          

Step 3: Create the table in the transform database as per the data dictionary (transform_ddl.hql).


-- transform_ddl.hql: recreate the transform database and table with the data types from the data dictionary.
drop database if exists retail_transform cascade;
create database if not exists retail_transform;
drop table if exists retail_transform.retail;
CREATE EXTERNAL TABLE retail_transform.retail(
    invoiceno string, stockcode string, description string, quantity bigint,
    invoicedate timestamp, unitprice decimal, customerid bigint, country string)
row format delimited fields terminated by ',' lines terminated by '\n'
stored as textfile
tblproperties('serialization.null.format'='','skip.header.line.count'='1');

Step 4: Pull the data from the raw database, cast it as per the data dictionary, and insert it into the transform database (load_transform.hql).


-- load_transform.hql: cast each raw column to its data-dictionary type;
-- empty strings become NULL for the numeric and timestamp columns.
set hive.execution.engine=spark;
INSERT OVERWRITE TABLE retail_transform.retail
SELECT
    case when invoiceno   = '' then ''   else cast(invoiceno as string)   end as invoiceno,
    case when stockcode   = '' then ''   else cast(stockcode as string)   end as stockcode,
    case when description = '' then ''   else cast(description as string) end as description,
    case when quantity    = '' then NULL else cast(quantity as bigint)    end as quantity,
    case when invoicedate = '' then NULL else from_unixtime(unix_timestamp(invoicedate, 'dd-MM-yyyy')) end as invoicedate,
    case when unitprice   = '' then NULL else cast(unitprice as decimal)  end as unitprice,
    case when customerid  = '' then NULL else cast(customerid as bigint)  end as customerid,
    case when country     = '' then ''   else cast(country as string)     end as country
from raw.retail;
-- A no-op for a non-partitioned table, but harmless to keep.
MSCK REPAIR TABLE retail_transform.retail;
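
After this load it is worth checking how the casts behaved. Here is a minimal sketch of an optional check (adjust it to your own data):

-- Count rows that ended up NULL after the casts; a spike usually points to a format mismatch.
SELECT count(*) AS total_rows,
       sum(case when invoicedate IS NULL then 1 else 0 end) AS null_invoicedate,
       sum(case when quantity IS NULL then 1 else 0 end) AS null_quantity,
       sum(case when unitprice IS NULL then 1 else 0 end) AS null_unitprice
FROM retail_transform.retail;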

Step 5: Create the table in the publish layer database (publish_ddl.hql).


-- publish_ddl.hql: recreate the publish database and table; the schema matches the transform layer.
drop database if exists retail_publish cascade;
create database if not exists retail_publish;
drop table if exists retail_publish.retail;
CREATE EXTERNAL TABLE retail_publish.retail(
    invoiceno string, stockcode string, description string, quantity bigint,
    invoicedate timestamp, unitprice decimal, customerid bigint, country string)
row format delimited fields terminated by ',' lines terminated by '\n'
stored as textfile
tblproperties('serialization.null.format'='','skip.header.line.count'='1');

Step 6: Insert all the records from the transform layer database into the publish layer database (load_publish.hql).


-- load_publish.hql: the transform layer already has the data-dictionary types,
-- so the publish load is a straight copy of the columns.
set hive.execution.engine=spark;
INSERT OVERWRITE TABLE retail_publish.retail
SELECT
    invoiceno,
    stockcode,
    description,
    quantity,
    invoicedate,
    unitprice,
    customerid,
    country
from retail_transform.retail;
MSCK REPAIR TABLE retail_publish.retail;


Data Validation:

Once the DAG has finished, it is worth comparing the three layers to confirm that every record made it through. A simple way is to check the row count and a few sample records in each layer from beeline; the queries below are a minimal sketch of those checks.

 

Raw Layer:
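
A quick count and a few sample rows for the raw table:

-- Raw layer: total rows loaded from the CSV (the header row is skipped by skip.header.line.count).
SELECT count(*) FROM raw.retail;
SELECT * FROM raw.retail LIMIT 5;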

 


Transform Layer:
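
The same check for the transform layer; the row count should match the raw layer:

-- Transform layer: same row count as raw, now with the proper data types.
SELECT count(*) FROM retail_transform.retail;
SELECT * FROM retail_transform.retail LIMIT 5;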


Publish Layer:
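
And finally the publish layer, which should match the transform layer exactly:

-- Publish layer: this is the table analysts will query.
SELECT count(*) FROM retail_publish.retail;
SELECT * FROM retail_publish.retail LIMIT 5;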

 




And with that, we have successfully completed a fully automated ETL pipeline on Apache Airflow. :)

 
