We all know the power and advantages of Kafka. Apache Kafka is a publish-subscribe messaging system that has three major components:
- KAFKA CONSUMER
- KAFKA PRODUCER
- KAFKA BROKER
This blog is all about how we can achieve maximum throughput when planning to run Kafka in production or in POCs.
Many organizations run Kafka in production and have shared default configurations that maximize Kafka's performance. I'll be using some of those existing configuration details, along with some extra information, to describe the settings we need to take care of while setting up the cluster.
Before diving into the Kafka tuning pool, let's get some idea of what exactly Kafka is and what features it provides.
INTRODUCTION
Apache Kafka is an open-source message broker project developed by the Apache Software Foundation.
Kafka provides a distributed, partitioned and replicated commit-log service with the functionality of a messaging system.
The features which make Kafka reliable and fast are mentioned below:
High Throughput
A single Kafka broker is capable of handling hundreds of megabytes of reads and writes per second from thousands of clients.
Scalable
Kafka is designed in such a way that it can be elastically and transparently expanded without any downtime. It means you can add a new broker node at any point in time without shutting down your Kafka cluster.
Durability
All messages are persisted on disk and replicated within the cluster to avoid data loss. Brokers are capable of handling terabytes of data without affecting performance.
Distributed
Kafka also provides distributed processing of messages, and its cluster-centric design offers strong durability and fault tolerance. Kafka allows us to partition any topic, which helps increase the throughput of the system.
So, at a very high level, we can say that producers produce messages and send them to the Kafka cluster over the network, where they are consumed by the respective consumers that have subscribed to them.
KAFKA USE CASE DIAGRAM
TUNING KAFKA
When we talk about tuning Kafka, there are a few configuration parameters to consider. The most important configurations for improving performance are the ones that control the disk flush rate.
We can also divide these configurations on a per-component basis.
The most important configurations which need to be taken care of on the producer side are listed below (a minimal producer sketch follows the list) –
- Compression
- Batch size
- Sync or Async
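To make these producer-side knobs concrete, here is a minimal sketch of a Java producer that sets compression, batch size and acknowledgement behaviour. The broker address, topic name and the specific values are illustrative assumptions, not recommendations.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TunedProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("compression.type", "snappy"); // compress whole batches on the wire
        props.put("batch.size", "65536");        // 64 KB per-partition batch (illustrative)
        props.put("acks", "1");                  // wait only for the leader's acknowledgement

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("demo-topic", "key", "value"); // assumed topic
            // Async style: hand the record over and react to the result in a callback.
            producer.send(record, (metadata, exception) -> {
                if (exception != null) exception.printStackTrace();
            });
            // Sync style: block on the returned Future until the broker acknowledges.
            producer.send(record).get();
        }
    }
}
```

The callback path corresponds to the async mode mentioned above, while blocking on the returned Future behaves like sync sending.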
The important configuration on the consumer side is –
- Fetch size
We can also have multiple consumers running in the same consumer group to fetch the maximum amount of data from the partitioned topics available on the Kafka brokers, as sketched below.
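On the consumer side, fetch size is controlled through properties such as fetch.min.bytes and max.partition.fetch.bytes, and running several consumers with the same group.id spreads the partitions across them. The sketch below is illustrative only; the broker address, group and topic names and the values are assumptions.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TunedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "demo-group");                 // consumers in one group share partitions
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("fetch.min.bytes", "65536");               // let the broker batch up at least 64 KB
        props.put("max.partition.fetch.bytes", "2097152");   // up to 2 MB per partition per fetch

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // assumed topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> r : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        r.partition(), r.offset(), r.value());
            }
        }
    }
}
```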
Apache maintains very good documentation describing all the configuration parameters and their definitions. To get the details, please refer to the official Kafka documentation.
Production server configurations
Here are some of the configuration parameters and their values –
| Property Name | Default | Description |
|---|---|---|
| num.replica.fetchers | 1 | Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker. |
| replica.fetch.max.bytes | 1048576 | The number of bytes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader. |
| replica.fetch.wait.max.ms | 500 | The maximum amount of time to wait for data to arrive on the leader in the fetch requests sent by the replicas. |
| replica.high.watermark.checkpoint.interval.ms | 5000 | The frequency with which each replica saves its high watermark to disk to handle recovery. |
| replica.socket.timeout.ms | 30000 | The socket timeout for network requests to the leader for replicating data. |
| replica.socket.receive.buffer.bytes | 65536 | The socket receive buffer for network requests to the leader for replicating data. |
| replica.lag.time.max.ms | 10000 | If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from the ISR (in-sync replicas) and treat it as dead. |
| replica.lag.max.messages | 4000 | If a replica falls more than this many messages behind the leader, the leader will remove the follower from the ISR and treat it as dead. |
| controller.socket.timeout.ms | 30000 | The socket timeout for commands from the partition management controller to the replicas. |
| controller.message.queue.size | 10 | The buffer size for controller-to-broker channels. |
| num.partitions | 8 | The default number of partitions per topic if a partition count isn't given at topic creation time. |
| message.max.bytes | 1000000 | The largest message size Kafka will allow to be appended to a topic. Note that if you increase this size you must also increase your consumers' fetch size so they can fetch such large messages. |
| auto.create.topics.enable | true | Enable auto-creation of topics on the server. If this is set to true, then attempts to produce data or fetch metadata for a non-existent topic will automatically create it with the default replication factor and number of partitions. |
| log.index.interval.bytes | 4096 | The byte interval at which we add an entry to the offset index. When executing a fetch request the server must do a linear scan for up to this many bytes to find the correct position in the log to begin and end the fetch, so setting this value larger means larger index files (and a bit more memory usage) but less scanning. However, the server will never add more than one index entry per log append (even if more than log.index.interval worth of messages are appended). In general you probably don't need to change this value. |
| log.index.size.max.bytes | 10485760 | The maximum size in bytes we allow for the offset index of each log segment. Note that we will always pre-allocate a sparse file with this much space and shrink it down when the log rolls. If the index fills up we will roll a new log segment even if we haven't reached the log.segment.bytes limit. This setting can be overridden on a per-topic basis. |
| log.flush.interval.ms | 10000 | The maximum time between fsync calls on the log. If used in conjunction with log.flush.interval.messages, the log will be flushed when either criterion is met. |
| log.flush.interval.messages | 20000 | The number of messages written to a log partition before we force an fsync on the log. Setting this lower will sync data to disk more often but will have a major impact on performance. We generally recommend that people make use of replication for durability rather than depending on single-server fsync; however, this setting can be used to be extra certain. |
| log.flush.scheduler.interval.ms | 2000 | The frequency in ms with which the log flusher checks whether any log is eligible to be flushed to disk. |
| log.retention.check.interval.ms | 300000 | The period with which we check whether any log segment is eligible for deletion to meet the retention policies. |
| log.segment.bytes | 1073741824 | The log for a topic partition is stored as a directory of segment files. This setting controls the size to which a segment file will grow before a new segment is rolled over in the log. This setting can be overridden on a per-topic basis. |
| num.io.threads | 8 | The number of I/O threads that the server uses for executing requests. You should have at least as many threads as you have disks. |
| num.network.threads | 8 | The number of network threads that the server uses for handling network requests. You probably don't need to change this. |
| socket.request.max.bytes | 104857600 | The maximum request size the server will allow. This prevents the server from running out of memory and should be smaller than the Java heap size. |
| socket.receive.buffer.bytes | 1048576 | The socket receive buffer for network requests. |
| socket.send.buffer.bytes | 1048576 | The SO_SNDBUF buffer the server prefers for socket connections. |
| queued.max.requests | 16 | The number of requests that can be queued up for processing by the I/O threads before the network threads stop reading in new requests. |
| fetch.purgatory.purge.interval.requests | 100 | The purge interval (in number of requests) of the fetch request purgatory. |
| producer.purgatory.purge.interval.requests | 100 | The purge interval (in number of requests) of the producer request purgatory. |
All of the above are broker-side configurations. In most cases they work well as-is, but there are a few that you can modify depending on the availability of the cluster environment and the machine configuration; they are listed below:
num.replica.fetchers
This configuration parameter defines the number of threads that replicate data from the leader to the followers. Its value can be adjusted as per the availability of threads: if more threads are available, we should use more replica fetchers so that replication completes in parallel.
replica.fetch.max.bytes
This parameter controls how much data is fetched from each partition in a single fetch request. Increasing its value helps the followers build their replicas faster.
replica.socket.receive.buffer.bytes
If fewer threads are available for replication, we can increase the size of this buffer. It helps hold more data when the replication threads are slow compared to the incoming message rate.
num.partitions
This is a very important configuration to take care of when running Kafka live. The more partitions a topic has, the more parallelism we get, and writing data in parallel automatically increases throughput.
However, increasing the number of partitions can also slow down your performance and throughput if the system configuration is not capable of handling it.
Now the question that arises from the above statement is: how?
If the system does not have sufficient threads, or has only a single disk, then it does not make sense to create lots of partitions for better throughput. The number of partitions you create for a topic depends directly on the available threads and disks. A small topic-creation sketch follows.
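As a rough illustration, a topic's partition count can be set explicitly at creation time, for example with the Java AdminClient. The topic name, partition count and replication factor below are assumptions chosen for the example.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreatePartitionedTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address

        try (AdminClient admin = AdminClient.create(props)) {
            // 8 partitions and replication factor 2 are illustrative; match them to
            // the number of disks/threads and brokers actually available.
            NewTopic topic = new NewTopic("demo-topic", 8, (short) 2);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```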
num.io.threads
The value for I/O threads directly depends on how many disks you have in your cluster. These threads are used by the server for executing requests; we should have at least as many threads as we have disks. A sketch pulling together the broker-side overrides discussed in this section follows.
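Putting the broker-side overrides together: the values below are only an assumed starting point and would normally be set directly in the broker's server.properties. The snippet simply writes them out via java.util.Properties, whose key=value format matches that file.

```java
import java.io.FileOutputStream;
import java.util.Properties;

public class BrokerOverridesSketch {
    public static void main(String[] args) throws Exception {
        Properties overrides = new Properties();
        overrides.put("num.replica.fetchers", "4");                      // more parallel replication threads
        overrides.put("replica.fetch.max.bytes", "2097152");             // larger replication fetches
        overrides.put("replica.socket.receive.buffer.bytes", "131072");  // bigger buffer if fetchers lag
        overrides.put("num.partitions", "8");                            // default partitions for new topics
        overrides.put("num.io.threads", "8");                            // roughly one per disk

        try (FileOutputStream out = new FileOutputStream("server-overrides.properties")) {
            overrides.store(out, "Illustrative broker tuning overrides (merge into server.properties)");
        }
    }
}
```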
Apart from these configurations, there are a few other factors that I mentioned earlier in the blog, like batch size and the sync/async mode of message transfer.
When we think about batch size, it is always hard to say which batch size will be optimal. A large batch size may be great for high throughput, but you might see higher latency with it. So we can conclude that latency and throughput tend to trade off against each other.
But there are ways to get low latency with high throughput, where we have to choose a proper batch size. We can also use the queue time or refresh interval to find the right balance, as sketched below.
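As a sketch of that balance, the newer Java producer exposes the batch size and the "queue time" as batch.size and linger.ms. The values below are assumptions that lean toward throughput; shrinking both would lean toward latency.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class BatchingTradeoffSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Throughput-leaning: large batches, and wait up to 20 ms so batches can fill.
        props.put("batch.size", "131072"); // 128 KB per-partition batch
        props.put("linger.ms", "20");      // the "queue time" before a partial batch is sent

        // Latency-leaning alternative (commented out): smaller batches, send immediately.
        // props.put("batch.size", "16384");
        // props.put("linger.ms", "0");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Records sent here would be grouped according to the settings above.
        }
    }
}
```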