Analyzing a billion files in S3
This is the story of how Freebird analyzed a billion files in S3, cut our monthly costs by thousands of dollars, and learned a lot about Hadoop, Elastic MapReduce, and S3. This post describes the problem we faced, our solution, and what we learned.
Freebird empowers travelers to skip the line and instantly book a new ticket after a flight disruption — on any airline, for free, with only three taps on their phone. Since we purchase a lot of last-minute plane tickets, it's important for us to be able to predict the prices of these tickets to understand and manage our financial risk.
Freebird uses machine learning techniques to predict last-minute flight prices, which means that we need a lot of historical pricing data to power our predictions. To gather this data, we've subscribed to several live feeds of flight pricing data. When a flight search is performed through one of our partners, we receive a copy of the search results as XML. To persist this data, we set up a server that saved each search result as an object in S3, so our data set grew gradually over time
The Data Swamp
After hiring a full-time data scientist, we finally had someone with time to analyze this data. By this point, we had accumulated over one billion objects in S3, totaling more than 100 TB of data. Although we were able to analyze tiny subsets of the data in Python, even expensive memory-optimized EC2 instances weren't performing as well as we wanted.
We chose the Hadoop stack as a replacement since it scales well and there are a variety of interoperable tools and a robust community. This meant that we needed to transform our data into a Hadoop-friendly format.
There are a few limitations when dealing with numerous and small files in Hadoop. Hadoop's whole thing is to provide high throughput by avoiding disk seeks. Since small files are not guaranteed to be stored contiguously on disk, they cause Hadoop to perform too many disk seeks. Additionally, there is a per-task bookkeeping overhead that keeps track of whether a task has been completed, which node is working on it, how many times it has failed, and so on. More files means more tasks and therefore a higher proportion of resources dedicated to bookkeeping instead of actual work.
S3 isn't optimized for storing this many objects either. Although it supports range queries by object key, listing our entire data set would have taken several days on a laptop.
Since we access this data infrequently, we want it to be highly compressed to save on storage costs. Small files usually make compression less effective: compression algorithms rely on efficiently encoding repeating patterns, but if each file is compressed individually then patterns that occur across multiple files can't be compressed.
Effective data science requires rapid iteration, so we need to be able to run jobs quickly and cheaply. Since reading all billion files for each analysis would have been slow and expensive, we decided to run a job to read the entire data set from S3, convert it into a better shape, and delete the original data from S3.
The job was designed to do three things:
- Archive many small files into a few bigger ones
- Compress the data to reduce storage costs and access speeds
- Ease future analysis by converting the data into a Hadoop-friendly format
The job's input was over a billion individual XML files stored in S3, organized by time. Its output was archives of compressed XML files, still organized by time. We chose to store the output in S3 rather than on the disk of a persistent HDFS cluster in order to save money.
Hitting a bug halfway through a multi-thousand-dollar job would be pretty embarrassing, especially if that job was supposed to save the company money! Therefore, we made the job as simple as possible to minimize the probability of developer error. It didn't parse the files at all; it simply compressed them and grouped them into archives.
We wanted to get at our data quickly without breaking the bank. There are already a few tools designed to do something like this, but none of them checked all the boxes for us. Although we could have run the job using a single gigantic EC2 instance, the biggest EC2 instances offer poor computing power per dollar. This meant that we needed to distribute our job across multiple EC2 instances for better value.
In order to effectively distribute this job, we chose Hadoop, MapReduce, and Amazon's Elastic MapReduce.
We needed an archive format, i.e. a way to store many small files inside of a single bigger file. Hadoop natively supports two archive formats: HAR and SequenceFile.
A HAR or Hadoop Archive contains an index that allows efficient random access to any file. However, streaming over all files isn't efficient because there is no way to iterate over the data without disk seeks.
A SequenceFile just writes a bunch of key-value pairs sequentially, with no index. This permits efficient streaming but not random access. In a SequenceFile, each value can be individually compressed (record compression) or larger groups of key-value pairs can be compressed together (block compression). Record compression could be useful if we want to filter by filename, since we only need to decompress the files that we want to look at. On the other hand, we get improvements in compression factor by using block compression, since the codec will naturally reuse patterns from one file to efficiently compress other files.
Since we wanted to optimize for streaming access over all records, we chose to write our output as SequenceFiles with block compression.
There is no single "best" compression codec. Here are the factors that we cared about:
- Compression factor: storing all this data was making a big dent in our AWS bill, and we wanted to fix that. Separately, a high compression factor improves data access speeds when network and disk access are bottlenecks.
- Convenience: we wanted a codec that was natively supported in Hadoop.
- Read throughput: we wanted to be able to run later jobs quickly, so we needed to be able to decompress the data fairly quickly.
- Write throughput: since the job was only to be run once, we only needed to pay the cost of writing it once. That meant that write throughput was less important to us.
Overall we wanted a convenient codec with great compression factor at the possible expense of throughput. Although LZMA outperforms bzip2 on paper, support for LZMA in Hadoop seems immature. By contrast, bzip2 is natively supported in Hadoop. Since it is splittable, it can be used to implement efficient block compression. Based on this, we went with bzip2.
We wanted to be able to easily train machine learning algorithms on the output of our job without writing custom code to load the data in. We accomplished this by choosing technologies that are natively supported in the Hadoop ecosystem:
- Our compression algorithm, bzip2
- Our archive format, SequenceFile
- Our storage medium, S3
This meant that we were able to train machine learning algorithms on our output without any custom loading code.
Instead of writing a vanilla MapReduce job, we made two major changes.
First, we avoided reduce steps, instead using only map steps. Normally, the outputs of map tasks are written to disk, from where they are read by reduce tasks. We wanted to avoid this, because paying for 100 TB of disk space would have increased the cost of this job considerably. Since both our inputs and outputs were organized by time, we knew in advance which inputs were relevant to each output. This meant that we could write each output as soon as we knew we had collected all relevant inputs. Since all the inputs for each output fit in memory, we didn't need to buffer them on disk at any point. Normally this type of optimization is not possible, because it's rare to know the mapping between inputs and outputs beforehand.
Second, instead of loading our files as a normal MapReduce input, we used the S3 Java client to parallelize listing the files. Letting Hadoop list the files wouldn't automatically distribute file listing across the cluster, which is important when there are a billion files.
Although writing our job as map-only took a little more effort, it ended up providing large gains in performance.
First, we logically partitioned our data set into one-hour intervals. Given this list of intervals, we then processed each one-hour interval independently, as follows.
Within each one-hour interval, we used the Java S3 client to retrieve the key and size of each object. Since our keys were human-readable times, we used a prefix query to list all the keys for an hour. Based on the sizes, we grouped contiguous objects into bins so that each bin had approximately the same total size. This step was fairly quick since we only needed to look at the metadata for each object to get its size. At the end of this step, we had a list of bins that could be processed completely independently.
Within each bin, we downloaded all the files, concatenated them, compressed them, and uploaded the resulting archive to S3. Although we customized the download step, we let MapReduce take care of everything else.
Even after putting some careful thought into our initial design, we were able to make our job much more efficient through optimization. We started with 10 GB of data and scaled up in two main dimensions: amount of data and computing resources.
Increasing the amount of data helped us to find rare bugs that only occurred at scale. Increasing the amount of computing resources helped us improve the performance of our job.
The straggler problem
Once we got up to 1TB of data, we noticed an effect where the job progress would quickly reach 99%, then a single task would take hours to complete.
This meant that our compute resources were sitting idle for more than 1/3 of the time! That's what we call a Hadoopsies. After a couple dead ends, we realized that this problem was caused by natural variance in the amount incoming of data per unit time. Since flight search volume varies naturally with time of day, the uniform-time 10-minute bins that we were using ended up with wildly different amounts of data
At this point, we switched from uniform-time bins to uniform-size 64 MB bins. This made the job about 3x as fast.
The CPU usage problem
Our compute-optimized EC2 instances were only using about 40% of their CPU. This suggested that we might be able to use our hardware more efficiently.
We approached this by squeezing more concurrent tasks onto each machine. Since task allocation was memory-limited, we needed to decrease the amount of memory allocated to each task. Since our tasks are simple, we were able to decrease the memory allocated by a factor of about three, which was enough to max out the CPU and speed up the job.
We paired this memory decrease with a few cute code changes to keep our memory usage low and predictable, preventing OOM errors:
- We compressed the data in a streaming manner as we downloaded it from S3.
- We wrote the compressed data directly into an array.
- We made sure to not copy the data unnecessarily.
The rate-limiting problem
In order to retrieve our data in a timely manner, we wanted to perform several thousand requests per second. Unfortunately, we soon hit S3's rate limit. AWS support suggested to us that we should reorganize our data, which you may have noticed we were trying to do.
The reason we hit S3's rate limit so soon is that S3 uses the key as a way to shard objects across different partitions. Rather than using hashing, keys are mapped to partitions using lexicographic ordering, which permits efficient range queries. The downside of this is that trying to read many objects with nearby keys will concentrate load onto a single S3 partition, which can overwhelm the I/O capacity of that partition. This is what happened in our case.
We wanted to use as much computing power as we could while staying just under S3's rate limit. To accomplish this, we hoped to auto-scale our cluster by CPU usage. When we hit the S3 rate limit, CPU usage would drop because the S3 client would spend more time waiting between requests. Unfortunately, we couldn't find an easy way to scale by CPU: although each machine in an Elastic MapReduce cluster publishes CPU usage metrics, they aren't automatically aggregated across a whole cluster. Although gathering and publishing custom metrics could have solved this, we decided that the time investment probably wouldn't be worth it.
The point of getting around the S3 rate limit was not to make our job cheaper, but rather to get the data sooner. Rather than spending more development time on autoscaling, we decided to run the job "low and slow". Although rate limiting prevented us from reaching the scale that we had hoped for, we were still able to use multiple large EC2 instances to accelerate the job. We were able to process each of our data sources on the first try.
After the job finished, we wanted to feel confident that we could delete the original data.
We validated our job by matching file sizes and number of records against the uncompressed data as well as by gathering some real-world statistics about the flight prices in our data and checking them against common sense.
The output of the job was only 1/49 the size of the uncompressed data. Although we initially assumed that we had lost some data, it turned out that we had just compressed it really well. We owe a lot to bzip2 and for this, but even more to XML for being such an inefficient format for storing the data in the first place. This high compression factor meant that we were able to reduce our monthly storage bills significantly.
The validation process also demonstrated the performance improvements that we had achieved. Whereas the original job took days due to rate limiting, a minimal validation job scanned over the entire data set in only four hours.
After completing validation we were ready to delete the original data. When we clicked "delete" on the S3 web console, the progress bar quickly reached 99.9% and then started going backwards. After trying this several times on several buckets, we think that the "delete" button does not work for S3 buckets with this many objects. It seems to delete a few objects then give up.
In order to get around this, we set a lifecycle configuration rule that deleted all objects after one day. After waiting for this to take effect, we were able to successfully delete our bucket using the web console.
What we learned
Going into this project, I thought of S3 as a magic cloud that seamlessly stores all of my data. Now I think of it as an ALMOST-magic cloud that ALMOST-seamlessly stores all of my data. Although it was nice to be able to look up objects by time, indexing by a dimension that we cared about paradoxically limited our ability to scale because of rate limits. We have now modified our ingestion service to give each key a random prefix. This means that we can no longer efficiently find any particular file or set of files. For example, to read out all the keys for January 2018, we have to list all of the files in the entire bucket then discard ones that don't have "2018-01" in the key. Although this sounds bad, we don't really care about being able find specific files; we would rather be able to efficiently batch-process large piles of them. Random prefixes have made our jobs much faster by avoiding rate limits for both writes and reads.
It would be ideal to sidestep this whole process by ingesting our data directly from the external live feed into a Hadoop-friendly format. Here are a few of the things that make this an interesting challenge:
- We'll need one or more durable persistent queues to buffer small files and write out big files
- We need a component that decides the name of each new file in a concurrency-safe way
- We'll need to be able to manually write bzipped SequenceFiles in the exact format that Hadoop expects
- Since there's much more machinery here, we'll need to be ready to fix it when it breaks in the middle of the night
Although our solution is still far from perfect, I look forward to creating faster, cheaper, and easier ways to analyze our data in the future. If you'd like to join me, Freebird's jobs page is right over here!