NATS Production Patterns

Building Reliable Distributed Systems

Session 4 - NATS Workshop

Services, security, monitoring, patterns

Workshop Recap

Session        What We Learned
1. Core NATS   Pub/Sub, Queue Groups, Request/Reply
2. JetStream   Streams, Consumers, Persistence
3. Advanced    KV Store, Object Store, Mirroring
4. Production  Services, Security, Patterns

Services Framework

Discoverable microservices with built-in monitoring

users-service     v1.2.0   3 endpoints
orders-service    v2.0.1   5 endpoints
payments-service  v1.0.0   2 endpoints

Services register themselves. Clients discover dynamically.

Defining a Service

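import json
import uuid

from nats.micro import ServiceConfig, add_service

# Define service configuration
config = ServiceConfig(
    name="orders",
    version="1.0.0",
    description="Order processing service",
)

# Create and start the service on an existing connection `nc`
service = await add_service(nc, config)

# Endpoint handler: receives a request and replies with bytes
async def create_order(req):
    order = json.loads(req.data)
    order_id = str(uuid.uuid4())
    # Process order...
    await req.respond(json.dumps({"id": order_id, "status": "created"}).encode())

# Register the endpoint on subject orders.create
await service.add_endpoint(name="create", subject="orders.create", handler=create_order)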

Service Endpoints

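# Multiple endpoints per service, grouped under the "orders" prefix
# (handle_create/get/cancel return the reply bytes)
async def create_order(req):
    await req.respond(handle_create(req.data))

async def get_order(req):
    await req.respond(handle_get(req.data))

async def cancel_order(req):
    await req.respond(handle_cancel(req.data))

orders = service.add_group(name="orders")
await orders.add_endpoint(name="create", handler=create_order)
await orders.add_endpoint(name="get", handler=get_order)
await orders.add_endpoint(name="cancel", handler=cancel_order)

# Endpoints become subjects:
# orders.create, orders.get, orders.cancel
# Instances share a queue group ("q" by default), so requests are load-balanced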

Service Discovery

import json

# Client discovers services via $SRV subjects
# Built-in discovery - no external registry needed

# Every running instance answers, but nc.request() returns only the
# FIRST reply; collecting all replies needs an inbox subscription
resp = await nc.request("$SRV.INFO", b"", timeout=1)
info = json.loads(resp.data)   # one instance of one service

# Find specific service
resp = await nc.request("$SRV.INFO.orders", b"", timeout=1)

# Get service stats
resp = await nc.request("$SRV.STATS.orders", b"", timeout=1)

# Ping service
resp = await nc.request("$SRV.PING.orders", b"", timeout=1)
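Because `nc.request()` stops at the first reply, listing every running instance takes a scatter-gather: publish once with a private inbox as the reply subject, then collect replies until none arrive for a short quiet period. A minimal sketch, assuming a connected nats.py client `nc` and that the timeout raised by `next_msg()` is an `asyncio.TimeoutError` subclass (as in nats.py 2.x); the 0.5 s quiet period and the helper names `discover` and `group_instances` are illustrative choices, not part of the services API.

```python
import asyncio
import json
from collections import defaultdict
from typing import Dict, List


async def discover(nc, subject: str = "$SRV.PING", wait: float = 0.5) -> List[dict]:
    """Publish one discovery request and gather every reply until `wait` passes quietly."""
    inbox = nc.new_inbox()
    sub = await nc.subscribe(inbox)
    try:
        await nc.publish(subject, b"", reply=inbox)
        replies = []
        while True:
            try:
                msg = await sub.next_msg(timeout=wait)
            except asyncio.TimeoutError:
                break  # no reply within `wait` seconds: assume all instances answered
            replies.append(json.loads(msg.data))
        return replies
    finally:
        await sub.unsubscribe()


def group_instances(replies: List[dict]) -> Dict[str, List[str]]:
    """Map service name -> instance ids, from PING or INFO replies."""
    by_name: Dict[str, List[str]] = defaultdict(list)
    for reply in replies:
        by_name[reply["name"]].append(reply["id"])
    return dict(by_name)
```

For one service only, pass a narrower subject, e.g. `await discover(nc, "$SRV.PING.orders")`. The quiet-period cutoff trades completeness for latency: a very slow instance may be missed.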

Service Stats

12,456 requests | 23 errors | 2.3 ms avg latency | 99.8% success rate
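# Stats are automatic - no code needed
# ($SRV.STATS.orders reply, abridged; processing times are in nanoseconds)
{
  "type": "io.nats.micro.v1.stats_response",
  "name": "orders",
  "endpoints": [{
    "name": "create",
    "subject": "orders.create",
    "num_requests": 12456,
    "num_errors": 23,
    "average_processing_time": 2300000
  }]
}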
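The dashboard figures above can be derived from a stats reply. A small sketch, assuming the reply shape shown (per-endpoint `num_requests`, `num_errors`, and `average_processing_time` in nanoseconds); `summarize_stats` and `summarize_endpoint` are hypothetical helper names.

```python
import json
from typing import List


def summarize_endpoint(ep: dict) -> dict:
    """Turn one endpoint entry of a $SRV.STATS reply into dashboard numbers."""
    requests = ep["num_requests"]
    errors = ep["num_errors"]
    # Treat an endpoint with no traffic yet as 100% successful
    success = 100.0 * (requests - errors) / requests if requests else 100.0
    return {
        "name": ep["name"],
        "requests": requests,
        "errors": errors,
        "avg_latency_ms": ep["average_processing_time"] / 1_000_000,  # ns -> ms
        "success_rate_pct": round(success, 1),
    }


def summarize_stats(payload: bytes) -> List[dict]:
    """Parse a raw stats reply (e.g. resp.data) and summarize every endpoint."""
    stats = json.loads(payload)
    return [summarize_endpoint(ep) for ep in stats.get("endpoints", [])]
```

With the numbers above, 12,433 of 12,456 requests succeeded, which rounds to the 99.8% shown on the slide.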

Connection Management

nc = await nats.connect(
    servers=["nats://server1:4222", "nats://server2:4222"],
    
    # Reconnection settings
    max_reconnect_attempts=-1,    # Infinite retries
    reconnect_time_wait=2,        # Seconds between attempts
    
    # Timeouts
    connect_timeout=5,
    ping_interval=20,
    max_outstanding_pings=3,
    
    # Callbacks
    disconnected_cb=on_disconnect,
    reconnected_cb=on_reconnect,
    error_cb=on_error,
)

Error Handling

import nats.errors

async def on_disconnect():
    print("Disconnected! Buffering messages...")

async def on_reconnect():
    print("Reconnected! Flushing buffer...")

async def on_error(e):
    print(f"Error: {e}")

# Request with timeout and fallback
async def get_orders(data):
    try:
        return await nc.request("api.orders", data, timeout=2.0)
    except nats.errors.NoRespondersError:
        # No service subscribed to the subject: fail fast
        raise ServiceUnavailable()
    except nats.errors.TimeoutError:
        # No reply within 2 s (slow or overloaded service):
        # use cached data or queue for retry
        return cached_response()
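A timeout is often transient, so one option before falling back is a few retries with exponential backoff. A minimal sketch, assuming `request` is `nc.request` (or any coroutine with the same `(subject, data, timeout=...)` shape) and that nats.py's timeout error subclasses `asyncio.TimeoutError`; `request_with_retry` and its defaults are hypothetical. `NoRespondersError` is deliberately not caught, so a missing service still fails fast.

```python
import asyncio


async def request_with_retry(request, subject, data, *, attempts=3, timeout=2.0,
                             base_delay=0.1, fallback=None):
    """Retry a request on timeout with exponential backoff, then fall back."""
    delay = base_delay
    for attempt in range(1, attempts + 1):
        try:
            return await request(subject, data, timeout=timeout)
        except asyncio.TimeoutError:
            if attempt == attempts:
                break
            await asyncio.sleep(delay)
            delay *= 2  # 0.1 s, 0.2 s, 0.4 s, ...
    if fallback is None:
        raise asyncio.TimeoutError(f"no reply on {subject!r} after {attempts} attempts")
    return fallback()
```

Usage: `resp = await request_with_retry(nc.request, "api.orders", data, fallback=cached_response)`. Retrying is only safe for idempotent requests, since a timed-out request may still have been processed.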

Graceful Shutdown

import asyncio
import signal

async def shutdown(sig):
    print(f"Received {sig.name}, shutting down...")

    # Stop the service endpoints so no new requests are taken
    await service.stop()

    # Drain: unsubscribe, let in-flight handlers finish, flush pending
    # publishes, then close the connection (no separate close() needed)
    await nc.drain()

# Register signal handlers (call from inside the running event loop)
loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
    loop.add_signal_handler(
        sig, lambda s=sig: asyncio.create_task(shutdown(s))
    )

Security Overview

TLS   Encryption      TLS for all connections
AUTH  Authentication  Token, User/Pass, NKey, JWT
ACCT  Authorization   Accounts, permissions

TLS Configuration

# Server config (nats-server.conf)
tls {
    cert_file: "/path/to/server-cert.pem"
    key_file: "/path/to/server-key.pem"
    ca_file: "/path/to/ca.pem"
}

# Client connection
import ssl

nc = await nats.connect(
    servers=["tls://nats.example.com:4222"],
    tls=ssl.create_default_context(
        purpose=ssl.Purpose.SERVER_AUTH,
        cafile="/path/to/ca.pem"
    ),
)
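The server block above authenticates the server to clients but not the reverse. To require client certificates (mutual TLS), nats-server's `tls` block also takes `verify: true`, and each client then presents its own certificate. A sketch of a client context builder using only the standard `ssl` module; the function name and file paths are hypothetical, and the TLS 1.2 floor is an assumed policy choice, not a NATS requirement.

```python
import ssl


def client_tls_context(ca_file=None, cert_file=None, key_file=None) -> ssl.SSLContext:
    """Build a client-side TLS context; add a client cert for mutual TLS."""
    # Verifies the server certificate and hostname against ca_file
    # (or the system trust store when ca_file is None)
    ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_file)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    if cert_file:
        # Presented to the server when its tls block sets `verify: true`
        ctx.load_cert_chain(certfile=cert_file, keyfile=key_file)
    return ctx
```

Usage: `nc = await nats.connect(servers=["tls://nats.example.com:4222"], tls=client_tls_context("/path/to/ca.pem", "/path/to/client-cert.pem", "/path/to/client-key.pem"))`.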

Authentication Methods

# Token auth
nc = await nats.connect(token="secret-token")

# User/password
nc = await nats.connect(user="myuser", password="mypass")

# NKey (Ed25519 keypair)
nc = await nats.connect(
    nkeys_seed="/path/to/user.nk"
)

# JWT + NKey seed from a .creds file (decentralized auth, multi-tenant)
nc = await nats.connect(
    user_credentials="/path/to/user.creds"
)
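In practice the method is usually chosen per environment rather than hard-coded. A sketch that maps environment variables onto the `nats.connect()` keyword arguments shown above; the variable names `NATS_CREDS`, `NATS_NKEY_SEED`, `NATS_USER`, `NATS_PASSWORD` and `NATS_TOKEN` are hypothetical, and the strongest-first order is an assumed policy.

```python
import os
from typing import Mapping


def auth_options(env: Mapping[str, str] = os.environ) -> dict:
    """Pick exactly one auth method, strongest first, as nats.connect() kwargs."""
    if env.get("NATS_CREDS"):        # JWT + NKey seed (.creds file)
        return {"user_credentials": env["NATS_CREDS"]}
    if env.get("NATS_NKEY_SEED"):    # NKey seed file
        return {"nkeys_seed": env["NATS_NKEY_SEED"]}
    if env.get("NATS_USER"):         # user/password
        return {"user": env["NATS_USER"], "password": env.get("NATS_PASSWORD", "")}
    if env.get("NATS_TOKEN"):        # shared token
        return {"token": env["NATS_TOKEN"]}
    return {}                        # no auth configured
```

Usage: `nc = await nats.connect(servers=["nats://server1:4222"], **auth_options())`.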

Accounts and Permissions

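# nats-server.conf
accounts {
    ORDERS {
        users = [
            {user: order_service, password: xxx,
             permissions: {
                publish: ["orders.>"]
                subscribe: ["orders.>", "_INBOX.>"]
                allow_responses: true   # may reply to requests it receives
            }}
        ]
        # Accounts are isolated: share order subjects explicitly
        exports = [
            {stream: "orders.>"}
        ]
    }
    ANALYTICS {
        users = [
            {user: analytics, password: xxx,
             permissions: {
                publish: {deny: [">"]}  # Read-only: no publishing
                subscribe: ["orders.>"]
            }}
        ]
        imports = [
            {stream: {account: ORDERS, subject: "orders.>"}}
        ]
    }
}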
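Permission lists (like subscriptions) use subject wildcards: `*` matches exactly one token, `>` matches one or more trailing tokens, and a deny entry overrides an allow. A small pure-Python matcher that illustrates those rules for a user with an explicit allow list, handy for unit-testing a permission layout; it is an illustration, not the server's implementation.

```python
from typing import Iterable


def subject_matches(pattern: str, subject: str) -> bool:
    """NATS-style match: '*' = one token, '>' = one or more trailing tokens."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # Only valid as the last token, and needs at least one token left
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


def allowed(subject: str, allow: Iterable[str], deny: Iterable[str] = ()) -> bool:
    """Allowed if some allow pattern matches and no deny pattern does."""
    if any(subject_matches(d, subject) for d in deny):
        return False
    return any(subject_matches(a, subject) for a in allow)
```

For example, `orders.>` covers `orders.123.update` but not the bare subject `orders`, while `orders.*` covers `orders.create` but not `orders.123.update`.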

Monitoring: Server Stats

# NATS exposes monitoring endpoints
# http://localhost:8222/varz - General info
# http://localhost:8222/connz - Connections
# http://localhost:8222/routez - Cluster routes
# http://localhost:8222/subsz - Subscriptions
# http://localhost:8222/jsz - JetStream

# Enable in server config
http_port: 8222

# Or use nats CLI
$ nats server info
$ nats server report connections
$ nats stream report
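The monitoring endpoints return plain JSON, so a quick check needs nothing beyond the standard library. A sketch that fetches `/varz` and keeps a few fields useful for alerting; `version`, `connections`, `in_msgs`, `out_msgs` and `slow_consumers` are fields `/varz` reports, while the helper names and the slow-consumer alert rule are illustrative assumptions.

```python
import json
import urllib.request


def fetch_varz(base_url: str = "http://localhost:8222", timeout: float = 2.0) -> dict:
    """GET <base_url>/varz and decode the JSON body."""
    with urllib.request.urlopen(f"{base_url}/varz", timeout=timeout) as resp:
        return json.load(resp)


def summarize_varz(varz: dict) -> dict:
    """Keep a handful of server-level numbers for a dashboard or alert."""
    summary = {
        "version": varz.get("version"),
        "connections": varz.get("connections", 0),
        "in_msgs": varz.get("in_msgs", 0),
        "out_msgs": varz.get("out_msgs", 0),
        "slow_consumers": varz.get("slow_consumers", 0),
    }
    # Slow consumers are clients that could not keep up with their messages
    summary["alert"] = summary["slow_consumers"] > 0
    return summary
```

Usage: `print(summarize_varz(fetch_varz()))` against a server started with `http_port: 8222`.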

Prometheus Metrics

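# nats-server.conf: enable the monitoring port
http_port: 8222

# nats-server does not serve /metrics itself; run the Prometheus
# NATS exporter against the monitoring port (it listens on :7777)
$ prometheus-nats-exporter -varz -connz -jsz=all http://nats:8222

# Prometheus scrape config: scrape the exporter
scrape_configs:
  - job_name: 'nats'
    static_configs:
      - targets: ['nats-exporter:7777']
    metrics_path: /metrics

# Key metrics (default exporter prefix "gnatsd"; see its -prefix flag):
# gnatsd_varz_connections
# gnatsd_varz_in_msgs / gnatsd_varz_out_msgs
# gnatsd_varz_slow_consumers
# plus JetStream stream/consumer metrics when -jsz is set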

Pattern: Event Sourcing

Store events, derive state

1. OrderCreated     {id: 123, items: [...]}
2. PaymentReceived  {orderId: 123, amount: 99.99}
3. OrderShipped     {orderId: 123, tracking: "ABC"}
4. OrderDelivered   {orderId: 123}

Replay events to rebuild state at any point

Event Sourcing with NATS

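from nats.js.api import DeliverPolicy, RetentionPolicy

# Store events in JetStream
await js.add_stream(
    name="ORDERS",
    subjects=["orders.events.>"],
    retention=RetentionPolicy.LIMITS,
    max_age=365 * 24 * 60 * 60,  # 1 year, in seconds
)

# Publish events (Nats-Msg-Id lets the stream drop duplicate
# publishes within its duplicate window)
await js.publish("orders.events.created", event_data,
    headers={"Nats-Msg-Id": f"order-{order_id}-created"})

# Rebuild state by replaying from the first stored event
sub = await js.subscribe(
    "orders.events.>",
    deliver_policy=DeliverPolicy.ALL,
    ordered_consumer=True,  # in-order, ephemeral replay
)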
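Replaying means folding events into state in stream order. A pure-Python sketch of that fold for the four events listed earlier, independent of the NATS client; the `Order` fields and the `{"type", "data"}` event envelope are assumptions. With a real subscription, `upto` would correspond to the stream sequence (`msg.metadata.sequence.stream` in nats.py), which is how the state at a past point is rebuilt.

```python
from dataclasses import dataclass, field
from typing import Iterable, Optional


@dataclass
class Order:
    id: Optional[int] = None
    items: list = field(default_factory=list)
    status: str = "new"
    paid: float = 0.0
    tracking: Optional[str] = None


def apply(order: Order, event: dict) -> Order:
    """Apply one event to the current state; unknown event types are ignored."""
    kind, data = event["type"], event["data"]
    if kind == "OrderCreated":
        order.id = data["id"]
        order.items = list(data["items"])
        order.status = "created"
    elif kind == "PaymentReceived":
        order.paid += data["amount"]
        order.status = "paid"
    elif kind == "OrderShipped":
        order.tracking = data["tracking"]
        order.status = "shipped"
    elif kind == "OrderDelivered":
        order.status = "delivered"
    return order


def rebuild(events: Iterable[dict], upto: Optional[int] = None) -> Order:
    """Fold events in order; stop after sequence `upto` to see past state."""
    order = Order()
    for seq, event in enumerate(events, start=1):
        if upto is not None and seq > upto:
            break
        apply(order, event)
    return order
```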

Pattern: CQRS

Separate read and write models

Commands --> NATS --> Write Model
Queries  --> Read Model <-- Projections

CQRS Implementation

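import json

# Write side: commands via request/reply (nats.micro handler)
async def create_order(req):
    # Validate the command, then store the resulting event
    cmd = json.loads(req.data)
    event = {"type": "OrderCreated", "data": cmd}
    await js.publish("orders.events.created", json.dumps(event).encode())
    await req.respond(json.dumps({"id": cmd["id"]}).encode())

await service.add_endpoint(
    name="create", subject="orders.commands.create", handler=create_order)

# Read side: project events to read model (durable JetStream consumer)
sub = await js.subscribe("orders.events.>", durable="orders-view")
async for msg in sub.messages:
    event = json.loads(msg.data)
    if event["type"] == "OrderCreated":
        await db.insert("orders_view", event["data"])
    elif event["type"] == "OrderShipped":
        await db.update("orders_view", ...)
    await msg.ack()  # ack only after the read model is updated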
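JetStream delivers at least once, so a projection should tolerate redelivery. One common approach, sketched here with an in-memory dict standing in for the read database (`db` above), is to remember the last applied stream sequence and skip anything at or below it. This assumes a single consumer applying events in stream order; `OrdersView` is a hypothetical name.

```python
from typing import Dict


class OrdersView:
    """In-memory read model fed by order events, safe against redelivery."""

    def __init__(self) -> None:
        self.rows: Dict[int, dict] = {}
        self.last_seq = 0  # highest stream sequence already applied

    def project(self, seq: int, event: dict) -> bool:
        """Apply one event; return False if it was already applied."""
        if seq <= self.last_seq:
            return False  # redelivered message: ack it again, change nothing
        data = event["data"]
        if event["type"] == "OrderCreated":
            self.rows[data["id"]] = {"id": data["id"], "status": "created"}
        elif event["type"] == "OrderShipped":
            self.rows[data["orderId"]]["status"] = "shipped"
            self.rows[data["orderId"]]["tracking"] = data["tracking"]
        self.last_seq = seq
        return True
```

With a real database, storing `last_seq` in the same transaction as the row update keeps the two from drifting apart after a crash.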

Pattern: Saga

Distributed transactions with compensation

Create Order --> Reserve Inventory --> Process Payment --> Ship Order

If any step fails, compensate previous steps

Saga with NATS

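# Orchestrator pattern: record a compensation after each successful step
async def process_order(order):
    completed = []
    try:
        # Step 1
        await nc.request("inventory.reserve", order.items, timeout=2)
        completed.append(("inventory.release", order.items))

        # Step 2
        await nc.request("payments.charge", order.payment, timeout=2)
        completed.append(("payments.refund", order.payment))

        # Step 3
        await nc.request("shipping.create", order.address, timeout=2)

    except Exception:
        # Compensate only the steps that succeeded, newest first; use
        # request (not publish) so each compensation is confirmed.
        # A step that timed out may still have run and is not in
        # `completed`, so it needs separate reconciliation.
        for subject, payload in reversed(completed):
            await nc.request(subject, payload, timeout=2)
        raise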
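The orchestrator generalizes to a list of (step, compensation) pairs. A sketch with plain async callables, so it runs without a server; in the service each callable would wrap an `nc.request(...)` call. `run_saga` is a hypothetical helper, and logging a failed compensation (rather than persisting and retrying it) is a simplification.

```python
import logging
from typing import Awaitable, Callable, Optional, Sequence, Tuple

Action = Callable[[], Awaitable[None]]
logger = logging.getLogger("saga")


async def run_saga(steps: Sequence[Tuple[str, Action, Optional[Action]]]) -> None:
    """Run steps in order; on failure undo completed steps newest-first, then re-raise."""
    done = []  # (name, compensation) for every step that succeeded
    for name, action, compensate in steps:
        try:
            await action()
        except Exception:
            for done_name, undo in reversed(done):
                try:
                    await undo()
                except Exception:
                    # Simplification: log and continue; a real system would
                    # persist failed compensations and retry them
                    logger.exception("compensation for %s failed", done_name)
            raise
        if compensate is not None:
            done.append((name, compensate))
```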

Pattern: Outbox

Reliable event publishing from database

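import asyncio

# Transaction: save + outbox (both rows commit or neither does)
async with db.transaction():
    await db.insert("orders", order)
    await db.insert("outbox", {
        "subject": "orders.created",
        "payload": order_json
    })

# Background worker publishes outbox rows in insertion order
async def outbox_worker():
    while True:
        events = await db.query("SELECT * FROM outbox ORDER BY id LIMIT 100")
        for event in events:
            # js.publish waits for the stream's ack; Nats-Msg-Id lets the
            # stream drop the duplicate if we crash after publish but
            # before delete (within its duplicate window)
            await js.publish(event.subject, event.payload,
                             headers={"Nats-Msg-Id": f"outbox-{event.id}"})
            await db.delete("outbox", event.id)
        if not events:
            await asyncio.sleep(1)  # avoid a busy loop when the outbox is empty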

Complete Architecture

API Gateway
    |
    v
NATS Cluster (3 nodes, TLS, Auth)
    |
    +-- orders-service (3 replicas, queue group)
    |       +-- ORDERS stream (events)
    |       +-- TASKS stream (work queue)
    |
    +-- payments-service (2 replicas)
    |       +-- Saga coordinator
    |
    +-- analytics-service
    |       +-- ORDERS mirror (read-only)
    |       +-- Projections to read DB
    |
    +-- config KV bucket
    +-- assets Object Store

Deployment Checklist

  1. Enable TLS for all connections
  2. Configure authentication (JWT for multi-tenant)
  3. Set up account permissions
  4. Deploy 3+ node cluster for HA
  5. Configure JetStream replication (R=3)
  6. Enable monitoring endpoints
  7. Set up Prometheus/Grafana
  8. Implement graceful shutdown
  9. Add health checks
  10. Document subject naming conventions
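For item 9, nats-server exposes a `/healthz` endpoint on the monitoring port that answers `{"status": "ok"}` when the server is healthy, which suits a liveness or readiness probe. A standard-library sketch; the function name and the 2 s timeout are assumptions.

```python
import json
import urllib.error
import urllib.request


def nats_healthy(base_url: str = "http://localhost:8222", timeout: float = 2.0) -> bool:
    """True if <base_url>/healthz answers 200 with {"status": "ok"}."""
    try:
        with urllib.request.urlopen(f"{base_url}/healthz", timeout=timeout) as resp:
            body = json.load(resp)
    except (urllib.error.URLError, OSError, ValueError):
        # Unreachable, non-2xx status (HTTPError), or a body that is not JSON
        return False
    return isinstance(body, dict) and body.get("status") == "ok"
```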

Subject Naming Conventions

# Recommended patterns:
{domain}.{entity}.{action}
{domain}.{entity}.{id}.{action}

# Examples:
orders.create
orders.123.update
orders.events.created
orders.commands.cancel

payments.process
payments.events.completed

# Internal:
_INBOX.>          # Reply subjects
$SRV.>            # Service discovery
$JS.API.>         # JetStream API
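Building subjects through one helper keeps the convention consistent and stops stray dots or wildcards from leaking into a token (an order id containing `.` would otherwise silently add a level). A sketch; the allowed character set is an assumed, deliberately conservative rule, stricter than what NATS itself accepts.

```python
import re

# Conservative token rule (assumption): letters, digits, '_' and '-' only.
# Rejects '.', '*', '>', whitespace and empty tokens.
_TOKEN = re.compile(r"[A-Za-z0-9_-]+")


def subject(*tokens: object) -> str:
    """Join tokens into a subject, e.g. subject("orders", 123, "update")."""
    parts = [str(t) for t in tokens]
    if not parts:
        raise ValueError("a subject needs at least one token")
    for part in parts:
        if not _TOKEN.fullmatch(part):
            raise ValueError(f"invalid subject token: {part!r}")
    return ".".join(parts)
```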

Interactive Demo

Explore services, monitoring, and patterns:


Visualization of production architecture patterns

Summary

Topic       Key Points
Services    Self-registering, discoverable, built-in stats
Security    TLS + Auth + Accounts
Monitoring  /varz, /jsz, Prometheus
Patterns    Event Sourcing, CQRS, Saga, Outbox

Key Takeaway

NATS = Complete Platform
1. Messaging, persistence, KV, files - one system
2. Services framework for microservices
3. Production-ready: secure, observable, resilient

Resources

Workshop Complete!

Session        Topics Covered
1. Core        Pub/Sub, Queue Groups, Request/Reply
2. JetStream   Streams, Consumers, Acks, Replay
3. Advanced    KV, Object Store, Mirroring, Sourcing
4. Production  Services, Security, Patterns

Questions?
