Friday, 29 May 2020

Operations on RDDs




Hello everyone! In this article, we are going to explore the operations we can perform on RDDs. As usual, we focus on a practical approach rather than a theoretical one, and this time we are going to do the same thing. Trust me! You are going to get something out of this article.

There are two types of operations we generally perform on RDDs: Transformations and Actions.




Transformations are operations performed on RDDs that result in the creation of new RDDs. Depending on our requirements, we use transformation methods to transform the RDDs. Below are the basic transformation methods; initially we will discuss only the basic ones, and later we will perform advanced transformations on pair RDDs.



map():
·         It is used to transform some aspect of the data in each row into something else.
·         The map operation applies a function to each element of the RDD and returns a new RDD. This means the number of elements in the RDD before and after the transformation will be the same.
·         It is lazily evaluated.
·         It is a narrow transformation.

Syntax:
RDD.map(<function>, preservesPartitioning=False)

Using Python version 2.7.5 (default, Apr  2 2020 13:16:51)
SparkSession available as 'spark'.
>>> data=['ravi','hadoop','spark','python'] # 4 elements in List
>>> rdd=sc.parallelize(data)
>>> newRdd=rdd.map(lambda x:x.upper())
>>> newRdd.collect() #Action
['RAVI', 'HADOOP', 'SPARK', 'PYTHON'] #After transformation, 4 elements in List.


Note:
preservesPartitioning is an optional Boolean argument. This parameter can be used by the Spark scheduler to optimize subsequent operations, such as joins based on the partitioning key.


flatMap():
·         It is similar to map(); however, flatMap() can return more than one output element per input element, and all the outputs are flattened into a single new RDD.
·         It is lazily evaluated.
Syntax:
RDD.flatMap(<function>, preservesPartitioning=False)

>>> from pyspark import SparkContext
>>> data=['ravi','hadoop','spark','python']
>>> rdd=sc.parallelize(data)
>>> newRdd1=rdd.flatMap(lambda x:x.upper())
>>> newRdd1.collect()
['R', 'A', 'V', 'I', 'H', 'A', 'D', 'O', 'O', 'P', 'S', 'P', 'A', 'R', 'K', 'P', 'Y', 'T', 'H', 'O', 'N']


filter():
·         The filter transformation evaluates a Boolean expression against each element and returns a new RDD containing only the elements for which the expression is true.
·         Generally, we use the filter transformation to eliminate unwanted elements from an RDD or dataset.
Syntax:
RDD.filter(<function>)
Example: Suppose we have the numbers 0 to 10 in a list and we want to keep only the even values in a new RDD. Let us see the code below.

>>> num=range(11)
>>> num
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> rdd=sc.parallelize(num)
>>> newRdd=rdd.filter(lambda x:x%2==0)
>>> newRdd.collect()
[0, 2, 4, 6, 8, 10]


distinct():
It is used to remove duplicates from an RDD and create a new RDD containing only the unique elements.
Syntax:
RDD.distinct(numPartitions=None)
Note:
numPartitions is an optional argument.

>>> data=['1','2','3','1','1','3','2','2','3','3']
>>> rdd=sc.parallelize(data)
>>> newRdd=rdd.distinct()
>>> newRdd.collect()
['1', '3', '2']

groupBy():
·         This transformation groups the elements of an RDD according to the result of the supplied function and returns a new RDD of (key, iterable of grouped elements) pairs.
·         In the example below, I am going to create a DataFrame (which is basically a schema RDD) and order it by a specific column with orderBy(); a small RDD-level groupBy() sketch follows the DataFrame example.
Syntax:
RDD.groupBy(<function>, numPartitions=None)
Example:

>>> from pyspark.sql import SparkSession
# Here my data source location is the local directory
>>> spark = SparkSession.builder.appName("groupbyagg").getOrCreate()
>>> df = spark.read.csv('sales_info.csv', inferSchema=True, header=True)

>>> df.printSchema()
root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)
# Showing the data
>>> df.show()
+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+
# OrderBy
# Ascending (the default)

>>> df.orderBy(df["Sales"]).show()
+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+
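Here is a minimal sketch of groupBy() at the RDD level, matching the syntax above. The sample numbers and the even/odd grouping function are my own illustration, not part of the sales dataset:

>>> nums = sc.parallelize([1, 2, 3, 4, 5, 6])
>>> grouped = nums.groupBy(lambda x: x % 2)  # key 0 for even numbers, 1 for odd
>>> sorted((key, sorted(values)) for key, values in grouped.collect())
[(0, [2, 4, 6]), (1, [1, 3, 5])]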

sortBy():
·         This is used to sort the data in an RDD.
·         It sorts the elements by the value returned from the supplied key function, keeping each element intact.
Syntax:
RDD.sortBy(<keyfunc>, ascending=True, numPartitions=None)

Example:

>>> unsorted = sc.parallelize([('t', 3), ('b', 4), ('c', 1)])
>>> Sorted = unsorted.sortBy(lambda a: a[1])
>>> Sorted.collect()

[('c', 1), ('t', 3), ('b', 4)]

So far, we have covered all the basic transformations, and now we are moving on to the next type of operation: Actions.

Below are the basic actions we generally use.

count():
It returns the total number of elements in the RDD.
Syntax:
RDD.count()

Example:

>>> data=['1','2','3','1','1','3','2','2','3','3']
>>> rdd=sc.parallelize(data)
>>> rdd.count()
10


collect():
It returns all the elements of the RDD to the driver program as a list.
Syntax:
RDD.collect()
Example:

>>> data=['1','2','3','1','1','3','2','2','3','3']
>>> rdd=sc.parallelize(data)
>>> rdd.collect()
['1', '2', '3', '1', '1', '3', '2', '2', '3', '3']

take():
The take() action returns the first n elements of an RDD. The elements taken
are not in any particular order; in fact, the elements returned from a take()
action are non-deterministic, meaning they can differ if the same action is run
again.
Syntax:
RDD.take(n)
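Example (a minimal sketch reusing the list from the map() example above; the exact elements returned may vary from run to run):

>>> rdd = sc.parallelize(['ravi', 'hadoop', 'spark', 'python'])
>>> rdd.take(2)
['ravi', 'hadoop']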

top():
The top() action returns the top n elements from an RDD, but unlike with
take(), with top() the elements are ordered and returned in descending
order.
Syntax:
RDD.top(n, key=None)
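Example (a minimal sketch with sample numbers of my own; top() returns the n largest elements in descending order):

>>> rdd = sc.parallelize([3, 7, 1, 9, 5])
>>> rdd.top(3)
[9, 7, 5]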

first():
The first() action returns the first element in this RDD. Similar to the
take() and collect() actions and unlike the top() action, first()
does not consider the order of elements and is a non-deterministic operation.
Syntax:
RDD.first()
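Example (a minimal sketch reusing the same list as in the earlier examples):

>>> rdd = sc.parallelize(['ravi', 'hadoop', 'spark', 'python'])
>>> rdd.first()
'ravi'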

fold():
The fold() action aggregates the elements of each partition of an RDD and then aggregates those per-partition results into a final value, using a given function and a zeroValue.
Syntax:
RDD.fold(zeroValue, <function>)
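Example (a minimal sketch with sample numbers of my own; a zeroValue of 0 is neutral for addition, so the result does not depend on the number of partitions):

>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.fold(0, lambda x, y: x + y)
15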


reduce():
The reduce() action reduces the elements of an RDD using a specified commutative and associative operator. The <function> argument specifies two inputs (lambda x, y: ...) that represent values in a sequence from the specified RDD. For example, a reduce() operation can produce a sum of a list of numbers, as shown in the sketch below.
Syntax:
RDD.reduce(<function>)
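Example (a minimal sketch of the sum mentioned above, with sample numbers of my own):

>>> numbers = sc.parallelize([1, 2, 3, 4, 5])
>>> numbers.reduce(lambda x, y: x + y)
15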


foreach():
The foreach() action applies a function specified in the <function>
argument, anonymous or named, to all elements of an RDD.
Syntax:
RDD.foreach(<function>)
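Example (a minimal sketch; note that foreach() runs on the executors, so in local mode the printed output appears in the console rather than being returned to the driver):

>>> def show_element(element):
...     print(element)
...
>>> sc.parallelize(['ravi', 'hadoop', 'spark']).foreach(show_element)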
