microsoft/openvmm

Public

mirrored from https://github.com/microsoft/openvmm · Available

Code · Commits · Issues · Pull requests · Actions · Insights · Security
1ce32569373cda4d4b485f6e6f09f15e1e4dd569

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

flowey/flowey_cli/src/cli/exec_snippet.rs

401lines · modecode

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use crate::cli::FlowBackendCli;
5use anyhow::Context;
6use flowey_core::node::FlowArch;
7use flowey_core::node::FlowBackend;
8use flowey_core::node::FlowPlatform;
9use flowey_core::node::GhVarState;
10use flowey_core::node::NodeHandle;
11use flowey_core::node::steps::rust::RustRuntimeServices;
12use flowey_core::node::user_facing::ClaimedGhParam;
13use flowey_core::node::user_facing::GhPermission;
14use flowey_core::node::user_facing::GhPermissionValue;
15use flowey_core::pipeline::HostExt;
16use flowey_core::pipeline::PipelineBackendHint;
17use serde::Deserialize;
18use serde::Serialize;
19use std::collections::BTreeMap;
20use std::path::PathBuf;
21
/// Builds the command line used to invoke a single snippet.
///
/// The produced string has the shape
/// `<flowey_bin> e <job_idx> <node_modpath> <snippet_idx>`, with the pieces
/// separated by single spaces. Note that `job_idx` is emitted *before* the
/// node/snippet pair, even though it is the last parameter of this function.
pub fn construct_exec_snippet_cli(
    flowey_bin: &str,
    node_modpath: &str,
    snippet_idx: usize,
    job_idx: usize,
) -> String {
    let job = job_idx.to_string();
    let snippet = snippet_idx.to_string();

    [flowey_bin, "e", job.as_str(), node_modpath, snippet.as_str()].join(" ")
}
30
31/// (internal) execute an inline code snippet from the given node.
32#[derive(clap::Args)]
33pub struct ExecSnippet {
34 /// Job idx to query `pipeline_static_db` with
35 pub(crate) job_idx: usize,
36
37 node_modpath_and_snippet_idx: Vec<String>,
38
39 /// (debug) If true, the snippet will not actually be run
40 #[clap(long)]
41 dry_run: bool,
42}
43
/// Name of the var-db entry that holds flowey's working directory.
///
/// `ExecSnippet::run` refuses to proceed when the var db was not seeded with
/// this entry, and expects its value to be a JSON string.
pub const VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR: &str = "_internal_WORKING_DIR";

