Contract-Checked Event Buses: Making Pub/Sub Safe Between Micro-Features
At small scale, an internal event bus feels magical. One team adds “user.session.started”, another listens and fires analytics, another triggers notifications. No coordination meeting, no new REST endpoints, just vibes.
Fast-forward a year:
You’ve got dozens of topics, half a dozen “user.*” events, multiple schemas for “pet.updated”, a consumer that silently drops messages because a field changed from int to str, and no one is sure which features will break if you touch that one producer.
This post is about turning that chaos into a contract-first event bus. We’ll walk through how to:
- Model events as typed, versioned schemas (Pydantic/dataclasses).
- Share explicit contracts between producers and consumers.
- Use golden tests in CI to catch breaking changes before they ship.
- Make event evolution boring and predictable, instead of spooky.
By the end, you should have a mental blueprint for making your pub/sub layer evolve safely—even as micro-features multiply.
The Quiet Failure Modes of Pub/Sub Between Micro-Features
Most teams don’t decide to build a fragile event bus. It emerges slowly from “just one more topic” decisions.
Typical failure modes:
- Shape drift
A producer adds a field, changes a type, or stops sending something “optional”. A consumer written six months ago assumes the old shape and quietly fails or miscomputes. - Ghost consumers
Nobody remembers that an old job still listens topet.weight.updated. You deprecate the topic and only notice when a downstream ML pipeline stops updating. - Implicit contracts
The real definition ofPetWeightUpdatedlives across three places: the producer code, a wiki page from last year, and a consumer-specific mapping function. - Production-only surprises
Locally, your tests still pass. In staging, nobody emits the weird edge-case event. In production, a slightly malformed payload from one path crashes a consumer you forgot to harden.
All of this is especially painful in micro-feature architectures: a single Python backend hosting many small, event-driven modules (alerts, AI insights, device analytics, billing nudges) that communicate via an internal bus (Redis, SNS/SQS, Kafka, or even in-process queues).
The root problem isn’t the transport. It’s that events are treated as strings and JSON blobs, not as contracts.

