1use std::collections::BTreeMap;
17use std::io::{BufRead, Write};
18
19use aios_action::DefaultActionExecutor;
20use aios_agent::DecisionRouter;
21use aios_core::collector_ingress::RustCollectorIngress;
22use aios_core::context_builder::WindowAggregator;
23use aios_core::policy_engine::PolicyEngine;
24use aios_core::privacy_airgap::DefaultPrivacyAirGap;
25use aios_spec::traits::{ActionExecutor, PrivacySanitizer};
26use aios_spec::{
27 CapabilityLevel, CollectorEnvelope, DenialReason, IngestedRawEvent, RawEvent, SourceTier,
28 StructuredContext,
29};
30use anyhow::{Context, Result};
31use serde::Serialize;
32use serde_json::{json, Map, Value};
33use sha2::{Digest, Sha256};
34
/// Schema version stamped on every `CollectorEnvelope` that the replay loop
/// builds from an input line.
const SCHEMA_VERSION: &str = "dipecs.collector.v1";
36
/// Object keys that `canonicalize` removes, at every nesting depth, before a
/// record is written to the audit stream and folded into the audit hash.
// NOTE(review): presumably these are the fields that differ between otherwise
// identical runs (generated ids, timings) — confirm against the producers.
const VOLATILE_KEYS: &[&str] = &["event_id", "window_id", "intent_id", "latency_us"];
41
// Last pipeline stage a replay should run.
//
// Variant order is load-bearing: `Stage::includes` compares stages with the
// derived `Ord`, which follows declaration order, so a later variant implies
// every earlier one. Do not reorder variants without revisiting `includes`.
//
// Plain `//` comments are used here on purpose so that nothing is handed to
// the `clap::ValueEnum` derive as documentation.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)]
pub enum Stage {
    // Accept envelopes and report the kind of each raw event.
    Ingest,
    // Additionally run the privacy sanitizer and emit its output.
    Sanitize,
    // Additionally feed sanitized events into the window aggregator and emit
    // a record for each closed window.
    Context,
    // Additionally route each closed window through the decision router.
    Decision,
    // Additionally evaluate the resulting intents with the policy engine.
    Policy,
    // Additionally hand approved actions to the action executor.
    Execute,
}
56
57impl Stage {
58 fn includes(self, other: Stage) -> bool {
59 self >= other
60 }
61}
62
/// Counters accumulated over one replay, serialized into the final
/// `"summary"` record.
///
/// The descriptions below reflect how the replay loop in this file updates
/// each field.
#[derive(Debug, Default, Clone, Serialize, PartialEq, Eq)]
pub struct ReplaySummary {
    /// Non-blank input lines read.
    pub lines_total: u64,
    /// Lines whose `rawEvent` member is missing or JSON `null`.
    pub lines_skipped_no_raw_event: u64,
    /// Lines that were not valid JSON, or whose `rawEvent` did not
    /// deserialize into a `RawEvent`.
    pub lines_parse_error: u64,
    /// Envelopes accepted by the collector ingress.
    pub events_ingested: u64,
    /// Closed windows handed to `process_window` (counted at every stage).
    pub windows_closed: u64,
    /// Intents returned by the decision router, summed over all windows.
    pub intents_total: u64,
    /// Policy decisions with `approved == true`.
    pub intents_approved: u64,
    /// Policy decisions with `approved == false`.
    pub intents_rejected: u64,
    /// Total number of approved actions across approved policy decisions.
    pub actions_authorized: u64,
    /// Total number of per-action denials across all policy decisions.
    pub actions_denied: u64,
    /// Tally per denial reason; counts both intent-level rejection reasons
    /// and per-action denials.
    pub denial_counts: BTreeMap<DenialReason, u64>,
    /// `"sha256:<hex>"` digest over the canonicalized records emitted during
    /// the replay; filled in once the replay has finished.
    pub audit_hash: String,
}
88
/// Result of `run_with_audit`: the replay counters plus the audit digest.
#[derive(Debug, Clone)]
pub struct ReplayWithAudit {
    /// Counters for the replay; its `audit_hash` field carries the same value
    /// as `audit_hash` below.
    pub summary: ReplaySummary,
    /// `"sha256:<hex>"` digest over the canonicalized records emitted during
    /// the replay.
    pub audit_hash: String,
}
96
97pub fn run<R: BufRead, W: Write>(
102 reader: R,
103 writer: &mut W,
104 window_secs: u64,
105 stage: Stage,
106) -> Result<ReplaySummary> {
107 let mut sink = std::io::sink();
108 let with_audit = run_with_audit(reader, writer, &mut sink, window_secs, stage)?;
109 Ok(with_audit.summary)
110}
111
112pub fn run_with_audit<R: BufRead, W: Write, A: Write>(
119 reader: R,
120 writer: &mut W,
121 audit: &mut A,
122 window_secs: u64,
123 stage: Stage,
124) -> Result<ReplayWithAudit> {
125 let ingress = RustCollectorIngress;
126 let sanitizer = DefaultPrivacyAirGap;
127 let router = DecisionRouter::default();
128 let policy = PolicyEngine::default();
129 let executor = DefaultActionExecutor;
130 let mut summary = ReplaySummary::default();
131 let mut aggregator: Option<WindowAggregator> = None;
132 let mut last_captured_at_ms: i64 = 0;
133 let mut emitter = Emitter::new(writer, audit);
134
135 for (line_idx, line_result) in reader.lines().enumerate() {
136 let line_no = line_idx as u64 + 1;
137 let raw_line = line_result.with_context(|| format!("read line {line_no}"))?;
138 if raw_line.trim().is_empty() {
139 continue;
140 }
141 summary.lines_total += 1;
142
143 let line_value: Value = match serde_json::from_str(&raw_line) {
144 Ok(v) => v,
145 Err(e) => {
146 summary.lines_parse_error += 1;
147 emitter.emit(&json!({
148 "stage": "error",
149 "line": line_no,
150 "error": format!("invalid JSON: {e}"),
151 }))?;
152 continue;
153 },
154 };
155
156 let raw_event_value = line_value.get("rawEvent").cloned().unwrap_or(Value::Null);
157 if raw_event_value.is_null() {
158 summary.lines_skipped_no_raw_event += 1;
159 continue;
160 }
161
162 let raw_event: RawEvent = match serde_json::from_value(raw_event_value) {
163 Ok(r) => r,
164 Err(e) => {
165 summary.lines_parse_error += 1;
166 emitter.emit(&json!({
167 "stage": "error",
168 "line": line_no,
169 "error": format!("rawEvent does not match RawEvent shape: {e}"),
170 }))?;
171 continue;
172 },
173 };
174
175 let envelope = CollectorEnvelope {
176 schema_version: SCHEMA_VERSION.to_string(),
177 source: line_value
178 .get("source")
179 .and_then(Value::as_str)
180 .unwrap_or("apps.android-collector.replay")
181 .to_string(),
182 source_tier: SourceTier::PublicApi,
183 device_trace_id: line_value
184 .get("eventId")
185 .and_then(Value::as_str)
186 .map(String::from),
187 captured_at_ms: line_value
188 .get("timestampMs")
189 .and_then(Value::as_i64)
190 .unwrap_or(0),
191 received_at_ms: None,
192 raw_event,
193 };
194 let captured_at_ms = envelope.captured_at_ms;
195 last_captured_at_ms = last_captured_at_ms.max(captured_at_ms);
196
197 let ingested: IngestedRawEvent = ingress
198 .accept(envelope)
199 .with_context(|| format!("ingress.accept failed on line {line_no}"))?;
200 summary.events_ingested += 1;
201
202 if stage.includes(Stage::Ingest) {
203 emitter.emit(&json!({
204 "stage": "ingest",
205 "line": line_no,
206 "source_tier": format!("{:?}", ingested.source_tier),
207 "raw_event_kind": raw_event_kind(&ingested.raw_event),
208 }))?;
209 }
210
211 let agg =
213 aggregator.get_or_insert_with(|| WindowAggregator::new(window_secs, captured_at_ms));
214 if agg.is_expired(captured_at_ms) {
215 if let Some(ctx) = agg.close(captured_at_ms) {
216 process_window(
217 &ctx,
218 &router,
219 &policy,
220 &executor,
221 stage,
222 &mut summary,
223 &mut emitter,
224 )?;
225 }
226 }
227
228 if !stage.includes(Stage::Sanitize) {
229 continue;
230 }
231 let sanitized = sanitizer.sanitize_with_tier(ingested.raw_event, ingested.source_tier);
232 emitter.emit(&json!({
233 "stage": "sanitize",
234 "line": line_no,
235 "sanitized": sanitized,
236 }))?;
237 if stage.includes(Stage::Context) {
238 if let Some(agg) = aggregator.as_mut() {
240 agg.push(sanitized);
241 }
242 }
243 }
244
245 if let Some(mut agg) = aggregator {
247 if let Some(ctx) = agg.close(last_captured_at_ms) {
248 process_window(
249 &ctx,
250 &router,
251 &policy,
252 &executor,
253 stage,
254 &mut summary,
255 &mut emitter,
256 )?;
257 }
258 }
259
260 let audit_hash = emitter.finalize();
261 summary.audit_hash = audit_hash.clone();
262
263 serde_json::to_writer(
266 &mut *writer,
267 &json!({
268 "stage": "summary",
269 "summary": summary,
270 }),
271 )?;
272 writer.write_all(b"\n")?;
273
274 Ok(ReplayWithAudit {
275 summary,
276 audit_hash,
277 })
278}
279
280fn process_window(
281 ctx: &StructuredContext,
282 router: &DecisionRouter,
283 policy: &PolicyEngine,
284 executor: &DefaultActionExecutor,
285 stage: Stage,
286 summary: &mut ReplaySummary,
287 emitter: &mut Emitter<'_>,
288) -> Result<()> {
289 summary.windows_closed += 1;
290
291 if stage.includes(Stage::Context) {
292 emitter.emit(&json!({
293 "stage": "context",
294 "window_id": ctx.window_id,
295 "window_start_ms": ctx.window_start_ms,
296 "window_end_ms": ctx.window_end_ms,
297 "duration_secs": ctx.duration_secs,
298 "event_count": ctx.events.len(),
299 "summary": ctx.summary,
300 }))?;
301 }
302
303 if !stage.includes(Stage::Decision) {
304 return Ok(());
305 }
306 let decision = router.evaluate(ctx);
307 summary.intents_total += decision.intent_batch.intents.len() as u64;
308 emitter.emit(&json!({
309 "stage": "decision",
310 "window_id": ctx.window_id,
311 "route": format!("{:?}", decision.route),
312 "model": decision.intent_batch.model,
313 "intent_count": decision.intent_batch.intents.len(),
314 "rationale_tags": decision.rationale_tags,
315 "error": decision.error,
316 }))?;
317
318 if !stage.includes(Stage::Policy) {
319 return Ok(());
320 }
321 let capability = CapabilityLevel::for_route(decision.route);
322 let decisions = policy.evaluate_batch_with_context(&decision.intent_batch, &capability, ctx);
323 for d in &decisions {
324 if d.approved {
325 summary.intents_approved += 1;
326 summary.actions_authorized += d.approved_actions.len() as u64;
327 } else {
328 summary.intents_rejected += 1;
329 }
330 if let Some(reason) = d.rejection_reason {
331 *summary.denial_counts.entry(reason).or_insert(0) += 1;
332 }
333 for denial in &d.action_denials {
334 *summary.denial_counts.entry(*denial).or_insert(0) += 1;
335 summary.actions_denied += 1;
336 }
337 emitter.emit(&json!({
338 "stage": "policy",
339 "window_id": ctx.window_id,
340 "intent_id": d.intent_id,
341 "approved": d.approved,
342 "rejection_reason": d.rejection_reason,
343 "action_denials": d.action_denials,
344 "approved_actions": d.approved_actions,
345 }))?;
346 }
347
348 if !stage.includes(Stage::Execute) {
349 return Ok(());
350 }
351 for d in &decisions {
352 if !d.approved {
353 continue;
354 }
355 let results = executor.execute_batch(&d.approved_actions);
356 for r in &results {
357 emitter.emit(&json!({
358 "stage": "execute",
359 "window_id": ctx.window_id,
360 "intent_id": d.intent_id,
361 "action_type": r.action_type,
362 "success": r.success,
363 "error": r.error,
364 }))?;
365 }
366 }
367 Ok(())
368}
369
/// Fans every emitted record out to two sinks and keeps a running digest of
/// the canonical stream.
struct Emitter<'a> {
    /// Receives each record exactly as emitted, one JSON document per line.
    writer: &'a mut dyn Write,
    /// Receives the canonicalized form of each record, one per line.
    audit: &'a mut dyn Write,
    /// SHA-256 state fed with the same bytes that are written to `audit`.
    hasher: Sha256,
}
379
380impl<'a> Emitter<'a> {
381 fn new<W: Write, A: Write>(writer: &'a mut W, audit: &'a mut A) -> Self {
382 Self {
383 writer,
384 audit,
385 hasher: Sha256::new(),
386 }
387 }
388
389 fn emit(&mut self, value: &Value) -> Result<()> {
390 serde_json::to_writer(&mut *self.writer, value)?;
391 self.writer.write_all(b"\n")?;
392 let canonical = canonicalize(value);
393 let canonical_bytes = serde_json::to_vec(&canonical)?;
394 self.audit.write_all(&canonical_bytes)?;
395 self.audit.write_all(b"\n")?;
396 self.hasher.update(&canonical_bytes);
397 self.hasher.update(b"\n");
398 Ok(())
399 }
400
401 fn finalize(self) -> String {
402 format!("sha256:{:x}", self.hasher.finalize())
403 }
404}
405
406fn canonicalize(value: &Value) -> Value {
414 match value {
415 Value::Object(map) => {
416 let mut keys: Vec<&String> = map.keys().collect();
417 keys.sort();
418 let mut out = Map::new();
419 for k in keys {
420 if VOLATILE_KEYS.contains(&k.as_str()) {
421 continue;
422 }
423 if let Some(v) = map.get(k) {
424 out.insert(k.clone(), canonicalize(v));
425 }
426 }
427 Value::Object(out)
428 },
429 Value::Array(arr) => Value::Array(arr.iter().map(canonicalize).collect()),
430 other => other.clone(),
431 }
432}
433
434fn raw_event_kind(raw: &RawEvent) -> &'static str {
435 match raw {
436 RawEvent::AppTransition(_) => "AppTransition",
437 RawEvent::BinderTransaction(_) => "BinderTransaction",
438 RawEvent::ProcStateChange(_) => "ProcStateChange",
439 RawEvent::FileSystemAccess(_) => "FileSystemAccess",
440 RawEvent::NotificationPosted(_) => "NotificationPosted",
441 RawEvent::NotificationInteraction(_) => "NotificationInteraction",
442 RawEvent::ScreenState(_) => "ScreenState",
443 RawEvent::SystemState(_) => "SystemState",
444 }
445}