JetStream provides message persistence, but that is just the foundation. Built on top of streams are higher-level abstractions that solve common distributed systems problems: Key-Value Store for configuration and coordination, Object Store for large files, and stream operations like mirroring and sourcing for geo-distribution.
These are not separate systems. They are APIs that use streams under the hood. The same replication, the same persistence guarantees, the same clustering—just with purpose-built interfaces.
Key-Value Store
The KV Store is a distributed key-value database with versioning. Every key tracks its revision history. You can watch for changes in real-time. It supports TTL for automatic expiration.
```python
import nats

nc = await nats.connect()
js = nc.jetstream()

# Create a KV bucket
kv = await js.create_key_value(
    bucket="config",
    history=5,
    ttl=3600,
)
```
The history parameter controls how many revisions to keep per key. The ttl sets automatic expiration in seconds.
Basic Operations
KV operations are straightforward:
```python
# Put a value, get back the revision number
rev = await kv.put("db.host", b"postgres.local")

# Get current value
entry = await kv.get("db.host")
print(f"Value: {entry.value}, Revision: {entry.revision}")

# Update with optimistic locking:
# fails if someone else modified the key
await kv.update("db.host", b"new-host", last=entry.revision)

# Delete (marks as deleted, keeps in history)
await kv.delete("db.host")

# Purge (removes all history)
await kv.purge("db.host")
```
The revision number enables optimistic concurrency control. If you read a value at revision 5 and try to update with last=5, the update succeeds only if no one else modified it. This is how you implement safe concurrent updates without locks.
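The read-modify-write cycle this enables can be sketched as a small retry loop. `safe_update` targets the `kv.get` / `kv.update` interface shown above; the `merge` callback and retry count are illustrative, not part of the NATS API.

```python
import asyncio

async def safe_update(kv, key, merge, retries=5):
    """Read-modify-write with compare-and-swap semantics."""
    for _ in range(retries):
        entry = await kv.get(key)
        new_value = merge(entry.value)
        try:
            # Succeeds only if the key is still at entry.revision
            return await kv.update(key, new_value, last=entry.revision)
        except Exception:
            await asyncio.sleep(0)  # lost the race: re-read and retry
    raise RuntimeError(f"could not update {key} after {retries} attempts")
```

If another writer gets in between the get and the update, the update raises, and the loop simply re-reads the current value and tries again.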
Watching for Changes
The watch feature is where KV becomes powerful for configuration management:
```python
# Watch all keys
watcher = await kv.watchall()
async for entry in watcher:
    if entry.operation == "PUT":
        print(f"Updated: {entry.key} = {entry.value}")
        reload_config(entry.key, entry.value)
    elif entry.operation == "DEL":
        print(f"Deleted: {entry.key}")

# Watch specific pattern
watcher = await kv.watch("db.*")
```
Services can watch for configuration changes and reload automatically. No polling, no manual restarts. Change a key and all watchers receive the update immediately.
History
Every key maintains its revision history:
```python
history = await kv.history("db.host")
for entry in history:
    print(f"Rev {entry.revision}: {entry.value} at {entry.created}")
```
This is useful for auditing, debugging, or rolling back to previous values. The history depth is limited by the history parameter set when creating the bucket.
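Combined with put, history gives you a simple rollback: re-publish an older value as a new revision. A sketch of that idea (rollback is not a built-in KV operation; the helper name is ours):

```python
async def rollback(kv, key, revision):
    """Re-put the value stored at an older revision as a new revision."""
    for entry in await kv.history(key):
        if entry.revision == revision:
            return await kv.put(key, entry.value)
    raise KeyError(f"revision {revision} is outside the retained history")
```

Note that this appends a new revision rather than rewinding the key, so the audit trail stays intact.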
Use Cases
KV Store is ideal for configuration management—store database connection strings, feature flags, or service endpoints. Services watch for changes and reload automatically. For service discovery, services register themselves and watch for peers. For leader election, use the revision number as a compare-and-swap primitive.
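A leader-election sketch along those lines uses `create`, which only succeeds if the key does not already exist; the key name and the broad exception handling are illustrative:

```python
async def try_become_leader(kv, instance_id):
    """First instance to create the key wins; everyone else gets an error."""
    try:
        await kv.create("leader", instance_id.encode())
        return True   # we created the key, so we hold leadership
    except Exception:
        return False  # the key already exists: someone else leads
```

Pair this with a bucket ttl so a crashed leader's claim expires and a fresh election can take place.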
Object Store
Object Store handles large files. Unlike regular messages, which are capped by the server's max payload (1 MB by default), objects can be any size. The client library automatically chunks large files and reassembles them on read.
```python
# Create object store
obj = await js.create_object_store("assets")

# Store a file
with open("model.pkl", "rb") as f:
    await obj.put("models/classifier.pkl", f)

# Retrieve a file
result = await obj.get("models/classifier.pkl")
model_data = result.data

# List all objects (list() returns the full listing)
for info in await obj.list():
    print(f"{info.name}: {info.size} bytes, {info.chunks} chunks")
```
Metadata
Objects can carry metadata:
```python
from nats.js.api import ObjectMeta

meta = ObjectMeta(
    name="classifier.pkl",
    description="Production classifier v2.3",
    headers={"Content-Type": "application/octet-stream", "Version": "2.3"},
)
await obj.put("models/classifier.pkl", data, meta=meta)

# Get info without downloading the file
info = await obj.get_info("models/classifier.pkl")
print(f"Size: {info.size}, Modified: {info.mtime}")
```
Use Cases
Object Store works well for distributing ML models to inference nodes, sharing configuration files or certificates across services, storing backups with built-in replication, and managing any large files that need to be shared across a cluster.
Work Queues
JetStream streams have a retention policy called WorkQueue. With this policy, messages are deleted immediately after the first acknowledgment—not when limits are reached, and not when all consumers acknowledge.
```python
from nats.js.api import RetentionPolicy

await js.add_stream(
    name="TASKS",
    subjects=["tasks.>"],
    retention=RetentionPolicy.WORK_QUEUE,
)

# Multiple workers with queue group
await js.subscribe(
    "tasks.>",
    queue="workers",
    durable="task-processor",
)
```
This is perfect for task queues. A task is published, one worker picks it up, processes it, acknowledges, and the task disappears. If the worker crashes before acknowledging, the message redelivers to another worker.
Combine with queue groups for load balancing. Multiple workers subscribe with the same queue name, and each task goes to exactly one worker.
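The receive side can be sketched as a batch loop, written here against a pull consumer (`js.pull_subscribe` / `fetch` in nats-py); the handler and batch size are illustrative:

```python
import asyncio

async def drain_batch(sub, handler, batch=10):
    """Fetch one batch of tasks; ack successes, nak failures for redelivery."""
    try:
        msgs = await sub.fetch(batch, timeout=5)
    except asyncio.TimeoutError:
        return 0  # no tasks available right now
    acked = 0
    for msg in msgs:
        try:
            handler(msg.data)
            await msg.ack()   # WorkQueue retention deletes the message now
            acked += 1
        except Exception:
            await msg.nak()   # negative ack: redeliver to another worker
    return acked
```

In real use, `sub` would come from `await js.pull_subscribe("tasks.>", durable="task-processor")`, and a worker would call this in a loop.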
Stream Mirroring
Mirroring replicates a stream from one cluster to another. The mirror is a read-only copy that stays synchronized with the source.
```python
from nats.js.api import StreamSource

# On the remote cluster, create a mirror of the source
await js.add_stream(
    name="ORDERS",
    mirror=StreamSource(
        name="ORDERS",
        domain="us-east",
    ),
)
```
Messages published to the source stream automatically replicate to all mirrors. Consumers in each region read from their local mirror with low latency.
Use cases include disaster recovery—if the primary region fails, you have a full copy in the mirror. Geo-distribution puts data close to users for fast reads. And read scaling offloads read traffic to mirrors without hitting the source.
Stream Sourcing
Sourcing is the opposite of mirroring. Instead of replicating one stream to many, it aggregates many streams into one.
```python
await js.add_stream(
    name="ORDERS-GLOBAL",
    sources=[
        StreamSource(name="ORDERS-US"),
        StreamSource(name="ORDERS-EU"),
        StreamSource(name="ORDERS-ASIA"),
    ],
)
```
Messages from all source streams flow into the aggregate stream. Unlike mirrors, sourced streams can also accept direct publishes—they are not read-only.
Subject Transforms
When sourcing, you can transform subjects to avoid collisions or add context:
```python
from nats.js.api import StreamSource, SubjectTransform

await js.add_stream(
    name="ORDERS-GLOBAL",
    sources=[
        StreamSource(
            name="ORDERS-US",
            subject_transforms=[
                SubjectTransform(src="orders.>", dest="us.orders.>")
            ],
        ),
        StreamSource(
            name="ORDERS-EU",
            subject_transforms=[
                SubjectTransform(src="orders.>", dest="eu.orders.>")
            ],
        ),
    ],
)
```
Now messages from ORDERS-US appear in ORDERS-GLOBAL with subjects like us.orders.new, and messages from ORDERS-EU appear as eu.orders.new. Consumers can filter by region using subject wildcards.
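The wildcard semantics behind that filtering (`*` matches exactly one token, `>` matches one or more trailing tokens) can be sketched in a few lines; this mirrors the server-side matching rules for illustration only:

```python
def subject_matches(pattern, subject):
    """NATS-style matching: '*' is one token, '>' is the rest (at least one)."""
    p, s = pattern.split("."), subject.split(".")
    for i, token in enumerate(p):
        if token == ">":
            return len(s) > i          # '>' must cover at least one token
        if i >= len(s):
            return False               # subject ran out of tokens
        if token not in ("*", s[i]):
            return False
    return len(p) == len(s)            # no trailing subject tokens left
```

So a consumer filtering on `us.orders.>` sees `us.orders.new` but not `eu.orders.new`.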
Filtering
You can also filter which messages to source:
```python
await js.add_stream(
    name="HIGH-VALUE-ORDERS",
    sources=[
        StreamSource(
            name="ORDERS",
            filter_subject="orders.premium.>",
        ),
    ],
)
```
This creates a derived stream containing only premium orders. Useful for building specialized views without duplicating data.
Putting It All Together
A realistic architecture might look like this:
Regional Clusters (US, EU, Asia):
- ORDERS stream: local order events
- ORDERS mirror from other regions (read-only replicas)
- TASKS stream with WorkQueue retention
- config KV bucket for service configuration
- assets Object Store for shared files
Central Analytics:
- ORDERS-GLOBAL sources from all regional ORDERS streams
- Analytics consumers process the global view
Each Region:
- Services watch config KV for live updates
- Workers process tasks from local TASKS stream
- Applications read from local ORDERS mirror
This gives you local writes with global reads, automatic failover through mirrors, distributed task processing, and centralized configuration.
Implementation Notes
KV Store and Object Store are built on regular JetStream streams with special naming conventions. KV buckets create streams named KV_{bucket}. Object stores create streams named OBJ_{bucket}. You can actually see these streams in the stream list.
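You can confirm the convention by listing streams, for example with `streams_info` in nats-py; a small sketch:

```python
async def backing_streams(js):
    """Return all stream names. KV buckets and Object Stores show up
    under the KV_ and OBJ_ prefixes."""
    return [info.config.name for info in await js.streams_info()]
```

With the bucket and store created earlier, the result would include `KV_config` and `OBJ_assets`.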
Mirroring works by the mirror cluster connecting to the source cluster and consuming messages. The connection uses NATS itself, so it works across any network that allows NATS connections. Latency between source and mirror depends on network conditions.
Sourcing is similar but in reverse—the aggregating stream connects to multiple sources and pulls messages from each. Subject transforms happen at the aggregator, not the source.
Try It Yourself
The interactive demo for this session lets you experiment with KV operations, Object Store, work queues, mirroring, and sourcing: i33ym.cc/demo-nats-advanced
What's Next
In the final session, we cover production patterns: the Services Framework for building discoverable microservices, security with TLS and authentication, monitoring and observability, and architectural patterns like Event Sourcing and CQRS built on NATS.