What We Mean by a Contract-Checked Event Bus
A contract-checked event bus doesn’t require exotic infrastructure. It’s a discipline and a set of tools layered on top of whatever you already use (SQS, Kafka, Redis, SNS, NATS, or an internal in-memory bus).
- Each event has a type and version
You don’t emit “user.updated”. You emit something likeUserProfileUpdated.v1, with a known schema. - Event schemas are code, not tribal knowledge
Defined as Pydantic models or dataclasses in a central package (events.contracts), imported by both producers and consumers. - Golden tests validate the contract
For each event type, you maintain sample payloads (goldens) that represent valid shapes and key edge cases. Producers must still emit them; consumers must still accept them. - CI enforces contracts, not just style
If you modify a schema in a breaking way, the golden tests fail, and your PR doesn’t merge. - Observability tells you who’s using what
Emissions and consumptions are logged with type, version, and feature tag, so you can map dependencies and deprecate safely.
This approach keeps your pub/sub from becoming a side-channel for surprises. It turns events into first-class APIs—with the same level of rigor you’d apply to REST or gRPC.
Modeling Events as Typed, Versioned Schemas
Assume you have a simple event bus wrapper around SQS, Kafka, or Redis. Instead of sending raw JSON dicts, you define event classes.
from enum import Enum
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Literal
class EventType(str, Enum):
PET_WEIGHT_UPDATED_V1 = "pet.weight.updated.v1"
class PetWeightUpdatedV1(BaseModel):
event_type: Literal[EventType.PET_WEIGHT_UPDATED_V1] = EventType.PET_WEIGHT_UPDATED_V1
pet_id: str = Field(..., description="Internal pet identifier")
device_id: str = Field(..., description="Source device or bowl")
weight_grams: int = Field(..., ge=0)
measured_at: datetime
source: str = Field(..., description="sensor|app|backfill")
event_typeis part of the payload
This avoids mixed-topic confusion and makes debugging easier in logs and traces.- Fields are typed and documented
Pydantic gives validation for free; your producer can’t emit malformed events without failing fast. - Version is baked into the type name
PetWeightUpdatedV1andEventType.PET_WEIGHT_UPDATED_V1make it obvious which schema you’re using.
Your event bus wrapper can then enforce these contracts:
import json
from typing import Type, TypeVar
T = TypeVar("T", bound=BaseModel)
def publish(event: BaseModel, topic: str) -> None:
payload = event.model_dump(mode="json")
# send to Kafka/SQS/Redis, etc.
broker_client.publish(topic, json.dumps(payload).encode("utf-8"))
def parse_event(payload: bytes, model: Type[T]) -> T:
data = json.loads(payload.decode("utf-8"))
return model.model_validate(data)
Now both producers and consumers live in the same type universe, instead of each doing ad-hoc JSON parsing.
Evolving Events Without Breaking Everyone
Once events are typed, the next challenge is evolution. Requirements change; schemas must too. The trick is to standardize how you change them.
Safe changes within the same version
Stay on the same version (e.g., *.v1) when:
- You add a nullable/optional field with a reasonable default.
- You widen a type in a backwards-compatible way (e.g., enum adds new values; string length increases).
class PetWeightUpdatedV1(BaseModel):
# ... existing fields ...
location_hint: str | None = Field(
default=None,
description="Optional text like 'kitchen bowl' or 'travel bowl'"
)
Golden tests (we’ll get to them) ensure existing goldens still validate. New goldens can include the optional field.
Changes that require a new version
Create a new version (e.g., PetWeightUpdatedV2) when you:
- Change field meaning (e.g.,
weight_gramsstarts including the bowl weight). - Change types in incompatible ways (
int→str, nested structure rewritten). - Drop fields that consumers may still depend on.
class PetWeightUpdatedV2(BaseModel):
event_type: Literal[EventType.PET_WEIGHT_UPDATED_V2] = EventType.PET_WEIGHT_UPDATED_V2
pet_id: str
device_id: str
net_weight_grams: int
tare_weight_grams: int
measured_at: datetime
source: str
- Start emitting both v1 and v2 for a while (dual-write).
- Update consumers to support v2.
- Deprecate v1 once usage is low enough (observability helps here).
Deprecation and sunsetting
Deprecation shouldn’t be a Slack message. Treat it as a mini API lifecycle:
- Mark the version as
@deprecatedin code and docs. - Add a planned removal date and owner.
- Alert on remaining v1 traffic after a certain time window.
- Remove producer support only when no consumers rely on it.
If this feels like a lot for “internal events”, that’s the point: they’re just as critical as external APIs when micro-features depend on them.
Golden Tests: Turning Contracts into CI Enforcers
Typed schemas are great, but they don’t stop someone from “harmlessly” rewriting an event in a way that passes type checks but breaks semantics.
What is a golden test for events?
For each event type, you maintain:
- A set of canonical example payloads (JSON files) that represent:
- Typical cases
- Edge cases (missing optional fields, enum edge values)
- Old variants you still support
- A test that ensures:
- Producers can still generate payloads that match the contracts.
- Consumers can still parse and handle them without raising or misbehaving.
Example layout
events/
contracts/
pet_weight_updated_v1.py
pet_weight_updated_v2.py
goldens/
pet_weight_updated_v1/
typical.json
missing_location_hint.json
pet_weight_updated_v2/
dual_bowl_setup.json
tests/
test_golden_events.py
And then a test like:
import json
import pytest
from events.contracts import PetWeightUpdatedV1, PetWeightUpdatedV2
from pathlib import Path
GOLDENS_DIR = Path(__file__).parent.parent / "events" / "goldens"
@pytest.mark.parametrize("event_cls, golden_dir", [
(PetWeightUpdatedV1, "pet_weight_updated_v1"),
(PetWeightUpdatedV2, "pet_weight_updated_v2"),
])
def test_goldens_still_parse(event_cls, golden_dir):
for golden_path in (GOLDENS_DIR / golden_dir).glob("*.json"):
raw = json.loads(golden_path.read_text())
event = event_cls.model_validate(raw)
# optionally: assert derived invariants
assert event.pet_id
assert event.measured_at
Any incompatible change to the schema or consumer logic will cause this test to fail before you merge.
Enforcing consumer behavior
You can go further and exercise actual consumer handlers:
def test_golden_events_for_weight_analytics():
for golden_path in (GOLDENS_DIR / "pet_weight_updated_v2").glob("*.json"):
event = PetWeightUpdatedV2.model_validate(
json.loads(golden_path.read_text())
)
result = weight_analytics_consumer.handle(event)
# assert that no exceptions are raised and invariants hold
assert result.success
Now you’re not just validating shape—you’re validating behavior against known, curated examples.

