Apache Spark Optimization Techniques

Apache Spark is one of the most popular cluster computing frameworks for big data processing. However, getting complex Spark jobs to execute efficiently requires a good understanding of how Spark works and of the various ways to tune a job for its data distribution and workload. The following techniques will help you tune your Spark jobs for efficiency in CPU, network bandwidth, and memory.

1) Persist/Unpersist

Spark provides a mechanism to store the intermediate computation of a DataFrame so that it can be reused in subsequent actions. Spark jobs should be designed to reuse repeated computations; otherwise performance degrades badly when the job processes billions or trillions of records. When a Dataset is persisted, each node stores its partitions in memory and reuses them in the different actions performed on that Dataset. Persisted data is fault-tolerant: if any partition of the Dataset is lost, it is automatically recomputed using the original transformations that created it.

Unpersist removes the stored data from memory and disk. Make sure you unpersist cached data once it is no longer needed, at the latest at the end of your Spark job.
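As a minimal sketch (assuming a SparkSession named spark and a hypothetical Parquet input events.parquet), persisting an intermediate DataFrame that feeds several actions might look like this:

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("persist-example").getOrCreate()
import spark.implicits._

val events = spark.read.parquet("events.parquet")        // hypothetical input path

// Persist the intermediate result that is reused by the two actions below.
val cleaned = events.filter($"status" === "OK").persist(StorageLevel.MEMORY_AND_DISK)

val total = cleaned.count()                               // first action materializes the cache
cleaned.groupBy($"day").count().show()                    // second action reuses the cached partitions

// Release the cached partitions once they are no longer needed.
cleaned.unpersist()

The first action triggers the computation and caches the partitions; the second action reads from the cache instead of recomputing the filter.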

2) Shuffle Partition

Tuning your Spark configuration to the right shuffle partition count is very important. Suppose I have a very small dataset and perform a groupBy with the default shuffle partition count of 200: I create far more partitions than the data needs and waste Spark resources on scheduling overhead. Conversely, if I have a very large dataset and perform a groupBy with the same default of 200 shuffle partitions, I may under-utilize my Spark resources because each partition becomes too large.

The shuffle partition count can be changed at runtime through the Spark session with sparkSession.conf.set("spark.sql.shuffle.partitions", 150), or set when submitting the job, e.g. spark-submit --conf spark.sql.shuffle.partitions=150.
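As a rough sketch (assuming a SparkSession named spark and a hypothetical orders.parquet input), setting the value per job looks like this:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("shuffle-partitions-example").getOrCreate()

// Small input: fewer shuffle partitions avoid the overhead of many tiny tasks.
spark.conf.set("spark.sql.shuffle.partitions", 50)

// Large input: more partitions spread the shuffle across the cluster.
// spark.conf.set("spark.sql.shuffle.partitions", 1000)

val orders = spark.read.parquet("orders.parquet")         // hypothetical input path
orders.groupBy("customer_id").count().show()              // the groupBy shuffles using the setting above

Alternatively, the same value can be passed at submit time:

spark-submit --conf spark.sql.shuffle.partitions=150 ...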

3) Push Down Filters

In SQL, when a query contains both a join and a where condition, a naive execution first performs the join across the entire data and only then applies the where condition. If Spark behaved the same way on a very large dataset, the join would take hours of computation because it runs over the unfiltered data, after which more time would be spent filtering the joined result with the where condition.

Predicate pushdown is largely self-explanatory: a predicate is a where condition that evaluates to true or false. Spark pushes these predicate conditions down to the underlying data source (a database, Parquet files, and so on), so the data is filtered at the source itself. This reduces the amount of data retrieved and improves query performance. Because the filtering happens in the data store, the query runs fast, unfiltered data is not transferred over the network, and only the filtered data ends up in memory. We can use the explain method to inspect the physical plan of the DataFrame and see whether predicate pushdown was applied.
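A minimal sketch of checking this (assuming a SparkSession named spark and a hypothetical sales.parquet source, a format that supports pushdown):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pushdown-example").getOrCreate()
import spark.implicits._

val sales = spark.read.parquet("sales.parquet")           // hypothetical Parquet source

// Simple predicates like these can be pushed down to the file scan.
val recent = sales.filter($"year" === 2023 && $"amount" > 100)

// Look for "PushedFilters" in the scan node of the physical plan.
recent.explain(true)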

4) Broadcast Joins

When we join two large datasets, what happens in the backend is that huge amounts of data get shuffled between partitions, both within the same executor and across different executors in the cluster.

Broadcast joins are used whenever we need to join a larger dataset with a smaller one. With a broadcast join, Spark broadcasts the smaller dataset to all nodes in the cluster; since the data to be joined is then available on every node, Spark can perform the join without any shuffling. Using a broadcast join, you avoid sending huge amounts of data over the network and shuffling it.
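A minimal sketch (assuming a SparkSession named spark and hypothetical inputs transactions.parquet as the large table and countries.parquet as the small lookup table):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("broadcast-join-example").getOrCreate()

val transactions = spark.read.parquet("transactions.parquet")   // large fact table
val countries    = spark.read.parquet("countries.parquet")      // small lookup table

// Explicitly mark the small side for broadcast; Spark ships it to every executor,
// so the large table is joined locally without being shuffled.
val enriched = transactions.join(broadcast(countries), Seq("country_code"))

enriched.explain()   // the physical plan should show a BroadcastHashJoin

Spark also broadcasts automatically when the smaller side is below spark.sql.autoBroadcastJoinThreshold (10 MB by default), but the explicit broadcast hint makes the intent clear in the code.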