JetStream Advanced

Key-Value, Object Store, and Patterns

Session 3 - NATS Workshop

Topics: KV Store · Object Store · Mirroring · Sourcing

Recap: JetStream

Concept  | Purpose
Stream   | Persist messages to subjects
Consumer | Track reading position
Ack/Nak  | Confirm processing
Replay   | Start from any point

JetStream enables more than just message persistence...

Built on JetStream

Streams (messages)
  --> KV Store (key-value)
  --> Object Store (large files)

Key Insight
KV and Object Store are abstractions built on top of streams. Same persistence, same replication, higher-level API.
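Because a bucket is just a stream, you can inspect it with the ordinary stream APIs. A minimal sketch with the nats-py client (bucket name illustrative):

# The KV bucket "config" is backed by an ordinary stream.
kv = await js.create_key_value(bucket="config")

# By convention the backing stream is named KV_<bucket>
# and stores keys under the $KV.<bucket>.> subjects.
info = await js.stream_info("KV_config")
print(info.config.name)      # KV_config
print(info.config.subjects)  # ['$KV.config.>']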

Key-Value Store

Distributed key-value storage with history

config bucket:
  db.host   = postgres.local  (rev 3)
  db.port   = 5432            (rev 1)
  cache.ttl = 3600            (rev 2)

Each key tracks revision history

Creating a KV Bucket

import nats

nc = await nats.connect()
js = nc.jetstream()

# Create KV bucket
kv = await js.create_key_value(
    bucket="config",
    history=5,        # Keep last 5 revisions
    ttl=3600,         # Optional: expire after 1 hour
    replicas=3,       # Replication for HA
)

KV Operations

# Put a value
rev = await kv.put("db.host", b"postgres.local")
print(f"Stored at revision {rev}")

# Get current value
entry = await kv.get("db.host")
print(f"Value: {entry.value}, Rev: {entry.revision}")

# Update (with optimistic locking)
await kv.update("db.host", b"new-host", last=entry.revision)

# Delete
await kv.delete("db.host")

# Purge (remove all history)
await kv.purge("db.host")

Watch for Changes

# Watch all keys
watcher = await kv.watchall()

async for entry in watcher:
    if entry.operation == "DEL":
        print(f"Deleted: {entry.key}")
    else:  # a regular put; nats-py leaves operation unset for puts
        print(f"Updated: {entry.key} = {entry.value}")

# Watch specific key pattern
watcher = await kv.watch("db.*")

Use Case
Configuration management: services watch for config changes and reload automatically.
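A sketch of that reload loop, assuming the nats-py client; apply_setting is a hypothetical application hook, and the key pattern is illustrative:

import nats

# React to config changes as they happen.
watcher = await kv.watch("myapp.*")

while True:
    try:
        entry = await watcher.updates(timeout=60)
    except nats.errors.TimeoutError:
        continue  # no changes lately; keep waiting
    if entry is None:
        continue  # marker: initial values have all been delivered
    if entry.operation in ("DEL", "PURGE"):
        apply_setting(entry.key, None)         # setting removed
    else:
        apply_setting(entry.key, entry.value)  # new or updated value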

KV History

# Get revision history
history = await kv.history("db.host")

for entry in history:
    print(f"Rev {entry.revision}: {entry.value}")
    print(f"  at {entry.created}")

# Output:
# Rev 1: b'localhost'
# Rev 2: b'postgres.local'
# Rev 3: b'new-postgres.local'

History depth is set by the history parameter at bucket creation

KV Use Cases

• Configuration: distributed config with live reload via watch
• Feature Flags: toggle features across services instantly
• Service Discovery: register services, watch for changes
• Leader Election: use revisions for compare-and-swap (sketch below)
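One hedged sketch of the leader-election idea: the first instance to create the key wins, and compare-and-swap refreshes keep a stale leader from overwriting a newer one. Bucket, key, and instance names are illustrative, and the exact error type can vary by client version:

from nats.js.errors import APIError

# A short TTL makes the key expire if the leader dies silently.
kv = await js.create_key_value(bucket="election", ttl=30)

try:
    # create() fails if the key already exists: first one wins.
    rev = await kv.create("leader", b"instance-42")
    print("I am the leader")
    # Refresh with compare-and-swap so only the holder of the
    # latest revision can renew the claim.
    rev = await kv.update("leader", b"instance-42", last=rev)
except APIError:
    print("Following the current leader")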

Object Store

Store large files, chunked automatically

assets bucket:
  report-2024.pdf  (PDF)  2.4 MB, 3 chunks
  logo.png         (IMG)  156 KB, 1 chunk
  backup.zip       (ZIP)  50 MB, 50 chunks

Files split into chunks, reassembled on read

Object Store Operations

# Create object store
obj = await js.create_object_store("assets")

# Put file
await obj.put("report.pdf", file_bytes)

# Put from file path
with open("report.pdf", "rb") as f:
    await obj.put("report.pdf", f)

# Get file
result = await obj.get("report.pdf")
data = result.data

# List objects
for info in await obj.list():
    print(f"{info.name}: {info.size} bytes")

Object Metadata

from nats.js.api import ObjectMeta

# Store with metadata
meta = ObjectMeta(
    name="report.pdf",
    description="Q4 Financial Report",
    headers={"Content-Type": "application/pdf"}
)
await obj.put("report.pdf", data, meta=meta)

# Get info without downloading
info = await obj.info("report.pdf")
print(f"Size: {info.size}")
print(f"Chunks: {info.chunks}")
print(f"Modified: {info.mtime}")

Object Store Use Cases

• File Distribution: distribute configs, certs, binaries to services
• Model Storage: store ML models, update across inference nodes (sketch below)
• Asset Management: images, documents, media files
• Backup/Archive: replicated storage with clustering
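A sketch of the model-distribution case with the nats-py client; bucket, object, and path names are illustrative:

# Look up the (already created) bucket and pull the latest model.
obj = await js.object_store("models")

result = await obj.get("sentiment-v3.onnx")
with open("/tmp/sentiment-v3.onnx", "wb") as f:
    f.write(result.data)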

Work Queues

Task distribution: each task goes to exactly one worker (delivery is still at-least-once until acked)

from nats.js.api import RetentionPolicy

# WorkQueue retention: delete after first ack
await js.add_stream(
    name="TASKS",
    subjects=["tasks.>"],
    retention=RetentionPolicy.WORK_QUEUE,
)

# Multiple workers share the load
await js.subscribe(
    "tasks.>",
    queue="workers",
    durable="task-processor",
)
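A worker sketch with explicit acking; process_task is a hypothetical handler:

async def handle(msg):
    try:
        process_task(msg.data)  # hypothetical business logic
        await msg.ack()         # ack deletes the task from the stream
    except Exception:
        await msg.nak(delay=5)  # hand it to another worker in 5s

await js.subscribe(
    "tasks.>",
    queue="workers",
    durable="task-processor",
    cb=handle,
    manual_ack=True,
)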

Work Queue Flow

Producer (tasks.process)
  --> TASKS Stream (WorkQueue retention)
        --> Worker 1
        --> Worker 2
        --> Worker 3

Message deleted immediately after ack (not waiting for all consumers)

Stream Mirroring

Replicate streams across clusters for geo-distribution

US-East: ORDERS (source)
  --> EU-West: ORDERS (mirror)
  --> Asia-Pacific: ORDERS (mirror)

Creating a Mirror

from nats.js.api import ExternalStream, StreamSource

# On EU cluster: mirror the US stream
await js.add_stream(
    name="ORDERS",
    mirror=StreamSource(
        name="ORDERS",
        # Cross-domain mirror: point at the source domain's JS API
        external=ExternalStream(api="$JS.us-east.API"),
    ),
)

# Mirror is read-only
# Consumers read locally with low latency
# Writes go to source stream

Use Case
Disaster recovery: if US-East fails, EU-West has full copy.
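Reading from the local mirror looks like reading from any other stream; only writes must reach the source. A sketch with nats-py, binding to the mirror by stream name:

# On the EU cluster: consume from the local ORDERS mirror.
sub = await js.subscribe("orders.>", stream="ORDERS")

msg = await sub.next_msg(timeout=5)
await msg.ack()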

Stream Sourcing

Aggregate multiple streams into one

ORDERS-US + ORDERS-EU + ORDERS-ASIA
  --> ORDERS-GLOBAL (aggregated)

from nats.js.api import StreamSource

await js.add_stream(
    name="ORDERS-GLOBAL",
    sources=[
        StreamSource(name="ORDERS-US"),
        StreamSource(name="ORDERS-EU"),
        StreamSource(name="ORDERS-ASIA"),
    ],
)

Mirror vs Source

Feature  | Mirror               | Source
Purpose  | Exact copy           | Aggregate multiple
Sources  | Exactly one          | One or more
Writable | No (read-only)       | Yes (can add subjects)
Use Case | DR, geo-distribution | Aggregation, fan-in
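The "writable" row is worth a sketch: unlike a mirror, a sourcing stream can also list its own subjects and accept direct publishes. Subject names here are illustrative:

from nats.js.api import StreamSource

await js.add_stream(
    name="ORDERS-GLOBAL",
    subjects=["global.orders.>"],  # direct writes allowed
    sources=[StreamSource(name="ORDERS-US")],
)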

Subject Transforms

from nats.js.api import StreamSource, SubjectTransform

# Transform subjects when sourcing (requires NATS server 2.10+)
await js.add_stream(
    name="ORDERS-GLOBAL",
    sources=[
        StreamSource(
            name="ORDERS-US",
            subject_transforms=[
                SubjectTransform(
                    src="orders.>",
                    dest="us.orders.>"
                )
            ]
        ),
        StreamSource(
            name="ORDERS-EU",
            subject_transforms=[
                SubjectTransform(
                    src="orders.>",
                    dest="eu.orders.>"
                )
            ]
        ),
    ],
)

Filtering Sources

# Only source specific subjects
await js.add_stream(
    name="HIGH-VALUE-ORDERS",
    sources=[
        StreamSource(
            name="ORDERS",
            filter_subject="orders.premium.>",
        ),
    ],
)

# Combines with subject transforms
# Great for building derived streams

Putting It Together

Architecture:

Regional Clusters:
  US:   ORDERS stream (primary)
  EU:   ORDERS mirror (read-only copy)
  ASIA: ORDERS mirror (read-only copy)

Central Analytics:
  ORDERS-GLOBAL sources from all regions
  Analytics consumers read globally

Task Processing:
  TASKS stream with WorkQueue retention
  Workers in each region process locally

Interactive Demo

Explore KV Store, Object Store, and stream patterns:

Launch Demo

Interactive visualization of advanced JetStream concepts

Summary

Feature      | What It Does
KV Store     | Key-value with history, watch, TTL
Object Store | Large files, chunked, metadata
WorkQueue    | Task processing, delete on ack
Mirror       | Exact copy for DR/geo
Source       | Aggregate multiple streams

Key Takeaway

Streams + Abstraction = Data Platform
1. KV for configuration and coordination
2. Object Store for files and artifacts
3. Mirrors and sources for distribution

Next Session

Production Patterns

  • Services Framework for microservices
  • Error handling and reconnection
  • Security: TLS, auth, accounts
  • Monitoring and observability
  • Event sourcing, CQRS, Saga patterns

Questions?

JetStream Advanced - Session 3