
Building Real-Time Analytics Pipelines with Apache Kafka

A practical guide to building production-grade real-time analytics pipelines with Apache Kafka. Covers architecture patterns, Kafka Streams, ksqlDB,...

TechSaaS Team

Why Batch Analytics Isn't Enough Anymore

Traditional analytics pipelines run on a schedule. Extract data at midnight, transform it by 3 AM, load it into the warehouse by 6 AM. Business users see yesterday's numbers when they arrive at work.


For many use cases, yesterday's numbers are fine. For others, they're useless:

  • Fraud detection: A fraudulent transaction needs to be caught in milliseconds, not discovered in tomorrow's batch report.
  • Real-time pricing: An e-commerce platform adjusting prices based on demand needs current data, not data from 12 hours ago.
  • Operational monitoring: A spike in API errors needs immediate alerting, not a summary in the next morning's dashboard.
  • User experience: A recommendation engine serving stale data loses conversions.

Apache Kafka is the backbone of real-time analytics pipelines at companies like LinkedIn, Netflix, Uber, and Airbnb. It handles trillions of messages per day across the industry. Here's how to build a production-grade real-time analytics pipeline with it.

Architecture Overview

A Kafka-based real-time analytics pipeline has four layers:

┌──────────────────────────────────────────────────────────┐
│                    Data Sources                           │
│  Web Events │ Mobile App │ IoT Sensors │ Database CDC     │
└──────────────┬───────────┬─────────────┬────────────────┘
               │           │             │
               ▼           ▼             ▼
┌──────────────────────────────────────────────────────────┐
│                  Apache Kafka Cluster                      │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                │
│  │ Topic:   │  │ Topic:   │  │ Topic:   │                │
│  │ events   │  │ orders   │  │ sensors  │                │
│  └──────────┘  └──────────┘  └──────────┘                │
└──────────────┬───────────┬─────────────┬────────────────┘
               │           │             │
               ▼           ▼             ▼
┌──────────────────────────────────────────────────────────┐
│              Stream Processing Layer                      │
│  Kafka Streams │ ksqlDB │ Flink │ Custom Consumers        │
└──────────────┬───────────┬─────────────┬────────────────┘
               │           │             │
               ▼           ▼             ▼
┌──────────────────────────────────────────────────────────┐
│                   Sink / Serving Layer                     │
│  PostgreSQL │ ClickHouse │ Elasticsearch │ Redis │ S3     │
└──────────────────────────────────────────────────────────┘

Setting Up Kafka

Docker Compose for Development

services:
  kafka:
    image: apache/kafka:3.7.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
    volumes:
      - kafka-data:/tmp/kraft-combined-logs

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    depends_on:
      - kafka

volumes:
  kafka-data:

This uses KRaft mode (no ZooKeeper dependency) — the default since Kafka 3.3.

Creating Topics

# Create topics with appropriate partitioning
kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic page-views \
  --partitions 12 \
  --replication-factor 1 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete

kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic order-events \
  --partitions 6 \
  --replication-factor 1 \
  --config retention.ms=-1 \
  --config cleanup.policy=compact


Partition count matters: it determines maximum consumer parallelism. 12 partitions means up to 12 consumers can process page-views in parallel. For order events, compaction retains only the latest state per key.
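On the consuming side, that parallelism comes from consumer groups: every worker that joins under the same `group.id` is assigned an exclusive subset of the topic's partitions. A minimal sketch with the `confluent_kafka` Python client — the group name `page-view-workers` is illustrative, and the client import is deferred into the worker so the config helper stands alone:

```python
def consumer_config(group_id: str) -> dict:
    """Settings shared by every worker; a common group.id is what makes
    Kafka split the topic's partitions across the running workers."""
    return {
        'bootstrap.servers': 'localhost:9092',
        'group.id': group_id,
        'auto.offset.reset': 'earliest',  # first run starts from the oldest data
        'enable.auto.commit': False,      # commit only after processing succeeds
    }

def run_worker(handle):
    """Poll loop for one worker; start up to 12 of these for page-views."""
    from confluent_kafka import Consumer  # third-party client, needs a broker
    consumer = Consumer(consumer_config('page-view-workers'))
    consumer.subscribe(['page-views'])
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                print(f'consumer error: {msg.error()}')
                continue
            handle(msg.key(), msg.value())
            consumer.commit(msg)  # at-least-once: offset committed after handling
    finally:
        consumer.close()
```

A 13th worker gains nothing: with 12 partitions, one worker would simply sit idle.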

Producing Events

Python Producer (Web Events)

from confluent_kafka import Producer
from datetime import datetime, timezone
import json
import uuid

def delivery_callback(err, msg):
    if err:
        print(f'Delivery failed: {err}')

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',                    # Wait for all replicas
    'enable.idempotence': True,       # Exactly-once producer
    'max.in.flight.requests.per.connection': 5,
    'retries': 2147483647,            # Infinite retries
    'linger.ms': 5,                   # Batch for 5ms
    'batch.size': 65536,              # 64KB batches
    'compression.type': 'zstd',       # Best compression ratio
})

def track_page_view(user_id: str, page: str, referrer: str | None = None):
    event = {
        'event_id': str(uuid.uuid4()),
        'event_type': 'page_view',
        'user_id': user_id,
        'page': page,
        'referrer': referrer,
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'user_agent': 'Mozilla/5.0...',
    }

    # Key by user_id ensures all events for a user go to the same partition
    # This enables correct per-user session analytics
    producer.produce(
        topic='page-views',
        key=user_id.encode(),
        value=json.dumps(event).encode(),
        callback=delivery_callback,
    )
    producer.poll(0)  # Trigger callbacks

# Usage
track_page_view('user-123', '/products/laptop', referrer='google.com')
track_page_view('user-456', '/checkout', referrer='/cart')
producer.flush()  # Ensure all messages are sent

Critical producer settings:

  • acks=all: Message is written to all in-sync replicas before acknowledgment
  • enable.idempotence=True: Prevents duplicate messages on retries
  • compression.type=zstd: 60-70% size reduction with fast decompression

Change Data Capture (CDC) with Debezium

For database-sourced analytics, Debezium captures row-level changes and streams them to Kafka:

{
  "name": "orders-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${file:/secrets/db-password}",
    "database.dbname": "commerce",
    "topic.prefix": "cdc",
    "table.include.list": "public.orders,public.order_items",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_orders",
    "publication.name": "dbz_publication",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}

Every INSERT, UPDATE, and DELETE on the orders table is captured as a Kafka event — without any application code changes.
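What lands on the `cdc.public.orders` topic after `ExtractNewRecordState` is a flat row image (or a tombstone for deletes). A hedged sketch of handling one — the column names `id`, `status`, and `total_cents` are illustrative, not taken from a real schema:

```python
import json

def parse_order_event(raw: bytes) -> dict:
    """Decode an unwrapped Debezium row event from cdc.public.orders.
    An empty payload is a tombstone (the row was deleted)."""
    if not raw:
        return {'op': 'tombstone'}
    row = json.loads(raw)
    return {
        'order_id': row.get('id'),
        'status': row.get('status'),
        'total_cents': row.get('total_cents'),
    }

# Example payload shaped like an unwrapped UPDATE on public.orders
sample = json.dumps({'id': 42, 'status': 'shipped', 'total_cents': 12999}).encode()
```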

Stream Processing

Kafka Streams (Java)

Kafka Streams is a library (not a separate cluster) for stream processing:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.Properties;

public class PageViewAnalytics {
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "page-view-analytics");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once_v2");

        StreamsBuilder builder = new StreamsBuilder();

        // Read page views
        KStream<String, String> pageViews = builder.stream("page-views");

        // Count page views per page in 1-minute windows
        KTable<Windowed<String>, Long> pageViewCounts = pageViews
            .mapValues(value -> {
                // Parse JSON and extract the page field; readTree throws a
                // checked exception, so wrap it for use inside the lambda
                try {
                    return objectMapper.readTree(value).get("page").asText();
                } catch (Exception e) {
                    return "unparseable";
                }
            })
            .groupBy((key, page) -> page)
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count(Materialized.as("page-view-counts"));

        // Write results to output topic
        pageViewCounts.toStream()
            .map((windowedKey, count) -> {
                String key = windowedKey.key();
                String value = String.format(
                    "{\"page\":\"%s\",\"count\":%d,\"window_start\":\"%s\",\"window_end\":\"%s\"}",
                    key, count,
                    windowedKey.window().startTime(),
                    windowedKey.window().endTime()
                );
                return KeyValue.pair(key, value);
            })
            .to("page-view-counts");

        // Detect high-traffic pages (> 1000 views per minute)
        pageViewCounts.toStream()
            .filter((windowedKey, count) -> count > 1000)
            .map((windowedKey, count) -> KeyValue.pair(
                windowedKey.key(),
                String.format("{\"alert\":\"high_traffic\",\"page\":\"%s\",\"count\":%d}",
                    windowedKey.key(), count)
            ))
            .to("alerts");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

This single application:

  1. Reads page view events
  2. Counts views per page in 1-minute tumbling windows
  3. Outputs counts to a results topic
  4. Generates alerts for pages exceeding 1,000 views/minute

All with exactly-once processing guarantees.
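The windowing itself is simple arithmetic: an event with epoch-millisecond timestamp t falls into the tumbling window starting at t - (t % window_size). The per-page count above can be mimicked in a few lines of plain Python, which is handy for unit-testing expected window boundaries before deploying the Streams app:

```python
from collections import defaultdict

WINDOW_MS = 60_000  # 1-minute tumbling windows

def window_start(ts_ms: int) -> int:
    """Align a timestamp to the start of its tumbling window."""
    return ts_ms - (ts_ms % WINDOW_MS)

def count_page_views(events):
    """events: iterable of (timestamp_ms, page) pairs. Returns
    {(window_start_ms, page): count}, mirroring the KTable above."""
    counts = defaultdict(int)
    for ts, page in events:
        counts[(window_start(ts), page)] += 1
    return dict(counts)
```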


ksqlDB (SQL-Based Stream Processing)

For teams that prefer SQL over Java:

-- Create a stream from the page-views topic
CREATE STREAM page_views (
    event_id VARCHAR,
    event_type VARCHAR,
    user_id VARCHAR KEY,
    page VARCHAR,
    referrer VARCHAR,
    `timestamp` VARCHAR
) WITH (
    KAFKA_TOPIC = 'page-views',
    VALUE_FORMAT = 'JSON'
);

-- Real-time page view counts per minute
CREATE TABLE page_views_per_minute AS
    SELECT page,
           COUNT(*) AS view_count,
           COUNT_DISTINCT(user_id) AS unique_users,
           WINDOWSTART AS window_start,
           WINDOWEND AS window_end
    FROM page_views
    WINDOW TUMBLING (SIZE 1 MINUTE)
    GROUP BY page
    EMIT CHANGES;

-- Session detection: group events within 30-minute inactivity gap
CREATE TABLE user_sessions AS
    SELECT user_id,
           COUNT(*) AS pages_viewed,
           COLLECT_LIST(page) AS page_sequence,
           WINDOWSTART AS session_start,
           WINDOWEND AS session_end
    FROM page_views
    WINDOW SESSION (30 MINUTES)
    GROUP BY user_id
    EMIT CHANGES;

-- Real-time funnel analysis
CREATE TABLE checkout_funnel AS
    SELECT
        CASE
            WHEN page = '/products' THEN 'browse'
            WHEN page = '/cart' THEN 'cart'
            WHEN page = '/checkout' THEN 'checkout'
            WHEN page = '/confirmation' THEN 'purchase'
        END AS funnel_step,
        COUNT_DISTINCT(user_id) AS unique_users,
        WINDOWSTART AS window_start
    FROM page_views
    WHERE page IN ('/products', '/cart', '/checkout', '/confirmation')
    WINDOW TUMBLING (SIZE 5 MINUTES)
    GROUP BY CASE
        WHEN page = '/products' THEN 'browse'
        WHEN page = '/cart' THEN 'cart'
        WHEN page = '/checkout' THEN 'checkout'
        WHEN page = '/confirmation' THEN 'purchase'
    END
    EMIT CHANGES;

ksqlDB turns Kafka topics into queryable streams and tables using standard SQL. Every query is a continuously running computation that updates as new data arrives.
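These tables aren't locked inside ksqlDB: the server also exposes an HTTP API, so any service can pull results. A stdlib-only sketch, assuming a default ksqlDB server at `localhost:8088` and its `/query` endpoint:

```python
import json
import urllib.request

KSQLDB_URL = 'http://localhost:8088'  # assumed default ksqlDB server address

def build_query_request(sql: str) -> urllib.request.Request:
    """Build a POST against ksqlDB's /query endpoint."""
    body = json.dumps({'ksql': sql, 'streamsProperties': {}}).encode()
    return urllib.request.Request(
        f'{KSQLDB_URL}/query',
        data=body,
        headers={'Content-Type': 'application/vnd.ksql.v1+json'},
        method='POST',
    )

def run_query(sql: str) -> None:
    """Stream query results line by line (requires a running ksqlDB server)."""
    with urllib.request.urlopen(build_query_request(sql)) as resp:
        for line in resp:
            print(line.decode().strip())
```

For example, `run_query("SELECT page, view_count FROM page_views_per_minute WHERE page = '/checkout';")` would stream the checkout page's live counts.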

Sinking Data to Analytics Stores

Kafka Connect to ClickHouse

For high-performance analytical queries, sink processed data to ClickHouse:

{
  "name": "clickhouse-sink",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "4",
    "topics": "page-view-counts",
    "hostname": "clickhouse",
    "port": "8123",
    "database": "analytics",
    "username": "default",
    "password": "${file:/secrets/ch-password}",
    "tableMapping": "page-view-counts=page_view_counts",
    "exactlyOnce": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}

Kafka Connect to Elasticsearch

For full-text search and Kibana dashboards:

{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "4",
    "topics": "page-views,order-events",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true",
    "behavior.on.malformed.documents": "warn"
  }
}
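Both sink configs are deployed the same way: POST the JSON document to the Kafka Connect REST API, which listens on port 8083 by default. A small stdlib helper (the `localhost` address is an assumption — point it at your Connect worker):

```python
import json
import urllib.request

CONNECT_URL = 'http://localhost:8083'  # default Kafka Connect REST port

def build_connector_request(name: str, config: dict) -> urllib.request.Request:
    """Build the POST /connectors call that registers a new connector."""
    body = json.dumps({'name': name, 'config': config}).encode()
    return urllib.request.Request(
        f'{CONNECT_URL}/connectors',
        data=body,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )

def register_connector(name: str, config: dict) -> None:
    """Submit the connector (requires a running Connect worker)."""
    with urllib.request.urlopen(build_connector_request(name, config)) as resp:
        print(resp.status, resp.read().decode())
```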


Exactly-Once Semantics

Kafka supports exactly-once processing across the entire pipeline:

Producer (idempotent) → Kafka → Consumer (transactional) → Kafka

Guarantees:

  1. Each message produced exactly once (no duplicates from retries)
  2. Each message consumed exactly once (consumer offsets committed atomically)
  3. Processing results written exactly once (transactional produce + offset commit)

// Exactly-once consumer-producer pattern
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "analytics-processor-1");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

producer.initTransactions();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    producer.beginTransaction();
    try {
        for (ConsumerRecord<String, String> record : records) {
            // Process and produce result
            String result = process(record.value());
            producer.send(new ProducerRecord<>("output-topic", record.key(), result));
        }
        // Commit consumer offsets within the same transaction
        // (currentOffsets, not shown, maps each partition to the next offset to commit)
        producer.sendOffsetsToTransaction(
            currentOffsets(records),
            consumer.groupMetadata()
        );
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
    }
}

Monitoring and Operations

Key Metrics to Track

# Prometheus alerting rules for the pipeline
groups:
  - name: kafka-pipeline
    rules:
      - alert: ConsumerLag
        expr: kafka_consumergroup_lag > 100000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer group falling behind"

      - alert: UnderReplicatedPartitions
        expr: kafka_server_replicamanager_underreplicatedpartitions > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Data durability at risk"

      - alert: ProducerErrorRate
        expr: rate(kafka_producer_record_error_total[5m]) > 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Producer failing to write"

Consumer Lag Monitoring

# Check consumer group lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group page-view-analytics

# Output:
# TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# page-views     0          1234567         1234570         3
# page-views     1          2345678         2345680         2

Lag is the single most important operational metric. It tells you how far behind your consumers are from the latest data. Increasing lag means your processing can't keep up with production rate.
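Per partition, lag is just the difference between two offsets, which makes it easy to compute programmatically. The sketch below reproduces the CLI's numbers; the live lookup via `confluent_kafka` is a hedged example and assumes a reachable broker:

```python
def partition_lag(log_end_offset: int, committed_offset: int) -> int:
    """How many records the group still has to consume on one partition."""
    return max(0, log_end_offset - committed_offset)

def live_lag(topic: str, partition: int, group_id: str) -> int:
    """Fetch both offsets from a running cluster and diff them."""
    from confluent_kafka import Consumer, TopicPartition  # third-party client
    consumer = Consumer({'bootstrap.servers': 'localhost:9092',
                         'group.id': group_id})
    tp = TopicPartition(topic, partition)
    committed = consumer.committed([tp])[0].offset
    _low, high = consumer.get_watermark_offsets(tp)
    consumer.close()
    return partition_lag(high, committed)
```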

Production Checklist

  1. Replication factor ≥ 3 for production topics (fault tolerance)
  2. acks=all on producers (durability)
  3. enable.idempotence=true on producers (no duplicates)
  4. min.insync.replicas=2 on brokers (prevent data loss)
  5. Schema Registry for schema evolution (Avro or Protobuf, not JSON in production)
  6. Consumer group monitoring with lag alerting
  7. Rack awareness for broker placement across availability zones
  8. Topic retention configured per use case (time-based or size-based)
  9. Dead letter queues for messages that fail processing
  10. Log compaction for state topics (retain latest value per key)
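Several of these items (replication factor, min.insync.replicas, retention, compaction) are fixed at topic-creation time. A hedged sketch with `confluent_kafka`'s admin API — the topic name and one-week retention are illustrative:

```python
def production_topic_settings(cleanup_policy: str = 'delete',
                              retention_ms: int = 7 * 24 * 3600 * 1000) -> dict:
    """Topic-level configs matching the checklist above."""
    return {
        'min.insync.replicas': '2',          # item 4: prevent data loss
        'retention.ms': str(retention_ms),   # item 8: one week by default
        'cleanup.policy': cleanup_policy,    # item 10: 'compact' for state topics
    }

def create_production_topic(name: str, partitions: int = 12) -> None:
    """Create the topic on a live cluster (needs 3+ brokers for RF=3)."""
    from confluent_kafka.admin import AdminClient, NewTopic  # third-party client
    admin = AdminClient({'bootstrap.servers': 'localhost:9092'})
    topic = NewTopic(name, num_partitions=partitions,
                     replication_factor=3,              # item 1: fault tolerance
                     config=production_topic_settings())
    for _, future in admin.create_topics([topic]).items():
        future.result()  # raises if creation failed
```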

The Bottom Line

Apache Kafka transforms analytics from "what happened yesterday" to "what's happening now." The combination of durable event storage, exactly-once processing, and a rich connector ecosystem makes it the standard foundation for real-time analytics pipelines.

Start with a single use case — real-time dashboards, anomaly detection, or CDC-based analytics — and expand from there. The architecture scales from thousands to trillions of events. The key is getting the fundamentals right: proper partitioning, exactly-once semantics, and consumer lag monitoring.

#kafka #data-engineering #streaming #real-time-analytics #event-driven
