1use anyhow::anyhow;
2use bincode::Options;
3use oinq::frame;
4
5use super::Connection;
6use crate::{
7 client,
8 types::{HostNetworkGroup, Process, ResourceUsage, SamplingPolicy, TrafficFilterRule},
9};
10
11impl Connection {
13 pub async fn get_process_list(&self) -> anyhow::Result<Vec<Process>> {
20 self.send_request(client::RequestCode::ProcessList, &())
21 .await
22 }
23
24 pub async fn get_resource_usage(&self) -> anyhow::Result<ResourceUsage> {
31 self.send_request::<(), (String, ResourceUsage)>(client::RequestCode::ResourceUsage, &())
32 .await
33 .map(|(_, usage)| usage)
34 }
35
36 pub async fn send_allowlist(&self, allowlist: &HostNetworkGroup) -> anyhow::Result<()> {
42 self.send_request(client::RequestCode::Allowlist, allowlist)
43 .await
44 }
45
46 pub async fn send_blocklist(&self, blocklist: &HostNetworkGroup) -> anyhow::Result<()> {
52 self.send_request(client::RequestCode::Blocklist, blocklist)
53 .await
54 }
55
56 pub async fn send_config_update_cmd(&self) -> anyhow::Result<()> {
62 self.send_request(client::RequestCode::UpdateConfig, &())
63 .await
64 }
65
66 pub async fn send_filtering_rules(&self, list: &[TrafficFilterRule]) -> anyhow::Result<()> {
72 self.send_request(client::RequestCode::ReloadFilterRule, list)
73 .await
74 }
75
76 pub async fn send_internal_network_list(&self, list: &HostNetworkGroup) -> anyhow::Result<()> {
82 self.send_request(client::RequestCode::InternalNetworkList, list)
83 .await
84 }
85
86 pub async fn send_ping(&self) -> anyhow::Result<()> {
92 self.send_request(client::RequestCode::EchoRequest, &())
93 .await
94 }
95
96 pub async fn send_reboot_cmd(&self) -> anyhow::Result<()> {
102 self.send_request(client::RequestCode::Reboot, &()).await
103 }
104
105 pub async fn send_sampling_policies(&self, list: &[SamplingPolicy]) -> anyhow::Result<()> {
111 self.send_request(client::RequestCode::SamplingPolicyList, list)
112 .await
113 }
114
115 pub async fn send_shutdown_cmd(&self) -> anyhow::Result<()> {
121 self.send_request(client::RequestCode::Shutdown, &()).await
122 }
123
124 pub async fn send_tor_exit_node_list(&self, list: &[String]) -> anyhow::Result<()> {
130 self.send_request(client::RequestCode::TorExitNodeList, list)
131 .await
132 }
133
134 pub async fn send_trusted_domain_list(&self, list: &[String]) -> anyhow::Result<()> {
140 self.send_request(client::RequestCode::TrustedDomainList, list)
141 .await
142 }
143
144 pub async fn send_trusted_user_agent_list(&self, list: &[String]) -> anyhow::Result<()> {
150 self.send_request(client::RequestCode::TrustedUserAgentList, list)
151 .await
152 }
153
154 async fn send_request<T: serde::Serialize + ?Sized, S: serde::de::DeserializeOwned>(
156 &self,
157 request_code: client::RequestCode,
158 payload: &T,
159 ) -> anyhow::Result<S> {
160 let Ok(mut buf) = bincode::serialize::<u32>(&request_code.into()) else {
161 unreachable!("serialization of u32 into memory buffer should not fail")
162 };
163 let ser = bincode::DefaultOptions::new();
164 buf.extend(ser.serialize(payload)?);
165
166 let (mut send, mut recv) = self.conn.open_bi().await?;
167 frame::send_raw(&mut send, &buf).await?;
168
169 frame::recv::<Result<S, String>>(&mut recv, &mut buf)
170 .await?
171 .map_err(|e| anyhow!(e))
172 }
173}
174
175#[cfg(test)]
176mod tests {
177 #[cfg(all(feature = "client", feature = "server"))]
178 use {
179 crate::{
180 test::TEST_ENV,
181 types::{HostNetworkGroup, SamplingKind, SamplingPolicy},
182 },
183 ipnet::IpNet,
184 std::{
185 net::{IpAddr, Ipv4Addr},
186 time::Duration,
187 },
188 };
189
190 #[cfg(all(feature = "client", feature = "server"))]
191 const IP_ADDR_1: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
193
194 #[cfg(all(feature = "client", feature = "server"))]
195 struct TestHandler;
197
198 #[cfg(all(feature = "client", feature = "server"))]
199 #[async_trait::async_trait]
200 impl crate::request::Handler for TestHandler {
201 async fn allowlist(&mut self, list: HostNetworkGroup) -> Result<(), String> {
202 if list.hosts == [IP_ADDR_1] {
203 Ok(())
204 } else {
205 Err("unexpected domain list".to_string())
206 }
207 }
208
209 async fn blocklist(&mut self, list: HostNetworkGroup) -> Result<(), String> {
210 if list.hosts == [IP_ADDR_1] {
211 Ok(())
212 } else {
213 Err("unexpected blocklist".to_string())
214 }
215 }
216
217 async fn update_config(&mut self) -> Result<(), String> {
218 Ok(())
219 }
220
221 async fn update_traffic_filter_rules(
222 &mut self,
223 rules: &[super::TrafficFilterRule],
224 ) -> Result<(), String> {
225 if rules.len() == 1 {
226 Ok(())
227 } else {
228 Err("unexpected filtering rules".to_string())
229 }
230 }
231
232 async fn internal_network_list(&mut self, list: HostNetworkGroup) -> Result<(), String> {
233 if list.hosts == [IP_ADDR_1] {
234 Ok(())
235 } else {
236 Err("unexpected internal network list".to_string())
237 }
238 }
239
240 async fn process_list(&mut self) -> Result<Vec<super::Process>, String> {
241 Ok(vec![super::Process {
242 user: "test-user".to_string(),
243 cpu_usage: 10.0,
244 mem_usage: 20.0,
245 start_time: 1_234_567_890,
246 command: "test-command".to_string(),
247 }])
248 }
249
250 async fn resource_usage(&mut self) -> Result<(String, super::ResourceUsage), String> {
251 Ok((
252 "test-host".to_string(),
253 super::ResourceUsage {
254 cpu_usage: 0.5,
255 total_memory: 100,
256 used_memory: 50,
257 total_disk_space: 1000,
258 used_disk_space: 500,
259 },
260 ))
261 }
262
263 async fn reboot(&mut self) -> Result<(), String> {
264 Ok(())
265 }
266
267 async fn sampling_policy_list(
268 &mut self,
269 policies: &[super::SamplingPolicy],
270 ) -> Result<(), String> {
271 if policies.len() == 1 && policies[0].id == 42 {
272 Ok(())
273 } else {
274 Err("unexpected sampling policies".to_string())
275 }
276 }
277
278 async fn shutdown(&mut self) -> Result<(), String> {
279 Ok(())
280 }
281
282 async fn tor_exit_node_list(&mut self, nodes: &[&str]) -> Result<(), String> {
283 if nodes == ["192.168.1.1", "10.0.0.1"] {
284 Ok(())
285 } else {
286 Err("unexpected tor exit node list".to_string())
287 }
288 }
289
290 async fn trusted_domain_list(&mut self, domains: &[&str]) -> Result<(), String> {
291 if domains == ["example.com", "test.org"] {
292 Ok(())
293 } else {
294 Err("unexpected trusted domain list".to_string())
295 }
296 }
297
298 async fn trusted_user_agent_list(&mut self, agents: &[&str]) -> Result<(), String> {
299 if agents == ["Mozilla/5.0", "Chrome/91.0"] {
300 Ok(())
301 } else {
302 Err("unexpected trusted user agent list".to_string())
303 }
304 }
305 }
306
307 #[cfg(all(feature = "client", feature = "server"))]
308 #[tokio::test]
309 async fn get_resource_usage() {
310 let test_env = TEST_ENV.lock().await;
311 let (server_conn, client_conn) = test_env.setup().await;
312
313 let mut handler = TestHandler;
314 let handler_conn = client_conn.clone();
315 let client_handle = tokio::spawn(async move {
316 let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
317
318 crate::request::handle(&mut handler, &mut send, &mut recv).await
319 });
320 let server_res = server_conn.get_resource_usage().await;
321 assert!(server_res.is_ok());
322 let usage = server_res.unwrap();
323 assert_eq!(usage.total_memory, 100);
324 let client_res = client_handle.await.unwrap();
325 assert!(client_res.is_ok());
326
327 test_env.teardown(&server_conn);
328 }
329
330 #[cfg(all(feature = "client", feature = "server"))]
331 #[tokio::test]
332 async fn get_process_list() {
333 let test_env = TEST_ENV.lock().await;
334 let (server_conn, client_conn) = test_env.setup().await;
335
336 let mut handler = TestHandler;
337 let handler_conn = client_conn.clone();
338 let client_handle = tokio::spawn(async move {
339 let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
340
341 crate::request::handle(&mut handler, &mut send, &mut recv).await
342 });
343 let server_res = server_conn.get_process_list().await;
344 assert!(server_res.is_ok());
345 let processes = server_res.unwrap();
346 assert_eq!(processes.len(), 1);
347 assert_eq!(processes[0].user, "test-user");
348 let client_res = client_handle.await.unwrap();
349 assert!(client_res.is_ok());
350
351 test_env.teardown(&server_conn);
352 }
353
354 #[cfg(all(feature = "client", feature = "server"))]
355 #[tokio::test]
356 async fn send_allowlist() {
357 let test_env = TEST_ENV.lock().await;
358 let (server_conn, client_conn) = test_env.setup().await;
359
360 let allowlist_to_send = HostNetworkGroup {
361 hosts: vec![IP_ADDR_1],
362 networks: vec![],
363 ip_ranges: vec![],
364 };
365
366 let mut handler = TestHandler;
367 let handler_conn = client_conn.clone();
368 let client_handle = tokio::spawn(async move {
369 let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
370
371 crate::request::handle(&mut handler, &mut send, &mut recv).await
372 });
373 let server_res = server_conn.send_allowlist(&allowlist_to_send).await;
374 assert!(server_res.is_ok());
375 let client_res = client_handle.await.unwrap();
376 assert!(client_res.is_ok());
377
378 test_env.teardown(&server_conn);
379 }
380
381 #[cfg(all(feature = "client", feature = "server"))]
382 #[tokio::test]
383 async fn send_reboot_cmd() {
384 let test_env = TEST_ENV.lock().await;
385 let (server_conn, client_conn) = test_env.setup().await;
386
387 let mut handler = TestHandler;
388 let handler_conn = client_conn.clone();
389 let client_handle = tokio::spawn(async move {
390 let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
391
392 crate::request::handle(&mut handler, &mut send, &mut recv).await
393 });
394 let server_res = server_conn.send_reboot_cmd().await;
395 assert!(server_res.is_ok());
396 let client_res = client_handle.await.unwrap();
397 assert!(client_res.is_ok());
398
399 test_env.teardown(&server_conn);
400 }
401
402 #[cfg(all(feature = "client", feature = "server"))]
403 #[tokio::test]
404 async fn send_blocklist() {
405 let test_env = TEST_ENV.lock().await;
406 let (server_conn, client_conn) = test_env.setup().await;
407
408 let blocklist_to_send = HostNetworkGroup {
409 hosts: vec![IP_ADDR_1],
410 networks: vec![],
411 ip_ranges: vec![],
412 };
413
414 let mut handler = TestHandler;
415 let handler_conn = client_conn.clone();
416 let client_handle = tokio::spawn(async move {
417 let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
418
419 crate::request::handle(&mut handler, &mut send, &mut recv).await
420 });
421 let server_res = server_conn.send_blocklist(&blocklist_to_send).await;
422 assert!(server_res.is_ok());
423 let client_res = client_handle.await.unwrap();
424 assert!(client_res.is_ok());
425
426 test_env.teardown(&server_conn);
427 }
428
429 #[cfg(all(feature = "client", feature = "server"))]
430 #[tokio::test]
431 async fn send_config_update_cmd() {
432 let test_env = TEST_ENV.lock().await;
433 let (server_conn, client_conn) = test_env.setup().await;
434
435 let mut handler = TestHandler;
436 let handler_conn = client_conn.clone();
437 let client_handle = tokio::spawn(async move {
438 let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
439
440 crate::request::handle(&mut handler, &mut send, &mut recv).await
441 });
442 let server_res = server_conn.send_config_update_cmd().await;
443 assert!(server_res.is_ok());
444 let client_res = client_handle.await.unwrap();
445 assert!(client_res.is_ok());
446
447 test_env.teardown(&server_conn);
448 }
449
450 #[cfg(all(feature = "client", feature = "server"))]
451 #[tokio::test]
452 async fn send_filtering_rules() {
453 let test_env = TEST_ENV.lock().await;
454 let (server_conn, client_conn) = test_env.setup().await;
455
456 let filtering_rules_to_send = vec![(
457 "0.0.0.0/0".parse::<IpNet>().unwrap(),
458 Some(vec![80]),
459 Some(vec![6]),
460 )];
461
462 let mut handler = TestHandler;
463 let handler_conn = client_conn.clone();
464 let client_handle = tokio::spawn(async move {
465 let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
466
467 crate::request::handle(&mut handler, &mut send, &mut recv).await
468 });
469 let server_res = server_conn
470 .send_filtering_rules(&filtering_rules_to_send)
471 .await;
472 assert!(server_res.is_ok());
473 let client_res = client_handle.await.unwrap();
474 assert!(client_res.is_ok());
475
476 test_env.teardown(&server_conn);
477 }
478
479 #[cfg(all(feature = "client", feature = "server"))]
480 #[tokio::test]
481 async fn send_internal_network_list() {
482 let test_env = TEST_ENV.lock().await;
483 let (server_conn, client_conn) = test_env.setup().await;
484
485 let internal_network_list_to_send = HostNetworkGroup {
486 hosts: vec![IP_ADDR_1],
487 networks: vec![],
488 ip_ranges: vec![],
489 };
490
491 let mut handler = TestHandler;
492 let handler_conn = client_conn.clone();
493 let client_handle = tokio::spawn(async move {
494 let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
495
496 crate::request::handle(&mut handler, &mut send, &mut recv).await
497 });
498 let server_res = server_conn
499 .send_internal_network_list(&internal_network_list_to_send)
500 .await;
501 assert!(server_res.is_ok());
502 let client_res = client_handle.await.unwrap();
503 assert!(client_res.is_ok());
504
505 test_env.teardown(&server_conn);
506 }
507
508 #[cfg(all(feature = "client", feature = "server"))]
509 #[tokio::test]
510 async fn send_sampling_policies() {
511 let test_env = TEST_ENV.lock().await;
512 let (server_conn, client_conn) = test_env.setup().await;
513
514 let sampling_policies_to_send = vec![SamplingPolicy {
515 id: 42,
516 kind: SamplingKind::Conn,
517 interval: Duration::from_secs(60),
518 period: Duration::from_secs(3600),
519 offset: 0,
520 src_ip: None,
521 dst_ip: None,
522 node: None,
523 column: None,
524 }];
525
526 let mut handler = TestHandler;
527 let handler_conn = client_conn.clone();
528 let client_handle = tokio::spawn(async move {
529 let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
530
531 crate::request::handle(&mut handler, &mut send, &mut recv).await
532 });
533 let server_res = server_conn
534 .send_sampling_policies(&sampling_policies_to_send)
535 .await;
536 assert!(server_res.is_ok());
537 let client_res = client_handle.await.unwrap();
538 assert!(client_res.is_ok());
539
540 test_env.teardown(&server_conn);
541 }
542
543 #[cfg(all(feature = "client", feature = "server"))]
544 #[tokio::test]
545 async fn send_tor_exit_node_list() {
546 let test_env = TEST_ENV.lock().await;
547 let (server_conn, client_conn) = test_env.setup().await;
548
549 let tor_exit_node_list_to_send = vec!["192.168.1.1".to_string(), "10.0.0.1".to_string()];
550
551 let mut handler = TestHandler;
552 let handler_conn = client_conn.clone();
553 let client_handle = tokio::spawn(async move {
554 let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
555
556 crate::request::handle(&mut handler, &mut send, &mut recv).await
557 });
558 let server_res = server_conn
559 .send_tor_exit_node_list(&tor_exit_node_list_to_send)
560 .await;
561 assert!(server_res.is_ok());
562 let client_res = client_handle.await.unwrap();
563 assert!(client_res.is_ok());
564
565 test_env.teardown(&server_conn);
566 }
567
568 #[cfg(all(feature = "client", feature = "server"))]
569 #[tokio::test]
570 async fn send_trusted_domain_list() {
571 let test_env = TEST_ENV.lock().await;
572 let (server_conn, client_conn) = test_env.setup().await;
573
574 let trusted_domain_list_to_send = vec!["example.com".to_string(), "test.org".to_string()];
575
576 let mut handler = TestHandler;
577 let handler_conn = client_conn.clone();
578 let client_handle = tokio::spawn(async move {
579 let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
580
581 crate::request::handle(&mut handler, &mut send, &mut recv).await
582 });
583 let server_res = server_conn
584 .send_trusted_domain_list(&trusted_domain_list_to_send)
585 .await;
586 assert!(server_res.is_ok());
587 let client_res = client_handle.await.unwrap();
588 assert!(client_res.is_ok());
589
590 test_env.teardown(&server_conn);
591 }
592
593 #[cfg(all(feature = "client", feature = "server"))]
594 #[tokio::test]
595 async fn send_trusted_user_agent_list() {
596 let test_env = TEST_ENV.lock().await;
597 let (server_conn, client_conn) = test_env.setup().await;
598
599 let trusted_user_agent_list_to_send =
600 vec!["Mozilla/5.0".to_string(), "Chrome/91.0".to_string()];
601
602 let mut handler = TestHandler;
603 let handler_conn = client_conn.clone();
604 let client_handle = tokio::spawn(async move {
605 let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
606
607 crate::request::handle(&mut handler, &mut send, &mut recv).await
608 });
609 let server_res = server_conn
610 .send_trusted_user_agent_list(&trusted_user_agent_list_to_send)
611 .await;
612 assert!(server_res.is_ok());
613 let client_res = client_handle.await.unwrap();
614 assert!(client_res.is_ok());
615
616 test_env.teardown(&server_conn);
617 }
618
619 #[cfg(all(feature = "client", feature = "server"))]
620 #[tokio::test]
621 async fn send_ping() {
622 let test_env = TEST_ENV.lock().await;
623 let (server_conn, client_conn) = test_env.setup().await;
624
625 let mut handler = TestHandler;
626 let handler_conn = client_conn.clone();
627 let client_handle = tokio::spawn(async move {
628 let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
629
630 crate::request::handle(&mut handler, &mut send, &mut recv).await
631 });
632 let server_res = server_conn.send_ping().await;
633 assert!(server_res.is_ok());
634 let client_res = client_handle.await.unwrap();
635 assert!(client_res.is_ok());
636
637 test_env.teardown(&server_conn);
638 }
639
640 #[cfg(all(feature = "client", feature = "server"))]
641 #[tokio::test]
642 async fn send_shutdown_cmd() {
643 let test_env = TEST_ENV.lock().await;
644 let (server_conn, client_conn) = test_env.setup().await;
645
646 let mut handler = TestHandler;
647 let handler_conn = client_conn.clone();
648 let client_handle = tokio::spawn(async move {
649 let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
650
651 crate::request::handle(&mut handler, &mut send, &mut recv).await
652 });
653 let server_res = server_conn.send_shutdown_cmd().await;
654 assert!(server_res.is_ok());
655 let client_res = client_handle.await.unwrap();
656 assert!(client_res.is_ok());
657
658 test_env.teardown(&server_conn);
659 }
660}