microsoft/openvmm
Public · mirrored from https://github.com/microsoft/openvmm · Available
openhcl/diag_server/src/diag_service.rs
834 lines · mode: code
| 1 | // Copyright (c) Microsoft Corporation. |
| 2 | // Licensed under the MIT License. |
| 3 | |
| 4 | //! RPC service for diagnostics. |
| 5 | |
| 6 | use crate::grpc_result; |
| 7 | use crate::new_pty; |
| 8 | use anyhow::Context; |
| 9 | use azure_profiler_proto::AzureProfiler; |
| 10 | use azure_profiler_proto::ProfileRequest; |
| 11 | use diag_proto::ExecRequest; |
| 12 | use diag_proto::ExecResponse; |
| 13 | use diag_proto::FILE_LINE_MAX; |
| 14 | use diag_proto::FileRequest; |
| 15 | use diag_proto::KmsgRequest; |
| 16 | use diag_proto::NetworkPacketCaptureRequest; |
| 17 | use diag_proto::NetworkPacketCaptureResponse; |
| 18 | use diag_proto::OpenhclDiag; |
| 19 | use diag_proto::StartRequest; |
| 20 | use diag_proto::UnderhillDiag; |
| 21 | use diag_proto::WaitRequest; |
| 22 | use diag_proto::WaitResponse; |
| 23 | use diag_proto::network_packet_capture_request::Operation; |
| 24 | use futures::AsyncRead; |
| 25 | use futures::AsyncReadExt; |
| 26 | use futures::AsyncWrite; |
| 27 | use futures::AsyncWriteExt; |
| 28 | use futures::FutureExt; |
| 29 | use futures::StreamExt; |
| 30 | use futures::future::join_all; |
| 31 | use futures::io::AllowStdIo; |
| 32 | use futures_concurrency::stream::Merge; |
| 33 | use inspect::InspectionBuilder; |
| 34 | use inspect_proto::InspectRequest; |
| 35 | use inspect_proto::InspectResponse2; |
| 36 | use inspect_proto::InspectService; |
| 37 | use inspect_proto::UpdateRequest; |
| 38 | use inspect_proto::UpdateResponse2; |
| 39 | use mesh::CancelContext; |
| 40 | use mesh::rpc::FailableRpc; |
| 41 | use mesh::rpc::RpcSend; |
| 42 | use mesh_rpc::server::RpcReceiver; |
| 43 | use net_packet_capture::OperationData; |
| 44 | use net_packet_capture::PacketCaptureOperation; |
| 45 | use net_packet_capture::PacketCaptureParams; |
| 46 | use net_packet_capture::StartData; |
| 47 | use pal::unix::process::Stdio; |
| 48 | use pal_async::driver::Driver; |
| 49 | use pal_async::interest::InterestSlot; |
| 50 | use pal_async::interest::PollEvents; |
| 51 | use pal_async::pipe::PolledPipe; |
| 52 | use pal_async::socket::AsSockRef; |
| 53 | use pal_async::socket::PollReady; |
| 54 | use pal_async::socket::PollReadyExt; |
| 55 | use pal_async::socket::PolledSocket; |
| 56 | use pal_async::task::Spawn; |
| 57 | use pal_async::task::Task; |
| 58 | use parking_lot::Mutex; |
| 59 | use socket2::Socket; |
| 60 | use std::collections::HashMap; |
| 61 | use std::fs::File; |
| 62 | use std::future::poll_fn; |
| 63 | use std::io; |
| 64 | use std::io::Read; |
| 65 | use std::os::unix::fs::FileTypeExt; |
| 66 | use std::os::unix::prelude::*; |
| 67 | use std::process::ExitStatus; |
| 68 | use std::sync::Arc; |
| 69 | |
| 70 | /// A diagnostics request. |
| 71 | #[derive(Debug, mesh::MeshPayload)] |
| 72 | pub enum DiagRequest { |
| 73 | /// Start the VM, if it has not already been started. |
| 74 | Start(FailableRpc<StartParams, ()>), |
| 75 | /// Inspect the VM. |
| 76 | Inspect(inspect::Deferred), |
| 77 | /// Crash the VM |
| 78 | Crash(i32), |
| 79 | /// Restart the worker. |
| 80 | Restart(FailableRpc<(), ()>), |
| 81 | /// Pause VTL0 |
| 82 | Pause(FailableRpc<(), ()>), |
| 83 | /// Resume VTL0 |
| 84 | Resume(FailableRpc<(), ()>), |
| 85 | /// Save VTL2 state |
| 86 | Save(FailableRpc<(), Vec<u8>>), |
| 87 | /// Setup network trace |
| 88 | PacketCapture(FailableRpc<PacketCaptureParams<Socket>, PacketCaptureParams<Socket>>), |
| 89 | /// Profile VTL2 |
| 90 | #[cfg(feature = "profiler")] |
| 91 | Profile(FailableRpc<profiler_worker::ProfilerRequest, ()>), |
| 92 | } |
| 93 | |
| 94 | /// Additional parameters provided as part of a delayed start request. |
| 95 | #[derive(Debug, mesh::MeshPayload)] |
| 96 | pub struct StartParams { |
| 97 | /// Environment variables to set or remove. |
| 98 | pub env: Vec<(String, Option<String>)>, |
| 99 | /// Command line arguments to append. |
| 100 | pub args: Vec<String>, |
| 101 | } |
| 102 | |
| 103 | pub(crate) struct DiagServiceHandler { |
| 104 | request_send: mesh::Sender<DiagRequest>, |
| 105 | children: Mutex<HashMap<i32, Task<ExitStatus>>>, |
| 106 | inspect_sensitivity_level: Option<inspect::SensitivityLevel>, |
| 107 | inner: Arc<crate::Inner>, |
| 108 | } |
| 109 | |
| 110 | impl DiagServiceHandler { |
| 111 | pub fn new(request_send: mesh::Sender<DiagRequest>, inner: Arc<crate::Inner>) -> Self { |
| 112 | Self { |
| 113 | children: Default::default(), |
| 114 | request_send, |
| 115 | // On CVMs only allow inspecting nodes defined as safe. |
| 116 | inspect_sensitivity_level: if underhill_confidentiality::confidential_filtering_enabled( |
| 117 | ) { |
| 118 | Some(inspect::SensitivityLevel::Safe) |
| 119 | } else { |
| 120 | None |
| 121 | }, |
| 122 | // TODO: use a remotable type for `Inner`, which is just used to get |
| 123 | // data connection sockets. |
| 124 | inner, |
| 125 | } |
| 126 | } |
| 127 | |
| 128 | pub async fn process_requests( |
| 129 | self: &Arc<Self>, |
| 130 | driver: &(impl Driver + Spawn + Clone), |
| 131 | diag_recv: RpcReceiver<UnderhillDiag>, |
| 132 | diag2_recv: RpcReceiver<OpenhclDiag>, |
| 133 | inspect_recv: RpcReceiver<InspectService>, |
| 134 | profile_recv: RpcReceiver<AzureProfiler>, |
| 135 | ) -> anyhow::Result<()> { |
| 136 | enum Event { |
| 137 | Diag(UnderhillDiag), |
| 138 | Diag2(OpenhclDiag), |
| 139 | Inspect(InspectService), |
| 140 | Profile(AzureProfiler), |
| 141 | } |
| 142 | let mut s = ( |
| 143 | diag_recv.map(|(ctx, req)| (ctx, Event::Diag(req))), |
| 144 | diag2_recv.map(|(ctx, req)| (ctx, Event::Diag2(req))), |
| 145 | inspect_recv.map(|(ctx, req)| (ctx, Event::Inspect(req))), |
| 146 | profile_recv.map(|(ctx, req)| (ctx, Event::Profile(req))), |
| 147 | ) |
| 148 | .merge(); |
| 149 | |
| 150 | while let Some((ctx, req)) = s.next().await { |
| 151 | driver |
| 152 | .spawn("diag request", { |
| 153 | let driver = driver.clone(); |
| 154 | let this = self.clone(); |
| 155 | async move { |
| 156 | match req { |
| 157 | Event::Diag(req) => this.handle_diag_request(&driver, req, ctx).await, |
| 158 | Event::Diag2(req) => this.handle_diag2_request(&driver, req, ctx).await, |
| 159 | Event::Inspect(req) => this.handle_inspect_request(req, ctx).await, |
| 160 | Event::Profile(req) => this.handle_profile_request(req, ctx).await, |
| 161 | } |
| 162 | } |
| 163 | }) |
| 164 | .detach(); |
| 165 | } |
| 166 | Ok(()) |
| 167 | } |
| 168 | |
| 169 | async fn take_connection(&self, id: u64) -> anyhow::Result<PolledSocket<Socket>> { |
| 170 | self.inner.take_connection(id).await |
| 171 | } |
| 172 | |
| 173 | async fn handle_inspect_request(&self, req: InspectService, mut ctx: CancelContext) { |
| 174 | match req { |
| 175 | InspectService::Inspect(request, response) => { |
| 176 | let inspect_response = self.handle_inspect(&request, ctx).await; |
| 177 | response.send(grpc_result(Ok(Ok(inspect_response)))); |
| 178 | } |
| 179 | InspectService::Update(request, response) => { |
| 180 | response.send(grpc_result( |
| 181 | ctx.until_cancelled(self.handle_update(&request)).await, |
| 182 | )); |
| 183 | } |
| 184 | } |
| 185 | } |
| 186 | |
| 187 | async fn handle_profile_request(&self, req: AzureProfiler, mut ctx: CancelContext) { |
| 188 | match req { |
| 189 | AzureProfiler::Profile(request, response) => response.send(grpc_result( |
| 190 | ctx.until_cancelled(self.handle_profile(request)).await, |
| 191 | )), |
| 192 | } |
| 193 | } |
| 194 | |
| 195 | async fn handle_diag_request( |
| 196 | &self, |
| 197 | driver: &(impl Driver + Spawn + Clone), |
| 198 | req: UnderhillDiag, |
| 199 | mut ctx: CancelContext, |
| 200 | ) { |
| 201 | match req { |
| 202 | UnderhillDiag::Exec(request, response) => response.send(grpc_result( |
| 203 | ctx.until_cancelled(self.handle_exec(driver, &request)) |
| 204 | .await, |
| 205 | )), |
| 206 | UnderhillDiag::Wait(request, response) => response.send(grpc_result( |
| 207 | ctx.until_cancelled(self.handle_wait(&request)).await, |
| 208 | )), |
| 209 | UnderhillDiag::Start(request, response) => { |
| 210 | response.send(grpc_result( |
| 211 | ctx.until_cancelled(self.handle_start(request)).await, |
| 212 | )); |
| 213 | } |
| 214 | UnderhillDiag::Kmsg(request, response) => { |
| 215 | response.send(grpc_result(Ok(self.handle_kmsg(driver, &request).await))) |
| 216 | } |
| 217 | UnderhillDiag::Crash(request, response) => { |
| 218 | response.send(grpc_result( |
| 219 | ctx.until_cancelled(self.handle_crash(request)).await, |
| 220 | )); |
| 221 | } |
| 222 | UnderhillDiag::Restart(_, response) => { |
| 223 | response.send(grpc_result( |
| 224 | ctx.until_cancelled(self.handle_restart()).await, |
| 225 | )); |
| 226 | } |
| 227 | UnderhillDiag::ReadFile(request, response) => response.send(grpc_result(Ok(self |
| 228 | .handle_read_file(driver, &request) |
| 229 | .await))), |
| 230 | UnderhillDiag::Pause(_, response) => { |
| 231 | response.send(grpc_result(ctx.until_cancelled(self.handle_pause()).await)) |
| 232 | } |
| 233 | UnderhillDiag::PacketCapture(request, response) => response.send(grpc_result( |
| 234 | ctx.until_cancelled(self.handle_packet_capture(&request)) |
| 235 | .await, |
| 236 | )), |
| 237 | UnderhillDiag::Resume(_, response) => { |
| 238 | response.send(grpc_result(ctx.until_cancelled(self.handle_resume()).await)) |
| 239 | } |
| 240 | UnderhillDiag::DumpSavedState((), response) => response.send(grpc_result( |
| 241 | ctx.until_cancelled(self.handle_dump_saved_state()).await, |
| 242 | )), |
| 243 | } |
| 244 | } |
| 245 | |
| 246 | async fn handle_diag2_request( |
| 247 | &self, |
| 248 | _driver: &(impl Driver + Spawn + Clone), |
| 249 | req: OpenhclDiag, |
| 250 | _ctx: CancelContext, |
| 251 | ) { |
| 252 | match req { |
| 253 | OpenhclDiag::Ping((), response) => { |
| 254 | response.send(Ok(())); |
| 255 | } |
| 256 | } |
| 257 | } |
| 258 | |
| 259 | async fn handle_start(&self, request: StartRequest) -> anyhow::Result<()> { |
| 260 | let params = StartParams { |
| 261 | env: request |
| 262 | .env |
| 263 | .into_iter() |
| 264 | .map(|pair| (pair.name, pair.value)) |
| 265 | .collect(), |
| 266 | args: request.args, |
| 267 | }; |
| 268 | self.request_send |
| 269 | .call_failable(DiagRequest::Start, params) |
| 270 | .await?; |
| 271 | Ok(()) |
| 272 | } |
| 273 | |
| 274 | async fn handle_crash(&self, request: diag_proto::CrashRequest) -> anyhow::Result<()> { |
| 275 | self.request_send.send(DiagRequest::Crash(request.pid)); |
| 276 | |
| 277 | Ok(()) |
| 278 | } |
| 279 | |
| 280 | async fn handle_exec( |
| 281 | &self, |
| 282 | driver: &(impl Driver + Spawn + Clone), |
| 283 | request: &ExecRequest, |
| 284 | ) -> anyhow::Result<ExecResponse> { |
| 285 | tracing::info!( |
| 286 | command = %request.command, |
| 287 | stdin = request.stdin, |
| 288 | stdout = request.stdout, |
| 289 | stderr = request.stderr, |
| 290 | tty = request.tty, |
| 291 | "exec request" |
| 292 | ); |
| 293 | |
| 294 | let stdin = if request.stdin != 0 { |
| 295 | Some( |
| 296 | self.take_connection(request.stdin) |
| 297 | .await |
| 298 | .context("failed to get stdin conn")?, |
| 299 | ) |
| 300 | } else { |
| 301 | None |
| 302 | }; |
| 303 | let stdout = if request.stdout != 0 { |
| 304 | Some( |
| 305 | self.take_connection(request.stdout) |
| 306 | .await |
| 307 | .context("failed to get stdout conn")?, |
| 308 | ) |
| 309 | } else { |
| 310 | None |
| 311 | }; |
| 312 | let stderr = if request.stderr != 0 { |
| 313 | Some( |
| 314 | self.take_connection(request.stderr) |
| 315 | .await |
| 316 | .context("failed to get stderr conn")?, |
| 317 | ) |
| 318 | } else { |
| 319 | None |
| 320 | }; |
| 321 | |
| 322 | let mut builder = pal::unix::process::Builder::new(&request.command); |
| 323 | builder.args(&request.args); |
| 324 | if request.clear_env { |
| 325 | builder.env_clear(); |
| 326 | } |
| 327 | for diag_proto::EnvPair { name, value } in &request.env { |
| 328 | if let Some(value) = value { |
| 329 | builder.env(name, value); |
| 330 | } else { |
| 331 | builder.env_remove(name); |
| 332 | } |
| 333 | } |
| 334 | |
| 335 | // HACK: A hack to fix segfault caused by glibc bug in L1 TDX VMM. |
| 336 | // Should be removed after glibc update or a clean CPUID virtualization solution. |
| 337 | // Please refer to https://github.com/microsoft/HvLite/issues/872 for more information. |
| 338 | let tdx_isolated = if cfg!(guest_arch = "x86_64") { |
| 339 | // xtask-fmt allow-target-arch cpu-intrinsic |
| 340 | #[cfg(target_arch = "x86_64")] |
| 341 | { |
| 342 | let result = safe_intrinsics::cpuid( |
| 343 | hvdef::HV_CPUID_FUNCTION_MS_HV_ISOLATION_CONFIGURATION, |
| 344 | 0, |
| 345 | ); |
| 346 | // Value 3 means TDX. |
| 347 | (result.ebx & 0xF) == 3 |
| 348 | } |
| 349 | // xtask-fmt allow-target-arch cpu-intrinsic |
| 350 | #[cfg(not(target_arch = "x86_64"))] |
| 351 | { |
| 352 | false |
| 353 | } |
| 354 | } else { |
| 355 | false |
| 356 | }; |
| 357 | if tdx_isolated { |
| 358 | builder.env("GLIBC_TUNABLES", "glibc.cpu.x86_non_temporal_threshold=0x11a000:glibc.cpu.x86_rep_movsb_threshold=0x4000"); |
| 359 | } |
| 360 | |
| 361 | let mut stdin_relay = None; |
| 362 | let mut stdout_relay = None; |
| 363 | let mut stderr_relay = None; |
| 364 | let mut raw_stdout = None; |
| 365 | let mut raw_stderr = None; |
| 366 | let mut child = { |
| 367 | let (stdin_pipes, stdout_pipes, stderr_pipes); |
| 368 | let stdin_socket; |
| 369 | let stdout_socket; |
| 370 | let stderr_socket; |
| 371 | let pty; |
| 372 | if request.tty { |
| 373 | pty = new_pty::new_pty().context("failed to create pty")?; |
| 374 | |
| 375 | let primary = PolledPipe::new(driver, pty.0) |
| 376 | .context("failed to create polled pty primary")?; |
| 377 | |
| 378 | let secondary = &pty.1; |
| 379 | |
| 380 | let (primary_read, primary_write) = primary.split(); |
| 381 | if let Some(stdin) = stdin { |
| 382 | stdin_relay = Some(driver.spawn("pty stdin relay", async move { |
| 383 | relay(stdin, primary_write).await; |
| 384 | })); |
| 385 | } |
| 386 | if let Some(stdout) = stdout { |
| 387 | stdout_relay = |
| 388 | Some(driver.spawn("pty stdout relay", relay(primary_read, stdout))); |
| 389 | } |
| 390 | |
| 391 | builder |
| 392 | .setsid(true) |
| 393 | .controlling_terminal(secondary.as_fd()) |
| 394 | .stdin(Stdio::Fd(secondary.as_fd())) |
| 395 | .stdout(Stdio::Fd(secondary.as_fd())) |
| 396 | .stderr(Stdio::Fd(secondary.as_fd())); |
| 397 | } else if request.raw_socket_io { |
| 398 | if let Some(stdin) = stdin { |
| 399 | stdin_socket = stdin.into_inner(); |
| 400 | builder.stdin(Stdio::Fd(stdin_socket.as_fd())); |
| 401 | } |
| 402 | if let Some(stdout) = stdout { |
| 403 | stdout_socket = raw_stdout.insert(stdout.into_inner()); |
| 404 | builder.stdout(Stdio::Fd(stdout_socket.as_fd())); |
| 405 | if request.combine_stderr { |
| 406 | builder.stderr(Stdio::Fd(stdout_socket.as_fd())); |
| 407 | } |
| 408 | } |
| 409 | if let Some(stderr) = stderr { |
| 410 | stderr_socket = raw_stderr.insert(stderr.into_inner()); |
| 411 | builder.stderr(Stdio::Fd(stderr_socket.as_fd())); |
| 412 | } |
| 413 | } else { |
| 414 | if let Some(stdin) = stdin { |
| 415 | stdin_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?; |
| 416 | let pipe = PolledPipe::new(driver, stdin_pipes.1) |
| 417 | .context("failed to create polled pipe")?; |
| 418 | stdin_relay = Some(driver.spawn("stdin relay", async move { |
| 419 | relay(stdin, pipe).await; |
| 420 | })); |
| 421 | builder.stdin(Stdio::Fd(stdin_pipes.0.as_fd())); |
| 422 | } |
| 423 | if let Some(stdout) = stdout { |
| 424 | stdout_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?; |
| 425 | let pipe = PolledPipe::new(driver, stdout_pipes.0) |
| 426 | .context("failed to create polled pipe")?; |
| 427 | stdout_relay = Some(driver.spawn("stdout relay", relay(pipe, stdout))); |
| 428 | builder.stdout(Stdio::Fd(stdout_pipes.1.as_fd())); |
| 429 | if request.combine_stderr { |
| 430 | builder.stderr(Stdio::Fd(stdout_pipes.1.as_fd())); |
| 431 | } |
| 432 | } |
| 433 | if let Some(stderr) = stderr { |
| 434 | stderr_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?; |
| 435 | let pipe = PolledPipe::new(driver, stderr_pipes.0) |
| 436 | .context("failed to create polled pipe")?; |
| 437 | stderr_relay = Some(driver.spawn("stderr relay", relay(pipe, stderr))); |
| 438 | builder.stderr(Stdio::Fd(stderr_pipes.1.as_fd())); |
| 439 | } |
| 440 | } |
| 441 | |
| 442 | builder |
| 443 | .spawn() |
| 444 | .with_context(|| format!("failed to launch {}", &request.command))? |
| 445 | }; |
| 446 | |
| 447 | let pid = child.id(); |
| 448 | |
| 449 | tracing::info!(pid, "spawned child"); |
| 450 | |
| 451 | let mut child_ready = driver |
| 452 | .new_dyn_fd_ready(child.as_fd().as_raw_fd()) |
| 453 | .expect("failed creating child poll"); |
| 454 | |
| 455 | let status = driver.spawn("diag child wait", { |
| 456 | let driver = driver.clone(); |
| 457 | async move { |
| 458 | poll_fn(|cx| child_ready.poll_fd_ready(cx, InterestSlot::Read, PollEvents::IN)) |
| 459 | .await; |
| 460 | let status = child.try_wait().unwrap().unwrap(); |
| 461 | tracing::info!(pid, ?status, "child exited"); |
| 462 | |
| 463 | // The process is gone, so the stdin relay's job is done. |
| 464 | drop(stdin_relay); |
| 465 | |
| 466 | // Shut down raw stdout and stderr to notify the host that there |
| 467 | // is no more data. |
| 468 | let finish_raw = |raw: Option<Socket>| { |
| 469 | raw.and_then(|raw| { |
| 470 | let _ = raw.as_sock_ref().shutdown(std::net::Shutdown::Write); |
| 471 | PolledSocket::new(&driver, raw).ok() |
| 472 | }) |
| 473 | }; |
| 474 | let raw_stdout = finish_raw(raw_stdout); |
| 475 | let raw_stderr = finish_raw(raw_stderr); |
| 476 | |
| 477 | // Wait for the host to finish with the stdout and stderr |
| 478 | // sockets, but don't block the process exit notification. |
| 479 | driver |
| 480 | .spawn("socket-wait", async move { |
| 481 | let await_output_relay = async |task, raw| { |
| 482 | let socket = if let Some(task) = task { |
| 483 | Some(task.await) |
| 484 | } else { |
| 485 | raw |
| 486 | }; |
| 487 | if let Some(socket) = socket { |
| 488 | // Wait for the host to close the socket to ensure that all |
| 489 | // the data is written. |
| 490 | let _ = futures::io::copy(socket, &mut futures::io::sink()).await; |
| 491 | } |
| 492 | }; |
| 493 | |
| 494 | await_output_relay(stdout_relay, raw_stdout).await; |
| 495 | await_output_relay(stderr_relay, raw_stderr).await; |
| 496 | }) |
| 497 | .detach(); |
| 498 | |
| 499 | status |
| 500 | } |
| 501 | }); |
| 502 | |
| 503 | self.children.lock().insert(pid, status); |
| 504 | Ok(ExecResponse { pid }) |
| 505 | } |
| 506 | |
| 507 | async fn handle_wait(&self, request: &WaitRequest) -> anyhow::Result<WaitResponse> { |
| 508 | tracing::debug!(pid = request.pid, "wait request"); |
| 509 | let channel = self |
| 510 | .children |
| 511 | .lock() |
| 512 | .remove(&request.pid) |
| 513 | .context("pid not found")?; |
| 514 | |
| 515 | let status = channel.await; |
| 516 | let exit_code = status.code().unwrap_or(255); |
| 517 | |
| 518 | tracing::debug!(pid = request.pid, exit_code, "wait complete"); |
| 519 | |
| 520 | Ok(WaitResponse { exit_code }) |
| 521 | } |
| 522 | |
| 523 | async fn handle_inspect( |
| 524 | &self, |
| 525 | request: &InspectRequest, |
| 526 | mut ctx: CancelContext, |
| 527 | ) -> InspectResponse2 { |
| 528 | tracing::debug!( |
| 529 | path = request.path.as_str(), |
| 530 | depth = request.depth, |
| 531 | "inspect request" |
| 532 | ); |
| 533 | let mut inspection = InspectionBuilder::new(&request.path) |
| 534 | .depth(Some(request.depth as usize)) |
| 535 | .sensitivity(self.inspect_sensitivity_level) |
| 536 | .inspect(inspect::adhoc(|req| { |
| 537 | self.request_send.send(DiagRequest::Inspect(req.defer())); |
| 538 | })); |
| 539 | |
| 540 | // Don't return early on cancel, as we want to return the partial |
| 541 | // inspection results. |
| 542 | let _ = ctx.until_cancelled(inspection.resolve()).await; |
| 543 | |
| 544 | let result = inspection.results(); |
| 545 | InspectResponse2 { result } |
| 546 | } |
| 547 | |
| 548 | async fn handle_update(&self, request: &UpdateRequest) -> anyhow::Result<UpdateResponse2> { |
| 549 | tracing::debug!( |
| 550 | path = request.path.as_str(), |
| 551 | value = request.value.as_str(), |
| 552 | "update request" |
| 553 | ); |
| 554 | let new_value = InspectionBuilder::new(&request.path) |
| 555 | .sensitivity(self.inspect_sensitivity_level) |
| 556 | .update( |
| 557 | &request.value, |
| 558 | inspect::adhoc(|req| { |
| 559 | self.request_send.send(DiagRequest::Inspect(req.defer())); |
| 560 | }), |
| 561 | ) |
| 562 | .await?; |
| 563 | Ok(UpdateResponse2 { new_value }) |
| 564 | } |
| 565 | |
| 566 | async fn handle_kmsg( |
| 567 | &self, |
| 568 | driver: &(impl Driver + Spawn + Clone), |
| 569 | request: &KmsgRequest, |
| 570 | ) -> anyhow::Result<()> { |
| 571 | self.handle_read_file_request(driver, request.conn, request.follow, "/dev/kmsg") |
| 572 | .await |
| 573 | } |
| 574 | |
| 575 | async fn handle_read_file( |
| 576 | &self, |
| 577 | driver: &(impl Driver + Spawn + Clone), |
| 578 | request: &FileRequest, |
| 579 | ) -> anyhow::Result<()> { |
| 580 | self.handle_read_file_request(driver, request.conn, request.follow, &request.file_path) |
| 581 | .await |
| 582 | } |
| 583 | |
| 584 | async fn handle_packet_capture( |
| 585 | &self, |
| 586 | request: &NetworkPacketCaptureRequest, |
| 587 | ) -> anyhow::Result<NetworkPacketCaptureResponse> { |
| 588 | let operation = if request.operation == Operation::Query as i32 { |
| 589 | PacketCaptureOperation::Query |
| 590 | } else if request.operation == Operation::Start as i32 { |
| 591 | PacketCaptureOperation::Start |
| 592 | } else if request.operation == Operation::Stop as i32 { |
| 593 | PacketCaptureOperation::Stop |
| 594 | } else { |
| 595 | anyhow::bail!("unsupported request type {}", request.operation); |
| 596 | }; |
| 597 | |
| 598 | let op_data = match operation { |
| 599 | // Query the number of streams needed, starting with a value of 0. |
| 600 | PacketCaptureOperation::Query => Some(OperationData::OpQueryData(0)), |
| 601 | PacketCaptureOperation::Start => { |
| 602 | let Some(op_data) = &request.op_data else { |
| 603 | anyhow::bail!("missing start operation parameters"); |
| 604 | }; |
| 605 | |
| 606 | match op_data { |
| 607 | diag_proto::network_packet_capture_request::OpData::StartData(start_data) => { |
| 608 | let writers = join_all(start_data.conns.iter().map(async |c| { |
| 609 | let conn = self.take_connection(*c).await?; |
| 610 | Ok(conn.into_inner()) |
| 611 | })) |
| 612 | .await |
| 613 | .into_iter() |
| 614 | .collect::<anyhow::Result<Vec<Socket>>>()?; |
| 615 | Some(OperationData::OpStartData(StartData { |
| 616 | writers, |
| 617 | snaplen: start_data.snaplen, |
| 618 | })) |
| 619 | } |
| 620 | } |
| 621 | } |
| 622 | _ => None, |
| 623 | }; |
| 624 | |
| 625 | let params = PacketCaptureParams { operation, op_data }; |
| 626 | let params = self |
| 627 | .request_send |
| 628 | .call_failable(DiagRequest::PacketCapture, params) |
| 629 | .await?; |
| 630 | let num_streams = match params.op_data { |
| 631 | Some(OperationData::OpQueryData(num_streams)) => num_streams, |
| 632 | _ => 0, |
| 633 | }; |
| 634 | Ok(NetworkPacketCaptureResponse { num_streams }) |
| 635 | } |
| 636 | |
| 637 | async fn handle_profile(&self, request: ProfileRequest) -> anyhow::Result<()> { |
| 638 | let conn = self.take_connection(request.conn).await?; |
| 639 | #[cfg(feature = "profiler")] |
| 640 | { |
| 641 | let profiler_request = profiler_worker::ProfilerRequest { |
| 642 | profiler_args: request.profiler_args, |
| 643 | duration: request.duration, |
| 644 | conn: conn.into_inner(), |
| 645 | }; |
| 646 | |
| 647 | self.request_send |
| 648 | .call_failable(DiagRequest::Profile, profiler_request) |
| 649 | .await?; |
| 650 | } |
| 651 | #[cfg(not(feature = "profiler"))] |
| 652 | { |
| 653 | // Profiler feature disabled, drop the connection. |
| 654 | drop(conn); |
| 655 | tracing::error!("Profiler feature disabled"); |
| 656 | } |
| 657 | Ok(()) |
| 658 | } |
| 659 | |
| 660 | async fn handle_read_file_request( |
| 661 | &self, |
| 662 | driver: &(impl Driver + Spawn + Clone), |
| 663 | conn: u64, |
| 664 | follow: bool, |
| 665 | file_path: &str, |
| 666 | ) -> anyhow::Result<()> { |
| 667 | let mut conn = self.take_connection(conn).await?; |
| 668 | let file = fs_err::File::open(file_path).context("failed to open file")?; |
| 669 | |
| 670 | let file_meta = file.metadata()?; |
| 671 | |
| 672 | if file_meta.file_type().is_char_device() { |
| 673 | let file = |
| 674 | PolledPipe::new(driver, file.into()).context("failed to create polled pipe")?; |
| 675 | |
| 676 | driver |
| 677 | .spawn("read file relay", async move { |
| 678 | if let Err(err) = relay_read_file(file, conn, follow).await { |
| 679 | tracing::warn!( |
| 680 | error = &*err as &dyn std::error::Error, |
| 681 | "read file relay failed" |
| 682 | ); |
| 683 | } |
| 684 | }) |
| 685 | .detach(); |
| 686 | } else if file_meta.file_type().is_file() { |
| 687 | driver |
| 688 | .spawn("read file relay", async move { |
| 689 | // Since this is a file, and in Underhill files are backed |
| 690 | // by RAM, allow blocking reads directly on this thread, |
| 691 | // since the reads should be satisfied instantly. |
| 692 | // |
| 693 | // (If this becomes a problem, we can spawn a thread to do |
| 694 | // this, or use io-uring.) |
| 695 | if let Err(err) = |
| 696 | futures::io::copy(AllowStdIo::new(File::from(file)), &mut conn).await |
| 697 | { |
| 698 | tracing::warn!( |
| 699 | error = &err as &dyn std::error::Error, |
| 700 | "read file relay failed" |
| 701 | ); |
| 702 | } |
| 703 | }) |
| 704 | .detach(); |
| 705 | } else { |
| 706 | anyhow::bail!("cannot read directory"); |
| 707 | } |
| 708 | |
| 709 | Ok(()) |
| 710 | } |
| 711 | |
| 712 | async fn handle_restart(&self) -> anyhow::Result<()> { |
| 713 | self.request_send |
| 714 | .call_failable(DiagRequest::Restart, ()) |
| 715 | .await?; |
| 716 | Ok(()) |
| 717 | } |
| 718 | |
| 719 | async fn handle_pause(&self) -> anyhow::Result<()> { |
| 720 | self.request_send |
| 721 | .call_failable(DiagRequest::Pause, ()) |
| 722 | .await?; |
| 723 | Ok(()) |
| 724 | } |
| 725 | |
| 726 | async fn handle_resume(&self) -> anyhow::Result<()> { |
| 727 | self.request_send |
| 728 | .call_failable(DiagRequest::Resume, ()) |
| 729 | .await?; |
| 730 | Ok(()) |
| 731 | } |
| 732 | |
| 733 | async fn handle_dump_saved_state(&self) -> anyhow::Result<diag_proto::DumpSavedStateResponse> { |
| 734 | let data = self |
| 735 | .request_send |
| 736 | .call_failable(DiagRequest::Save, ()) |
| 737 | .await?; |
| 738 | |
| 739 | Ok(diag_proto::DumpSavedStateResponse { data }) |
| 740 | } |
| 741 | } |
| 742 | |
| 743 | async fn relay< |
| 744 | R: 'static + AsyncRead + Unpin + Send, |
| 745 | W: 'static + AsyncWrite + PollReady + Unpin + Send, |
| 746 | >( |
| 747 | mut read: R, |
| 748 | mut write: W, |
| 749 | ) -> W { |
| 750 | let mut buffer = [0; 1024]; |
| 751 | let result: anyhow::Result<_> = async { |
| 752 | loop { |
| 753 | let n = futures::select! { // merge semantics |
| 754 | n = read.read(&mut buffer).fuse() => n.context("read failed")?, |
| 755 | _ = write.wait_ready(PollEvents::RDHUP).fuse() => { |
| 756 | // RDHUP indicates the connection is closed or shut down. |
| 757 | // Although generically this does not indicate that the |
| 758 | // connection does not want to _read_ any more data, for our |
| 759 | // use cases it does (either we are using a unidirectional |
| 760 | // pipe/socket, or we are using a pty, which never returns |
| 761 | // RDHUP but does return HUP, which is just as good). |
| 762 | // |
| 763 | // Stop this relay to propagate the close notification to |
| 764 | // the other endpoint. |
| 765 | break; |
| 766 | } |
| 767 | }; |
| 768 | if n == 0 { |
| 769 | break; |
| 770 | } |
| 771 | write |
| 772 | .write_all(&buffer[..n]) |
| 773 | .await |
| 774 | .context("write failed")?; |
| 775 | } |
| 776 | Ok(()) |
| 777 | } |
| 778 | .await; |
| 779 | let _ = write.close().await; |
| 780 | if let Err(err) = result { |
| 781 | tracing::warn!(error = &*err as &dyn std::error::Error, "relay error"); |
| 782 | } |
| 783 | write |
| 784 | } |
| 785 | |
| 786 | async fn relay_read_file( |
| 787 | mut file: PolledPipe, |
| 788 | mut conn: PolledSocket<Socket>, |
| 789 | follow: bool, |
| 790 | ) -> anyhow::Result<()> { |
| 791 | let mut buffer = [0; FILE_LINE_MAX]; |
| 792 | loop { |
| 793 | let n = if follow { |
| 794 | futures::select! { // race semantics |
| 795 | _ = conn.wait_ready(PollEvents::RDHUP).fuse() => break, |
| 796 | n = file.read(&mut buffer[..FILE_LINE_MAX - 1]).fuse() => n |
| 797 | } |
| 798 | } else { |
| 799 | // The caller just wants the current contents of file, so issue a |
| 800 | // nonblocking, non-async read, and handle EAGAIN below. |
| 801 | file.get().read(&mut buffer[..FILE_LINE_MAX - 1]) |
| 802 | }; |
| 803 | let n = match n { |
| 804 | Ok(0) => break, |
| 805 | Ok(count) => count, |
| 806 | Err(e) => { |
| 807 | match e.kind() { |
| 808 | io::ErrorKind::BrokenPipe => { |
| 809 | // The kmsg interface returns EPIPE if an entry has overwritten another in the ring. |
| 810 | // Retry the read which has the kernel move the seek position to the next available record. |
| 811 | continue; |
| 812 | } |
| 813 | io::ErrorKind::WouldBlock => { |
| 814 | // There are no more messages. |
| 815 | assert!(!follow); |
| 816 | break; |
| 817 | } |
| 818 | _ => return Err(e).context("file read failed"), |
| 819 | } |
| 820 | } |
| 821 | }; |
| 822 | assert!( |
| 823 | n < buffer.len(), |
| 824 | "the file returned a line bigger than its maximum" |
| 825 | ); |
| 826 | // Add a null terminator. |
| 827 | buffer[n] = 0; |
| 828 | // Write the message followed by a null terminator. |
| 829 | conn.write_all(&buffer[..n + 1]) |
| 830 | .await |
| 831 | .context("socket write failed")?; |
| 832 | } |
| 833 | Ok(()) |
| 834 | } |
| 835 | |