/// Name of the var-db entry for the persistent storage directory.
///
/// Not every backend supports a persistent storage dir, so this entry may be
/// absent from the var db.
pub const VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR: &str = "_internal_PERSISTENT_STORAGE_DIR";
46
47impl ExecSnippet {
48 pub fn run(self) -> anyhow::Result<()> {
49 let Self {
50 node_modpath_and_snippet_idx,
51 job_idx,
52 dry_run,
53 } = self;
54
55 let flow_platform = FlowPlatform::host(PipelineBackendHint::Local);
56 let flow_arch = FlowArch::host(PipelineBackendHint::Local);
57
58 let mut runtime_var_db = super::var_db::open_var_db(job_idx)?;
59
60 let working_dir: PathBuf = {
61 let Some(working_dir) = runtime_var_db.try_get_var(VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR)
62 else {
63 anyhow::bail!("var db was not seeded with {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR}");
64 };
65 serde_json::from_slice::<String>(&working_dir)
66 .context(format!(
67 "found {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR} in db, but it wasn't a json string!"
68 ))?
69 .into()
70 };
71
72 let FloweyPipelineStaticDb {
73 flow_backend,
74 var_db_backend_kind: _,
75 job_reqs,
76 } = {
77 let current_exe = std::env::current_exe()
78 .context("failed to get path to current flowey executable")?;
79 let pipeline_static_db =
80 fs_err::File::open(current_exe.with_file_name("pipeline.json"))?;
81 serde_json::from_reader(pipeline_static_db)?
82 };
83
84 for [node_modpath, snippet_idx] in node_modpath_and_snippet_idx
85 .chunks_exact(2)
86 .map(|x| -> [String; 2] { x.to_vec().try_into().unwrap() })
87 {
88 let snippet_idx = snippet_idx.parse::<usize>().unwrap();
89
90 let raw_json_reqs: Vec<Box<[u8]>> = job_reqs
91 .get(&job_idx)
92 .context("invalid job_idx")?
93 .get(&node_modpath)
94 .context("pipeline db did not include data for specified node")?
95 .iter()
96 .map(|v| v.0.clone())
97 .collect::<Vec<_>>();
98
99 let Some(node_handle) = NodeHandle::try_from_modpath(&node_modpath) else {
100 anyhow::bail!("could not find node with that name")
101 };
102
103 let mut node = node_handle.new_erased_node();
104
105 // each snippet gets its own isolated working dir
106 {
107 let snippet_working_dir = working_dir.join(format!(
108 "{}_{}",
109 node_handle.modpath().replace("::", "__"),
110 snippet_idx
111 ));
112 if !snippet_working_dir.exists() {
113 fs_err::create_dir_all(&snippet_working_dir)?;
114 }
115 log::trace!(
116 "Setting current working directory from {:?} to {:?}",
117 std::env::current_dir()?,
118 snippet_working_dir
119 );
120 std::env::set_current_dir(snippet_working_dir)?;
121 }
122
123 // not all backends support a persistent storage dir, therefore it is optional
124 let persistent_storage_dir_var = runtime_var_db
125 .try_get_var(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR)
126 .is_some()
127 .then_some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.to_owned());
128
129 let mut rust_runtime_services =
130 flowey_core::node::steps::rust::new_rust_runtime_services(
131 &mut runtime_var_db,
132 flow_backend.into(),
133 flow_platform,
134 flow_arch,
135 );
136
137 let mut ctx_backend = ExecSnippetCtx::new(
138 flow_backend.into(),
139 flow_platform,
140 flow_arch,
141 node_handle,
142 snippet_idx,
143 dry_run,
144 persistent_storage_dir_var,
145 &mut rust_runtime_services,
146 );
147
148 let mut ctx = flowey_core::node::new_node_ctx(&mut ctx_backend);
149 node.emit(raw_json_reqs.clone(), &mut ctx)?;
150
151 match ctx_backend.into_result() {
152 Some(res) => res?,
153 None => {
154 if dry_run {
155 // all good, expected
156 } else {
157 anyhow::bail!("snippet wasn't run (invalid index)")
158 }
159 }
160 }
161 }
162
163 Ok(())
164 }
165}
166
167pub struct ExecSnippetCtx<'a, 'b> {
168 flow_backend: FlowBackend,
169 flow_platform: FlowPlatform,
170 flow_arch: FlowArch,
171 node_handle: NodeHandle,
172 rust_runtime_services: &'a mut RustRuntimeServices<'b>,
173 idx_tracker: usize,
174 var_tracker: usize,
175 target_idx: usize,
176 dry_run: bool,
177 persistent_storage_dir_var: Option<String>,
178 result: Option<anyhow::Result<()>>,
179}
180
181impl<'a, 'b> ExecSnippetCtx<'a, 'b> {
182 pub fn new(
183 flow_backend: FlowBackend,
184 flow_platform: FlowPlatform,
185 flow_arch: FlowArch,
186 node_handle: NodeHandle,
187 target_idx: usize,
188 dry_run: bool,
189 persistent_storage_dir_var: Option<String>,
190 rust_runtime_services: &'a mut RustRuntimeServices<'b>,
191 ) -> Self {
192 Self {
193 flow_backend,
194 flow_platform,
195 flow_arch,
196 node_handle,
197 rust_runtime_services,
198 var_tracker: 0,
199 idx_tracker: 0,
200 target_idx,
201 dry_run,
202 persistent_storage_dir_var,
203 result: None,
204 }
205 }
206
207 pub fn into_result(self) -> Option<anyhow::Result<()>> {
208 self.result
209 }
210}
211
212impl flowey_core::node::NodeCtxBackend for ExecSnippetCtx<'_, '_> {
213 fn on_request(&mut self, _node_handle: NodeHandle, _req: anyhow::Result<Box<[u8]>>) {
214 // nothing to do - filing requests only matters pre-exec
215 }
216
217 fn on_new_var(&mut self) -> String {
218 let v = self.var_tracker;
219 self.var_tracker += 1;
220 format!("{}:{}", self.node_handle.modpath(), v)
221 }
222
223 fn on_claimed_runtime_var(&mut self, _var: &str, _is_read: bool) {
224 // nothing to do - variable claims only matter pre-exec
225 }
226
227 fn on_emit_rust_step(
228 &mut self,
229 label: &str,
230 _can_merge: bool,
231 code: Box<
232 dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
233 >,
234 ) {
235 if self.idx_tracker == self.target_idx {
236 let label = if !label.is_empty() {
237 label
238 } else {
239 "<unnamed snippet>"
240 };
241
242 self.result = Some(run_code(
243 self.flow_backend,
244 format!("{} ({})", label, self.node_handle.modpath()),
245 self.dry_run,
246 || code(self.rust_runtime_services),
247 ));
248 }
249 self.idx_tracker += 1;
250 }
251
252 fn on_emit_ado_step(
253 &mut self,
254 label: &str,
255 _yaml_snippet: Box<
256 dyn for<'a> FnOnce(
257 &'a mut flowey_core::node::user_facing::AdoStepServices<'_>,
258 ) -> String,
259 >,
260 code: Option<
261 Box<
262 dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
263 >,
264 >,
265 _condvar: Option<String>,
266 ) {
267 // don't need to care about condvar, since we wouldn't have been called
268 // if the YAML resolved the condvar to false.
269 if self.idx_tracker == self.target_idx {
270 if let Some(code) = code {
271 self.result = Some(run_code(
272 self.flow_backend,
273 format!(
274 "(inline snippet) {} ({})",
275 label,
276 self.node_handle.modpath()
277 ),
278 self.dry_run,
279 || code(self.rust_runtime_services),
280 ));
281 }
282 }
283
284 self.idx_tracker += 1;
285 }
286
287 fn on_emit_gh_step(
288 &mut self,
289 _label: &str,
290 _uses: &str,
291 _with: BTreeMap<String, ClaimedGhParam>,
292 _condvar: Option<String>,
293 _outputs: BTreeMap<String, Vec<GhVarState>>,
294 _permissions: BTreeMap<GhPermission, GhPermissionValue>,
295 _gh_to_rust: Vec<GhVarState>,
296 _rust_to_gh: Vec<GhVarState>,
297 ) {
298 self.idx_tracker += 1;
299 }
300
301 fn on_emit_side_effect_step(&mut self) {
302 // not executable, we simply skip
303 }
304
305 fn backend(&mut self) -> FlowBackend {
306 self.flow_backend
307 }
308
309 fn platform(&mut self) -> FlowPlatform {
310 self.flow_platform
311 }
312
313 fn arch(&mut self) -> FlowArch {
314 self.flow_arch
315 }
316
317 fn current_node(&self) -> NodeHandle {
318 self.node_handle
319 }
320
321 fn persistent_dir_path_var(&mut self) -> Option<String> {
322 self.persistent_storage_dir_var.clone()
323 }
324
325 fn on_unused_read_var(&mut self, _var: &str) {
326 // not relevant at runtime
327 }
328}
329
330#[derive(Serialize, Deserialize)]
331pub(crate) enum VarDbBackendKind {
332 Json,
333}
334
335#[derive(Serialize, Deserialize)]
336pub(crate) struct FloweyPipelineStaticDb {
337 pub flow_backend: FlowBackendCli,
338 pub var_db_backend_kind: VarDbBackendKind,
339 pub job_reqs: BTreeMap<usize, BTreeMap<String, Vec<SerializedRequest>>>,
340}
341
342// encode requests as JSON stored in a JSON string (to make human inspection
343// easier).
344#[derive(Serialize, Deserialize)]
345#[serde(transparent)]
346pub(crate) struct SerializedRequest(#[serde(with = "serialized_request")] pub Box<[u8]>);
347
348pub(crate) mod serialized_request {
349 use serde::Deserialize;
350 use serde::Deserializer;
351 use serde::Serializer;
352
353 #[expect(clippy::borrowed_box, reason = "required by serde")]
354 pub fn serialize<S: Serializer>(v: &Box<[u8]>, ser: S) -> Result<S::Ok, S::Error> {
355 ser.serialize_str(
356 &serde_json::to_string(&serde_json::from_slice::<serde_json::Value>(v).unwrap())
357 .unwrap(),
358 )
359 }
360
361 pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Box<[u8]>, D::Error> {
362 let s: String = Deserialize::deserialize(d)?;
363 Ok(
364 serde_json::to_vec(&serde_json::from_str::<serde_json::Value>(&s).unwrap())
365 .unwrap()
366 .into(),
367 )
368 }
369}
370
371fn run_code(
372 flow_backend: FlowBackend,
373 label: impl std::fmt::Display,
374 dry_run: bool,
375 code: impl FnOnce() -> anyhow::Result<()>,
376) -> anyhow::Result<()> {
377 if matches!(flow_backend, FlowBackend::Ado) {
378 println!("##[group]=== {} ===", label)
379 } else {
380 // green color
381 log::info!("\x1B[0;32m=== {} ===\x1B[0m", label);
382 }
383
384 let result = if dry_run {
385 log::info!("...but not actually, because of --dry-run");
386 Ok(())
387 } else {
388 code()
389 };
390
391 // green color
392 log::info!("\x1B[0;32m=== done! ===\x1B[0m");
393
394 if matches!(flow_backend, FlowBackend::Ado) {
395 println!("##[endgroup]")
396 } else {
397 log::info!(""); // log a newline, for the pretty
398 }
399
400 result
401}
402