Skip to main content

aios_daemon/
lib.rs

1//! # dipecsd — DiPECS 系统守护进程
2//!
3//! 部署路径: `/system/bin/dipecsd`
4//! 启动方式: `dipecsd [--no-daemon] [--verbose]`
5//!
6//! ## 运行模式
7//!
8//! - **daemon 模式** (默认): fork 到后台, 持续采集系统事件
9//! - **--no-daemon**: 前台运行, 用于调试和开发
10//!
11//! ## 数据流 (v0.2 — 2-task 管道)
12//!
13//! ```text
14//! Task 1 (采集):  BinderProbe + ProcReader + SysCollector → bus.raw_events_tx
15//! Task 2 (处理):  bus.raw_events_rx → PrivacyAirGap → WindowAggregator
16//!                    → DecisionRouter → PolicyEngine → ActionExecutor
17//! ```
18//!
19//! ## 当前实现状态 (2026-05-05)
20//!
21//! - ProcReader: 可用 (Linux/Android 均可)
22//! - SystemStateCollector: 可用 (Linux/Android 均可, 电池/网络 fallback)
23//! - BinderProbe: 接口完成, eBPF attach 待真机验证
24//! - 决策路由: DecisionRouter with RuleBasedBackend (rule-based intent generation)
25//! - Action 执行: 骨架 (tracing 记录)
26
27use std::collections::HashMap;
28use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
29
30use aios_action::DefaultActionExecutor;
31use aios_agent::DecisionRouter;
32use aios_collector::{
33    binder_probe::BinderProbe,
34    collection_stats::RawEventStats,
35    proc_reader::{self, ProcReader},
36    system_collector::SystemStateCollector,
37};
38use aios_core::action_bus::ActionBus;
39use aios_core::collector_ingress::RustCollectorIngress;
40use aios_core::context_builder::WindowAggregator;
41use aios_core::policy_engine::PolicyEngine;
42use aios_core::privacy_airgap::DefaultPrivacyAirGap;
43use aios_spec::traits::PrivacySanitizer;
44use aios_spec::RawEvent;
45
46mod daemon;
47pub mod pipeline;
48
49use pipeline::{should_stop_processing, ProcessingEvent};
50
/// Length of one context window, in seconds; the processing loop closes and
/// processes the current window each time this much time has elapsed.
const WINDOW_DURATION_SECS: u64 = 10;
/// Binder event polling interval, in milliseconds. The collection loop sleeps
/// for this long between iterations.
const BINDER_POLL_INTERVAL_MS: u64 = 100;
/// Minimum time between two system-state snapshots, in seconds.
const SYS_POLL_INTERVAL_SECS: u64 = 30;
57
58pub async fn run() -> anyhow::Result<()> {
59    // 1. 初始化日志
60    tracing_subscriber::fmt()
61        .with_env_filter(
62            tracing_subscriber::EnvFilter::from_default_env().add_directive("dipecs=info".parse()?),
63        )
64        .init();
65
66    // 2. 解析命令行参数
67    let args: Vec<String> = std::env::args().collect();
68    let no_daemon = args.iter().any(|a| a == "--no-daemon");
69
70    if !no_daemon {
71        daemon::daemonize();
72    }
73    tracing::info!("dipecsd starting (no-daemon={})", no_daemon);
74
75    // 3. 初始化 ActionBus 和关闭信号
76    let mut bus = ActionBus::new(4096);
77    let mut shutdown_rx = daemon::install_signal_handlers();
78
79    // ---- Task 1: 采集 ----
80    let collect_raw_tx = bus.raw_sender();
81    let mut collect_shutdown = shutdown_rx.resubscribe();
82    let collect_handle = tokio::spawn(async move {
83        tracing::info!("collection task started");
84
85        let ingress = RustCollectorIngress;
86        let mut binder_probe = BinderProbe::new();
87        match binder_probe.try_init() {
88            Ok(true) => tracing::info!("Binder probe initialized with eBPF"),
89            Ok(false) => {
90                tracing::warn!("Binder probe unavailable — running without IPC monitoring")
91            },
92            Err(e) => tracing::error!("Binder probe init failed: {}", e),
93        }
94
95        let mut prev_proc_snapshots: HashMap<u32, proc_reader::ProcSnapshot> = HashMap::new();
96        let mut last_sys_poll = SystemTime::now() - Duration::from_secs(SYS_POLL_INTERVAL_SECS);
97
98        loop {
99            let now = timestamp_ms();
100
101            // /proc 轮询
102            {
103                let snapshots = ProcReader::scan_all();
104                let curr_map: HashMap<u32, proc_reader::ProcSnapshot> =
105                    snapshots.iter().map(|s| (s.pid, s.clone())).collect();
106
107                let changed = proc_reader::diff_snapshots(&prev_proc_snapshots, &curr_map);
108                for snap in &changed {
109                    let event = ingress.accept_internal(
110                        RawEvent::ProcStateChange(ProcReader::to_event(snap, now)),
111                        "ProcReader",
112                        now,
113                    );
114                    if collect_raw_tx.send(event).await.is_err() {
115                        tracing::debug!("collection: raw channel closed");
116                        return;
117                    }
118                }
119                prev_proc_snapshots = curr_map;
120            }
121
122            // 系统状态采集
123            {
124                let elapsed = SystemTime::now()
125                    .duration_since(last_sys_poll)
126                    .unwrap_or_default();
127                if elapsed.as_secs() >= SYS_POLL_INTERVAL_SECS {
128                    let sys_event = SystemStateCollector::snapshot(now);
129                    let event = ingress.accept_internal(
130                        RawEvent::SystemState(sys_event),
131                        "SystemStateCollector",
132                        now,
133                    );
134                    if collect_raw_tx.send(event).await.is_err() {
135                        tracing::debug!("collection: raw channel closed");
136                        return;
137                    }
138                    last_sys_poll = SystemTime::now();
139                }
140            }
141
142            // Binder 事件轮询
143            {
144                let binder_events = binder_probe.poll();
145                for tx in &binder_events {
146                    let event = ingress.accept_internal(
147                        RawEvent::BinderTransaction(tx.to_event()),
148                        "BinderProbe",
149                        now,
150                    );
151                    if collect_raw_tx.send(event).await.is_err() {
152                        tracing::debug!("collection: raw channel closed");
153                        return;
154                    }
155                }
156            }
157
158            // 检查退出信号 (non-blocking)
159            if collect_shutdown.try_recv().is_ok() {
160                tracing::info!("collection task shutting down");
161                return;
162            }
163
164            tokio::time::sleep(Duration::from_millis(BINDER_POLL_INTERVAL_MS)).await;
165        }
166    });
167
168    // ---- Task 2: 处理管道 (运行在主 task 上) ----
169    tracing::info!("processing task started");
170
171    let sanitizer = DefaultPrivacyAirGap;
172    let router = DecisionRouter::default();
173    let policy = PolicyEngine::default();
174    let executor = DefaultActionExecutor;
175    let window_dur = Duration::from_secs(WINDOW_DURATION_SECS);
176    let mut window = WindowAggregator::new(WINDOW_DURATION_SECS, timestamp_ms());
177    let mut raw_stats = RawEventStats::default();
178    let mut window_deadline = Instant::now() + window_dur;
179
180    loop {
181        let remaining = if window_deadline > Instant::now() {
182            window_deadline - Instant::now()
183        } else {
184            Duration::ZERO
185        };
186
187        let processing_event = tokio::select! {
188            _ = shutdown_rx.recv() => {
189                tracing::info!("processing task shutting down");
190                break;
191            }
192            event = bus.recv_raw() => {
193                match event {
194                    Some(raw) => ProcessingEvent::Raw(raw),
195                    None => ProcessingEvent::RawChannelClosed,
196                }
197            }
198            _ = tokio::time::sleep(remaining) => {
199                ProcessingEvent::WindowExpired
200            }
201        };
202
203        if should_stop_processing(&processing_event) {
204            tracing::info!("raw event channel closed, flushing remaining events");
205            let window_stats = std::mem::take(&mut raw_stats);
206            if let Some(ctx) = window.close(timestamp_ms()) {
207                pipeline::process_window(&ctx, &router, &policy, &executor, &window_stats);
208            }
209            break;
210        }
211
212        let window_expired = matches!(processing_event, ProcessingEvent::WindowExpired);
213
214        match processing_event {
215            ProcessingEvent::Raw(ingested) => {
216                raw_stats.record(&ingested.raw_event);
217                let sanitized =
218                    sanitizer.sanitize_with_tier(ingested.raw_event, ingested.source_tier);
219                window.push(sanitized);
220            },
221            ProcessingEvent::RawChannelClosed => unreachable!("handled before event dispatch"),
222            ProcessingEvent::WindowExpired => {},
223        }
224
225        // Check if window should close
226        if window_expired || Instant::now() >= window_deadline {
227            let window_stats = std::mem::take(&mut raw_stats);
228            if let Some(ctx) = window.close(timestamp_ms()) {
229                pipeline::process_window(&ctx, &router, &policy, &executor, &window_stats);
230            }
231            window_deadline = Instant::now() + window_dur;
232        }
233    }
234
235    // Wait for collection task to finish
236    let _ = collect_handle.await;
237
238    tracing::info!("dipecsd stopped");
239    Ok(())
240}
241
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch, and
/// saturates at `i64::MAX` rather than silently wrapping if the millisecond
/// count does not fit in an `i64`.
fn timestamp_ms() -> i64 {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    // Clamp first so the narrowing cast below is lossless.
    millis.min(i64::MAX as u128) as i64
}