Friday 28 April 2023

Spark internals

 


Spark is a distributed data processing engine, and every Spark application is itself a distributed application. Spark needs a cluster along with a resource manager to perform distributed computation. As of now, Spark supports the cluster managers below.

·       Standalone

·       YARN

·       Kubernetes

·       Mesos

But in this article, we assume that a YARN resource manager is managing the Spark cluster. Below is the high-level architecture of a Spark cluster.


 

Let’s try to understand how Spark works internally. This is an important concept that helps in every aspect of dealing with large amounts of data. I will try to keep it very simple.

 

Firstly, we need to know how we can execute a Spark application. There are two ways to run a Spark application:

1)      Interactive mode

2)      Batch mode

Interactive mode is generally used for testing purposes, because we need immediate output after each instruction. We can do this by using spark-shell or a Jupyter Notebook.

Batch mode is used in production once the Spark application is fully developed. It executes end to end and generates the final output as per the business requirements. We can do this by using the spark-submit command.
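For illustration, below is a minimal sketch of a PySpark script that could be run in batch mode with spark-submit; the application name, input path and output path are hypothetical placeholders.

from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Entry point executed by spark-submit in batch mode.
    spark = SparkSession.builder.appName("batch-example").getOrCreate()

    # Read some input, apply a simple transformation, and write the final output.
    df = spark.read.csv("/data/input.csv", header=True, inferSchema=True)
    df.filter(df["amount"] > 0).write.mode("overwrite").parquet("/data/output")

    spark.stop()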

Now let’s come to the above diagram. Here you can see the programmer doing the spark-submit. Spark is written in Scala (a JVM language), which means Spark's native language is Scala.

Let’s suppose I want to execute a PySpark application; then we have to use the spark-submit command on the terminal. Our request is submitted to the resource manager, then the YARN RM selects a worker node, creates an Application Master (AM) container, and starts the application's main method in the AM container. One more question arises here: what is a container? A container is an isolated virtual runtime environment. And another question: if Spark is written in Scala, how can Spark execute applications written in other languages like Python, Java and R?

Let’s come inside the Application Master container. The AM container is responsible for executing the main method of the application, but here we have a Python application, that is, a PySpark application. In the case of a PySpark application, the AM container consists of a PySpark driver and a JVM driver. When we submit the PySpark application, the PySpark driver executes the main() of the PySpark application, because PySpark is designed to execute the main() of the PySpark application. The PySpark driver then starts the JVM main method with the help of Py4J.

Refer to the below two diagrams for better visualization.





Once the JVM main method starts executing, the AM requests executors from the YARN resource manager.

YARN then creates executors as per the configuration and returns all the executor details to the AM.
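The number and size of those executors come from the application's configuration. A minimal sketch of how such values might be set from PySpark follows; the exact numbers here are only illustrative assumptions.

from pyspark.sql import SparkSession

# Illustrative executor settings; real values depend on the cluster and workload.
spark = (
    SparkSession.builder
    .appName("executor-config-example")
    .config("spark.executor.instances", "4")   # how many executors to request from YARN
    .config("spark.executor.cores", "2")       # cores per executor
    .config("spark.executor.memory", "4g")     # memory per executor
    .getOrCreate()
)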




In conclusion, Spark helps us break down intensive, high-computation jobs into smaller, more manageable tasks which are then executed by the worker nodes. Its architecture supports processing both real-time and archived data.


Tuesday 14 February 2023

Demonstrating spark project on Docker

Hello everyone! In this article, we will discuss a few ecosystems which are widely used in the big data domain. I have designed the whole stack over Docker containers, and I will also give you a little bit of basic understanding of Docker.

Let’s start with a high-level understanding of the project……



Here we have created four ecosystems on docker containers which are given below:

1)      Apache Airflow: This is used for orchestrating and scheduling the jobs.

2)      Apache Livy: We are using this to submit the Spark jobs.

3)      Apache Spark: This is used for data processing (aggregations and manipulating the records) as per the business requirements.

4)      MySQL: This is our destination. It might be different in your case: object storage (S3, GCS, etc.), an RDBMS (Oracle, MySQL, Postgres, etc.) or a local filesystem, as per your use case.


Let’s set up the environments on Docker containers. Firstly, we are going to create the environment for Apache Airflow using a docker-compose.yaml file.



Copy the above code snippet, store it in a docker-compose file and execute the below command on the terminal or cmd.


Once the Airflow services have started in their containers, try to access the Airflow webserver in a web browser. In my case I am running Docker locally and have mapped port 9099 for the Airflow webserver.


http://localhost:9099

default User: airflow

default password: airflow


Perfect! We have successfully built the Apache Airflow setup on Docker containers. Now let’s jump to the Spark and Livy setup.

Below is the YAML file snippet for creating the Apache Spark and Apache Livy containers. We need to bring it up using the same docker-compose command.


We have successfully built the Spark and Livy setup as well. Now we need to test the environments. For that, we are going to do a Spark submit using Livy, and we will also schedule the Spark submit job from Apache Airflow to Apache Livy; the code will execute on the Spark worker nodes and write the outcome to the MySQL database.
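For reference, submitting a batch job to Livy comes down to a POST against its /batches REST endpoint. Below is a minimal sketch using Python's requests library; the Livy host, port and script path are assumptions for this setup.

import requests

# Assumed Livy endpoint for this Docker setup; adjust host/port to your compose file.
LIVY_URL = "http://localhost:8998/batches"

payload = {
    "file": "/opt/livy/jobs/etl_job.py",   # hypothetical path of the PySpark script on the Livy node
    "name": "spark-submit-via-livy",
}

# Submit the batch; the response contains the batch id and its current state.
response = requests.post(LIVY_URL, json=payload)
print(response.json())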

Copy the PySpark code from the local machine to the required path on the Livy machine using the below command.


Now log in to the Airflow webserver node and create a DAG Python file in the dags directory.
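A minimal sketch of what such a DAG could look like is given below; the DAG id, schedule and Livy URL are illustrative assumptions and not the exact DAG used in this project.

from datetime import datetime

import requests
from airflow import DAG
from airflow.operators.python import PythonOperator


def submit_to_livy():
    # Post a batch to Livy; host, port and file path are placeholders.
    payload = {"file": "/opt/livy/jobs/etl_job.py", "name": "spark-submit-via-livy"}
    resp = requests.post("http://livy:8998/batches", json=payload)
    resp.raise_for_status()
    print(resp.json())


with DAG(
    dag_id="spark_livy_submit",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    submit_job = PythonOperator(
        task_id="submit_spark_job",
        python_callable=submit_to_livy,
    )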

It will look like the below snapshot, and we are good to trigger the job.


That’s it… we have validated all the integrations. Further, we can build the Spark code as per the business requirements, and for automation we have to follow the same procedure.

Tuesday 21 September 2021

Demonstration Of Catalyst Optimizer

 


Hello Everyone! In this post, we will discuss one of the more advanced topics in Spark. It is a very confusing concept for Spark beginners, and recently people have started asking questions based on this topic: they want to understand how Spark internally handles your Spark SQL and the commands you have written as part of your Spark code. So generally, they start with a question like: what is the Catalyst Optimizer?

In this article, I will initially try to explain the Catalyst Optimizer through theory, and later I will give you a demonstration of the concept.

Let us start …



If you are not a very experienced developer, your hand-written code may not be optimized and will not give you very good results. But if you use the DataFrame and Dataset APIs, you get far better performance, because Spark internally takes care of optimizing your code and creating an RDD execution plan which will run fast.

The Catalyst Optimizer is the component in Spark which is used for this optimization. Your input can come from Spark SQL, DataFrames, Datasets or other sources. When you submit your code, Spark builds a query plan, uses the Catalyst Optimizer to create an optimized query plan, and the optimized query plan is then used to generate the RDD code.


Now I am going to discuss all the above phases in detail. Let us discuss each phase one by one.

1)     Unresolved Logical Plan:

In this phase, we submit a Spark SQL query as input. Whether we are using the DataFrame API or Spark SQL, the query first reaches Spark in the form of an unresolved logical plan.

2)     Logical Plan:

In this phase, Spark first analyzes the syntax, the columns and so on, and then converts the query into the form of a tree. The chain of operations we have defined is converted into a tree, and as you know, the leaf nodes are evaluated first and then the parent nodes.

3)     Optimized Logical Plan:

In this phase, Spark has the query in the form of a tree and keeps transforming it until the whole tree is processed. We start with the logical plan tree; the Catalyst Optimizer always talks in terms of trees, so it applies a transformation on a tree and generates a new tree, and at the end of multiple transformations we get the most optimized tree.

4)     Physical Plans:

In this phase, Spark checks what kind of join, filter or aggregation operations you have applied on the data. After that, Spark evaluates the cost and the resources used by all the operations.

5)     Select Physical Plan:

In this phase, Spark selects the most cost- and resource-effective plan; the chosen physical plan is converted into RDD code and finally executed by Spark.

Let us demonstrate each phase:

explain():

Specifies the expected output format of plans.

  • simple: Print only a physical plan.

  • extended: Print both logical and physical plans.

  • codegen: Print a physical plan and the generated code if they are available.

  • cost: Print a logical plan and statistics if they are available.

  • formatted: Split explain output into two sections: a physical plan outline and node details.
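A small sketch of how these modes can be tried out from PySpark; the DataFrame and query below are only illustrative.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("catalyst-demo").getOrCreate()

# Placeholder query: a range DataFrame with a filter and an aggregation.
df = spark.range(0, 1000).withColumnRenamed("id", "order_id")
result = df.filter("order_id > 500").groupBy().count()

result.explain(mode="simple")      # physical plan only
result.explain(mode="extended")    # parsed, analyzed and optimized logical plans plus physical plan
result.explain(mode="formatted")   # physical plan outline plus node details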





Saturday 15 May 2021

ETL Pipeline Orchestration on Apache Airflow

 

Hello Everyone! Today we will discuss one of the most important and interesting activities, which plays a vital role in the data engineering area. If you are a data engineer, or you work around data lovers, then you must have heard the ETL terminology. If you have not heard of it, there is no need to worry; in this article, we are going to discuss it in detail.


Before knowing about the ETL process, we need to know why this activity has become a necessity these days, and what we can achieve with ETL. As we know, there are multiple applications, and every application generates or consumes data, but it is not necessary that every application will provide you the data in your required format, so in that case you have to put in some extra effort to convert the data into your desired shape.


ETL stands for Extract, Transform and Load. ETL operations are often performed by fit-for-purpose tools that have been on the market for a long time, and sometimes by custom in-house programs.


  • Extracts data from homogeneous or heterogeneous data sources.

  • Transforms the data into the proper format or structure for querying and analysis purposes.

  • Loads it into the final target (a database; more specifically an operational data store, data mart, or data warehouse).

Let’s discuss the big data platform.


Here I am using a three-node CentOS cluster, on which I have deployed Hadoop with Apache Spark, Apache Hive, Apache Airflow and Jupyter Notebook.

 


 


 

As you can see in the ETL high-level architecture, there are four phases: source, staging, transform and publish. Let’s discuss these four phases.





1) Source: In this phase, we have flat files. These files can come from any application, but in my case they are available on my local Linux machine, and we are going to copy them to the table's HDFS location.


2) Staging (raw layer database): In this phase, we are going to create a table in Hive, defining all the column data types as string.



3) Transform (transform layer database): In this phase, we are going to transform the data types of the raw layer table as per the data dictionary. The data dictionary is defined by the business, so we need to cast the table's columns to the data types it specifies.



4) Publish: This is the final database layer, which will be available for analysis purposes.

 

 

For all the above phases, we will create the workflow on Apache Airflow.






Below is the Apache Airflow DAG code.


Step 1: Create the table in the raw database.



Step 2: Transfer the datasets into the HDFS raw database table directory. You can get the code for this step in the Airflow etl.py file.

          

Step 3: Create the table in the transform database as per the data dictionary.


Step 4: Pull the data from the raw database, type cast it as per the data dictionary and insert it into the transform database.


Step 5: Create the table in the publish layer database.


Step 6: Insert all records from the transform layer database into the publish layer database.
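For illustration, a minimal sketch of a DAG covering these steps is shown below, assuming the Airflow Hive provider is installed; the DAG id, database and table names and HQL statements are placeholders, not the exact code from etl.py (the HDFS transfer of step 2 is omitted here).

from datetime import datetime

from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator

with DAG(
    dag_id="etl_pipeline",
    start_date=datetime(2021, 5, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Step 1: raw layer table with all columns as string.
    create_raw_table = HiveOperator(
        task_id="create_raw_table",
        hql="CREATE TABLE IF NOT EXISTS raw_db.sales (id STRING, amount STRING, sale_date STRING)",
    )

    # Step 4: load the transform table with casts as per the data dictionary
    # (the transform table from step 3 is assumed to exist).
    load_transform_table = HiveOperator(
        task_id="load_transform_table",
        hql="""
            INSERT OVERWRITE TABLE transform_db.sales
            SELECT CAST(id AS INT), CAST(amount AS DOUBLE), CAST(sale_date AS DATE)
            FROM raw_db.sales
        """,
    )

    # Step 6: publish layer load from the transform layer.
    publish_table = HiveOperator(
        task_id="publish_table",
        hql="INSERT OVERWRITE TABLE publish_db.sales SELECT * FROM transform_db.sales",
    )

    create_raw_table >> load_transform_table >> publish_table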



Data Validation:

 

Raw Layer:

 


Transform Layer:


Publish Layer:

 




Finally, we have successfully completed the fully automated ETL activity :)

 

Wednesday 12 May 2021

Operations on Dataframe

 

Hello Everyone! In this post, I will demonstrate some operations which are very important to know when you are dealing with DataFrames.

 

 


As we get data from different sources in different forms, like structured, semi-structured and unstructured, we need to be sure about data quality before visualization. There is a chance of getting inconsistent, incomplete, ambiguous and duplicate data, and we can’t get meaningful information from such raw data, so in that case we need to use some operations to manipulate the data.


Here I will load datasets into a DataFrame and apply some operations on them.

So let’s start ….

Before performing operations on a DataFrame, we need to bring the data into the DataFrame.

I am going to create a Spark session, and after that I will load the data into a DataFrame.
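A minimal sketch of this step; the file path and read options are placeholders.

from pyspark.sql import SparkSession

# Create the Spark session.
spark = SparkSession.builder.appName("dataframe-operations").getOrCreate()

# Load a dataset into a DataFrame; the path and options are illustrative.
df = spark.read.csv("/data/employees.csv", header=True, inferSchema=True)
df.show(5)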

 

1) How to project columns?


There are three ways to project columns from a DataFrame; a combined sketch follows the three methods below.

1st Method:

 


2nd Method:

3rd Method:
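A combined sketch of three common ways to project columns in PySpark; the column names are placeholders.

from pyspark.sql.functions import col

df.select("name", "age").show()             # 1st method: column names as strings
df.select(col("name"), col("age")).show()   # 2nd method: the col() function
df.select(df.name, df.age).show()           # 3rd method: DataFrame attribute notation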


2) How can we retrieve the desired records from the DataFrame?

There are two methods, filter(condition) and where(condition); through these methods we can project only those records which satisfy the conditions.

For example:

In the below example, I am using where with some conditions.


In the below example, I am using the filter method with some conditions.
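A minimal sketch of both methods with placeholder columns and conditions; where() and filter() behave the same way.

from pyspark.sql.functions import col

df.where((col("age") > 30) & (col("department") == "IT")).show()
df.filter((col("age") > 30) & (col("department") == "IT")).show()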


3) How can we merge two columns into a single column?

For this activity, I am going to load a student dataset into a new DataFrame, df2. In this DataFrame we can see that for the student name we have two separate columns: first_name and last_name. Now I am going to merge them into a single column.

Here I am using the concat method to merge the columns.
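A sketch of the merge using concat; df2 and the column names follow the description above, while the space separator is an assumption.

from pyspark.sql.functions import concat, lit

# Merge first_name and last_name into a single full_name column separated by a space.
df2 = df2.withColumn("full_name", concat(df2.first_name, lit(" "), df2.last_name))
df2.select("first_name", "last_name", "full_name").show()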


4) How can we rename a column in a DataFrame?

Let's suppose we have a dataset whose column names are not in a proper format. There are two options to define the schema:

i) We can infer the schema from the dataset and rename the columns as per your requirement, but inferring the schema is not recommended for large datasets.

Let’s see in the below example how we can rename a column.
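A minimal sketch of the rename using withColumnRenamed; the old and new column names are placeholders.

# Rename a single column on an existing DataFrame.
renamed_df = df.withColumnRenamed("emp_nm", "employee_name")
renamed_df.printSchema()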

 Before renaming the dataframe column:


 After renaming the dataframe column:


 

 

ii) Define the schema and pass it to the DataFrame when you are creating the DataFrame.

For example:
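A sketch of defining a schema up front and passing it while creating the DataFrame; the field names, types and file path are assumptions.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("employee_name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("department", StringType(), True),
])

# Pass the schema explicitly instead of inferring it from the data.
df_with_schema = spark.read.csv("/data/employees.csv", header=True, schema=schema)
df_with_schema.printSchema()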






Sunday 19 July 2020

Understanding How Spark Works


Hello everyone! Today we are going to talk about the components of Apache Spark. Spark contains several components, and each component has a specific role in executing a Spark program.


First, let's discuss the components and their responsibilities; then we will deep dive into the workflow of each component.



1)      Worker: It is one of the nodes belonging to the cluster, also called a slave. Workers have the resources (CPU + RAM) for executing tasks.

2)      Cluster Manager: It has all the information about the workers and is aware of the available resources on each worker. According to resource availability, it decides how to distribute the work to the workers. As we know, Spark has an embedded resource manager which we can use when we deploy Spark in standalone mode. Apart from standalone mode, we can run Spark in local mode or use a resource manager like YARN, Mesos or Kubernetes.
3)      Driver: As the part of the Spark application responsible for instantiating a SparkSession, the Spark driver has multiple roles: it communicates with the cluster manager; it requests resources (CPU, memory, etc.) from the cluster manager for Spark’s executors (JVMs); and it transforms all the Spark operations into DAG computations, schedules them, and distributes their execution as tasks across the Spark executors. Once the resources are allocated, it communicates directly with the executors.

Question: What is the difference between SparkSession and SparkContext?
Answer: Prior to Spark 2.0, the entry points for a Spark application included the SparkContext (used for Spark core apps) along with SQLContext, HiveContext and StreamingContext. A Spark application has only one SparkContext.

The SparkSession object, introduced in Spark 2.0, combines all these objects into a single entry point that can be used for the whole Spark application. The SparkSession object contains all the runtime configuration properties set by the user, such as the master, application name, number of executors and more.
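A small sketch of this single entry point; the application name and master are only illustrative.

from pyspark.sql import SparkSession

# One SparkSession per application; it wraps the underlying SparkContext.
spark = (
    SparkSession.builder
    .appName("session-vs-context")
    .master("local[*]")          # illustrative master; could equally be YARN, Mesos or Kubernetes
    .getOrCreate()
)

sc = spark.sparkContext          # the single SparkContext behind the session
print(sc.applicationId)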

4)      Executors: A Spark executor runs on each worker node in the cluster. The executors communicate with the driver program and are responsible for executing tasks on the workers. In most deployment modes, only a single executor runs per node.

5)      Tasks: The smallest unit of work; tasks are scheduled by the driver and executed by the executors.



The SparkSession/SparkContext is initiated on the client machine and makes the connection to the master. Whenever we perform a Spark operation, it is communicated to the master and recorded as a logical directed acyclic graph (DAG). The master has all the information about the workers' resources. The driver talks to the cluster manager and negotiates for resources, and the cluster manager launches executors on the worker nodes on behalf of the driver. At this point, the driver sends tasks to the executors based on data placement. When the executors start, they register themselves with the driver, so the driver has a complete view of the executors that are executing the tasks. During the execution of tasks, the driver program monitors the set of executors that are running. The driver also schedules future tasks based on data placement.

Monday 8 June 2020

Understanding Apache Nifi through a Practical Approach


Hello data lovers! Today we are going to discuss the popular ecosystem Apache Nifi (NiagaraFiles). If you are from a data science background, you must have heard about it at least once. I have discussed Apache Nifi with many people, and most of them told me it is a very high-level and quite complex technology, but I am going to prove them wrong through some demos. So, without wasting time, let us start.


Before starting with Apache Nifi, we need to be clear about some basic concepts like data flow, data pipeline and ETL.
Let us discuss one by one.
Data Flow:
·         Moving data/content from a source to a destination.
·         The data can be CSV, JSON, XML, HTTP data, images, videos, telemetry data, etc.
Data Pipeline:
Movement and transformation of data/content from a source to a destination.
ETL:
·         E stands for Extract.
·         T stands for Transform.
·         L stands for Load.


Installation:
We will not discuss the installation process in much detail; it is quite simple. Just download the tar file and put it into the desired directory (Windows/Linux/Unix). After it is decompressed, it will look like the screenshot below.

How to start Apache Nifi?
In my case, I installed Apache Nifi on a Windows machine. If you want to start Nifi, you need to go to the \bin directory. Here you can see there are 6 files; some are .bat files and the others are .sh files. As we know, the .bat files are for Windows and the .sh files are for Linux/Unix environments.

For window Machine:
Path: ApacheNifi\nifi-1.10.0-bin\nifi-1.10.0\bin
Command: run-nifi
For Linux/Unix:
Path: ApacheNifi/nifi-1.10.0-bin/nifi-1.10.0/bin
Command: nifi start


Apache Nifi Console Overview:
I am not going to make this article too theoretical, so I will not describe all the components here. Click to read more about them.



Now we have enough information to create our first data flow. I am so excited about it. 😊
Lab1: In this activity, we will get real-time data from a weather website and store it in the desired directory.
Here we are using three processors for this activity:
1)      InvokeHTTP 1.10.0: This is for fetching the data from the website's API.

2)      UpdateAttribute 1.10.0: This is for converting the data into a specific format; here I am storing the files in JSON format.
3)      PutFile 1.10.0: This is used for saving the data into the destination directory.
Note: Every processor has four configuration tabs: Settings, Scheduling, Properties and Comments. According to your requirement, you can configure the processor.


Below is a screenshot of the data flow.




Lab2: Customize the Logo in Apache Nifi:

As per the organization or project we can make changes in Apache Nifi, and logo customization is one of them. Here I am going to put my photo as the logo.

Steps:

1)      Modify the pixel dimensions of the logo picture using Paint (61x90) and save it as a PNG file.

2)      Go to the path \ApacheNifi\nifi-1.10.0-bin\nifi-1.10.0\lib and find nifi-framework-nar-1.x.x.nar.

3)      Use 7-Zip to open nifi-framework-nar-1.x.x.nar; it will look like the screenshot below.









4)      Go to C:\ApacheNifi\nifi-1.10.0-bin\nifi-1.10.0\lib\nifi-framework-nar-1.10.0.nar\META-INF\bundled-dependencies\, find the nifi-web-ui-1.10.0.war file and open it.

5)      Go to the image directory and paste the logo PNG file.

6)      Now go to C:\ApacheNifi\nifi-1.10.0-bin\nifi-1.10.0\lib\nifi-framework-nar-1.10.0.nar\META-INF\bundled-dependencies\nifi-web-ui-1.10.0.war\css\, find nf-canvas-all.css.gz, locate the logo image path link, replace the original Nifi logo with the customized logo PNG file, and save it.

7)      Restart Apache Nifi and you will be able to see the customized logo.


Lab3: Customize the Title in Apache Nifi:

Steps:

1)      Go to the C: \ApacheNifi\nifi-1.10.0-bin\nifi-1.10.0\Conf

2)      Edit the flow.xml file

3)      Find the Flow-Nifi text, replace it with your own customized text and save it.

4)      Restart the Apache Nifi.





Data Provenance Page Overview:
We can use it to track data flows from beginning to end. You can find the Data Provenance page at the top right corner.




Security in Nifi:


Apache Nifi supports SSL, SSH, HTTPS, encrypted content, and more. It provides multi-tenant authorization and internal policy management. You can enable security according to your requirements in Apache Nifi. We will discuss security in a separate post because it is important.
Happy Learning!