Apache Spark

What are Accumulators and Broadcast Variables in Spark, and when should these shared variables be used?

Accumulators & Broadcast Variables:

Both accumulators and broadcast variables are shared variables in Spark, meaning Spark allows the worker nodes and the driver program to share the values of these variables while processing data in a cluster environment.

Accumulators: Accumulators are shared variables that are defined inside the driver program. We can define one as: acctVar = sc.accumulator(0), where the argument is the accumulator's initial value.

Although this variable, acctVar, is defined inside the driver program, Spark allows it to be updated by all of the worker nodes participating in the cluster.

To understand how an accumulator works in Spark, let us take the following scenario:

Problem Statement: Count how many customers are from the country "USA" in Fake_Customers.csv.

Solution: To count these customers, define a Spark accumulator variable as countUSACustomers = sc.accumulator(0), where sc is the SparkContext object and 0 is the accumulator's initial value.

Because Spark is a distributed computation framework, the driver program defines the logic that increments countUSACustomers by 1, and each worker node executes that logic whenever it encounters the country "USA" while processing its part of the file.

Conclusion: Once the file processing is completed, the final value of countUSACustomers gives the answer to the problem statement.

In other words, although countUSACustomers is defined inside the driver program, it is updated by every worker node; worker tasks can only add to the accumulator, and only the driver can read its final value.
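Here is a minimal sketch of this scenario, assuming Fake_Customers.csv is a plain comma-separated file with the country name in the fourth column (the file layout and column position are assumptions for illustration):

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName('USACustomerCount').setMaster("local[*]")
sc = SparkContext(conf=conf)

# Accumulator defined on the driver, with an initial value of 0
countUSACustomers = sc.accumulator(0)

def count_usa(line):
    # Assumed layout: First_Name,Last_Name,City,CountryName,... (hypothetical)
    fields = line.split(",")
    if fields[3] == "USA":
        countUSACustomers.add(1)  # worker tasks can only add to the accumulator

sc.textFile("Fake_Customers.csv").foreach(count_usa)

# Only the driver can read the accumulated value
print("USA customers: %d" % countUSACustomers.value)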

Broadcast Variables: While accumulators allow the worker nodes to write/update a shared value, a broadcast variable instead copies (broadcasts) a read-only dataset or value to each of the worker nodes.

Let us take the following example to explain the functionality of a broadcast variable.

Problem Statement: Customer.csv and CountryCode.csv are two input files with the following information:

Customer.csv: customer information plus country names. Metadata: {First_Name, Last_Name, CountryName, City, Zip, Province}

CountryCode.csv: CountryName and CountryCode, where CountryCode represents the international dialling code. Metadata: {CountryName, CountryCode}

A broadcast variable is declared as sc.broadcast(value). The goal is to attach the matching CountryCode to every customer record.

To get the desired results, Spark would normally join these two datasets across the worker nodes. This is an expensive operation because Spark has to shuffle data between the worker nodes while joining the datasets.

Alternatively, Spark allows us to broadcast the second dataset (in this case CountryCode.csv, since it is the smaller one) to each worker node, and each worker node can then perform a local lookup on it: a CountryName is passed as input and the corresponding CountryCode is returned. This drastically improves performance because no data needs to travel over the network to join the datasets, as the sketch below illustrates.
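Here is a minimal sketch of this broadcast lookup, assuming the metadata layouts described above (column positions are assumptions for illustration):

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName('BroadcastLookup').setMaster("local[*]")
sc = SparkContext(conf=conf)

# Collect the small dataset on the driver and broadcast it as a dictionary.
# Assumed CountryCode.csv layout: CountryName,CountryCode
country_codes = dict(
    sc.textFile("CountryCode.csv")
      .map(lambda line: tuple(line.split(",")[:2]))
      .collect()
)
codes_bc = sc.broadcast(country_codes)

def add_country_code(line):
    # Assumed Customer.csv layout: First_Name,Last_Name,CountryName,City,Zip,Province
    fields = line.split(",")
    country = fields[2]
    # Local lookup on the worker; no shuffle over the network
    return fields + [codes_bc.value.get(country, "UNKNOWN")]

customers_with_codes = sc.textFile("Customer.csv").map(add_country_code)
print(customers_with_codes.take(5))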

Code Snippet: Problem #1 – Accumulators demo

In the following example, the variable num is defined as an accumulator with an initial value of 1. Worker tasks add the elements 2, 3, 4, and 5 to it, so the accumulated value at the end is 15.

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    conf = SparkConf().setAppName('AccumulatorsDemo').setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # Accumulator with an initial value of 1
    num = sc.accumulator(1)

    def f(x):
        global num
        num += x  # each task adds its element to the accumulator

    rdd = sc.parallelize([2, 3, 4, 5])
    rdd.foreach(f)

    final = num.value  # only the driver can read the accumulated value
    print("Accumulated value is -> %i" % final)

Problem #2 – Broadcast demo

In the following example, a broadcast variable called words_new is created. It has an attribute called value, which stores the broadcast data and can be read locally, as shown below:

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    conf = SparkConf().setAppName('BroadcastDemo').setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # Broadcast a small list of country names to all workers
    words_new = sc.broadcast(["USA", "India", "China", "Canada", "Brazil"])

    data = words_new.value  # read the broadcast data
    print("Stored data ->", data)

    element = words_new.value[2]  # "China"
    print("Printing a particular element -> %s" % element)

References: The following resources were used as references for the two examples above on accumulators and broadcast variables:
https://www.tutorialspoint.com/apache_spark/advanced_spark_programming.htm
Canvas – Advanced Spark Programming module (course study material)
