API reference

Everything below is re-exported from the top-level musil package, e.g. from musil import Model, check.

Core — check, Model, Action

The model definition and the safety/deadlock/reachability checker.

musil.core

The explicit-state core: a model is (initial states, guarded actions, invariants), and check does a breadth-first sweep of every reachable state, reporting the first invariant violation or deadlock with the shortest trace that reaches it.

States are plain immutable, hashable Python values — a frozen dataclass is the idiomatic choice. Actions are pure: enabled(state) decides whether the action can fire, apply(state) returns the resulting NEW state. Because the sweep fires every enabled action from every state, modelling several concurrent actors is just handing it all their actions — the interleaving is explored for free. That's the whole point: the bugs that matter are races, and BFS over interleavings finds them.

This module is dependency-free and knows nothing about any particular domain.
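
As an illustration of why interleavings matter, here is a self-contained sketch of a lost-update race between two workers that each read and then write a shared counter. It does not import musil: Act and first_violation are small stand-ins written for this example (assumptions, not the library's code), and Counter, worker and the invariant are hypothetical names.

```python
from dataclasses import dataclass, replace
from typing import Callable, NamedTuple, Optional


@dataclass(frozen=True)
class Counter:
    value: int = 0                 # the shared counter
    pc: tuple = ("read", "read")   # each worker's next step: read, write, done
    tmp: tuple = (0, 0)            # the value each worker last read


class Act(NamedTuple):
    """Stand-in with the same three fields as an Action."""
    name: str
    enabled: Callable[[Counter], bool]
    apply: Callable[[Counter], Counter]


def with_slot(t: tuple, i: int, v) -> tuple:
    """Return a copy of tuple t with slot i set to v."""
    return t[:i] + (v,) + t[i + 1:]


def worker(i: int) -> list:
    """Worker i reads the counter, then writes back its stale read plus one."""
    read = Act(
        f"w{i}:read",
        lambda s: s.pc[i] == "read",
        lambda s: replace(s, pc=with_slot(s.pc, i, "write"), tmp=with_slot(s.tmp, i, s.value)),
    )
    write = Act(
        f"w{i}:write",
        lambda s: s.pc[i] == "write",
        lambda s: replace(s, pc=with_slot(s.pc, i, "done"), value=s.tmp[i] + 1),
    )
    return [read, write]


def first_violation(init, actions, invariant) -> Optional[list]:
    """Tiny BFS stand-in: action names on a shortest path to a bad state, else None."""
    parent = {init: None}
    frontier = [init]
    head = 0
    while head < len(frontier):
        state = frontier[head]
        head += 1
        if not invariant(state):
            names = []
            while parent[state] is not None:
                state, name = parent[state]
                names.append(name)
            return names[::-1]
        for a in actions:
            if a.enabled(state):
                nxt = a.apply(state)
                if nxt not in parent:
                    parent[nxt] = (state, a.name)
                    frontier.append(nxt)
    return None


def no_lost_update(s: Counter) -> bool:
    """Once both workers are done, both increments must have landed."""
    return s.pc != ("done", "done") or s.value == 2


# Handing the sweep both workers' actions is all it takes to explore the interleavings.
trace = first_violation(Counter(), worker(0) + worker(1), no_lost_update)
print(trace)
```

Neither worker is wrong on its own; the violation only appears when both reads happen before either write, which the sweep reaches without any explicit scheduling code.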

Action dataclass

A guarded, atomic transition. enabled decides if it can fire in a state; apply returns the resulting new state (states are immutable, so it never mutates). Keep both pure — the checker calls them many times and assumes no side effects.

Source code in src/musil/core.py
@dataclass(frozen=True, slots=True)
class Action[S: Hashable]:
    """A guarded, atomic transition. ``enabled`` decides if it can fire in a state; ``apply`` returns
    the resulting new state (states are immutable, so it never mutates). Keep both pure — the checker
    calls them many times and assumes no side effects."""

    name: str
    enabled: Callable[[S], bool]
    apply: Callable[[S], S]

Step dataclass

One link in a counterexample: the action that fired and the state it produced.

Source code in src/musil/core.py
@dataclass(frozen=True, slots=True)
class Step[S: Hashable]:
    """One link in a counterexample: the action that fired and the state it produced."""

    action: str
    state: S

Model dataclass

What to check: where to start, what can happen, and what must always hold.

init is one state or several. actions are the (possibly multi-actor) transitions. invariants map a name → predicate that must hold in every reachable state. terminal marks states allowed to have no enabled action (an absorbing sink like deleted); any other dead end is reported as a deadlock.

Source code in src/musil/core.py
@dataclass(slots=True)
class Model[S: Hashable]:
    """What to check: where to start, what can happen, and what must always hold.

    ``init`` is one state or several. ``actions`` are the (possibly multi-actor) transitions.
    ``invariants`` map a name → predicate that must hold in every reachable state. ``terminal``
    marks states allowed to have no enabled action (an absorbing sink like ``deleted``); any other
    dead end is reported as a deadlock."""

    init: S | Iterable[S]
    actions: Iterable[Action[S]]
    invariants: Mapping[str, Invariant[S]] = field(default_factory=dict[str, Invariant[S]])
    terminal: Callable[[S], bool] = _never

    def initial_states(self) -> tuple[S, ...]:
        init = self.init
        if _is_state_collection(init):
            return tuple(init)  # type: ignore[arg-type]
        return (init,)  # type: ignore[return-value]

Result dataclass

The outcome. ok is True when the whole reachable space satisfied every invariant with no deadlock. Otherwise kind is "invariant" or "deadlock", invariant names the broken one, and trace is the shortest path from an initial state to the offending state.

Source code in src/musil/core.py
@dataclass(frozen=True, slots=True)
class Result[S: Hashable]:
    """The outcome. ``ok`` is True when the whole reachable space satisfied every invariant with no
    deadlock. Otherwise ``kind`` is ``"invariant"`` or ``"deadlock"``, ``invariant`` names the broken
    one, and ``trace`` is the shortest path from an initial state to the offending state."""

    ok: bool
    states_explored: int
    kind: str | None = None
    invariant: str | None = None
    trace: tuple[Step[S], ...] = ()
    truncated: bool = False  # True if the max_states cap was hit before the space was exhausted

    def __bool__(self) -> bool:
        return self.ok

    def __str__(self) -> str:
        if self.ok:
            cap = " (TRUNCATED at cap)" if self.truncated else ""
            return f"OK — {self.states_explored} states, no violations{cap}"
        head = (
            f"INVARIANT VIOLATED: {self.invariant}"
            if self.kind == "invariant"
            else "DEADLOCK (a non-terminal state with no enabled action)"
        )
        return f"{head}\n{format_trace(self.trace)}\n({self.states_explored} states explored)"

Graph dataclass

The full reachable state graph: initial states, every reachable state, and the labelled transitions (edges[s] = the (action_name, next_state) pairs out of s). Built by explore and consumed by liveness checking and graph export. truncated flags hitting the max_states cap.

Source code in src/musil/core.py
@dataclass(frozen=True, slots=True)
class Graph[S: Hashable]:
    """The full reachable state graph: initial states, every reachable state, and the labelled
    transitions (``edges[s]`` = the ``(action_name, next_state)`` pairs out of ``s``). Built by
    ``explore`` and consumed by liveness checking and graph export. ``truncated`` flags hitting the
    ``max_states`` cap."""

    initial: tuple[S, ...]
    states: tuple[S, ...]
    edges: Mapping[S, tuple[tuple[str, S], ...]]
    truncated: bool = False

format_trace

format_trace(trace: Iterable[Step[S]]) -> str

Render a counterexample as Init: <state> for the first step, then → <action>: <state> for each later step.

Source code in src/musil/core.py
def format_trace[S: Hashable](trace: Iterable[Step[S]]) -> str:
    """Render a counterexample as ``Init: <state>`` for the first step, then
    ``→ <action>: <state>`` for each later step."""
    lines: list[str] = []
    for i, step in enumerate(trace):
        if i == 0:
            lines.append(f"  Init: {step.state}")
        else:
            lines.append(f"  → {step.action}: {step.state}")
    return "\n".join(lines)
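
To make the output format concrete, the sketch below repeats the same rendering logic on a hand-built trace. Step here is a NamedTuple stand-in for the dataclass, and the states and action names are made up for the example.

```python
from typing import Iterable, NamedTuple


class Step(NamedTuple):
    """Stand-in for the Step dataclass: an action name and the state it produced."""
    action: str
    state: object


def format_trace(trace: Iterable[Step]) -> str:
    # Same logic as the listing above, without the generic type parameter.
    lines = []
    for i, step in enumerate(trace):
        if i == 0:
            lines.append(f"  Init: {step.state}")  # the first step's action label is ignored
        else:
            lines.append(f"  → {step.action}: {step.state}")
    return "\n".join(lines)


trace = (Step("<init>", "idle"), Step("start", "running"), Step("crash", "failed"))
rendered = format_trace(trace)
print(rendered)
```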

check

check(
    model: Model[S], *, max_states: int = 1000000
) -> Result[S]

Breadth-first sweep of every reachable state. Returns the first invariant violation or deadlock found (shortest trace, because BFS visits by increasing depth), else OK once the whole reachable space is exhausted. max_states is a runaway guard for unbounded models.

Source code in src/musil/core.py
def check[S: Hashable](model: Model[S], *, max_states: int = 1_000_000) -> Result[S]:
    """Breadth-first sweep of every reachable state. Returns the first invariant violation or
    deadlock found (shortest trace, because BFS visits by increasing depth), else OK once the whole
    reachable space is exhausted. ``max_states`` is a runaway guard for unbounded models."""
    starts = model.initial_states()
    actions = tuple(model.actions)

    # parent[state] = (previous_state, action_name) — for reconstructing the shortest trace.
    parent: dict[S, tuple[S, str] | None] = {}
    seen: set[S] = set()
    frontier: list[S] = []
    for s in starts:
        if s not in seen:
            seen.add(s)
            parent[s] = None
            frontier.append(s)

    head = 0  # index into frontier instead of pop(0), so it behaves as a real FIFO queue
    while head < len(frontier):
        state = frontier[head]
        head += 1

        for name, inv in model.invariants.items():
            if not inv(state):
                return Result(
                    ok=False,
                    states_explored=len(seen),
                    kind="invariant",
                    invariant=name,
                    trace=_trace_to(parent, state),
                )

        fired = False
        for action in actions:
            if not action.enabled(state):
                continue
            fired = True
            nxt = action.apply(state)
            if nxt not in seen:
                seen.add(nxt)
                parent[nxt] = (state, action.name)
                frontier.append(nxt)
                if len(seen) >= max_states:
                    return Result(ok=True, states_explored=len(seen), truncated=True)

        if not fired and not model.terminal(state):
            return Result(
                ok=False,
                states_explored=len(seen),
                kind="deadlock",
                trace=_trace_to(parent, state),
            )

    return Result(ok=True, states_explored=len(seen))
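
The helper _trace_to is called above but not listed in this section. A plausible sketch, assuming it walks the parent map back to an initial state and labels that first step "<init>" (the body and the label are both assumptions made for illustration):

```python
from typing import Hashable, NamedTuple


class Step(NamedTuple):
    """Stand-in for the Step dataclass."""
    action: str
    state: Hashable


def trace_to(parent: dict, state: Hashable) -> tuple:
    """Hypothetical reconstruction: follow parent links back to an initial state,
    then reverse so the trace reads from the initial state to the offending one."""
    steps = []
    cur = state
    while True:
        link = parent[cur]
        if link is None:              # initial states have no parent
            steps.append(Step("<init>", cur))
            break
        prev, action = link           # parent[state] = (previous_state, action_name)
        steps.append(Step(action, cur))
        cur = prev
    steps.reverse()
    return tuple(steps)


parent = {"a": None, "b": ("a", "go"), "c": ("b", "stop")}
trace = trace_to(parent, "c")
print(trace)
```

Because BFS records a parent only the first time a state is seen, the chain of parents is a shortest path, which is what makes the reported counterexample minimal.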

explore

explore(
    model: Model[S], *, max_states: int = 1000000
) -> Graph[S]

Build the whole reachable state graph (no invariant checks). The shared exploration primitive behind liveness and graph export; check stays separate so it can early-exit with the shortest counterexample.

Source code in src/musil/core.py
def explore[S: Hashable](model: Model[S], *, max_states: int = 1_000_000) -> Graph[S]:
    """Build the whole reachable state graph (no invariant checks). The shared exploration primitive
    behind liveness and graph export; ``check`` stays separate so it can early-exit with the shortest
    counterexample."""
    starts = model.initial_states()
    actions = tuple(model.actions)
    seen: dict[S, list[tuple[str, S]]] = {}
    frontier: list[S] = []
    for s in starts:
        if s not in seen:
            seen[s] = []
            frontier.append(s)

    head = 0
    truncated = False
    while head < len(frontier):
        state = frontier[head]
        head += 1
        out: list[tuple[str, S]] = []
        for action in actions:
            if not action.enabled(state):
                continue
            nxt = action.apply(state)
            out.append((action.name, nxt))
            if nxt not in seen:
                seen[nxt] = []
                frontier.append(nxt)
                if len(seen) >= max_states:
                    truncated = True
                    break
        seen[state] = out
        if truncated:
            break

    return Graph(
        initial=tuple(starts),
        states=tuple(seen),
        edges={s: tuple(out) for s, out in seen.items()},
        truncated=truncated,
    )
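
As one way to consume the resulting edges mapping, the sketch below renders it as Graphviz DOT text. to_dot is a hypothetical helper written for this example (musil's own graph export is not shown in this section), and it works on a plain dict shaped like Graph.edges.

```python
def to_dot(initial, edges) -> str:
    """Render edges (state -> ((action, next_state), ...)) as Graphviz DOT text.
    Assumes every state appears as a key and that labels contain no double quotes."""
    ids = {s: f"s{i}" for i, s in enumerate(edges)}
    lines = ["digraph model {"]
    for s, node in ids.items():
        shape = "doublecircle" if s in initial else "circle"
        lines.append(f'  {node} [label="{s}", shape={shape}];')
    for s, outs in edges.items():
        for action, t in outs:
            lines.append(f'  {ids[s]} -> {ids[t]} [label="{action}"];')
    lines.append("}")
    return "\n".join(lines)


edges = {
    "red": (("next", "green"),),
    "green": (("next", "yellow"),),
    "yellow": (("next", "red"),),
}
dot = to_dot(("red",), edges)
print(dot)
```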

Liveness — check_liveness

Eventually (<>P) and always-eventually ([]<>P) under weak and strong fairness.

musil.liveness

Liveness: does the system eventually reach a goal, or can it loop forever short of it?

check_liveness(model, goal=P) proves the temporal property "eventually P" (<>P): on every run from an initial state, P holds at some point. It fails in exactly two ways, and we report whichever we find with a lasso counterexample (a stem into a recurring set, plus the loop):

  • P unreachable — a reachable state from which P can never be reached (a dead end or a trap SCC with no exit to P). No scheduling fixes this.
  • fair cycle — a reachable cycle of ¬P states that the system can follow forever. Whether this counts depends on FAIRNESS: under weak fairness of an action set, an action that is continuously enabled along the cycle must eventually be taken — so a cycle that starves a continuously-enabled fair action is not a real counterexample (the fair scheduler would break it). Pass fair=[...] to rule those out; with fair=() you get strict (no-fairness) liveness.

Method: explore the reachable graph, restrict to ¬P states reachable through ¬P from a ¬P initial state, then (1) flag any from which P is unreachable, (2) Tarjan-SCC the rest and report the first weakly-fair non-trivial SCC. Dependency-free; for finite (bounded) models.
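
Step (1) of the method can be sketched as a backward search from the goal states over reversed edges. can_reach_goal below is a hypothetical stand-in operating on a plain dict shaped like Graph.edges; it is not the library's helper.

```python
from collections import deque


def can_reach_goal(edges: dict, goal) -> set:
    """States from which some path reaches a goal state (backward BFS over reversed edges)."""
    reverse = {s: [] for s in edges}
    for s, outs in edges.items():
        for _action, t in outs:
            reverse[t].append(s)
    ok = {s for s in edges if goal(s)}
    queue = deque(ok)
    while queue:
        t = queue.popleft()
        for s in reverse[t]:
            if s not in ok:
                ok.add(s)
                queue.append(s)
    return ok


edges = {
    "start": (("work", "busy"), ("fail", "stuck")),
    "busy": (("finish", "done"), ("retry", "start")),
    "stuck": (("spin", "stuck"),),
    "done": (),
}
reachable = can_reach_goal(edges, lambda s: s == "done")
trapped = set(edges) - reachable   # reachable states from which the goal can never be reached
print(trapped)
```

In this toy graph "stuck" is the unconditional violation, while the start/busy loop is the kind of cycle that step (2) then judges under the chosen fairness assumptions.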

LivenessResult dataclass

Outcome of an "eventually P" check. ok True means every run reaches P. Otherwise kind is "p-unreachable" or "fair-cycle"; stem is the shortest path from an initial state to the offending state, and cycle is the recurring loop (empty for a P-unreachable sink).

Source code in src/musil/liveness.py
@dataclass(frozen=True, slots=True)
class LivenessResult[S: Hashable]:
    """Outcome of an "eventually P" check. ``ok`` True means every run reaches P. Otherwise ``kind``
    is ``"p-unreachable"`` or ``"fair-cycle"``; ``stem`` is the shortest path from an initial state
    to the offending state, and ``cycle`` is the recurring loop (empty for a P-unreachable sink)."""

    ok: bool
    states_explored: int
    goal: str = "goal"
    kind: str | None = None
    stem: tuple[Step[S], ...] = ()
    cycle: tuple[Step[S], ...] = ()
    truncated: bool = False

    def __bool__(self) -> bool:
        return self.ok

    def __str__(self) -> str:
        if self.ok:
            cap = " (TRUNCATED at cap)" if self.truncated else ""
            return f"OK — eventually '{self.goal}' on every run ({self.states_explored} states){cap}"
        if self.kind == "p-unreachable":
            return (
                f"LIVENESS VIOLATED: '{self.goal}' is unreachable from a reachable state\n"
                f"{format_trace(self.stem)}\n({self.states_explored} states explored)"
            )
        loop = "\n".join(f"  ↺ {s.action}: {s.state}" for s in self.cycle)
        return (
            f"LIVENESS VIOLATED: a fair cycle never reaches '{self.goal}'\n"
            f"{format_trace(self.stem)}\n  --- loops forever ---\n{loop}\n"
            f"({self.states_explored} states explored)"
        )

check_liveness

check_liveness(
    model: Model[S],
    *,
    goal: Callable[[S], bool],
    fair: Iterable[str] = (),
    fair_strong: Iterable[str] = (),
    everywhere: bool = False,
    goal_name: str = "goal",
    max_states: int = 1000000,
) -> LivenessResult[S]

Prove a liveness property and return a lasso counterexample if it fails.

Default (everywhere=False) checks <>P: eventually P on every run from an initial state. everywhere=True checks []<>P: always eventually P, i.e. from every reachable state P is still eventually reached (the system always re-converges; no reachable stale cycle). Use everywhere=True for convergence/recurrence properties whose model may start already satisfying P.

Fairness prunes cycles a fair scheduler would break. fair = weak fairness (an action continuously enabled along a cycle must eventually be taken). fair_strong = strong fairness (an action enabled infinitely often, i.e. in at least one state of the cycle, must eventually be taken). Strong fairness is the stronger assumption and rules out more cycles, so it can force a "flickering" action that weak fairness cannot.

Source code in src/musil/liveness.py
def check_liveness[S: Hashable](
    model: Model[S],
    *,
    goal: Callable[[S], bool],
    fair: Iterable[str] = (),
    fair_strong: Iterable[str] = (),
    everywhere: bool = False,
    goal_name: str = "goal",
    max_states: int = 1_000_000,
) -> LivenessResult[S]:
    """Prove a liveness property and return a lasso counterexample if it fails.

    Default (``everywhere=False``) checks ``<>P`` — *eventually* P on every run from an initial
    state. ``everywhere=True`` checks ``[]<>P`` — *always eventually* P, i.e. from EVERY reachable
    state P is still eventually reached (the system always re-converges; no reachable stale cycle).
    Use ``everywhere=True`` for convergence/recurrence properties whose model may start already
    satisfying P.

    Fairness prunes cycles a fair scheduler would break. ``fair`` = weak fairness (an action
    *continuously* enabled along a cycle must eventually be taken). ``fair_strong`` = strong fairness
    (an action enabled *infinitely often* — i.e. in any state of the cycle — must be taken); strong
    is the stronger assumption and rules out more cycles, so it can force a "flickering" action that
    weak fairness cannot."""
    g = explore(model, max_states=max_states)
    fairset = frozenset(fair)
    strongset = frozenset(fair_strong)

    bad = {s for s in g.states if not goal(s)}
    if not bad:
        return LivenessResult(ok=True, states_explored=len(g.states), goal=goal_name, truncated=g.truncated)

    if everywhere:
        # []<>P: a violating run may start anywhere, so seed from EVERY reachable ¬P state and take
        # stems over the full graph (the prefix may pass through P states before entering a stale cycle).
        parent, order_all = _bfs(g.initial, lambda s: g.edges[s])
        order = [s for s in order_all if s in bad]
        reachable_bad = set(bad)
    else:
        # <>P: a violating run is ¬P throughout, so seed from ¬P inits and stay within ¬P.
        bad_inits = [s for s in g.initial if s in bad]
        if not bad_inits:
            return LivenessResult(ok=True, states_explored=len(g.states), goal=goal_name, truncated=g.truncated)
        parent, order = _bfs(bad_inits, lambda s: [(a, t) for (a, t) in g.edges[s] if t in bad])
        reachable_bad = set(order)

    bad_succ: dict[S, list[tuple[str, S]]] = {
        s: [(a, t) for (a, t) in g.edges[s] if t in reachable_bad] for s in reachable_bad
    }
    can_reach_goal = _backward_reachable_to_goal(g, goal)

    # (1) a reachable ¬P state from which P can never be reached — unconditional violation.
    for s in order:  # BFS order → shortest stem
        if s not in can_reach_goal:
            return LivenessResult(
                ok=False, states_explored=len(g.states), goal=goal_name,
                kind="p-unreachable", stem=_stem(parent, s),
            )

    # (2) a weakly-fair recurring cycle of ¬P states.
    for scc in _tarjan(reachable_bad, bad_succ):
        if not _has_internal_edge(scc, bad_succ):
            continue
        if _fair_admissible(scc, g.edges, fairset, strongset):
            entry = next(s for s in order if s in scc)  # shortest-stem entry
            return LivenessResult(
                ok=False, states_explored=len(g.states), goal=goal_name, kind="fair-cycle",
                stem=_stem(parent, entry), cycle=_simple_cycle(scc, entry, bad_succ),
            )

    return LivenessResult(ok=True, states_explored=len(g.states), goal=goal_name, truncated=g.truncated)
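
The helper _fair_admissible is not listed in this section. The sketch below shows one way such a test could look, treating the whole SCC as the candidate cycle; the function name, signature and logic are assumptions for illustration. An action counts as enabled in a state when the state has an outgoing edge with that label.

```python
def fair_admissible(scc, edges, weak=frozenset(), strong=frozenset()) -> bool:
    """Whole-SCC approximation: may a run circulate in scc forever without breaking fairness?"""
    enabled_in = {s: {a for a, _t in edges[s]} for s in scc}
    taken_inside = {a for s in scc for a, t in edges[s] if t in scc}
    for a in weak:
        # Weak: enabled in every state of the SCC, yet never taken on an internal edge.
        if a not in taken_inside and all(a in enabled_in[s] for s in scc):
            return False
    for a in strong:
        # Strong: enabled in at least one state of the SCC, yet never taken inside it.
        if a not in taken_inside and any(a in enabled_in[s] for s in scc):
            return False
    return True


# "exit" flickers: it is enabled in x but not in y, and the x/y loop never takes it.
edges = {
    "x": (("tick", "y"), ("exit", "goal")),
    "y": (("tick", "x"),),
    "goal": (),
}
loop = {"x", "y"}
unfair = fair_admissible(loop, edges)
weakly = fair_admissible(loop, edges, weak={"exit"})
strongly = fair_admissible(loop, edges, strong={"exit"})
print(unfair, weakly, strongly)
```

Weak fairness leaves the loop standing because "exit" is not continuously enabled; strong fairness prunes it. For strong fairness an exact check would also have to consider sub-cycles that avoid the states where the action is enabled, which this sketch does not attempt.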

Composition — compose

The interleaved product of independent component models.

musil.compose

Parallel composition of component models (I/O-automata style).

Specify each component independently as a Model (states + named actions), then compose them into one Model whose state is the tuple of component states and whose actions are the interleaving of all components' actions, each lifted to act on its own slice. This is the "specify per component, compose" discipline from the methodology note (docs/specs/formal-modeling-distributed-systems.md): the joint behaviour is every interleaving of the parts, which is exactly where concurrency bugs live.

Component actions are name-qualified ("<component>:<action>") and component invariants are lifted (checked against that component's slice). Pass extra invariants to compose for joint properties that span components. The composite is terminal only when every component is terminal.

Composite dataclass

A joint state: one component state per name, kept sorted by name so it hashes deterministically. Read a component's slice with state["name"].

Source code in src/musil/compose.py
@dataclass(frozen=True, slots=True)
class Composite:
    """A joint state: one component state per name, kept sorted by name so it hashes deterministically.
    Read a component's slice with ``state["name"]``."""

    parts: tuple[tuple[str, Hashable], ...]

    def __getitem__(self, name: str) -> Hashable:
        for n, s in self.parts:
            if n == name:
                return s
        raise KeyError(name)

    def with_part(self, name: str, new: Hashable) -> Composite:
        return Composite(tuple((n, new if n == name else s) for n, s in self.parts))
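
A short usage sketch, repeating the class so the example runs on its own (slots omitted; the component names and states are made up):

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Hashable


@dataclass(frozen=True)
class Composite:
    """Same fields and methods as the listing above."""
    parts: tuple

    def __getitem__(self, name: str) -> Hashable:
        for n, s in self.parts:
            if n == name:
                return s
        raise KeyError(name)

    def with_part(self, name: str, new: Hashable) -> Composite:
        return Composite(tuple((n, new if n == name else s) for n, s in self.parts))


joint = Composite((("client", "idle"), ("server", "listening")))
after = joint.with_part("client", "connecting")   # returns a new joint state

print(joint["client"], after["client"], after["server"])
```

with_part never mutates: the original joint state is unchanged, and two composites with equal parts compare and hash equal, which is what lets the checker deduplicate joint states.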

compose

compose(
    components: Mapping[str, Model[Any]],
    *,
    invariants: Mapping[str, Invariant[Composite]]
    | None = None,
) -> Model[Composite]

Interleaving product of components (name -> Model). The result's reachable states are every interleaving of the parts; component invariants are lifted ("<name>:<inv>") and invariants adds joint properties over the whole composite.

Source code in src/musil/compose.py
def compose(
    components: Mapping[str, Model[Any]],
    *,
    invariants: Mapping[str, Invariant[Composite]] | None = None,
) -> Model[Composite]:
    """Interleaving product of ``components`` (name -> Model). The result's reachable states are every
    interleaving of the parts; component invariants are lifted (``"<name>:<inv>"``) and ``invariants``
    adds joint properties over the whole composite."""
    names = sorted(components)
    init_slices = [[(n, s) for s in components[n].initial_states()] for n in names]
    inits = [Composite(tuple(combo)) for combo in product(*init_slices)]

    actions: list[Action[Composite]] = []
    for n in names:
        for action in components[n].actions:
            actions.append(
                Action(
                    name=f"{n}:{action.name}",
                    enabled=lambda s, _n=n, _a=action: _a.enabled(s[_n]),
                    apply=lambda s, _n=n, _a=action: s.with_part(_n, _a.apply(s[_n])),
                )
            )

    inv: dict[str, Invariant[Composite]] = {}
    for n in names:
        for iname, ipred in components[n].invariants.items():
            inv[f"{n}:{iname}"] = lambda s, _n=n, _p=ipred: _p(s[_n])
    if invariants:
        inv.update(invariants)

    def terminal(s: Composite) -> bool:
        return all(components[n].terminal(s[n]) for n in names)

    return Model(init=inits, actions=actions, invariants=inv, terminal=terminal)
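
The initial-state product and the lifting of a component predicate can be seen in isolation with plain tuples. In this sketch component_inits and lift are hypothetical names, and a joint state is just a tuple of (name, state) pairs rather than a Composite.

```python
from itertools import product

# Two components: the client has two possible initial states, the server one.
component_inits = {"server": ("listening",), "client": ("idle", "offline")}

names = sorted(component_inits)   # sorted, so the joint state layout is deterministic
init_slices = [[(n, s) for s in component_inits[n]] for n in names]
joint_inits = [tuple(combo) for combo in product(*init_slices)]


def lift(name, pred):
    """Lift a per-component predicate so it reads only that component's slice."""
    return lambda joint: pred(dict(joint)[name])


client_known = lift("client", lambda s: s in {"idle", "offline"})

print(joint_inits)
print(all(client_known(j) for j in joint_inits))
```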

Channels — channel_actions, send

Model a message channel (reliable / lossy / duplicating; unordered, so reordering is free).

musil.channels

Generic message-channel modeling kit -- the "model the network explicitly" principle as a reusable building block (methodology doc section 3).

A channel's in-flight messages are held as a frozenset somewhere in your model's state. This kit builds the channel's actions over that state: deliver a message (apply it to the receiver), and -- depending on the channel's reliability -- drop it (loss) or keep it after delivery (duplication). Because the in-flight set is unordered and deliver may fire for ANY in-flight message, reordering is modelled for free; lossy adds message loss; duplicating allows redelivery. (A set dedups a re-send of the same message -- which matches an idempotent re-push. FIFO/ordered channels would need a sequence instead; the realistic network is unordered, which is what this models.)

Domain-free: messages are any hashable values the caller supplies. Use send inside a sender's action to inject a message, and channel_actions to add the channel's own steps to the model.

send

send(
    get: Callable[[S], frozenset[Message]],
    put: Callable[[S, frozenset[Message]], S],
) -> Callable[[S, Message], S]

Return a helper send(state, msg) that injects msg into the channel's in-flight set (idempotent: re-sending an in-flight message is a no-op, like an idempotent re-push).

Source code in src/musil/channels.py
def send[S: Hashable](
    get: Callable[[S], frozenset[Message]], put: Callable[[S, frozenset[Message]], S]
) -> Callable[[S, Message], S]:
    """Return a helper ``send(state, msg)`` that injects ``msg`` into the channel's in-flight set
    (idempotent: re-sending an in-flight message is a no-op, like an idempotent re-push)."""
    return lambda s, msg: put(s, get(s) | frozenset({msg}))
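
A usage sketch with a hypothetical Net state whose in-flight set lives in one field; send is repeated from the listing above (without annotations) so the block runs on its own.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Net:
    in_flight: frozenset = frozenset()


def send(get, put):
    # Same one-liner as the listing above, minus the type annotations.
    return lambda s, msg: put(s, get(s) | frozenset({msg}))


push = send(lambda s: s.in_flight, lambda s, fs: replace(s, in_flight=fs))

once = push(Net(), "hello")
twice = push(once, "hello")   # re-sending an in-flight message changes nothing

print(once == twice, sorted(once.in_flight))
```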

channel_actions

channel_actions(
    *,
    messages: Iterable[Message],
    get: Callable[[S], frozenset[Message]],
    put: Callable[[S, frozenset[Message]], S],
    deliver: Callable[[S, Message], S],
    lossy: bool = False,
    duplicating: bool = False,
    name: str = "ch",
) -> list[Action[S]]

The channel's steps over the caller's state. get/put read/write the in-flight set; deliver(state, msg) applies a delivered message to the receiver. messages is the (bounded) universe of possible messages. lossy adds drop steps; duplicating lets a message be delivered more than once. Delivery order is unconstrained (reordering).

Source code in src/musil/channels.py
def channel_actions[S: Hashable](
    *,
    messages: Iterable[Message],
    get: Callable[[S], frozenset[Message]],
    put: Callable[[S, frozenset[Message]], S],
    deliver: Callable[[S, Message], S],
    lossy: bool = False,
    duplicating: bool = False,
    name: str = "ch",
) -> list[Action[S]]:
    """The channel's steps over the caller's state. ``get``/``put`` read/write the in-flight set;
    ``deliver(state, msg)`` applies a delivered message to the receiver. ``messages`` is the (bounded)
    universe of possible messages. ``lossy`` adds drop steps; ``duplicating`` lets a message be
    delivered more than once. Delivery order is unconstrained (reordering)."""
    actions: list[Action[S]] = []
    for m in messages:
        actions.append(_deliver_action(f"{name}:deliver:{m!r}", get, put, deliver, m, duplicating))
        if lossy:
            actions.append(_drop_action(f"{name}:drop:{m!r}", get, put, m))
    return actions
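
The helpers _deliver_action and _drop_action are not listed in this section. The sketch below is one plausible reading of the description: deliver is enabled while the message is in flight and removes it unless the channel is duplicating, and drop removes it without delivering. The helper bodies, Act, and World are all assumptions made for illustration.

```python
from dataclasses import dataclass, replace
from typing import Callable, NamedTuple


class Act(NamedTuple):
    """Stand-in with the same three fields as an Action."""
    name: str
    enabled: Callable
    apply: Callable


@dataclass(frozen=True)
class World:
    in_flight: frozenset = frozenset()
    received: frozenset = frozenset()


def get(s): return s.in_flight
def put(s, fs): return replace(s, in_flight=fs)
def deliver(s, m): return replace(s, received=s.received | {m})


def deliver_action(name, m, duplicating):
    def apply(s):
        # A duplicating channel keeps the message in flight so it can be delivered again.
        remaining = s if duplicating else put(s, get(s) - {m})
        return deliver(remaining, m)
    return Act(name, lambda s: m in get(s), apply)


def drop_action(name, m):
    # Loss: the message leaves the channel and the receiver never sees it.
    return Act(name, lambda s: m in get(s), lambda s: put(s, get(s) - {m}))


start = World(in_flight=frozenset({"m1"}))
delivered = deliver_action("ch:deliver:'m1'", "m1", duplicating=False).apply(start)
duplicated = deliver_action("ch:deliver:'m1'", "m1", duplicating=True).apply(start)
dropped = drop_action("ch:drop:'m1'", "m1").apply(start)
print(delivered, duplicated, dropped, sep="\n")
```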

FSM bridge — transition_actions, status_field_actions

Build a model straight from an allowed-transitions table, so it can't drift from the code.

musil.fsm

Build a model straight from a transition table — {from_state: {to_states}} — instead of hand-writing one Action per edge. This is the zero-drift bridge: point it at the very table your code enforces (e.g. a state machine's allowed-transitions map) and the model can never disagree with the code, because it is the code's table. No separate spec file to keep in sync.

Generic over the state type: you supply get (read the status out of a state) and with_status (return a new state with the status replaced), so the table can drive any state shape — typically a frozen dataclass with a status field plus whatever other variables your invariants need.

transition_actions

transition_actions(
    table: Table,
    *,
    get: Callable[[S], str],
    with_status: Callable[[S, str], S],
    name: Callable[[str, str], str] = lambda src, dst: (
        f"{src}->{dst}"
    ),
) -> list[Action[S]]

One Action per declared src -> dst edge. get reads the current status from a state; with_status returns a new state with the status set to dst (states are immutable).

Source code in src/musil/fsm.py
def transition_actions[S: Hashable](
    table: Table,
    *,
    get: Callable[[S], str],
    with_status: Callable[[S, str], S],
    name: Callable[[str, str], str] = lambda src, dst: f"{src}->{dst}",
) -> list[Action[S]]:
    """One Action per declared ``src -> dst`` edge. ``get`` reads the current status from a state;
    ``with_status`` returns a new state with the status set to ``dst`` (states are immutable)."""
    actions: list[Action[S]] = []
    for src, dsts in table.items():
        for dst in dsts:
            actions.append(
                Action(
                    name=name(src, dst),
                    # default-arg capture pins src/dst per loop iteration (avoid late-binding).
                    enabled=lambda s, _src=src: get(s) == _src,
                    apply=lambda s, _dst=dst: with_status(s, _dst),
                )
            )
    return actions
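
The default-argument capture in the listing matters because a Python closure looks up a loop variable when it is called, not when it is defined. A minimal demonstration with a made-up table (build and the state names are hypothetical):

```python
def build(table):
    """Build one guard per edge twice: once late-bound, once pinned via a default argument."""
    late, pinned = [], []
    for src, dsts in table.items():
        for _dst in dsts:
            late.append(lambda status: status == src)                # reads src at call time
            pinned.append(lambda status, _src=src: status == _src)   # src frozen per iteration
    return late, pinned


table = {"draft": {"review"}, "review": {"published"}}
late, pinned = build(table)

late_results = [guard("draft") for guard in late]
pinned_results = [guard("draft") for guard in pinned]
print(late_results, pinned_results)
```

Every late-bound guard ends up comparing against the last src of the loop, so the draft edge would never be enabled; the pinned guards each keep their own source state.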

status_field_actions

status_field_actions(
    table: Table, *, field: str = "status"
) -> list[Action[Any]]

Convenience for the common case: states are frozen dataclasses whose status lives in a single field (default status). Builds actions that read/replace that field via dataclasses.replace.

Source code in src/musil/fsm.py
def status_field_actions(table: Table, *, field: str = "status") -> list[Action[Any]]:
    """Convenience for the common case: states are frozen dataclasses whose status lives in a single
    field (default ``status``). Builds actions that read/replace that field via dataclasses.replace."""
    return transition_actions(
        table,
        get=lambda s: getattr(s, field),
        with_status=lambda s, dst: replace(s, **{field: dst}),
    )
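
The two lambdas are small enough to try in isolation. In this sketch Order is a hypothetical frozen dataclass; the point is that replacing the status field leaves every other field untouched.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Order:
    status: str = "new"
    retries: int = 0


field = "status"


def get(s):
    return getattr(s, field)


def with_status(s, dst):
    # **{field: dst} turns the field name held in a variable into a keyword argument.
    return replace(s, **{field: dst})


original = Order(retries=2)
paid = with_status(original, "paid")
print(get(original), get(paid), paid.retries)
```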

declared_states

declared_states(table: Table) -> set[str]

Every state named in the table — sources (keys) plus every target.

Source code in src/musil/fsm.py
def declared_states(table: Table) -> set[str]:
    """Every state named in the table — sources (keys) plus every target."""
    return set(table) | {dst for dsts in table.values() for dst in dsts}

terminal_states

terminal_states(table: Table) -> set[str]

States with no outgoing transitions — absorbing sinks (a key mapped to an empty set, or a target that's never a source). Use to build a model's terminal predicate so a legitimate resting/sink state isn't reported as a deadlock.

Source code in src/musil/fsm.py
def terminal_states(table: Table) -> set[str]:
    """States with no outgoing transitions — absorbing sinks (a key mapped to an empty set, or a
    target that's never a source). Use to build a model's ``terminal`` predicate so a legitimate
    resting/sink state isn't reported as a deadlock."""
    return {s for s in declared_states(table) if not table.get(s)}
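
A sketch of turning the table's sinks into a terminal predicate. The two functions are repeated from the listings above so the block runs alone; TABLE, Job and is_terminal are hypothetical, and the table is assumed to be a plain dict of sets.

```python
from dataclasses import dataclass


def declared_states(table) -> set:
    return set(table) | {dst for dsts in table.values() for dst in dsts}


def terminal_states(table) -> set:
    return {s for s in declared_states(table) if not table.get(s)}


TABLE = {
    "pending": {"active", "cancelled"},
    "active": {"done", "cancelled"},
    "done": set(),            # a key mapped to an empty set
}                             # "cancelled" is a target that is never a source


@dataclass(frozen=True)
class Job:
    status: str = "pending"


sinks = terminal_states(TABLE)


def is_terminal(state: Job) -> bool:
    """Candidate value for a model's terminal predicate."""
    return state.status in sinks


print(sorted(sinks), is_terminal(Job("done")), is_terminal(Job("active")))
```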

Conformance — generate_traces, replay

Generate traces from the model and replay them against the real implementation.

musil.conformance

Conformance: does the real implementation behave like the model?

Model checking proves the design is sound. Conformance testing checks the code matches the design: generate action traces from the model (sequences that cover the reachable graph), then replay each against the real system through a thin adapter, asserting the implementation lands in the state the model predicted after every step. It does not prove refinement (that's theorem-proving territory), but it checks it exhaustively over the model's traces against the running code — the practical "implementation faithful to spec" bridge (see docs/specs/RESEARCH.md: P, Stateright, ioco).

Two pieces
  • generate_traces(model) -> action sequences (tuples of Steps, <init> first) that together cover every reachable edge (or state).
  • replay(traces, *, step_fn, project, start) -> ConformanceResult. step_fn(real, action) drives the real system; project(real) maps it back to a model state to compare. The first divergence (real projected state != model-predicted state) is reported.
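
To show what the adapter side looks like, here is a self-contained sketch with a hypothetical implementation class Light and a hand-written trace. The comparison loop at the end is a simplified stand-in for replay, not the library's code.

```python
from dataclasses import dataclass
from typing import NamedTuple


class Step(NamedTuple):
    """Stand-in for the Step dataclass."""
    action: str
    state: str


@dataclass(frozen=True)
class Light:
    """Hypothetical real implementation under test."""
    colour: str = "red"

    def advance(self) -> "Light":
        nxt = {"red": "green", "green": "yellow", "yellow": "red"}
        return Light(nxt[self.colour])


def step_fn(real: Light, action: str) -> Light:
    """Drive the implementation: map a model action name onto a real call."""
    if action == "next":
        return real.advance()
    raise ValueError(f"unknown action {action!r}")


def project(real: Light) -> str:
    """Map the implementation back to the model's state space."""
    return real.colour


trace = (Step("<init>", "red"), Step("next", "green"), Step("next", "yellow"))

real = Light()
mismatches = []
if project(real) != trace[0].state:
    mismatches.append((0, "<init>"))
else:
    for i, step in enumerate(trace[1:], start=1):
        real = step_fn(real, step.action)
        if project(real) != step.state:
            mismatches.append((i, step.action))
            break

print(mismatches)
```

step_fn returns a fresh object here rather than mutating in place. Since each trace is replayed from the same start value, an adapter that mutated start would carry state over from one trace into the next.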

Divergence dataclass

The implementation departed from the model: after action (step step_index of trace trace_index), the model predicted expected but the implementation projected to actual.

Source code in src/musil/conformance.py
@dataclass(frozen=True, slots=True)
class Divergence[S: Hashable]:
    """The implementation departed from the model: after ``action`` (step ``step_index`` of trace
    ``trace_index``), the model predicted ``expected`` but the implementation projected to ``actual``."""

    trace_index: int
    step_index: int
    action: str
    expected: S
    actual: S

generate_traces

generate_traces(
    model: Model[S],
    *,
    cover: str = "edges",
    max_traces: int = 100000,
    max_states: int = 1000000,
) -> list[tuple[Step[S], ...]]

Action sequences from the reachable graph (each a tuple of Steps, <init> first). With cover='edges' (default) the set exercises every reachable transition at least once; with cover='states', a shortest path to each reachable state. Each trace is a shortest stem, so replays stay short.

Source code in src/musil/conformance.py
def generate_traces[S: Hashable](
    model: Model[S], *, cover: str = "edges", max_traces: int = 100_000, max_states: int = 1_000_000
) -> list[tuple[Step[S], ...]]:
    """Action sequences from the reachable graph (each a tuple of Steps, ``<init>`` first). With
    ``cover='edges'`` (default) the set exercises every reachable transition at least once; with
    ``cover='states'``, a shortest path to each reachable state. Each trace is a shortest stem, so
    replays stay short."""
    g = explore(model, max_states=max_states)
    parent, order = _paths(g)
    traces: list[tuple[Step[S], ...]] = []
    if cover == "states":
        for s in order:
            traces.append(_path_to(parent, s))
            if len(traces) >= max_traces:
                break
        return traces
    for s in order:
        base = _path_to(parent, s)
        for action, t in g.edges[s]:
            traces.append((*base, Step(action=action, state=t)))
            if len(traces) >= max_traces:
                return traces
    return traces
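To make the edge-coverage idea concrete, here is a minimal self-contained sketch on a hand-written graph. It does not call musil: the `edges` dict, the `edge_cover` helper, and the state names are hypothetical stand-ins for what `explore` and the path helpers provide, and traces are plain `(action, state)` tuples rather than `Step` objects.

```python
from collections import deque

# Hypothetical toy graph: state -> list of (action, next_state).
edges = {
    "idle": [("start", "busy")],
    "busy": [("finish", "idle"), ("fail", "error")],
    "error": [],
}

def edge_cover(init, edges):
    """One trace per reachable edge: a shortest BFS stem to the source state, plus that edge."""
    stem = {init: (("<init>", init),)}  # state -> shortest (action, state) path reaching it
    order = deque([init])
    traces = []
    while order:
        s = order.popleft()
        for action, t in edges[s]:
            traces.append((*stem[s], (action, t)))
            if t not in stem:  # the first visit in BFS order is a shortest path
                stem[t] = (*stem[s], (action, t))
                order.append(t)
    return traces

traces = edge_cover("idle", edges)
# (source state, action, target state) for the last edge of every trace
covered = {(trace[-2][1], trace[-1][0], trace[-1][1]) for trace in traces}
```

Each trace ends in the edge it is meant to exercise, so the set of final edges equals the set of reachable transitions.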

replay

replay(
    traces: Iterable[Sequence[Step[S]]],
    *,
    step_fn: Callable[[R, str], R],
    project: Callable[[R], S],
    start: R,
) -> ConformanceResult[S]

Replay each trace (from generate_traces) against the real system. start is the real initial state (its projection must equal the model's initial state); step_fn(real, action) applies an action to the real system; project maps a real state back to a model state for comparison. Every trace restarts from start, so replay assumes a single-initial-state model: with multiple initial states, a trace passes only if its initial state equals project(start).

Source code in src/musil/conformance.py
def replay[R, S: Hashable](
    traces: Iterable[Sequence[Step[S]]],
    *,
    step_fn: Callable[[R, str], R],
    project: Callable[[R], S],
    start: R,
) -> ConformanceResult[S]:
    """Replay each trace (from ``generate_traces``) against the real system. ``start`` is the real
    initial state (its projection must equal the model's initial state); ``step_fn(real, action)``
    applies an action to the real system; ``project`` maps a real state back to a model state for
    comparison. Every trace restarts from ``start``, so replay assumes a single-initial-state model:
    with multiple initial states, a trace passes only if its initial state equals
    ``project(start)``."""
    steps_run = 0
    traces_run = 0
    for ti, trace in enumerate(traces):
        traces_run += 1
        real = start
        if not trace:
            continue
        if project(real) != trace[0].state:
            return ConformanceResult(
                ok=False, traces_run=traces_run, steps_run=steps_run,
                divergence=Divergence(ti, 0, "<init>", trace[0].state, project(real)),
            )
        for si in range(1, len(trace)):
            step = trace[si]
            real = step_fn(real, step.action)
            steps_run += 1
            if project(real) != step.state:
                return ConformanceResult(
                    ok=False, traces_run=traces_run, steps_run=steps_run,
                    divergence=Divergence(ti, si, step.action, step.state, project(real)),
                )
    return ConformanceResult(ok=True, traces_run=traces_run, steps_run=steps_run)
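As an illustration of what `step_fn` and `project` might look like, the sketch below replays hand-written traces against a toy "real" system. Everything here is an assumption made for the example: the dict-based system, the `inc`/`reset` action names, and `first_divergence`, which is a simplified stand-in for the loop above that works on `(action, state)` tuples and returns only the diverging step index.

```python
def step_fn(real, action):
    """Apply a model action label to the (hypothetical) real system; return its new state."""
    if action == "inc":
        return {"count": real["count"] + 1, "log": real["log"] + ["inc"]}
    if action == "reset":
        return {"count": 0, "log": real["log"] + ["reset"]}
    raise ValueError(f"unknown action {action!r}")

def project(real):
    """Map the real state back to the model state (here, just the counter)."""
    return real["count"]

def first_divergence(trace, start):
    """Index of the first step whose projection differs from the trace, else None."""
    real = start
    if project(real) != trace[0][1]:
        return 0  # the initial states already disagree
    for i, (action, expected) in enumerate(trace[1:], start=1):
        real = step_fn(real, action)
        if project(real) != expected:
            return i
    return None

start = {"count": 0, "log": []}
good = [("<init>", 0), ("inc", 1), ("inc", 2), ("reset", 0)]
bad = [("<init>", 0), ("inc", 1), ("reset", 1)]  # expects 1 after reset; the system yields 0

good_result = first_divergence(good, start)
bad_result = first_divergence(bad, start)
```

Because `step_fn` returns a new dict instead of mutating its argument, `start` can be reused unchanged for every trace.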

Refinement — check_refinement, RefinementMonitor

Check that an observed run of the real system refines the model (each transition is a model edge or a stutter) -- the oracle simulate uses to hold real code to a model-checked spec.

musil.refine

Refinement: hold real code to a model by checking its observed states, not its actions.

A run of the real system produces a sequence of concrete states (snapshots). An abstraction maps each concrete state to a model state. The run refines the model iff every consecutive pair maps to either the same abstract state (a stutter -- the real code took an internal step the model doesn't care about) or to an abstract transition the model actually allows. The first observation must map to a model initial state (unless require_init=False, for checking a mid-run fragment).

This is the runtime analog of refinement in the seL4/TLA+ sense (concrete behaviors are a subset of abstract behaviors), done by checking against the explored model rather than proving -- the practical bridge from a model-checked design to the running implementation. musil.sim feeds a RefinementMonitor a snapshot after every step so the simulated real code is held to the spec.

This is the complement of conformance, which drives the real code with model-generated action labels; here the real code drives itself and we watch where it goes.

RefinementViolation dataclass

The real run left the model. reason is "bad-init" (first state isn't a model initial state), "unknown-state" (mapped to a state the model can't reach), or "no-model-step" (the abstract transition isn't a model edge). index is the offending observation's position.

Source code in src/musil/refine.py
@dataclass(frozen=True, slots=True)
class RefinementViolation[C, S: Hashable]:
    """The real run left the model. ``reason`` is ``"bad-init"`` (first state isn't a model initial
    state), ``"unknown-state"`` (mapped to a state the model can't reach), or ``"no-model-step"`` (the
    abstract transition isn't a model edge). ``index`` is the offending observation's position."""

    index: int
    reason: str
    concrete_from: C | None
    concrete_to: C
    abstract_from: S | None
    abstract_to: S

    def __str__(self) -> str:
        if self.reason == "bad-init":
            return f"REFINEMENT: observation 0 maps to {self.abstract_to!r}, not a model initial state"
        if self.reason == "unknown-state":
            return f"REFINEMENT: observation {self.index} maps to {self.abstract_to!r}, unreachable in the model"
        return (
            f"REFINEMENT: observation {self.index} steps {self.abstract_from!r} -> {self.abstract_to!r}, "
            f"which the model does not allow"
        )

RefinementMonitor

Streaming refinement check. Construct with a model and an abstraction; feed observed concrete states one at a time to observe, which returns a RefinementViolation the first time the run leaves the model (and None otherwise). The model is explored once, up front, so this is for bounded models (the explicit-state regime musil lives in).

Source code in src/musil/refine.py
class RefinementMonitor[C, S: Hashable]:
    """Streaming refinement check. Construct with a model and an abstraction; feed observed concrete
    states one at a time to ``observe``, which returns a ``RefinementViolation`` the first time the
    run leaves the model (and ``None`` otherwise). The model is explored once, up front, so this is
    for bounded models (the explicit-state regime musil lives in)."""

    __slots__ = ("_abs", "_edges", "_index", "_inits", "_prev_abstract", "_prev_concrete", "_require_init", "_started", "_states")

    def __init__(
        self,
        model: Model[S],
        abstraction: Callable[[C], S],
        *,
        require_init: bool = True,
        max_states: int = 1_000_000,
    ) -> None:
        g = explore(model, max_states=max_states)
        self._states: frozenset[S] = frozenset(g.states)
        self._inits: frozenset[S] = frozenset(g.initial)
        self._edges: dict[S, frozenset[S]] = {s: frozenset(t for _, t in outs) for s, outs in g.edges.items()}
        self._abs = abstraction
        self._require_init = require_init
        self._started = False
        self._index = -1
        self._prev_concrete: C | None = None
        self._prev_abstract: S | None = None

    def observe(self, concrete: C) -> RefinementViolation[C, S] | None:
        self._index += 1
        a = self._abs(concrete)
        if not self._started:
            self._started = True
            self._prev_concrete, self._prev_abstract = concrete, a
            if a not in self._states:
                return RefinementViolation(self._index, "unknown-state", None, concrete, None, a)
            if self._require_init and a not in self._inits:
                return RefinementViolation(self._index, "bad-init", None, concrete, None, a)
            return None
        prev_a = self._prev_abstract
        if a not in self._states:
            return RefinementViolation(self._index, "unknown-state", self._prev_concrete, concrete, prev_a, a)
        if a != prev_a and a not in self._edges.get(prev_a, frozenset()):  # type: ignore[arg-type]
            return RefinementViolation(self._index, "no-model-step", self._prev_concrete, concrete, prev_a, a)
        self._prev_concrete, self._prev_abstract = concrete, a
        return None
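The stutter-or-edge rule can be sketched without musil on a hand-written abstract graph. The names below (`INITS`, `EDGES`, `abstraction`, `first_violation`) are hypothetical, and the function returns a plain `(index, reason)` tuple instead of a `RefinementViolation`.

```python
# Hypothetical abstract model: a counter that may only step 0 -> 1 -> 2.
INITS = frozenset({0})
EDGES = {0: frozenset({1}), 1: frozenset({2}), 2: frozenset()}

def abstraction(concrete):
    """Map a concrete snapshot (count, attempts) to a model state; attempts is internal detail."""
    count, _attempts = concrete
    return count

def first_violation(observations, require_init=True):
    """Return (index, reason) for the first observation that leaves the model, else None."""
    prev = None
    for i, obs in enumerate(observations):
        a = abstraction(obs)
        if a not in EDGES:
            return (i, "unknown-state")
        if i == 0:
            if require_init and a not in INITS:
                return (i, "bad-init")
        elif a != prev and a not in EDGES[prev]:
            return (i, "no-model-step")  # neither a stutter nor a model edge
        prev = a
    return None

# Changes to `attempts` alone are stutters: the abstract state does not move.
stutters = first_violation([(0, 0), (0, 1), (1, 1), (1, 2), (2, 2)])
skipped = first_violation([(0, 0), (2, 0)])        # 0 -> 2 is not a model edge
unknown = first_violation([(0, 0), (5, 0)])        # 5 is not a model state
late_start = first_violation([(1, 0), (2, 0)])     # starts outside the initial states
fragment = first_violation([(1, 0), (2, 0)], require_init=False)
```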

check_refinement

check_refinement(
    model: Model[S],
    observations: Iterable[C],
    abstraction: Callable[[C], S],
    *,
    require_init: bool = True,
    max_states: int = 1000000,
) -> RefinementResult[C, S]

Check a whole observed run against the model. Returns at the first divergence (with the offending observation), else OK once the run is exhausted. abstraction maps each observed concrete state to a model state; require_init demands the first observation be a model initial state (set False to check a mid-run fragment).

Source code in src/musil/refine.py
def check_refinement[C, S: Hashable](
    model: Model[S],
    observations: Iterable[C],
    abstraction: Callable[[C], S],
    *,
    require_init: bool = True,
    max_states: int = 1_000_000,
) -> RefinementResult[C, S]:
    """Check a whole observed run against the model. Returns at the first divergence (with the
    offending observation), else OK once the run is exhausted. ``abstraction`` maps each observed
    concrete state to a model state; ``require_init`` demands the first observation be a model initial
    state (set False to check a mid-run fragment)."""
    monitor = RefinementMonitor(model, abstraction, require_init=require_init, max_states=max_states)
    checked = 0
    for obs in observations:
        checked += 1
        violation = monitor.observe(obs)
        if violation is not None:
            return RefinementResult(ok=False, steps_checked=checked, violation=violation)
    return RefinementResult(ok=True, steps_checked=checked)

Simulation — simulate, Simulator, NetworkModel, FaultSchedule

Deterministic simulation testing: run real event-driven node code under a controlled clock and a fault-injecting network, reproducible from a seed, with invariants + refinement + a convergence goal as the oracle. Beyond NetworkModel's independent per-message loss, a FaultSchedule of sustained, heal-able faults — network Partitions and node Crashes (with optional cold restart) — drives VOPR-style liveness testing: after the faults heal, the system must converge to its goal (a failure to do so is reported as kind="liveness").

musil.sim

Deterministic simulation: run real, event-driven node code under a controlled world.

The expensive distributed bugs hide in timing -- message loss, reordering, a timeout racing a reply, a partition healing at the wrong moment. This runs your actual node logic against a virtual network and clock where all of that is controlled and seeded, so every run is reproducible from its seed (a found bug replays exactly). It's the FoundationDB/TigerBeetle "deterministic simulation testing" technique as a pure-Python library -- bug finding, not proof.

Model: nodes are event-driven actors (on_start / on_message / on_timer) that act through a Context (send a message, set a timer, read the virtual clock, draw seeded randomness). The Simulator owns a discrete-event queue ordered by (virtual time, sequence); the NetworkModel decides per-message loss / duplication / latency, so reordering and delay emerge from latency spread. A FaultSchedule layers on sustained faults -- network partitions and node crashes that hold for a window then heal -- so simulate can check liveness: does the system converge once they lift? All randomness comes from one seeded RNG. musil.refine and invariants serve as the oracle, which simulate applies.

Envelope dataclass

A message in flight: a unique sequence number, source, destination, and payload.

Source code in src/musil/sim.py
@dataclass(frozen=True, slots=True)
class Envelope:
    """A message in flight: a unique sequence number, source, destination, and payload."""

    seq: int
    src: str
    dst: str
    payload: object

NetworkModel dataclass

How the virtual network treats each message. Defaults are a reliable, in-order channel; set loss/duplicate (probabilities in [0, 1]) and a min_latency/max_latency spread (virtual-time units) to inject drops, duplicates, and reordering. All draws use the sim's seed.

mutate is a Byzantine fault hook: mutate(src, dst, payload) -> object | None. The returned value replaces the payload before delivery; returning None drops the message. Applied after the loss decision and before duplication, so it sees only messages that survive loss, and a duplicate carries the same mutated payload. Models wire-level corruption, wrong-answer injection, or selective drops (Castro & Liskov, OSDI 1999).

Source code in src/musil/sim.py
@dataclass(frozen=True, slots=True)
class NetworkModel:
    """How the virtual network treats each message. Defaults are a reliable, in-order channel; set
    ``loss``/``duplicate`` (probabilities in [0, 1]) and a ``min_latency``/``max_latency`` spread
    (virtual-time units) to inject drops, duplicates, and reordering. All draws use the sim's seed.

    ``mutate`` is a Byzantine fault hook: ``mutate(src, dst, payload) -> object | None``. The
    returned value replaces the payload before delivery; returning ``None`` drops the message.
    Applied after the loss decision and before duplication, so it sees only messages that survive
    loss, and a duplicate carries the same mutated payload. Models wire-level corruption,
    wrong-answer injection, or selective drops (Castro & Liskov, OSDI 1999)."""

    loss: float = 0.0
    duplicate: float = 0.0
    min_latency: int = 1
    max_latency: int = 1
    mutate: Callable[[str, str, object], object | None] | None = None
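A `mutate` hook is just a function with the signature shown above. The sketch below is one possible hook; the node names and the `("vote", term, granted)` payload shape are assumptions made for the example, not part of musil.

```python
def corrupt_votes(src, dst, payload):
    """Hypothetical mutate hook: node "b" lies in its votes; everything sent to "c" is dropped."""
    if dst == "c":
        return None  # returning None drops the message
    is_vote = isinstance(payload, tuple) and len(payload) == 3 and payload[0] == "vote"
    if src == "b" and is_vote:
        _, term, granted = payload
        return ("vote", term, not granted)  # flip the answer on the wire
    return payload  # everything else passes through unchanged

flipped = corrupt_votes("b", "a", ("vote", 3, True))
dropped = corrupt_votes("a", "c", "ping")
untouched = corrupt_votes("a", "b", "ping")
```

It would be wired in as `NetworkModel(mutate=corrupt_votes)`. Keeping the hook a pure function of its arguments keeps runs reproducible.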

Partition dataclass

A sustained network fault: every directed link in links ((src, dst) pairs) drops ALL traffic while active -- the window [start, end) in virtual time -- then heals. Unlike NetworkModel.loss (independent per message, which a retrying sender papers over), a partition cuts the link wholesale, so the system can only recover once it lifts. This is what makes a real liveness test: does it converge after the partition heals?

Source code in src/musil/sim.py
@dataclass(frozen=True, slots=True)
class Partition:
    """A sustained network fault: every directed link in ``links`` (``(src, dst)`` pairs) drops ALL
    traffic while active -- the window ``[start, end)`` in virtual time -- then heals. Unlike
    ``NetworkModel.loss`` (independent per message, which a retrying sender papers over), a partition
    cuts the link wholesale, so the system can only recover once it lifts. This is what makes a real
    *liveness* test: does it converge after the partition heals?"""

    links: frozenset[tuple[str, str]]
    start: int
    end: int

    def cuts(self, src: str, dst: str, t: int) -> bool:
        return self.start <= t < self.end and (src, dst) in self.links
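Because `links` holds directed pairs, a symmetric split between two groups needs both directions listed. The helper below is a hypothetical convenience for building such a set, with a standalone copy of the window test so the sketch runs without musil.

```python
from itertools import product

def split_links(side_a, side_b):
    """Directed (src, dst) pairs that cut every link between two groups, in both directions."""
    forward = set(product(side_a, side_b))
    backward = set(product(side_b, side_a))
    return frozenset(forward | backward)

def cuts(links, start, end, src, dst, t):
    """Same test as Partition.cuts: active on the half-open window [start, end)."""
    return start <= t < end and (src, dst) in links

links = split_links({"a"}, {"b", "c"})
at_start = cuts(links, 10, 20, "a", "b", 10)  # the window includes its start
at_end = cuts(links, 10, 20, "a", "b", 20)    # and excludes its end
same_side = cuts(links, 10, 20, "b", "c", 15) # b and c are on the same side
```

The resulting set would be passed as `Partition(links=links, start=10, end=20)`. Listing only one direction models an asymmetric partition instead.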

Crash dataclass

A sustained process fault: node is down over [start, end) -- inbound messages are dropped, its timers don't fire, it runs no handlers -- then restarts. lose_state=True is a cold restart (a fresh node instance that re-runs on_start); the default is a warm resume that keeps the instance's state (it simply missed everything while down).

Source code in src/musil/sim.py
@dataclass(frozen=True, slots=True)
class Crash:
    """A sustained process fault: ``node`` is down over ``[start, end)`` -- inbound messages are
    dropped, its timers don't fire, it runs no handlers -- then restarts. ``lose_state=True`` is a cold
    restart (a fresh node instance that re-runs ``on_start``); the default is a warm resume that keeps
    the instance's state (it simply missed everything while down)."""

    node: str
    start: int
    end: int
    lose_state: bool = False

    def down(self, node: str, t: int) -> bool:
        return node == self.node and self.start <= t < self.end

FaultSchedule dataclass

A fixed set of sustained faults for one run. heal_time() is when the last fault lifts (the world is clean again) -- the point after which a live system must converge to its goal. Build one explicitly, or per-seed via a Callable[[Random], FaultSchedule] so randomized schedules stay seed-reproducible.

Source code in src/musil/sim.py
@dataclass(frozen=True, slots=True)
class FaultSchedule:
    """A fixed set of sustained faults for one run. ``heal_time()`` is when the last fault lifts (the
    world is clean again) -- the point after which a live system must converge to its goal. Build one
    explicitly, or per-seed via a ``Callable[[Random], FaultSchedule]`` so randomized schedules stay
    seed-reproducible."""

    partitions: tuple[Partition, ...] = ()
    crashes: tuple[Crash, ...] = ()

    def heal_time(self) -> int:
        return max([p.end for p in self.partitions] + [c.end for c in self.crashes], default=0)

    def link_cut(self, src: str, dst: str, t: int) -> bool:
        return any(p.cuts(src, dst, t) for p in self.partitions)

    def node_down(self, node: str, t: int) -> bool:
        return any(c.down(node, t) for c in self.crashes)

    def crash_ending_at(self, node: str, t: int) -> Crash | None:
        for c in self.crashes:
            if c.node == node and c.end == t:
                return c
        return None
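A per-seed schedule factory might look like the sketch below. To stay runnable on its own it uses `CrashSpec`, a hypothetical stand-in with the same fields as `Crash`; the node names, the horizon, and the `random_crashes` helper are likewise assumptions. The point is that every draw comes from the `Random` passed in, so the same seed yields the same schedule.

```python
from random import Random
from typing import NamedTuple

class CrashSpec(NamedTuple):
    """Stand-in with the same fields as Crash, so the sketch runs on its own."""
    node: str
    start: int
    end: int
    lose_state: bool = False

def random_crashes(rng, nodes=("a", "b", "c"), horizon=100):
    """Hypothetical factory: one crash per run, drawn only from the rng that is passed in."""
    node = rng.choice(sorted(nodes))
    start = rng.randint(0, horizon - 10)
    end = start + rng.randint(1, 10)
    return (CrashSpec(node, start, end, lose_state=rng.random() < 0.5),)

def heal_time(crashes):
    """Same rule as FaultSchedule.heal_time: when the last fault lifts, or 0 if there are none."""
    return max((c.end for c in crashes), default=0)

first = random_crashes(Random(7))
again = random_crashes(Random(7))  # same seed, so the same schedule
```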

RunResult dataclass

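Outcome of a run. steps counts fired events, time is the final virtual time, and delivered/dropped count messages. reason is "quiescent" (queue drained), "until" (goal predicate met), or "max-steps" (step cap hit).

Source code in src/musil/sim.py
@dataclass(frozen=True, slots=True)
class RunResult:
    """Outcome of a run. ``steps`` counts fired events, ``time`` is the final virtual time, and
    ``delivered``/``dropped`` count messages. ``reason`` is ``"quiescent"`` (queue drained),
    ``"until"`` (goal predicate met), or ``"max-steps"`` (step cap hit)."""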

    steps: int
    time: int
    delivered: int
    dropped: int
    reason: str

Context

A node's handle on the world, passed to every handler. Lets the node send messages, arm/cancel timers, read the virtual clock (now), and draw seeded randomness -- never use real time or the global random module in a node, or runs stop being reproducible.

Source code in src/musil/sim.py
class Context:
    """A node's handle on the world, passed to every handler. Lets the node send messages, arm/cancel
    timers, read the virtual clock (``now``), and draw seeded randomness -- never use real time or the
    global ``random`` module in a node, or runs stop being reproducible."""

    __slots__ = ("_node", "_sim")

    def __init__(self, sim: Simulator, node: str) -> None:
        self._sim = sim
        self._node = node

    @property
    def node(self) -> str:
        return self._node

    @property
    def now(self) -> int:
        return self._sim.now

    def send(self, dst: str, payload: object) -> None:
        self._sim.inject(self._node, dst, payload)

    def set_timer(self, name: str, delay: int = 1) -> None:
        """Arm (or re-arm) a timer; it fires ``delay`` virtual-time units from now via ``on_timer``.
        Re-arming a live timer of the same name replaces it."""
        self._sim.schedule_timer(self._node, name, delay)

    def cancel_timer(self, name: str) -> None:
        self._sim.cancel_timer(self._node, name)

    def random(self) -> float:
        return self._sim.random()

    def randint(self, a: int, b: int) -> int:
        return self._sim.randint(a, b)

set_timer

set_timer(name: str, delay: int = 1) -> None

Arm (or re-arm) a timer; it fires delay virtual-time units from now via on_timer. Re-arming a live timer of the same name replaces it.

Source code in src/musil/sim.py
def set_timer(self, name: str, delay: int = 1) -> None:
    """Arm (or re-arm) a timer; it fires ``delay`` virtual-time units from now via ``on_timer``.
    Re-arming a live timer of the same name replaces it."""
    self._sim.schedule_timer(self._node, name, delay)

Node

Bases: Protocol

What the simulator needs from a node: three event handlers. Subclass BaseNode and override only the ones you need.

Source code in src/musil/sim.py
class Node(Protocol):
    """What the simulator needs from a node: three event handlers. Subclass ``BaseNode`` and override
    only the ones you need."""

    def on_start(self, ctx: Context) -> None: ...
    def on_message(self, ctx: Context, src: str, payload: object) -> None: ...
    def on_timer(self, ctx: Context, name: str) -> None: ...

BaseNode

A node with no-op handlers; override what you need. Keep all mutable state on the instance.

Source code in src/musil/sim.py
class BaseNode:
    """A node with no-op handlers; override what you need. Keep all mutable state on the instance."""

    def on_start(self, ctx: Context) -> None:
        pass

    def on_message(self, ctx: Context, src: str, payload: object) -> None:
        pass

    def on_timer(self, ctx: Context, name: str) -> None:
        pass
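As a rough illustration of the handler contract, here is a small retrying node exercised by hand. `RecordingContext` is a hypothetical stand-in for `Context` that records calls instead of scheduling them, and `Pinger` is an invented example; in real use it would subclass `BaseNode` and be driven by the `Simulator`.

```python
class RecordingContext:
    """Hypothetical stand-in for Context: records sends and timers instead of scheduling them."""
    def __init__(self, node):
        self.node = node
        self.now = 0
        self.sent = []
        self.timers = {}

    def send(self, dst, payload):
        self.sent.append((dst, payload))

    def set_timer(self, name, delay=1):
        self.timers[name] = delay

    def cancel_timer(self, name):
        self.timers.pop(name, None)

class Pinger:
    """Sends "ping" to a peer and retries on a timer until a "pong" arrives."""
    def __init__(self, peer):
        self.peer = peer
        self.acked = False  # all mutable state lives on the instance

    def on_start(self, ctx):
        ctx.send(self.peer, "ping")
        ctx.set_timer("retry", delay=5)

    def on_message(self, ctx, src, payload):
        if src == self.peer and payload == "pong":
            self.acked = True
            ctx.cancel_timer("retry")

    def on_timer(self, ctx, name):
        if name == "retry" and not self.acked:
            ctx.send(self.peer, "ping")
            ctx.set_timer("retry", delay=5)

ctx = RecordingContext("a")
node = Pinger(peer="b")
node.on_start(ctx)
node.on_timer(ctx, "retry")        # no pong yet, so it resends and re-arms
node.on_message(ctx, "b", "pong")  # the reply cancels the retry timer
```

The node only touches the world through `ctx`, which is what lets the simulator control and replay its timing.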

AdversarialNode

Bases: BaseNode

A simulation node that injects adversarial behavior via a prioritized behavior list.

Models an external component (a K8s node, an AWS service, a Byzantine peer) that sometimes misbehaves. Each behavior is a (guard, response) pair:

  • guard(src, payload) -> bool — returns True when this behavior should fire.
  • response(ctx, src, payload) -> None — the adversarial action: send wrong data, send nothing, send duplicate responses, violate the protocol.

On each on_message, behaviors are tried in order; the first matching guard fires and no further behaviors are tried. If no guard matches, the message is silently dropped — the default adversarial posture (an external service that ignores requests).

All randomness in behavior callbacks must go through ctx.random() / ctx.randint() so runs stay reproducible across seeds. This is the simulation counterpart of EnvironmentSpec: same contract theory, different execution model.

Source code in src/musil/sim.py
class AdversarialNode(BaseNode):
    """A simulation node that injects adversarial behavior via a prioritized behavior list.

    Models an external component (a K8s node, an AWS service, a Byzantine peer) that sometimes
    misbehaves. Each behavior is a ``(guard, response)`` pair:

    - ``guard(src, payload) -> bool``  — returns True when this behavior should fire.
    - ``response(ctx, src, payload) -> None``  — the adversarial action: send wrong data,
      send nothing, send duplicate responses, violate the protocol.

    On each ``on_message``, behaviors are tried in order; the first matching guard fires and
    no further behaviors are tried. If no guard matches, the message is silently dropped —
    the default adversarial posture (an external service that ignores requests).

    All randomness in behavior callbacks must go through ``ctx.random()`` / ``ctx.randint()``
    so runs stay reproducible across seeds. This is the simulation counterpart of
    ``EnvironmentSpec``: same contract theory, different execution model.
    """

    def __init__(
        self,
        behaviors: list[
            tuple[
                Callable[[str, object], bool],         # guard(src, payload)
                Callable[[Context, str, object], None], # response(ctx, src, payload)
            ]
        ],
    ) -> None:
        self._behaviors = behaviors

    def on_message(self, ctx: Context, src: str, payload: object) -> None:
        for guard, response in self._behaviors:
            if guard(src, payload):
                response(ctx, src, payload)
                return
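The first-match rule is easiest to see with a concrete behavior list. In the sketch below the guards, responses, node names, and payload shapes are all invented for illustration; `dispatch` repeats the loop from `on_message` so the example runs without musil, and `SentLog` is a hypothetical stand-in for `Context`.

```python
def is_read(src, payload):
    return isinstance(payload, tuple) and len(payload) == 2 and payload[0] == "read"

def stale_reply(ctx, src, payload):
    ctx.send(src, ("value", payload[1], "stale"))  # answer with wrong data

def from_leader(src, payload):
    return src == "leader"

def echo_twice(ctx, src, payload):
    ctx.send(src, payload)
    ctx.send(src, payload)  # duplicate response

behaviors = [(is_read, stale_reply), (from_leader, echo_twice)]

def dispatch(behaviors, ctx, src, payload):
    """Same first-match rule as AdversarialNode.on_message; returns whether any guard fired."""
    for guard, response in behaviors:
        if guard(src, payload):
            response(ctx, src, payload)
            return True
    return False  # no guard matched: the message is silently ignored

class SentLog:
    """Hypothetical stand-in for Context that only records sends."""
    def __init__(self):
        self.sent = []

    def send(self, dst, payload):
        self.sent.append((dst, payload))

ctx = SentLog()
dispatch(behaviors, ctx, "leader", ("read", "x"))  # both guards match; only the first fires
dispatch(behaviors, ctx, "leader", "heartbeat")    # only the second guard matches
ignored = not dispatch(behaviors, ctx, "client", "heartbeat")
```

The same list would be handed to `AdversarialNode(behaviors)`. Order matters: putting the broad `from_leader` guard first would shadow `is_read` for every message from the leader.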

Simulator

Runs a fixed set of named nodes under a virtual clock and network. Construct with the node map, a seed, and optionally a NetworkModel and a FaultSchedule; call run. Determinism: identical (nodes, seed, network, faults) always produce the identical run.

Source code in src/musil/sim.py
class Simulator:
    """Runs a fixed set of named nodes under a virtual clock and network. Construct with the node
    map, a ``seed``, and optionally a ``NetworkModel`` and a ``FaultSchedule``; call ``run``.
    Determinism: identical (nodes, seed, network, faults) always produce the identical run."""

    __slots__ = (
        "_active_timers", "_counter", "_ctxs", "_delivered", "_dropped", "_events",
        "_faults", "_net", "_nodes", "_now", "_queue", "_rebuild", "_rng", "_started",
    )

    def __init__(
        self,
        nodes: Mapping[str, Node],
        *,
        seed: int = 0,
        network: NetworkModel | None = None,
        faults: FaultSchedule | None = None,
        rebuild: Callable[[], Mapping[str, Node]] | None = None,
    ) -> None:
        self._nodes: dict[str, Node] = dict(nodes)
        self._net = network if network is not None else NetworkModel()
        self._faults = faults
        # how to build a fresh node for a cold restart (Crash.lose_state); the per-node fresh
        # instance is taken from a fresh full map, so the same factory simulate already uses works.
        self._rebuild = rebuild
        self._rng = Random(seed)
        self._now: int = 0
        self._counter: int = 0
        self._delivered: int = 0
        self._dropped: int = 0
        self._queue: list[tuple[int, int]] = []  # (due_time, seq) min-heap
        self._events: dict[int, _Event] = {}
        self._active_timers: dict[tuple[str, str], int] = {}
        self._started: set[str] = set()
        self._ctxs: dict[str, Context] = {name: Context(self, name) for name in self._nodes}

    @property
    def nodes(self) -> Mapping[str, Node]:
        """The live node map -- the authoritative instances, which a cold restart swaps. The oracle
        snapshots through this so it always reads post-restart state."""
        return self._nodes

    @property
    def now(self) -> int:
        return self._now

    def _schedule(self, due: int, event: _Event) -> int:
        self._counter += 1
        seq = self._counter
        self._events[seq] = event
        heapq.heappush(self._queue, (due, seq))
        return seq

    def random(self) -> float:
        """A seeded random float in [0, 1) -- the only randomness nodes may use (keeps runs reproducible)."""
        return self._rng.random()

    def randint(self, a: int, b: int) -> int:
        """A seeded random integer in [a, b]."""
        return self._rng.randint(a, b)

    def inject(self, src: str, dst: str, payload: object) -> None:
        """Send a message (from a node's ``ctx.send``, or externally to kick the system). Subject to
        the network model: it may be dropped, duplicated, or delayed."""
        if dst not in self._nodes:
            raise ValueError(f"send to unknown node {dst!r} (known: {sorted(self._nodes)})")
        if self._faults is not None and self._faults.link_cut(src, dst, self._now):
            self._dropped += 1  # link partitioned right now -> the message never makes it on
            return
        net = self._net
        if net.loss > 0.0 and self._rng.random() < net.loss:
            self._dropped += 1
            return
        if net.mutate is not None:
            payload = net.mutate(src, dst, payload)
            if payload is None:
                self._dropped += 1
                return
        latency = self._rng.randint(net.min_latency, net.max_latency)
        env = Envelope(seq=self._counter + 1, src=src, dst=dst, payload=payload)
        self._schedule(self._now + latency, _Deliver(env))
        if net.duplicate > 0.0 and self._rng.random() < net.duplicate:
            dup_latency = self._rng.randint(net.min_latency, net.max_latency)
            self._schedule(self._now + dup_latency, _Deliver(env))

    def schedule_timer(self, node: str, name: str, delay: int) -> None:
        """Arm a timer for ``node`` that fires ``delay`` units from now (re-arming replaces it)."""
        seq = self._schedule(self._now + max(delay, 1), _Timer(node, name))
        self._active_timers[(node, name)] = seq

    def cancel_timer(self, node: str, name: str) -> None:
        self._active_timers.pop((node, name), None)

    def run(
        self,
        *,
        max_steps: int = 10_000,
        until: Callable[[Simulator], bool] | None = None,
        on_step: Callable[[Simulator], None] | None = None,
        after_start: Callable[[Simulator], None] | None = None,
    ) -> RunResult:
        """Drive the system to quiescence (queue empty), until ``until(sim)`` is true, or until
        ``max_steps`` events have fired. ``after_start(sim)`` runs once after every node's
        ``on_start`` (the initial world); ``on_step(sim)`` runs after each fired event -- the hooks
        the oracle uses to snapshot and check the world."""
        for name in sorted(self._nodes):
            if self._faults is not None and self._faults.node_down(name, 0):
                continue  # crashed at t=0 -> it boots when its window ends (its _Restart event)
            self._nodes[name].on_start(self._ctxs[name])
            self._started.add(name)
        if self._faults is not None:
            for crash in self._faults.crashes:
                # a restart event both reboots the node AND keeps the queue alive until heal, so a
                # run that would otherwise go quiescent mid-crash still waits for recovery.
                self._schedule(crash.end, _Restart(crash.node))
        if after_start is not None:
            after_start(self)
        if until is not None and until(self):
            return RunResult(0, self._now, self._delivered, self._dropped, "until")

        steps = 0
        reason = "quiescent"
        while self._queue:
            if steps >= max_steps:
                reason = "max-steps"
                break
            due, seq = heapq.heappop(self._queue)
            event = self._events.pop(seq)
            if isinstance(event, _Timer):
                if self._active_timers.get((event.node, event.name)) != seq:
                    continue  # cancelled or superseded by a re-arm
                self._active_timers.pop((event.node, event.name), None)
                if self._faults is not None and self._faults.node_down(event.node, due):
                    continue  # node is down -> its timer is lost
                self._now = due
                self._nodes[event.node].on_timer(self._ctxs[event.node], event.name)
            elif isinstance(event, _Restart):
                self._now = due
                self._restart(event.node, due)
            else:
                env = event.env
                if self._faults is not None and self._faults.node_down(env.dst, due):
                    self._dropped += 1
                    continue  # destination is down -> message dropped on arrival
                self._now = due
                self._delivered += 1
                self._nodes[env.dst].on_message(self._ctxs[env.dst], env.src, env.payload)
            steps += 1
            if on_step is not None:
                on_step(self)
            if until is not None and until(self):
                reason = "until"
                break

        return RunResult(steps, self._now, self._delivered, self._dropped, reason)

    def _restart(self, node: str, t: int) -> None:
        """Bring ``node`` back up at the end of its crash window. A cold restart (``lose_state``) swaps
        in a fresh instance from the factory and re-runs ``on_start``; a warm one resumes the existing
        instance (and runs ``on_start`` only if the node was down from t=0 and never booted)."""
        crash = self._faults.crash_ending_at(node, t) if self._faults is not None else None
        if crash is not None and crash.lose_state and self._rebuild is not None:
            self._nodes[node] = self._rebuild()[node]
            self._started.discard(node)
        if node not in self._started:
            self._nodes[node].on_start(self._ctxs[node])
            self._started.add(node)

nodes property

nodes: Mapping[str, Node]

The live node map -- the authoritative instances, which a cold restart swaps. The oracle snapshots through this so it always reads post-restart state.

random

random() -> float

A seeded random float in [0, 1) -- the only randomness nodes may use (keeps runs reproducible).

Source code in src/musil/sim.py
def random(self) -> float:
    """A seeded random float in [0, 1) -- the only randomness nodes may use (keeps runs reproducible)."""
    return self._rng.random()

randint

randint(a: int, b: int) -> int

A seeded random integer in [a, b].

Source code in src/musil/sim.py
def randint(self, a: int, b: int) -> int:
    """A seeded random integer in [a, b]."""
    return self._rng.randint(a, b)

inject

inject(src: str, dst: str, payload: object) -> None

Send a message (from a node's ctx.send, or externally to kick the system). Subject to the network model: it may be dropped, duplicated, or delayed.

Source code in src/musil/sim.py
def inject(self, src: str, dst: str, payload: object) -> None:
    """Send a message (from a node's ``ctx.send``, or externally to kick the system). Subject to
    the network model: it may be dropped, duplicated, or delayed."""
    if dst not in self._nodes:
        raise ValueError(f"send to unknown node {dst!r} (known: {sorted(self._nodes)})")
    if self._faults is not None and self._faults.link_cut(src, dst, self._now):
        self._dropped += 1  # link partitioned right now -> the message never makes it on
        return
    net = self._net
    if net.loss > 0.0 and self._rng.random() < net.loss:
        self._dropped += 1
        return
    if net.mutate is not None:
        payload = net.mutate(src, dst, payload)
        if payload is None:
            self._dropped += 1
            return
    latency = self._rng.randint(net.min_latency, net.max_latency)
    env = Envelope(seq=self._counter + 1, src=src, dst=dst, payload=payload)
    self._schedule(self._now + latency, _Deliver(env))
    if net.duplicate > 0.0 and self._rng.random() < net.duplicate:
        dup_latency = self._rng.randint(net.min_latency, net.max_latency)
        self._schedule(self._now + dup_latency, _Deliver(env))
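
The loss / latency / duplicate decisions above can be sketched in isolation. This is a simplified stand-in, not musil's implementation: `Net` is a hypothetical dataclass that mirrors only the `loss`, `duplicate`, `min_latency` and `max_latency` fields read by `inject` (its defaults are assumptions), and partitions and `mutate` are left out.

```python
from dataclasses import dataclass
from random import Random


@dataclass(frozen=True)
class Net:
    """Hypothetical stand-in for the network-model fields used by inject."""
    loss: float = 0.0
    duplicate: float = 0.0
    min_latency: int = 1
    max_latency: int = 1


def deliveries(net: Net, rng: Random, now: int) -> list:
    """Delivery times for one send: [] if lost, two entries if duplicated."""
    if net.loss > 0.0 and rng.random() < net.loss:
        return []  # dropped before it is ever scheduled
    times = [now + rng.randint(net.min_latency, net.max_latency)]
    if net.duplicate > 0.0 and rng.random() < net.duplicate:
        # the duplicate draws its own latency, so copies may arrive out of order
        times.append(now + rng.randint(net.min_latency, net.max_latency))
    return times


reliable = deliveries(Net(), Random(0), now=10)
lossy = deliveries(Net(loss=1.0), Random(0), now=10)
dupes = deliveries(Net(duplicate=1.0, min_latency=2, max_latency=5), Random(0), now=10)
```

With a loss probability of 1.0 every send is dropped, and with a duplicate probability of 1.0 every send is scheduled twice, each copy with an independently drawn latency.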

schedule_timer

schedule_timer(node: str, name: str, delay: int) -> None

Arm a timer for node that fires delay units from now (re-arming replaces it). A delay below 1 is clamped to 1, so a timer never fires at the current instant.

Source code in src/musil/sim.py
def schedule_timer(self, node: str, name: str, delay: int) -> None:
    """Arm a timer for ``node`` that fires ``delay`` units from now (re-arming replaces it)."""
    seq = self._schedule(self._now + max(delay, 1), _Timer(node, name))
    self._active_timers[(node, name)] = seq
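
A sketch of how "re-arming replaces it" can work without removing anything from the heap: each arming gets a fresh sequence number, and a popped timer fires only if its number is still the active one for that (node, name). The names `arm` and `drain` are hypothetical; the real bookkeeping lives in `schedule_timer` and `run`.

```python
import heapq
from itertools import count

queue = []      # heap of (due time, sequence number)
events = {}     # seq -> (node, timer name)
active = {}     # (node, timer name) -> seq of the arming that is still live
seqs = count(1)


def arm(node, name, due):
    """Schedule a timer; a later arm() for the same key supersedes this one."""
    seq = next(seqs)
    heapq.heappush(queue, (due, seq))
    events[seq] = (node, name)
    active[(node, name)] = seq


def drain():
    """Pop events in time order, skipping armings that were superseded."""
    fired = []
    while queue:
        due, seq = heapq.heappop(queue)
        key = events.pop(seq)
        if active.get(key) != seq:
            continue  # stale entry: a newer arming replaced it
        active.pop(key, None)
        fired.append((due,) + key)
    return fired


arm("a", "election", 5)
arm("a", "election", 8)   # re-arm: the t=5 entry becomes stale
arm("b", "heartbeat", 6)
fired = drain()
```

The stale entry stays in the heap until it is popped and discarded, which keeps re-arming cheap at the cost of a few dead entries.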

run

run(
    *,
    max_steps: int = 10000,
    until: Callable[[Simulator], bool] | None = None,
    on_step: Callable[[Simulator], None] | None = None,
    after_start: Callable[[Simulator], None] | None = None,
) -> RunResult

Drive the system to quiescence (queue empty), until until(sim) is true, or until max_steps events have fired. after_start(sim) runs once after every node's on_start (the initial world); on_step(sim) runs after each fired event -- the hooks the oracle uses to snapshot and check the world.

Source code in src/musil/sim.py
def run(
    self,
    *,
    max_steps: int = 10_000,
    until: Callable[[Simulator], bool] | None = None,
    on_step: Callable[[Simulator], None] | None = None,
    after_start: Callable[[Simulator], None] | None = None,
) -> RunResult:
    """Drive the system to quiescence (queue empty), until ``until(sim)`` is true, or until
    ``max_steps`` events have fired. ``after_start(sim)`` runs once after every node's
    ``on_start`` (the initial world); ``on_step(sim)`` runs after each fired event -- the hooks
    the oracle uses to snapshot and check the world."""
    for name in sorted(self._nodes):
        if self._faults is not None and self._faults.node_down(name, 0):
            continue  # crashed at t=0 -> it boots when its window ends (its _Restart event)
        self._nodes[name].on_start(self._ctxs[name])
        self._started.add(name)
    if self._faults is not None:
        for crash in self._faults.crashes:
            # a restart event both reboots the node AND keeps the queue alive until heal, so a
            # run that would otherwise go quiescent mid-crash still waits for recovery.
            self._schedule(crash.end, _Restart(crash.node))
    if after_start is not None:
        after_start(self)
    if until is not None and until(self):
        return RunResult(0, self._now, self._delivered, self._dropped, "until")

    steps = 0
    reason = "quiescent"
    while self._queue:
        if steps >= max_steps:
            reason = "max-steps"
            break
        due, seq = heapq.heappop(self._queue)
        event = self._events.pop(seq)
        if isinstance(event, _Timer):
            if self._active_timers.get((event.node, event.name)) != seq:
                continue  # cancelled or superseded by a re-arm
            self._active_timers.pop((event.node, event.name), None)
            if self._faults is not None and self._faults.node_down(event.node, due):
                continue  # node is down -> its timer is lost
            self._now = due
            self._nodes[event.node].on_timer(self._ctxs[event.node], event.name)
        elif isinstance(event, _Restart):
            self._now = due
            self._restart(event.node, due)
        else:
            env = event.env
            if self._faults is not None and self._faults.node_down(env.dst, due):
                self._dropped += 1
                continue  # destination is down -> message dropped on arrival
            self._now = due
            self._delivered += 1
            self._nodes[env.dst].on_message(self._ctxs[env.dst], env.src, env.payload)
        steps += 1
        if on_step is not None:
            on_step(self)
        if until is not None and until(self):
            reason = "until"
            break

    return RunResult(steps, self._now, self._delivered, self._dropped, reason)

SimFailure dataclass

A reproducible failure from simulate. kind is "invariant", "refinement", "goal" (settled-but-goal-false with no faults), or "liveness" (under a FaultSchedule, the system did not converge to the goal after the faults healed). detail names the broken invariant / refinement violation / goal. world is the offending snapshot and history the snapshots leading to it (a trace). Re-run with seeds=[seed] to reproduce exactly.

Source code in src/musil/sim.py
@dataclass(frozen=True, slots=True)
class SimFailure[W]:
    """A reproducible failure from ``simulate``. ``kind`` is ``"invariant"``, ``"refinement"``,
    ``"goal"`` (settled-but-goal-false with no faults), or ``"liveness"`` (under a ``FaultSchedule``,
    the system did not converge to the goal after the faults healed). ``detail`` names the broken
    invariant / refinement violation / goal. ``world`` is the offending snapshot and ``history`` the
    snapshots leading to it (a trace). Re-run with ``seeds=[seed]`` to reproduce exactly."""

    seed: int
    kind: str
    detail: str
    step: int
    world: W
    history: tuple[W, ...]

    def __str__(self) -> str:
        return (
            f"{self.kind.upper()} on seed {self.seed} at step {self.step}: {self.detail}\n"
            f"  world: {self.world}\n"
            f"  reproduce with seeds=[{self.seed}]"
        )

simulate

simulate(
    factory: Callable[[], Mapping[str, Node]],
    *,
    seeds: Iterable[int],
    snapshot: Callable[[Mapping[str, Node]], W],
    network: NetworkModel | None = None,
    faults: FaultSchedule
    | Callable[[Random], FaultSchedule]
    | None = None,
    invariants: Mapping[str, Callable[[W], bool]]
    | None = None,
    model: Model[S] | None = None,
    abstraction: Callable[[W], S] | None = None,
    goal: Callable[[W], bool] | None = None,
    goal_name: str = "goal",
    converge_within: int | None = None,
    max_steps: int = 10000,
    require_init: bool = True,
) -> SimReport[W]

Deterministic simulation testing of the real nodes. Runs the system once per seed under network (independent per-message faults) and an optional faults schedule (sustained, heal-able partitions / crashes), given either as a FaultSchedule or as a per-seed Random -> FaultSchedule callable.

After every step it checks each invariant and, if a model and an abstraction are given, that the run refines the model, so safety is checked during the chaos too. Passing model without abstraction raises ValueError.

If a goal is given, the settled final world must satisfy it. With a faults schedule whose heal_time is greater than 0 this is the liveness check (did the system converge once the faults healed? -> kind="liveness"); otherwise it is the plain convergence check (kind="goal"). converge_within (steps after the schedule's heal_time) catches livelock early.

Returns at the FIRST failing seed, so re-running with report.failure.seed reproduces the failure. factory builds a fresh node map per run (also reused to cold-restart a crashed node); snapshot maps the live nodes to an immutable world value used by the oracle.

Source code in src/musil/sim.py
def simulate[W, S: Hashable](
    factory: Callable[[], Mapping[str, Node]],
    *,
    seeds: Iterable[int],
    snapshot: Callable[[Mapping[str, Node]], W],
    network: NetworkModel | None = None,
    faults: FaultSchedule | Callable[[Random], FaultSchedule] | None = None,
    invariants: Mapping[str, Callable[[W], bool]] | None = None,
    model: Model[S] | None = None,
    abstraction: Callable[[W], S] | None = None,
    goal: Callable[[W], bool] | None = None,
    goal_name: str = "goal",
    converge_within: int | None = None,
    max_steps: int = 10_000,
    require_init: bool = True,
) -> SimReport[W]:
    """Deterministic simulation testing of the real nodes. Runs the system once per seed under
    ``network`` (independent per-message faults) and an optional ``faults`` schedule (sustained,
    heal-able partitions / crashes -- a ``FaultSchedule`` or a per-seed ``Random -> FaultSchedule``).
    After every step it checks each invariant and (if a ``model`` + ``abstraction`` are given) that the
    run refines the model -- safety holds *during* the chaos too. If a ``goal`` is given, the settled
    final world must satisfy it: with a non-trivial ``faults`` schedule this is the **liveness** check
    (did the system converge once the faults healed? -> ``kind="liveness"``); with no faults it is the
    plain convergence check (``kind="goal"``). ``converge_within`` (steps after the schedule's
    ``heal_time``) catches livelock early. Returns at the FIRST failing seed -- a reproducible repro
    (``report.failure.seed``). ``factory`` builds a fresh node map per run (also reused to cold-restart
    a crashed node); ``snapshot`` maps the live nodes to an immutable world value used by the oracle."""
    if model is not None and abstraction is None:
        raise ValueError("model given without abstraction: pass abstraction=... to map worlds to model states")
    invs: Mapping[str, Callable[[W], bool]] = invariants if invariants is not None else {}
    runs = 0
    for seed in seeds:
        runs += 1
        schedule = faults(Random(seed)) if callable(faults) else faults
        heal_time = schedule.heal_time() if schedule is not None else 0
        nodes = factory()
        sim = Simulator(nodes, seed=seed, network=network, faults=schedule, rebuild=factory)
        monitor: RefinementMonitor[W, S] | None = (
            RefinementMonitor(model, abstraction, require_init=require_init)
            if model is not None and abstraction is not None
            else None
        )
        oracle = _Oracle(
            seed, sim, snapshot, invs, monitor,
            goal=goal, goal_name=goal_name, heal_time=heal_time, converge_within=converge_within,
        )
        try:
            sim.run(max_steps=max_steps, after_start=oracle, on_step=oracle)
        except _OracleStop:
            return SimReport(ok=False, runs=runs, failure=oracle.failure)
        if goal is not None:
            final = oracle.history[-1] if oracle.history else snapshot(sim.nodes)
            if not goal(final):
                kind = "liveness" if heal_time > 0 else "goal"
                step = len(oracle.history) - 1
                return SimReport(
                    ok=False,
                    runs=runs,
                    failure=SimFailure(seed, kind, goal_name, step, final, tuple(oracle.history)),
                )
    return SimReport(ok=True, runs=runs)
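
The seed sweep itself is simple enough to sketch on its own. The toy below is not musil: `run_once` is a hypothetical system whose only behavior is a counter bumped by seeded random steps, and `first_failure` mimics the "stop at the first failing seed and report it" shape of `simulate`.

```python
from random import Random


def run_once(seed: int) -> list:
    """Hypothetical system under test: returns the history of worlds for one seed."""
    rng = Random(seed)
    value, history = 0, [0]
    for _ in range(20):
        value += rng.randint(0, 3)
        history.append(value)
    return history


def first_failure(seeds, invariant):
    """Return (seed, step, world) for the first invariant violation, else None."""
    for seed in seeds:
        for step, world in enumerate(run_once(seed)):
            if not invariant(world):
                return seed, step, world  # stop at the first failing seed
    return None


always_ok = first_failure([7, 8], lambda w: w >= 0)     # the counter never goes negative
broken_at_start = first_failure([7, 8], lambda w: w != 0)  # the initial world is 0
replay_matches = run_once(7) == run_once(7)
```

Since each run depends only on its seed, the reported seed is enough to replay the failing history.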

Open systems — check_open, EnvironmentSpec, Assumption

Verify a system against adversarial external environments. Each EnvironmentSpec is a contract: the non-deterministic behaviors the environment can exhibit (eviction, crash, wrong answer), the guarantees it commits to, and the assumptions those guarantees rest on (following seL4's two-layer assurance model: Klein et al., SOSP 2009). check_open composes by merging environment behaviors into the system's action set (so the BFS explores every adversarial move) and environment guarantees into the invariants. The result surfaces every Assumption that is not yet "verified" as a residual proof obligation.

musil.env

Open-system verification: check a system against adversarial external environments.

Real distributed systems do not operate in isolation. A service runs on Linux, inside K8s, talking to AWS -- components that can crash, evict, throttle, or return wrong answers at any moment. musil's closed-system check assumes the entire state space is under the model's control. check_open lifts that restriction: each external component is an EnvironmentSpec -- a contract consisting of the non-deterministic behaviors it can exhibit, the guarantees it commits to, and the assumptions those guarantees rest on (following seL4's two-layer assurance model: Klein et al., SOSP 2009).

check_open composes by merging: env behaviors become additional actions (the BFS explores everything the environment can do), env guarantees become additional invariants. This is Interface Automata composition (de Alfaro & Henzinger, ESEC/FSE 2001) -- the environment acts adversarially within its contract; the system must handle all of it.

Assumptions are first-class: they are printed in the result, never silently swallowed. A guarantee that rests on an unverified assumption is weaker than one that rests on a verified proof -- the caller knows exactly what the verification is worth.

Assumption dataclass

A named proof obligation taken on faith or verified elsewhere.

status tracks where the assumption stands:

- "axiom": taken as a foundation; not expected to be proved (e.g., hardware correctness)
- "verified": machine-checked or formally proved (e.g., seL4 kernel proof)
- "unverified": stated but not yet validated (the default; should be addressed)

source cites where the assumption comes from (paper, spec, URL). predicate is an optional machine-checkable version of the assumption; None for assumptions that cannot be expressed as a state predicate (e.g., "network eventually delivers").

Source code in src/musil/env.py
@dataclass(frozen=True, slots=True)
class Assumption:
    """A named proof obligation taken on faith or verified elsewhere.

    ``status`` tracks where the assumption stands:
    - ``"axiom"``      — taken as a foundation; not expected to be proved (e.g., hardware correctness)
    - ``"verified"``   — machine-checked or formally proved (e.g., seL4 kernel proof)
    - ``"unverified"`` — stated but not yet validated (the default; should be addressed)

    ``source`` cites where the assumption comes from (paper, spec, URL).
    ``predicate`` is an optional machine-checkable version of the assumption; ``None`` for
    assumptions that cannot be expressed as a state predicate (e.g., "network eventually delivers").
    """

    name: str
    description: str
    status: Literal["axiom", "verified", "unverified"] = "unverified"
    source: str = ""
    predicate: Callable[[Any], bool] | None = None

EnvironmentSpec dataclass

A contract for one external component: what it can do, what it promises, and what it assumes.

behaviors — the non-deterministic actions the environment can take. Each is an Action[S] that fires from the same global state the system uses. The BFS in check_open explores every possible environment action from every state, so the system is checked against the worst possible environment (within the contract). Model each distinct failure mode -- crash, evict, wrong-answer, timeout -- as a separate behavior.

guarantees — invariants the environment commits to hold in every reachable state. If the environment breaks one, check_open reports it as "<env.name>:<guarantee_name>".

assumptions: named Assumption objects the guarantees rest on. Every assumption whose status is not "verified" (axioms included) is surfaced in OpenResult.unverified_assumptions; they are never silently dropped.

Source code in src/musil/env.py
@dataclass(slots=True)
class EnvironmentSpec[S: Hashable]:
    """A contract for one external component: what it can do, what it promises, and what it assumes.

    ``behaviors`` — the non-deterministic actions the environment can take. Each is an ``Action[S]``
    that fires from the *same* global state the system uses. The BFS in ``check_open`` explores
    every possible environment action from every state, so the system is checked against the worst
    possible environment (within the contract). Model each distinct failure mode -- crash, evict,
    wrong-answer, timeout -- as a separate behavior.

    ``guarantees`` — invariants the environment commits to hold in every reachable state. If the
    environment breaks one, ``check_open`` reports it as ``"<env.name>:<guarantee_name>"``.

    ``assumptions`` — named ``Assumption`` objects the guarantees rest on. Unverified assumptions
    are surfaced in ``OpenResult.unverified_assumptions``; they are never silently dropped.
    """

    name: str
    behaviors: list[Action[S]]
    guarantees: Mapping[str, Invariant[S]] = field(default_factory=dict[str, Invariant[S]])
    assumptions: Mapping[str, Assumption] = field(default_factory=dict[str, Assumption])

OpenResult dataclass

The outcome of check_open. Wraps the underlying Result and surfaces every Assumption that is not yet "verified" -- these are the residual proof obligations the caller must discharge for the verification to be unconditional.

bool(open_result) delegates to result.ok so it works in assertions.

Source code in src/musil/env.py
@dataclass(frozen=True, slots=True)
class OpenResult[S: Hashable]:
    """The outcome of ``check_open``. Wraps the underlying ``Result`` and surfaces every
    ``Assumption`` that is not yet ``"verified"`` -- these are the residual proof obligations
    the caller must discharge for the verification to be unconditional.

    ``bool(open_result)`` delegates to ``result.ok`` so it works in assertions."""

    result: Result[S]
    unverified_assumptions: tuple[Assumption, ...]

    @property
    def ok(self) -> bool:
        return self.result.ok

    def __bool__(self) -> bool:
        return self.ok

    def __str__(self) -> str:
        lines = [str(self.result)]
        if self.unverified_assumptions:
            lines.append("Unverified assumptions (residual proof obligations):")
            for a in self.unverified_assumptions:
                src = f"  [{a.source}]" if a.source else ""
                lines.append(f"  [{a.status}] {a.name}: {a.description}{src}")
        return "\n".join(lines)

check_open

check_open(
    system: Model[S],
    *envs: EnvironmentSpec[S],
    max_states: int = 1000000,
) -> OpenResult[S]

Check system against adversarial external environments.

Composes the system and every environment by merging:

- all env.behaviors into the system's actions (the BFS explores every env action)
- all env.guarantees into the system's invariants (qualified as "<env.name>:<k>")

Then calls the standard check on the composed model.

check_open(system) with no environments is identical to check(system).

Assumptions from all environments whose status is not "verified" (this includes "axiom" as well as "unverified") are collected and returned in OpenResult.unverified_assumptions -- they are the residual proof obligations.

Source code in src/musil/env.py
def check_open[S: Hashable](
    system: Model[S],
    *envs: EnvironmentSpec[S],
    max_states: int = 1_000_000,
) -> OpenResult[S]:
    """Check ``system`` against adversarial external environments.

    Composes the system and every environment by merging:
    - all ``env.behaviors`` into the system's actions (the BFS explores every env action)
    - all ``env.guarantees`` into the system's invariants (qualified as ``"<env.name>:<k>"``)

    Then calls the standard ``check`` on the composed model.

    ``check_open(system)`` with no environments is identical to ``check(system)``.

    Unverified assumptions from all environments are collected and returned in
    ``OpenResult.unverified_assumptions`` -- they are the residual proof obligations.
    """
    all_actions = list(system.actions)
    for env in envs:
        all_actions.extend(env.behaviors)

    all_invariants: dict[str, Invariant[S]] = dict(system.invariants)
    for env in envs:
        for k, inv in env.guarantees.items():
            all_invariants[f"{env.name}:{k}"] = inv

    composed = Model(
        init=system.init,
        actions=all_actions,
        invariants=all_invariants,
        terminal=system.terminal,
    )
    result = check(composed, max_states=max_states)

    unverified: list[Assumption] = []
    for env in envs:
        for assumption in env.assumptions.values():
            if assumption.status != "verified":
                unverified.append(assumption)

    return OpenResult(result=result, unverified_assumptions=tuple(unverified))
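
To make the composition concrete, here is a self-contained sketch of the merge step on a toy state space. It does not import musil: `Act` is a hypothetical stand-in for `Action`, and `reachable_violation` is a bare-bones BFS that only checks invariants (no deadlock detection, traces, or state cap), so it should be read as an illustration of the idea rather than of `check`.

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class Act:
    """Hypothetical stand-in for a guarded action over integer states."""
    name: str
    enabled: Callable[[int], bool]
    apply: Callable[[int], int]


def reachable_violation(init, actions, invariants):
    """BFS over reachable states; return (invariant name, state) or None."""
    seen, queue = set(init), deque(init)
    while queue:
        s = queue.popleft()
        for name, inv in invariants.items():
            if not inv(s):
                return name, s
        for a in actions:
            if a.enabled(s):
                t = a.apply(s)
                if t not in seen:
                    seen.add(t)
                    queue.append(t)
    return None


# System: a replica count that can be scaled up to 3 and must stay >= 1.
system_actions = [Act("scale_up", lambda n: n < 3, lambda n: n + 1)]
system_invariants = {"at_least_one": lambda n: n >= 1}

# Environment: may evict a replica at any time; promises the count is never negative.
env_name = "k8s"
env_behaviors = [Act("evict", lambda n: n > 0, lambda n: n - 1)]
env_guarantees = {"never_negative": lambda n: n >= 0}

closed = reachable_violation([1], system_actions, system_invariants)

merged_invariants = dict(system_invariants)
for key, inv in env_guarantees.items():
    merged_invariants[env_name + ":" + key] = inv  # qualified guarantee name
opened = reachable_violation([1], system_actions + env_behaviors, merged_invariants)
```

In isolation the system never violates its invariant, but once the environment's `evict` behavior is merged into the action set, the state 0 becomes reachable and `at_least_one` fails.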

Graph export — to_dot, explore

The reachable state graph and its Graphviz DOT rendering.

musil.graph

Render a model's reachable state graph as Graphviz DOT — the same artifact FizzBee emits, but from the Python model. to_dot(model) explores the model and returns a DOT string you can pipe to dot -Tsvg. Pass highlight (a sequence of states, e.g. the states of a counterexample trace) to colour that path — handy for visualising why a check failed.

to_dot

to_dot(
    source: Model[S] | Graph[S],
    *,
    highlight: Iterable[S] = (),
    name: str = "musil",
    max_states: int = 1000000,
) -> str

Graphviz DOT for the reachable state graph. source is a Model (explored here) or a pre-built Graph. Initial states are drawn bold; highlight states are filled and edges between two highlighted states are red (e.g. highlight=[s.state for s in result.trace]).

Source code in src/musil/graph.py
def to_dot[S: Hashable](
    source: Model[S] | Graph[S],
    *,
    highlight: Iterable[S] = (),
    name: str = "musil",
    max_states: int = 1_000_000,
) -> str:
    """Graphviz DOT for the reachable state graph. ``source`` is a Model (explored here) or a
    pre-built Graph. Initial states are drawn bold; ``highlight`` states are filled and edges between
    two highlighted states are red (e.g. ``highlight=[s.state for s in result.trace]``)."""
    graph = source if isinstance(source, Graph) else explore(source, max_states=max_states)
    idx = {s: i for i, s in enumerate(graph.states)}
    hi = set(highlight)
    inits = set(graph.initial)

    lines = [
        f"digraph {name} {{",
        "  rankdir=LR;",
        '  node [shape=box, fontname="monospace"];',
        '  edge [fontname="monospace"];',
    ]
    for s in graph.states:
        attrs = [f'label="{_escape(_label(s))}"']
        if s in inits:
            attrs.append("penwidth=2")
        if s in hi:
            attrs += ["style=filled", 'fillcolor="#ffe0e0"']
        lines.append(f"  n{idx[s]} [{', '.join(attrs)}];")
    for s in graph.states:
        for action, t in graph.edges[s]:
            edge_attrs = [f'label="{_escape(action)}"']
            if s in hi and t in hi:
                edge_attrs.append('color="red"')
                edge_attrs.append("penwidth=2")
            lines.append(f"  n{idx[s]} -> n{idx[t]} [{', '.join(edge_attrs)}];")
    lines.append("}")
    return "\n".join(lines) + "\n"
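
The shape of the output is easiest to see on a tiny hand-built graph. The sketch below is a simplified, hypothetical `tiny_dot` that follows the same scheme (numbered nodes, labelled edges, highlighted path in red) but takes plain lists instead of a `Model` or `Graph` and does no label escaping.

```python
def tiny_dot(states, edges, highlight=()):
    """Render states and (source, action, target) edges as a Graphviz DOT string."""
    idx = {s: i for i, s in enumerate(states)}
    hi = set(highlight)
    lines = ["digraph sketch {", "  rankdir=LR;"]
    for s in states:
        style = ", style=filled" if s in hi else ""
        lines.append(f'  n{idx[s]} [label="{s}"{style}];')
    for s, action, t in edges:
        # an edge is coloured only when both of its endpoints are highlighted
        color = ', color="red"' if s in hi and t in hi else ""
        lines.append(f'  n{idx[s]} -> n{idx[t]} [label="{action}"{color}];')
    lines.append("}")
    return "\n".join(lines) + "\n"


dot = tiny_dot(
    ["idle", "busy", "done"],
    [("idle", "start", "busy"), ("busy", "finish", "done")],
    highlight=["idle", "busy"],
)
```

Writing `dot` to a file and running `dot -Tsvg` on it renders the graph; only the `start` edge is red because `done` is not in the highlighted set.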