Reliability: Kafka is distributed, partitioned, replicated, and fault tolerant.
Scalability: It scales out easily with no downtime.
Durability: It uses a distributed commit log, so messages are persisted to disk as quickly as possible.
Performance: It delivers high throughput for both publishing and subscribing, and maintains stable performance even when many terabytes of messages are stored.
👉 High performance + Zero downtime + Zero data loss.
The end-to-end flow of a message through Kafka:
1. Producer: a message is created with a key, a value, and a target topic.
2. Metadata lookup: KRaft metadata tells the producer which broker hosts the partition leader.
3. Broker: the message is appended sequentially to the partition's log file.
4. Replication: followers (on other brokers) copy the message for fault tolerance.
5. Consumer: joins a consumer group and fetches data from its assigned partition(s).
6. Offset commit: after successful processing, the consumer commits its offset (a progress checkpoint).
7. Replay: consumers can re-read messages by resetting their offsets if needed (see the replay sketch right after this list).
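For illustration, here is a minimal replay sketch using the plain Kafka consumer API (outside Spring). The topic test-topic and group replay-group are placeholder names: the consumer joins its group, seeks every assigned partition back to the start of the log, and re-reads everything.

// ReplayExample.java — minimal sketch, not part of the Spring app
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class ReplayExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "replay-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("test-topic"));
            consumer.poll(Duration.ofSeconds(1));                // first poll joins the group and gets partition assignments
            consumer.seekToBeginning(consumer.assignment());     // reset offsets to the start of every assigned partition
            consumer.poll(Duration.ofSeconds(1))
                    .forEach(r -> System.out.println("Replayed: " + r.value()));
        }
    }
}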
Kafka stores data as immutable logs, making it replayable.
KRaft handles all metadata and leader coordination (no ZooKeeper).
Offsets give precise control over what each consumer has read.
Replication ensures fault tolerance — one leader, multiple followers.
Producers → Brokers → Consumers is a continuous streaming pipeline.
Multiple Topics: Kafka holds many logical data streams — e.g. `user-activity`, `transactions`, `metrics`.
Partitions per Topic: Each topic is split into partitions for parallelism and scalability (a small AdminClient sketch follows this list).
Leaders and Followers: Each partition has one leader broker (handles reads/writes) and follower replicas (for fault tolerance).
Producers: Choose a partition for each message (usually by hashing the key, or round-robin when no key is set).
Consumers: Organized into consumer groups — each group gets its own copy of the topic’s data stream.
Parallel Consumption: Within a group, each partition is read by exactly one consumer, allowing parallel processing.
Replication: Follower brokers maintain redundant copies for recovery and durability.
KRaft Controller Quorum: Replaces ZooKeeper — handles leader election, metadata, and configs internally.
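To make partitions and replication concrete, here is a small sketch that uses Kafka's AdminClient to create a topic with explicit settings. The topic name user-activity, 3 partitions, and replication factor 1 (suitable for the single-broker setup described below) are example values.

// CreateTopicExample.java — sketch of creating a topic programmatically
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Properties;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions for parallel consumption, replication factor 1 because there is only one broker
            NewTopic topic = new NewTopic("user-activity", 3, (short) 1);
            admin.createTopics(List.of(topic)).all().get();   // block until the broker confirms creation
            System.out.println("Created topic: " + topic.name());
        }
    }
}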
Easiest method: Docker Compose
# docker-compose.yml
version: '3'
services:
  kafka:
    image: bitnami/kafka:latest
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
Then run:
docker compose up -d
This starts a single-node Kafka cluster (KRaft mode) on port 9092.
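As an optional sanity check (a sketch, not a required step), you can confirm the broker is reachable from Java with the AdminClient:

// ClusterCheck.java — prints the cluster id and existing topics
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;

public class ClusterCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            System.out.println("Cluster id: " + admin.describeCluster().clusterId().get());
            System.out.println("Topics: " + admin.listTopics().names().get());
        }
    }
}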
In src/main/resources/application.yml:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
// src/main/java/com/example/kafka/producer/KafkaProducerService.java
package com.example.kafka.producer;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
System.out.println("✅ Sent message: " + message);
}
}
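The send above is fire-and-forget: it logs before the broker has acknowledged anything. A hedged variant that logs the real outcome could look like this, assuming Spring Kafka 3.x, where send() returns a CompletableFuture of SendResult (older versions return a ListenableFuture). The class name is hypothetical.

// Hypothetical variant of the producer service that logs the send outcome
package com.example.kafka.producer;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class LoggingKafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public LoggingKafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        // whenComplete runs once the broker acknowledges (or rejects) the record
        kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("✅ Sent to partition " + result.getRecordMetadata().partition()
                        + " at offset " + result.getRecordMetadata().offset());
            } else {
                System.err.println("❌ Send failed: " + ex.getMessage());
            }
        });
    }
}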
// src/main/java/com/example/kafka/consumer/KafkaConsumerService.java
package com.example.kafka.consumer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "test-topic", groupId = "my-group")
public void consume(String message) {
System.out.println("📩 Received message: " + message);
}
}
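By default the listener container commits offsets automatically. If you want the offset committed only after your own processing succeeds, a manual-acknowledgment variant is possible; this sketch assumes spring.kafka.listener.ack-mode: manual is set in application.yml, and the class and group names are placeholders.

// Hypothetical manual-commit consumer; requires ack-mode: manual in the listener config
package com.example.kafka.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class ManualAckConsumerService {

    @KafkaListener(topics = "test-topic", groupId = "my-group-manual")
    public void consume(String message, Acknowledgment ack) {
        System.out.println("📩 Received message: " + message);
        ack.acknowledge(); // commit the offset only after processing succeeded
    }
}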
// src/main/java/com/example/kafka/controller/MessageController.java
package com.example.kafka.controller;
import com.example.kafka.producer.KafkaProducerService;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/messages")
public class MessageController {
private final KafkaProducerService producerService;
public MessageController(KafkaProducerService producerService) {
this.producerService = producerService;
}
@PostMapping
public String sendMessage(@RequestParam String message) {
producerService.sendMessage("test-topic", message);
return "Message sent: " + message;
}
}
Now test by sending a message:
curl -X POST "http://localhost:8080/api/messages?message=HelloKafka"
You should see in the console:
✅ Sent message: HelloKafka
📩 Received message: HelloKafka
<dependencies>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- JSON Serialization -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Kafka Streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<!-- Optional: Lombok for less boilerplate -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: user-events-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"
    streams:
      application-id: user-event-streams
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
package com.example.kafka.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserEvent {
private String userId;
private String action;
private long timestamp;
}
package com.example.kafka.producer;
import com.example.kafka.model.UserEvent;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class UserEventProducer {
private final KafkaTemplate<String, UserEvent> kafkaTemplate;
private static final String TOPIC = "user-events";
public UserEventProducer(KafkaTemplate<String, UserEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendEvent(UserEvent event) {
kafkaTemplate.send(TOPIC, event.getUserId(), event);
System.out.println("✅ Sent event: " + event);
}
}
package com.example.kafka.consumer;
import com.example.kafka.model.UserEvent;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class UserEventConsumer {
@KafkaListener(topics = "user-events", groupId = "user-events-group")
public void consume(UserEvent event) {
System.out.println("📩 Received event: " + event);
}
}
package com.example.kafka.controller;
import com.example.kafka.model.UserEvent;
import com.example.kafka.producer.UserEventProducer;
import org.springframework.web.bind.annotation.*;
import java.time.Instant;
@RestController
@RequestMapping("/api/events")
public class UserEventController {
private final UserEventProducer producer;
public UserEventController(UserEventProducer producer) {
this.producer = producer;
}
@PostMapping
public String sendEvent(@RequestParam String userId, @RequestParam String action) {
UserEvent event = new UserEvent(userId, action, Instant.now().toEpochMilli());
producer.sendEvent(event);
return "Sent event: " + event;
}
}
This component consumes from one topic, processes data, and writes to another.
package com.example.kafka.streams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
@Configuration
@EnableKafkaStreams
public class UserEventStreamProcessor {
@Bean
public KStream<String, String> processUserEvents(StreamsBuilder builder) {
KStream<String, String> stream = builder.stream("user-events-raw");
stream
.peek((key, value) -> System.out.println("Processing raw event: " + value))
.mapValues(value -> value.toUpperCase()) // simple transformation
.to("user-events-processed");
return stream;
}
}
You can create a producer to send to "user-events-raw" and a consumer to read from "user-events-processed" to test streaming pipelines.
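As a rough sketch of such a test, here is a standalone class using plain Kafka clients with String serializers, so it is independent of the JSON serializer configuration above; the class name and consumer group are placeholders.

// StreamPipelineSmokeTest.java — push one raw event in, read the processed result back
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class StreamPipelineSmokeTest {
    public static void main(String[] args) {
        // Produce one raw String event into the stream's input topic
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            producer.send(new ProducerRecord<>("user-events-raw", "U123", "login"));
        }

        // Consume whatever the Streams topology wrote to the output topic
        Properties c = new Properties();
        c.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        c.put(ConsumerConfig.GROUP_ID_CONFIG, "pipeline-smoke-test");
        c.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        c.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        c.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            consumer.subscribe(List.of("user-events-processed"));
            consumer.poll(Duration.ofSeconds(5))
                    .forEach(r -> System.out.println("Processed: " + r.value())); // expect "LOGIN"
        }
    }
}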
Now send an event:
curl -X POST "http://localhost:8080/api/events?userId=U123&action=login"
Output:
✅ Sent event: UserEvent(userId=U123, action=login, timestamp=...)
📩 Received event: UserEvent(userId=U123, action=login, timestamp=...)
If using Streams (user-events-raw → user-events-processed), you’ll see:
Processing raw event: {"userId":"U123","action":"login"}