Tuesday, 14 February 2023

Demonstrating a Spark project on Docker

Hello everyone! In this article, we will discuss a few ecosystems that are widely used in the big data domain. The whole stack is designed to run in Docker containers, and along the way I will also give you a basic understanding of Docker.

Let’s start with a high-level understanding of the project.



Here we create four ecosystems in Docker containers, listed below (a short sketch of how they hand work to each other follows the list):

1) Apache Airflow: used for orchestrating and scheduling the jobs.

2) Apache Livy: used to submit the Spark job over a REST API.

3) Apache Spark: used for the data processing (aggregating and manipulating the records) as per the business requirements.

4) MySQL: our destination. Depending on your use case this could instead be object storage (S3, GCS, etc.), an RDBMS (Oracle, MySQL, Postgres, etc.), or the local filesystem.
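
End to end, the flow is: Airflow triggers a task on schedule, that task calls Livy's REST API (the /batches endpoint), Livy launches the PySpark application on the Spark cluster, and the application writes its output to MySQL. Just to make the hand-off concrete, here is a minimal sketch of that Livy interaction with Python's requests library; the URL and file path are placeholders for the setup built later in this post, where the same call is made with curl from an Airflow task.

import time
import requests

LIVY_URL = "http://localhost:8998"   # placeholder: wherever the Livy container is reachable

def submit_and_wait(pyspark_file):
    # POST /batches starts a batch job; the file path must be visible to the Livy container
    resp = requests.post(f"{LIVY_URL}/batches", json={"file": pyspark_file})
    batch_id = resp.json()["id"]
    # poll GET /batches/{id}/state until the job reaches a terminal state
    while True:
        state = requests.get(f"{LIVY_URL}/batches/{batch_id}/state").json()["state"]
        if state in ("success", "dead", "killed"):
            return state
        time.sleep(10)

print(submit_and_wait("file:/opt/jars/imdb_pyspark.py"))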


Let’s set up the environments in Docker containers. First we will create the environment for Apache Airflow using a docker-compose.yaml.


# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
# Default: apache/airflow:2.3.4
# AIRFLOW_UID - User ID in Airflow containers
# Default: 50000
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested).
# Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested).
# Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
# Default: ''
#
# Feel free to modify this file to suit your needs.
---
version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-airflow-sqlserver}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    expose:
      - 8998
      - 7077
      - 4040
    ports:
      - 9099:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - .:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
  # or by explicitly targeted on the command line e.g. docker-compose up flower.
  # See: https://docs.docker.com/compose/profiles/
  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - 5559:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

volumes:
  postgres-db-volume:

Copy the snippet above into a docker-compose.yaml file and run the command below in a terminal or cmd.


docker-compose -f docker-compose.yaml up -d

Once the Airflow services are up in their containers, open the Airflow webserver in a web browser. In my case Docker is running locally and port 9099 is mapped to the webserver.


http://localhost:9099

Default user: airflow

Default password: airflow
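
Besides logging in through the browser, you can sanity-check the webserver (and the basic_auth API backend configured in the compose file) programmatically. A small optional check with Python's requests library, assuming the 9099 port mapping and default credentials used here:

import requests

# list the DAGs Airflow has parsed; a 200 response means the webserver and auth backend are healthy
resp = requests.get("http://localhost:9099/api/v1/dags", auth=("airflow", "airflow"))
print(resp.status_code)
print(resp.json()["total_entries"])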


Perfect! We have successfully set up Apache Airflow in Docker containers. Now let’s move on to the Spark and Livy setup.

Below is the YAML snippet for creating the Apache Spark and Apache Livy containers. Save it (for example as docker-spark-livy.yaml) and bring it up with the command shown after the snippet.

version: '2'
services:
  spark-master:
    image: renien/spark-stand-alone
    command: bin/spark-class org.apache.spark.deploy.master.Master -h master
    hostname: master
    container_name: spark-master
    environment:
      MASTER: spark://master:7077
      SPARK_CONF_DIR: /conf
      SPARK_PUBLIC_DNS: localhost
      SPARK_DIVER_CORES: 1
      SPARK_DRIVER_MEMORY: 1g
    expose:
      - 7001
      - 7002
      - 7003
      - 7004
      - 7005
      - 7077
      - 6066
      - 9099
    ports:
      - 6066:6066
      - 7077:7077
      - 8080:8080
    volumes:
      - ./conf/master:/conf
      - ./data:/tmp/data

  spark-worker-1:
    image: renien/spark-stand-alone
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    container_name: spark-worker-1
    environment:
      SPARK_CONF_DIR: /conf
      SPARK_WORKER_CORES: 1
      SPARK_WORKER_MEMORY: 2g
      SPARK_WORKER_PORT: 8881
      SPARK_WORKER_WEBUI_PORT: 8081
      SPARK_PUBLIC_DNS: localhost
    expose:
      - 7012
      - 7013
      - 7014
      - 7015
      - 8881
    ports:
      - 8081:8081
    volumes:
      - ./conf/worker:/conf
      - ./data:/tmp/data

  livy:
    image: renien/spark-stand-alone-livy
    container_name: livy
    environment:
      - SPARK_CONF_DIR=/conf
      - SPARK_DIVER_CORES=1
      - SPARK_DRIVER_MEMORY=1g
      - SPARK_MASTER_ENDPOINT=master
      - SPARK_MASTER_PORT=7077
      - LIVY_FILE_LOCAL_DIR_WHITELIST=/opt/jars
    expose:
      - 9099
      - 8080
    ports:
      - 8998:8998
      - 4040:4040
    volumes:
      - $PWD/jars:/opt/jars
      - $PWD/conf/livy/livy.conf.extra:/usr/local/bin/livy.conf.extra
    depends_on:
      - "spark-master"
      - "spark-worker-1"

docker-compose -f docker-spark-livy.yaml up -d
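
Optionally, confirm that both containers answer on the ports published above before wiring in Airflow. A quick check with Python's requests library, assuming Docker is running locally:

import requests

# Spark master web UI (mapped to host port 8080)
print(requests.get("http://localhost:8080").status_code)

# Livy REST API (mapped to host port 8998); returns an empty batch list on a fresh install
print(requests.get("http://localhost:8998/batches").json())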


We have successfully built the Spark and Livy setup as well. Now we need to test the environments: we will do a spark-submit through Livy, schedule that submission from Apache Airflow, let the code execute on the Spark worker, and write the outcome to the MySQL database.

Below is the PySpark script (imdb_pyspark.py) that reads the CSV files, joins and filters the records, and writes the result to MySQL. Copy it from your local machine into the Livy container with the docker cp command shown after the script. Then log in to the Airflow webserver node and create a DAG file (spark_livy.py) in the dags directory, together with the spark-submit.sh script it calls; both are shown after the copy command.

'''
Script: Read records from CSV files and load them into MySQL
@author: Mr. Ravi Kumar
'''

def create_session():
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("Read csv and load to mysql").getOrCreate()
    sc = spark.sparkContext
    return spark, sc

def read_csv(spark, sc):
    # adjust to wherever the CSV files are visible to the Spark driver/executors
    path = r'C:\Users\ravi\dataset'
    # files
    file1 = 'links.csv'
    file2 = 'movies.csv'
    file3 = 'ratings.csv'
    file4 = 'tags.csv'
    # dataframes
    # movieId|imdbId|tmdbId
    link_df = spark.read.csv(path + "\\" + file1, header="True", inferSchema="True")
    # movieId|title|genres
    movies_df = spark.read.csv(path + "\\" + file2, header="True", inferSchema="True")
    # userId|movieId|rating|timestamp
    ratings_df = spark.read.csv(path + "\\" + file3, header="True", inferSchema="True")
    # userId|movieId|tag|timestamp
    tags_df = spark.read.csv(path + "\\" + file4, header="True", inferSchema="True")
    return link_df, movies_df, ratings_df, tags_df

# Question: movie name, year, timestamp and genres having rating greater than or equal to 5.
def calculation(spark, sc):
    import pyspark.sql.functions as F
    import pyspark.sql.types as T
    link_df, movies_df, ratings_df, tags_df = read_csv(spark, sc)
    link_df.registerTempTable("link")
    movies_df.registerTempTable("movies")
    ratings_df.registerTempTable("ratings")
    tags_df.registerTempTable("tags")
    output = spark.sql(""" SELECT movies.movieId, movies.title, movies.genres, ratings.userId, ratings.rating, ratings.timestamp FROM movies INNER JOIN ratings ON movies.movieId = ratings.movieId WHERE rating >= 5 """)
    final = output.withColumn("timestamp", F.from_unixtime(output['timestamp'], "MM-dd-yyyy HH:mm:ss"))
    return final

def write_mysql(spark, sc):
    final = calculation(spark, sc)
    # mysql connection details
    driver = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://127.0.0.1:3306/test"
    user = "root"
    pwd = "India@123"
    final.write.format("jdbc").option("driver", driver)\
        .option("url", url)\
        .option("dbtable", "movies_ratings")\
        .option("user", user)\
        .option("password", pwd)\
        .save()

if __name__ == '__main__':
    spark, sc = create_session()
    read_csv(spark, sc)
    calculation(spark, sc)
    write_mysql(spark, sc)
Copy the script into the Livy container:

docker cp imdb_pyspark.py livy:/opt/jars/
Here is the DAG (spark_livy.py) to drop into the dags directory; it simply runs spark-submit.sh on a daily schedule:

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'Ravi',
    'start_date': days_ago(2)
}

second_dag = DAG(
    dag_id='second_dag',
    default_args=default_args,
    schedule_interval='0 0 * * *',
    catchup=False
)

start_task = DummyOperator(task_id='start_task1', dag=second_dag)

# spark-submit.sh must be where the BashOperator can find it,
# e.g. in the dags directory next to this file or referenced by its full path
livy_spark_submit = BashOperator(
    task_id='spark-submit',
    bash_command="spark-submit.sh",
    dag=second_dag
)

end_task = DummyOperator(task_id='end_task', dag=second_dag)

start_task >> livy_spark_submit >> end_task
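
As a side note, Airflow also ships a dedicated Livy provider, so the curl wrapper script is not strictly necessary. A hedged sketch of what the middle task could look like instead, assuming the apache-airflow-providers-apache-livy package is installed in the Airflow image (for example via _PIP_ADDITIONAL_REQUIREMENTS) and that an Airflow connection named livy_default points at the Livy container on port 8998:

from airflow.providers.apache.livy.operators.livy import LivyOperator

# drop-in replacement for the BashOperator task in the DAG above
livy_batch_submit = LivyOperator(
    task_id='livy_batch_submit',
    file='file:/opt/jars/imdb_pyspark.py',   # the script copied into the Livy container
    livy_conn_id='livy_default',             # assumed connection pointing at the livy service
    polling_interval=30,                     # poll Livy every 30 seconds until the batch finishes
    dag=second_dag,
)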
And here is spark-submit.sh, which posts the batch job to Livy's REST API:

#!/bin/bash
# /opt/jars/imdb_pyspark.py -> location of the PySpark script on the Livy container's local path
# Livy container IP and port: http://172.19.0.4:8998
# Note: if the MySQL JDBC driver is not already on the Spark classpath, you may also need to
# make it available to Spark, e.g. via a "jars"/"conf" entry in this payload.
curl -X POST --data '{"file": "file:/opt/jars/imdb_pyspark.py"}' -H "Content-Type: application/json" http://172.19.0.4:8998/batches/

The DAG will look like the snapshot below, and we are good to trigger the job.


That’s it! We have validated the whole integration. From here you can build out the Spark code as per the business requirements, and the same procedure applies when automating further jobs.
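
To double-check the final hop into MySQL, you can read the target table back. A minimal sketch reusing the connection details from imdb_pyspark.py; run it wherever Spark can reach your MySQL instance and has the MySQL JDBC driver available:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("verify movies_ratings").getOrCreate()

# read back the table the job just wrote
movies_ratings = (
    spark.read.format("jdbc")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("url", "jdbc:mysql://127.0.0.1:3306/test")
    .option("dbtable", "movies_ratings")
    .option("user", "root")
    .option("password", "India@123")
    .load()
)

print(movies_ratings.count())   # rows loaded by the job
movies_ratings.show(5)          # sample records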
