Hello everyone! In this article, we will discuss a few ecosystems that are widely used in the big data domain. I have built every stack on Docker containers, and along the way I will also give you a basic understanding of Docker.
Let’s start with a high-level understanding of the project.
Here we have created four ecosystems on Docker containers, listed below:
1) Apache Airflow: used for orchestrating and scheduling the jobs.
2) Apache Livy: used to submit the Spark jobs over a REST API.
3) Apache Spark: used for the data processing (aggregations and manipulating the records) as per the business requirements.
4) MySQL: our destination. This might be different in your case; it could be object storage (S3, GCS, etc.), an RDBMS (Oracle, MySQL, PostgreSQL, etc.) or the local filesystem, depending on your use case.
Let’s set up the environments on Docker containers. First, we are going to create the environment for Apache Airflow using a docker-compose.yaml.
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
#                                Default: apache/airflow:2.3.4
# AIRFLOW_UID                  - User ID in Airflow containers
#                                Default: 50000
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
#                                Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
#                                Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
#                                Default: ''
#
# Feel free to modify this file to suit your needs.
---
version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-airflow-sqlserver}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    expose:
      - 8998
      - 7077
      - 4040
    ports:
      - 9099:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - .:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
  # or by explicitly targeting it on the command line e.g. docker-compose up flower.
  # See: https://docs.docker.com/compose/profiles/
  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - 5559:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

volumes:
  postgres-db-volume:
Copy the above code snippet, store it in a docker-compose.yaml file, and execute the below command in a terminal or cmd.
docker-compose -f docker-compose.yaml up -d
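If you are starting from an empty project directory, the folders mounted by the compose file and the AIRFLOW_UID variable should be prepared as well (ideally before the first `up`). A minimal sketch, following the official Airflow Docker quick start; adapt it to your environment:

# create the host folders that the compose file mounts into the containers
mkdir -p ./dags ./logs ./plugins

# on Linux, record your user id so that files written by the containers are not owned by root
echo -e "AIRFLOW_UID=$(id -u)" > .env

# after starting, confirm that all Airflow services are up and healthy
docker-compose -f docker-compose.yaml ps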
Once the Airflow services have started in their containers, try to access the Airflow webserver in a web browser. In my case I am running Docker locally and have mapped port 9099 to the Airflow webserver.
http://localhost:9099
Default user: airflow
Default password: airflow
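As a quick sanity check you can also hit the webserver's health endpoint from the host; 9099 is the host port mapped in the compose file above, and the response is a small JSON document with the metadatabase and scheduler status:

curl http://localhost:9099/health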
Perfect! We have successfully set up Apache Airflow on Docker containers. Now let’s jump to the Spark and Livy setup.
Below is the YAML file for creating the Apache Spark and Apache Livy containers. We will bring it up with the docker-compose command that follows it.
version: '2'
services:
  spark-master:
    image: renien/spark-stand-alone
    command: bin/spark-class org.apache.spark.deploy.master.Master -h master
    hostname: master
    container_name: spark-master
    environment:
      MASTER: spark://master:7077
      SPARK_CONF_DIR: /conf
      SPARK_PUBLIC_DNS: localhost
      SPARK_DRIVER_CORES: 1
      SPARK_DRIVER_MEMORY: 1g
    expose:
      - 7001
      - 7002
      - 7003
      - 7004
      - 7005
      - 7077
      - 6066
      - 9099
    ports:
      - 6066:6066
      - 7077:7077
      - 8080:8080
    volumes:
      - ./conf/master:/conf
      - ./data:/tmp/data

  spark-worker-1:
    image: renien/spark-stand-alone
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    container_name: spark-worker-1
    environment:
      SPARK_CONF_DIR: /conf
      SPARK_WORKER_CORES: 1
      SPARK_WORKER_MEMORY: 2g
      SPARK_WORKER_PORT: 8881
      SPARK_WORKER_WEBUI_PORT: 8081
      SPARK_PUBLIC_DNS: localhost
    expose:
      - 7012
      - 7013
      - 7014
      - 7015
      - 8881
    ports:
      - 8081:8081
    volumes:
      - ./conf/worker:/conf
      - ./data:/tmp/data

  livy:
    image: renien/spark-stand-alone-livy
    container_name: livy
    environment:
      - SPARK_CONF_DIR=/conf
      - SPARK_DRIVER_CORES=1
      - SPARK_DRIVER_MEMORY=1g
      - SPARK_MASTER_ENDPOINT=master
      - SPARK_MASTER_PORT=7077
      - LIVY_FILE_LOCAL_DIR_WHITELIST=/opt/jars
    expose:
      - 9099
      - 8080
    ports:
      - 8998:8998
      - 4040:4040
    volumes:
      - $PWD/jars:/opt/jars
      - $PWD/conf/livy/livy.conf.extra:/usr/local/bin/livy.conf.extra
    depends_on:
      - "spark-master"
      - "spark-worker-1"
docker-compose -f docker-spark-livy.yaml up -d
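To confirm that Livy is reachable before wiring anything else up, you can call its REST API from the host; 8998 is the host port mapped above, and GET /batches simply lists the batch jobs submitted so far:

# an empty batch list ({"from":0,"total":0,"sessions":[]}) means Livy is up and has no jobs yet
curl http://localhost:8998/batches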
We have successfully built the Spark and Livy setup as well. Now we need to test the environments. For that we will do a spark-submit through Livy, and we will also schedule the spark-submit job from Apache Airflow to Apache Livy, so that the code executes on the Spark worker nodes and writes the outcome to a MySQL database.
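The pipeline also needs a MySQL instance to land the output. If you do not already have one, a minimal sketch of a throwaway MySQL container is shown below; the container name, root password, database name and port are assumptions, so keep them in sync with the JDBC settings in the PySpark script further down:

# disposable MySQL container for testing the pipeline (credentials/db name are assumptions)
docker run -d --name mysql \
  -e MYSQL_ROOT_PASSWORD='India@123' \
  -e MYSQL_DATABASE=test \
  -p 3306:3306 \
  mysql:8.0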
Copy the PySpark code (shown below) from your local machine to the Livy container's whitelisted path using the docker cp command given right after it.
Then log in to the Airflow webserver node and create a DAG Python file in the dags directory; the DAG and the spark-submit.sh script it calls follow the PySpark code below.
'''
Script: Read records from csv files and load them to MySQL
@author: Mr. Ravi Kumar
'''

def create_session():
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("Read csv and load to mysql").getOrCreate()
    sc = spark.sparkContext
    return spark, sc

def read_csv(spark, sc):
    # NOTE: adjust this path to a location visible to the Spark containers
    # (e.g. the mounted /tmp/data) when submitting through Livy
    path = r'C:\Users\ravi\dataset'
    # files
    file1 = 'links.csv'
    file2 = 'movies.csv'
    file3 = 'ratings.csv'
    file4 = 'tags.csv'
    # dataframes
    # movieId|imdbId|tmdbId
    link_df = spark.read.csv(path + "\\" + file1, header=True, inferSchema=True)
    # movieId|title|genres
    movies_df = spark.read.csv(path + "\\" + file2, header=True, inferSchema=True)
    # userId|movieId|rating|timestamp
    ratings_df = spark.read.csv(path + "\\" + file3, header=True, inferSchema=True)
    # userId|movieId|tag|timestamp
    tags_df = spark.read.csv(path + "\\" + file4, header=True, inferSchema=True)
    return link_df, movies_df, ratings_df, tags_df

# Question: movie name, year, timestamp and genres having rating greater than or equal to 5.
def calculation(spark, sc):
    import pyspark.sql.functions as F
    link_df, movies_df, ratings_df, tags_df = read_csv(spark, sc)
    link_df.createOrReplaceTempView("link")
    movies_df.createOrReplaceTempView("movies")
    ratings_df.createOrReplaceTempView("ratings")
    tags_df.createOrReplaceTempView("tags")
    output = spark.sql("""
        SELECT movies.movieId, movies.title, movies.genres,
               ratings.userId, ratings.rating, ratings.timestamp
        FROM movies
        INNER JOIN ratings ON movies.movieId = ratings.movieId
        WHERE rating >= 5
    """)
    final = output.withColumn("timestamp", F.from_unixtime(output['timestamp'], "MM-dd-yyyy HH:mm:ss"))
    return final

def write_mysql(spark, sc):
    final = calculation(spark, sc)
    # mysql connection details
    # NOTE: 127.0.0.1 refers to the Spark container itself; point this at your
    # MySQL host or container when running inside Docker
    driver = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://127.0.0.1:3306/test"
    user = "root"
    pwd = "India@123"
    final.write.format("jdbc").option("driver", driver)\
        .option("url", url)\
        .option("dbtable", "movies_ratings")\
        .option("user", user)\
        .option("password", pwd)\
        .save()

if __name__ == '__main__':
    spark, sc = create_session()
    # write_mysql() drives the whole flow: it calls calculation(), which in turn calls read_csv()
    write_mysql(spark, sc)
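Note that the JDBC write needs a MySQL connector jar on the Spark classpath. One hedged way to provide it is to drop the jar into the whitelisted /opt/jars directory of the Livy container; the connector file name and version below are assumptions, use whatever jar you have locally:

# copy a MySQL connector jar next to the script (file name/version are assumptions)
# it can then be referenced in the Livy batch request,
# e.g. "jars": ["file:/opt/jars/mysql-connector-java-8.0.28.jar"]
docker cp mysql-connector-java-8.0.28.jar livy:/opt/jars/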
docker cp imdb_pyspark.py livy:/opt/jars/
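You can verify the copy worked by listing the whitelisted directory inside the Livy container:

# imdb_pyspark.py should show up alongside any jars already present
docker exec livy ls -l /opt/jars/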
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'Ravi',
    'start_date': days_ago(2)
}

second_dag = DAG(
    dag_id='second_dag',
    default_args=default_args,
    schedule_interval='0 0 * * *',
    catchup=False
)

start_task = DummyOperator(task_id='start_task1', dag=second_dag)

livy_spark_submit = BashOperator(
    task_id='spark-submit',
    # Airflow resolves a bash_command ending in .sh as a Jinja template file relative to the
    # DAG folder, so keep spark-submit.sh next to this DAG file (or add a trailing space and
    # give a full path to run the script directly instead).
    bash_command="spark-submit.sh",
    dag=second_dag
)

end_task = DummyOperator(task_id='end_task', dag=second_dag)

start_task >> livy_spark_submit >> end_task
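As an alternative to shelling out with curl, Airflow also ships a LivyOperator in the apache-airflow-providers-apache-livy package. It is not part of this setup; the sketch below assumes the provider has been installed (e.g. via _PIP_ADDITIONAL_REQUIREMENTS) and that an Airflow connection named livy_default points at the Livy container on port 8998:

# assumes: pip install apache-airflow-providers-apache-livy
# and an Airflow connection 'livy_default' pointing at the Livy host (port 8998)
from airflow.providers.apache.livy.operators.livy import LivyOperator

livy_submit = LivyOperator(
    task_id='livy_spark_submit',
    file='file:/opt/jars/imdb_pyspark.py',  # same whitelisted path used in the curl call
    livy_conn_id='livy_default',
    polling_interval=30,  # poll the batch state until it finishes instead of fire-and-forget
    dag=second_dag
)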
#!/bin/bash
# spark-submit.sh -- submits the PySpark script to Livy as a batch job
# /opt/jars/imdb_pyspark.py -> file location on the Livy container's local path
# Livy machine ip and port: http://172.19.0.4:8998
curl -X POST --data '{"file": "file:/opt/jars/imdb_pyspark.py"}' -H "Content-Type: application/json" http://172.19.0.4:8998/batches/
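After submitting, you can follow the batch through Livy's REST API. A hedged example; the batch id 0 is illustrative, use the id returned by the POST above:

# list all batches and their states (starting / running / success / dead)
curl http://172.19.0.4:8998/batches

# fetch a single batch, including its current state
curl http://172.19.0.4:8998/batches/0

# read the driver log of that batch to debug failures
curl http://172.19.0.4:8998/batches/0/log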
The DAG will now show up in the Airflow UI and we are good to trigger the job.
That’s it! We have validated the whole integration. From here we can build the Spark code as per the business requirements, and for automation we follow the same procedure.