Kafka Producers and Consumers in Rust: An In-Depth Overview
Chapter 1: Introduction to Rust and Kafka
Rust, a programming language that emerged in 2015, has rapidly gained traction as a viable alternative to established languages like C and C++ for high-performance applications. This surge in popularity can be attributed to its Ownership Based Resource Management (OBRM) model, which mandates adherence to Resource Acquisition Is Initialization (RAII) principles, alongside its statically typed and compiled characteristics.
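As a toy illustration of RAII-style ownership, unrelated to Kafka itself: a value acquires its resources on initialization and releases them automatically when its owner goes out of scope.

fn main() {
    let greeting = String::from("hello"); // heap buffer acquired on initialization
    let owned = greeting; // ownership moves; `greeting` can no longer be used
    println!("{}", owned);
} // `owned` goes out of scope here and its buffer is freed automatically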
For those eager to delve deeper into Rust's capabilities, resources such as "Rust by Example" and "The Rust Programming Language" are excellent starting points, both freely available thanks to community contributions.
Kafka, on the other hand, is an open-source platform designed for distributed event streaming, emphasizing high throughput and performance. While a comprehensive exploration of Kafka is beyond the scope of this discussion, it's essential to grasp that Kafka facilitates message passing between applications through a standalone service that manages queues known as topics.
To visualize Kafka's operation, consider a Python dictionary that associates strings with messages:
MESSAGE_QUEUE = {
    "topic1": [message1_1, message1_2, message1_3, ...],
    "topic2": [message2_1, message2_2, message2_3, ...],
    "topic3": [message3_1, ...],
}
In this setup, a Kafka producer appends messages of any type to one or more queues, while a Kafka consumer subscribes to these topics to process incoming messages. Depending on its configuration, the consumer can process only newly arriving messages, replay the entire queue retained on disk, or start from a chosen offset within it.
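As a minimal sketch of how this choice is expressed with the rust-rdkafka crate used later in this article, the auto.offset.reset setting determines where a new consumer group begins reading (the broker address and group id below are placeholders):

use rdkafka::config::ClientConfig;

fn main() {
    // "latest" consumes only messages produced after the consumer joins;
    // "earliest" replays the log retained on disk from the beginning.
    let mut config = ClientConfig::new();
    config
        .set("bootstrap.servers", "127.0.0.1:9092")
        .set("group.id", "example-group")
        .set("auto.offset.reset", "earliest");
}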
While Kafka's functionality extends beyond this introductory explanation, understanding these fundamentals is crucial for grasping its role in interprocess communication, which will be further elaborated in this article.
For clarity, this article assumes that readers have either set up their own Kafka cluster or have access to one, such as Confluent Cloud. Numerous online resources are available to assist with establishing a Kafka cluster.
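For instance, if Docker is available, a single-node broker suitable for local experimentation can be started with the official apache/kafka image (one option among many; 9092 is the conventional client port):

docker run -p 9092:9092 apache/kafka:latest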
Section 1.1: Kafka and Rust Integration
In contemporary systems requiring interprocess communication, message queuing systems like Kafka, RabbitMQ, and IBM MQ have become indispensable. Pairing Kafka with a newer systems language like Rust is particularly well suited to applications that demand message passing with low latency and high throughput.
Subsection 1.1.1: Creating a Kafka Producer
To kick off any message-passing system, it is essential to create an application that sends messages to a message queue, commonly referred to as a producer. For this article, we will utilize the rust-rdkafka library to implement both producers and consumers.
The first step involves adding the necessary dependencies, rdkafka and tokio, to the Cargo.toml file. This can be done manually or by executing the following commands (tokio's macros and rt-multi-thread features are needed for the #[tokio::main] attribute used later):
cargo add rdkafka
cargo add tokio --features macros,rt-multi-thread
The tokio crate supplies the asynchronous runtime that our producer's send operations will run on. Note that rdkafka compiles and statically links the bundled librdkafka C library by default, so a working C toolchain must be available on the build machine.
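After running these commands, the Cargo.toml dependency section should look roughly like the following (the versions shown are illustrative; use whatever cargo resolves for you):

[dependencies]
rdkafka = "0.36"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }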
use rdkafka::config::ClientConfig;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

pub struct Producer {
    future_producer: FutureProducer,
    topic: String,
    message_id: usize,
}
The Producer struct will require three attributes: a FutureProducer to handle asynchronous message sending, a topic to specify where to publish messages, and a message_id to uniquely identify each sent message.
Next, we will implement two functions: a constructor to instantiate the Producer and a method to send messages to the specified Kafka topic.
impl Producer {
    pub fn new(bootstrap_server: &str, message_timeout: &str, topic: &str) -> Producer {
        // Build the underlying rdkafka producer; taking ownership directly
        // avoids a needless reference-and-clone.
        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", bootstrap_server)
            .set("message.timeout.ms", message_timeout)
            .create()
            .expect("Producer creation error");
        Producer { future_producer: producer, topic: String::from(topic), message_id: 0 }
    }

    pub async fn produce(&mut self, message_bytes: &str) -> bool {
        let delivery_status = self.future_producer.send(
            FutureRecord::to(self.topic.as_str())
                .payload(&format!("Message: {}", message_bytes))
                .key(&format!("Key: {}", self.message_id))
                .headers(OwnedHeaders::new().insert(Header {
                    key: "header_key",
                    value: Some("header_value"),
                })),
            // How long to block if the producer's internal queue is full.
            Duration::from_secs(0),
        ).await;
        self.message_id += 1;
        // send() resolves to Err((KafkaError, OwnedMessage)) when delivery fails.
        match delivery_status {
            Ok(_) => true,
            Err((err, _)) => {
                eprintln!("Error sending message {} on topic {}: {}", message_bytes, self.topic, err);
                false
            }
        }
    }
}
Two distinct producers can be instantiated to publish messages on different topics, enhancing code clarity.
mod producer;

use producer::producer::Producer;

#[tokio::main]
async fn main() {
    let price_updates = vec![
        "AAPL 169.58", "TSLA 164.90", "IBM 189.14", "PARA 11.97", "INTC 38.71",
        // Additional updates...
    ];
    let trade_updates = vec![
        "BUY AAPL 100 170.00",
        "SELL PARA 453 11.23",
        // Additional trades...
    ];

    let server: &str = "127.0.0.1:9092";
    let timeout: &str = "5000";
    let mut price_producer = Producer::new(server, timeout, "ticker.topic");
    let mut trade_producer = Producer::new(server, timeout, "trade.topic");

    for update in price_updates {
        match price_producer.produce(update).await {
            true => println!("Price update sent successfully."),
            false => println!("Failed to send message"),
        }
    }
    for trade in trade_updates {
        match trade_producer.produce(trade).await {
            true => println!("Trade update sent successfully."),
            false => println!("Failed to send message"),
        }
    }
}
This asynchronous main function builds two lists of messages, price_updates and trade_updates, creates a dedicated producer for each topic, and sends the messages while logging the outcome of each send to the console.
Section 1.2: Implementing a Kafka Consumer
With messages now being published to Kafka topics, we need to develop a Kafka Consumer to ingest and process these messages. Below is a simple implementation using the rdkafka library.
use log::warn;
use rdkafka::{ClientConfig, Message};
use rdkafka::config::RDKafkaLogLevel;
// The Consumer trait is imported anonymously so its methods (subscribe,
// commit_message) are in scope without shadowing our own Consumer struct.
use rdkafka::consumer::{CommitMode, Consumer as _, DefaultConsumerContext, StreamConsumer};

pub struct Consumer {
    consumer: StreamConsumer,
}
impl Consumer {
    pub fn new(topics: &[&str], broker: &str, kafka_group: &str) -> Consumer {
        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", kafka_group)
            .set("bootstrap.servers", broker)
            .set("enable.partition.eof", "false")
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "true")
            .set_log_level(RDKafkaLogLevel::Debug)
            .create_with_context(DefaultConsumerContext)
            .expect("Error creating stream consumer.");
        let consumer = Consumer { consumer };
        consumer.consumer.subscribe(topics).expect("Can't subscribe to specified topics");
        consumer
    }

    // Route each message to topic-specific handling logic.
    fn process_message(&self, topic: &str, payload: &str) {
        match topic {
            "ticker.topic" => println!("Price update received: {}", payload),
            "trade.topic" => println!("Trade update received: {}", payload),
            _ => println!("Message received on unknown topic {}: {}", topic, payload),
        }
    }

    pub async fn start_processing(&self) {
        loop {
            match self.consumer.recv().await {
                Err(e) => warn!("Kafka error: {}", e),
                Ok(message) => {
                    let payload = match message.payload_view::<str>() {
                        None => "",
                        Some(Ok(s)) => s,
                        Some(Err(e)) => {
                            warn!("Error while deserializing message payload: {:?}", e);
                            ""
                        }
                    };
                    self.process_message(message.topic(), payload);
                    self.consumer.commit_message(&message, CommitMode::Async).unwrap();
                }
            };
        }
    }
}
The above code wraps around the StreamConsumer and subscribes to specified topics, continuously processing incoming messages. Each message's topic and payload are routed to a processing method, where the appropriate action is taken based on the topic.
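To complete the picture, here is a minimal sketch of an entry point that drives the consumer. The consumer::consumer module path mirrors the producer layout used earlier, and the group id is an assumption; adjust both to your project:

mod consumer;

use consumer::consumer::Consumer;

#[tokio::main]
async fn main() {
    // Broker address and group id are placeholders for your own setup.
    let consumer = Consumer::new(&["ticker.topic", "trade.topic"], "127.0.0.1:9092", "example-group");
    consumer.start_processing().await;
}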
In the following output, you can observe how the Consumer acknowledges received messages:
Price update received: AAPL 169.58
Price update received: TSLA 164.90
Trade update received: BUY AAPL 100 170.00
// More received messages...
Conclusion
In this article, we explored the implementation of Kafka producers and consumers in Rust utilizing the rust-rdkafka crate. The discussion highlighted how producers can write to various topics and how consumers can process messages from multiple sources while managing the processing logic accordingly.
This video provides a comprehensive guide on setting up Kafka in Rust, including producer and consumer implementations.
Meet Iggy: A reimagined Kafka in 2023, powered by Rust. This video introduces Iggy, a Rust-built message streaming project inspired by Kafka.