In this article by Saurabh Minni, author of Apache Kafka Cookbook, we will cover the following topics:
- Configuring basic settings
- Configuring threads and performance
- Configuring log settings
- Configuring replica settings
- Configuring the ZooKeeper settings
- Configuring other miscellaneous parameters
(For more resources related to this topic, see here.)
This article explains the configurations of a Kafka broker. Before we get started with Kafka, it is critical to configure it to suit us best. The best part about Kafka is that it's highly configurable. Though, most of the time you will be good to go with the default settings in place, when dealing with scale and performance, you might want to get your hands with a configuration that suits your application best.
Configuring basic settings
Let's configure the basic settings for your Apache Kafka broker.
Getting ready
I believe you have already Kafka installed. Make a copy of the server.properties file from the config folder. Now, let's get cracking with your favorite editor.
How to do it...
Open your server.properties file:
- The first configuration that you need to change is broker.id:
broker.id=0
- Next, give a host name to your machine:
host.name=localhost
- You also need to set the port number: to listen to.
port=9092
- Lastly, the directory for data persistence is as follows:
log.dirs=/disk1/kafka-logs
How it works…
With these basic configuration parameters in place, your Kafka broker is ready to be setup. All you need to do is pass on this new configuration file when you start the broker as a parameter. Some of the important configurations used in the configuration files are explained here:
- broker.id: This should be a nonnegative integer ID. It should be unique for a cluster as it is used for all intents, purposes, and names of brokers. This also allows the broker to be moved to a different host and/or port without additional changes on the side of a consumer. Its default value is 0.
- host.name: The refers to the default value, which is null. If it's not specified, Kafka will bind to all interfaces in a system. If it's specified, it will bind only to a particular address. If you want clients to connect only to a particular interface, it is a good idea to specify the host name.
- port: This defines the port number that the Kafka broker will be listening to, to accept client connections.
- log.dirs: This tells the broker the directory where it should store files for the persistence of messages. You can specify multiple directories here using commas that separate locations. The default value for this is /tmp/kafka-logs.
There's more…
Kafka also lets you specify two more parameters, which are very interesting:
- advertised.host.name: This is the hostname that is given out to producers, consumers, and other brokers to connect to. Usually, this is the same as host.name and you need not specify it.
- advertised.port: This specifies the port that other producers, consumers, and brokers need to connect to. If not specified, it uses the one mentioned in the port configuration parameters.
The real use case of the preceding parameters is when you make use of bridged connections where your internal host.name and port number might be different from one that external parties need to connect to.
Configuring threads and performance
When using Kafka, these settings are something you need not modify. However, when you want to extract every last bit of performance from your machines, this comes in handy.
Getting ready
You are all set with your broker properties file and are set to edit it in your favorite editor.
How to do it...
Open your server.properties file.
- Change message.max.bytes:
message.max.bytes=1000000
- Set the number of network threads:
num.network.threads=3
- Set the number of IO threads:
num.io.threads=8
- Set the number of threads that perform background processing:
background.threads=10
- Set the maximum number of requests to be queued up:
queued.max.requests=500
- Set the send socket buffer size:
socket.send.buffer.bytes=102400
- Set the receive socket buffer size:
socket.receive.buffer.bytes=102400
- Set the maximum request size:
socket.request.max.bytes=104857600
- Set the number of partitions:
num.partitions=1
How it works…
These network and performance configurations are set to an optimal level for your application. You might need to experiment a little to come up with an optimal configuration. Here are some explanations for these confugurations:
- message.max.bytes: This sets the maximum size of the message that a server can receive. This should be set in order to prevent any producer from inadvertently sending extra large messages and swamping consumers. The default size should be set to 1000000.
- num.network.threads: This sets the number of threads running to handle a network request. If you have too many requests coming in, you need to change this value. Else, you are good to go in most use cases. The default value for this should be set to 3.
- num.io.threads: This sets the number of threads that are spawned for IO operations. This should be set to at least the number of disks that are present. The default value for this should be set to 8.
- background.threads: This sets the number of threads that run various background jobs. These include deleting old log files. The default value is 10 and you might not need to change this.
- queued.max.requests: This sets the size of the queue that holds pending messages, while others are processed by IO threads. If the queue is full, the network threads will stop accepting any more messages. If you have erratic loads in your application, you need to set it to some value at which this does not throttle.
- socket.send.buffer.bytes: This sets the SO_SNDBUFF buffer size, which is used for socket connections.
- socket.receive.buffer.bytes: This sets the SO_RCVBUFF buffer size, which is used for socket connections.
- socket.request.max.bytes: This sets the maximum request size for a server to receive. This should be smaller than the Java heap size that you have set.
- num.partitions: This sets the number of default partitions for any topic you create without explicitly mentioning any partition size.
There's more
You might also need to configure your Java installation for maximum performance. This includes settings for heap, socket size, and so on.
Configuring log settings
Log settings are perhaps the most important configurations that you need to change based on your system requirements.
Getting ready
Just open the server.properties file in your favorite editor.
How to do it...
Open your server.properties file. Here are the default values for it:
- Change the log.segment.bytes value:
log.segment.bytes=1073741824
- Set the log.roll.{ms,hours} value:
log.roll.{ms,hours}=168 hours
- Set the log.cleanup.policy value:
log.cleanup.policy=delete
- Set the log.retention.{ms,minutes,hours} value:
log.retention.{ms,minutes,hours}=168 hours
- Set the log.retention.bytes value:
log.retention.bytes=-1
- Set the log.retention.check.interval.ms value:
log.retention.check.interval.ms= 30000
- Set the log.cleaner.enable value:
log.cleaner.enable=false
- Set the log.cleaner.threads value:
log.cleaner.threads=1
- Set the log.cleaner.backoff.ms value:
log.cleaner.backoff.ms=15000
- Set the log.index.size.max.bytes value:
log.index.size.max.bytes=10485760
- Set the log.index.interval.bytes value:
log.index.interval.bytes=4096
- Set the log.flush.interval.messages value:
log.flush.interval.messages=Long.MaxValue
- Set the log.flush.interval.ms value:
log.flush.interval.ms=Long.MaxValue
How it works…
Here is the explanation of log settings:
Unlock access to the largest independent learning library in Tech for FREE!
Get unlimited access to 7500+ expert-authored eBooks and video courses covering every tech area you can think of.
Renews at €14.99/month. Cancel anytime
- log.segment.bytes: This defines the maximum segment size in bytes. Once a segment reaches a particular size, a new segment file is created. A topic is stored as a bunch of segment files in a directory. This can also be set on a per topic basis. Its default value is 1 GB.
- log.roll.{ms,hours}: This sets the time period after which a new segment file is created even if it has not reached the required size limit. This setting can also be set on a per topic basis. Its default value is 7 days.
- log.cleanup.policy: The value for this can be either deleted or compacted. With the delete option set, log segments are deleted periodically when it reaches its time threshold or size limit. If a compact option is set, log compaction will be used to clean up obsolete records. This setting can be set on a per topic basis.
- log.retention.{ms,minutes,hours}: This sets the amount of time that logs segments are retained. This can be set on a per topic basis. The default value for this is 7 days.
- log.retention.bytes: This sets the maximum number of byte logs per partition that are retained before they are deleted. This value can be set for a per topic basis. When either of the log time or size limits are reached, segments are deleted.
- log.retention.check.interval.ms: This sets the time interval at which logs are checked for deletion to meet retention policies. The default value for this is 5 minutes.
- log.cleaner.enable: For log compaction to be enabled, this has to be set as true.
- log.cleaner.threads: This sets the number of threads that work to clean logs for compaction.
- log.cleaner.backoff.ms: This defines the interval at which logs check if any other logs need cleaning.
- log.index.size.max.bytes: This settings sets the maximum size allowed for the offset index of each log segment. This can be set for per topic basis as well.
- log.index.interval.bytes: This defines the byte interval at which a new entry is added to the offset index. For each fetch request, the broker performs a linear scan for a particular number of bytes to find the correct position in the log to begin and end a fetch. Setting this as a larger value will mean larger index files (and a bit more memory usage) but less scanning.
- log.flush.interval.messages: This is the number of messages that are kept in memory till they're flushed to the disk. Though this does not guarantee durability, it gives finer control.
- log.flush.interval.ms: This sets the time interval at which the messages are flushed to the disk.
There's more
Some other settings are listed at http://kafka.apache.org/documentation.html#brokerconfigs.
See also
- More on log compassion is available at http://kafka.apache.org/documentation.html#compaction.
Configuring replica settings
You will also want set up a replica for reliability purposes. Let's see some of the important settings that you need to handle for replication to work best for you.
Getting ready
Open the server.properties file in your favorite editor.
How to do it...
Open your server.properties file. Here are default values for the settings:
- Set the default.replication.factor value:
default.replication.factor=1
- Set the replica.lag.time.max.ms value:
replica.lag.time.max.ms=10000
- Set the replica.lag.max.messages value:
replica.lag.max.messages=4000
- Set the replica.fetch.max.bytes value:
replica.fetch.max.bytes=1048576
- Set the replica.fetch.wait.max.ms value:
replica.fetch.wait.max.ms=500
- Set the num.replica.fetchers value:
num.replica.fetchers=1
- Set the replica.high.watermark.checkpoint.interval.ms value:
replica.high.watermark.checkpoint.interval.ms=5000
- Set the fetch.purgatory.purge.interval.requests value:
fetch.purgatory.purge.interval.requests=1000
- Set the producer.purgatory.purge.interval.requests value:
producer.purgatory.purge.interval.requests=1000
- Set the replica.socket.timeout.ms value:
replica.socket.timeout.ms=30000
- Set the replica.socket.receive.buffer.bytes value:
replica.socket.receive.buffer.bytes=65536
How it works…
Here is the explanation of the preceding settings:
- default.replication.factor: This sets the default replication factor for automatically created topics.
- replica.lag.time.max.ms: This is time period within which if a leader does not receive any fetch request, its moved out of in-sync replicas and is treated as dead.
- replica.lag.max.messages: This is maximum number of messages a follower can be behind the leader by before it is considered dead and not in-sync.
- replica.fetch.max.bytes: This sets the maximum number of bytes of data that a follower will fetch in a request from its leader.
- replica.fetch.wait.max.ms: This sets the maximum amount of time for the leader to respond to a replica's fetch request.
- num.replica.fetchers: This specifies the number of threads used to replicate messages from the leader. Increasing the number of threads increases the IO rate to a degree.
- replica.high.watermark.checkpoint.interval.ms: This specifies the frequency with which each replica saves its high watermark to disk for recovery.
- fetch.purgatory.purge.interval.requests: This sets the fetch request purgatory's purge interval. This purgatory is the place where the fetch requests are kept on hold till they can be serviced.
- producer.purgatory.purge.interval.requests: This sets the producer request purgatory's purge interval. This purgatory is the place where the producer requests are kept on hold till they have been serviced.
There's more
Some other settings are listed at http://kafka.apache.org/documentation.html#brokerconfigs.
Configuring the ZooKeeper settings
ZooKeeper is used in Kafka for cluster management and to maintain the details of topics.
Getting ready
Just open the server.properties file in your favorite editor.
How to do it…
Open your server.properties file. Here are the default values for the settings:
- Set the zookeeper.connect property:
zookeeper.connect=127.0.0.1:2181,192.168.0.32:2181
- Set the zookeeper.session.timeout.ms property:
zookeeper.session.timeout.ms=6000
- Set the zookeeper.connection.timeout.ms property:
zookeeper.connection.timeout.ms=6000
- Set the zookeeper.sync.time.ms property:
zookeeper.sync.time.ms=2000
How it works…
Here is the explanation of these settings:
- zookeeper.connect: This is where you specify the ZooKeeper connection string in the form of hostname:port. You can use comma-separated values to specify multiple ZooKeeper nodes. This ensures reliability and continuity of Kafka clusters even in the event of a ZooKeeper node being down. ZooKeeper allows you to use the chroot path to make a particular Kafka data available only under a particular path. This enables you to have the same ZooKeeper clusters support multiple Kafka clusters. Here is the method to specify connection a string in this case:
host1:port1,host2:port2,host3:port3/chroot/path
The preceding statement puts all the cluster data in the /chroot/path path. This path must be created prior to starting Kafka clusters and users must use the same string.
- zookeeper.session.timeout.ms: This specifies the time within which if the heartbeat from a server is not received, then it is considered dead. The value for this must be carefully selected because if this heartbeat has too long an interval, it will not be able to detect a dead server in time and also lead to issues. Also, if the time period is too small, a live server might be considered dead.
- zookeeper.connection.timeout.ms: This specifies the maximum connection time that a client waits to accept a connection.
- zookeeper.sync.time.ms property: This specifies the time period by which a ZooKeeper follower can be behind its leader
- The ZooKeeper management details from the Kafka perspective are highlighted at http://kafka.apache.org/documentation.html#zk.
- You can find ZooKeeper at https://zookeeper.apache.org/
See also
Configuring other miscellaneous parameters
Besides the configurations mentioned previously, there are some other configurations that also need to be set.
Getting ready
Open the server.properties file in your favorite editor. We will look at the default values of the properties in the following section.
How to do it...
- Set the auto.create.topics.enable property:
auto.create.topics.enable=true
- Set the controlled.shutdown.enable property:
controlled.shutdown.enable=true
- Set the controlled.shutdown.max.retries property:
controlled.shutdown.max.retries=3
- Set the controlled.shutdown.retry.backoff.ms property:
controlled.shutdown.retry.backoff.ms=5000
- Set the auto.leader.rebalance.enable property:
auto.leader.rebalance.enable=true
- Set the leader.imbalance.per.broker.percentage property:
leader.imbalance.per.broker.percentage=10
- Set the leader.imbalance.check.interval.seconds property:
leader.imbalance.check.interval.seconds=300
- Set the offset.metadata.max.bytes property:
offset.metadata.max.bytes=4096
- Set the max.connections.per.ip property:
max.connections.per.ip=Int.MaxValue
- Set the connections.max.idle.ms property:
connections.max.idle.ms=600000
- Set the unclean.leader.election.enable property:
unclean.leader.election.enable=true
- Set the offsets.topic.num.partitions property:
offsets.topic.num.partitions=50
- Set the offsets.topic.retention.minutes property:
offsets.topic.retention.minutes=1440
- Set the offsets.retention.check.interval.ms property:
offsets.retention.check.interval.ms=600000
- Set the offsets.topic.replication.factor property:
offsets.topic.replication.factor=3
- Set the offsets.topic.segment.bytes property:
offsets.topic.segment.bytes=104857600
- Set the offsets.load.buffer.size property:
offsets.load.buffer.size=5242880
- Set the offsets.commit.required.acks property:
offsets.commit.required.acks=-1
- Set the offsets.commit.timeout.ms property:
offsets.commit.timeout.ms=5000
How it works…
An explanation of the settings is as follows.
- auto.create.topics.enable: Setting this value to true will make sure that if you fetch metadata or produce messages for a nonexistent topic, it will automatically be created. Ideally, in a production environment, you should set this value to false.
- controlled.shutdown.enable: This is set to true to make sure that when shutdown is called on the broker, if it's the leader of any topic, then it gracefully moves all leaders to a different broker before it shuts down. This increases the availability of the system overall.
- controlled.shutdown.max.retries: This sets the maximum number of retries that the broker makes to perform a controlled shutdown before performing an unclean one.
- controlled.shutdown.retry.backoff.ms: This sets the backoff time between controlled shutdown retries.
- auto.leader.rebalance.enable: If this is set to true, the broker will automatically try to balance the leadership of partitions among other brokers by periodically giving leadership to the preferred replica of each partition if it's available.
- leader.imbalance.per.broker.percentage: This sets the percentage of leader imbalance that's allowed per broker. The cluster will rebalance the leadership if this ratio goes above the set value.
- leader.imbalance.check.interval.seconds: This defines the time period for checking leader imbalance.
- offset.metadata.max.bytes: This defines the maximum amount of metadata allowed to the client to be stored with their offset.
- max.connections.per.ip: This sets the maximum number of connections that the broker accepts from a given IP address.
- connections.max.idle.ms: This sets the maximum time till which the broker will be idle before it closes a socket connection
- unclean.leader.election.enable: This is set to true to allow replicas that are not in-sync replicas (ISR) in order to be allowed to become the leader. This can lead to data loss. This is the last option for the cluster, though.
- offsets.topic.num.partitions: This sets the number of partitions for the offset commit topic. This cannot be changed post deployment, so its suggested that the number be set to a higher limit. The default value for this is 50.
- offsets.topic.retention.minutes: This sets offsets that are older than present time be marked for deletion. Actual deletion occurs when a log cleaner run the compaction of an offset topic.
- offsets.retention.check.interval.ms: This sets the time interval for the checking of stale offsets.
- offsets.topic.replication.factor: This sets the replication factor for the offset commit topic. The higher the value, the higher the availability. If at the time of creation of an offset topic, the number of brokers is lower than the replication factor, the number of replicas created will be equal to the brokers.
- offsets.topic.segment.bytes: This sets the segment size for offset topics. This, if kept low, leads to faster log compaction and loads.
- offsets.load.buffer.size: This sets the buffer size that's to be used for reading offset segments into offset manager's cache.
- offsets.commit.required.acks: This sets the number of acknowledgements that are required before an offset commit can be accepted.
- offsets.commit.timeout.ms: This sets the time after which an offset commit will be performed in case the required number of replicas have not received the offset commit.
See also
There are more broker configurations that are available. Read more about them at http://kafka.apache.org/documentation.html#brokerconfigs.
Summary
In this article, we discussed setting basic configurations for the Kafka broker, configuring and managing threads, performance, logs, and replicas. We also discussed ZooKeeper settings that are used for cluster management and some miscellaneous parameter settings.
Resources for Article:
Further resources on this subject: