microsoft/openvmm

Public

mirrored from https://github.com/microsoft/openvmm · Available

CodeCommitsIssuesPull requestsActionsInsightsSecurity
copilot/apply-async-process-wait-functionality

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

flowey/flowey_cli/src/cli/pipeline.rs

402lines · modecode

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use anyhow::Context;
5use flowey_core::node::FlowBackend;
6use flowey_core::pipeline::IntoPipeline;
7use flowey_core::pipeline::PipelineBackendHint;
8use std::path::Path;
9use std::path::PathBuf;
10
11#[derive(Clone, clap::ValueEnum)]
12pub enum VizModeCli {
13 Toposort,
14 Dot,
15 FlowDot,
16}
17
/// How a YAML-emitting backend should treat a previously generated file.
///
/// Built by the `ado` / `github` subcommands from their `--runtime` and
/// `--check` flags, then handed to the corresponding YAML resolver. The exact
/// semantics of each mode are implemented by those resolvers, not here.
#[derive(Debug, Clone)]
pub(crate) enum CheckMode {
    /// `--runtime <YAML>` was passed; carries the path given on the CLI.
    Runtime(PathBuf),
    /// `--check <YAML>` was passed; carries the path given on the CLI.
    Check(PathBuf),
    /// Neither `--runtime` nor `--check` was passed.
    None,
}
23
24#[derive(clap::Subcommand)]
25enum PipelineBackendCli<P: clap::Subcommand> {
26 /// A locally executable bash script
27 #[clap(subcommand_value_name = "PIPELINE")]
28 #[clap(subcommand_help_heading = "Pipeline")]
29 Bash {
30 /// Output directory to write pipeline scripts to. If the directory
31 /// doesn't exist, it will be created.
32 #[clap(long, default_value = "./flowey-out")]
33 out_dir: PathBuf,
34
35 /// Persistent storage directory shared across multiple runs. If the
36 /// directory doesn't exist, it will be created.
37 #[clap(long, default_value = "./flowey-persist")]
38 persist_dir: PathBuf,
39
40 /// Enable flowey internal debug logs at runtime
41 #[clap(help_heading = "Global Options (flowey)", global = true, long)]
42 runtime_debug_log: bool,
43
44 /// Attempt to run windows jobs on WSL2. This may or may not work,
45 /// depending on if the flowey nodes at play are resilient to running
46 /// in WSL2.
47 #[clap(help_heading = "Global Options (flowey)", global = true, long)]
48 windows_as_wsl: bool,
49
50 #[clap(subcommand)]
51 pipelines: P,
52 },
53 /// An ADO pipeline YAML file
54 #[clap(subcommand_value_name = "PIPELINE")]
55 #[clap(subcommand_help_heading = "Pipeline")]
56 Ado {
57 #[clap(subcommand)]
58 pipelines: P,
59
60 /// disable flowey internal debug logs at runtime
61 #[clap(help_heading = "Global Options (flowey)", global = true, long)]
62 no_runtime_debug_log: bool,
63
64 /// repo-root relative path to generated YAML file
65 #[clap(long)]
66 out: PathBuf,
67
68 /// check that the provided YAML matches the generated YAML.
69 #[clap(long, value_name = "YAML")]
70 check: Option<PathBuf>,
71
72 /// generate the pipeline JSON, also runs check
73 #[clap(long, value_name = "YAML")]
74 runtime: Option<PathBuf>,
75 },
76 /// A GitHub pipeline YAML file
77 #[clap(subcommand_value_name = "PIPELINE")]
78 #[clap(subcommand_help_heading = "Pipeline")]
79 Github {
80 #[clap(subcommand)]
81 pipelines: P,
82
83 /// disable flowey internal debug logs at runtime
84 #[clap(help_heading = "Global Options (flowey)", global = true, long)]
85 no_runtime_debug_log: bool,
86
87 /// repo-root relative path to generated YAML file
88 #[clap(long)]
89 out: PathBuf,
90
91 /// check that the provided YAML matches the generated YAML.
92 #[clap(long, value_name = "YAML")]
93 check: Option<PathBuf>,
94
95 /// generate the pipeline JSON, also runs check
96 #[clap(long, value_name = "YAML", conflicts_with = "check")]
97 runtime: Option<PathBuf>,
98 },
99 /// Run the pipeline directly using flowey
100 Run {
101 #[clap(subcommand)]
102 pipelines: P,
103
104 /// Output directory to emit artifacts into. If the directory
105 /// doesn't exist, it will be created.
106 #[clap(long, default_value = "./flowey-out")]
107 out_dir: PathBuf,
108
109 /// Persistent storage directory shared across multiple runs. If the
110 /// directory doesn't exist, it will be created.
111 #[clap(long, default_value = "./flowey-persist")]
112 persist_dir: PathBuf,
113
114 /// Attempt to run windows jobs on WSL2. This may or may not work,
115 /// depending on if the flowey nodes at play are resilient to running
116 /// in WSL2.
117 #[clap(help_heading = "Global Options (flowey)", global = true, long)]
118 windows_as_wsl: bool,
119 },
120}
121
122/// Generate and Run pipelines.
123#[derive(clap::Args)]
124#[clap(subcommand_help_heading = "Pipeline Kind")]
125#[clap(subcommand_value_name = "PIPELINE_KIND")]
126pub struct Pipeline<P: clap::Subcommand> {
127 /// (debug) Emit a visualization of the output flow, instead of the flow
128 /// itself.
129 #[clap(help_heading = "Global Options (flowey)", global = true, long)]
130 viz_mode: Option<VizModeCli>,
131
132 /// (debug) Filter the pipeline to only include the specified jobs.
133 ///
134 /// At this time, this will _not_ allow running a job without also running
135 /// any jobs it depends on!
136 ///
137 /// Accepts a comma-separated list of "job ids", a list of which can be
138 /// obtained by running `--include-jobs='?'`
139 ///
140 /// NOTE: because this is intended as a debugging tool, there is no
141 /// mechanism to ensure that "job ids" remain stable in the face of pipeline
142 /// updates / flowey updates. i.e: an `--include-jobs` invocation that works
143 /// today may not work after making changes to the pipeline definition /
144 /// updating flowey.
145 #[clap(help_heading = "Global Options (flowey)", global = true, long)]
146 #[expect(clippy::option_option, reason = "for clap derive")]
147 include_jobs: Option<Option<IncludeJobs>>,
148
149 #[clap(subcommand)]
150 project_pipeline: PipelineBackendCli<P>,
151}
152
/// Parsed value of the `--include-jobs` flag.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IncludeJobs {
    /// The literal `?`: the caller wants the available job ids listed.
    Query,
    /// An explicit list of job ids, in the order they were written.
    List(Vec<usize>),
}

