tuning the performance of Apache Flume, a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of event data.
To kick off this series, I’d like to start off discussing some important Flume concepts that come into play when tuning your Flume flows for maximum performance: the channel and the transaction batch size.
Setting Up a Data Flow
Imagine you want to take a heavy stream of user activity from your application server logs and store that onto your Hadoop cluster for analytics. If you have a large deployment of application servers, you would likely want to build a fan-in architecture, where you are sending data from many nodes to relatively fewer nodes.
If you are sending these user events one at a time, each time waiting for acknowledgment that it was delivered, your throughput may be limited by network latency. Naturally, you would want to batch up the events into larger transactions, so that you amortize the latency of the acknowledgment over a larger number of events and therefore get more throughput.
Channels
So, what happens if the storage tier goes down momentarily, as in the case of a network partition? What happens to the events if a Flume agent machine crashes? We still want to be able to serve our users on the application tier and retain our data somehow. In order to accomplish this, we need a buffering mechanism on each agent that allows it to store events in the case of downstream failures or slowdowns. In Flume, the channel is what persists events at each hop in the flow. Below is a diagram that illustrates where the channel sits in the architecture of a Flume agent.
Memory Channel vs. File Channel
An important decision to make when designing your Flume flow is what type of channel you want to use. At the time of this writing, the two recommended channels are the file channel and the memory channel. The file channel is a durable channel, as it persists all events that are stored in it to disk. So, even if the Java virtual machine is killed, or the operating system crashes or reboots, events that were not successfully transferred to the next agent in the pipeline will still be there when the Flume agent is restarted. The memory channel is a volatile channel, as it buffers events in memory only: if the Java process dies, any events stored in the memory channel are lost. Naturally, the memory channel also exhibits very low put/take latencies compared to the file channel, even for a batch size of 1. Since the number of events that can be stored is limited by available RAM, its ability to buffer events in the case of temporary downstream failure is quite limited. The file channel, on the other hand, has far superior buffering capability due to utilizing cheap, abundant hard disk space.
Flume Event Batching
As mentioned earlier, Flume can batch events. The batch size is the maximum number of events that a sink or client will attempt to take from a channel in a single transaction. Tuning the batch size trades throughput vs. latency and duplication under failure. With a small batch size, throughput decreases, but the risk of event duplication is reduced if a failure were to occur. With a large batch size, you get much better throughput, but increased latency, and in the case of a transaction failure, the number of possible duplicates increases.
Transactions are a critical concept in Flume, because the delivery and durability guarantees made by channels only take effect at the end of each successful transaction. For example, when a source receives or generates an event, in order to store that event into a channel a transaction must first be opened on that channel. Within the transaction, the source puts up to the batch size number of events into the channel, and on success commits the transaction. A sink must go through the same process of operating within a transaction when taking events from a channel.
Batch size is configured at the sink level. The larger the batch, the faster the channels operate (but there is a caveat). In the case of the file channel, this speed difference is because all buffers are flushed and then synced to disk when each transaction is committed. A disk sync is a time-consuming operation (it may take several milliseconds), but it is required to ensure the durability of the data. Likewise, with the memory channel, there is a memory synchronization step when each transaction is committed. Of course, memory synchronization is much faster than a disk sync. For more information on the inner workings of the file channel, please see Brock Noland’s article about the Apache Flume File Channel.
The downside of using a large batch size is that if there is some type of failure in the middle of a transaction, such as a downstream host or network failure, there is a possibility of duplicates being created. So, for example, if you set your batch size to 1000, and the machine you are writing to goes offline, duplicates may be generated in groups of up to 1000. This may occur in special cases, for example, if the events got written to the downstream machine but then the connection failed before it could acknowledge that it had received them. However, duplicate events will only appear in exceptional circumstances.
Choosing a Batch Size
To squeeze all the performance possible out of a Flume system, batch sizes should be tuned with care through experimentation. While I will get into this in more detail in the follow-up to this post, here are some rules of thumb for selecting batch sizes.
- Start off with batch sizes equal to the sum of the batch sizes of the input streams coming into that tier. For example, if you have a downstream Flume agent running an Avro source with 10 upstream agents sending events via Avro sinks using a batch size of 100 each, consider starting that downstream agent with a batch size of 1,000. Tune / experiment from there.
- If you find yourself setting the batch size very high (say, higher than 10,000) then consider adding another Sink instead, in order to increase the parallelism (each sink typically runs on its own thread). Say you were going to use one HDFS sink with a batch size of 20,000. Experiment with using 2 HDFS sinks with batch sizes of 5,000 or 10,000 to see if that helps more.
- Prefer the lowest batch size that gives you acceptable performance.
- Monitor the steady-state channel sizes to get more tuning insight (more on this in the next article).
Different Batch Sizes in Different Situations
Due to the performance / duplication tradeoff of the batch size parameter, I often see varying batch size settings depending on the use case. In the case of using Flume to write to HBase using the HBase Sink, incrementing counters, a smaller batch size on the order of 100 is often used to reduce the impact in case of hiccups in the system. On the other hand, with an HDFS sink, in order to get maximum throughput, I see people running with batch sizes of 1,000 or even 10,000, since typically they can easily run map-reduce jobs to de-duplicate the data at processing time. Note that when writing large events with large batch sizes to HDFS, often other parameters need to be increased as well. One such parameter is hdfs.callTimeout, which may be increased to 60 seconds or more to account for the long tail of occasional higher-latency calls to HDFS.
At the other end of the spectrum, in cases where batching events at the application (Flume client) tier is not possible, the memory channel is often used at the collector-tier Flume agent (or a localhost agent on the app servers) to get acceptable performance with a batch size of 1, while using larger batch sizes and file channels in the downstream agents in order to get most of the benefits of durability there. For the best performance, however, all tiers including the client/application tier would perform some level of batching. (Please see above diagram for an illustration of the tiers referenced in this scenario.)
Configuration Parameters and Gotchas
The actual parameter used to set the batch size varies between sinks, but for most sinks it’s simply called batchSize or batch-size. For the HDFS sink, it’s actually called hdfs.batchSize for historical reasons; I recommend setting hdfs.txnEventMax to the same value as hdfs.batchSize for simplicity. Historically, in the HDFS sink, the number of events taken in a single transaction can be different from the number of events written to HDFS before a sync() operation; In practice, there is little reason these should not be set to the same value.
One potentially confusing gotcha when tuning the batch size on a sink is that it must be less than or equal to the transactionCapacity set on the corresponding channel. The transactionCapacity should be set to the value of the largest batch size that will be used to store or remove events from that channel.
tl;dr: Below is a “cheat sheet” batch size tuning summary for your convenience. Please note that these are just starting points for tuning.
Sink Type | Config parameter | Typical value |
Avro | batch-size | 100 |
HDFS | hdfs.batchSize, hdfs.txnEventMax | 1000 |
HBaseSink | batchSize | 100 |
AsyncHBaseSink | batchSize | 100 |
That’s all I have space for in one blog post. Please leave feedback and questions in the comments. Look for another post in the future with tips on using Flume’s monitoring capabilities to take advantage of important information which can aid you in your quest for optimum performance.
a
No comments:
Post a Comment