Building Reliable Distributed Systems
Session 4 - NATS Workshop
Services, Security, Monitoring & Patterns
| Session | What We Learned |
|---|---|
| 1. Core NATS | Pub/Sub, Queue Groups, Request/Reply |
| 2. JetStream | Streams, Consumers, Persistence |
| 3. Advanced | KV Store, Object Store, Mirroring |
| 4. Production | Services, Security, Patterns |
Discoverable microservices with built-in monitoring
Services register themselves. Clients discover dynamically.
from nats.micro import Service, ServiceConfig
# Define service configuration
config = ServiceConfig(
name="orders",
version="1.0.0",
description="Order processing service",
)
# Create service
service = await Service.create(nc, config)
# Add endpoint
@service.endpoint("create")
async def create_order(req):
    """Handle `orders.create`: parse the order payload and acknowledge creation.

    Args:
        req: incoming service request; `req.data` is a JSON-encoded order.

    Returns:
        JSON string carrying the new order's id and a "created" status.
    """
    from uuid import uuid4  # local import: slide snippet omits file-level imports

    order = json.loads(req.data)
    # Fix: the original returned an undefined `order_id`. Use the id from the
    # payload when the client supplied one, otherwise mint a fresh UUID.
    order_id = order.get("id") or str(uuid4())
    # Process order...
    return json.dumps({"id": order_id, "status": "created"})
# Multiple endpoints per service
@service.endpoint("create")
async def create_order(req):
    # Creation logic lives in the shared handler; the endpoint just unwraps
    # the raw request payload and forwards it.
    payload = req.data
    return handle_create(payload)
@service.endpoint("get")
async def get_order(req):
    # Thin adapter: delegate the lookup to the shared handler.
    payload = req.data
    return handle_get(payload)
@service.endpoint("cancel")
async def cancel_order(req):
    # Thin adapter: delegate cancellation to the shared handler.
    payload = req.data
    return handle_cancel(payload)
# Endpoints become subjects:
# orders.create, orders.get, orders.cancel
# Client discovers services via $SRV subjects
# Built-in discovery - no external registry needed
# Find all services
resp = await nc.request("$SRV.INFO", b"")
services = json.loads(resp.data)
# Find specific service
resp = await nc.request("$SRV.INFO.orders", b"")
# Get service stats
resp = await nc.request("$SRV.STATS.orders", b"")
# Ping service
resp = await nc.request("$SRV.PING.orders", b"")
# Stats are automatic - no code needed
{
"name": "orders",
"endpoints": [{
"name": "create",
"num_requests": 12456,
"num_errors": 23,
"average_processing_time": "2.3ms"
}]
}
nc = await nats.connect(
servers=["nats://server1:4222", "nats://server2:4222"],
# Reconnection settings
max_reconnect_attempts=-1, # Infinite retries
reconnect_time_wait=2, # Seconds between attempts
# Timeouts
connect_timeout=5,
ping_interval=20,
max_outstanding_pings=3,
# Callbacks
disconnected_cb=on_disconnect,
reconnected_cb=on_reconnect,
error_cb=on_error,
)
async def on_disconnect():
    """Connection-lost callback: announce the outage so operators can see it."""
    notice = "Disconnected! Buffering messages..."
    print(notice)
async def on_reconnect():
    """Reconnection callback: announce recovery of the connection."""
    notice = "Reconnected! Flushing buffer..."
    print(notice)
async def on_error(e):
    """Async error callback: surface client errors on stdout."""
    text = f"Error: {e}"
    print(text)
# Request with timeout and fallback
try:
resp = await nc.request("api.orders", data, timeout=2.0)
except TimeoutError:
# Service unavailable - use cached data or queue for retry
return cached_response()
except NoRespondersError:
# No service listening
raise ServiceUnavailable()
import signal
async def shutdown(sig):
    """Gracefully tear down the service on a termination signal.

    Order matters: stop the service first so no new requests are accepted,
    then drain so in-flight subscriptions finish cleanly, then close the
    underlying connection.

    NOTE(review): relies on module-level `service` and `nc` — confirm both
    exist wherever this handler is registered.
    """
    print(f"Received {sig}, shutting down...")
    # Stop accepting new requests
    await service.stop()
    # Drain: finish in-flight, reject new
    await nc.drain()
    # Close connection
    await nc.close()
