Skip to content

Verifying a distributed system

musil gives you two layers, in pure Python, that fit together:

  1. Model-check the design — describe the protocol as a state machine and prove (over a bounded state space) that it is safe and converges. This is the TLA+ job: find the design bugs.
  2. Deterministic simulation testing of the real code — run your actual node logic under a controlled clock and a fault-injecting network, reproducibly from a seed, and hold it to the same model. This is the FoundationDB / TigerBeetle job: find the implementation bugs.

It is bug-finding, not proof. A clean run of a bounded model and a few thousand seeds is strong evidence, not a theorem (that's theorem-prover territory — Isabelle/Coq/Dafny, person-years). But it is the same technique the most reliable distributed systems in the world are built with.

What this is not

musil is explicit-state and bounded: it enumerates a finite model and samples real-code schedules. It does not prove correctness for unbounded scale, and it only finds faults you model. Model the network explicitly, bound your state, and lean on the small-scope hypothesis.

1. Model-check the design

A control plane pushes a route version to an agent; the agent applies last-writer-wins. Safety: never apply a version that wasn't intended. Liveness: always eventually catch up.

from dataclasses import dataclass
from musil import Action, Model, check, check_liveness

@dataclass(frozen=True)
class Spec:
    desired: int = 1
    applied: int = 0

def model() -> Model[Spec]:
    return Model(
        init=Spec(desired=1, applied=0),
        actions=[Action("deliver", lambda s: s.applied < s.desired, lambda s: Spec(s.desired, s.desired))],
        invariants={"no-stale-or-invented": lambda s: s.applied <= s.desired},
        terminal=lambda s: s.applied == s.desired,
    )

assert check(model()).ok                                   # safety + no deadlock
assert check_liveness(model(), goal=lambda s: s.applied == s.desired,
                      everywhere=True, fair=["deliver"]).ok  # converges under fair delivery

2. Simulate the real code

Now the implementation: event-driven nodes that send messages, set timers, and read a virtual clock through a Context. simulate runs them across many seeds under a NetworkModel, checking invariants and refinement after every step and a convergence goal once each run settles.

from collections.abc import Mapping
from musil import BaseNode, Context, NetworkModel, Node, simulate

class ControlPlane(BaseNode):
    def __init__(self, *, repush: bool) -> None:
        self.desired, self._repush, self._acked = 1, repush, False
    def on_start(self, ctx: Context) -> None:
        ctx.send("agent", self.desired)
        if self._repush:
            ctx.set_timer("push", delay=2)
    def on_message(self, ctx: Context, src: str, payload: object) -> None:
        self._acked = self._acked or payload == "ack"
    def on_timer(self, ctx: Context, name: str) -> None:
        if self._repush and not self._acked:
            ctx.send("agent", self.desired)
            ctx.set_timer("push", delay=2)

class Agent(BaseNode):
    def __init__(self) -> None:
        self.applied = 0
    def on_message(self, ctx: Context, src: str, payload: object) -> None:
        assert isinstance(payload, int)
        self.applied = max(self.applied, payload)
        ctx.send(src, "ack")

def snapshot(nodes: Mapping[str, Node]) -> Spec:
    return Spec(desired=nodes["cp"].desired, applied=nodes["agent"].applied)  # type: ignore[attr-defined]

def factory(repush: bool):
    return lambda: {"cp": ControlPlane(repush=repush), "agent": Agent()}

lossy = NetworkModel(loss=0.3, min_latency=1, max_latency=3)

# fire-and-forget: a dropped push is never re-sent -> the agent never catches up
bad = simulate(factory(repush=False), seeds=range(300), snapshot=snapshot, network=lossy,
               goal=lambda s: s.applied == s.desired)
assert not bad.ok
print(bad.failure)   # GOAL on seed 1 ...  reproduce with seeds=[1]

# per-tick re-push: converges, and refines the model, on every seed
good = simulate(factory(repush=True), seeds=range(300), snapshot=snapshot, network=lossy,
                model=model(), abstraction=lambda s: s, goal=lambda s: s.applied == s.desired)
assert good.ok

When a seed fails, SimFailure tells you exactly which one — re-run with seeds=[that_seed] and the run replays bit-for-bit, because the whole simulation is a deterministic function of the seed. That is the property that makes simulation testing debuggable.

Here is the full, runnable example in one piece — run it with python examples/route_delivery.py:

"""Verify a distributed protocol end to end, in pure Python -- design AND implementation.

The story: a control plane pushes a route-table version to an agent over a flaky link; the agent
applies last-writer-wins. We want two guarantees:

  * SAFETY  -- the agent never serves a version the control plane never intended.
  * LIVENESS -- the agent always eventually catches up to the latest version (convergence).

Two layers, one tool:

  1. DESIGN (model check) -- `musil.check` / `check_liveness` exhaustively verify the *design* (a
     small state machine) for safety and convergence-under-fairness.

  2. IMPLEMENTATION (deterministic simulation) -- `musil.simulate` runs the *real* node code under a
     lossy, reordering network across many seeds, holding it to the SAME model (refinement) and to
     the convergence goal. The buggy fire-and-forget push is caught with a reproducible seed; the
     per-tick re-push converges for every seed.

Run me:  python examples/route_delivery.py
"""

from __future__ import annotations

from collections.abc import Callable, Mapping
from dataclasses import dataclass

from musil import (
    Action,
    BaseNode,
    Context,
    Model,
    NetworkModel,
    Node,
    check,
    check_liveness,
    simulate,
)

# ----------------------------------------------------------------------------------------------------
# 1. The design, as a model. One route version is desired; the agent's applied version trails it.
# ----------------------------------------------------------------------------------------------------


@dataclass(frozen=True)
class Spec:
    desired: int = 1
    applied: int = 0


def build_model() -> Model[Spec]:
    return Model(
        init=Spec(desired=1, applied=0),
        actions=[
            # the network delivers the pushed version; the agent applies it (last-writer-wins)
            Action("deliver", lambda s: s.applied < s.desired, lambda s: Spec(s.desired, s.desired)),
        ],
        invariants={"no-stale-or-invented": lambda s: s.applied <= s.desired},
        terminal=lambda s: s.applied == s.desired,
    )


def check_design() -> tuple[bool, bool]:
    """Model-check the design: safety (the invariant, no deadlock) and convergence under fair delivery."""
    safety = check(build_model())
    liveness = check_liveness(
        build_model(),
        goal=lambda s: s.applied == s.desired,
        goal_name="applied==desired",
        everywhere=True,
        fair=["deliver"],
    )
    return safety.ok, liveness.ok


# ----------------------------------------------------------------------------------------------------
# 2. The implementation, as real event-driven nodes.
# ----------------------------------------------------------------------------------------------------


class ControlPlane(BaseNode):
    """Pushes the desired version to the agent. With ``repush`` it re-sends on a timer until acked;
    without it (fire-and-forget), the single push is all the agent will ever get."""

    def __init__(self, *, repush: bool) -> None:
        self.desired = 1
        self._repush = repush
        self._acked = False

    def on_start(self, ctx: Context) -> None:
        ctx.send("agent", self.desired)
        if self._repush:
            ctx.set_timer("push", delay=2)

    def on_message(self, ctx: Context, src: str, payload: object) -> None:
        if payload == "ack":
            self._acked = True

    def on_timer(self, ctx: Context, name: str) -> None:
        if self._repush and not self._acked:
            ctx.send("agent", self.desired)
            ctx.set_timer("push", delay=2)


class Agent(BaseNode):
    """Applies the latest version it has seen and acks."""

    def __init__(self) -> None:
        self.applied = 0

    def on_message(self, ctx: Context, src: str, payload: object) -> None:
        assert isinstance(payload, int)
        self.applied = max(self.applied, payload)
        ctx.send(src, "ack")


def _snapshot(nodes: Mapping[str, Node]) -> Spec:
    cp, agent = nodes["cp"], nodes["agent"]
    assert isinstance(cp, ControlPlane) and isinstance(agent, Agent)
    return Spec(desired=cp.desired, applied=agent.applied)


def _factory(repush: bool) -> Callable[[], Mapping[str, Node]]:
    def make() -> Mapping[str, Node]:
        return {"cp": ControlPlane(repush=repush), "agent": Agent()}

    return make


# a link that drops ~30% of messages and reorders the rest
_LOSSY = NetworkModel(loss=0.3, min_latency=1, max_latency=3)


def run_fire_and_forget(seeds: range = range(300)):
    return simulate(
        _factory(repush=False),
        seeds=seeds,
        snapshot=_snapshot,
        network=_LOSSY,
        model=build_model(),
        abstraction=lambda s: s,
        goal=lambda s: s.applied == s.desired,
        goal_name="applied==desired",
    )


def run_repush(seeds: range = range(300)):
    return simulate(
        _factory(repush=True),
        seeds=seeds,
        snapshot=_snapshot,
        network=_LOSSY,
        model=build_model(),
        abstraction=lambda s: s,
        goal=lambda s: s.applied == s.desired,
        goal_name="applied==desired",
    )


def main() -> int:
    print("=== DESIGN (model check) ===")
    safety_ok, liveness_ok = check_design()
    print(f"  safety (no stale/invented version, no deadlock): {'OK' if safety_ok else 'FAIL'}")
    print(f"  liveness (converges under fair delivery):        {'OK' if liveness_ok else 'FAIL'}")

    print("\n=== IMPLEMENTATION (deterministic simulation, lossy network) ===")
    ff = run_fire_and_forget()
    print("  fire-and-forget push:")
    if ff.ok:
        print(f"    converged on all {ff.runs} seeds (unexpected!)")
    else:
        assert ff.failure is not None
        print(f"    BUG FOUND -- {ff.failure.kind} on seed {ff.failure.seed}: {ff.failure.detail}")
        print(f"    final world: {ff.failure.world}  (reproduce with seeds=[{ff.failure.seed}])")

    rp = run_repush()
    print("  per-tick re-push:")
    if rp.ok:
        print(f"    converged AND refined the model on all {rp.runs} seeds")
    else:
        assert rp.failure is not None
        print(f"    unexpected failure: {rp.failure}")

    return 0


if __name__ == "__main__":
    raise SystemExit(main())

The faults you can inject

NetworkModel controls the link: loss (drop probability), duplicate (redelivery probability), and a min_latency/max_latency spread (latency variance is what produces reordering and delay). The default is a reliable, in-order channel — faults are opt-in, so you turn on exactly the adversity you want to test against. Timers (ctx.set_timer) let nodes detect and recover from loss; the seeded clock makes timeouts fire deterministically.