Spark Partitioning and Bucketing

Partitioning

Partitioning is a way to split data into multiple partitions so that transformations can execute on them in parallel, which allows the job to complete faster. You can also write partitioned data into a file system (as multiple sub-directories) for faster reads by downstream systems.
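
As a minimal sketch (the salesDf DataFrame and the output path are assumed), a partitioned write looks like this:

    // Writes one sub-directory per distinct value of the "date" column,
    // e.g. /data/sales_partitioned/date=2023-01-01/
    salesDf.write
      .partitionBy("date")
      .parquet("/data/sales_partitioned")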

Partitioned data is also easier to query, because whole partitions of irrelevant data can be skipped to yield faster results; handled incorrectly, however, partitioning can create the small-file problem.

In large datasets, you often want to filter queries on columns such as date or type. If we partition our data on date and type, our queries skip the unnecessary partitions and read only the sections guaranteed to contain the desired rows. More partitions let work be distributed among more workers, while fewer partitions let work be done in larger chunks (and often more quickly).
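
Continuing the sketch above, a filter on the partition column lets Spark prune directories at read time:

    import org.apache.spark.sql.functions.col

    // Only the date=2023-01-01 sub-directory is scanned; every other
    // partition is skipped entirely (partition pruning).
    val janFirst = spark.read.parquet("/data/sales_partitioned")
      .filter(col("date") === "2023-01-01")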

We can create RDDs with specific partitioning in two ways:

  1. partitionBy(): by providing an explicit Partitioner, this transformation applies custom partitioning logic to the RDD (see the sketch after this list).

  2. We can also apply transformations that return RDDs with specific partitioners.

    • join, groupByKey, reduceByKey, sortByKey
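
A minimal Scala sketch of the first approach; the sample data is illustrative, and HashPartitioner is one of Spark's built-in partitioners:

    import org.apache.spark.HashPartitioner

    // A pair RDD of (key, value) records; the sample data is made up.
    val pairs = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // Explicitly hash-partition the RDD into 8 partitions.
    val partitioned = pairs.partitionBy(new HashPartitioner(8))

    // reduceByKey reuses the existing partitioner, so no extra shuffle occurs.
    val sums = partitioned.reduceByKey(_ + _)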

Bucketing

Bucketing is a technique in both Spark and Hive used to optimize task performance. In bucketing, the bucketing (clustering) columns determine how the data is divided up, which helps prevent data shuffles. Based on the value of one or more bucketing columns, the data is allocated to a predefined number of buckets.

Bucketing boosts performance by shuffling and sorting the data once, at write time, rather than before every downstream operation such as a table join. The technique suits dimension tables: frequently used tables containing primary keys. It is also beneficial when there are frequent join operations involving large and small tables.

Bucketing is commonly used to optimize the performance of a join query by avoiding shuffles of the tables participating in the join. It is most beneficial when the pre-shuffled bucketed tables are reused more than once, so the one-time cost of the bucketed write is amortized across queries.
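
As a sketch, assuming two hypothetical tables that were both bucketed on the join key "productName" with the same bucket count:

    // Neither side needs a shuffle: matching buckets are joined directly.
    val orders   = spark.table("orders_bucketed")
    val products = spark.table("products_bucketed")
    val joined   = orders.join(products, "productName")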

When we enable bucketing, it is critical to specify the number of buckets, which requires some insight into the data. Alternatively, we can find a good number by trial and error: start with a number equal to the number of executors in our cluster and adjust it until we get the best performance.
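
A sketch of that starting point, using the session's default parallelism as an initial bucket count (ordersDf and the table name are illustrative):

    // Heuristic starting point: one bucket per available task slot.
    val numBuckets = spark.sparkContext.defaultParallelism

    // bucketBy is only supported with saveAsTable, not with plain file writes.
    ordersDf.write
      .bucketBy(numBuckets, "productName")
      .saveAsTable("orders_bucketed")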

The following transformations benefit from bucketing:

join, distinct, groupBy, reduceByKey
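
For example, an aggregation on the bucketing column of the hypothetical table above can be planned without a shuffle:

    // Rows sharing a productName already live in the same bucket, so this
    // groupBy can aggregate without shuffling the data again.
    val perProduct = spark.table("orders_bucketed")
      .groupBy("productName")
      .count()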

Sample bucket creation

    orders.write
      .bucketBy(42, "productName")
      .sortBy("ratings")
      .saveAsTable("product_bucketed")
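
Once saved, the bucket spec is recorded in the metastore; running DESCRIBE EXTENDED product_bucketed in Spark SQL should show the bucket columns and the bucket count.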