How to Build a Notification System: Email, Push, SMS, and Webhooks
Design and build a multi-channel notification system supporting email, push, SMS, and webhooks. Covers architecture, templates, preferences, and delivery.
Why Notifications Are Harder Than They Look
Every application needs notifications. A user signs up — send a welcome email. An order ships — send a push notification. A payment fails — send an SMS. A teammate comments on a PR — send a webhook.
Simple enough, right? Until you need: user preferences (Alice wants email, Bob wants SMS), rate limiting (do not send 50 emails in a row), templates (different languages, A/B testing), delivery tracking (was it opened?), and retry logic (what if the SMS provider is down?).
Architecture Overview
Event Source → Notification Service → Channel Router → Delivery Workers
                         ↓                                     ↓
                 Template Engine                         Provider APIs
                 Preference Store                   (SendGrid, Twilio, FCM)
                 Rate Limiter                                  ↓
                                                       Delivery Tracking
Data Model
-- Notification templates
CREATE TABLE notification_templates (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(100) NOT NULL,               -- e.g., 'order_shipped'
    channel VARCHAR(20) NOT NULL,             -- email, push, sms, webhook
    subject TEXT,                             -- Email subject
    body_template TEXT NOT NULL,              -- Jinja2 template source
    locale VARCHAR(10) NOT NULL DEFAULT 'en',
    active BOOLEAN DEFAULT true,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    -- The same template name exists once per channel and locale,
    -- so uniqueness is on the combination rather than on name alone.
    UNIQUE (name, channel, locale)
);
-- User notification preferences
CREATE TABLE notification_preferences (
    user_id UUID NOT NULL,
    notification_type VARCHAR(100) NOT NULL,  -- e.g., 'order_updates'
    channel VARCHAR(20) NOT NULL,             -- email, push, sms
    enabled BOOLEAN DEFAULT true,
    PRIMARY KEY (user_id, notification_type, channel)
);
-- Notification log (delivery tracking)
CREATE TABLE notification_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL,
    template_name VARCHAR(100) NOT NULL,
    channel VARCHAR(20) NOT NULL,
    status VARCHAR(20) DEFAULT 'pending',     -- pending, sent, delivered, failed
    provider_id VARCHAR(255),                 -- External message ID
    error_message TEXT,
    sent_at TIMESTAMPTZ,
    delivered_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
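Provider callbacks can arrive late or out of order, so it helps to guard how the status column changes. The sketch below is one possible lifecycle, not something the schema enforces: the transition map and the `next_status` helper are assumptions made for illustration.

```python
# Assumed lifecycle for notification_log.status. The schema lists the states
# but not the allowed moves between them, so this map is illustrative only.
ALLOWED_TRANSITIONS = {
    "pending": {"sent", "failed"},
    "sent": {"delivered", "failed"},
    "delivered": set(),  # terminal
    "failed": set(),     # terminal in this sketch
}


def next_status(current: str, new: str) -> str:
    """Return the status to store, ignoring moves the lifecycle does not allow."""
    if new in ALLOWED_TRANSITIONS.get(current, set()):
        return new
    # A stale or duplicate callback: keep what is already recorded.
    return current
```

With this guard, a late "sent" callback cannot overwrite a row that is already marked "delivered". If failed deliveries are retried, "failed" would need a path back to "sent".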
The Notification Service
from dataclasses import dataclass
from enum import Enum


class Channel(Enum):
    EMAIL = "email"
    PUSH = "push"
    SMS = "sms"
    WEBHOOK = "webhook"


@dataclass
class NotificationRequest:
    user_id: str
    template: str
    data: dict
    channels: list[Channel] | None = None  # None = use preferences
    priority: str = "normal"  # low, normal, high, critical


class NotificationService:
    def __init__(self, db, template_engine, rate_limiter, providers):
        self.db = db
        self.templates = template_engine
        self.limiter = rate_limiter
        self.providers = providers

    # _get_preferred_channels and _queue_delivery are not shown here.
    async def send(self, request: NotificationRequest):
        # 1. Determine channels from preferences or override
        channels = request.channels or await self._get_preferred_channels(
            request.user_id, request.template
        )
        results = []
        for channel in channels:
            # 2. Check rate limits (the limiter is keyed by the channel's string value)
            if not self.limiter.allow(request.user_id, channel.value):
                results.append({"channel": channel, "status": "rate_limited"})
                continue
            # 3. Render template
            template = await self.templates.get(request.template, channel)
            rendered = template.render(request.data)
            # 4. Queue for delivery
            job_id = await self._queue_delivery(
                user_id=request.user_id,
                channel=channel,
                content=rendered,
                priority=request.priority,
            )
            results.append({"channel": channel, "status": "queued", "id": job_id})
        return results
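The service calls `_get_preferred_channels` without showing it. A minimal sketch of the selection logic follows, assuming the user's preference rows have already been loaded as dicts shaped like `notification_preferences` records. The `resolve_channels` name and the `DEFAULT_CHANNELS` fallback are hypothetical.

```python
from enum import Enum


class Channel(Enum):
    EMAIL = "email"
    PUSH = "push"
    SMS = "sms"
    WEBHOOK = "webhook"


# Hypothetical fallback for users who have saved no preference for a type.
DEFAULT_CHANNELS = [Channel.EMAIL]


def resolve_channels(rows: list[dict], notification_type: str) -> list[Channel]:
    """Pick channels from rows shaped like notification_preferences records."""
    matching = [r for r in rows if r["notification_type"] == notification_type]
    if not matching:
        # No stored preference at all: fall back to the assumed default.
        return list(DEFAULT_CHANNELS)
    # Explicit rows exist: honor them, even if every channel is disabled.
    return [Channel(r["channel"]) for r in matching if r["enabled"]]


rows = [
    {"notification_type": "order_updates", "channel": "email", "enabled": True},
    {"notification_type": "order_updates", "channel": "sms", "enabled": False},
    {"notification_type": "order_updates", "channel": "push", "enabled": True},
]
print(resolve_channels(rows, "order_updates"))
```

Treating "no rows" differently from "all rows disabled" is a design choice: the first means the user never chose, the second means the user opted out.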
Email Delivery
import httpx


class EmailProvider:
    """SendGrid email provider."""

    def __init__(self, api_key: str, from_email: str):
        self.api_key = api_key
        self.from_email = from_email
        self.base_url = "https://api.sendgrid.com/v3/mail/send"

    async def send(self, to: str, subject: str, html_body: str) -> dict:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.base_url,
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json",
                },
                json={
                    "personalizations": [{"to": [{"email": to}]}],
                    "from": {"email": self.from_email},
                    "subject": subject,
                    "content": [{"type": "text/html", "value": html_body}],
                },
            )
        # SendGrid answers 202 Accepted when the message is queued for sending.
        if response.status_code == 202:
            return {"status": "sent", "provider_id": response.headers.get("X-Message-Id")}
        else:
            return {"status": "failed", "error": response.text}
Push Notifications (FCM)
import asyncio

import firebase_admin
from firebase_admin import messaging


class PushProvider:
    def __init__(self):
        firebase_admin.initialize_app()

    async def send(self, device_token: str, title: str, body: str, data: dict | None = None):
        message = messaging.Message(
            notification=messaging.Notification(title=title, body=body),
            data=data or {},  # FCM data payloads must map strings to strings
            token=device_token,
        )
        try:
            # messaging.send is a blocking call, so run it off the event loop.
            response = await asyncio.to_thread(messaging.send, message)
            return {"status": "sent", "provider_id": response}
        except messaging.UnregisteredError:
            # Token is invalid, remove it
            await self.remove_token(device_token)
            return {"status": "failed", "error": "unregistered_token"}
        except Exception as e:
            return {"status": "failed", "error": str(e)}

    async def remove_token(self, device_token: str) -> None:
        # Placeholder: delete the stale token from your own device-token store.
        ...
Webhook Delivery
Webhooks need special handling — retries, signature verification, and timeout management:
import hashlib
import hmac
import json
from datetime import datetime, timezone

import httpx


class WebhookProvider:
    def __init__(self, signing_secret: str):
        self.secret = signing_secret
        self.max_retries = 5
        self.retry_delays = [10, 30, 120, 600, 3600]  # seconds

    def sign_payload(self, payload: str) -> str:
        return hmac.new(
            self.secret.encode(),
            payload.encode(),
            hashlib.sha256,
        ).hexdigest()

    async def send(self, url: str, event_type: str, data: dict):
        payload = json.dumps({
            "event": event_type,
            "data": data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })
        # Sign the exact string that is sent as the request body.
        signature = self.sign_payload(payload)
        async with httpx.AsyncClient(timeout=30) as client:
            try:
                response = await client.post(
                    url,
                    content=payload,
                    headers={
                        "Content-Type": "application/json",
                        "X-Webhook-Signature": f"sha256={signature}",
                        "X-Webhook-Event": event_type,
                    },
                )
                if response.is_success:  # any 2xx status
                    return {"status": "delivered"}
                else:
                    return {"status": "failed", "http_status": response.status_code}
            except httpx.TimeoutException:
                return {"status": "timeout"}
            except httpx.RequestError as exc:
                # DNS failures, refused connections, and similar transport errors
                return {"status": "failed", "error": str(exc)}
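The provider defines `max_retries` and `retry_delays` but its `send` method makes a single attempt. One way to use that schedule is sketched below. The `deliver_with_retries` helper and its injectable `sleep` parameter are hypothetical, and the sketch assumes each attempt returns a dict with a "status" key, as `send` does.

```python
import asyncio
from typing import Awaitable, Callable, Sequence

RETRY_DELAYS = (10, 30, 120, 600, 3600)  # seconds, same schedule as the provider


async def deliver_with_retries(
    attempt: Callable[[], Awaitable[dict]],
    delays: Sequence[int] = RETRY_DELAYS,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> dict:
    """Call `attempt` until it reports "delivered" or the schedule runs out."""
    result = await attempt()
    tries = 1
    for delay in delays:
        if result["status"] == "delivered":
            break
        # Wait before the next try; `sleep` is injectable so tests run instantly.
        await sleep(delay)
        result = await attempt()
        tries += 1
    # Report the final outcome together with the number of attempts made.
    return {**result, "attempts": tries}
```

A caller could wrap the provider as `await deliver_with_retries(lambda: provider.send(url, event_type, data))`. Sleeping inside a worker for up to an hour is rarely practical, so a production system would more likely re-enqueue the job with a delay. The loop here only illustrates the schedule.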
Rate Limiting
Prevent notification fatigue:
class NotificationRateLimiter:
    """Per-user, per-channel rate limits."""

    LIMITS = {
        "email": {"count": 10, "window": 3600},    # 10/hour
        "push": {"count": 20, "window": 3600},     # 20/hour
        "sms": {"count": 5, "window": 86400},      # 5/day
        "webhook": {"count": 100, "window": 60},   # 100/minute
    }

    def __init__(self, redis_client):
        self.redis = redis_client

    def allow(self, user_id: str, channel: str) -> bool:
        limit = self.LIMITS.get(channel, {"count": 50, "window": 3600})
        key = f"notif_limit:{user_id}:{channel}"
        pipe = self.redis.pipeline()
        # Create the counter with a TTL only if it does not exist yet. Calling
        # EXPIRE on every hit would keep pushing the window forward, so a
        # steady stream of notifications would never reset the count.
        pipe.set(key, 0, ex=limit["window"], nx=True)
        pipe.incr(key)
        _, count = pipe.execute()
        return count <= limit["count"]
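For unit tests it can be convenient to swap the Redis-backed limiter for an in-memory one with the same `allow(user_id, channel)` contract. The class below is a sketch under that assumption. `InMemoryRateLimiter` and its injectable `clock` are hypothetical names, and it is not safe across processes.

```python
import time
from typing import Callable


class InMemoryRateLimiter:
    """Fixed-window counter with the same allow() contract, for tests only."""

    def __init__(self, limits: dict, clock: Callable[[], float] = time.monotonic):
        self.limits = limits
        self.clock = clock
        # (user_id, channel) -> (window start time, hits in that window)
        self._windows: dict = {}

    def allow(self, user_id: str, channel: str) -> bool:
        limit = self.limits.get(channel, {"count": 50, "window": 3600})
        now = self.clock()
        started, count = self._windows.get((user_id, channel), (now, 0))
        if now - started >= limit["window"]:
            # The window has elapsed: start a fresh one.
            started, count = now, 0
        count += 1
        self._windows[(user_id, channel)] = (started, count)
        return count <= limit["count"]
```

Passing a fake clock lets a test step past the end of a window without waiting.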
Template Engine
from jinja2 import Environment, BaseLoader


class TemplateEngine:
    def __init__(self):
        # Autoescaping is off by default in Jinja2. Consider a separate
        # Environment(autoescape=True) for HTML email so user-supplied values
        # such as customer_name are escaped.
        self.env = Environment(loader=BaseLoader())

    def render(self, template_str: str, data: dict) -> str:
        template = self.env.from_string(template_str)
        return template.render(**data)


# Email template example
ORDER_SHIPPED_EMAIL = """
<h2>Your order has shipped!</h2>
<p>Hi {{ customer_name }},</p>
<p>Your order <strong>{{ order_id }}</strong> is on its way.</p>
<p>Tracking number: <a href="{{ tracking_url }}">{{ tracking_number }}</a></p>
<p>Estimated delivery: {{ delivery_date }}</p>
"""

# SMS template (keep it short)
ORDER_SHIPPED_SMS = """Your order {{ order_id }} shipped! Track: {{ tracking_url }}"""
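Templates are stored per channel and locale, so the engine needs a lookup step before it renders. A minimal sketch of locale fallback follows, assuming templates are loaded as dicts shaped like `notification_templates` rows and that locales use a hyphenated form such as "pt-BR". The `find_template` name and the fallback order are assumptions.

```python
from typing import Optional


def find_template(
    templates: list,
    name: str,
    channel: str,
    locale: str = "en",
    default_locale: str = "en",
) -> Optional[dict]:
    """Return the active template for (name, channel), preferring `locale`."""
    candidates = {
        t["locale"]: t
        for t in templates
        if t["name"] == name and t["channel"] == channel and t["active"]
    }
    # Try the exact locale, then its language part ("pt-BR" -> "pt"),
    # then the default locale.
    for key in (locale, locale.split("-")[0], default_locale):
        if key in candidates:
            return candidates[key]
    return None
```

Returning `None` leaves the caller to decide whether a missing template should skip the channel or fail the whole request.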
User Preference API
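# Assumes a FastAPI-style app; `app`, `db`, `User`, and `get_current_user`
# are defined elsewhere in the application.
from fastapi import Depends
from pydantic import BaseModel


class PreferenceUpdate(BaseModel):
    # Fields inferred from how update_preferences uses them below.
    type: str
    channel: str
    enabled: bool


@app.get("/api/notifications/preferences")
async def get_preferences(user: User = Depends(get_current_user)):
    prefs = await db.fetch_all(
        "SELECT * FROM notification_preferences WHERE user_id = $1",
        user.id
    )
    return {"preferences": prefs}


@app.put("/api/notifications/preferences")
async def update_preferences(
    updates: list[PreferenceUpdate],
    user: User = Depends(get_current_user)
):
    for update in updates:
        # Upsert: insert the preference, or flip `enabled` if the row exists.
        await db.execute("""
            INSERT INTO notification_preferences (user_id, notification_type, channel, enabled)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (user_id, notification_type, channel)
            DO UPDATE SET enabled = $4
        """, user.id, update.type, update.channel, update.enabled)
    return {"status": "updated"}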
Delivery Monitoring
Track delivery rates per channel in Grafana:
SELECT
    channel,
    COUNT(*) FILTER (WHERE status = 'delivered') * 100.0 / COUNT(*) AS delivery_rate,
    COUNT(*) FILTER (WHERE status = 'failed') AS failures,
    AVG(EXTRACT(EPOCH FROM (delivered_at - created_at))) AS avg_delivery_seconds
FROM notification_log
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY channel;
A well-built notification system is a competitive advantage. Users who get timely, relevant notifications on their preferred channel are more engaged and more loyal. At TechSaaS, we build notification infrastructure as part of our platform engineering services.