← All articlesPlatform Engineering

How to Build a Notification System: Email, Push, SMS, and Webhooks

Design and build a multi-channel notification system supporting email, push, SMS, and webhooks. Covers architecture, templates, preferences, and delivery.

Y
Yash Pritwani
14 min read

Why Notifications Are Harder Than They Look

Every application needs notifications. A user signs up — send a welcome email. An order ships — send a push notification. A payment fails — send an SMS. A teammate comments on a PR — send a webhook.

TriggerwebhookIfSend EmailSMTPLog EventdatabaseUpdate CRMAPI callDonetruefalse

Workflow automation: triggers, conditions, and actions chain together to eliminate manual processes.

Simple enough, right? Until you need: user preferences (Alice wants email, Bob wants SMS), rate limiting (do not send 50 emails in a row), templates (different languages, A/B testing), delivery tracking (was it opened?), and retry logic (what if the SMS provider is down?).

Architecture Overview

Event Source → Notification Service → Channel Router → Delivery Workers
                     ↓                                      ↓
              Template Engine                         Provider APIs
              Preference Store                    (SendGrid, Twilio, FCM)
              Rate Limiter                              ↓
                                                  Delivery Tracking

Data Model

-- Notification templates
CREATE TABLE notification_templates (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(100) UNIQUE NOT NULL,         -- e.g., 'order_shipped'
    channel VARCHAR(20) NOT NULL,              -- email, push, sms, webhook
    subject TEXT,                               -- Email subject
    body_template TEXT NOT NULL,                -- Mustache/Handlebars template
    locale VARCHAR(10) DEFAULT 'en',
    active BOOLEAN DEFAULT true,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- User notification preferences
CREATE TABLE notification_preferences (
    user_id UUID NOT NULL,
    notification_type VARCHAR(100) NOT NULL,    -- e.g., 'order_updates'
    channel VARCHAR(20) NOT NULL,               -- email, push, sms
    enabled BOOLEAN DEFAULT true,
    PRIMARY KEY (user_id, notification_type, channel)
);

-- Notification log (delivery tracking)
CREATE TABLE notification_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL,
    template_name VARCHAR(100) NOT NULL,
    channel VARCHAR(20) NOT NULL,
    status VARCHAR(20) DEFAULT 'pending',       -- pending, sent, delivered, failed
    provider_id VARCHAR(255),                    -- External message ID
    error_message TEXT,
    sent_at TIMESTAMPTZ,
    delivered_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

Get more insights on Platform Engineering

Join 2,000+ engineers who get our weekly deep-dives. No spam, unsubscribe anytime.

The Notification Service

from dataclasses import dataclass
from enum import Enum

class Channel(Enum):
    EMAIL = "email"
    PUSH = "push"
    SMS = "sms"
    WEBHOOK = "webhook"

@dataclass
class NotificationRequest:
    user_id: str
    template: str
    data: dict
    channels: list[Channel] | None = None  # None = use preferences
    priority: str = "normal"  # low, normal, high, critical

class NotificationService:
    def __init__(self, db, template_engine, rate_limiter, providers):
        self.db = db
        self.templates = template_engine
        self.limiter = rate_limiter
        self.providers = providers

    async def send(self, request: NotificationRequest):
        # 1. Determine channels from preferences or override
        channels = request.channels or await self._get_preferred_channels(
            request.user_id, request.template
        )

        results = []
        for channel in channels:
            # 2. Check rate limits
            if not self.limiter.allow(request.user_id, channel):
                results.append({"channel": channel, "status": "rate_limited"})
                continue

            # 3. Render template
            template = await self.templates.get(request.template, channel)
            rendered = template.render(request.data)

            # 4. Queue for delivery
            job_id = await self._queue_delivery(
                user_id=request.user_id,
                channel=channel,
                content=rendered,
                priority=request.priority
            )
            results.append({"channel": channel, "status": "queued", "id": job_id})

        return results

Email Delivery

import httpx

class EmailProvider:
    """SendGrid email provider."""

    def __init__(self, api_key: str, from_email: str):
        self.api_key = api_key
        self.from_email = from_email
        self.base_url = "https://api.sendgrid.com/v3/mail/send"

    async def send(self, to: str, subject: str, html_body: str) -> dict:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.base_url,
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json"
                },
                json={
                    "personalizations": [{"to": [{"email": to}]}],
                    "from": {"email": self.from_email},
                    "subject": subject,
                    "content": [{"type": "text/html", "value": html_body}]
                }
            )

            if response.status_code == 202:
                return {"status": "sent", "provider_id": response.headers.get("X-Message-Id")}
            else:
                return {"status": "failed", "error": response.text}

Push Notifications (FCM)

import firebase_admin
from firebase_admin import messaging

class PushProvider:
    def __init__(self):
        firebase_admin.initialize_app()

    async def send(self, device_token: str, title: str, body: str, data: dict = None):
        message = messaging.Message(
            notification=messaging.Notification(title=title, body=body),
            data=data or {},
            token=device_token
        )

        try:
            response = messaging.send(message)
            return {"status": "sent", "provider_id": response}
        except messaging.UnregisteredError:
            # Token is invalid, remove it
            await self.remove_token(device_token)
            return {"status": "failed", "error": "unregistered_token"}
        except Exception as e:
            return {"status": "failed", "error": str(e)}
