1use std::collections::HashMap;
28use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
29
30use aios_action::DefaultActionExecutor;
31use aios_agent::DecisionRouter;
32use aios_collector::{
33 binder_probe::BinderProbe,
34 collection_stats::RawEventStats,
35 proc_reader::{self, ProcReader},
36 system_collector::SystemStateCollector,
37};
38use aios_core::action_bus::ActionBus;
39use aios_core::collector_ingress::RustCollectorIngress;
40use aios_core::context_builder::WindowAggregator;
41use aios_core::policy_engine::PolicyEngine;
42use aios_core::privacy_airgap::DefaultPrivacyAirGap;
43use aios_spec::traits::PrivacySanitizer;
44use aios_spec::RawEvent;
45
46mod daemon;
47pub mod pipeline;
48
49use pipeline::{should_stop_processing, ProcessingEvent};
50
/// Minimum number of seconds between two system-state snapshots taken by the
/// collection task in [`run`].
const SYS_POLL_INTERVAL_SECS: u64 = 30;
/// Sleep, in milliseconds, between two iterations of the collection loop in
/// [`run`] (each iteration scans processes and polls the binder probe).
const BINDER_POLL_INTERVAL_MS: u64 = 100;
/// Length, in seconds, of one aggregation window in the processing loop of
/// [`run`]; the window is closed and processed each time it elapses.
const WINDOW_DURATION_SECS: u64 = 10;
57
58pub async fn run() -> anyhow::Result<()> {
59 tracing_subscriber::fmt()
61 .with_env_filter(
62 tracing_subscriber::EnvFilter::from_default_env().add_directive("dipecs=info".parse()?),
63 )
64 .init();
65
66 let args: Vec<String> = std::env::args().collect();
68 let no_daemon = args.iter().any(|a| a == "--no-daemon");
69
70 if !no_daemon {
71 daemon::daemonize();
72 }
73 tracing::info!("dipecsd starting (no-daemon={})", no_daemon);
74
75 let mut bus = ActionBus::new(4096);
77 let mut shutdown_rx = daemon::install_signal_handlers();
78
79 let collect_raw_tx = bus.raw_sender();
81 let mut collect_shutdown = shutdown_rx.resubscribe();
82 let collect_handle = tokio::spawn(async move {
83 tracing::info!("collection task started");
84
85 let ingress = RustCollectorIngress;
86 let mut binder_probe = BinderProbe::new();
87 match binder_probe.try_init() {
88 Ok(true) => tracing::info!("Binder probe initialized with eBPF"),
89 Ok(false) => {
90 tracing::warn!("Binder probe unavailable — running without IPC monitoring")
91 },
92 Err(e) => tracing::error!("Binder probe init failed: {}", e),
93 }
94
95 let mut prev_proc_snapshots: HashMap<u32, proc_reader::ProcSnapshot> = HashMap::new();
96 let mut last_sys_poll = SystemTime::now() - Duration::from_secs(SYS_POLL_INTERVAL_SECS);
97
98 loop {
99 let now = timestamp_ms();
100
101 {
103 let snapshots = ProcReader::scan_all();
104 let curr_map: HashMap<u32, proc_reader::ProcSnapshot> =
105 snapshots.iter().map(|s| (s.pid, s.clone())).collect();
106
107 let changed = proc_reader::diff_snapshots(&prev_proc_snapshots, &curr_map);
108 for snap in &changed {
109 let event = ingress.accept_internal(
110 RawEvent::ProcStateChange(ProcReader::to_event(snap, now)),
111 "ProcReader",
112 now,
113 );
114 if collect_raw_tx.send(event).await.is_err() {
115 tracing::debug!("collection: raw channel closed");
116 return;
117 }
118 }
119 prev_proc_snapshots = curr_map;
120 }
121
122 {
124 let elapsed = SystemTime::now()
125 .duration_since(last_sys_poll)
126 .unwrap_or_default();
127 if elapsed.as_secs() >= SYS_POLL_INTERVAL_SECS {
128 let sys_event = SystemStateCollector::snapshot(now);
129 let event = ingress.accept_internal(
130 RawEvent::SystemState(sys_event),
131 "SystemStateCollector",
132 now,
133 );
134 if collect_raw_tx.send(event).await.is_err() {
135 tracing::debug!("collection: raw channel closed");
136 return;
137 }
138 last_sys_poll = SystemTime::now();
139 }
140 }
141
142 {
144 let binder_events = binder_probe.poll();
145 for tx in &binder_events {
146 let event = ingress.accept_internal(
147 RawEvent::BinderTransaction(tx.to_event()),
148 "BinderProbe",
149 now,
150 );
151 if collect_raw_tx.send(event).await.is_err() {
152 tracing::debug!("collection: raw channel closed");
153 return;
154 }
155 }
156 }
157
158 if collect_shutdown.try_recv().is_ok() {
160 tracing::info!("collection task shutting down");
161 return;
162 }
163
164 tokio::time::sleep(Duration::from_millis(BINDER_POLL_INTERVAL_MS)).await;
165 }
166 });
167
168 tracing::info!("processing task started");
170
171 let sanitizer = DefaultPrivacyAirGap;
172 let router = DecisionRouter::default();
173 let policy = PolicyEngine::default();
174 let executor = DefaultActionExecutor;
175 let window_dur = Duration::from_secs(WINDOW_DURATION_SECS);
176 let mut window = WindowAggregator::new(WINDOW_DURATION_SECS, timestamp_ms());
177 let mut raw_stats = RawEventStats::default();
178 let mut window_deadline = Instant::now() + window_dur;
179
180 loop {
181 let remaining = if window_deadline > Instant::now() {
182 window_deadline - Instant::now()
183 } else {
184 Duration::ZERO
185 };
186
187 let processing_event = tokio::select! {
188 _ = shutdown_rx.recv() => {
189 tracing::info!("processing task shutting down");
190 break;
191 }
192 event = bus.recv_raw() => {
193 match event {
194 Some(raw) => ProcessingEvent::Raw(raw),
195 None => ProcessingEvent::RawChannelClosed,
196 }
197 }
198 _ = tokio::time::sleep(remaining) => {
199 ProcessingEvent::WindowExpired
200 }
201 };
202
203 if should_stop_processing(&processing_event) {
204 tracing::info!("raw event channel closed, flushing remaining events");
205 let window_stats = std::mem::take(&mut raw_stats);
206 if let Some(ctx) = window.close(timestamp_ms()) {
207 pipeline::process_window(&ctx, &router, &policy, &executor, &window_stats);
208 }
209 break;
210 }
211
212 let window_expired = matches!(processing_event, ProcessingEvent::WindowExpired);
213
214 match processing_event {
215 ProcessingEvent::Raw(ingested) => {
216 raw_stats.record(&ingested.raw_event);
217 let sanitized =
218 sanitizer.sanitize_with_tier(ingested.raw_event, ingested.source_tier);
219 window.push(sanitized);
220 },
221 ProcessingEvent::RawChannelClosed => unreachable!("handled before event dispatch"),
222 ProcessingEvent::WindowExpired => {},
223 }
224
225 if window_expired || Instant::now() >= window_deadline {
227 let window_stats = std::mem::take(&mut raw_stats);
228 if let Some(ctx) = window.close(timestamp_ms()) {
229 pipeline::process_window(&ctx, &router, &policy, &executor, &window_stats);
230 }
231 window_deadline = Instant::now() + window_dur;
232 }
233 }
234
235 let _ = collect_handle.await;
237
238 tracing::info!("dipecsd stopped");
239 Ok(())
240}
241
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reads earlier than the epoch, and saturates
/// at `i64::MAX` rather than silently truncating if the millisecond count does
/// not fit in an `i64`.
fn timestamp_ms() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // `as_millis()` is a `u128`; convert with a checked cast instead of `as`.
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}