Cookbook

Common patterns. Each is a short, copy-pasteable starting point — adapt to your code.

Boot pattern: try-resume, fall through to init

import strix
from strix import NoActiveSessionError, RiskSettings

transport = strix.LocalTransport(data_dir="./strix_data")
RISK = RiskSettings(max_qty_per_order=1000, position_limits=LIMITS)

try:
    strix.resume(transport=transport)
    for ex in broker.fetch_executions(since=last_seen):
        strix.ingest_execution(ex)
    strix.init(transport=transport, risk=RISK)
except NoActiveSessionError:
    strix.init(transport=transport, risk=RISK)

Pass the risk config on the init call. A resumed session keeps the config it was opened with, so you can't retroactively change limits on a session you're resuming.

Submit-and-fill on the same line

For unit tests and synchronous backtests where the fill is known immediately:

import strix
from strix import Side, Execution

strix.init()

def market_buy(symbol: str, qty, fill_price) -> None:
    with strix.order(symbol=symbol, side=Side.BUY, qty=qty) as o:
        pass   # no real broker; we just want the order recorded
    strix.ingest_execution(Execution(
        order_id=o.order_id, symbol=symbol, side=Side.BUY,
        qty=qty, price=fill_price,
    ))

market_buy("AAPL", 100, "150")

The exposed o is still a snapshot, but order_id is stable so ingest_execution can find the order.

Stable order IDs across systems

Default order_id is a UUIDv7 (time-ordered, sortable). Pass your own when you need stable IDs (broker reconciliation, replay, multi-system attribution):

def my_order_id(strategy: str, signal_seq: int) -> str:
    return f"{strategy}-{signal_seq:012d}"

with strix.order(
    symbol="AAPL", side=Side.BUY, qty=100,
    order_id=my_order_id("mean-reversion", signal_seq),
) as o:
    broker.submit(o.order_id, o.symbol, o.side, o.qty)

Within a single session, IDs must be unique: registering the same ID twice raises StrixDuplicateOrderIdError (also a ValueError for back-compat). Open orders carry forward across strix.init(...), so a still-open id stays taken in the next session; terminal orders drop, so their ids become reusable. Whatever scheme you pick, make sure it cannot produce the same id twice within a session.

Contract: any non-empty string up to 128 characters, no ASCII control characters. If you care about cloud-DB index locality (when CloudTransport lands post-v1), prefer time-ordered schemes — UUIDv7, ULID, Snowflake. The default is UUIDv7; pure UUIDv4 will work but degrades the secondary index when orders move to a hosted DB.
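
The contract above is easy to check on your side before an id ever reaches Strix. A minimal sketch, assuming nothing about Strix's own validation: is_valid_order_id is a hypothetical helper, and it takes "ASCII control characters" to mean code points 0-31 and 127.

```python
MAX_ORDER_ID_LEN = 128  # upper bound stated in the contract above


def is_valid_order_id(order_id: object) -> bool:
    """Return True for a non-empty str of at most 128 characters that
    contains no ASCII control characters (code points 0-31 and 127)."""
    if not isinstance(order_id, str):
        return False
    if not 0 < len(order_id) <= MAX_ORDER_ID_LEN:
        return False
    return not any(ord(ch) < 32 or ord(ch) == 127 for ch in order_id)


def my_order_id(strategy: str, signal_seq: int) -> str:
    # Same scheme as the recipe above: zero-padding keeps ids for one
    # strategy in sequence order when sorted as strings.
    return f"{strategy}-{signal_seq:012d}"
```

Running the check in a unit test for your id scheme catches an over-long strategy name or a stray newline before it shows up at order time.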

Backtest harness — one session per run

Each backtest run gets its own session. Use LocalTransport if you want the completed sessions to accumulate as a history.

import strix
from strix import LocalTransport

def run_backtest(strategy, data, *, results_dir: str) -> str:
    transport = LocalTransport(data_dir=results_dir)
    session = strix.init(transport=transport)
    try:
        for bar in data:
            strategy.on_bar(bar)   # strategy uses strix.order and strix.ingest_execution
    finally:
        transport.flush()
        transport.close()
    return session.session_id

for i in range(N_RUNS):
    sid = run_backtest(strategy, data, results_dir=f"./backtests/run_{i:03d}")
    print(f"run {i} -> session_id={sid}")

Each run lands in its own data_dir so the data-dir lock doesn't serialize parallel runs. (If you want all runs under one data_dir, run them sequentially in one process — each init closes the prior session.)

Per-strategy attribution

Strix's session model is global to the process. If you want per-strategy attribution within one process, encode the strategy in the order_id prefix:

import uuid

def submit(strategy: str, symbol: str, side, qty) -> None:
    # uuid.uuid7() on Python 3.14+; fall back to uuid4() on older runtimes
    # (less ideal for cloud-DB locality but still unique).
    with strix.order(
        symbol=symbol, side=side, qty=qty,
        order_id=f"{strategy}:{uuid.uuid7()}",
    ) as o:
        broker.submit(o.order_id, o.symbol, o.side, o.qty)

Dashboards (post-v1) and ad-hoc scripts can split by the prefix. This is a string-convention pattern, not a Strix feature — but it's the lightweight way to get attribution without per-strategy sessions.
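
On the reading side, splitting by prefix is a few lines of plain Python. A sketch, assuming ids shaped like "<strategy>:<unique>" as in the recipe above; strategy_of, orders_per_strategy, and the "unattributed" bucket are hypothetical names, not part of Strix.

```python
from collections import Counter


def strategy_of(order_id: str) -> str:
    """Return the strategy prefix of an id shaped like '<strategy>:<unique>'.

    Ids without a ':' go into an 'unattributed' bucket (a made-up label).
    """
    prefix, sep, _ = order_id.partition(":")
    return prefix if sep else "unattributed"


def orders_per_strategy(order_ids: list[str]) -> Counter:
    """Count orders per strategy prefix."""
    return Counter(strategy_of(oid) for oid in order_ids)
```

Only the text before the first ":" counts as the prefix, so keep ":" out of strategy names.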

If you need real per-strategy isolation (different risk configs, independent open-orders counters), today the cleanest answer is one process per strategy with its own data_dir. Native multi-session-in-one-process is not a v1 feature.

Cancelling an order

with strix.cancel(order_id=o.order_id):
    broker.cancel(o.order_id)

Lifecycle: order → PENDING_CANCEL on entry → CANCELLED on clean exit. If your broker call throws, the order reverts to its prior status (PENDING_NEW, NEW, or PARTIALLY_FILLED) and a CancelAttemptFailed event is appended for audit.

Cancellable from PENDING_NEW, NEW, PARTIALLY_FILLED. Cancelling a FILLED, REJECTED, CANCELLED, or already-PENDING_CANCEL order raises StrixOrderNotCancellableError (a subclass of both StrixCancelError and ValueError). Cancelling an unknown order_id raises StrixUnknownOrderError (a subclass of both StrixCancelError and KeyError).
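
If you want to skip cancel attempts that cannot succeed, the rule above reduces to a set-membership check. A sketch using a stand-in enum: Status mirrors the status names used on this page but is not strix.OrderStatus, and its string values are an assumption.

```python
from enum import Enum


class Status(Enum):
    # Stand-in for strix.OrderStatus; names taken from this page.
    PENDING_NEW = "PENDING_NEW"
    NEW = "NEW"
    PARTIALLY_FILLED = "PARTIALLY_FILLED"
    PENDING_CANCEL = "PENDING_CANCEL"
    FILLED = "FILLED"
    REJECTED = "REJECTED"
    CANCELLED = "CANCELLED"


# Statuses from which a cancel may be attempted, per the rule above.
CANCELLABLE = frozenset({Status.PENDING_NEW, Status.NEW, Status.PARTIALLY_FILLED})


def can_cancel(status: Status) -> bool:
    """Return True if a cancel attempt is allowed from this status."""
    return status in CANCELLABLE
```

A pre-check like this only saves a call. The status can still change between the check and the cancel, so keep the exception handling around strix.cancel(...) either way.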

Handling the fill-wins race

If a fill arrives while your cancel is in flight (PENDING_CANCEL), the fill applies and the order moves to PARTIALLY_FILLED or FILLED. The cancel block exits as a no-op — broker reality wins.

from strix import StrixCancelError

try:
    with strix.cancel(order_id=o.order_id):
        broker.cancel(o.order_id)
except StrixCancelError:
    # Order is unknown or in a non-cancellable status — usually a race with
    # a fill or a prior cancel landing first. Treat as a no-op.
    pass
except Exception:
    # Broker call failed; order reverted, CancelAttemptFailed event emitted.
    raise

# Check final status: could be CANCELLED, FILLED, or PARTIALLY_FILLED depending on the race.
# get_order() works for any status; open_orders() excludes terminal orders.
final = strix.get_order(o.order_id)   # -> Order | None

When you need to discriminate, catch the specific subclass:

from strix import StrixOrderNotCancellableError, StrixUnknownOrderError

try:
    with strix.cancel(order_id=o.order_id):
        broker.cancel(o.order_id)
except StrixUnknownOrderError:
    log.info("order vanished from book before cancel reached it")
except StrixOrderNotCancellableError as e:
    log.info("cancel race: order now %s", e.current_status.value)

Existing handlers that catch plain KeyError or ValueError keep working: each subclass also inherits from its stdlib counterpart for back-compat.
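
The dual inheritance is ordinary Python multiple inheritance. The sketch below uses stand-in classes, not Strix's real definitions, to show which handlers catch which error; caught_by is a hypothetical helper for the demonstration.

```python
class CancelError(Exception):
    """Stand-in for StrixCancelError."""


class OrderNotCancellable(CancelError, ValueError):
    """Stand-in for StrixOrderNotCancellableError."""


class UnknownOrder(CancelError, KeyError):
    """Stand-in for StrixUnknownOrderError."""


def caught_by(exc: Exception, handler_type: type) -> bool:
    """Return True if `except handler_type:` would catch exc."""
    try:
        raise exc
    except handler_type:
        return True
    except Exception:
        return False
```

With these stand-ins, an `except ValueError:` handler catches the not-cancellable error but not the unknown-order one, while a handler for the shared base class catches both.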

Catching risk rejections gracefully

from strix import StrixRiskError

def safe_submit(symbol, side, qty) -> bool:
    try:
        with strix.order(symbol=symbol, side=side, qty=qty) as o:
            broker.submit(o.order_id, o.symbol, o.side, o.qty)
        return True
    except StrixRiskError as e:
        log.info("blocked: %s", e.reason)
        return False

A False return is a normal outcome, not an error: the risk checks are doing their job.

Reading state mid-session

from strix import OrderStatus

# All positions (including zero-qty entries from previously-held symbols).
all_positions = strix.positions()

# Filter to non-zero.
holdings = [p for p in all_positions if p.qty != 0]

# Open orders (PENDING_NEW, NEW, PARTIALLY_FILLED, PENDING_CANCEL).
working = strix.open_orders()

# Open count of a specific symbol.
aapl_open = sum(1 for o in working if o.symbol == "AAPL")

# Filter by status — e.g. only fully-acked NEW orders (skip PENDING_*).
cancellable = [o for o in working if o.status == OrderStatus.NEW]

# Single-order lookup by id — works for any status, including terminal
# (FILLED, REJECTED, CANCELLED) which open_orders() excludes.
o = strix.get_order(some_order_id)   # -> Order | None

These accessors return fresh snapshots each call. Cheap, no caching needed on your side.

Reading an order after the strix.order(...) block exits

The Order yielded inside the block is a snapshot frozen at PENDING_NEW — transitions replace the stored object rather than mutating the yielded one. To read the post-block state (NEW, or REJECTED if the body threw, or any later status after fills/cancels), call strix.get_order(...):

with strix.order(symbol="AAPL", side=Side.BUY, qty=100) as o:
    broker.submit(o.order_id, o.symbol, o.side, o.qty)

# o.status is still PENDING_NEW here — it's the snapshot, not a live view.
current = strix.get_order(o.order_id)
assert current is not None and current.status is OrderStatus.NEW
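
The snapshot behaviour is the usual frozen-object pattern. A sketch with a stand-in dataclass and a plain dict as the "book"; OrderSnapshot is a two-field stand-in for illustration, not Strix's Order, and the dict is not how Strix stores orders.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class OrderSnapshot:
    # Stand-in for strix.Order, reduced to two fields.
    order_id: str
    status: str


book: dict[str, OrderSnapshot] = {}

# What the `with` block hands you: a snapshot taken at PENDING_NEW.
yielded = OrderSnapshot(order_id="ord-1", status="PENDING_NEW")
book[yielded.order_id] = yielded

# A transition stores a NEW object under the same id; the snapshot
# you are holding is left exactly as it was.
book["ord-1"] = replace(book["ord-1"], status="NEW")
```

After the transition the snapshot still reads PENDING_NEW while the lookup by id reads NEW, which is why the recipe above re-fetches by id instead of reusing o.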

Distinguishing partial vs full fill from ingest_execution

strix.ingest_execution returns the matching order's resulting OrderStatus, so you don't need a follow-up open_orders() lookup to know whether the fill closed the order:

from strix import OrderStatus

status = strix.ingest_execution(execution)
if status == OrderStatus.FILLED:
    log.info("done: %s", execution.order_id)
elif status == OrderStatus.PARTIALLY_FILLED:
    log.info("partial: %s (remainder still working)", execution.order_id)
elif status is None:
    # Duplicate execution_id or no matching order_id — nothing changed.
    pass

For anomalies (mismatched symbol/side/qty, terminal-order target, overfill) the return value is the order's prior status — the order book is left untouched because the position book is the broker-authoritative view. See order lifecycle § Validation.

Custom timestamps (replay, backtests)

strix.order(..., timestamp=...) and Execution(..., timestamp=...) both accept a caller-supplied timestamp. Useful for replays where the real wall-clock isn't the trading time.

from datetime import datetime, timezone

ts = datetime(2026, 1, 15, 9, 30, tzinfo=timezone.utc)

with strix.order(symbol="AAPL", side=Side.BUY, qty=100, timestamp=ts) as o:
    pass

strix.ingest_execution(Execution(
    order_id=o.order_id, symbol="AAPL", side=Side.BUY,
    qty=100, price="150",
    timestamp=ts,
))

These timestamps are informational: Strix doesn't use them to sequence events. The event log's seq is the authoritative order; ts is for human-readable audit and for the dashboard.
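
When you post-process events yourself, that means sorting by seq, never by ts. A sketch on made-up records: the dicts below are illustrative, and the assumption is that each event exposes seq and ts under those key names.

```python
def in_log_order(events: list[dict]) -> list[dict]:
    """Order events by the log's sequence number, ignoring timestamps."""
    return sorted(events, key=lambda e: e["seq"])


# Made-up events from a replay where caller-supplied timestamps do not
# increase with the sequence number.
sample = [
    {"seq": 2, "ts": "2026-01-15T09:30:00+00:00", "type": "ExecutionApplied"},
    {"seq": 1, "ts": "2026-01-15T09:31:00+00:00", "type": "SessionStarted"},
    {"seq": 3, "ts": "2026-01-15T09:29:00+00:00", "type": "PnLSnapshot"},
]

ordered = in_log_order(sample)
```

Sorting the same records by ts would put the snapshot first and the session start last, which is not the order in which they were recorded.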

Pushing marks on every tick (one-liner)

The minimum-viable mark feed in a tick loop:

strix.mark_to_market(
    {s: feed.last(s) for s in strix.active_symbols() if feed.last(s) is not None}
)

active_symbols() is what Strix needs marks for right now (non-zero position OR open order on the symbol). Empty payloads are no-ops. Filtering out None keeps the call tidy when your feed hasn't seen a symbol yet — Strix will then warn about the missing mark on the next stop-loss eval, which is what you want.
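
If you would rather unit-test the filtering without a live feed, the same logic fits in a small pure function. A sketch: collect_marks is a hypothetical helper, and `last` stands for any callable that returns a price or None, such as feed.last.

```python
from collections.abc import Callable, Iterable
from typing import Optional


def collect_marks(
    symbols: Iterable[str],
    last: Callable[[str], Optional[str]],
) -> dict[str, str]:
    """Build a {symbol: price} payload, skipping symbols with no quote yet."""
    marks: dict[str, str] = {}
    for symbol in symbols:
        price = last(symbol)       # look the quote up once per symbol
        if price is not None:
            marks[symbol] = price
    return marks
```

In the tick loop this would be called as strix.mark_to_market(collect_marks(strix.active_symbols(), feed.last)). Prices are typed as strings here only to match the string prices used elsewhere on this page.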

Configuring SessionConfig

strix.init(..., config=SessionConfig(...)) controls behavior policies that aren't risk rules — how to react when reality diverges from expectations:

from strix import SessionConfig

strix.init(
    transport=transport,
    config=SessionConfig(
        on_invalid_execution="warn",       # log + keep going on broker/SDK divergence
        on_missing_market_data="allow",    # skip pre-trade checks that need a price
    ),
    risk=...,
)

Defaults are conservative ("raise" / "reject" — fail-closed). Switch to the looser settings only when you've understood the consequences:

  • on_invalid_execution="warn" will keep your fill pipeline running even when an execution doesn't match the local order book (the broker is still authoritative on position). Useful if you'd rather log + investigate later than abort the tick.
  • on_missing_market_data="allow" lets pre-trade checks pass through when your adapter can't supply a quote. Risky on tight position-limit / notional configs; pair with a synthetic-fallback adapter if you can.

SessionConfig is not carried forward across strix.init calls — re-pass it on every init if you want the same policy in the next session.

Storing risk config as code

Risk limits drift; check them into source.

# risk_config.py
from decimal import Decimal

from strix import RiskSettings

RISK = RiskSettings(
    max_qty_per_order=Decimal("1000"),
    max_open_orders=20,
    position_limits={
        "AAPL": Decimal("500"),
        "MSFT": Decimal("1000"),
        "GOOG": Decimal("200"),
    },
)

# main.py
import strix
import risk_config as rc

strix.init(transport=strix.LocalTransport(data_dir="./strix_data"), risk=rc.RISK)

The risk config is persisted as part of the SessionStarted event, so the dashboard knows the limits in force for each session.

Clean shutdown

LocalTransport releases the data-dir lock on process exit, but eager cleanup is fine in long-lived algos:

transport = strix.LocalTransport(data_dir="./strix_data")
strix.init(transport=transport)
try:
    # ... trading loop ...
finally:
    transport.close()

flush() forces an fsync; not usually needed since append() already fsyncs, but it's there for explicit pre-flush points.
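
For readers unfamiliar with the term, "append then fsync" looks like this in plain Python. This illustrates the general technique only; it is not Strix's implementation, and append_durably is a hypothetical name.

```python
import json
import os
from pathlib import Path


def append_durably(path: Path, record: dict) -> None:
    """Append one JSON line and fsync before returning."""
    with open(path, "a", encoding="utf-8") as fh:
        fh.write(json.dumps(record) + "\n")
        fh.flush()               # push Python's buffer to the OS
        os.fsync(fh.fileno())    # ask the OS to commit the data to disk
```

The cost is one disk sync per record, which is the trade-off a log makes when it wants each event to survive a crash right after the call returns.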

What strix.resume does NOT recover

resume rebuilds the parts of state Strix owns: positions, open orders, risk policy, halt projection, realized P&L. The mark cache is not restored — marks are runtime state by design; re-supply your MarketDataAdapter and resume calling strix.mark_to_market(...) to repopulate it. Strix also does not rebuild your strategy's state — rolling windows, signal counters, per-symbol cooldowns, ML model state, etc. Those are user-owned.

For long-running strategies on live data, plan for this explicitly:

from pathlib import Path

# Option A: re-derive strategy state from a warmup window on resume.
strategy.warmup(lookback_bars=feed.fetch_recent(symbols, n=200))

# Option B: persist strategy state alongside Strix's data_dir and load it back.
strategy.load_state(Path(data_dir) / "strategy_state.json")

# Option C: skip the first N ticks while indicators re-converge.
strategy.skip_warmup_signals = True

Strix's LocalTransport(data_dir=...) is intentionally for Strix's event log only; a sibling file in the same directory is the easiest place to park strategy state for self-contained resume.
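
One way to implement Option B is a JSON file written atomically next to the event log. A sketch, assuming your strategy state is JSON-serializable; save_state and load_state are hypothetical helpers, not Strix functions.

```python
import json
import os
from pathlib import Path


def save_state(path: Path, state: dict) -> None:
    """Write state to a temp file, then rename it over the target.

    The rename means a crash mid-write leaves the previous file intact.
    """
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(json.dumps(state), encoding="utf-8")
    os.replace(tmp, path)   # same directory, so same filesystem


def load_state(path: Path, default: dict) -> dict:
    """Return the saved state, or `default` if nothing was saved yet."""
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return default
```

Save at a point where strategy state and Strix state agree, for example after each fully processed bar, so the two do not drift apart on resume.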

Build your own analytics from the event log

Everything Strix records is plain JSONL on your disk — one self-describing JSON object per line, schema documented in Persistence. You don't need Strix (or us) to analyze it. Fills table and P&L curve with pandas:

import json
from pathlib import Path

import pandas as pd
import strix

data_dir = "./strix_data"

latest = strix.list_sessions(data_dir=data_dir)[-1]
path = Path(data_dir) / "sessions" / latest / "events.jsonl"
# Skip blank lines so a trailing newline or empty line doesn't break parsing.
events = [json.loads(line) for line in path.read_text().splitlines() if line.strip()]

# One row per fill.
fills = pd.json_normalize(
    [e["execution"] for e in events if e["type"] == "ExecutionApplied"]
)
fills[["qty", "price"]] = fills[["qty", "price"]].astype(float)
fills["signed_qty"] = fills["qty"].where(fills["side"] == "BUY", -fills["qty"])
print(fills.groupby("symbol")["signed_qty"].sum())   # net filled qty per symbol

# Equity curve from the throttled PnLSnapshot events.
pnl = pd.DataFrame(
    [
        {"ts": e["ts"], "realized": e["realized"], "unrealized": e["unrealized"]}
        for e in events
        if e["type"] == "PnLSnapshot"
    ]
)
pnl["ts"] = pd.to_datetime(pnl["ts"])
pnl[["realized", "unrealized"]] = pnl[["realized", "unrealized"]].astype(float)
pnl["total"] = pnl["realized"] + pnl["unrealized"]
pnl.set_index("ts")["total"].plot()                  # needs matplotlib

Notes:

  • Decimal fields arrive as JSON strings ("150.00") to preserve precision. astype(float) is fine for plotting and exploration; parse through decimal.Decimal instead when the numbers feed accounting.
  • PnLSnapshot events only appear while you feed strix.mark_to_market(...); their cadence is SessionConfig.pnl_snapshot_every_seconds (default 15s).
  • Prefer typed objects over raw dicts? strix.replay(data_dir=...) yields parsed SessionEvent instances you can isinstance-check — same data, no pandas. With follow=True it tails the log live (read-only, no lock), which is how you build a sidecar monitor on a running bot.
  • The schema is a supported public contract, not an implementation detail: every event carries schema_version, and changes will be explicit in the changelog.
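
For the accounting case in the first note, the same fills can be summed exactly with decimal.Decimal. A sketch, assuming the ExecutionApplied shape used in the pandas example above (an execution object with symbol, side, qty, and price, the numeric fields as strings); net_notional is a hypothetical helper.

```python
from collections import defaultdict
from decimal import Decimal


def net_notional(events: list[dict]) -> dict[str, Decimal]:
    """Sum signed qty * price per symbol, exactly, over ExecutionApplied events."""
    totals: dict[str, Decimal] = defaultdict(Decimal)   # Decimal() is 0
    for e in events:
        if e["type"] != "ExecutionApplied":
            continue
        ex = e["execution"]
        sign = 1 if ex["side"] == "BUY" else -1
        # Parse the JSON strings straight into Decimal; no float in between.
        totals[ex["symbol"]] += sign * Decimal(ex["qty"]) * Decimal(ex["price"])
    return dict(totals)
```

Because the strings never pass through float, the totals carry no binary rounding error and can be compared against broker statements directly.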

Anti-patterns

A short list of things that look right but aren't.

Calling the broker inside the order block and outside it:

with strix.order(symbol="AAPL", side=Side.BUY, qty=100) as o:
    broker.submit(...)
broker.submit(...)   # also called outside the block — Strix won't see this one's lifecycle

Catching StrixRiskError to bypass the check:

while True:
    try:
        with strix.order(...) as o:
            ...
        break
    except StrixRiskError:
        # don't do this — you're papering over the limit you set
        continue

Mutating the exposed Order: It's immutable; you can't. Read it, don't try to write it.

Using floats: floating-point quantities are rejected. Use "100.5" or Decimal("100.5"). This is on purpose — see Getting started.

Calling ingest_execution for an order ID not in the book: raises a missing-order error. Strix doesn't auto-create the order — the order has to come through strix.order(...) first.