impl std::str::FromStr for IncludeJobs {
    type Err = &'static str;

    /// Parses either a lone `?` or a comma-separated list of job ids.
    ///
    /// Whitespace around the whole argument and around each list entry is
    /// ignored, so `"1, 2"` is treated the same as `"1,2"`.
    ///
    /// # Errors
    ///
    /// Fails if `?` appears as one entry of a list, or if any entry is not a
    /// valid `usize` (this includes empty entries such as a trailing comma).
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s.trim() == "?" {
            return Ok(IncludeJobs::Query);
        }

        // Collecting into `Result` stops at the first entry that fails, which
        // keeps the "first bad entry wins" error reporting of a manual loop.
        s.split(',')
            .map(str::trim)
            .map(|entry| {
                if entry == "?" {
                    Err("can only pass '?' once")
                } else {
                    entry
                        .parse::<usize>()
                        .map_err(|_| "expected comma separated list of numbers")
                }
            })
            .collect::<Result<Vec<_>, _>>()
            .map(IncludeJobs::List)
    }
}
181
182impl<P: clap::Subcommand + IntoPipeline> Pipeline<P> {
183 pub fn run(self, flowey_crate: &str, repo_root: &Path) -> anyhow::Result<()> {
184 let Self {
185 project_pipeline,
186 viz_mode,
187 include_jobs,
188 } = self;
189
190 match project_pipeline {
191 PipelineBackendCli::Bash {
192 pipelines,
193 out_dir,
194 persist_dir,
195 runtime_debug_log,
196 windows_as_wsl,
197 } => {
198 let mut resolved_pipeline =
199 resolve_pipeline(pipelines, PipelineBackendHint::Local)?;
200
201 if matches!(
202 resolve_include_jobs(&mut resolved_pipeline, include_jobs)?,
203 EarlyExit::Yes
204 ) {
205 return Ok(());
206 }
207
208 if let Some(viz_mode) = viz_mode {
209 viz_pipeline(
210 viz_mode,
211 resolved_pipeline,
212 FlowBackend::Local,
213 crate::running_in_wsl(),
214 )
215 } else {
216 let _ = (out_dir, persist_dir, runtime_debug_log, windows_as_wsl);
217 todo!("bash backend is not actively maintained, and currently broken")
218 }
219 }
220 PipelineBackendCli::Run {
221 pipelines,
222 out_dir,
223 persist_dir,
224 windows_as_wsl,
225 } => {
226 let mut resolved_pipeline =
227 resolve_pipeline(pipelines, PipelineBackendHint::Local)?;
228
229 if matches!(
230 resolve_include_jobs(&mut resolved_pipeline, include_jobs)?,
231 EarlyExit::Yes
232 ) {
233 return Ok(());
234 }
235
236 if let Some(viz_mode) = viz_mode {
237 viz_pipeline(
238 viz_mode,
239 resolved_pipeline,
240 FlowBackend::Local,
241 crate::running_in_wsl(),
242 )
243 } else {
244 crate::pipeline_resolver::direct_run::direct_run(
245 resolved_pipeline,
246 windows_as_wsl,
247 out_dir,
248 persist_dir,
249 )
250 }
251 }
252 PipelineBackendCli::Ado {
253 pipelines,
254 out,
255 no_runtime_debug_log,
256 check,
257 runtime,
258 } => {
259 let mut resolved_pipeline = resolve_pipeline(pipelines, PipelineBackendHint::Ado)?;
260
261 if matches!(
262 resolve_include_jobs(&mut resolved_pipeline, include_jobs)?,
263 EarlyExit::Yes
264 ) {
265 return Ok(());
266 }
267
268 if let Some(viz_mode) = viz_mode {
269 viz_pipeline(viz_mode, resolved_pipeline, FlowBackend::Ado, false)
270 } else {
271 let mode = if let Some(runtime_path) = runtime {
272 CheckMode::Runtime(runtime_path)
273 } else if let Some(check_path) = check {
274 CheckMode::Check(check_path)
275 } else {
276 CheckMode::None
277 };
278
279 crate::pipeline_resolver::ado_yaml::ado_yaml(
280 resolved_pipeline,
281 !no_runtime_debug_log,
282 repo_root,
283 &out,
284 flowey_crate,
285 mode,
286 )
287 }
288 }
289 PipelineBackendCli::Github {
290 pipelines,
291 out,
292 no_runtime_debug_log,
293 check,
294 runtime,
295 } => {
296 let mut resolved_pipeline =
297 resolve_pipeline(pipelines, PipelineBackendHint::Github)?;
298
299 if matches!(
300 resolve_include_jobs(&mut resolved_pipeline, include_jobs)?,
301 EarlyExit::Yes
302 ) {
303 return Ok(());
304 }
305
306 if let Some(viz_mode) = viz_mode {
307 viz_pipeline(viz_mode, resolved_pipeline, FlowBackend::Github, false)
308 } else {
309 let mode = if let Some(runtime_path) = runtime {
310 CheckMode::Runtime(runtime_path)
311 } else if let Some(check_path) = check {
312 CheckMode::Check(check_path)
313 } else {
314 CheckMode::None
315 };
316
317 crate::pipeline_resolver::github_yaml::github_yaml(
318 resolved_pipeline,
319 !no_runtime_debug_log,
320 repo_root,
321 &out,
322 flowey_crate,
323 mode,
324 )
325 }
326 }
327 }
328 }
329}
330
331fn resolve_pipeline<P: IntoPipeline>(
332 pipelines: P,
333 backend_hint: PipelineBackendHint,
334) -> Result<crate::pipeline_resolver::generic::ResolvedPipeline, anyhow::Error> {
335 let pipeline = pipelines
336 .into_pipeline(backend_hint)
337 .context("error defining pipeline")?;
338
339 let resolved_pipeline = crate::pipeline_resolver::generic::resolve_pipeline(pipeline)
340 .context("invalid pipeline")?;
341
342 Ok(resolved_pipeline)
343}
344
345fn viz_pipeline(
346 viz_mode: VizModeCli,
347 resolved_pipeline: crate::pipeline_resolver::generic::ResolvedPipeline,
348 backend: FlowBackend,
349 with_persist_dir: bool,
350) -> Result<(), anyhow::Error> {
351 match viz_mode {
352 VizModeCli::Toposort => crate::pipeline_resolver::viz::viz_pipeline_toposort(
353 resolved_pipeline,
354 backend,
355 with_persist_dir,
356 ),
357 VizModeCli::Dot => {
358 crate::pipeline_resolver::viz::viz_pipeline_dot(resolved_pipeline, backend)
359 }
360 VizModeCli::FlowDot => crate::pipeline_resolver::viz::viz_pipeline_flow_dot(
361 resolved_pipeline,
362 backend,
363 with_persist_dir,
364 ),
365 }
366}
367
/// Tells the caller of `resolve_include_jobs` whether the command has already
/// done everything it needs to do.
#[derive(Debug, PartialEq, Eq)]
enum EarlyExit {
    /// The request was fully handled (the job list was printed); stop here.
    Yes,
    /// Carry on with the rest of the command.
    No,
}
372
373#[expect(clippy::option_option, reason = "for clap derive")]
374fn resolve_include_jobs(
375 resolved_pipeline: &mut crate::pipeline_resolver::generic::ResolvedPipeline,
376 include_jobs: Option<Option<IncludeJobs>>,
377) -> anyhow::Result<EarlyExit> {
378 let Some(include_jobs) = include_jobs else {
379 return Ok(EarlyExit::No);
380 };
381
382 match include_jobs.unwrap_or(IncludeJobs::Query) {
383 IncludeJobs::Query => {
384 for (present_idx, &graph_idx) in resolved_pipeline.order.iter().enumerate() {
385 println!(
386 "{}: {}",
387 present_idx, resolved_pipeline.graph[graph_idx].label
388 );
389 }
390 Ok(EarlyExit::Yes)
391 }
392 IncludeJobs::List(list) => {
393 let preserve_jobs = list
394 .into_iter()
395 .map(|present_idx| resolved_pipeline.order.get(present_idx).cloned())
396 .collect::<Option<Vec<_>>>()
397 .context("passed invalid job idx. use '?' to list available jobs")?;
398 resolved_pipeline.trim_pipeline_graph(preserve_jobs);
399 Ok(EarlyExit::No)
400 }
401 }
402}
403