Saturday 9 May 2020

Creating RDD from Databases



Hello everyone! In this post we discuss how to create an RDD from a database. As we know, there are multiple ways to create an RDD: from text or sequence files, from a database server, from programmatically generated data, and from object storage. Here we explore just one of them: creating an RDD from a database.
The preferred way to create an RDD from a relational database table or query is to use functions of the SparkSession object. As we know, SparkSession is the main entry point for working with all types of data in Spark, including tabular data. The SparkSession exposes a read property, which returns a DataFrameReader object. You can then use this object to read data into a DataFrame, a distributed collection of rows with a schema that was previously referred to as a SchemaRDD and whose underlying RDD is available through its rdd attribute. The DataFrameReader has a jdbc function that can connect to and collect data from any JDBC-compliant data source.
Note: Because Spark processes run in Java virtual machines (JVMs), JDBC is natively available to Spark. The JDBC driver for the specific database still has to be on Spark's classpath.
Data can come from a variety of host systems and database platforms, including Oracle, MySQL, Postgres, and SQL Server. In this post we consider a MySQL server called mysqlserver with a database named employees that contains a table called employees.
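As a small illustration of the setup just described, the connection details can be gathered in one place before any read is attempted. This is only a sketch: the helper name build_mysql_jdbc_url is hypothetical, port 3306 is assumed because it is the MySQL default, and the credentials are placeholders.

```python
def build_mysql_jdbc_url(host, database, port=3306):
    """Return a JDBC URL of the form jdbc:mysql://<host>:<port>/<database>."""
    return f"jdbc:mysql://{host}:{port}/{database}"


# Server and database names taken from the scenario in this post.
url = build_mysql_jdbc_url("mysqlserver", "employees")

# Placeholder credentials (an assumption for illustration); replace with real values.
connection_properties = {"user": "<user>", "password": "<pwd>"}

print(url)  # jdbc:mysql://mysqlserver:3306/employees
```

Keeping the URL and the properties dictionary in variables makes it easy to reuse them across several reads.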
Syntax:
spark.read.jdbc(url, table,
column=None,
lowerBound=None,
upperBound=None,
numPartitions=None,
predicates=None,
properties=None)

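=> The url and table arguments specify the target database and table to read.
=> The column argument names the column that Spark uses to split the read into partitions; it should be a numeric, date, or timestamp column.
=> The upperBound and lowerBound arguments are used together with the column argument to help Spark create the partitions. They represent the minimum and maximum values of the specified column in the source table. They only decide the partition stride and do not filter rows, so every row in the table is still read.
=> The numPartitions argument sets how many partitions the read is split into.
=> The optional predicates argument is a list of WHERE conditions, one per partition, which allows unneeded records to be filtered out while the partitions are loaded.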

=> The properties argument passes parameters to the JDBC API, such as the database user credentials. If supplied, this argument must be a Python dictionary, a set of name/value pairs representing the various configuration options.
               Ex: properties={"user": "root", "password": "pwd"}
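The way column, lowerBound, upperBound, and numPartitions interact can be made concrete with a pure-Python sketch. It approximates how Spark is understood to derive one WHERE clause per partition from these arguments; the function name partition_where_clauses is hypothetical, the sketch assumes integer bounds and at least two partitions, and the exact clause text Spark generates may differ between versions.

```python
def partition_where_clauses(column, lower_bound, upper_bound, num_partitions):
    """Approximate the per-partition WHERE clauses for a partitioned JDBC read."""
    if num_partitions < 2 or lower_bound >= upper_bound:
        raise ValueError("sketch assumes num_partitions >= 2 and lower_bound < upper_bound")

    # The bounds only decide how wide each partition's value range is.
    stride = upper_bound // num_partitions - lower_bound // num_partitions

    clauses = []
    current = lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != num_partitions - 1 else None
        if lower and upper:
            clauses.append(f"{lower} AND {upper}")
        elif upper:
            # First partition is open at the bottom and also picks up NULLs.
            clauses.append(f"{upper} OR {column} IS NULL")
        else:
            # Last partition is open at the top.
            clauses.append(lower)
    return clauses


for clause in partition_where_clauses("emp_no", 10001, 499999, 2):
    print(clause)
# emp_no < 255000 OR emp_no IS NULL
# emp_no >= 255000
```

Because the first and last clauses are open-ended, rows below lowerBound or above upperBound still land in a partition; badly chosen bounds lead to skewed partitions rather than missing rows.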
Demo:
employeesdf = spark.read.jdbc(
    url="jdbc:mysql://localhost:3306/employees",
    table="employees", column="emp_no", numPartitions=2,
    lowerBound=10001, upperBound=499999,
    properties={"user": "<user>", "password": "<pwd>"})
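The predicates argument offers an alternative to the column and bounds form of the call. The sketch below builds a list of range conditions and passes it to spark.read.jdbc; the helper names range_predicates and read_employees_by_predicates and the boundary values are hypothetical, and the read function is only defined here, not executed, since it needs a live SparkSession and database.

```python
def range_predicates(column, boundaries):
    """Turn sorted boundaries [b0, b1, ..., bn] into n half-open range conditions."""
    return [
        f"{column} >= {low} AND {column} < {high}"
        for low, high in zip(boundaries, boundaries[1:])
    ]


def read_employees_by_predicates(spark, url, properties, predicates):
    """Read the employees table with one partition per predicate.

    `spark` is an existing SparkSession; nothing runs until this is called.
    """
    return spark.read.jdbc(url=url, table="employees",
                           predicates=predicates, properties=properties)


# Illustrative boundaries on emp_no (assumed values, not taken from real data).
predicates = range_predicates("emp_no", [10001, 200000, 400000, 500000])
for p in predicates:
    print(p)
# emp_no >= 10001 AND emp_no < 200000
# emp_no >= 200000 AND emp_no < 400000
# emp_no >= 400000 AND emp_no < 500000
```

Unlike the bounds form, predicates do filter: rows that match none of the conditions are not loaded, and rows that match more than one condition are loaded more than once, so the conditions should not overlap.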


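=> Running SQL queries against Spark DataFrames:

# Register the DataFrame as a temporary view so SQL can refer to it by name
employeesdf.createOrReplaceTempView("employees")
df2 = spark.sql("SELECT emp_no, first_name, last_name FROM employees LIMIT 2")
df2.show()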
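Since the goal of this post is an RDD, the query result can also be turned into one through the DataFrame's rdd attribute. The following is a minimal sketch under the assumption that a SparkSession and a DataFrame like employeesdf already exist; the function name query_names_as_rdd is hypothetical.

```python
def query_names_as_rdd(spark, df, view_name="employees", limit=2):
    """Run the SQL query from this post and return the result as an RDD of tuples."""
    # A temporary view gives the DataFrame a name that SQL statements can use.
    df.createOrReplaceTempView(view_name)
    result = spark.sql(
        f"SELECT emp_no, first_name, last_name FROM {view_name} LIMIT {limit}"
    )
    # .rdd exposes the DataFrame as an RDD of Row objects; map them to plain tuples.
    return result.rdd.map(lambda row: (row.emp_no, row.first_name, row.last_name))


# Usage, given a live session and the DataFrame from the demo:
# names_rdd = query_names_as_rdd(spark, employeesdf)
# print(names_rdd.collect())
```

Note that the column argument in the demo only controls partitioning; the DataFrame still contains every column of the table, which is why the query can select first_name and last_name as well.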

1 comment:

  1. From employeesdf you are getting only one column "emp_no" but in select query of df2 you are querying on 3 columns..