# Register signal handlers
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
sig, lambda s=sig: asyncio.create_task(shutdown(s))
)
# Server config (nats-server.conf)
tls {
cert_file: "/path/to/server-cert.pem"
key_file: "/path/to/server-key.pem"
ca_file: "/path/to/ca.pem"
}
# Client connection
nc = await nats.connect(
servers=["tls://nats.example.com:4222"],
tls=ssl.create_default_context(
purpose=ssl.Purpose.SERVER_AUTH,
cafile="/path/to/ca.pem"
),
)
# Token auth
nc = await nats.connect(token="secret-token")
# User/password
nc = await nats.connect(user="myuser", password="mypass")
# NKey (Ed25519 keypair)
nc = await nats.connect(
nkeys_seed="/path/to/user.nk"
)
# JWT (for multi-tenant)
nc = await nats.connect(
user_credentials="/path/to/user.creds"
)
# nats-server.conf
accounts {
ORDERS {
users = [
{user: order_service, password: xxx,
permissions: {
publish: ["orders.>"]
subscribe: ["orders.>", "_INBOX.>"]
}}
]
}
ANALYTICS {
users = [
{user: analytics, password: xxx,
permissions: {
subscribe: ["orders.>"] # Read-only
}}
]
}
}
# NATS exposes monitoring endpoints
# http://localhost:8222/varz - General info
# http://localhost:8222/connz - Connections
# http://localhost:8222/routez - Cluster routes
# http://localhost:8222/subz - Subscriptions
# http://localhost:8222/jsz - JetStream
# Enable in server config
http_port: 8222
# Or use nats CLI
$ nats server info
$ nats server report connections
$ nats stream report
# nats-server.conf
http_port: 8222
# Prometheus scrape config
scrape_configs:
- job_name: 'nats'
static_configs:
- targets: ['nats:8222']
metrics_path: /metrics
# Key metrics:
# nats_server_connections
# nats_server_messages_in/out
# nats_jetstream_streams
# nats_jetstream_consumers
Store events, derive state
Replay events to rebuild state at any point
# Store events in JetStream
await js.add_stream(
name="ORDERS",
subjects=["orders.events.>"],
retention=RetentionPolicy.LIMITS,
max_age=365 * 24 * 60 * 60, # 1 year
)
# Publish events
await js.publish("orders.events.created", event_data,
headers={"Nats-Msg-Id": f"order-{order_id}-created"})
# Rebuild state by replaying
sub = await js.subscribe(
"orders.events.>",
deliver_policy=DeliverPolicy.ALL,
)
Separate read and write models
# Write side: commands via request/reply
@service.endpoint("orders.commands.create")
async def create_order(req):
    """Write side of the CQRS split: accept a create command, emit an event.

    NOTE(review): `event` and `order_id` are slide placeholders — neither is
    defined in this snippet; a real handler would build them from `req.data`
    before publishing.
    """
    # Validate and store event
    await js.publish("orders.events.created", event)
    return {"id": order_id}
# Read side: project events to read model
async for msg in subscription.messages:
event = json.loads(msg.data)
if event["type"] == "OrderCreated":
await db.insert("orders_view", event["data"])
elif event["type"] == "OrderShipped":
await db.update("orders_view", ...)
Distributed transactions with compensation
If any step fails, compensate previous steps
# Orchestrator pattern
async def process_order(order):
    """Run the order saga; on failure, compensate only the completed steps.

    Fix: the original published BOTH compensations on any failure, so a
    failure in step 1 would issue a payment refund for a charge that never
    happened. Each completed step now registers its undo action, and only
    those are replayed — in reverse order — when a later step fails.

    Raises:
        Re-raises the failing step's exception after compensation.
    """
    compensations = []  # (subject, payload) undo actions, LIFO
    try:
        # Step 1
        await nc.request("inventory.reserve", order.items)
        compensations.append(("inventory.release", order.items))
        # Step 2
        await nc.request("payments.charge", order.payment)
        compensations.append(("payments.refund", order.payment))
        # Step 3
        await nc.request("shipping.create", order.address)
    except Exception:
        # Undo completed steps in reverse of completion order.
        for subject, payload in reversed(compensations):
            await nc.publish(subject, payload)
        raise
Reliable event publishing from database
# Transaction: save + outbox
async with db.transaction():
await db.insert("orders", order)
await db.insert("outbox", {
"subject": "orders.created",
"payload": order_json
})
# Background worker publishes outbox
async def outbox_worker():
    """Relay outbox rows to JetStream, preserving insertion order.

    Fixes over the original sketch:
    - rows are read in id order so events publish in the order they were
      committed (an unordered SELECT can reorder events across batches);
    - a row is deleted only after its publish succeeds, and a publish
      failure stops the batch so later rows are not published out of order
      (everything left is retried on the next pass — at-least-once);
    - the loop sleeps between polls instead of hammering the database.
    """
    import asyncio  # local import: slide snippet omits file-level imports

    while True:
        # Oldest first: preserves commit order across batches.
        events = await db.query("SELECT * FROM outbox ORDER BY id")
        for event in events:
            try:
                await js.publish(event.subject, event.payload)
            except Exception:
                # Leave this row (and all later ones) for the next pass.
                break
            # Delete only after a confirmed publish.
            await db.delete("outbox", event.id)
        await asyncio.sleep(1)
API Gateway
|
v
NATS Cluster (3 nodes, TLS, Auth)
|
+-- orders-service (3 replicas, queue group)
| +-- ORDERS stream (events)
| +-- TASKS stream (work queue)
|
+-- payments-service (2 replicas)
| +-- Saga coordinator
|
+-- analytics-service
| +-- ORDERS mirror (read-only)
| +-- Projections to read DB
|
+-- config KV bucket
+-- assets Object Store
# Recommended patterns:
{domain}.{entity}.{action}
{domain}.{entity}.{id}.{action}
# Examples:
orders.create
orders.123.update
orders.events.created
orders.commands.cancel
payments.process
payments.events.completed
# Internal:
_INBOX.> # Reply subjects
$SRV.> # Service discovery
$JS.API.> # JetStream API
Explore services, monitoring, and patterns:
Launch Demo — visualization of production architecture patterns
| Topic | Key Points |
|---|---|
| Services | Self-registering, discoverable, stats |
| Security | TLS + Auth + Accounts |
| Monitoring | /varz, /jsz, Prometheus |
| Patterns | Event Sourcing, CQRS, Saga, Outbox |
| Session | Topics Covered |
|---|---|
| 1. Core | Pub/Sub, Queue Groups, Request/Reply |
| 2. JetStream | Streams, Consumers, Acks, Replay |
| 3. Advanced | KV, Object Store, Mirroring, Sourcing |
| 4. Production | Services, Security, Patterns |
NATS Production Patterns - Session 4