microsoft/openvmm
Public mirrored from https://github.com/microsoft/openvmm Available
petri/pipette/src/agent.rs
184 lines · mode code
| 1 | // Copyright (c) Microsoft Corporation. |
| 2 | // Licensed under the MIT License. |
| 3 | |
| 4 | //! The main pipette agent, which is run when the process starts. |
| 5 | |
| 6 | #![cfg(any(target_os = "linux", target_os = "windows"))] |
| 7 | |
| 8 | use anyhow::Context; |
| 9 | use futures::FutureExt; |
| 10 | use mesh_remote::PointToPointMesh; |
| 11 | use pal_async::socket::PolledSocket; |
| 12 | use pal_async::task::Spawn; |
| 13 | use pal_async::timer::PolledTimer; |
| 14 | use pal_async::DefaultDriver; |
| 15 | use pipette_protocol::DiagnosticFile; |
| 16 | use pipette_protocol::PipetteBootstrap; |
| 17 | use pipette_protocol::PipetteRequest; |
| 18 | use std::sync::Arc; |
| 19 | use std::time::Duration; |
| 20 | use unicycle::FuturesUnordered; |
| 21 | use vmsocket::VmAddress; |
| 22 | use vmsocket::VmSocket; |
| 23 | |
| 24 | pub struct Agent { |
| 25 | driver: DefaultDriver, |
| 26 | mesh: PointToPointMesh, |
| 27 | request_recv: mesh::Receiver<PipetteRequest>, |
| 28 | diag_file_send: DiagnosticSender, |
| 29 | watch_send: mesh::OneshotSender<()>, |
| 30 | } |
| 31 | |
| 32 | #[allow(dead_code)] // Not used on all platforms yet |
| 33 | #[derive(Clone)] |
| 34 | pub struct DiagnosticSender(Arc<mesh::Sender<DiagnosticFile>>); |
| 35 | |
| 36 | impl Agent { |
| 37 | pub async fn new(driver: DefaultDriver) -> anyhow::Result<Self> { |
| 38 | let socket = VmSocket::new()?; |
| 39 | // Extend the default timeout of 2 seconds, as tests are often run in |
| 40 | // parallel on a host, causing very heavy load on the overall system. |
| 41 | socket |
| 42 | .set_connect_timeout(Duration::from_secs(5)) |
| 43 | .context("failed to set socket timeout")?; |
| 44 | |
| 45 | let socket = socket |
| 46 | .connect(VmAddress::vsock_host(pipette_protocol::PIPETTE_VSOCK_PORT)) |
| 47 | .context("failed to connect to vsock")?; |
| 48 | let socket = |
| 49 | PolledSocket::new(&driver, socket).context("failed to create polled socket")?; |
| 50 | |
| 51 | let (bootstrap_send, bootstrap_recv) = mesh::oneshot::<PipetteBootstrap>(); |
| 52 | let mesh = PointToPointMesh::new(&driver, socket, bootstrap_recv.into()); |
| 53 | |
| 54 | let (request_send, request_recv) = mesh::channel(); |
| 55 | let (diag_file_send, diag_file_recv) = mesh::channel(); |
| 56 | let (watch_send, watch_recv) = mesh::oneshot(); |
| 57 | let log = crate::trace::init_tracing(); |
| 58 | |
| 59 | bootstrap_send.send(PipetteBootstrap { |
| 60 | requests: request_send, |
| 61 | diag_file_recv, |
| 62 | watch: watch_recv, |
| 63 | log, |
| 64 | }); |
| 65 | |
| 66 | Ok(Self { |
| 67 | driver, |
| 68 | mesh, |
| 69 | request_recv, |
| 70 | diag_file_send: DiagnosticSender(Arc::new(diag_file_send)), |
| 71 | watch_send, |
| 72 | }) |
| 73 | } |
| 74 | |
| 75 | pub async fn run(mut self) -> anyhow::Result<()> { |
| 76 | let mut tasks = FuturesUnordered::new(); |
| 77 | loop { |
| 78 | futures::select! { |
| 79 | req = self.request_recv.recv().fuse() => { |
| 80 | match req { |
| 81 | Ok(req) => { |
| 82 | tasks.push(handle_request(&self.driver, req, self.diag_file_send.clone())); |
| 83 | }, |
| 84 | Err(e) => { |
| 85 | tracing::info!(?e, "request channel closed, shutting down"); |
| 86 | break; |
| 87 | } |
| 88 | } |
| 89 | } |
| 90 | _ = tasks.next() => {} |
| 91 | } |
| 92 | } |
| 93 | self.watch_send.send(()); |
| 94 | self.mesh.shutdown().await; |
| 95 | Ok(()) |
| 96 | } |
| 97 | } |
| 98 | |
| 99 | async fn handle_request( |
| 100 | driver: &DefaultDriver, |
| 101 | req: PipetteRequest, |
| 102 | _diag_file_send: DiagnosticSender, // Not used on all platforms yet |
| 103 | ) { |
| 104 | match req { |
| 105 | PipetteRequest::Ping(rpc) => rpc.handle_sync(|()| { |
| 106 | tracing::info!("ping"); |
| 107 | }), |
| 108 | PipetteRequest::Execute(rpc) => rpc.handle_failable_sync(crate::execute::handle_execute), |
| 109 | PipetteRequest::Shutdown(rpc) => { |
| 110 | rpc.handle_sync(|request| { |
| 111 | tracing::info!(shutdown_type = ?request.shutdown_type, "shutdown request"); |
| 112 | // TODO: handle this inline without waiting. Currently we spawn |
| 113 | // a task so that the response is sent before the shutdown |
| 114 | // starts, since hvlite fails to notice that the connection is |
| 115 | // closed if we power off while a response is pending. |
| 116 | let mut timer = PolledTimer::new(driver); |
| 117 | driver |
| 118 | .spawn("shutdown", async move { |
| 119 | // Because pipette runs as a system service on Windows |
| 120 | // it is able to issue a shutdown command before Windows |
| 121 | // has finished starting up and logging in the user. This |
| 122 | // can put the system into a stuck state, where it is |
| 123 | // completely unable to shut down. To avoid this, we |
| 124 | // wait for a longer period before attempting to shut down. |
| 125 | #[cfg(windows)] |
| 126 | timer.sleep(Duration::from_secs(5)).await; |
| 127 | #[cfg(not(windows))] |
| 128 | timer.sleep(Duration::from_millis(250)).await; |
| 129 | loop { |
| 130 | if let Err(err) = crate::shutdown::handle_shutdown(request) { |
| 131 | tracing::error!( |
| 132 | error = err.as_ref() as &dyn std::error::Error, |
| 133 | "failed to shut down" |
| 134 | ); |
| 135 | } |
| 136 | timer.sleep(Duration::from_secs(5)).await; |
| 137 | tracing::warn!("still waiting to shut down, trying again"); |
| 138 | } |
| 139 | }) |
| 140 | .detach(); |
| 141 | Ok(()) |
| 142 | }) |
| 143 | } |
| 144 | PipetteRequest::ReadFile(rpc) => rpc.handle_failable(read_file).await, |
| 145 | PipetteRequest::WriteFile(rpc) => rpc.handle_failable(write_file).await, |
| 146 | } |
| 147 | } |
| 148 | |
| 149 | async fn read_file(mut request: pipette_protocol::ReadFileRequest) -> anyhow::Result<()> { |
| 150 | tracing::debug!(path = request.path, "Beginning file read request"); |
| 151 | let file = fs_err::File::open(request.path)?; |
| 152 | futures::io::copy(&mut futures::io::AllowStdIo::new(file), &mut request.sender).await?; |
| 153 | tracing::debug!("file read request complete"); |
| 154 | Ok(()) |
| 155 | } |
| 156 | |
| 157 | async fn write_file(mut request: pipette_protocol::WriteFileRequest) -> anyhow::Result<()> { |
| 158 | tracing::debug!(path = request.path, "Beginning file write request"); |
| 159 | let file = fs_err::File::create(request.path)?; |
| 160 | futures::io::copy( |
| 161 | &mut request.receiver, |
| 162 | &mut futures::io::AllowStdIo::new(file), |
| 163 | ) |
| 164 | .await?; |
| 165 | tracing::debug!("file write request complete"); |
| 166 | Ok(()) |
| 167 | } |
| 168 | |
| 169 | impl DiagnosticSender { |
| 170 | #[allow(dead_code)] // Not used on all platforms yet |
| 171 | pub async fn send(&self, filename: &str) -> anyhow::Result<()> { |
| 172 | tracing::debug!(filename, "Beginning diagnostic file request"); |
| 173 | let file = fs_err::File::open(filename)?; |
| 174 | let (recv_pipe, mut send_pipe) = mesh::pipe::pipe(); |
| 175 | self.0.send(DiagnosticFile { |
| 176 | name: filename.to_owned(), |
| 177 | receiver: recv_pipe, |
| 178 | }); |
| 179 | futures::io::copy(&mut futures::io::AllowStdIo::new(file), &mut send_pipe).await?; |
| 180 | drop(send_pipe); |
| 181 | tracing::debug!("diagnostic request complete"); |
| 182 | Ok(()) |
| 183 | } |
| 184 | } |
| 185 | |