review_protocol/server/
api.rs

1use anyhow::anyhow;
2use bincode::Options;
3use oinq::frame;
4
5use super::Connection;
6use crate::{
7    client,
8    types::{HostNetworkGroup, Process, ResourceUsage, SamplingPolicy, TrafficFilterRule},
9};
10
11/// The server API.
12impl Connection {
13    /// Fetches the list of processes running on the agent.
14    ///
15    /// # Errors
16    ///
17    /// Returns an error if serialization/deserialization failed or
18    /// communication with the client failed.
19    pub async fn get_process_list(&self) -> anyhow::Result<Vec<Process>> {
20        self.send_request(client::RequestCode::ProcessList, &())
21            .await
22    }
23
24    /// Fetches the resource usage of an agent.
25    ///
26    /// # Errors
27    ///
28    /// Returns an error if serialization/deserialization failed or
29    /// communication with the client failed.
30    pub async fn get_resource_usage(&self) -> anyhow::Result<ResourceUsage> {
31        self.send_request::<(), (String, ResourceUsage)>(client::RequestCode::ResourceUsage, &())
32            .await
33            .map(|(_, usage)| usage)
34    }
35
36    /// Sends the allowlist for network addresses.
37    ///
38    /// # Errors
39    ///
40    /// Returns an error if serialization failed or communication with the client failed.
41    pub async fn send_allowlist(&self, allowlist: &HostNetworkGroup) -> anyhow::Result<()> {
42        self.send_request(client::RequestCode::Allowlist, allowlist)
43            .await
44    }
45
46    /// Sends the blocklist for network addresses.
47    ///
48    /// # Errors
49    ///
50    /// Returns an error if serialization failed or communication with the client failed.
51    pub async fn send_blocklist(&self, blocklist: &HostNetworkGroup) -> anyhow::Result<()> {
52        self.send_request(client::RequestCode::Blocklist, blocklist)
53            .await
54    }
55
56    /// Sends the config-update command.
57    ///
58    /// # Errors
59    ///
60    /// Returns an error if serialization failed or communication with the client failed.
61    pub async fn send_config_update_cmd(&self) -> anyhow::Result<()> {
62        self.send_request(client::RequestCode::UpdateConfig, &())
63            .await
64    }
65
66    /// Sends the traffic filtering rules.
67    ///
68    /// # Errors
69    ///
70    /// Returns an error if serialization failed or communication with the client failed.
71    pub async fn send_filtering_rules(&self, list: &[TrafficFilterRule]) -> anyhow::Result<()> {
72        self.send_request(client::RequestCode::ReloadFilterRule, list)
73            .await
74    }
75
76    /// Sends the internal network list.
77    ///
78    /// # Errors
79    ///
80    /// Returns an error if serialization failed or communication with the client failed.
81    pub async fn send_internal_network_list(&self, list: &HostNetworkGroup) -> anyhow::Result<()> {
82        self.send_request(client::RequestCode::InternalNetworkList, list)
83            .await
84    }
85
86    /// Sends the ping message.
87    ///
88    /// # Errors
89    ///
90    /// Returns an error if serialization failed or communication with the client failed.
91    pub async fn send_ping(&self) -> anyhow::Result<()> {
92        self.send_request(client::RequestCode::EchoRequest, &())
93            .await
94    }
95
96    /// Sends the reboot command.
97    ///
98    /// # Errors
99    ///
100    /// Returns an error if serialization failed or communication with the client failed.
101    pub async fn send_reboot_cmd(&self) -> anyhow::Result<()> {
102        self.send_request(client::RequestCode::Reboot, &()).await
103    }
104
105    /// Sends the sampling policies.
106    ///
107    /// # Errors
108    ///
109    /// Returns an error if serialization failed or communication with the client failed.
110    pub async fn send_sampling_policies(&self, list: &[SamplingPolicy]) -> anyhow::Result<()> {
111        self.send_request(client::RequestCode::SamplingPolicyList, list)
112            .await
113    }
114
115    /// Sends the shutdown command.
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if serialization failed or communication with the client failed.
120    pub async fn send_shutdown_cmd(&self) -> anyhow::Result<()> {
121        self.send_request(client::RequestCode::Shutdown, &()).await
122    }
123
124    /// Sends a list of Tor exit nodes to the client.
125    ///
126    /// # Errors
127    ///
128    /// Returns an error if serialization failed or communication with the client failed.
129    pub async fn send_tor_exit_node_list(&self, list: &[String]) -> anyhow::Result<()> {
130        self.send_request(client::RequestCode::TorExitNodeList, list)
131            .await
132    }
133
134    /// Sends a list of trusted domains to the client.
135    ///
136    /// # Errors
137    ///
138    /// Returns an error if serialization failed or communication with the client failed.
139    pub async fn send_trusted_domain_list(&self, list: &[String]) -> anyhow::Result<()> {
140        self.send_request(client::RequestCode::TrustedDomainList, list)
141            .await
142    }
143
144    /// Sends a list of trusted user-agents to the client.
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if serialization failed or communication with the client failed.
149    pub async fn send_trusted_user_agent_list(&self, list: &[String]) -> anyhow::Result<()> {
150        self.send_request(client::RequestCode::TrustedUserAgentList, list)
151            .await
152    }
153
154    /// Sends the given payload to the client.
155    async fn send_request<T: serde::Serialize + ?Sized, S: serde::de::DeserializeOwned>(
156        &self,
157        request_code: client::RequestCode,
158        payload: &T,
159    ) -> anyhow::Result<S> {
160        let Ok(mut buf) = bincode::serialize::<u32>(&request_code.into()) else {
161            unreachable!("serialization of u32 into memory buffer should not fail")
162        };
163        let ser = bincode::DefaultOptions::new();
164        buf.extend(ser.serialize(payload)?);
165
166        let (mut send, mut recv) = self.conn.open_bi().await?;
167        frame::send_raw(&mut send, &buf).await?;
168
169        frame::recv::<Result<S, String>>(&mut recv, &mut buf)
170            .await?
171            .map_err(|e| anyhow!(e))
172    }
173}
174
175#[cfg(test)]
176mod tests {
177    #[cfg(all(feature = "client", feature = "server"))]
178    use {
179        crate::{
180            test::TEST_ENV,
181            types::{HostNetworkGroup, SamplingKind, SamplingPolicy},
182        },
183        ipnet::IpNet,
184        std::{
185            net::{IpAddr, Ipv4Addr},
186            time::Duration,
187        },
188    };
189
190    #[cfg(all(feature = "client", feature = "server"))]
191    // Define a constant IP address for tests
192    const IP_ADDR_1: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
193
194    #[cfg(all(feature = "client", feature = "server"))]
195    // Shared handler for all tests
196    struct TestHandler;
197
198    #[cfg(all(feature = "client", feature = "server"))]
199    #[async_trait::async_trait]
200    impl crate::request::Handler for TestHandler {
201        async fn allowlist(&mut self, list: HostNetworkGroup) -> Result<(), String> {
202            if list.hosts == [IP_ADDR_1] {
203                Ok(())
204            } else {
205                Err("unexpected domain list".to_string())
206            }
207        }
208
209        async fn blocklist(&mut self, list: HostNetworkGroup) -> Result<(), String> {
210            if list.hosts == [IP_ADDR_1] {
211                Ok(())
212            } else {
213                Err("unexpected blocklist".to_string())
214            }
215        }
216
217        async fn update_config(&mut self) -> Result<(), String> {
218            Ok(())
219        }
220
221        async fn update_traffic_filter_rules(
222            &mut self,
223            rules: &[super::TrafficFilterRule],
224        ) -> Result<(), String> {
225            if rules.len() == 1 {
226                Ok(())
227            } else {
228                Err("unexpected filtering rules".to_string())
229            }
230        }
231
232        async fn internal_network_list(&mut self, list: HostNetworkGroup) -> Result<(), String> {
233            if list.hosts == [IP_ADDR_1] {
234                Ok(())
235            } else {
236                Err("unexpected internal network list".to_string())
237            }
238        }
239
240        async fn process_list(&mut self) -> Result<Vec<super::Process>, String> {
241            Ok(vec![super::Process {
242                user: "test-user".to_string(),
243                cpu_usage: 10.0,
244                mem_usage: 20.0,
245                start_time: 1_234_567_890,
246                command: "test-command".to_string(),
247            }])
248        }
249
250        async fn resource_usage(&mut self) -> Result<(String, super::ResourceUsage), String> {
251            Ok((
252                "test-host".to_string(),
253                super::ResourceUsage {
254                    cpu_usage: 0.5,
255                    total_memory: 100,
256                    used_memory: 50,
257                    total_disk_space: 1000,
258                    used_disk_space: 500,
259                },
260            ))
261        }
262
263        async fn reboot(&mut self) -> Result<(), String> {
264            Ok(())
265        }
266
267        async fn sampling_policy_list(
268            &mut self,
269            policies: &[super::SamplingPolicy],
270        ) -> Result<(), String> {
271            if policies.len() == 1 && policies[0].id == 42 {
272                Ok(())
273            } else {
274                Err("unexpected sampling policies".to_string())
275            }
276        }
277
278        async fn shutdown(&mut self) -> Result<(), String> {
279            Ok(())
280        }
281
282        async fn tor_exit_node_list(&mut self, nodes: &[&str]) -> Result<(), String> {
283            if nodes == ["192.168.1.1", "10.0.0.1"] {
284                Ok(())
285            } else {
286                Err("unexpected tor exit node list".to_string())
287            }
288        }
289
290        async fn trusted_domain_list(&mut self, domains: &[&str]) -> Result<(), String> {
291            if domains == ["example.com", "test.org"] {
292                Ok(())
293            } else {
294                Err("unexpected trusted domain list".to_string())
295            }
296        }
297
298        async fn trusted_user_agent_list(&mut self, agents: &[&str]) -> Result<(), String> {
299            if agents == ["Mozilla/5.0", "Chrome/91.0"] {
300                Ok(())
301            } else {
302                Err("unexpected trusted user agent list".to_string())
303            }
304        }
305    }
306
307    #[cfg(all(feature = "client", feature = "server"))]
308    #[tokio::test]
309    async fn get_resource_usage() {
310        let test_env = TEST_ENV.lock().await;
311        let (server_conn, client_conn) = test_env.setup().await;
312
313        let mut handler = TestHandler;
314        let handler_conn = client_conn.clone();
315        let client_handle = tokio::spawn(async move {
316            let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
317
318            crate::request::handle(&mut handler, &mut send, &mut recv).await
319        });
320        let server_res = server_conn.get_resource_usage().await;
321        assert!(server_res.is_ok());
322        let usage = server_res.unwrap();
323        assert_eq!(usage.total_memory, 100);
324        let client_res = client_handle.await.unwrap();
325        assert!(client_res.is_ok());
326
327        test_env.teardown(&server_conn);
328    }
329
330    #[cfg(all(feature = "client", feature = "server"))]
331    #[tokio::test]
332    async fn get_process_list() {
333        let test_env = TEST_ENV.lock().await;
334        let (server_conn, client_conn) = test_env.setup().await;
335
336        let mut handler = TestHandler;
337        let handler_conn = client_conn.clone();
338        let client_handle = tokio::spawn(async move {
339            let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
340
341            crate::request::handle(&mut handler, &mut send, &mut recv).await
342        });
343        let server_res = server_conn.get_process_list().await;
344        assert!(server_res.is_ok());
345        let processes = server_res.unwrap();
346        assert_eq!(processes.len(), 1);
347        assert_eq!(processes[0].user, "test-user");
348        let client_res = client_handle.await.unwrap();
349        assert!(client_res.is_ok());
350
351        test_env.teardown(&server_conn);
352    }
353
354    #[cfg(all(feature = "client", feature = "server"))]
355    #[tokio::test]
356    async fn send_allowlist() {
357        let test_env = TEST_ENV.lock().await;
358        let (server_conn, client_conn) = test_env.setup().await;
359
360        let allowlist_to_send = HostNetworkGroup {
361            hosts: vec![IP_ADDR_1],
362            networks: vec![],
363            ip_ranges: vec![],
364        };
365
366        let mut handler = TestHandler;
367        let handler_conn = client_conn.clone();
368        let client_handle = tokio::spawn(async move {
369            let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
370
371            crate::request::handle(&mut handler, &mut send, &mut recv).await
372        });
373        let server_res = server_conn.send_allowlist(&allowlist_to_send).await;
374        assert!(server_res.is_ok());
375        let client_res = client_handle.await.unwrap();
376        assert!(client_res.is_ok());
377
378        test_env.teardown(&server_conn);
379    }
380
381    #[cfg(all(feature = "client", feature = "server"))]
382    #[tokio::test]
383    async fn send_reboot_cmd() {
384        let test_env = TEST_ENV.lock().await;
385        let (server_conn, client_conn) = test_env.setup().await;
386
387        let mut handler = TestHandler;
388        let handler_conn = client_conn.clone();
389        let client_handle = tokio::spawn(async move {
390            let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
391
392            crate::request::handle(&mut handler, &mut send, &mut recv).await
393        });
394        let server_res = server_conn.send_reboot_cmd().await;
395        assert!(server_res.is_ok());
396        let client_res = client_handle.await.unwrap();
397        assert!(client_res.is_ok());
398
399        test_env.teardown(&server_conn);
400    }
401
402    #[cfg(all(feature = "client", feature = "server"))]
403    #[tokio::test]
404    async fn send_blocklist() {
405        let test_env = TEST_ENV.lock().await;
406        let (server_conn, client_conn) = test_env.setup().await;
407
408        let blocklist_to_send = HostNetworkGroup {
409            hosts: vec![IP_ADDR_1],
410            networks: vec![],
411            ip_ranges: vec![],
412        };
413
414        let mut handler = TestHandler;
415        let handler_conn = client_conn.clone();
416        let client_handle = tokio::spawn(async move {
417            let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
418
419            crate::request::handle(&mut handler, &mut send, &mut recv).await
420        });
421        let server_res = server_conn.send_blocklist(&blocklist_to_send).await;
422        assert!(server_res.is_ok());
423        let client_res = client_handle.await.unwrap();
424        assert!(client_res.is_ok());
425
426        test_env.teardown(&server_conn);
427    }
428
429    #[cfg(all(feature = "client", feature = "server"))]
430    #[tokio::test]
431    async fn send_config_update_cmd() {
432        let test_env = TEST_ENV.lock().await;
433        let (server_conn, client_conn) = test_env.setup().await;
434
435        let mut handler = TestHandler;
436        let handler_conn = client_conn.clone();
437        let client_handle = tokio::spawn(async move {
438            let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
439
440            crate::request::handle(&mut handler, &mut send, &mut recv).await
441        });
442        let server_res = server_conn.send_config_update_cmd().await;
443        assert!(server_res.is_ok());
444        let client_res = client_handle.await.unwrap();
445        assert!(client_res.is_ok());
446
447        test_env.teardown(&server_conn);
448    }
449
450    #[cfg(all(feature = "client", feature = "server"))]
451    #[tokio::test]
452    async fn send_filtering_rules() {
453        let test_env = TEST_ENV.lock().await;
454        let (server_conn, client_conn) = test_env.setup().await;
455
456        let filtering_rules_to_send = vec![(
457            "0.0.0.0/0".parse::<IpNet>().unwrap(),
458            Some(vec![80]),
459            Some(vec![6]),
460        )];
461
462        let mut handler = TestHandler;
463        let handler_conn = client_conn.clone();
464        let client_handle = tokio::spawn(async move {
465            let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
466
467            crate::request::handle(&mut handler, &mut send, &mut recv).await
468        });
469        let server_res = server_conn
470            .send_filtering_rules(&filtering_rules_to_send)
471            .await;
472        assert!(server_res.is_ok());
473        let client_res = client_handle.await.unwrap();
474        assert!(client_res.is_ok());
475
476        test_env.teardown(&server_conn);
477    }
478
479    #[cfg(all(feature = "client", feature = "server"))]
480    #[tokio::test]
481    async fn send_internal_network_list() {
482        let test_env = TEST_ENV.lock().await;
483        let (server_conn, client_conn) = test_env.setup().await;
484
485        let internal_network_list_to_send = HostNetworkGroup {
486            hosts: vec![IP_ADDR_1],
487            networks: vec![],
488            ip_ranges: vec![],
489        };
490
491        let mut handler = TestHandler;
492        let handler_conn = client_conn.clone();
493        let client_handle = tokio::spawn(async move {
494            let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
495
496            crate::request::handle(&mut handler, &mut send, &mut recv).await
497        });
498        let server_res = server_conn
499            .send_internal_network_list(&internal_network_list_to_send)
500            .await;
501        assert!(server_res.is_ok());
502        let client_res = client_handle.await.unwrap();
503        assert!(client_res.is_ok());
504
505        test_env.teardown(&server_conn);
506    }
507
508    #[cfg(all(feature = "client", feature = "server"))]
509    #[tokio::test]
510    async fn send_sampling_policies() {
511        let test_env = TEST_ENV.lock().await;
512        let (server_conn, client_conn) = test_env.setup().await;
513
514        let sampling_policies_to_send = vec![SamplingPolicy {
515            id: 42,
516            kind: SamplingKind::Conn,
517            interval: Duration::from_secs(60),
518            period: Duration::from_secs(3600),
519            offset: 0,
520            src_ip: None,
521            dst_ip: None,
522            node: None,
523            column: None,
524        }];
525
526        let mut handler = TestHandler;
527        let handler_conn = client_conn.clone();
528        let client_handle = tokio::spawn(async move {
529            let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
530
531            crate::request::handle(&mut handler, &mut send, &mut recv).await
532        });
533        let server_res = server_conn
534            .send_sampling_policies(&sampling_policies_to_send)
535            .await;
536        assert!(server_res.is_ok());
537        let client_res = client_handle.await.unwrap();
538        assert!(client_res.is_ok());
539
540        test_env.teardown(&server_conn);
541    }
542
543    #[cfg(all(feature = "client", feature = "server"))]
544    #[tokio::test]
545    async fn send_tor_exit_node_list() {
546        let test_env = TEST_ENV.lock().await;
547        let (server_conn, client_conn) = test_env.setup().await;
548
549        let tor_exit_node_list_to_send = vec!["192.168.1.1".to_string(), "10.0.0.1".to_string()];
550
551        let mut handler = TestHandler;
552        let handler_conn = client_conn.clone();
553        let client_handle = tokio::spawn(async move {
554            let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
555
556            crate::request::handle(&mut handler, &mut send, &mut recv).await
557        });
558        let server_res = server_conn
559            .send_tor_exit_node_list(&tor_exit_node_list_to_send)
560            .await;
561        assert!(server_res.is_ok());
562        let client_res = client_handle.await.unwrap();
563        assert!(client_res.is_ok());
564
565        test_env.teardown(&server_conn);
566    }
567
568    #[cfg(all(feature = "client", feature = "server"))]
569    #[tokio::test]
570    async fn send_trusted_domain_list() {
571        let test_env = TEST_ENV.lock().await;
572        let (server_conn, client_conn) = test_env.setup().await;
573
574        let trusted_domain_list_to_send = vec!["example.com".to_string(), "test.org".to_string()];
575
576        let mut handler = TestHandler;
577        let handler_conn = client_conn.clone();
578        let client_handle = tokio::spawn(async move {
579            let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
580
581            crate::request::handle(&mut handler, &mut send, &mut recv).await
582        });
583        let server_res = server_conn
584            .send_trusted_domain_list(&trusted_domain_list_to_send)
585            .await;
586        assert!(server_res.is_ok());
587        let client_res = client_handle.await.unwrap();
588        assert!(client_res.is_ok());
589
590        test_env.teardown(&server_conn);
591    }
592
593    #[cfg(all(feature = "client", feature = "server"))]
594    #[tokio::test]
595    async fn send_trusted_user_agent_list() {
596        let test_env = TEST_ENV.lock().await;
597        let (server_conn, client_conn) = test_env.setup().await;
598
599        let trusted_user_agent_list_to_send =
600            vec!["Mozilla/5.0".to_string(), "Chrome/91.0".to_string()];
601
602        let mut handler = TestHandler;
603        let handler_conn = client_conn.clone();
604        let client_handle = tokio::spawn(async move {
605            let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
606
607            crate::request::handle(&mut handler, &mut send, &mut recv).await
608        });
609        let server_res = server_conn
610            .send_trusted_user_agent_list(&trusted_user_agent_list_to_send)
611            .await;
612        assert!(server_res.is_ok());
613        let client_res = client_handle.await.unwrap();
614        assert!(client_res.is_ok());
615
616        test_env.teardown(&server_conn);
617    }
618
619    #[cfg(all(feature = "client", feature = "server"))]
620    #[tokio::test]
621    async fn send_ping() {
622        let test_env = TEST_ENV.lock().await;
623        let (server_conn, client_conn) = test_env.setup().await;
624
625        let mut handler = TestHandler;
626        let handler_conn = client_conn.clone();
627        let client_handle = tokio::spawn(async move {
628            let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
629
630            crate::request::handle(&mut handler, &mut send, &mut recv).await
631        });
632        let server_res = server_conn.send_ping().await;
633        assert!(server_res.is_ok());
634        let client_res = client_handle.await.unwrap();
635        assert!(client_res.is_ok());
636
637        test_env.teardown(&server_conn);
638    }
639
640    #[cfg(all(feature = "client", feature = "server"))]
641    #[tokio::test]
642    async fn send_shutdown_cmd() {
643        let test_env = TEST_ENV.lock().await;
644        let (server_conn, client_conn) = test_env.setup().await;
645
646        let mut handler = TestHandler;
647        let handler_conn = client_conn.clone();
648        let client_handle = tokio::spawn(async move {
649            let (mut send, mut recv) = handler_conn.accept_bi().await.unwrap();
650
651            crate::request::handle(&mut handler, &mut send, &mut recv).await
652        });
653        let server_res = server_conn.send_shutdown_cmd().await;
654        assert!(server_res.is_ok());
655        let client_res = client_handle.await.unwrap();
656        assert!(client_res.is_ok());
657
658        test_env.teardown(&server_conn);
659    }
660}