Contract-Checked Event Buses: Making Pub/Sub Safe Between Micro-Features

At small scale, an internal event bus feels magical. One team adds “user.session.started”, another listens and fires analytics, another triggers notifications. No coordination meeting, no new REST endpoints, just vibes.

Fast-forward a year:
You’ve got dozens of topics, half a dozen “user.*” events, multiple schemas for “pet.updated”, a consumer that silently drops messages because a field changed from int to str, and no one is sure which features will break if you touch that one producer.

This post is about turning that chaos into a contract-first event bus. We’ll walk through how to:

  • Model events as typed, versioned schemas (Pydantic/dataclasses).
  • Share explicit contracts between producers and consumers.
  • Use golden tests in CI to catch breaking changes before they ship.
  • Make event evolution boring and predictable, instead of spooky.

By the end, you should have a mental blueprint for making your pub/sub layer evolve safely—even as micro-features multiply.


The Quiet Failure Modes of Pub/Sub Between Micro-Features

Most teams don’t decide to build a fragile event bus. It emerges slowly from “just one more topic” decisions.

Typical failure modes:

  • Shape drift
    A producer adds a field, changes a type, or stops sending something “optional”. A consumer written six months ago assumes the old shape and quietly fails or miscomputes.
  • Ghost consumers
    Nobody remembers that an old job still listens to pet.weight.updated. You deprecate the topic and only notice when a downstream ML pipeline stops updating.
  • Implicit contracts
    The real definition of PetWeightUpdated lives across three places: the producer code, a wiki page from last year, and a consumer-specific mapping function.
  • Production-only surprises
    Locally, your tests still pass. In staging, nobody emits the weird edge-case event. In production, a slightly malformed payload from one path crashes a consumer you forgot to harden.

All of this is especially painful in micro-feature architectures: a single Python backend hosting many small, event-driven modules (alerts, AI insights, device analytics, billing nudges) that communicate via an internal bus (Redis, SNS/SQS, Kafka, or even in-process queues).

The root problem isn’t the transport. It’s that events are treated as strings and JSON blobs, not as contracts.


What We Mean by a Contract-Checked Event Bus

A contract-checked event bus doesn’t require exotic infrastructure. It’s a discipline and a set of tools layered on top of whatever you already use (SQS, Kafka, Redis, SNS, NATS, or an internal in-memory bus).

  1. Each event has a type and version
    You don’t emit “user.updated”. You emit something like UserProfileUpdated.v1, with a known schema.
  2. Event schemas are code, not tribal knowledge
    Defined as Pydantic models or dataclasses in a central package (events.contracts), imported by both producers and consumers.
  3. Golden tests validate the contract
    For each event type, you maintain sample payloads (goldens) that represent valid shapes and key edge cases. Producers must keep emitting payloads that match them; consumers must keep accepting them.
  4. CI enforces contracts, not just style
    If you modify a schema in a breaking way, the golden tests fail, and your PR doesn’t merge.
  5. Observability tells you who’s using what
    Emissions and consumptions are logged with type, version, and feature tag, so you can map dependencies and deprecate safely.

This approach keeps your pub/sub from becoming a side-channel for surprises. It turns events into first-class APIs—with the same level of rigor you’d apply to REST or gRPC.


Modeling Events as Typed, Versioned Schemas

Assume you have a simple event bus wrapper around SQS, Kafka, or Redis. Instead of sending raw JSON dicts, you define event classes.

from enum import Enum
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Literal


class EventType(str, Enum):
    PET_WEIGHT_UPDATED_V1 = "pet.weight.updated.v1"


class PetWeightUpdatedV1(BaseModel):
    event_type: Literal[EventType.PET_WEIGHT_UPDATED_V1] = EventType.PET_WEIGHT_UPDATED_V1
    pet_id: str = Field(..., description="Internal pet identifier")
    device_id: str = Field(..., description="Source device or bowl")
    weight_grams: int = Field(..., ge=0)
    measured_at: datetime
    source: str = Field(..., description="sensor|app|backfill")

A few things to note:

  • event_type is part of the payload
    This avoids mixed-topic confusion and makes debugging easier in logs and traces.
  • Fields are typed and documented
    Pydantic gives validation for free; a malformed event fails fast at the producer instead of surfacing downstream.
  • Version is baked into the type name
    PetWeightUpdatedV1 and EventType.PET_WEIGHT_UPDATED_V1 make it obvious which schema you’re using.

Your event bus wrapper can then enforce these contracts:

import json
from typing import Type, TypeVar

from pydantic import BaseModel

T = TypeVar("T", bound=BaseModel)

def publish(event: BaseModel, topic: str) -> None:
    payload = event.model_dump(mode="json")
    # send to Kafka/SQS/Redis, etc.
    broker_client.publish(topic, json.dumps(payload).encode("utf-8"))


def parse_event(payload: bytes, model: Type[T]) -> T:
    data = json.loads(payload.decode("utf-8"))
    return model.model_validate(data)

Now both producers and consumers live in the same type universe, instead of each doing ad-hoc JSON parsing.
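
To make the round trip concrete, here's a minimal usage sketch of the two helpers above (the topic name and identifiers are illustrative):

from datetime import datetime, timezone

# Producer side: constructing the model validates field types eagerly.
event = PetWeightUpdatedV1(
    pet_id="pet_123",
    device_id="bowl_42",
    weight_grams=4200,
    measured_at=datetime.now(timezone.utc),
    source="sensor",
)
publish(event, topic="pet.weight.updated")

# Consumer side: parse raw bytes back into the same typed model.
def handle_raw_message(raw: bytes) -> None:
    parsed = parse_event(raw, PetWeightUpdatedV1)  # raises ValidationError on drift
    print(parsed.pet_id, parsed.weight_grams)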


Evolving Events Without Breaking Everyone

Once events are typed, the next challenge is evolution. Requirements change; schemas must too. The trick is to standardize how you change them.

Safe changes within the same version

Stay on the same version (e.g., *.v1) when:

  • You add a nullable/optional field with a reasonable default.
  • You widen a type in a backwards-compatible way (e.g., an enum gains new values; a string length limit increases).

For example, adding an optional field keeps you on v1:

class PetWeightUpdatedV1(BaseModel):
    # ... existing fields ...
    location_hint: str | None = Field(
        default=None,
        description="Optional text like 'kitchen bowl' or 'travel bowl'"
    )

Golden tests (we’ll get to them) ensure existing goldens still validate. New goldens can include the optional field.
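
One convenient way to create or refresh goldens is to serialize known-good model instances. A minimal sketch, assuming a write_golden helper you keep next to your tests (the helper, paths, and sample values are illustrative, not part of any library):

import json
from datetime import datetime, timezone
from pathlib import Path

GOLDENS_DIR = Path("events/goldens/pet_weight_updated_v1")

def write_golden(name: str, event: PetWeightUpdatedV1) -> None:
    # exclude_none=True keeps optional fields absent, matching payloads
    # emitted by older producers that predate the new field.
    payload = event.model_dump(mode="json", exclude_none=True)
    (GOLDENS_DIR / f"{name}.json").write_text(json.dumps(payload, indent=2) + "\n")

write_golden(
    "missing_location_hint",
    PetWeightUpdatedV1(
        pet_id="pet_123",
        device_id="bowl_42",
        weight_grams=4200,
        measured_at=datetime(2024, 5, 1, 8, 30, tzinfo=timezone.utc),
        source="sensor",
    ),
)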

Changes that require a new version

Create a new version (e.g., PetWeightUpdatedV2) when you:

  • Change field meaning (e.g., weight_grams starts including the bowl weight).
  • Change types in incompatible ways (e.g., int → str, nested structure rewritten).
  • Drop fields that consumers may still depend on.

For example:

class PetWeightUpdatedV2(BaseModel):
    # Note: EventType gains a matching member,
    # PET_WEIGHT_UPDATED_V2 = "pet.weight.updated.v2"
    event_type: Literal[EventType.PET_WEIGHT_UPDATED_V2] = EventType.PET_WEIGHT_UPDATED_V2
    pet_id: str
    device_id: str
    net_weight_grams: int
    tare_weight_grams: int
    measured_at: datetime
    source: str

The migration path then looks like this:

  • Start emitting both v1 and v2 for a while (dual-write; see the sketch below).
  • Update consumers to support v2.
  • Deprecate v1 once usage is low enough (observability helps here).
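
A dual-write helper keeps that compatibility logic in one place. A minimal sketch (the topic names, and the assumption that v1's weight_grams meant net weight, are illustrative):

def publish_weight_update(v2: PetWeightUpdatedV2) -> None:
    # New consumers get the richer v2 payload.
    publish(v2, topic="pet.weight.updated.v2")

    # Older consumers keep receiving v1 until they migrate; derive the
    # legacy field from v2 rather than measuring twice.
    v1 = PetWeightUpdatedV1(
        pet_id=v2.pet_id,
        device_id=v2.device_id,
        weight_grams=v2.net_weight_grams,
        measured_at=v2.measured_at,
        source=v2.source,
    )
    publish(v1, topic="pet.weight.updated.v1")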

Deprecation and sunsetting

Deprecation shouldn’t be a Slack message. Treat it as a mini API lifecycle:

  1. Mark the version as @deprecated in code and docs.
  2. Add a planned removal date and owner.
  3. Alert on remaining v1 traffic after a certain time window.
  4. Remove producer support only when no consumers rely on it.
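
In code, that lifecycle can be as lightweight as a registry that lives next to the contracts. A sketch, with the DeprecationInfo structure, owners, and dates all illustrative:

from dataclasses import dataclass
from datetime import date

@dataclass(frozen=True)
class DeprecationInfo:
    owner: str
    deprecated_since: date
    planned_removal: date

DEPRECATED_EVENTS: dict[EventType, DeprecationInfo] = {
    EventType.PET_WEIGHT_UPDATED_V1: DeprecationInfo(
        owner="team-devices",
        deprecated_since=date(2024, 6, 1),
        planned_removal=date(2024, 9, 1),
    ),
}

def warn_if_deprecated(event_type: EventType) -> None:
    info = DEPRECATED_EVENTS.get(event_type)
    if info and date.today() >= info.planned_removal:
        # Hook this into alerting so past-due versions are impossible to miss.
        logger.warning(
            "deprecated_event_still_active",
            extra={"event_type": event_type.value, "owner": info.owner},
        )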

If this feels like a lot for “internal events”, that’s the point: they’re just as critical as external APIs when micro-features depend on them.


Golden Tests: Turning Contracts into CI Enforcers

Typed schemas are great, but they don’t stop someone from “harmlessly” rewriting an event in a way that passes type checks but breaks semantics.

What is a golden test for events?

For each event type, you maintain:

  • A set of canonical example payloads (JSON files) that represent:
    • Typical cases
    • Edge cases (missing optional fields, enum edge values)
    • Old variants you still support
  • A test that ensures:
    • Producers can still generate payloads that match the contracts.
    • Consumers can still parse and handle them without raising or misbehaving.

Example layout

events/
  contracts/
    pet_weight_updated_v1.py
    pet_weight_updated_v2.py
  goldens/
    pet_weight_updated_v1/
      typical.json
      missing_location_hint.json
    pet_weight_updated_v2/
      dual_bowl_setup.json
tests/
  test_golden_events.py

And then a test like:

import json
import pytest
from events.contracts import PetWeightUpdatedV1, PetWeightUpdatedV2
from pathlib import Path

GOLDENS_DIR = Path(__file__).parent.parent / "events" / "goldens"

@pytest.mark.parametrize("event_cls, golden_dir", [
    (PetWeightUpdatedV1, "pet_weight_updated_v1"),
    (PetWeightUpdatedV2, "pet_weight_updated_v2"),
])
def test_goldens_still_parse(event_cls, golden_dir):
    golden_paths = sorted((GOLDENS_DIR / golden_dir).glob("*.json"))
    # Guard against a vacuous pass when a goldens directory is empty or misnamed.
    assert golden_paths, f"no goldens found for {golden_dir}"
    for golden_path in golden_paths:
        raw = json.loads(golden_path.read_text())
        event = event_cls.model_validate(raw)
        # optionally: assert derived invariants
        assert event.pet_id
        assert event.measured_at

Any incompatible change to the schema or consumer logic will cause this test to fail before you merge.
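
The producer direction can be covered with a round-trip test: rebuild an event from fixed inputs and diff it against the committed golden. A sketch, assuming typical.json was generated from these same inputs (and that the test module adds from datetime import datetime, timezone):

def test_producer_output_matches_golden():
    event = PetWeightUpdatedV1(
        pet_id="pet_123",
        device_id="bowl_42",
        weight_grams=4200,
        measured_at=datetime(2024, 5, 1, 8, 30, tzinfo=timezone.utc),
        source="sensor",
    )
    produced = event.model_dump(mode="json")

    # Any renamed, retyped, or dropped field shows up as a diff here.
    golden = json.loads(
        (GOLDENS_DIR / "pet_weight_updated_v1" / "typical.json").read_text()
    )
    assert produced == golden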

Enforcing consumer behavior

You can go further and exercise actual consumer handlers:

def test_golden_events_for_weight_analytics():
    # weight_analytics_consumer is the feature module whose handler we exercise.
    for golden_path in (GOLDENS_DIR / "pet_weight_updated_v2").glob("*.json"):
        event = PetWeightUpdatedV2.model_validate(
            json.loads(golden_path.read_text())
        )
        result = weight_analytics_consumer.handle(event)
        # assert that no exceptions are raised and invariants hold
        assert result.success

Now you’re not just validating shape—you’re validating behavior against known, curated examples.


Observability: Who Emits What, Who Consumes It, and When It Breaks

Contracts and tests protect you at deploy time. Observability protects you at runtime. For a contract-checked event bus, you want to log and trace:

  • Event type and version (PetWeightUpdated.v2)
  • Producing feature (feature=smart_bowl, feature=ai_insights)
  • Consuming feature (consumer=weight_analytics_service)
  • Outcome (processed, retried, discarded, dead-lettered)
  • Latency (publish → consume)

Even if your transport is simple (Redis or SNS/SQS), adding a thin shared wrapper around publish/consume lets you emit structured logs:

def publish(event: BaseModel, topic: str, feature: str) -> None:
    payload = event.model_dump(mode="json")
    logger.info(
        "event_published",
        extra={
            "event_type": payload["event_type"],
            "version": payload["event_type"].rsplit(".v", 1)[-1],
            "topic": topic,
            "feature": feature,
        },
    )
    broker_client.publish(topic, json.dumps(payload).encode("utf-8"))

Consumers can log similarly, including outcome and duration.
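
A matching consumer-side wrapper might look like this (the handler signature and outcome labels are illustrative; exceptions propagate so your transport's retry/DLQ machinery still applies):

import time
from typing import Callable

def consume(payload: bytes, model: Type[T], handler: Callable[[T], None], consumer: str) -> None:
    started = time.monotonic()
    outcome = "failed"
    try:
        event = parse_event(payload, model)
        handler(event)
        outcome = "processed"
    finally:
        logger.info(
            "event_consumed",
            extra={
                "event_model": model.__name__,
                "consumer": consumer,
                "outcome": outcome,  # extend with "retried"/"dead-lettered" as needed
                "duration_ms": round((time.monotonic() - started) * 1000, 1),
            },
        )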

Once you have this data, you can:

  • Build a dependency map: “Which features consume PetWeightUpdated.v1 vs v2?”
  • Alert when a deprecated version is still active after a target date.
  • Investigate incidents quickly: “Which consumer is dropping events on this topic?”

You don’t need a huge observability stack to start—structured logs that can be filtered by event_type and consumer already take you far.
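
Even the dependency map can start as a small script over exported logs. A sketch, assuming a JSON-lines export where the log message lands in an "event" field (field names depend on your logging setup):

import json
from collections import defaultdict

def build_dependency_map(log_path: str) -> dict[str, set[str]]:
    """Map each event model to the set of features that consumed it."""
    consumers_by_event: dict[str, set[str]] = defaultdict(set)
    with open(log_path) as f:
        for line in f:
            record = json.loads(line)
            if record.get("event") == "event_consumed":
                consumers_by_event[record["event_model"]].add(record["consumer"])
    return dict(consumers_by_event)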


How This Plays Out at Hoomanely

At Hoomanely, our mission is to keep pets healthier for longer by connecting smart devices, real-world behavior, and AI insights into one coherent experience for pet parents.

That inevitably means a lot of micro-features talking to each other:

  • A device session from a smart tracker or bowl emits weight samples, motion summaries, or thermal flags.
  • AI modules produce nutrition nudges or anomaly alerts.
  • Analytics features aggregate and score behavior over days and weeks.
  • Notification and coaching modules decide when to nudge the pet parent.

Instead of wiring these with tightly coupled APIs, we lean heavily on internal events—but that only works if those events are safe to evolve.

A simplified pattern we use:

  • Device and inference services publish contract-checked events like PetSessionSummary.v1 or PetWeightTrend.v2.
  • Analytics, insights, and notification features consume them via a shared event contracts package.
  • Golden tests in CI ensure that when we adjust a schema (e.g., adding a “possible dehydration” flag), we don’t accidentally break an older consumer.
  • Observability ties everything together so we can see, for a given event type, which internal features are relying on it.

The result is that we can keep adding new wellness features and AI insights without turning the system into a fragile web of implicit dependencies—crucial when you’re shipping firmware + apps + AI together.


Rolling This Out Without Stopping the World

If you already have a messy event bus, you don’t need a big-bang rewrite. You can phase in contract-checking.

  1. Pick one event family to harden
    Choose something central but manageable (e.g., “pet weight updates” or “user session events”).
  2. Define canonical schemas
    Create Pydantic models for the current real-world shapes you see in the wild. Don’t idealize yet—reflect reality.
  3. Backfill goldens from production samples
    Capture actual payloads (sanitized), save them as golden JSON files, and write parsing tests.
  4. Add a thin wrapper around publish/consume
    Even if some code still uses raw transports, start routing new and refactored paths via a small library (see the sketch after this list) that:
    • Validates event models
    • Attaches event type/version
    • Logs structured metadata
  5. Migrate producers gradually
    Update one producer at a time to emit typed events. Keep consumers tolerant where necessary.
  6. Introduce versioning rules
    Document your evolution policy (additive vs breaking, when to bump version, how to dual-write).
  7. Turn CI up slowly
    Start with non-blocking tests (warnings), then make them blocking once you’re confident. The goal is to make contract violations boring to detect.
  8. Socialize the model
    Treat the event contracts package as a first-class API surface. Review changes. Give them owners. Add them to onboarding docs.
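
For steps 4 and 7 together, a wrapper that validates in warn mode first and strict mode later keeps the rollout incremental. A minimal sketch (the environment flag and function name are illustrative):

import json
import os

from pydantic import BaseModel, ValidationError

STRICT = os.environ.get("EVENT_CONTRACTS_STRICT") == "1"

def publish_checked(raw: dict, model: type[BaseModel], topic: str) -> None:
    try:
        payload = model.model_validate(raw).model_dump(mode="json")
    except ValidationError as exc:
        if STRICT:
            raise  # blocking mode: contract violations fail loudly
        # warn mode: let the payload through, but make the violation visible
        logger.warning(
            "event_contract_violation",
            extra={"topic": topic, "errors": exc.errors()},
        )
        payload = raw
    broker_client.publish(topic, json.dumps(payload).encode("utf-8"))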

After a few cycles, engineers stop thinking “just emit JSON” and start thinking “which event contract and version should I use?” That mindset shift is the real win.


Key Takeaways

  • Pub/sub is an API, not a side-channel.
    If micro-features depend on events, treat those events with the same rigor as any public API.
  • Typed, versioned schemas are the foundation.
    Pydantic or dataclasses give you validation, documentation, and a place to reason about changes.
  • Golden tests turn contracts into enforcement.
    Sample payloads + CI ensure producers and consumers stay compatible as code evolves.
  • Observability gives you a map, not a guess.
    With structured logs and simple maps, you can see who emits and consumes what, and deprecate safely.
  • Rollout can be incremental.
    Start with a single event family, wrap publishes/consumes, and expand from there.

For teams growing a constellation of micro-features—whether in pet-tech like Hoomanely or any other domain—contract-checked event buses are one of the cleanest ways to keep flexibility and safety as you scale.
