Hello everyone! In this article, we are going to explore the operations we can perform on RDDs. As usual, we focus on a practical approach rather than a theoretical one, and this time we are going to do the same. Trust me! You are going to get something out of this article.
There are two types of operations we generally perform on RDDs: Transformations and Actions.
Transformations are operations performed against RDDs that result in the creation of new RDDs. Depending on our requirements, we use transformation methods to transform the RDDs. Below are the basic transformation methods; initially we will discuss only these, and later we will perform advanced transformations on pair RDDs.
map():
· It is used to transform some aspect of the data in each row into something else.
· The map operation applies the given function to each element of the RDD and returns a new RDD. This means the number of elements in the RDD before and after the transformation is the same.
· It is lazily evaluated.
· It is a narrow transformation.
Syntax:
RDD.map(<function>, preservesPartitioning=False)
Using Python version 2.7.5 (default, Apr 2 2020 13:16:51)
SparkSession available as 'spark'.
>>> data=['ravi','hadoop','spark','python']   # 4 elements in the list
>>> rdd=sc.parallelize(data)
>>> newRdd=rdd.map(lambda x:x.upper())
>>> newRdd.collect()   # Action
['RAVI', 'HADOOP', 'SPARK', 'PYTHON']   # After the transformation, still 4 elements
Note:
preservesPartitioning is an optional Boolean argument. This parameter can be used by the Spark scheduler to optimize subsequent operations, such as joins based on the partitioning key.
flatMap():
· It is similar to map(), but flatMap() can return multiple output elements per input element; the per-input results are flattened into a single new RDD.
· It is lazily evaluated.
Syntax:
RDD.flatMap(<function>, preservesPartitioning=False)
Using Python version 2.7.5 (default, Apr 2 2020 13:16:51)
SparkSession available as 'spark'.
>>> from pyspark import SparkContext
>>> data=['ravi','hadoop','spark','python']
>>> rdd=sc.parallelize(data)
>>> newRdd1=rdd.flatMap(lambda x:x.upper())
>>> newRdd1.collect()
['R', 'A', 'V', 'I', 'H', 'A', 'D', 'O', 'O', 'P', 'S', 'P', 'A', 'R', 'K', 'P', 'Y', 'T', 'H', 'O', 'N']
filter():
· The filter transformation evaluates a Boolean expression against each element of the RDD.
· Generally, we use the filter transformation to eliminate elements from an RDD or dataset.
Example: Suppose we have the numbers 0 to 10 in a list and we want to keep only the even values in a new RDD. Let us look at the code below.
Using Python version 2.7.5 (default, Apr 2 2020 13:16:51)
SparkSession available as 'spark'.
>>> num=range(11)
>>> num
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> rdd=sc.parallelize(num)
>>> newRdd=rdd.filter(lambda x:x%2==0)
>>> newRdd.collect()
[0, 2, 4, 6, 8, 10]
distinct():
It is used to remove duplicates from an RDD and create a new RDD containing only the unique elements.
Syntax:
RDD.distinct(numPartitions=None)
Note:
numPartitions is an optional argument.
Using Python version 2.7.5 (default, Apr 2 2020 13:16:51)
SparkSession available as 'spark'.
>>> data=['1','2','3','1','1','3','2','2','3','3']
>>> rdd=sc.parallelize(data)
>>> newRdd=rdd.distinct()
>>> newRdd.collect()
['1', '3', '2']
groupBy():
· This transformation groups the elements of an RDD by the key returned by the given function and returns a new RDD of (key, group) pairs.
· Here I am going to create a DataFrame, which is basically a schema RDD, and the example below arranges it in ascending order for a specific column using orderBy().
Syntax:
RDD.groupBy(<function>, numPartitions=None)
Example:
>>> from pyspark.sql import SparkSession   # Here my data source location is a local directory
>>> spark = SparkSession.builder.appName("groupbyagg").getOrCreate()
>>> df = spark.read.csv('sales_info.csv', inferSchema=True, header=True)
>>> df.printSchema()
root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)
# Showing the data
>>> df.show()
+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+
# OrderBy, ascending
>>> df.orderBy(df["Sales"]).show()
+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+
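Since the example above demonstrates orderBy() on a DataFrame, here is also a minimal sketch of groupBy() on a plain RDD (the sample data is illustrative):
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> grouped = rdd.groupBy(lambda x: x % 2)   # key 0 = even, key 1 = odd
>>> sorted([(k, sorted(v)) for k, v in grouped.collect()])   # sorted() here only to make the output deterministic
[(0, [2, 4]), (1, [1, 3, 5])]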
sortBy():
· It is used to sort the data in an RDD.
· It sorts by the key computed by the given function and keeps each element's value together with its key.
Syntax:
RDD.sortBy(<keyfunc>, ascending=True, numPartitions=None)
Example:
>>> unsorted = sc.parallelize([('t', 3),('b', 4),('c', 1)])
>>> Sorted = unsorted.sortBy(lambda a: a[1])
>>> Sorted.collect()
[('c', 1), ('t', 3), ('b', 4)]
So far, we have covered all the basic transformations, and now we are moving on to the other type of operation: Actions.
count():
It returns the total count of the elements in the RDD.
Syntax:
RDD.count()
Example:
Using Python version 2.7.5 (default, Apr 2 2020 13:16:51)
SparkSession available as 'spark'.
>>> data=['1','2','3','1','1','3','2','2','3','3']
>>> rdd=sc.parallelize(data)
>>> rdd.count()
10
collect():
It is used to return all the elements of the RDD to the driver, for example to print them.
Syntax:
RDD.collect()
Example:
Using Python version 2.7.5 (default, Apr 2 2020 13:16:51)
SparkSession available as 'spark'.
>>> data=['1','2','3','1','1','3','2','2','3','3']
>>> rdd=sc.parallelize(data)
>>> rdd.collect()
['1', '2', '3', '1', '1', '3', '2', '2', '3', '3']
The take() action returns the first n elements of an RDD. The elements taken
are not in any particular order; in fact, the elements returned from a take()
action are non-deterministic, meaning they can differ if the same action is run
again.
Syntax:
RDD.take(n)
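Example (a minimal sketch; the sample data is illustrative, and as noted above the exact elements returned can vary between runs):
>>> rdd=sc.parallelize(['1','2','3','1','1','3','2','2','3','3'])
>>> rdd.take(3)
['1', '2', '3']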
top():
The top() action returns the top n elements from an RDD, but unlike with
take(), with top() the elements are ordered and returned in descending
order.
Syntax:
RDD.top(n, key=None)
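Example (a minimal sketch with illustrative numbers):
>>> rdd=sc.parallelize([3, 7, 1, 9, 5])
>>> rdd.top(3)   # ordered, descending
[9, 7, 5]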
first():
The first() action returns the first element in this RDD. Similar to the
take() and collect() actions and unlike the top() action, first()
does not consider the order of elements and is a non-deterministic operation.
Syntax:
RDD.first()
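Example (a minimal sketch with illustrative numbers):
>>> rdd=sc.parallelize([3, 7, 1, 9, 5])
>>> rdd.first()
3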
fold():
The fold() action aggregates the elements of each partition of an RDD and
then performs the aggregate operation against the results for all, using a given
function and a zeroValue.
Syntax:
RDD.fold(zeroValue, <function>)
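Example (a minimal sketch of a sum; note that the zeroValue is applied once per partition and once more for the final combine, so a non-identity zeroValue can produce surprising results):
>>> rdd=sc.parallelize([1, 2, 3, 4], 2)   # 2 partitions
>>> rdd.fold(0, lambda x, y: x + y)
10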
reduce():
The reduce() action reduces the elements of an RDD using the specified commutative and associative operator. The <function> argument takes two inputs (lambda x, y: ...) that represent values in a sequence from the specified RDD.
Syntax:
RDD.reduce(<function>)
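Example (a minimal sketch producing a sum of a list of numbers):
>>> rdd=sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.reduce(lambda x, y: x + y)
15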
foreach():
The foreach() action applies a function specified in the <function>
argument, anonymous or named, to all elements of an RDD.
Syntax:
RDD.foreach(<function>)
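Example (a minimal sketch; note that the function runs on the executors, so in local mode the output appears in the console, while on a cluster it goes to the worker logs):
>>> def show(element):
...     print(element)   # executed on the executors, not the driver
...
>>> sc.parallelize([1, 2, 3]).foreach(show)   # output order is not guaranteed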