i33ym

NATS Production

You understand NATS fundamentals—pub/sub, JetStream, KV stores. Now it is time to deploy. Production systems need service discovery, security, monitoring, and battle-tested patterns. This is where NATS becomes a complete platform.

Services Framework

The Services Framework turns NATS into a microservices platform. Services register themselves automatically. Clients discover them dynamically. Stats are collected without any code.

from nats.micro import Service, ServiceConfig

config = ServiceConfig(
    name="orders",
    version="1.0.0",
    description="Order processing service",
)

service = await Service.create(nc, config)

@service.endpoint("create")
async def create_order(req):
    order = json.loads(req.data)
    result = await process_order(order)
    return json.dumps({"id": result.id, "status": "created"})

Each endpoint becomes a subject. The create endpoint above listens on orders.create. Clients call it with request/reply:

response = await nc.request("orders.create", order_json, timeout=5.0)

Service Discovery

Services register with NATS automatically. No external service registry needed. Clients discover services using built-in $SRV subjects:

# Find all services
response = await nc.request("$SRV.INFO", b"")
services = json.loads(response.data)

# Find specific service
response = await nc.request("$SRV.INFO.orders", b"")

# Get service stats
response = await nc.request("$SRV.STATS.orders", b"")

# Health check
response = await nc.request("$SRV.PING.orders", b"")

Stats include request counts, error counts, and average processing times—all tracked automatically.

Connection Management

Production connections need resilience. Configure reconnection, timeouts, and callbacks:

nc = await nats.connect(
    servers=["nats://server1:4222", "nats://server2:4222"],
    max_reconnect_attempts=-1,
    reconnect_time_wait=2,
    connect_timeout=5,
    ping_interval=20,
    disconnected_cb=on_disconnect,
    reconnected_cb=on_reconnect,
    error_cb=on_error,
)

With max_reconnect_attempts=-1, the client retries forever. During disconnection, messages are buffered (up to a limit). When reconnected, buffered messages flush automatically.

Graceful Shutdown

Services should shut down cleanly. The drain() method completes in-flight messages before closing:

import signal

async def shutdown(sig):
    await service.stop()
    await nc.drain()
    await nc.close()

for sig in (signal.SIGTERM, signal.SIGINT):
    loop.add_signal_handler(
        sig, lambda s=sig: asyncio.create_task(shutdown(s))
    )

Security

Production NATS needs three layers of security: encryption, authentication, and authorization.

TLS Encryption

Enable TLS in the server configuration:

# nats-server.conf
tls {
    cert_file: "/path/to/server-cert.pem"
    key_file: "/path/to/server-key.pem"
    ca_file: "/path/to/ca.pem"
}

Clients connect with TLS:

import ssl

nc = await nats.connect(
    servers=["tls://nats.example.com:4222"],
    tls=ssl.create_default_context(
        purpose=ssl.Purpose.SERVER_AUTH,
        cafile="/path/to/ca.pem"
    ),
)

Authentication

NATS supports multiple authentication methods:

# Token
nc = await nats.connect(token="secret-token")

# Username/password
nc = await nats.connect(user="myuser", password="mypass")

# NKey (Ed25519 keypair)
nc = await nats.connect(nkeys_seed="/path/to/user.nk")

# JWT credentials (recommended for production)
nc = await nats.connect(user_credentials="/path/to/user.creds")

JWT-based authentication is recommended for production. It supports multi-tenancy with accounts and fine-grained permissions.

Authorization with Accounts

Accounts isolate users and define permissions:

# nats-server.conf
accounts {
    ORDERS {
        users = [
            {user: order_service, password: xxx,
             permissions: {
                publish: ["orders.>"]
                subscribe: ["orders.>", "_INBOX.>"]
            }}
        ]
    }
    ANALYTICS {
        users = [
            {user: analytics, password: xxx,
             permissions: {
                subscribe: ["orders.>"]
            }}
        ]
    }
}

The analytics user can subscribe to order events but cannot publish. This enforces separation of concerns at the messaging layer.

Monitoring

NATS exposes HTTP endpoints for monitoring:

# Enable in server config
http_port: 8222

# Endpoints:
# /varz   - General server info
# /connz  - Connection details
# /routez - Cluster routing
# /subz   - Subscription info
# /jsz    - JetStream stats

Scrape these with Prometheus:

scrape_configs:
  - job_name: 'nats'
    static_configs:
      - targets: ['nats:8222']
    metrics_path: /metrics

Key metrics to monitor: connection count, message rates, JetStream stream sizes, consumer lag, and pending acknowledgments.

Event Sourcing

Event Sourcing stores state changes as a sequence of events. Instead of updating a record, you append an event. Current state is computed by replaying events.

JetStream is perfect for this. Streams persist events with sequence numbers. Consumers can replay from any point.

# Create event stream
await js.add_stream(
    name="ORDERS",
    subjects=["orders.events.>"],
    retention=RetentionPolicy.LIMITS,
    max_age=365 * 24 * 60 * 60,
)

# Publish events
await js.publish("orders.events.created", 
    json.dumps({"orderId": 123, "items": [...]}),
    headers={"Nats-Msg-Id": f"order-123-created"})