WebMobileIoTGatewayRate LimitAuthLoad BalanceTransformCacheService AService BService CDB / Cache

API gateway pattern: a single entry point handles auth, rate limiting, and routing to backend services.

Webhook Delivery

Webhooks need special handling — retries, signature verification, and timeout management:

import hmac
import hashlib
import json

class WebhookProvider:
    def __init__(self, signing_secret: str):
        self.secret = signing_secret
        self.max_retries = 5
        self.retry_delays = [10, 30, 120, 600, 3600]  # seconds

    def sign_payload(self, payload: str) -> str:
        return hmac.new(
            self.secret.encode(),
            payload.encode(),
            hashlib.sha256
        ).hexdigest()

    async def send(self, url: str, event_type: str, data: dict):
        payload = json.dumps({
            "event": event_type,
            "data": data,
            "timestamp": datetime.utcnow().isoformat()
        })

        signature = self.sign_payload(payload)

        async with httpx.AsyncClient(timeout=30) as client:
            try:
                response = await client.post(
                    url,
                    content=payload,
                    headers={
                        "Content-Type": "application/json",
                        "X-Webhook-Signature": f"sha256={signature}",
                        "X-Webhook-Event": event_type
                    }
                )

                if response.status_code < 300:
                    return {"status": "delivered"}
                else:
                    return {"status": "failed", "http_status": response.status_code}

            except httpx.TimeoutException:
                return {"status": "timeout"}

Rate Limiting

Prevent notification fatigue:

class NotificationRateLimiter:
    """Per-user, per-channel rate limits."""

    LIMITS = {
        "email": {"count": 10, "window": 3600},     # 10/hour
        "push": {"count": 20, "window": 3600},       # 20/hour
        "sms": {"count": 5, "window": 86400},         # 5/day
        "webhook": {"count": 100, "window": 60},      # 100/minute
    }

    def __init__(self, redis_client):
        self.redis = redis_client

    def allow(self, user_id: str, channel: str) -> bool:
        limit = self.LIMITS.get(channel, {"count": 50, "window": 3600})
        key = f"notif_limit:{user_id}:{channel}"

        pipe = self.redis.pipeline()
        pipe.incr(key)
        pipe.expire(key, limit["window"])
        count, _ = pipe.execute()

        return count <= limit["count"]

Template Engine

from jinja2 import Environment, BaseLoader

class TemplateEngine:
    def __init__(self):
        self.env = Environment(loader=BaseLoader())

    def render(self, template_str: str, data: dict) -> str:
        template = self.env.from_string(template_str)
        return template.render(**data)

# Email template example
ORDER_SHIPPED_EMAIL = """
<h2>Your order has shipped!</h2>
<p>Hi {{ customer_name }},</p>
<p>Your order <strong>{{ order_id }}</strong> is on its way.</p>
<p>Tracking number: <a href="{{ tracking_url }}">{{ tracking_number }}</a></p>
<p>Estimated delivery: {{ delivery_date }}</p>
"""

# SMS template (keep it short)
ORDER_SHIPPED_SMS = """Your order {{ order_id }} shipped! Track: {{ tracking_url }}"""

Free Resource

Free Cloud Architecture Checklist

A 47-point checklist covering security, scalability, cost optimization, and disaster recovery for production cloud environments.

Download the Checklist

User Preference API

@app.get("/api/notifications/preferences")
async def get_preferences(user: User = Depends(get_current_user)):
    prefs = await db.fetch_all(
        "SELECT * FROM notification_preferences WHERE user_id = $1",
        user.id
    )
    return {"preferences": prefs}

@app.put("/api/notifications/preferences")
async def update_preferences(
    updates: list[PreferenceUpdate],
    user: User = Depends(get_current_user)
):
    for update in updates:
        await db.execute("""
            INSERT INTO notification_preferences (user_id, notification_type, channel, enabled)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (user_id, notification_type, channel)
            DO UPDATE SET enabled = $4
        """, user.id, update.type, update.channel, update.enabled)

    return {"status": "updated"}
InputHiddenHiddenOutput

Neural network architecture: data flows through input, hidden, and output layers.

Delivery Monitoring

Track delivery rates per channel in Grafana:

SELECT
  channel,
  COUNT(*) FILTER (WHERE status = 'delivered') * 100.0 / COUNT(*) AS delivery_rate,
  COUNT(*) FILTER (WHERE status = 'failed') AS failures,
  AVG(EXTRACT(EPOCH FROM (delivered_at - created_at))) AS avg_delivery_seconds
FROM notification_log
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY channel;

A well-built notification system is a competitive advantage. Users who get timely, relevant notifications on their preferred channel are more engaged and more loyal. At TechSaaS, we build notification infrastructure as part of our platform engineering services.

#notifications#email#webhooks#architecture#platform-engineering

Related Service

Cloud Solutions

Let our experts help you build the right technology strategy for your business.

Need help with platform engineering?

TechSaaS provides expert consulting and managed services for cloud infrastructure, DevOps, and AI/ML operations.

We Will Build You a Demo Site — For Free

Like it? Pay us. Do not like it? Walk away, zero complaints. You will spend way less than hiring developers or any agency.

47+ companies trusted us
99.99% uptime
< 48hr response

No spam. No contracts. Just a free demo.