Cookbook¶
Common patterns. Each is a short, copy-pasteable starting point — adapt to your code.
Boot pattern: try-resume, fall through to init¶
import strix
from strix import NoActiveSessionError, RiskSettings
transport = strix.LocalTransport(data_dir="./strix_data")
RISK = RiskSettings(max_qty_per_order=1000, position_limits=LIMITS)
try:
    strix.resume(transport=transport)
    # Catch up on fills that landed while the process was down.
    for ex in broker.fetch_executions(since=last_seen):
        strix.ingest_execution(ex)
except NoActiveSessionError:
    # Nothing to resume: open a fresh session with the risk config.
    strix.init(transport=transport, risk=RISK)
The risk config goes on the init call. (Note: the resume path's prior session keeps its own config from when it was opened — you can't retroactively change limits on a session you're resuming.)
Submit-and-fill on the same line¶
For unit tests and synchronous backtests where the fill is known immediately:
import strix
from strix import Side, Execution
strix.init()
def market_buy(symbol: str, qty, fill_price) -> None:
    with strix.order(symbol=symbol, side=Side.BUY, qty=qty) as o:
        pass  # no real broker; we just want the order recorded
    strix.ingest_execution(Execution(
        order_id=o.order_id, symbol=symbol, side=Side.BUY,
        qty=qty, price=fill_price,
    ))
market_buy("AAPL", 100, "150")
The exposed o is still a snapshot, but order_id is stable so ingest_execution can find the order.
Stable order IDs across systems¶
Default order_id is a UUIDv7 (time-ordered, sortable). Pass your own when you need stable IDs (broker reconciliation, replay, multi-system attribution):
def my_order_id(strategy: str, signal_seq: int) -> str:
    return f"{strategy}-{signal_seq:012d}"
with strix.order(
    symbol="AAPL", side=Side.BUY, qty=100,
    order_id=my_order_id("mean-reversion", signal_seq),
) as o:
    broker.submit(o.order_id, o.symbol, o.side, o.qty)
Within a single session, IDs must be unique — registering the same ID twice raises StrixDuplicateOrderIdError (also a ValueError for back-compat). Open orders carry forward across strix.init(...), so a still-open id stays taken in the next session; terminal orders drop, so their ids become reusable. Be deterministic enough that you don't accidentally collide within a session.
Contract: any non-empty string up to 128 characters, no ASCII control characters. If you care about cloud-DB index locality (when CloudTransport lands post-v1), prefer time-ordered schemes — UUIDv7, ULID, Snowflake. The default is UUIDv7; pure UUIDv4 will work but degrades the secondary index when orders move to a hosted DB.
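As a rough pre-flight check, the contract above fits in a few lines. This is a sketch rather than Strix's own validation: is_valid_order_id is a hypothetical helper, and it assumes that "characters" means Python string length and that "ASCII control characters" covers 0x00-0x1F plus DEL (0x7F).

```python
def is_valid_order_id(order_id: object) -> bool:
    """Hypothetical check mirroring the documented order_id contract."""
    if not isinstance(order_id, str):
        return False
    # Non-empty, at most 128 characters (assumed to mean len() of the str).
    if not 1 <= len(order_id) <= 128:
        return False
    # Assumption: control characters are 0x00-0x1F and DEL (0x7F).
    return not any(ord(ch) < 0x20 or ord(ch) == 0x7F for ch in order_id)
```

Running it over the output of your own ID scheme in a unit test catches over-long strategy names before they reach strix.order(...).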
Backtest harness — one session per run¶
Each backtest run gets its own session. Use LocalTransport if you want the completed sessions to accumulate as a history.
import strix
from strix import LocalTransport
def run_backtest(strategy, data, *, results_dir: str) -> str:
    transport = LocalTransport(data_dir=results_dir)
    session = strix.init(transport=transport)
    try:
        for bar in data:
            strategy.on_bar(bar)  # strategy uses strix.order and strix.ingest_execution
    finally:
        transport.flush()
        transport.close()
    return session.session_id
for i in range(N_RUNS):
    sid = run_backtest(strategy, data, results_dir=f"./backtests/run_{i:03d}")
    print(f"run {i} -> session_id={sid}")
Each run lands in its own data_dir so the data-dir lock doesn't serialize parallel runs. (If you want all runs under one data_dir, run them sequentially in one process — each init closes the prior session.)
Per-strategy attribution¶
Strix's session model is global to the process. If you want per-strategy attribution within one process, encode the strategy in the order_id prefix:
import uuid
def submit(strategy: str, symbol: str, side, qty) -> None:
    # uuid.uuid7() on Python 3.14+; fall back to uuid4() on older runtimes
    # (less ideal for cloud-DB locality but still unique).
    with strix.order(
        symbol=symbol, side=side, qty=qty,
        order_id=f"{strategy}:{uuid.uuid7()}",
    ) as o:
        broker.submit(o.order_id, o.symbol, o.side, o.qty)
Dashboards (post-v1) and ad-hoc scripts can split by the prefix. This is a string-convention pattern, not a Strix feature — but it's the lightweight way to get attribution without per-strategy sessions.
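An ad-hoc script can recover the strategy from the prefix with plain string handling. The sketch below assumes the "strategy:unique" convention from the snippet above and that strategy names never contain a colon; strategy_of and count_by_strategy are hypothetical helpers, not Strix functions.

```python
from collections import Counter
from typing import Iterable


def strategy_of(order_id: str) -> str:
    """Return the prefix before the first ':' or 'unattributed' if none."""
    prefix, sep, _ = order_id.partition(":")
    return prefix if sep else "unattributed"


def count_by_strategy(order_ids: Iterable[str]) -> Counter:
    """Count order IDs per strategy prefix."""
    return Counter(strategy_of(oid) for oid in order_ids)
```

IDs generated without a prefix (for example the default UUIDv7) land in the "unattributed" bucket rather than raising, which keeps mixed logs readable.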
If you need real per-strategy isolation (different risk configs, independent open-orders counters), today the cleanest answer is one process per strategy with its own data_dir. Native multi-session-in-one-process is not a v1 feature.
Cancelling an order¶
Lifecycle: order → PENDING_CANCEL on entry → CANCELLED on clean exit. If your broker call throws, the order reverts to its prior status (NEW or PARTIALLY_FILLED) and a CancelAttemptFailed event is appended for audit.
Cancellable from PENDING_NEW, NEW, PARTIALLY_FILLED. Cancelling a FILLED, REJECTED, CANCELLED, or already-PENDING_CANCEL order raises StrixOrderNotCancellableError (a subclass of both StrixCancelError and ValueError). Cancelling an unknown order_id raises StrixUnknownOrderError (a subclass of both StrixCancelError and KeyError).
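To make the lifecycle concrete, here is a toy model of the transitions described above. It is not Strix's implementation: ToyOrder and toy_cancel are hypothetical stand-ins, statuses are plain strings, and the audit list only imitates the CancelAttemptFailed event.

```python
from contextlib import contextmanager

# Statuses the text lists as cancellable.
CANCELLABLE = {"PENDING_NEW", "NEW", "PARTIALLY_FILLED"}


class ToyOrder:
    """Hypothetical stand-in for an order; not a Strix type."""

    def __init__(self, status: str = "NEW") -> None:
        self.status = status
        self.audit = []


@contextmanager
def toy_cancel(order: ToyOrder):
    if order.status not in CANCELLABLE:
        raise ValueError(f"cannot cancel from {order.status}")
    prior = order.status
    order.status = "PENDING_CANCEL"  # on entry
    try:
        yield order
    except Exception:
        # Broker call failed: revert, leave an audit record, re-raise.
        order.status = prior
        order.audit.append("CancelAttemptFailed")
        raise
    # Clean exit. If something else moved the order meanwhile, leave it alone.
    if order.status == "PENDING_CANCEL":
        order.status = "CANCELLED"
```

The final guard is a modelling choice: the toy only finalizes the cancel if the order is still PENDING_CANCEL when the block exits.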
Handling the fill-wins race¶
If a fill arrives while your cancel is in flight (PENDING_CANCEL), the fill applies and the order moves to PARTIALLY_FILLED or FILLED. The cancel block exits as a no-op — broker reality wins.
from strix import StrixCancelError
try:
    with strix.cancel(order_id=o.order_id):
        broker.cancel(o.order_id)
except StrixCancelError:
    # Order is unknown or in a non-cancellable status, usually a race with
    # a fill or a prior cancel landing first. Treat as a no-op.
    pass
except Exception:
    # Broker call failed; order reverted, CancelAttemptFailed event emitted.
    raise
# Check final status: could be CANCELLED, FILLED, or PARTIALLY_FILLED depending on the race.
# get_order() also sees terminal orders, which open_orders() excludes.
final = strix.get_order(o.order_id)  # -> Order | None
When you need to discriminate, catch the specific subclass:
from strix import StrixOrderNotCancellableError, StrixUnknownOrderError
try:
    with strix.cancel(order_id=o.order_id):
        broker.cancel(o.order_id)
except StrixUnknownOrderError:
    log.info("order vanished from book before cancel reached it")
except StrixOrderNotCancellableError as e:
    log.info("cancel race: order now %s", e.current_status.value)
Existing handlers using bare except KeyError: / except ValueError: keep working — both subclasses inherit from their stdlib counterpart for back-compat.
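The back-compat behaviour follows from ordinary multiple inheritance. The classes below are stand-ins that mirror the hierarchy as described here (the Demo* names are hypothetical, not Strix's source), just to show which handler wins.

```python
class DemoCancelError(Exception):
    """Stand-in for StrixCancelError."""


class DemoOrderNotCancellableError(DemoCancelError, ValueError):
    """Stand-in for StrixOrderNotCancellableError."""


class DemoUnknownOrderError(DemoCancelError, KeyError):
    """Stand-in for StrixUnknownOrderError."""


def legacy_handler(exc_type) -> str:
    """Mimic older code that only knows the stdlib exception types."""
    try:
        raise exc_type("demo")
    except KeyError:
        return "KeyError handler"
    except ValueError:
        return "ValueError handler"
```

Handlers are tried top to bottom, so if you mix old and new styles, put the more specific except clause first.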
Catching risk rejections gracefully¶
from strix import StrixRiskError
def safe_submit(symbol, side, qty) -> bool:
    try:
        with strix.order(symbol=symbol, side=side, qty=qty) as o:
            broker.submit(o.order_id, o.symbol, o.side, o.qty)
        return True
    except StrixRiskError as e:
        log.info("blocked: %s", e.reason)
        return False
A False return is a normal outcome, not an error: the risk checks are doing their job.
Reading state mid-session¶
from strix import OrderStatus
# All positions (including zero-qty entries from previously-held symbols).
all_positions = strix.positions()
# Filter to non-zero.
holdings = [p for p in all_positions if p.qty != 0]
# Open orders (PENDING_NEW, NEW, PARTIALLY_FILLED, PENDING_CANCEL).
working = strix.open_orders()
# Number of open orders for a specific symbol.
aapl_open = sum(1 for o in working if o.symbol == "AAPL")
# Filter by status — e.g. only fully-acked NEW orders (skip PENDING_*).
cancellable = [o for o in working if o.status == OrderStatus.NEW]
# Single-order lookup by id — works for any status, including terminal
# (FILLED, REJECTED, CANCELLED) which open_orders() excludes.
o = strix.get_order(some_order_id) # -> Order | None
These accessors return fresh snapshots each call. Cheap, no caching needed on your side.
Reading an order after the strix.order(...) block exits¶
The Order yielded inside the block is a snapshot frozen at PENDING_NEW — transitions replace the stored object rather than mutating the yielded one. To read the post-block state (NEW, or REJECTED if the body threw, or any later status after fills/cancels), call strix.get_order(...):
with strix.order(symbol="AAPL", side=Side.BUY, qty=100) as o:
    broker.submit(o.order_id, o.symbol, o.side, o.qty)
# o.status is still PENDING_NEW here — it's the snapshot, not a live view.
current = strix.get_order(o.order_id)
assert current is not None and current.status is OrderStatus.NEW
Distinguishing partial vs full fill from ingest_execution¶
strix.ingest_execution returns the matching order's resulting
OrderStatus, so you don't need a follow-up open_orders() lookup to
know whether the fill closed the order:
from strix import OrderStatus
status = strix.ingest_execution(execution)
if status == OrderStatus.FILLED:
    log.info("done: %s", execution.order_id)
elif status == OrderStatus.PARTIALLY_FILLED:
    log.info("partial: %s (remainder still working)", execution.order_id)
elif status is None:
    # Duplicate execution_id or no matching order_id: nothing changed.
    pass
For anomalies (mismatched symbol/side/qty, terminal-order target, overfill) the return value is the order's prior status — the order book is left untouched because the position book is the broker-authoritative view. See order lifecycle § Validation.
Custom timestamps (replay, backtests)¶
strix.order(..., timestamp=...) and Execution(..., timestamp=...) both accept a caller-supplied timestamp. Useful for replays where the real wall-clock isn't the trading time.
from datetime import datetime, timezone
ts = datetime(2026, 1, 15, 9, 30, tzinfo=timezone.utc)
with strix.order(symbol="AAPL", side=Side.BUY, qty=100, timestamp=ts) as o:
    pass
strix.ingest_execution(Execution(
    order_id=o.order_id, symbol="AAPL", side=Side.BUY,
    qty=100, price="150",
    timestamp=ts,
))
These timestamps are informational — Strix doesn't use them for ordering decisions. The event log's seq is the authoritative order; ts is for human-readable audit and for the dashboard.
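When post-processing a replayed log, this means sorting on seq rather than ts. A minimal sketch, assuming each event is a dict that carries an integer "seq" field next to "ts" (the field name is taken from the sentence above; the sample rows are made up for illustration):

```python
def in_log_order(events):
    """Sort events by the log sequence number, ignoring timestamps."""
    return sorted(events, key=lambda e: e["seq"])


# Illustrative rows only: caller-supplied timestamps need not be monotonic.
events = [
    {"seq": 3, "ts": "2026-01-15T09:30:00+00:00", "type": "PnLSnapshot"},
    {"seq": 1, "ts": "2026-01-15T09:31:00+00:00", "type": "SessionStarted"},
    {"seq": 2, "ts": "2026-01-15T09:30:00+00:00", "type": "ExecutionApplied"},
]
ordered_types = [e["type"] for e in in_log_order(events)]
```

Sorting the same rows by ts would put SessionStarted last, which is exactly the kind of artefact a replay with synthetic timestamps can produce.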
Pushing marks on every tick (one-liner)¶
The minimum-viable mark feed in a tick loop:
strix.mark_to_market(
    {s: feed.last(s) for s in strix.active_symbols() if feed.last(s) is not None}
)
active_symbols() is what Strix needs marks for right now (non-zero
position OR open order on the symbol). Empty payloads are no-ops. Filtering
out None keeps the call tidy when your feed hasn't seen a symbol yet —
Strix will then warn about the missing mark on the next stop-loss eval,
which is what you want.
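The one-liner queries the feed twice per symbol. If your feed lookup is expensive or can change between calls, a small helper that looks each symbol up once may be preferable. This is a sketch: build_marks is a hypothetical helper, and last_price stands in for whatever lookup your feed exposes.

```python
from typing import Callable, Dict, Iterable, Optional


def build_marks(
    symbols: Iterable[str],
    last_price: Callable[[str], Optional[str]],
) -> Dict[str, str]:
    """Build a mark payload, skipping symbols the feed has not seen yet."""
    marks = {}
    for symbol in symbols:
        price = last_price(symbol)  # exactly one lookup per symbol
        if price is not None:
            marks[symbol] = price
    return marks
```

In a tick loop the result would be passed straight to strix.mark_to_market(...), with strix.active_symbols() as the symbols argument.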
Configuring SessionConfig¶
strix.init(..., config=SessionConfig(...)) controls behavior policies
that aren't risk rules — how to react when reality diverges from
expectations:
from strix import SessionConfig
strix.init(
    transport=transport,
    config=SessionConfig(
        on_invalid_execution="warn",  # log + keep going on broker/SDK divergence
        on_missing_market_data="allow",  # skip pre-trade checks that need a price
    ),
    risk=...,
)
Defaults are conservative ("raise" / "reject" — fail-closed). Switch to
the looser settings only when you've understood the consequences:
- on_invalid_execution="warn" will keep your fill pipeline running even when an execution doesn't match the local order book (the broker is still authoritative on position). Useful if you'd rather log + investigate later than abort the tick.
- on_missing_market_data="allow" lets pre-trade checks pass through when your adapter can't supply a quote. Risky on tight position-limit / notional configs; pair with a synthetic-fallback adapter if you can.
SessionConfig is not carried forward across strix.init calls — re-pass
it on every init if you want the same policy in the next session.
Storing risk config as code¶
Risk limits drift; check them into source.
# risk_config.py
from decimal import Decimal
from strix import RiskSettings
RISK = RiskSettings(
    max_qty_per_order=Decimal("1000"),
    max_open_orders=20,
    position_limits={
        "AAPL": Decimal("500"),
        "MSFT": Decimal("1000"),
        "GOOG": Decimal("200"),
    },
)
# main.py
import strix
import risk_config as rc
strix.init(transport=strix.LocalTransport(data_dir="./strix_data"), risk=rc.RISK)
The risk config is persisted as part of the SessionStarted event, so the dashboard knows the limits in force for each session.
Clean shutdown¶
LocalTransport releases the data-dir lock on process exit, but eager cleanup is fine in long-lived algos:
transport = strix.LocalTransport(data_dir="./strix_data")
strix.init(transport=transport)
try:
    ...  # trading loop
finally:
    transport.close()
flush() forces an fsync; not usually needed since append() already fsyncs, but it's there for explicit pre-flush points.
What strix.resume does NOT recover¶
resume rebuilds the parts of state Strix owns: positions, open orders,
risk policy, halt projection, realized P&L. The mark cache is not
restored — marks are runtime state by design; re-supply your
MarketDataAdapter and resume calling strix.mark_to_market(...) to
repopulate it. Strix also does not rebuild your strategy's state —
rolling windows, signal counters, per-symbol cooldowns, ML model state,
etc. Those are user-owned.
For long-running strategies on live data, plan for this explicitly:
# Option A: re-derive strategy state from a warmup window on resume.
strategy.warmup(lookback_bars=feed.fetch_recent(symbols, n=200))
# Option B: persist strategy state alongside Strix's data_dir and load it back.
strategy.load_state(Path(data_dir) / "strategy_state.json")
# Option C: skip the first N ticks while indicators re-converge.
strategy.skip_warmup_signals = True
Strix's LocalTransport(data_dir=...) is intentionally for Strix's event
log only; a sibling file in the same directory is the easiest place to
park strategy state for self-contained resume.
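For Option B, one way to keep that sibling file from being left half-written after a crash is to write to a temporary file and rename it into place. This is a sketch using only the standard library; save_state and load_state are hypothetical helpers, and the state is assumed to be JSON-serializable.

```python
import json
import os
import tempfile
from pathlib import Path


def save_state(path: Path, state: dict) -> None:
    """Write state atomically: temp file in the same directory, then rename."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(state, fh)
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp_name, path)  # atomic when both are on one filesystem
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


def load_state(path: Path, default: dict) -> dict:
    """Return the saved state, or default if nothing was saved yet."""
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return default
```

Calling save_state at the end of each bar (or every N bars) and load_state right after strix.resume(...) keeps strategy state and Strix's log roughly in step; the two are still written independently, so treat the strategy file as a best-effort checkpoint.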
Build your own analytics from the event log¶
Everything Strix records is plain JSONL on your disk — one self-describing JSON object per line, schema documented in Persistence. You don't need Strix (or us) to analyze it. Fills table and P&L curve with pandas:
import json
from pathlib import Path
import pandas as pd
import strix
data_dir = "./strix_data"
latest = strix.list_sessions(data_dir=data_dir)[-1]
path = Path(data_dir) / "sessions" / latest / "events.jsonl"
events = [json.loads(line) for line in path.read_text().splitlines()]
# One row per fill.
fills = pd.json_normalize(
    [e["execution"] for e in events if e["type"] == "ExecutionApplied"]
)
fills[["qty", "price"]] = fills[["qty", "price"]].astype(float)
fills["signed_qty"] = fills["qty"].where(fills["side"] == "BUY", -fills["qty"])
print(fills.groupby("symbol")["signed_qty"].sum()) # net filled qty per symbol
# Equity curve from the throttled PnLSnapshot events.
pnl = pd.DataFrame(
    [
        {"ts": e["ts"], "realized": e["realized"], "unrealized": e["unrealized"]}
        for e in events
        if e["type"] == "PnLSnapshot"
    ]
)
pnl["ts"] = pd.to_datetime(pnl["ts"])
pnl[["realized", "unrealized"]] = pnl[["realized", "unrealized"]].astype(float)
pnl["total"] = pnl["realized"] + pnl["unrealized"]
pnl.set_index("ts")["total"].plot() # needs matplotlib
Notes:
- Decimal fields arrive as JSON strings ("150.00") to preserve precision. astype(float) is fine for plotting and exploration; parse through decimal.Decimal instead when the numbers feed accounting.
- PnLSnapshot events only appear while you feed strix.mark_to_market(...); their cadence is SessionConfig.pnl_snapshot_every_seconds (default 15s).
- Prefer typed objects over raw dicts? strix.replay(data_dir=...) yields parsed SessionEvent instances you can isinstance-check: same data, no pandas. With follow=True it tails the log live (read-only, no lock), which is how you build a sidecar monitor on a running bot.
- The schema is a supported public contract, not an implementation detail: every event carries schema_version, and changes will be explicit in the changelog.
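For the accounting case in the first note, the net-quantity calculation can be done with decimal.Decimal and no pandas. A minimal sketch, assuming the event shape used in the pandas example above (an "ExecutionApplied" event with a nested "execution" holding "symbol", "side" and a string "qty"); net_filled_qty is a hypothetical helper.

```python
import json
from collections import defaultdict
from decimal import Decimal
from typing import Dict, Iterable


def net_filled_qty(lines: Iterable[str]) -> Dict[str, Decimal]:
    """Sum signed fill quantities per symbol from JSONL lines, exactly."""
    net = defaultdict(Decimal)  # Decimal() is Decimal("0")
    for line in lines:
        event = json.loads(line)
        if event.get("type") != "ExecutionApplied":
            continue
        ex = event["execution"]
        qty = Decimal(ex["qty"])  # parse the JSON string, never via float
        net[ex["symbol"]] += qty if ex["side"] == "BUY" else -qty
    return dict(net)
```

Feed it path.read_text().splitlines() from the snippet above to get exact totals instead of float approximations.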
Anti-patterns¶
A short list of things that look right but aren't.
Calling the broker inside the order block and outside it:
with strix.order(symbol="AAPL", side=Side.BUY, qty=100) as o:
    broker.submit(...)
broker.submit(...)  # also called outside the block; Strix won't see this one's lifecycle
Catching StrixRiskError to bypass the check:
while True:
    try:
        with strix.order(...) as o:
            ...
        break
    except StrixRiskError:
        # don't do this: you're papering over the limit you set
        continue
Mutating the exposed Order: It's immutable; you can't. Read it, don't try to write it.
Using floats: floating-point quantities are rejected. Use "100.5" or Decimal("100.5"). This is on purpose — see Getting started.
Calling ingest_execution for an order ID not in the book: raises a missing-order error. Strix doesn't auto-create the order — the order has to come through strix.order(...) first.
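On the float anti-pattern above: if quantities reach your code from mixed sources, a small guard at the boundary keeps floats out before they get near Strix. This is a sketch of one possible policy, not Strix's own validation; to_decimal is a hypothetical helper.

```python
from decimal import Decimal, InvalidOperation


def to_decimal(value) -> Decimal:
    """Accept str, int or Decimal; refuse float (and bool) outright."""
    if isinstance(value, (bool, float)):
        raise TypeError(f"refusing {type(value).__name__}: pass a str or Decimal")
    if isinstance(value, (Decimal, int, str)):
        try:
            return Decimal(value)
        except InvalidOperation as exc:
            raise ValueError(f"not a decimal number: {value!r}") from exc
    raise TypeError(f"unsupported type: {type(value).__name__}")


# Why floats are worth refusing: the binary value is not the one you typed.
float_is_inexact = Decimal(0.1) != Decimal("0.1")
```

Converting once at the edge of your system means everything downstream can assume exact quantities.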