Skip to main content

aios_cli/
replay.rs

1//! JSONL replay — drive the core pipeline from an Android `CollectorEvent` trace.
2//!
3//! Each input line is the Android `CollectorEvent` JSON shape; we extract its
4//! inner `rawEvent`, synthesize a `CollectorEnvelope`, and push it through
5//! `RustCollectorIngress → DefaultPrivacyAirGap → WindowAggregator →
6//! DecisionRouter → PolicyEngine`. Window boundaries use the captured
7//! timestamps from the trace, not wall-clock time — replay is deterministic.
8//!
9//! Determinism is enforced by the **canonical audit stream**: every per-stage
10//! record is also serialized into a sorted-key, volatility-stripped projection
11//! that is both mirrored to an optional audit sink and folded into a SHA-256
12//! hasher. The resulting hex digest (`audit_hash`) is pinned by golden tests:
13//! any divergence in the pipeline's observable state transitions for a given
14//! input trace is caught immediately.
15
16use std::collections::BTreeMap;
17use std::io::{BufRead, Write};
18
19use aios_action::DefaultActionExecutor;
20use aios_agent::DecisionRouter;
21use aios_core::collector_ingress::RustCollectorIngress;
22use aios_core::context_builder::WindowAggregator;
23use aios_core::policy_engine::PolicyEngine;
24use aios_core::privacy_airgap::DefaultPrivacyAirGap;
25use aios_spec::traits::{ActionExecutor, PrivacySanitizer};
26use aios_spec::{
27    CapabilityLevel, CollectorEnvelope, DenialReason, IngestedRawEvent, RawEvent, SourceTier,
28    StructuredContext,
29};
30use anyhow::{Context, Result};
31use serde::Serialize;
32use serde_json::{json, Map, Value};
33use sha2::{Digest, Sha256};
34
/// Schema version string stamped onto every `CollectorEnvelope` that replay
/// synthesizes from a trace line.
const SCHEMA_VERSION: &str = "dipecs.collector.v1";
36
/// Object keys that are removed from the canonical audit projection.
///
/// Their values (uuids, wall-clock durations) differ from run to run, so
/// leaving them in would make the replay hash unstable.
const VOLATILE_KEYS: &[&str] = &[
    "event_id",
    "window_id",
    "intent_id",
    "latency_us",
];
41
42/// Pipeline stage at which replay should stop emitting events.
43///
44/// Higher stages imply that all preceding stages also run. `Policy` is the
45/// default — it proves correctness through authorization without invoking the
46/// (still-stubbed) action executor.
47#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)]
48pub enum Stage {
49    Ingest,
50    Sanitize,
51    Context,
52    Decision,
53    Policy,
54    Execute,
55}
56
57impl Stage {
58    fn includes(self, other: Stage) -> bool {
59        self >= other
60    }
61}
62
63/// Aggregate counters surfaced both in the NDJSON summary record and to
64/// integration tests.
65///
66/// `audit_hash` is a hex SHA-256 of the canonical projection of every
67/// per-stage record (everything except the final summary record itself).
68/// Identical inputs must yield identical hashes; pin this in golden tests.
69///
70/// `denial_counts` keys are `DenialReason` enum variants; the `BTreeMap`
71/// gives a stable, canonical iteration order so the JSON projection is
72/// deterministic and folds into `audit_hash` without extra effort.
73#[derive(Debug, Default, Clone, Serialize, PartialEq, Eq)]
74pub struct ReplaySummary {
75    pub lines_total: u64,
76    pub lines_skipped_no_raw_event: u64,
77    pub lines_parse_error: u64,
78    pub events_ingested: u64,
79    pub windows_closed: u64,
80    pub intents_total: u64,
81    pub intents_approved: u64,
82    pub intents_rejected: u64,
83    pub actions_authorized: u64,
84    pub actions_denied: u64,
85    pub denial_counts: BTreeMap<DenialReason, u64>,
86    pub audit_hash: String,
87}
88
89/// Return value of [`run_with_audit`]: the summary (with the same `audit_hash`
90/// as the field below) plus the hash hoisted for convenient assertions.
91#[derive(Debug, Clone)]
92pub struct ReplayWithAudit {
93    pub summary: ReplaySummary,
94    pub audit_hash: String,
95}
96
97/// Replay a JSONL stream through the core pipeline without writing a separate
98/// audit file. The canonical-projection hash is still computed and surfaced as
99/// `ReplaySummary.audit_hash` so callers can pin determinism without managing
100/// an audit sink.
101pub fn run<R: BufRead, W: Write>(
102    reader: R,
103    writer: &mut W,
104    window_secs: u64,
105    stage: Stage,
106) -> Result<ReplaySummary> {
107    let mut sink = std::io::sink();
108    let with_audit = run_with_audit(reader, writer, &mut sink, window_secs, stage)?;
109    Ok(with_audit.summary)
110}
111
112/// Replay a JSONL stream through the core pipeline, mirroring every per-stage
113/// record into `audit` (volatility-stripped, sorted-key canonical form) and
114/// returning the SHA-256 of that canonical stream.
115///
116/// The audit stream omits the trailing summary record so the hash itself can
117/// be embedded into the summary without self-reference.
118pub fn run_with_audit<R: BufRead, W: Write, A: Write>(
119    reader: R,
120    writer: &mut W,
121    audit: &mut A,
122    window_secs: u64,
123    stage: Stage,
124) -> Result<ReplayWithAudit> {
125    let ingress = RustCollectorIngress;
126    let sanitizer = DefaultPrivacyAirGap;
127    let router = DecisionRouter::default();
128    let policy = PolicyEngine::default();
129    let executor = DefaultActionExecutor;
130    let mut summary = ReplaySummary::default();
131    let mut aggregator: Option<WindowAggregator> = None;
132    let mut last_captured_at_ms: i64 = 0;
133    let mut emitter = Emitter::new(writer, audit);
134
135    for (line_idx, line_result) in reader.lines().enumerate() {
136        let line_no = line_idx as u64 + 1;
137        let raw_line = line_result.with_context(|| format!("read line {line_no}"))?;
138        if raw_line.trim().is_empty() {
139            continue;
140        }
141        summary.lines_total += 1;
142
143        let line_value: Value = match serde_json::from_str(&raw_line) {
144            Ok(v) => v,
145            Err(e) => {
146                summary.lines_parse_error += 1;
147                emitter.emit(&json!({
148                    "stage": "error",
149                    "line": line_no,
150                    "error": format!("invalid JSON: {e}"),
151                }))?;
152                continue;
153            },
154        };
155
156        let raw_event_value = line_value.get("rawEvent").cloned().unwrap_or(Value::Null);
157        if raw_event_value.is_null() {
158            summary.lines_skipped_no_raw_event += 1;
159            continue;
160        }
161
162        let raw_event: RawEvent = match serde_json::from_value(raw_event_value) {
163            Ok(r) => r,
164            Err(e) => {
165                summary.lines_parse_error += 1;
166                emitter.emit(&json!({
167                    "stage": "error",
168                    "line": line_no,
169                    "error": format!("rawEvent does not match RawEvent shape: {e}"),
170                }))?;
171                continue;
172            },
173        };
174
175        let envelope = CollectorEnvelope {
176            schema_version: SCHEMA_VERSION.to_string(),
177            source: line_value
178                .get("source")
179                .and_then(Value::as_str)
180                .unwrap_or("apps.android-collector.replay")
181                .to_string(),
182            source_tier: SourceTier::PublicApi,
183            device_trace_id: line_value
184                .get("eventId")
185                .and_then(Value::as_str)
186                .map(String::from),
187            captured_at_ms: line_value
188                .get("timestampMs")
189                .and_then(Value::as_i64)
190                .unwrap_or(0),
191            received_at_ms: None,
192            raw_event,
193        };
194        let captured_at_ms = envelope.captured_at_ms;
195        last_captured_at_ms = last_captured_at_ms.max(captured_at_ms);
196
197        let ingested: IngestedRawEvent = ingress
198            .accept(envelope)
199            .with_context(|| format!("ingress.accept failed on line {line_no}"))?;
200        summary.events_ingested += 1;
201
202        if stage.includes(Stage::Ingest) {
203            emitter.emit(&json!({
204                "stage": "ingest",
205                "line": line_no,
206                "source_tier": format!("{:?}", ingested.source_tier),
207                "raw_event_kind": raw_event_kind(&ingested.raw_event),
208            }))?;
209        }
210
211        // Time-based window driven by the trace's own timestamps.
212        let agg =
213            aggregator.get_or_insert_with(|| WindowAggregator::new(window_secs, captured_at_ms));
214        if agg.is_expired(captured_at_ms) {
215            if let Some(ctx) = agg.close(captured_at_ms) {
216                process_window(
217                    &ctx,
218                    &router,
219                    &policy,
220                    &executor,
221                    stage,
222                    &mut summary,
223                    &mut emitter,
224                )?;
225            }
226        }
227
228        if !stage.includes(Stage::Sanitize) {
229            continue;
230        }
231        let sanitized = sanitizer.sanitize_with_tier(ingested.raw_event, ingested.source_tier);
232        emitter.emit(&json!({
233            "stage": "sanitize",
234            "line": line_no,
235            "sanitized": sanitized,
236        }))?;
237        if stage.includes(Stage::Context) {
238            // Push into the aggregator so the next window close can build context.
239            if let Some(agg) = aggregator.as_mut() {
240                agg.push(sanitized);
241            }
242        }
243    }
244
245    // Flush the last open window using the latest captured timestamp seen.
246    if let Some(mut agg) = aggregator {
247        if let Some(ctx) = agg.close(last_captured_at_ms) {
248            process_window(
249                &ctx,
250                &router,
251                &policy,
252                &executor,
253                stage,
254                &mut summary,
255                &mut emitter,
256            )?;
257        }
258    }
259
260    let audit_hash = emitter.finalize();
261    summary.audit_hash = audit_hash.clone();
262
263    // Summary record goes only to the human-facing writer — it contains the
264    // hash itself and therefore must not be folded back into the hash.
265    serde_json::to_writer(
266        &mut *writer,
267        &json!({
268            "stage": "summary",
269            "summary": summary,
270        }),
271    )?;
272    writer.write_all(b"\n")?;
273
274    Ok(ReplayWithAudit {
275        summary,
276        audit_hash,
277    })
278}
279
280fn process_window(
281    ctx: &StructuredContext,
282    router: &DecisionRouter,
283    policy: &PolicyEngine,
284    executor: &DefaultActionExecutor,
285    stage: Stage,
286    summary: &mut ReplaySummary,
287    emitter: &mut Emitter<'_>,
288) -> Result<()> {
289    summary.windows_closed += 1;
290
291    if stage.includes(Stage::Context) {
292        emitter.emit(&json!({
293            "stage": "context",
294            "window_id": ctx.window_id,
295            "window_start_ms": ctx.window_start_ms,
296            "window_end_ms": ctx.window_end_ms,
297            "duration_secs": ctx.duration_secs,
298            "event_count": ctx.events.len(),
299            "summary": ctx.summary,
300        }))?;
301    }
302
303    if !stage.includes(Stage::Decision) {
304        return Ok(());
305    }
306    let decision = router.evaluate(ctx);
307    summary.intents_total += decision.intent_batch.intents.len() as u64;
308    emitter.emit(&json!({
309        "stage": "decision",
310        "window_id": ctx.window_id,
311        "route": format!("{:?}", decision.route),
312        "model": decision.intent_batch.model,
313        "intent_count": decision.intent_batch.intents.len(),
314        "rationale_tags": decision.rationale_tags,
315        "error": decision.error,
316    }))?;
317
318    if !stage.includes(Stage::Policy) {
319        return Ok(());
320    }
321    let capability = CapabilityLevel::for_route(decision.route);
322    let decisions = policy.evaluate_batch_with_context(&decision.intent_batch, &capability, ctx);
323    for d in &decisions {
324        if d.approved {
325            summary.intents_approved += 1;
326            summary.actions_authorized += d.approved_actions.len() as u64;
327        } else {
328            summary.intents_rejected += 1;
329        }
330        if let Some(reason) = d.rejection_reason {
331            *summary.denial_counts.entry(reason).or_insert(0) += 1;
332        }
333        for denial in &d.action_denials {
334            *summary.denial_counts.entry(*denial).or_insert(0) += 1;
335            summary.actions_denied += 1;
336        }
337        emitter.emit(&json!({
338            "stage": "policy",
339            "window_id": ctx.window_id,
340            "intent_id": d.intent_id,
341            "approved": d.approved,
342            "rejection_reason": d.rejection_reason,
343            "action_denials": d.action_denials,
344            "approved_actions": d.approved_actions,
345        }))?;
346    }
347
348    if !stage.includes(Stage::Execute) {
349        return Ok(());
350    }
351    for d in &decisions {
352        if !d.approved {
353            continue;
354        }
355        let results = executor.execute_batch(&d.approved_actions);
356        for r in &results {
357            emitter.emit(&json!({
358                "stage": "execute",
359                "window_id": ctx.window_id,
360                "intent_id": d.intent_id,
361                "action_type": r.action_type,
362                "success": r.success,
363                "error": r.error,
364            }))?;
365        }
366    }
367    Ok(())
368}
369
370/// Twin sink: every record goes to the human-facing `writer` verbatim and to
371/// the `audit` sink in canonical (sorted-key, volatility-stripped) form, while
372/// the canonical bytes are also folded into a SHA-256 hasher for the replay
373/// fingerprint.
374struct Emitter<'a> {
375    writer: &'a mut dyn Write,
376    audit: &'a mut dyn Write,
377    hasher: Sha256,
378}
379
380impl<'a> Emitter<'a> {
381    fn new<W: Write, A: Write>(writer: &'a mut W, audit: &'a mut A) -> Self {
382        Self {
383            writer,
384            audit,
385            hasher: Sha256::new(),
386        }
387    }
388
389    fn emit(&mut self, value: &Value) -> Result<()> {
390        serde_json::to_writer(&mut *self.writer, value)?;
391        self.writer.write_all(b"\n")?;
392        let canonical = canonicalize(value);
393        let canonical_bytes = serde_json::to_vec(&canonical)?;
394        self.audit.write_all(&canonical_bytes)?;
395        self.audit.write_all(b"\n")?;
396        self.hasher.update(&canonical_bytes);
397        self.hasher.update(b"\n");
398        Ok(())
399    }
400
401    fn finalize(self) -> String {
402        format!("sha256:{:x}", self.hasher.finalize())
403    }
404}
405
406/// Recursively rebuild `value` so that:
407/// - object keys are sorted (canonical order),
408/// - any key listed in [`VOLATILE_KEYS`] is dropped.
409///
410/// This is the only projection ever fed into the audit hasher; trace-derived
411/// timestamps (`window_start_ms`, `generated_at_ms`, etc.) are intentionally
412/// kept so they participate in the determinism check.
413fn canonicalize(value: &Value) -> Value {
414    match value {
415        Value::Object(map) => {
416            let mut keys: Vec<&String> = map.keys().collect();
417            keys.sort();
418            let mut out = Map::new();
419            for k in keys {
420                if VOLATILE_KEYS.contains(&k.as_str()) {
421                    continue;
422                }
423                if let Some(v) = map.get(k) {
424                    out.insert(k.clone(), canonicalize(v));
425                }
426            }
427            Value::Object(out)
428        },
429        Value::Array(arr) => Value::Array(arr.iter().map(canonicalize).collect()),
430        other => other.clone(),
431    }
432}
433
434fn raw_event_kind(raw: &RawEvent) -> &'static str {
435    match raw {
436        RawEvent::AppTransition(_) => "AppTransition",
437        RawEvent::BinderTransaction(_) => "BinderTransaction",
438        RawEvent::ProcStateChange(_) => "ProcStateChange",
439        RawEvent::FileSystemAccess(_) => "FileSystemAccess",
440        RawEvent::NotificationPosted(_) => "NotificationPosted",
441        RawEvent::NotificationInteraction(_) => "NotificationInteraction",
442        RawEvent::ScreenState(_) => "ScreenState",
443        RawEvent::SystemState(_) => "SystemState",
444    }
445}