Thursday, April 19, 2018

Apache Spark: Performance Tuning Notes

I've been working with Apache Spark for the past two years. I'm building a data processing engine rather than just running the kinds of jobs that show up in all the examples and tutorials. The basics of the app look something like this (a rough sketch of the flow follows the list):
    1. Read a batch of messages from Kafka
    2. Make a REST call to a server to register information about each message in the batch
    3. Download a file
    4. Upload the file to a new location
    5. Make another REST call to the server indicating the message was processed
    6. Store metrics about everything in OpenTSDB
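
Setting aside the Spark and Kafka plumbing in step 1, here's a rough plain-Java sketch of what happens to one batch. The Message, RestClient, FileStore, and MetricsClient types are just placeholders I'm using for illustration, not the real clients:

    import java.util.List;

    // Placeholder types for the sketch -- the real message and client classes differ.
    interface Message { String sourceUrl(); String destinationUrl(); }
    interface RestClient { void register(Message m); void markProcessed(Message m); }
    interface FileStore { byte[] download(String url); void upload(String url, byte[] data); }
    interface MetricsClient { void record(String name, long value); }

    // One batch, processed serially: steps 2-5 for each message, then step 6.
    public class SerialBatchProcessor {
        private final RestClient rest;
        private final FileStore files;
        private final MetricsClient metrics;

        public SerialBatchProcessor(RestClient rest, FileStore files, MetricsClient metrics) {
            this.rest = rest;
            this.files = files;
            this.metrics = metrics;
        }

        public void processBatch(List<Message> batch) {
            long start = System.currentTimeMillis();
            for (Message msg : batch) {
                rest.register(msg);                              // step 2: register the message
                byte[] data = files.download(msg.sourceUrl());   // step 3: download the file
                files.upload(msg.destinationUrl(), data);        // step 4: upload it to the new location
                rest.markProcessed(msg);                         // step 5: confirm the message was processed
            }
            metrics.record("batch.millis", System.currentTimeMillis() - start);  // step 6: record metrics
        }
    }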

Originally everything happened serially. That worked OK, since Spark was already spreading the work across multiple executors, but with a little extra work I was able to improve performance drastically.

First, I changed steps 2 and 5 so that they use an ExecutorService to queue up all of the requests, and then I wait on the resulting futures so that processing doesn't happen out of order. If a batch needs 10 separate REST calls, they are now all in flight at about the same time instead of running one after the other (a sketch is below). I'm going to do something very similar for steps 3 and 4.
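
Here's a minimal sketch of that change, reusing the placeholder RestClient and Message types from the earlier sketch; the pool size is only an example:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Fan the step-2 REST calls out over a thread pool, then wait for all of them.
    public class ParallelRegistration {
        private final ExecutorService pool = Executors.newFixedThreadPool(10);  // example size
        private final RestClient rest;

        public ParallelRegistration(RestClient rest) {
            this.rest = rest;
        }

        public void registerAll(List<Message> batch) throws ExecutionException, InterruptedException {
            // Submit every REST call so they are all waiting at about the same time.
            List<Future<?>> futures = new ArrayList<>();
            for (Message msg : batch) {
                futures.add(pool.submit(() -> rest.register(msg)));
            }
            // Block on each future before moving on, so the downstream steps
            // never run ahead of the registrations.
            for (Future<?> f : futures) {
                f.get();
            }
        }
    }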
Next, I have a separate thread storing the metrics in OpenTSDB, since none of that needs to happen in sync with anything else.
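
Something along these lines, with the placeholder MetricsClient again standing in for the real OpenTSDB client:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Write metrics from a background thread so batch processing never waits on OpenTSDB.
    public class AsyncMetrics {

        static final class Point {
            final String name;
            final long value;
            Point(String name, long value) { this.name = name; this.value = value; }
        }

        private final BlockingQueue<Point> queue = new LinkedBlockingQueue<>();

        public AsyncMetrics(MetricsClient client) {
            // A daemon thread drains the queue and sends each point to OpenTSDB.
            Thread writer = new Thread(() -> {
                try {
                    while (true) {
                        Point p = queue.take();
                        client.record(p.name, p.value);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // shut down quietly
                }
            });
            writer.setDaemon(true);
            writer.start();
        }

        // Called from the processing code; returns immediately.
        public void record(String name, long value) {
            queue.offer(new Point(name, value));
        }
    }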
Finally, the most important change I made was to the Spark configuration. After some research I figured out how the following settings work in Spark.
    spark.task.cpus
    spark.executor.instances
    spark.executor.cores
    spark.cores.max
There are lots of other settings I used, but these are the ones that improved performance the most.

    spark.cores.max - This controls the total number of cores that will be assigned to your Spark application. This matters if you are running in a cluster where you can't just take everything: since I'm running in a shared Mesos environment, I can't request all of the available resources for my long-running Spark application.

    spark.executor.cores - This controls the number of cores that each executor will be assigned.

    spark.executor.instances - This controls the number of executors you want to have running. This number multiplied by spark.executor.cores should equal the value set for spark.cores.max.

    spark.task.cpus - This tells Spark how many CPUs you expect each task to use. Since my tasks are mostly I/O bound, with multiple REST calls plus downloads and uploads, I assigned 1 CPU to each task.

By assigning each task 1 CPU, and then making sure I had 6 executors with at least 2 cores each, I was able to get each batch to process 12 tasks at a time. Processing that many tasks concurrently reduced the processing time for large batches (2,000+ records) from hours to minutes.
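
For reference, here's that configuration sketched in code. The app name and master URL are placeholders, and I'm assuming Mesos coarse-grained mode, where the executor count works out to spark.cores.max divided by spark.executor.cores:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SparkSetup {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("message-processor")   // placeholder app name
                    .setMaster("mesos://host:5050")    // placeholder Mesos master
                    .set("spark.cores.max", "12")      // total cores for the whole application
                    .set("spark.executor.cores", "2")  // 12 / 2 -> 6 executors
                    .set("spark.task.cpus", "1");      // 2 / 1 -> 2 concurrent tasks per executor
            // 6 executors * 2 tasks each = 12 tasks in flight at a time.
            JavaSparkContext sc = new JavaSparkContext(conf);
            // ... Kafka stream setup and batch processing would go here ...
            sc.stop();
        }
    }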
