1use aios_action::DefaultActionExecutor;
6use aios_agent::DecisionRouter;
7use aios_collector::collection_stats::RawEventStats;
8use aios_core::policy_engine::PolicyEngine;
9use aios_spec::traits::ActionExecutor;
10use aios_spec::{CapabilityLevel, IngestedRawEvent};
11
12pub(crate) fn process_window(
14 ctx: &aios_spec::StructuredContext,
15 router: &DecisionRouter,
16 policy: &PolicyEngine,
17 executor: &DefaultActionExecutor,
18 raw_stats: &RawEventStats,
19) {
20 tracing::info!(
21 window_id = %ctx.window_id,
22 event_count = ctx.events.len(),
23 raw_event_total = raw_stats.total(),
24 raw_event_stats = %raw_stats.summary_line(),
25 duration_secs = ctx.duration_secs,
26 "window closed, sending to agent"
27 );
28
29 let decision_result = router.evaluate(ctx);
30 tracing::info!(
31 route = ?decision_result.route,
32 model = %decision_result.intent_batch.model,
33 latency_us = decision_result.latency_us,
34 error = ?decision_result.error,
35 "decision backend completed"
36 );
37
38 let capability = CapabilityLevel::for_route(decision_result.route);
39 let decisions =
40 policy.evaluate_batch_with_context(&decision_result.intent_batch, &capability, ctx);
41
42 let mut executed = 0u32;
43 for decision in &decisions {
44 if decision.approved {
45 let results = executor.execute_batch(&decision.approved_actions);
46 executed += results.len() as u32;
47 for result in &results {
48 if !result.success {
49 tracing::warn!(
50 action = %result.action_type,
51 error = ?result.error,
52 "action execution failed"
53 );
54 }
55 }
56 } else {
57 tracing::debug!(
58 intent_id = %decision.intent_id,
59 reason = ?decision.rejection_reason,
60 "intent rejected by policy"
61 );
62 }
63 for denial in &decision.action_denials {
64 tracing::warn!(
65 intent_id = %decision.intent_id,
66 reason = ?denial,
67 "action denied by policy"
68 );
69 }
70 }
71
72 tracing::info!(
73 window_id = %ctx.window_id,
74 intents_total = decisions.len(),
75 actions_executed = executed,
76 "window processed"
77 );
78}
79
80#[derive(Debug)]
85pub enum ProcessingEvent {
86 Raw(IngestedRawEvent),
87 RawChannelClosed,
88 WindowExpired,
89}
90
91pub fn should_stop_processing(event: &ProcessingEvent) -> bool {
92 matches!(event, ProcessingEvent::RawChannelClosed)
93}