Observability: Who Emits What, Who Consumes It, and When It Breaks
Contracts and tests protect you at deploy time. Observability protects you at runtime. For a contract-checked event bus, you want to log and trace:
- Event type and version (
PetWeightUpdated.v2) - Producing feature (
feature=smart_bowl,feature=ai_insights) - Consuming feature (
consumer=weight_analytics_service) - Outcome (processed, retried, discarded, dead-lettered)
- Latency (publish → consume)
Even if your transport is simple (Redis or SNS/SQS), adding a thin shared wrapper around publish/consume lets you emit structured logs:
def publish(event: BaseModel, topic: str, feature: str) -> None:
payload = event.model_dump(mode="json")
logger.info(
"event_published",
extra={
"event_type": payload["event_type"],
"version": payload["event_type"].rsplit(".v", 1)[-1],
"topic": topic,
"feature": feature,
},
)
broker_client.publish(topic, json.dumps(payload).encode("utf-8"))
Consumers can log similarly, including outcome and duration.
Once you have this data, you can:
- Build a dependency map: “Which features consume
PetWeightUpdated.v1vsv2?” - Alert when a deprecated version is still active after a target date.
- Investigate incidents quickly: “Which consumer is dropping events on this topic?”
You don’t need a huge observability stack to start—structured logs that can be filtered by event_type and consumer already take you far.
At Hoomanely, our mission is to keep pets healthier for longer by connecting smart devices, real-world behavior, and AI insights into one coherent experience for pet parents.
That inevitably means a lot of micro-features talking to each other:
- A device session from a smart tracker or bowl emits weight samples, motion summaries, or thermal flags.
- AI modules produce nutrition nudges or anomaly alerts.
- Analytics features aggregate and score behavior over days and weeks.
- Notification and coaching modules decide when to nudge the pet parent.
Instead of wiring these with tightly coupled APIs, we lean heavily on internal events—but that only works if those events are safe to evolve.
A simplified pattern we use:
- Device and inference services publish contract-checked events like
PetSessionSummary.v1orPetWeightTrend.v2. - Analytics, insights, and notification features consume them via a shared event contracts package.
- Golden tests in CI ensure that when we adjust a schema (e.g., adding a “possible dehydration” flag), we don’t accidentally break an older consumer.
- Observability ties everything together so we can see, for a given event type, which internal features are relying on it.
The result is that we can keep adding new wellness features and AI insights without turning the system into a fragile web of implicit dependencies—crucial when you’re shipping firmware + apps + AI together.

Rolling This Out Without Stopping the World
If you already have a messy event bus, you don’t need a big-bang rewrite. You can phase in contract-checking.
- Pick one event family to harden
Choose something central but manageable (e.g., “pet weight updates” or “user session events”). - Define canonical schemas
Create Pydantic models for the current real-world shapes you see in the wild. Don’t idealize yet—reflect reality. - Backfill goldens from production samples
Capture actual payloads (sanitized), save them as golden JSON files, and write parsing tests. - Add a thin wrapper around publish/consume
Even if some code still uses raw transports, start routing new and refactored paths via a small library that:- Validates event models
- Attaches event type/version
- Logs structured metadata
- Migrate producers gradually
Update one producer at a time to emit typed events. Keep consumers tolerant where necessary. - Introduce versioning rules
Document your evolution policy (additive vs breaking, when to bump version, how to dual-write). - Turn CI up slowly
Start with non-blocking tests (warnings), then make them blocking once you’re confident. The goal is to make contract violations boring to detect. - Socialize the model
Treat the event contracts package as a first-class API surface. Review changes. Give them owners. Add them to onboarding docs.
After a few cycles, engineers stop thinking “just emit JSON” and start thinking “which event contract and version should I use?” That mindset shift is the real win.
Key Takeaways
- Pub/sub is an API, not a side-channel.
If micro-features depend on events, treat those events with the same rigor as any public API. - Typed, versioned schemas are the foundation.
Pydantic or dataclasses give you validation, documentation, and a place to reason about changes. - Golden tests turn contracts into enforcement.
Sample payloads + CI ensure producers and consumers stay compatible as code evolves. - Observability gives you a map, not a guess.
With structured logs and simple maps, you can see who emits and consumes what, and deprecate safely. - Rollout can be incremental.
Start with a single event family, wrap publishes/consumes, and expand from there.
For teams growing a constellation of micro-features—whether in pet-tech like Hoomanely or any other domain—contract-checked event buses are one of the cleanest ways to keep flexibility and safety as you scale.