microsoft/openvmm

Public

mirrored fromhttps://github.com/microsoft/openvmmAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
58d7ac0c15eef5e011d9b956237ac749dccd14a0

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

flowey/flowey_cli/src/pipeline_resolver/generic.rs

318lines · modecode

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use anyhow::Context;
5use flowey_core::node::user_facing::GhPermission;
6use flowey_core::node::user_facing::GhPermissionValue;
7use flowey_core::node::FlowArch;
8use flowey_core::node::FlowPlatform;
9use flowey_core::node::NodeHandle;
10use flowey_core::patch::ResolvedPatches;
11use flowey_core::pipeline::internal::AdoPool;
12use flowey_core::pipeline::internal::ArtifactMeta;
13use flowey_core::pipeline::internal::InternalAdoResourcesRepository;
14use flowey_core::pipeline::internal::Parameter;
15use flowey_core::pipeline::internal::ParameterMeta;
16use flowey_core::pipeline::internal::PipelineFinalized;
17use flowey_core::pipeline::internal::PipelineJobMetadata;
18use flowey_core::pipeline::AdoCiTriggers;
19use flowey_core::pipeline::AdoPrTriggers;
20use flowey_core::pipeline::AdoScheduleTriggers;
21use flowey_core::pipeline::GhCiTriggers;
22use flowey_core::pipeline::GhPrTriggers;
23use flowey_core::pipeline::GhRunner;
24use flowey_core::pipeline::GhScheduleTriggers;
25use flowey_core::pipeline::Pipeline;
26use std::collections::BTreeMap;
27use std::collections::BTreeSet;
28
/// Output of [`resolve_pipeline`]: the job dependency graph of a pipeline,
/// together with the pipeline-level settings taken from the finalized pipeline.
pub struct ResolvedPipeline {
    /// Graph with one node per pipeline job. `resolve_pipeline` adds an edge
    /// from the job that publishes an artifact to every job that uses it, plus
    /// one edge for each explicit extra dependency.
    pub graph: petgraph::Graph<ResolvedPipelineJob, ()>,
    /// Topological ordering of the nodes in `graph`.
    pub order: Vec<petgraph::prelude::NodeIndex>,
    /// Pipeline parameters. `ResolvedJobUseParameter::pipeline_param_idx` and
    /// `ResolvedPipelineJob::cond_param_idx` hold indices that are carried
    /// alongside this list.
    pub parameters: Vec<Parameter>,

    // The `ado_*` fields below are moved over unchanged from the
    // `PipelineFinalized` value produced by `PipelineFinalized::from_pipeline`.
    // NOTE(review): presumably Azure DevOps backend settings -- confirm
    // against flowey_core.
    pub ado_schedule_triggers: Vec<AdoScheduleTriggers>,
    pub ado_name: Option<String>,
    pub ado_ci_triggers: Option<AdoCiTriggers>,
    pub ado_pr_triggers: Option<AdoPrTriggers>,
    pub ado_bootstrap_template: String,
    pub ado_resources_repository: Vec<InternalAdoResourcesRepository>,
    /// Optional one-shot callback that takes a `serde_yaml::Value` and returns
    /// a `serde_yaml::Value`.
    pub ado_post_process_yaml_cb: Option<Box<dyn FnOnce(serde_yaml::Value) -> serde_yaml::Value>>,
    pub ado_variables: BTreeMap<String, String>,

    // The `gh_*` fields below are likewise moved over unchanged from the
    // `PipelineFinalized` value.
    // NOTE(review): presumably GitHub backend settings -- confirm against
    // flowey_core.
    pub gh_name: Option<String>,
    pub gh_schedule_triggers: Vec<GhScheduleTriggers>,
    pub gh_ci_triggers: Option<GhCiTriggers>,
    pub gh_pr_triggers: Option<GhPrTriggers>,
    pub gh_bootstrap_template: String,
}
47
/// An artifact that a job either publishes or uses.
#[derive(Debug, Clone)]
pub struct ResolvedJobArtifact {
    /// Name of the flowey runtime variable associated with this artifact, as
    /// returned by `consistent_artifact_runtime_var_name` for `name` (the
    /// second argument is `true` for used artifacts and `false` for published
    /// ones).
    pub flowey_var: String,
    /// Name of the artifact.
    pub name: String,
}
53
/// A pipeline parameter that a job uses.
#[derive(Debug, Clone)]
pub struct ResolvedJobUseParameter {
    /// Name of the flowey runtime variable associated with this parameter, as
    /// returned by `consistent_param_runtime_var_name` for
    /// `pipeline_param_idx`.
    pub flowey_var: String,
    /// Position of the parameter in the pipeline's parameter list (the
    /// `parameters` field of `ResolvedPipeline`).
    pub pipeline_param_idx: usize,
}
59
/// A single job in a resolved pipeline; the node weight of
/// `ResolvedPipeline::graph`.
///
/// Unless noted otherwise, fields are moved over unchanged from the job's
/// `PipelineJobMetadata`.
#[derive(Debug, Clone)] // Clone is because of shoddy viz code
pub struct ResolvedPipelineJob {
    pub root_nodes: BTreeMap<NodeHandle, Vec<Box<[u8]>>>,
    /// Result of calling `finalize()` on the job's patches.
    pub patches: ResolvedPatches,
    pub label: String,
    pub platform: FlowPlatform,
    pub arch: FlowArch,
    pub ado_pool: Option<AdoPool>,
    pub ado_variables: BTreeMap<String, String>,
    pub gh_override_if: Option<String>,
    pub gh_global_env: BTreeMap<String, String>,
    pub gh_pool: Option<GhRunner>,
    pub gh_permissions: BTreeMap<NodeHandle, BTreeMap<GhPermission, GhPermissionValue>>,
    /// The `flowey_var` of every entry in `artifacts_used`,
    /// `artifacts_published`, and `parameters_used`.
    pub external_read_vars: BTreeSet<String>,
    // NOTE(review): presumably the index of a pipeline parameter that
    // conditions whether this job runs -- confirm against flowey_core.
    pub cond_param_idx: Option<usize>,

