Skip to main content

aios_daemon/
pipeline.rs

//! Context-window processing pipeline.
//!
//! Processing flow for a single window: DecisionRouter → PolicyEngine → ActionExecutor.
4
5use aios_action::DefaultActionExecutor;
6use aios_agent::DecisionRouter;
7use aios_collector::collection_stats::RawEventStats;
8use aios_core::policy_engine::PolicyEngine;
9use aios_spec::traits::ActionExecutor;
10use aios_spec::{CapabilityLevel, IngestedRawEvent};
11
12/// 处理一个上下文窗口: decision router → validate → execute
13pub(crate) fn process_window(
14    ctx: &aios_spec::StructuredContext,
15    router: &DecisionRouter,
16    policy: &PolicyEngine,
17    executor: &DefaultActionExecutor,
18    raw_stats: &RawEventStats,
19) {
20    tracing::info!(
21        window_id = %ctx.window_id,
22        event_count = ctx.events.len(),
23        raw_event_total = raw_stats.total(),
24        raw_event_stats = %raw_stats.summary_line(),
25        duration_secs = ctx.duration_secs,
26        "window closed, sending to agent"
27    );
28
29    let decision_result = router.evaluate(ctx);
30    tracing::info!(
31        route = ?decision_result.route,
32        model = %decision_result.intent_batch.model,
33        latency_us = decision_result.latency_us,
34        error = ?decision_result.error,
35        "decision backend completed"
36    );
37
38    let capability = CapabilityLevel::for_route(decision_result.route);
39    let decisions =
40        policy.evaluate_batch_with_context(&decision_result.intent_batch, &capability, ctx);
41
42    let mut executed = 0u32;
43    for decision in &decisions {
44        if decision.approved {
45            let results = executor.execute_batch(&decision.approved_actions);
46            executed += results.len() as u32;
47            for result in &results {
48                if !result.success {
49                    tracing::warn!(
50                        action = %result.action_type,
51                        error = ?result.error,
52                        "action execution failed"
53                    );
54                }
55            }
56        } else {
57            tracing::debug!(
58                intent_id = %decision.intent_id,
59                reason = ?decision.rejection_reason,
60                "intent rejected by policy"
61            );
62        }
63        for denial in &decision.action_denials {
64            tracing::warn!(
65                intent_id = %decision.intent_id,
66                reason = ?denial,
67                "action denied by policy"
68            );
69        }
70    }
71
72    tracing::info!(
73        window_id = %ctx.window_id,
74        intents_total = decisions.len(),
75        actions_executed = executed,
76        "window processed"
77    );
78}
79
80// ============================================================
81// Processing event dispatch
82// ============================================================
83
84#[derive(Debug)]
85pub enum ProcessingEvent {
86    Raw(IngestedRawEvent),
87    RawChannelClosed,
88    WindowExpired,
89}
90
91pub fn should_stop_processing(event: &ProcessingEvent) -> bool {
92    matches!(event, ProcessingEvent::RawChannelClosed)
93}