Version: 1.0

Spark Workflow and Partitioning

Optimisation

Okay... I know how to wrangle/transform my data... but how do I actually optimize my job’s performance?

Golden Rule: In the real world, make sure your dataset/table is partitioned well.

  • Lots of small files are the enemy!
    • Having lots of tiny files will result in S3 needing to do lots of file listing operations. These are extremely slow and can even be expensive
    • Lots of small files also means lots of separate reads, each with its own overhead, and lots of small chunks of data moving through the network. This is slow!
  • HUGE files are also bad
    • Having too few files (all being huge) means you probably won’t take advantage of all of the cores in your cluster. In other words, the data can’t be easily distributed around the cluster
    • Each node in your cluster might even have to try and break down each of these huge files in order to redistribute some data to other nodes. This is a waste of time and money (must-watch)
  • So what’s a suitable strategy?
    • There’s no ‘best’ number. Aim for each .snappy.parquet file to be roughly between 256MB and 1GB
    • More importantly, make sure that you’re partitioning on columns that you frequently filter or do groupBy on
    • DO NOT partition on columns with high cardinality (e.g. a userId, which has millions of distinct values). This will result in lots of small files and lots of file listing operations
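
To make the sizing advice above a bit more concrete, here is a minimal sketch of the arithmetic in plain Python. The helper names and the 512MB default target are assumptions picked for illustration (any value in the range above would do), and real file sizes will vary with compression.

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3


def target_file_count(total_bytes: int, target_file_bytes: int = 512 * MB) -> int:
    """Number of files needed so that each one is roughly target_file_bytes."""
    return max(1, math.ceil(total_bytes / target_file_bytes))


def avg_bytes_per_partition_value(total_bytes: int, distinct_values: int) -> float:
    """Average amount of data per folder if the table is partitioned on one column."""
    return total_bytes / distinct_values


# A 10GB dataset at roughly 512MB per file needs about 20 files.
print(target_file_count(10 * GB))

# Partitioning 100GB on a column with 5 million distinct values (e.g. userId)
# leaves only around 21KB per folder: lots of tiny files.
print(round(avg_bytes_per_partition_value(100 * GB, 5_000_000) / 1024, 1), "KB")
```

If the per-folder number comes out in kilobytes rather than hundreds of megabytes, that column is probably too fine-grained to partition on.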

Partitioning

Partitioning strategy is the most important decision we have to get right!

If your partitioning strategy is decent, you’ll most likely be fine and won’t need to tweak other knobs. This is especially true going forward: with Spark 3.0’s Adaptive Query Execution (AQE), a lot of optimizations will be automated for you!

So how does a partitioned table look?

  • It would actually look like a bunch of hierarchical folders
  • The partitioning values become their own folder (e.g. year=2018)
  • The underlying data will be at the bottom of the hierarchy and will often have a .snappy.parquet file extension (if using Spark)
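
As a rough illustration of that folder layout, the plain-Python sketch below builds the path of one data file and then recovers the partition values from it. The helper names, the bucket/table names and the file name are hypothetical (the file names Spark generates are longer); only the key=value folder convention comes from the description above.

```python
def partition_path(table_root: str, partition_values: dict, file_name: str) -> str:
    """Build the path of a data file inside a partitioned table."""
    folders = [f"{column}={value}" for column, value in partition_values.items()]
    return "/".join([table_root.rstrip("/"), *folders, file_name])


def parse_partition_values(path: str) -> dict:
    """Recover the partition columns from the key=value folder names in a path."""
    pairs = (segment.split("=", 1) for segment in path.split("/") if "=" in segment)
    return {column: value for column, value in pairs}


path = partition_path(
    "s3://my-bucket/my-table/",
    {"year": 2018, "month": 3, "day": 7},
    "part-00000.snappy.parquet",  # hypothetical file name
)
print(path)
# s3://my-bucket/my-table/year=2018/month=3/day=7/part-00000.snappy.parquet
print(parse_partition_values(path))
# {'year': '2018', 'month': '3', 'day': '7'}
```

Note that the partition values live in the folder names, which is why a query engine can tell which folders are relevant without opening any data files.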

Can you give me an example?

  • Partitioning the table based on some notion of time is a popular option (check if that makes sense for your project first though!)
  • e.g. if each day of data for the table is on the order of 128MB to 1GB, then your partitioning keys can be ("year", "month", "day")
  • You don’t need to explicitly define all the values. Spark will create a new partition folder for each distinct combination of your partitioning values
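
A time-partitioned write along those lines might look like the sketch below. It assumes PySpark is installed, that the DataFrame already has "year", "month" and "day" columns, and that the function name and table path are placeholders rather than anything prescribed here.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # only needed for the type hints
    from pyspark.sql import DataFrame


def write_daily_partitions(df: "DataFrame", table_root: str) -> None:
    """Write df as a Parquet table with year=/month=/day= folders.

    Assumes df already has "year", "month" and "day" columns.
    """
    (
        df.write
        .partitionBy("year", "month", "day")  # one folder level per column
        .parquet(table_root)  # default save mode: fails if table_root already exists
    )
```

Usage would be something like write_daily_partitions(events_df, "s3://my-bucket/my-table/"), where events_df is a hypothetical DataFrame. Spark works out the distinct (year, month, day) combinations on its own.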

Working with Partitioned Data

Partitioning FAQs

Question: So...is a parquet file a file or a folder of files?

Short Answer: either!

  • With a single-node library like Pandas, you can write a single .snappy.parquet file if you want
  • However, in the real world a parquet table is usually a folder of partitions
    • This way you can read/write an entire table with just one path (the root of the table)
      • e.g. s3://my-bucket/my-table/ or s3://my-bucket/my-table.parquet/ (both of these styles are still folders)
      • Underneath all of the partitioning folders, you will find your .snappy.parquet files
    • The query engine (e.g. Spark or Presto) will then take care of understanding the partitioning structure of the table and will optimize your queries around that
    • Spark will always write the output of a DataFrame as a folder at the root level rather than a single file (because it’s designed for distributed/concurrent reading/writing of data, which often involves multiple files)
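
Reading such a table back could be sketched as follows. This assumes PySpark, a table partitioned by "year" and "month", and a hypothetical helper name. Spark infers the partition columns from the folder names, so a filter on them typically lets it skip the folders that don't match.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # only needed for the type hints
    from pyspark.sql import DataFrame, SparkSession


def read_month(spark: "SparkSession", table_root: str, year: int, month: int) -> "DataFrame":
    """Read one month of a table partitioned by year/month."""
    # Point at the root folder of the table, not at individual files.
    table = spark.read.parquet(table_root)
    # Filtering on partition columns lets Spark prune non-matching folders.
    return table.where(f"year = {int(year)} AND month = {int(month)}")
```

For example, read_month(spark, "s3://my-bucket/my-table/", 2018, 3) would only need to touch the year=2018/month=3 folder.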

Shuffling

Resources

  • Spark DAGs and planning (optional)
    • Just know that bad partitioning → shuffling → pain (must-watch)
    • You can check how ‘shuffly’ your Spark job looks by viewing the DAG
  • Managing Partitioning
    • Important: understand that repartition() and DataFrame.write.partitionBy() are not the same thing
      • repartition() can take 2 different types of arguments:
        • a number: controls the number of in-memory partitions, which in turn determines how many .snappy.parquet files get written
        • a bunch of column names: rows with the same combination of values for those columns end up in the same partition, and therefore in the same .snappy.parquet file
      • DataFrame.write.partitionBy() defines the folder structure of the table
        • however, it does not guarantee how many .snappy.parquet files will be in each folder
      • Sometimes you might even need to do both, e.g. df.repartition("year", "month").write.partitionBy("year", "month")... in order to guarantee exactly 1 .snappy.parquet file per month folder
    • Try to read up on the difference between repartition and coalesce
      • Short Answer: The coalesce() transformation applied to a DataFrame (not to be confused with coalesce() applied to a column) will try to merge existing partitions to reach your desired number. Only use coalesce() when you want to reduce the number of partitions in your data.
      • On the other hand, repartition() will do a full shuffle of all the data (more expensive).
      • If you need to increase the number of partitions in your data, then you will need repartition()
  • Practice Repartitioning vs PartitionBy in Databricks
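
The two ideas above (repartition() plus partitionBy(), and coalesce() vs repartition()) can be sketched in PySpark as follows. The function names are hypothetical, the DataFrame is assumed to have "year" and "month" columns, and the one-file-per-folder outcome assumes settings such as spark.sql.files.maxRecordsPerFile are left at their defaults.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # only needed for the type hints
    from pyspark.sql import DataFrame


def write_one_file_per_month(df: "DataFrame", table_root: str) -> None:
    """Combine repartition() and partitionBy() to get 1 file per month folder."""
    (
        # Shuffle so that all rows of a given (year, month) share one in-memory partition.
        df.repartition("year", "month")
        # Folder layout of the table: year=.../month=...
        .write.partitionBy("year", "month")
        .parquet(table_root)  # default save mode: fails if table_root already exists
    )


def resize_partitions(df: "DataFrame", target: int) -> "DataFrame":
    """Pick coalesce() when shrinking and repartition() when growing."""
    current = df.rdd.getNumPartitions()
    if target == current:
        return df
    if target < current:
        return df.coalesce(target)  # merges existing partitions, no full shuffle
    return df.repartition(target)  # full shuffle, needed to increase the count
```

One design note: coalesce() can leave partitions unevenly sized because it only merges what is already there, so repartition() may still be worth its cost when shrinking if the data is badly skewed.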