Key-Value, Object Store, and Patterns
Session 3 - NATS Workshop
KV Store · Object Store · Mirroring · Sourcing
| Concept | Purpose |
|---|---|
| Stream | Persist messages to subjects |
| Consumer | Track reading position |
| Ack/Nak | Confirm processing |
| Replay | Start from any point |
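Those building blocks compose in a few lines. A minimal recap sketch, assuming a local server and illustrative stream/consumer names (EVENTS, recap):

```python
import asyncio
import nats
from nats.js.api import ConsumerConfig, DeliverPolicy

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    # Stream: persist messages published to events.>
    await js.add_stream(name="EVENTS", subjects=["events.>"])
    await js.publish("events.created", b"hello")

    # Consumer: the durable name tracks reading position;
    # DeliverPolicy.ALL replays from the first message
    sub = await js.subscribe(
        "events.>",
        durable="recap",
        config=ConsumerConfig(deliver_policy=DeliverPolicy.ALL),
    )
    msg = await sub.next_msg(timeout=2)
    await msg.ack()  # Ack: confirm processing

    await nc.close()

asyncio.run(main())
```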
JetStream enables more than just message persistence...
Distributed key-value storage with history
Each key tracks revision history
import nats

nc = await nats.connect()
js = nc.jetstream()

# Create a KV bucket
kv = await js.create_key_value(
    bucket="config",
    history=5,    # Keep the last 5 revisions per key
    ttl=3600,     # Optional: entries expire after 1 hour (seconds)
    replicas=3,   # Replication for HA
)
# Put a value
rev = await kv.put("db.host", b"postgres.local")
print(f"Stored at revision {rev}")
# Get current value
entry = await kv.get("db.host")
print(f"Value: {entry.value}, Rev: {entry.revision}")
# Update (with optimistic locking)
await kv.update("db.host", b"new-host", last=entry.revision)
# Delete
await kv.delete("db.host")
# Purge (remove all history)
await kv.purge("db.host")
# Watch all keys for changes
watcher = await kv.watchall()
async for entry in watcher:
    if entry is None:
        continue  # nats-py yields None once initial values are delivered
    # In nats-py a plain put has operation None; deletes arrive as "DEL"
    if entry.operation is None:
        print(f"Updated: {entry.key} = {entry.value}")
    elif entry.operation == "DEL":
        print(f"Deleted: {entry.key}")

# Watch keys matching a pattern
watcher = await kv.watch("db.*")
# Get revision history
history = await kv.history("db.host")
for entry in history:
    print(f"Rev {entry.revision}: {entry.value}")
    print(f"  at {entry.created}")
# Output:
# Rev 1: b'localhost'
# Rev 2: b'postgres.local'
# Rev 3: b'new-postgres.local'
History depth is set per bucket with the history parameter; revisions beyond it are discarded
Store large files, chunked automatically
Files split into chunks, reassembled on read
# Create an object store
obj = await js.create_object_store("assets")

# Put raw bytes (file_bytes: bytes already in memory)
await obj.put("report.pdf", file_bytes)

# Put from an open file (chunked as it is read)
with open("report.pdf", "rb") as f:
    await obj.put("report.pdf", f)

# Get file
result = await obj.get("report.pdf")
data = result.data

# List objects (nats-py returns a list, not an async iterator)
for info in await obj.list():
    print(f"{info.name}: {info.size} bytes")
from nats.js.api import ObjectMeta

# Store with metadata
meta = ObjectMeta(
    name="report.pdf",
    description="Q4 Financial Report",
    headers={"Content-Type": "application/pdf"},
)
await obj.put("report.pdf", data, meta=meta)
# Get info without downloading (nats-py names this get_info)
info = await obj.get_info("report.pdf")
print(f"Size: {info.size}")
print(f"Chunks: {info.chunks}")
print(f"Modified: {info.mtime}")
Task distribution: each task is consumed by a single worker, then deleted on ack
from nats.js.api import RetentionPolicy

# WorkQueue retention: delete after first ack
await js.add_stream(
    name="TASKS",
    subjects=["tasks.>"],
    retention=RetentionPolicy.WORK_QUEUE,
)

# Multiple workers share the load
await js.subscribe(
    "tasks.>",
    queue="workers",
    durable="task-processor",
)
Messages are deleted as soon as one worker acks them, rather than being retained for other consumers
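As an alternative to the push subscription above, worker loops are often written with a pull consumer. A sketch, where handle is a hypothetical task handler:

```python
import nats.errors

# Pull consumer bound to the TASKS stream defined above
psub = await js.pull_subscribe(
    "tasks.>", durable="task-processor", stream="TASKS"
)

while True:
    try:
        msgs = await psub.fetch(10, timeout=5)  # Up to 10 tasks per batch
    except nats.errors.TimeoutError:
        continue  # Nothing pending; poll again
    for msg in msgs:
        handle(msg.data)  # Hypothetical task handler
        await msg.ack()   # WorkQueue retention deletes the message here
```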
Replicate streams across clusters for geo-distribution
from nats.js.api import ExternalStream, StreamSource

# On the EU cluster: mirror the US stream
await js.add_stream(
    name="ORDERS",
    mirror=StreamSource(
        name="ORDERS",
        # Cross-domain mirror: point at the source domain's JetStream API
        external=ExternalStream(api="$JS.us-east.API"),
    ),
)
# Mirror is read-only
# Consumers read locally with low latency
# Writes go to source stream
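For example, a client connected to the EU cluster reads the mirror like any local stream (the durable name is illustrative):

```python
# Bind explicitly to the local mirror via stream="ORDERS"
sub = await js.subscribe("orders.>", stream="ORDERS", durable="eu-reader")
msg = await sub.next_msg(timeout=2)
await msg.ack()
# Publishes must target the source stream in us-east;
# the mirror rejects direct writes.
```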
Aggregate multiple streams into one
await js.add_stream(
    name="ORDERS-GLOBAL",
    sources=[
        StreamSource(name="ORDERS-US"),
        StreamSource(name="ORDERS-EU"),
        StreamSource(name="ORDERS-ASIA"),
    ],
)
| Feature | Mirror | Source |
|---|---|---|
| Purpose | Exact copy | Aggregate multiple |
| Sources | Exactly one | One or more |
| Writable | No (read-only) | Yes (can add subjects) |
| Use Case | DR, geo-distribution | Aggregation, fan-in |
from nats.js.api import StreamSource, SubjectTransform

# Transform subjects when sourcing (requires server 2.10+)
await js.add_stream(
    name="ORDERS-GLOBAL",
    sources=[
        StreamSource(
            name="ORDERS-US",
            subject_transforms=[
                SubjectTransform(src="orders.>", dest="us.orders.>")
            ],
        ),
        StreamSource(
            name="ORDERS-EU",
            subject_transforms=[
                SubjectTransform(src="orders.>", dest="eu.orders.>")
            ],
        ),
    ],
)
# Only source specific subjects
await js.add_stream(
    name="HIGH-VALUE-ORDERS",
    sources=[
        StreamSource(
            name="ORDERS",
            filter_subject="orders.premium.>",
        ),
    ],
)
# Pairs with subject transforms: on server 2.10+ a transform's src
# pattern itself acts as the filter
# Great for building derived streams (see the sketch below)
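A sketch of such a derived stream, assuming server 2.10+ (the stream name PREMIUM-AUDIT is illustrative). When subject_transforms are set on a source, the transform's src takes the place of filter_subject:

```python
from nats.js.api import StreamSource, SubjectTransform

# Derived stream: keep only premium orders, renamed under audit.*
await js.add_stream(
    name="PREMIUM-AUDIT",
    sources=[
        StreamSource(
            name="ORDERS",
            subject_transforms=[
                # src doubles as the filter; dest renames the subject
                SubjectTransform(src="orders.premium.>", dest="audit.premium.>")
            ],
        ),
    ],
)
```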
Architecture:
- Regional Clusters:
  - US: ORDERS stream (primary)
  - EU: ORDERS mirror (read-only copy)
  - ASIA: ORDERS mirror (read-only copy)
- Central Analytics:
  - ORDERS-GLOBAL sources from all regions
  - Analytics consumers read globally
- Task Processing:
  - TASKS stream with WorkQueue retention
  - Workers in each region process locally
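A condensed wiring sketch of this architecture. Here js_us, js_eu, and js_analytics are hypothetical JetStream contexts for connections to each cluster, and the domain name us-east is an assumption:

```python
from nats.js.api import ExternalStream, RetentionPolicy, StreamSource, SubjectTransform

# US cluster (primary): the writable ORDERS stream
await js_us.add_stream(name="ORDERS", subjects=["orders.>"])

# EU cluster (ASIA is analogous): read-only mirror of ORDERS
await js_eu.add_stream(
    name="ORDERS",
    mirror=StreamSource(
        name="ORDERS",
        external=ExternalStream(api="$JS.us-east.API"),
    ),
)

# Analytics cluster: ORDERS-GLOBAL sources each region,
# prefixing subjects per region
await js_analytics.add_stream(
    name="ORDERS-GLOBAL",
    sources=[
        StreamSource(
            name="ORDERS",
            external=ExternalStream(api="$JS.us-east.API"),
            subject_transforms=[
                SubjectTransform(src="orders.>", dest="us.orders.>")
            ],
        ),
        # ...one StreamSource per remaining region (eu, asia)
    ],
)

# Each region: a local TASKS work queue for its workers
await js_us.add_stream(
    name="TASKS", subjects=["tasks.>"], retention=RetentionPolicy.WORK_QUEUE
)
```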
Explore KV Store, Object Store, and stream patterns:
Launch Demo: an interactive visualization of advanced JetStream concepts
| Feature | What It Does |
|---|---|
| KV Store | Key-value with history, watch, TTL |
| Object Store | Large files, chunked, metadata |
| WorkQueue | Task processing, delete on ack |
| Mirror | Exact copy for DR/geo |
| Source | Aggregate multiple streams |
JetStream Advanced - Session 3