Kafka message scheduler

Introduction

When I was working for MYTF1, we were in charge of publishing content to the tf1.fr website. The content is mainly videos, programs, and articles about the programs broadcast on the TF1 TV channel. Initially, this data was “pushed” by batch programs in charge of exporting all the needed information from the CMS database to all the cache repositories. Then we moved to a streaming event system with Apache Kafka: every time information is created or updated in the CMS, the data is processed and pushed immediately. So we have many small programs (kafka consumers) which are triggered by new messages published in kafka topics. This works perfectly for real-time events, but scheduled events were not as straightforward, and we needed to handle events scheduled for the near or far future.

GitHub

https://github.com/etf1/kafka-message-scheduler

Goal

The challenge was to store the data somewhere and, at the scheduled time, send it as a kafka message to a specific topic in order to trigger the consumers in charge of processing this data.

Design

Of course, we could have used some kind of scheduler framework or tool; not very optimized, but it would have worked. However, we didn’t want to depend on any external database or system to schedule these messages. Keeping every schedule in memory instead could work, but the memory footprint would be big. In fact, we only need to keep the events for the current day in memory, and scan the topic every day to update the list of events scheduled for that day. This was the first draft design of the kafka message scheduler.

In summary, the goal of the kafka message scheduler is to publish kafka messages to a specific topic, with a specific id and payload, at a given time in the future. A schedule is just a kafka message with specific headers and a payload. The schedule id should be unique; if you want to change the payload of your message, you just publish a new version of it in the scheduler’s topic. The topic can hold many versions of your schedule, and the scheduler always takes the latest one. There is a client lib written in GO for preparing a kafka message to be a scheduled message: https://github.com/etf1/kafka-message-scheduler/tree/main/clientlib

The scheduler reads the topic from the beginning at startup and then every day at midnight (00:00), and stores in memory the schedules planned for the current day. It also watches new incoming messages and processes only those for the current day, adding, updating, or deleting schedules in the memory store. Once a schedule is triggered, it is deleted from the kafka topic with a tombstone message (nil payload). So there is no storage other than the scheduler’s kafka topic: no external database, nothing else. All schedules are stored in a kafka topic.

Technical implementation

Scheduler message

First, let’s take a look at a schedule message stored in the scheduler topic.

Headers:

scheduler-epoch: 1893456000

scheduler-target-topic: online-videos

scheduler-target-key: vid1

customer-header: dummy

Timestamp: 1607918336

Key: vid1-online

Value: "video 1"

As you can see, it is a regular kafka message, just with some specific headers.

These headers are:

scheduler-epoch: the schedule, meaning when the message should be produced to the target topic, expressed as a Unix epoch (number of seconds since Thursday 1 January 1970 00:00:00 UTC)

scheduler-target-topic: the target topic to which the message should be produced at the scheduled date

scheduler-target-key: the key of the message produced to the target topic; it is generally different from the key of the scheduler message

The key of the schedule message should be unique per event type, and the value is the payload to be produced. Now let’s take a look at the message produced for the schedule message described above:

Headers:

scheduler-timestamp: 1607918336

scheduler-key: vid1-online

scheduler-topic: schedules

customer-header: dummy

Key: vid1

Value: "video 1"

scheduler-timestamp: the scheduler message timestamp

scheduler-key: the scheduler message key

scheduler-topic: the scheduler message topic

As you can see, the key has been replaced by the target key specified in the original schedule message, while the payload and the custom header were kept unchanged.
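The transformation from schedule message to produced message can be sketched as follows. This is a minimal illustration, not the scheduler’s code: `Header`, `Message` and `toTarget` are hypothetical names, the message is a plain struct instead of confluent-kafka-go’s `kafka.Message`, the timestamp is kept in seconds as in the example above, and forwarding every other header unchanged is inferred from the `customer-header` example.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// Header and Message are simplified, hypothetical stand-ins for a kafka message.
type Header struct {
	Key   string
	Value []byte
}

type Message struct {
	Topic     string
	Key       []byte
	Value     []byte
	Headers   []Header
	Timestamp int64 // seconds, as in the example above
}

// toTarget builds the message produced at trigger time: topic and key come from
// the scheduler-target-* headers, the payload is kept, and scheduler-* headers
// record which schedule message it came from.
func toTarget(sched Message) (Message, error) {
	var topic string
	var key []byte
	var custom []Header
	for _, h := range sched.Headers {
		switch h.Key {
		case "scheduler-target-topic":
			topic = string(h.Value)
		case "scheduler-target-key":
			key = h.Value
		case "scheduler-epoch":
			// only used to plan the trigger; not forwarded
		default:
			custom = append(custom, h) // e.g. customer-header: dummy
		}
	}
	if topic == "" || key == nil {
		return Message{}, errors.New("missing scheduler-target-topic or scheduler-target-key header")
	}
	headers := []Header{
		{Key: "scheduler-timestamp", Value: []byte(strconv.FormatInt(sched.Timestamp, 10))},
		{Key: "scheduler-key", Value: sched.Key},
		{Key: "scheduler-topic", Value: []byte(sched.Topic)},
	}
	return Message{
		Topic:   topic,
		Key:     key,
		Value:   sched.Value, // payload unchanged
		Headers: append(headers, custom...),
	}, nil
}

func main() {
	sched := Message{
		Topic: "schedules", Key: []byte("vid1-online"), Value: []byte("video 1"), Timestamp: 1607918336,
		Headers: []Header{
			{Key: "scheduler-epoch", Value: []byte("1893456000")},
			{Key: "scheduler-target-topic", Value: []byte("online-videos")},
			{Key: "scheduler-target-key", Value: []byte("vid1")},
			{Key: "customer-header", Value: []byte("dummy")},
		},
	}
	out, err := toTarget(sched)
	if err != nil {
		panic(err)
	}
	fmt.Printf("topic=%s key=%s value=%s\n", out.Topic, out.Key, out.Value)
	for _, h := range out.Headers {
		fmt.Printf("%s: %s\n", h.Key, h.Value)
	}
}
```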

Scheduler topic

As explained before, the scheduler’s datastore is a kafka topic. In this topic, each schedule should have a unique key, but multiple messages with the same key can be present. Why? Because if the payload of a schedule changes over time, the new version is produced to the topic. To delete a schedule, you simply produce a tombstone message, which is a message with a nil payload.

topic used as store for the scheduler

As you can see in the diagram, schedule S1 has 3 versions and schedule S2 has been deleted, so the scheduler will only trigger version V3 of schedule S1.
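The rule illustrated by the diagram (the latest version wins, a tombstone deletes) can be sketched as a replay of the topic. `Record` and `replay` are hypothetical names, and the order of the records below is assumed for illustration. The sketch relies on records being replayed in topic order, which holds per key because kafka’s default partitioner sends messages with the same key to the same partition, and ordering is guaranteed within a partition.

```go
package main

import (
	"fmt"
	"sort"
)

// Record is a simplified, hypothetical scheduler-topic record; a nil Value is a tombstone.
type Record struct {
	Key   string
	Value []byte
}

// replay reads the records in topic order and returns the live schedules:
// a newer record with the same key replaces the older one, a tombstone deletes it.
func replay(records []Record) map[string][]byte {
	live := make(map[string][]byte)
	for _, r := range records {
		if r.Value == nil {
			delete(live, r.Key)
			continue
		}
		live[r.Key] = r.Value
	}
	return live
}

func main() {
	// The situation from the diagram: S1 has three versions, S2 was deleted.
	topic := []Record{
		{Key: "S1", Value: []byte("V1")},
		{Key: "S2", Value: []byte("V1")},
		{Key: "S1", Value: []byte("V2")},
		{Key: "S2", Value: nil},
		{Key: "S1", Value: []byte("V3")},
	}
	live := replay(topic)
	keys := make([]string, 0, len(live))
	for k := range live {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map iteration order is random
	for _, k := range keys {
		fmt.Printf("%s -> %s\n", k, live[k]) // prints: S1 -> V3
	}
}
```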

GO program

The kafka message scheduler is written in GO and uses the official GO client library from Confluent (https://github.com/confluentinc/confluent-kafka-go). We decided to use the official library because, with kafka, the client carries a big part of the logic regarding failover and recovery; even though the library is based on C (librdkafka), it was, from our point of view, the wiser choice.

scheduler main components

As shown above, the scheduler has 5 main components, which are described below.

Missed event handler

It is in charge of reading the scheduler topic from the beginning and determining whether any schedules were missed. It does so by consuming the topic, storing all the schedules found in a map, and then checking which ones were missed: when a schedule is triggered, a tombstone message is produced in the topic, so if this tombstone is missing, the schedule was missed.

It cannot process the messages as they are read; it has to buffer them, because a deletion or a new version of a schedule can appear after the first one is read. The second tricky part is deciding when to stop this recovery: if there is a continuous flow of new messages in the topic, the process would never stop. The idea here is to stop once the consumer has caught up, that is, when a message’s creation timestamp is no longer lower than the current time minus 1 second.

This missed event handler is useful if the scheduler was down for some time during the day, or if it has been restarted.

missed events processing

Live event handler

It is in charge of processing live schedules arriving in the topic. A schedule is processed if it is planned for the current day, or if it affects a schedule already planned for the current day. It notifies the Timers of the schedules that should be triggered during the current day.

Timers

It stores all the schedules as GO timers (time.AfterFunc(...)) in a basic map, and is in charge of creating, updating, and deleting entries in this map according to the notifications sent by the missed and live event handlers.

Partition controller

Its role is to watch the partition assignment of the kafka consumers used in the scheduler and to notify the Timers. This feature is available in the confluent client library through the "go.application.rebalance.enable": true option. If you enable this option, partition assignment is no longer applied automatically: you are simply notified that a new set of partitions is assigned to you, you can do some processing, and then you call Assign(partitions) to effectively assign them to your GO program.

Why do we need this? Imagine that you have a single instance of the scheduler and that the scheduler topic has 3 partitions: all three partitions will be assigned to this single instance. Now let’s say that you want to start a second instance: kafka will rebalance the partitions between the 2 instances, for example 2 partitions to the first one and 1 to the second one. Knowing that, each instance can remove from its Timers store the schedules it no longer handles.
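With `go.application.rebalance.enable` set, confluent-kafka-go delivers `kafka.AssignedPartitions` and `kafka.RevokedPartitions` events, which the application acknowledges with `Assign()` and `Unassign()`. The bookkeeping the scheduler needs on top of that could be sketched as below, without the client library. `Planned` and `keepAssigned` are hypothetical, and the sketch assumes each planned schedule remembers the partition of the scheduler topic it was read from.

```go
package main

import "fmt"

// Planned is a schedule held in the Timers store, with the partition it came from (hypothetical).
type Planned struct {
	Key       string
	Partition int32
}

// keepAssigned splits the planned schedules into those this instance still owns
// and those whose partition was moved to another instance.
func keepAssigned(planned []Planned, assigned []int32) (kept, dropped []Planned) {
	owned := make(map[int32]bool, len(assigned))
	for _, p := range assigned {
		owned[p] = true
	}
	for _, s := range planned {
		if owned[s.Partition] {
			kept = append(kept, s)
		} else {
			dropped = append(dropped, s) // its timer should be stopped
		}
	}
	return kept, dropped
}

func main() {
	// 3 partitions; after a second instance joins, this one keeps partitions 0 and 1.
	planned := []Planned{{Key: "s1", Partition: 0}, {Key: "s2", Partition: 1}, {Key: "s3", Partition: 2}}
	kept, dropped := keepAssigned(planned, []int32{0, 1})
	fmt.Println(len(kept), "kept,", len(dropped), "dropped") // 2 kept, 1 dropped
}
```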

Ticker

The ticker is in charge of notifying the Timers to reset their store every night at midnight, in order to process the schedules of the new day. It also resets the offset to zero for all the scheduler’s kafka consumers, so that a new Timers store is populated with the schedules of the current day.

Main loop

The scheduler’s main loop is a classic GO for-select over 3 channels: timer, missed, and store events. Store events come from a kafka consumer; missed events also come from a kafka consumer, once they have been determined to be missed; and timer events come from schedules triggered by the Timers store. The scheduler is shut down when the store events channel is closed.

Last but not least, a REST API was recently added to the scheduler. It exposes the current configuration of the scheduler as well as all the schedules currently planned and ready to be triggered. It is used by the kafka message scheduler admin, a GUI for managing schedulers (https://github.com/etf1/kafka-message-scheduler-admin).

Summary

As you can see, the kafka message scheduler is a relatively simple component, but tricky at the same time (“the devil is in the details”), so if you need more information, please check the GitHub repository (https://github.com/etf1/kafka-message-scheduler).

I hope this article was useful and interesting.

Thank you
