microsoft/openvmm

Public

mirrored from https://github.com/microsoft/openvmm Available

Code · Commits · Issues · Pull requests · Actions · Insights · Security
e1cdda602823a24e5785dbcb350e21da7f113215

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

petri/pipette/src/agent.rs

184lines · modecode

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! The main pipette agent, which is run when the process starts.
5
6#![cfg(any(target_os = "linux", target_os = "windows"))]
7
8use anyhow::Context;
9use futures::FutureExt;
10use mesh_remote::PointToPointMesh;
11use pal_async::socket::PolledSocket;
12use pal_async::task::Spawn;
13use pal_async::timer::PolledTimer;
14use pal_async::DefaultDriver;
15use pipette_protocol::DiagnosticFile;
16use pipette_protocol::PipetteBootstrap;
17use pipette_protocol::PipetteRequest;
18use std::sync::Arc;
19use std::time::Duration;
20use unicycle::FuturesUnordered;
21use vmsocket::VmAddress;
22use vmsocket::VmSocket;
23
24pub struct Agent {
25 driver: DefaultDriver,
26 mesh: PointToPointMesh,
27 request_recv: mesh::Receiver<PipetteRequest>,
28 diag_file_send: DiagnosticSender,
29 watch_send: mesh::OneshotSender<()>,
30}
31
32#[allow(dead_code)] // Not used on all platforms yet
33#[derive(Clone)]
34pub struct DiagnosticSender(Arc<mesh::Sender<DiagnosticFile>>);
35
36impl Agent {
37 pub async fn new(driver: DefaultDriver) -> anyhow::Result<Self> {
38 let socket = VmSocket::new()?;
39 // Extend the default timeout of 2 seconds, as tests are often run in
40 // parallel on a host, causing very heavy load on the overall system.
41 socket
42 .set_connect_timeout(Duration::from_secs(5))
43 .context("failed to set socket timeout")?;
44
45 let socket = socket
46 .connect(VmAddress::vsock_host(pipette_protocol::PIPETTE_VSOCK_PORT))
47 .context("failed to connect to vsock")?;
48 let socket =
49 PolledSocket::new(&driver, socket).context("failed to create polled socket")?;
50
51 let (bootstrap_send, bootstrap_recv) = mesh::oneshot::<PipetteBootstrap>();
52 let mesh = PointToPointMesh::new(&driver, socket, bootstrap_recv.into());
53
54 let (request_send, request_recv) = mesh::channel();
55 let (diag_file_send, diag_file_recv) = mesh::channel();
56 let (watch_send, watch_recv) = mesh::oneshot();
57 let log = crate::trace::init_tracing();
58
59 bootstrap_send.send(PipetteBootstrap {
60 requests: request_send,
61 diag_file_recv,
62 watch: watch_recv,
63 log,
64 });
65
66 Ok(Self {
67 driver,
68 mesh,
69 request_recv,
70 diag_file_send: DiagnosticSender(Arc::new(diag_file_send)),
71 watch_send,
72 })
73 }
74
75 pub async fn run(mut self) -> anyhow::Result<()> {
76 let mut tasks = FuturesUnordered::new();
77 loop {
78 futures::select! {
79 req = self.request_recv.recv().fuse() => {
80 match req {
81 Ok(req) => {
82 tasks.push(handle_request(&self.driver, req, self.diag_file_send.clone()));
83 },
84 Err(e) => {
85 tracing::info!(?e, "request channel closed, shutting down");
86 break;
87 }
88 }
89 }
90 _ = tasks.next() => {}
91 }
92 }
93 self.watch_send.send(());
94 self.mesh.shutdown().await;
95 Ok(())
96 }
97}
98
99async fn handle_request(
100 driver: &DefaultDriver,
101 req: PipetteRequest,
102 _diag_file_send: DiagnosticSender, // Not used on all platforms yet
103) {
104 match req {
105 PipetteRequest::Ping(rpc) => rpc.handle_sync(|()| {
106 tracing::info!("ping");
107 }),
108 PipetteRequest::Execute(rpc) => rpc.handle_failable_sync(crate::execute::handle_execute),
109 PipetteRequest::Shutdown(rpc) => {
110 rpc.handle_sync(|request| {
111 tracing::info!(shutdown_type = ?request.shutdown_type, "shutdown request");
112 // TODO: handle this inline without waiting. Currently we spawn
113 // a task so that the response is sent before the shutdown
114 // starts, since hvlite fails to notice that the connection is
115 // closed if we power off while a response is pending.
116 let mut timer = PolledTimer::new(driver);
117 driver
118 .spawn("shutdown", async move {
119 // Because pipette runs as a system service on Windows
120 // it is able to issue a shutdown command before Windows
121 // has finished starting up and logging in the user. This
122 // can put the system into a stuck state, where it is
123 // completely unable to shut down. To avoid this, we
124 // wait for a longer period before attempting to shut down.
125 #[cfg(windows)]
126 timer.sleep(Duration::from_secs(5)).await;
127 #[cfg(not(windows))]
128 timer.sleep(Duration::from_millis(250)).await;
129 loop {
130 if let Err(err) = crate::shutdown::handle_shutdown(request) {
131 tracing::error!(
132 error = err.as_ref() as &dyn std::error::Error,
133 "failed to shut down"
134 );
135 }
136 timer.sleep(Duration::from_secs(5)).await;
137 tracing::warn!("still waiting to shut down, trying again");
138 }
139 })
140 .detach();
141 Ok(())
142 })
143 }
144 PipetteRequest::ReadFile(rpc) => rpc.handle_failable(read_file).await,
145 PipetteRequest::WriteFile(rpc) => rpc.handle_failable(write_file).await,
146 }
147}
148
149async fn read_file(mut request: pipette_protocol::ReadFileRequest) -> anyhow::Result<()> {
150 tracing::debug!(path = request.path, "Beginning file read request");
151 let file = fs_err::File::open(request.path)?;
152 futures::io::copy(&mut futures::io::AllowStdIo::new(file), &mut request.sender).await?;
153 tracing::debug!("file read request complete");
154 Ok(())
155}
156
157async fn write_file(mut request: pipette_protocol::WriteFileRequest) -> anyhow::Result<()> {
158 tracing::debug!(path = request.path, "Beginning file write request");
159 let file = fs_err::File::create(request.path)?;
160 futures::io::copy(
161 &mut request.receiver,
162 &mut futures::io::AllowStdIo::new(file),
163 )
164 .await?;
165 tracing::debug!("file write request complete");
166 Ok(())
167}
168
169impl DiagnosticSender {
170 #[allow(dead_code)] // Not used on all platforms yet
171 pub async fn send(&self, filename: &str) -> anyhow::Result<()> {
172 tracing::debug!(filename, "Beginning diagnostic file request");
173 let file = fs_err::File::open(filename)?;
174 let (recv_pipe, mut send_pipe) = mesh::pipe::pipe();
175 self.0.send(DiagnosticFile {
176 name: filename.to_owned(),
177 receiver: recv_pipe,
178 });
179 futures::io::copy(&mut futures::io::AllowStdIo::new(file), &mut send_pipe).await?;
180 drop(send_pipe);
181 tracing::debug!("diagnostic request complete");
182 Ok(())
183 }
184}
185