    /// Pipeline parameters used by this job.
    pub parameters_used: Vec<ResolvedJobUseParameter>,
    // correspond to injected download nodes at the start of the job
    pub artifacts_used: Vec<ResolvedJobArtifact>,
    // correspond to injected publish nodes at the end of the job
    pub artifacts_published: Vec<ResolvedJobArtifact>,
}
82
83pub fn resolve_pipeline(pipeline: Pipeline) -> anyhow::Result<ResolvedPipeline> {
84 let PipelineFinalized {
85 jobs,
86 artifacts,
87 parameters,
88 extra_deps,
89 ado_name,
90 ado_schedule_triggers,
91 ado_ci_triggers,
92 ado_pr_triggers,
93 ado_bootstrap_template,
94 ado_resources_repository,
95 ado_post_process_yaml_cb,
96 ado_variables,
97 gh_name,
98 gh_schedule_triggers,
99 gh_ci_triggers,
100 gh_pr_triggers,
101 gh_bootstrap_template,
102 } = PipelineFinalized::from_pipeline(pipeline);
103
104 let mut graph = petgraph::Graph::new();
105
106 let mut job_to_artifacts = {
107 let mut m = BTreeMap::<usize, (BTreeSet<String>, BTreeSet<String>)>::new();
108
109 for ArtifactMeta {
110 name,
111 published_by_job,
112 used_by_jobs,
113 } in &artifacts
114 {
115 let no_existing = m
116 .entry(
117 published_by_job
118 .context(format!("artifact '{name}' is not published by any job"))?,
119 )
120 .or_default()
121 .0
122 .insert(name.clone());
123 assert!(no_existing);
124
125 for job_idx in used_by_jobs {
126 let no_existing = m.entry(*job_idx).or_default().1.insert(name.clone());
127 assert!(no_existing);
128 }
129 }
130
131 m
132 };
133
134 let (parameters, mut job_to_params) = {
135 let mut params = Vec::new();
136 let mut m = BTreeMap::<usize, BTreeSet<usize>>::new();
137
138 for (
139 param_idx,
140 ParameterMeta {
141 parameter,
142 used_by_jobs,
143 },
144 ) in parameters.into_iter().enumerate()
145 {
146 params.push(parameter);
147 for job_idx in used_by_jobs {
148 let no_existing = m.entry(job_idx).or_default().insert(param_idx);
149 assert!(no_existing);
150 }
151 }
152
153 (params, m)
154 };
155
156 let mut flowey_bootstrap_platforms = BTreeSet::new();
157
158 // first things first: spin up graph nodes for each job
159 let mut job_graph_idx = Vec::new();
160 for (
161 job_idx,
162 PipelineJobMetadata {
163 root_nodes,
164 patches,
165 label,
166 platform,
167 arch,
168 cond_param_idx,
169 ado_pool,
170 ado_variables,
171 gh_override_if,
172 gh_global_env,
173 gh_pool,
174 gh_permissions,
175 },
176 ) in jobs.into_iter().enumerate()
177 {
178 let (artifacts_published, artifacts_used) =
179 job_to_artifacts.remove(&job_idx).unwrap_or_default();
180 let parameters_used = job_to_params.remove(&job_idx).unwrap_or_default();
181
182 let artifacts_published: Vec<_> = artifacts_published
183 .into_iter()
184 .map(|a| ResolvedJobArtifact {
185 flowey_var: flowey_core::pipeline::internal::consistent_artifact_runtime_var_name(
186 &a, false,
187 ),
188 name: a,
189 })
190 .collect();
191 let artifacts_used: Vec<_> = artifacts_used
192 .into_iter()
193 .map(|a| ResolvedJobArtifact {
194 flowey_var: flowey_core::pipeline::internal::consistent_artifact_runtime_var_name(
195 &a, true,
196 ),
197 name: a,
198 })
199 .collect();
200 let parameters_used: Vec<_> = parameters_used
201 .into_iter()
202 .map(|param_idx| ResolvedJobUseParameter {
203 flowey_var: flowey_core::pipeline::internal::consistent_param_runtime_var_name(
204 param_idx,
205 ),
206 pipeline_param_idx: param_idx,
207 })
208 .collect();
209
210 // individual pipeline resolvers still need to ensure that the var is in
211 // the var-db at job start time, but this external-var reporting code
212 // can be shared across all impls
213 let mut external_read_vars = BTreeSet::new();
214 external_read_vars.extend(artifacts_used.iter().map(|a| a.flowey_var.clone()));
215 external_read_vars.extend(artifacts_published.iter().map(|a| a.flowey_var.clone()));
216 external_read_vars.extend(parameters_used.iter().map(|p| p.flowey_var.clone()));
217
218 let idx = graph.add_node(ResolvedPipelineJob {
219 root_nodes,
220 patches: patches.finalize(),
221 label,
222 ado_pool,
223 ado_variables,
224 gh_override_if,
225 gh_global_env,
226 gh_pool,
227 gh_permissions,
228 platform,
229 arch,
230 cond_param_idx,
231 external_read_vars,
232 parameters_used,
233 artifacts_used,
234 artifacts_published,
235 });
236
237 // ...also using this opportunity to keep track of what flowey bins we need to bootstrap
238 flowey_bootstrap_platforms.insert(platform);
239
240 job_graph_idx.push(idx);
241 }
242
243 // next, add node edges based on artifact flow
244 for ArtifactMeta {
245 name: _,
246 published_by_job,
247 used_by_jobs,
248 } in artifacts
249 {
250 let published_idx = job_graph_idx[published_by_job.expect("checked in loop above")];
251 for job in used_by_jobs {
252 let used_idx = job_graph_idx[job];
253 graph.add_edge(published_idx, used_idx, ());
254 }
255 }
256
257 // lastly, add node edges based on any additional explicit dependencies
258 for (from, to) in extra_deps {
259 graph.add_edge(job_graph_idx[from], job_graph_idx[to], ());
260 }
261
262 // TODO: better error handling
263 let order = petgraph::algo::toposort(&graph, None)
264 .map_err(|_| anyhow::anyhow!("detected cycle in pipeline"))?;
265
266 Ok(ResolvedPipeline {
267 graph,
268 order,
269 parameters,
270 ado_name,
271 ado_variables,
272 ado_schedule_triggers,
273 ado_ci_triggers,
274 ado_pr_triggers,
275 ado_bootstrap_template,
276 ado_resources_repository,
277 ado_post_process_yaml_cb,
278 gh_name,
279 gh_schedule_triggers,
280 gh_ci_triggers,
281 gh_pr_triggers,
282 gh_bootstrap_template,
283 })
284}
285
286impl ResolvedPipeline {
287 /// Trim the pipeline graph to only include the specified jobs (taking care
288 /// to also preserve any dependant jobs they rely on).
289 pub fn trim_pipeline_graph(&mut self, preserve_jobs: Vec<petgraph::prelude::NodeIndex>) {
290 // DEVNOTE: this is a horribly suboptimal way to implement this, but it
291 // works fine with the graph-sizes we currently have, so we can optimize
292 // this later...
293
294 let mut jobs_to_delete: BTreeSet<_> = self.graph.node_indices().collect();
295 for idx in preserve_jobs {
296 let g = petgraph::visit::Reversed(&self.graph);
297
298 let mut dfs = petgraph::visit::Dfs::new(g, idx);
299 while let Some(save_idx) = dfs.next(g) {
300 jobs_to_delete.remove(&save_idx);
301 }
302 }
303
304 let mut jobs_to_delete = jobs_to_delete.into_iter().collect::<Vec<_>>();
305 jobs_to_delete.sort();
306
307 // in petgraph, when you remove a node, it invalidates the node idx of
308 // all subsequent nodes.
309 //
310 // I'm sure there's a better way to do this filtering, but just removing
311 // nodes in reverse order seems to work fine.
312 for idx in jobs_to_delete.into_iter().rev() {
313 self.graph.remove_node(idx).unwrap();
314 }
315
316 self.order = petgraph::algo::toposort(&self.graph, None).unwrap();
317 }
318}
319