Study Notes on Apache Kafka — II

vahdet
7 min readMay 3, 2022
Another customized Kafka logo

Hey hi 👋 Take a look at Part I on Kafka fundamentals if you please.

Kafka Client Applications

Since Kafka acts as a hub, both producer and consumer applications are, from its point of view, clients.

A quick note about version compatibility before we continue: Kafka is bi-directionally compatible (since Kafka 0.10.2), which means that no matter the version of the Kafka cluster, the clients (producers & consumers) are expected to operate as expected. So, it’s presumably OK to keep the client versions up-to-date regardless of the cluster.

Producers

Although you can create a message with mere CLI commands of Kafka, the term producer generally refers to the APIs, packaged as language libraries, through which the clients of a Kafka cluster create/push events. The producer (or rather its intrinsic partitioner) is responsible for guiding an incoming message of a topic to the corresponding partition and, naturally, to the leader broker of that partition.

(We discussed the role of the record key in the previous volume, but we can give more detail on it here, since it is set by the producers.)

Record Keys: In Kafka, a record can be produced with or without a key. If present, the key can be any value, e.g. an integer, a string, etc. The main difference between the absence and presence of a record key appears when determining the partition to which the record will be pushed:

No record key 👉 The producer assigns the partition in a round-robin fashion (clients since Kafka 2.4 use a sticky partitioner, filling a batch for one partition before moving to the next).

Record key 👉 The hash of the key determines the partition, so the record lands on the same partition as every other record (past and future) with the same key.

In other words, and from a more business-oriented perspective, the record key is a means for grouping and time-ordering records (i.e. not a per-record identifier, as it may feel at first sight). The time ordering is an important theme here: especially if the records are subject to consumption patterns like “get the last 10 events from this IoT device sorted by time”, without record keys to distinguish the device it would be a genuine pain in the neck to get the desired result. If we use a record key, we guarantee those events are sent to the same partition and, as a result, can easily group and collect them.
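The key-to-partition mapping above can be sketched in a few lines. Note this is only an illustration: Kafka’s default partitioner actually hashes the key bytes with murmur2, while the sketch below uses `zlib.crc32` for simplicity — the invariant (same key 👉 same partition) is what matters.

```python
import zlib
from itertools import count

NUM_PARTITIONS = 6

def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    """Keyed records: hash the key bytes, modulo the partition count.
    (Kafka really uses murmur2; crc32 here just illustrates the idea.)"""
    return zlib.crc32(key) % num_partitions

# Keyless records: a plain round-robin assignment (simplified).
_rr = count()
def partition_round_robin(num_partitions: int = NUM_PARTITIONS) -> int:
    return next(_rr) % num_partitions

# All events from the same IoT device land on the same partition,
# so their relative order within that partition is preserved:
assert partition_for(b"device-42") == partition_for(b"device-42")
```

Adding partitions breaks this mapping (the modulo changes), which is exactly the “exception to the smooth ride” discussed next.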

An exception to this smooth ride is when new partitions are added to a topic: the hashes of new records may then map to different partitions than before. At this point, to go deeper on using hashes to find target servers, you may want to check out consistent hashing later on.

As long as the Kafka cluster successfully elects new partition leader brokers, the producers can be considered capable of routing messages to the new leader of the relevant partition. For a serious deep dive on this issue, consider taking the time to read this article: https://www.confluent.io/blog/how-to-survive-a-kafka-outage/

A producer has a handy configuration called acks to control the durability of the records it creates. For example:

  • acks=0 stands for fire-and-forget behavior. With this value, a producer cannot even know whether the servers have received the request, as it won’t wait for a response, successful or not. Although it is the most performant option, it should only be used when the loss of event data is simply OK.
  • acks=1 was long the default (since Kafka 3.0, producers default to acks=all). It guarantees that the record is saved on the local log of the partition leader broker, and returns an acknowledgement as soon as that happens. This use case generally seeks a sweet spot between performance and durability, but be aware of possible data loss when the broker fails right after receiving our record and before having time to spread it to the In-Sync Replicas (ISR).
  • And there is also acks=all, in which the leader waits for all ISRs to inform it that they have received and saved the message before returning an acknowledgement. This method has the highest durability and is analogous to strong consistency in distributed systems (as in relational databases thanks to the ACID guarantee set, or some modes of NoSQL databases, e.g. MongoDB).

Now, to create a quite fair starter configuration, we may set the following to have a fail-safe setup (note that enable.idempotence is a producer setting, while min.insync.replicas is a broker/topic-level one):

  • enable.idempotence=true to make sure no duplicates of a message will be found on the system, i.e. exactly once per produced record. (Since Kafka 3.0, this is the default configuration anyway.)
  • min.insync.replicas=2 may make sense for a case as stated in the linked doc “A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of “all”. This will ensure that the producer raises an exception if a majority of replicas do not receive a write.” The default value is 1.

As always, it is best to consider your use cases before getting set to pump events directly with this configuration.
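The interplay of acks and min.insync.replicas can be modeled as a toy function — names and logic below are illustrative, not Kafka’s actual internals, but they capture when the producer gets an acknowledgement and when it gets an error instead:

```python
class NotEnoughReplicasError(Exception):
    """Toy stand-in for the NOT_ENOUGH_REPLICAS error the broker returns."""

def try_write(acks, insync_replicas: int, min_insync: int = 2) -> bool:
    """Toy model of the broker-side durability check.
    Returns True when the producer receives an acknowledgement."""
    if acks == 0:
        return False              # fire-and-forget: no ack at all
    if acks == 1:
        return True               # leader-only ack; ISR state is not checked
    # acks='all': the write is rejected unless enough replicas are in sync
    if insync_replicas < min_insync:
        raise NotEnoughReplicasError(
            f"only {insync_replicas} in sync, need {min_insync}")
    return True

# Replication factor 3 with one lagging replica -> 2 still in sync, write OK:
assert try_write("all", insync_replicas=2)
```

With two replicas down (only the leader in sync), the same write raises, which is exactly the “producer raises an exception” behavior the linked doc describes.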

The compression setting is defined via the producer config, too. The valid values of the compression.type field, if specified, are gzip, lz4, snappy, and zstd. It is almost always reasonable to enable compression, as it enhances throughput and helps mitigate latency. Especially lz4, snappy, and the youngest sibling, zstd, are worth a try.
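To see why compression pays off so reliably for event streams, here is a small stdlib-only demonstration (Kafka’s client library does this for you via compression.type; gzip here just shows how well repetitive, similarly-shaped payloads shrink):

```python
import gzip
import json

# A batch of similar JSON events, like typical telemetry records.
batch = json.dumps([
    {"device_id": f"device-{i % 10}", "temperature": 21.5, "unit": "C"}
    for i in range(500)
]).encode("utf-8")

compressed = gzip.compress(batch)

# Repeated field names and values compress dramatically.
assert len(compressed) < len(batch)
print(f"raw={len(batch)}B gzip={len(compressed)}B "
      f"ratio={len(compressed) / len(batch):.2f}")
```

Real event batches are rarely this repetitive, but field names alone (repeated in every record) usually guarantee a solid win.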

More good news: changing the value of that field does not require any changes on either the cluster or the consumer configs.

Another sweet producer setting is about batching: by default, Kafka producers just try to pipe records to the brokers the moment they get one. However, we have two settings to turn this behavior into more of a buffer:

  • linger.ms, defaulting to 0 ms. Increasing the value adds an artificial delay and may be used to reduce network load by letting the producer wait a bit longer to collect event records before sending.
  • batch.size is the upper size limit of record batches. The default of 16 KB can be raised to 32 KB or even 64 KB to unlock the possibility of larger batches. Bear in mind: after waiting for a duration of linger.ms, the batch is sent to the broker even if it is not full.

Especially when applied along with compression, larger batch sizes may yield more valuable results (i.e. increased throughput for some latency and CPU sacrifice), so, you know, just fine-tune the values for your use case.

Consumers (and definitely Consumer Groups)

Consumers are, again, pieces of code used to read data from a specific topic by subscribing to it.

Since topics have no other identifier apart from their names, this is also a good point to stress the importance of naming topics in a meaningful and definitive fashion.

When talking about consumers and going further into the mechanism and configuration, right off the bat we have to bring up the consumer group concept: Kafka consumers, more often than not, run as members of a consumer group. Even when we start with a single consumer instance, it makes sense to encapsulate it in a consumer group for scalability. (Indeed, scalability is the key idea behind the group concept: a single consumer may well run out of resources, especially when producers top the load or when parallel processing is desired; that is when we utilize a consumer group of multiple consumers.)

For further details on consuming, check this lovely chapter from Kafka the Definitive book: https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html.

  • When the consumer count in a consumer group is smaller than the partition count of the subscribed topic, each consumer reads from one or more partitions.
  • When the consumer count is equal to the partition count, there exists a 1–1 relationship between consumers and partitions.
  • When the consumer count exceeds the partition count, the excess consumers just sit back and do nothing while the rest keep the 1–1 mapping.

To scale out the data processing, we can simply add consumers to a consumer group, and we can well say the ideal case is when the consumer count equals the partition count.
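The three cases above can be demonstrated with a toy assignor — a round-robin-style simplification of Kafka’s real assignor strategies (range, round-robin, sticky), written only to show how partitions spread over group members:

```python
def assign_partitions(consumers: list[str],
                      num_partitions: int) -> dict[str, list[int]]:
    """Toy round-robin assignment of a topic's partitions to the
    members of one consumer group (a simplification of Kafka's
    actual assignor strategies)."""
    assignment: dict[str, list[int]] = {c: [] for c in consumers}
    for p in range(num_partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment

# Fewer consumers than partitions -> each reads several partitions:
assert assign_partitions(["c1", "c2"], 4) == {"c1": [0, 2], "c2": [1, 3]}
# More consumers than partitions -> the extra one sits idle:
assert assign_partitions(["c1", "c2", "c3", "c4", "c5"], 4)["c5"] == []
```

With exactly four consumers and four partitions, every member gets exactly one partition, i.e. the 1–1 mapping of the ideal case.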

There can (and probably should) be multiple consumer groups. Unlike in a typical queue, a message is not removed from Kafka when it is read by a consumer group, so we can consume the records from multiple services and applications. This consumer-group-per-application pattern lets us read the same event from multiple distinct domains.

We have talked about partition offsets a few times. Kafka automatically keeps track of the current offset of each consumer group per partition in a common administrative topic called __consumer_offsets. When a consumer reads from a partition, it commits its offset, which is helpful when that consumer falls apart: if it comes back alive, it can find where it left off (just like a bookmark) and continue consuming from that point on.
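The bookmark behavior boils down to a keyed store — a toy model of what __consumer_offsets holds (the function names are mine; the key structure of group/topic/partition mirrors how Kafka actually keys committed offsets):

```python
# Toy model of __consumer_offsets: committed offsets keyed by
# (group, topic, partition), so a restarted consumer can resume.
committed: dict[tuple[str, str, int], int] = {}

def commit(group: str, topic: str, partition: int, offset: int) -> None:
    committed[(group, topic, partition)] = offset

def resume_position(group: str, topic: str, partition: int) -> int:
    # Where to continue reading; 0 if the group never committed
    # (real Kafka consults auto.offset.reset here -- simplified away).
    return committed.get((group, topic, partition), 0)

commit("billing", "orders", 0, 41)      # processed up to offset 41
# ...the consumer crashes and a replacement instance starts up...
assert resume_position("billing", "orders", 0) == 41
assert resume_position("analytics", "orders", 0) == 0   # independent group
```

Note that offsets are per group: the "analytics" group keeps its own bookmark on the very same partition, which is what makes the consumer-group-per-application pattern work.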

Question to Ask: What happens if another consumer from the same consumer group starts reading from that partition?

Although the operation is handled intrinsically by Kafka itself, the timing of committing the offset actually depends on the consumer and is defined by the consumer delivery semantics options, as follows:

  • At most once: The consumer acknowledges receipt of the record right when it fetches it from the topic. The downside: if the processing of that very record fails, there is no chance to get it again. Most of the time, this option is not favorable.
  • At least once: This is the typically preferred one: the consumer only acknowledges receipt and commits its offset when the record is processed successfully, not right at the time it is received. Although this enables consuming the record again in case of an error, we need the processing system to be strictly idempotent, as receiving the same message more than once is well expected.
  • Exactly once: This shiny option is only possible if the consumed records are fed back into Kafka itself, e.g. via the Kafka Streams API. Otherwise, if the consumer is a regular application outside the Kafka system, this option merely acts like at least once above: keep it idempotent and keep discarding dupes, or whatever.