await js.publish("orders.events.paid",
    json.dumps({"orderId": 123, "amount": 99.99}))

# Rebuild state by replaying
sub = await js.subscribe(
    "orders.events.>",
    deliver_policy=DeliverPolicy.ALL,
)

order_state = {}
async for msg in sub.messages:
    event = json.loads(msg.data)
    apply_event(order_state, event)
    await msg.ack()

Benefits: complete audit trail, ability to replay and rebuild state, temporal queries (what was the state at time T), and debugging by replaying events.

CQRS

Command Query Responsibility Segregation separates read and write operations. Commands modify state. Queries read from optimized views.

With NATS, commands go through request/reply. Events are published to streams. Projections consume events and update read models.

# Write side: handle commands
@service.endpoint("orders.commands.create")
async def create_order(req):
    order = validate(req.data)
    await js.publish("orders.events.created", event_data)
    return {"id": order.id}

# Read side: project events to database
async for msg in subscription.messages:
    event = json.loads(msg.data)
    if event["type"] == "OrderCreated":
        await db.insert("orders_view", event["data"])
    await msg.ack()

The read model can be denormalized, cached, or stored in a different database optimized for queries. The write model focuses on validation and event generation.

Saga Pattern

Distributed transactions across services are hard. The Saga pattern breaks a transaction into steps, each with a compensating action if something fails.

async def process_order(order):
    try:
        # Step 1: Reserve inventory
        await nc.request("inventory.reserve", order.items)
        
        # Step 2: Process payment
        await nc.request("payments.charge", order.payment)
        
        # Step 3: Create shipment
        await nc.request("shipping.create", order.address)
        
    except Exception as e:
        # Compensate in reverse order
        await nc.publish("shipping.cancel", order.id)
        await nc.publish("payments.refund", order.payment)
        await nc.publish("inventory.release", order.items)
        raise

This is the orchestrator pattern—a central coordinator manages the saga. Alternatively, use choreography where each service listens for events and decides what to do next.

Outbox Pattern

Publishing events after a database write is risky. The database might commit but the publish might fail, leaving inconsistent state.

The Outbox pattern solves this. Write the event to an outbox table in the same transaction as the business data. A background worker publishes from the outbox.

# In transaction: save order and outbox entry
async with db.transaction():
    await db.insert("orders", order)
    await db.insert("outbox", {
        "subject": "orders.created",
        "payload": order_json,
        "created_at": datetime.now()
    })

# Background worker: publish and delete
async def outbox_worker():
    while True:
        entries = await db.query("SELECT * FROM outbox ORDER BY created_at")
        for entry in entries:
            await js.publish(entry.subject, entry.payload,
                headers={"Nats-Msg-Id": str(entry.id)})
            await db.delete("outbox", entry.id)
        await asyncio.sleep(1)

The Nats-Msg-Id header ensures idempotency. If the worker crashes after publishing but before deleting, the retry will be deduplicated.

Complete Architecture

A production NATS deployment typically includes:

NATS Cluster (3 nodes minimum)
├── TLS enabled
├── JWT authentication
├── Account-based permissions
│
├── Services
│   ├── orders-service (3 replicas, queue group)
│   ├── payments-service (2 replicas)
│   └── shipping-service (2 replicas)
│
├── JetStream
│   ├── ORDERS stream (events, 7-day retention)
│   ├── TASKS stream (work queue)
│   └── Mirrors in DR region
│
├── KV Stores
│   └── config bucket (service configuration)
│
└── Monitoring
    ├── Prometheus scraping /metrics
    └── Grafana dashboards

Deployment Checklist

Before going to production:

  1. Enable TLS for all connections
  2. Configure authentication (JWT recommended)
  3. Set up account permissions
  4. Deploy 3+ node cluster for high availability
  5. Configure JetStream with replication factor 3
  6. Enable monitoring endpoints
  7. Set up Prometheus and Grafana
  8. Implement graceful shutdown in all services
  9. Add health checks for load balancers
  10. Document subject naming conventions

Subject Naming Conventions

Consistent naming makes systems easier to understand and secure:

# Pattern: {domain}.{entity}.{action}
orders.create
orders.get
orders.cancel

# With ID: {domain}.{entity}.{id}.{action}
orders.123.update

# Events: {domain}.events.{event-type}
orders.events.created
orders.events.shipped

# Commands: {domain}.commands.{command}
orders.commands.create

# Internal subjects
_INBOX.>     # Reply subjects (auto-generated)
$SRV.>       # Service discovery
$JS.API.>    # JetStream API

Try It Yourself

The interactive demo for this session visualizes services, monitoring, event sourcing, and the saga pattern: i33ym.cc/demo-nats-production

Conclusion

NATS is more than a message broker. With Core NATS, JetStream, KV Store, Object Store, and the Services Framework, it is a complete platform for distributed systems. Add production-grade security, monitoring, and proven patterns like Event Sourcing and Sagas, and you have everything needed to build reliable, scalable applications.

Start simple with pub/sub. Add persistence when needed. Layer on patterns as complexity grows. NATS scales with you—from a single service to a global distributed system.

↑ 0 ↓ 0

Comments (0)

No comments yet. Be the first to share your thoughts.

← back to essays