microsoft/openvmm
Public · mirrored from https://github.com/microsoft/openvmm · Available
flowey/flowey_cli/src/pipeline_resolver/generic.rs
318 lines · mode: code
| 1 | // Copyright (c) Microsoft Corporation. |
| 2 | // Licensed under the MIT License. |
| 3 | |
| 4 | use anyhow::Context; |
| 5 | use flowey_core::node::user_facing::GhPermission; |
| 6 | use flowey_core::node::user_facing::GhPermissionValue; |
| 7 | use flowey_core::node::FlowArch; |
| 8 | use flowey_core::node::FlowPlatform; |
| 9 | use flowey_core::node::NodeHandle; |
| 10 | use flowey_core::patch::ResolvedPatches; |
| 11 | use flowey_core::pipeline::internal::AdoPool; |
| 12 | use flowey_core::pipeline::internal::ArtifactMeta; |
| 13 | use flowey_core::pipeline::internal::InternalAdoResourcesRepository; |
| 14 | use flowey_core::pipeline::internal::Parameter; |
| 15 | use flowey_core::pipeline::internal::ParameterMeta; |
| 16 | use flowey_core::pipeline::internal::PipelineFinalized; |
| 17 | use flowey_core::pipeline::internal::PipelineJobMetadata; |
| 18 | use flowey_core::pipeline::AdoCiTriggers; |
| 19 | use flowey_core::pipeline::AdoPrTriggers; |
| 20 | use flowey_core::pipeline::AdoScheduleTriggers; |
| 21 | use flowey_core::pipeline::GhCiTriggers; |
| 22 | use flowey_core::pipeline::GhPrTriggers; |
| 23 | use flowey_core::pipeline::GhRunner; |
| 24 | use flowey_core::pipeline::GhScheduleTriggers; |
| 25 | use flowey_core::pipeline::Pipeline; |
| 26 | use std::collections::BTreeMap; |
| 27 | use std::collections::BTreeSet; |
| 28 | |
| 29 | pub struct ResolvedPipeline { |
| 30 | pub graph: petgraph::Graph<ResolvedPipelineJob, ()>, |
| 31 | pub order: Vec<petgraph::prelude::NodeIndex>, |
| 32 | pub parameters: Vec<Parameter>, |
| 33 | pub ado_schedule_triggers: Vec<AdoScheduleTriggers>, |
| 34 | pub ado_name: Option<String>, |
| 35 | pub ado_ci_triggers: Option<AdoCiTriggers>, |
| 36 | pub ado_pr_triggers: Option<AdoPrTriggers>, |
| 37 | pub ado_bootstrap_template: String, |
| 38 | pub ado_resources_repository: Vec<InternalAdoResourcesRepository>, |
| 39 | pub ado_post_process_yaml_cb: Option<Box<dyn FnOnce(serde_yaml::Value) -> serde_yaml::Value>>, |
| 40 | pub ado_variables: BTreeMap<String, String>, |
| 41 | pub gh_name: Option<String>, |
| 42 | pub gh_schedule_triggers: Vec<GhScheduleTriggers>, |
| 43 | pub gh_ci_triggers: Option<GhCiTriggers>, |
| 44 | pub gh_pr_triggers: Option<GhPrTriggers>, |
| 45 | pub gh_bootstrap_template: String, |
| 46 | } |
| 47 | |
/// An artifact that a job either publishes or uses.
#[derive(Debug, Clone)]
pub struct ResolvedJobArtifact {
    /// Name of the flowey runtime variable associated with this artifact.
    pub flowey_var: String,
    /// Name of the artifact itself.
    pub name: String,
}
| 53 | |
/// A pipeline parameter that a job makes use of.
#[derive(Debug, Clone)]
pub struct ResolvedJobUseParameter {
    /// Name of the flowey runtime variable associated with this parameter.
    pub flowey_var: String,
    /// Index of the parameter within the pipeline-wide parameter list.
    pub pipeline_param_idx: usize,
}
| 59 | |
| 60 | #[derive(Debug, Clone)] // Clone is because of shoddy viz code |
| 61 | pub struct ResolvedPipelineJob { |
| 62 | pub root_nodes: BTreeMap<NodeHandle, Vec<Box<[u8]>>>, |
| 63 | pub patches: ResolvedPatches, |
| 64 | pub label: String, |
| 65 | pub platform: FlowPlatform, |
| 66 | pub arch: FlowArch, |
| 67 | pub ado_pool: Option<AdoPool>, |
| 68 | pub ado_variables: BTreeMap<String, String>, |
| 69 | pub gh_override_if: Option<String>, |
| 70 | pub gh_global_env: BTreeMap<String, String>, |
| 71 | pub gh_pool: Option<GhRunner>, |
| 72 | pub gh_permissions: BTreeMap<NodeHandle, BTreeMap<GhPermission, GhPermissionValue>>, |
| 73 | pub external_read_vars: BTreeSet<String>, |
| 74 | pub cond_param_idx: Option<usize>, |
| 75 | |
| 76 | pub parameters_used: Vec<ResolvedJobUseParameter>, |
| 77 | // correspond to injected download nodes at the start of the job |
| 78 | pub artifacts_used: Vec<ResolvedJobArtifact>, |
| 79 | // correspond to injected publish nodes at the end of the job |
| 80 | pub artifacts_published: Vec<ResolvedJobArtifact>, |
| 81 | } |
| 82 | |
| 83 | pub fn resolve_pipeline(pipeline: Pipeline) -> anyhow::Result<ResolvedPipeline> { |
| 84 | let PipelineFinalized { |
| 85 | jobs, |
| 86 | artifacts, |
| 87 | parameters, |
| 88 | extra_deps, |
| 89 | ado_name, |
| 90 | ado_schedule_triggers, |
| 91 | ado_ci_triggers, |
| 92 | ado_pr_triggers, |
| 93 | ado_bootstrap_template, |
| 94 | ado_resources_repository, |
| 95 | ado_post_process_yaml_cb, |
| 96 | ado_variables, |
| 97 | gh_name, |
| 98 | gh_schedule_triggers, |
| 99 | gh_ci_triggers, |
| 100 | gh_pr_triggers, |
| 101 | gh_bootstrap_template, |
| 102 | } = PipelineFinalized::from_pipeline(pipeline); |
| 103 | |
| 104 | let mut graph = petgraph::Graph::new(); |
| 105 | |
| 106 | let mut job_to_artifacts = { |
| 107 | let mut m = BTreeMap::<usize, (BTreeSet<String>, BTreeSet<String>)>::new(); |
| 108 | |
| 109 | for ArtifactMeta { |
| 110 | name, |
| 111 | published_by_job, |
| 112 | used_by_jobs, |
| 113 | } in &artifacts |
| 114 | { |
| 115 | let no_existing = m |
| 116 | .entry( |
| 117 | published_by_job |
| 118 | .context(format!("artifact '{name}' is not published by any job"))?, |
| 119 | ) |
| 120 | .or_default() |
| 121 | .0 |
| 122 | .insert(name.clone()); |
| 123 | assert!(no_existing); |
| 124 | |
| 125 | for job_idx in used_by_jobs { |
| 126 | let no_existing = m.entry(*job_idx).or_default().1.insert(name.clone()); |
| 127 | assert!(no_existing); |
| 128 | } |
| 129 | } |
| 130 | |
| 131 | m |
| 132 | }; |
| 133 | |
| 134 | let (parameters, mut job_to_params) = { |
| 135 | let mut params = Vec::new(); |
| 136 | let mut m = BTreeMap::<usize, BTreeSet<usize>>::new(); |
| 137 | |
| 138 | for ( |
| 139 | param_idx, |
| 140 | ParameterMeta { |
| 141 | parameter, |
| 142 | used_by_jobs, |
| 143 | }, |
| 144 | ) in parameters.into_iter().enumerate() |
| 145 | { |
| 146 | params.push(parameter); |
| 147 | for job_idx in used_by_jobs { |
| 148 | let no_existing = m.entry(job_idx).or_default().insert(param_idx); |
| 149 | assert!(no_existing); |
| 150 | } |
| 151 | } |
| 152 | |
| 153 | (params, m) |
| 154 | }; |
| 155 | |
| 156 | let mut flowey_bootstrap_platforms = BTreeSet::new(); |
| 157 | |
| 158 | // first things first: spin up graph nodes for each job |
| 159 | let mut job_graph_idx = Vec::new(); |
| 160 | for ( |
| 161 | job_idx, |
| 162 | PipelineJobMetadata { |
| 163 | root_nodes, |
| 164 | patches, |
| 165 | label, |
| 166 | platform, |
| 167 | arch, |
| 168 | cond_param_idx, |
| 169 | ado_pool, |
| 170 | ado_variables, |
| 171 | gh_override_if, |
| 172 | gh_global_env, |
| 173 | gh_pool, |
| 174 | gh_permissions, |
| 175 | }, |
| 176 | ) in jobs.into_iter().enumerate() |
| 177 | { |
| 178 | let (artifacts_published, artifacts_used) = |
| 179 | job_to_artifacts.remove(&job_idx).unwrap_or_default(); |
| 180 | let parameters_used = job_to_params.remove(&job_idx).unwrap_or_default(); |
| 181 | |
| 182 | let artifacts_published: Vec<_> = artifacts_published |
| 183 | .into_iter() |
| 184 | .map(|a| ResolvedJobArtifact { |
| 185 | flowey_var: flowey_core::pipeline::internal::consistent_artifact_runtime_var_name( |
| 186 | &a, false, |
| 187 | ), |
| 188 | name: a, |
| 189 | }) |
| 190 | .collect(); |
| 191 | let artifacts_used: Vec<_> = artifacts_used |
| 192 | .into_iter() |
| 193 | .map(|a| ResolvedJobArtifact { |
| 194 | flowey_var: flowey_core::pipeline::internal::consistent_artifact_runtime_var_name( |
| 195 | &a, true, |
| 196 | ), |
| 197 | name: a, |
| 198 | }) |
| 199 | .collect(); |
| 200 | let parameters_used: Vec<_> = parameters_used |
| 201 | .into_iter() |
| 202 | .map(|param_idx| ResolvedJobUseParameter { |
| 203 | flowey_var: flowey_core::pipeline::internal::consistent_param_runtime_var_name( |
| 204 | param_idx, |
| 205 | ), |
| 206 | pipeline_param_idx: param_idx, |
| 207 | }) |
| 208 | .collect(); |
| 209 | |
| 210 | // individual pipeline resolvers still need to ensure that the var is in |
| 211 | // the var-db at job start time, but this external-var reporting code |
| 212 | // can be shared across all impls |
| 213 | let mut external_read_vars = BTreeSet::new(); |
| 214 | external_read_vars.extend(artifacts_used.iter().map(|a| a.flowey_var.clone())); |
| 215 | external_read_vars.extend(artifacts_published.iter().map(|a| a.flowey_var.clone())); |
| 216 | external_read_vars.extend(parameters_used.iter().map(|p| p.flowey_var.clone())); |
| 217 | |
| 218 | let idx = graph.add_node(ResolvedPipelineJob { |
| 219 | root_nodes, |
| 220 | patches: patches.finalize(), |
| 221 | label, |
| 222 | ado_pool, |
| 223 | ado_variables, |
| 224 | gh_override_if, |
| 225 | gh_global_env, |
| 226 | gh_pool, |
| 227 | gh_permissions, |
| 228 | platform, |
| 229 | arch, |
| 230 | cond_param_idx, |
| 231 | external_read_vars, |
| 232 | parameters_used, |
| 233 | artifacts_used, |
| 234 | artifacts_published, |
| 235 | }); |
| 236 | |
| 237 | // ...also using this opportunity to keep track of what flowey bins we need to bootstrap |
| 238 | flowey_bootstrap_platforms.insert(platform); |
| 239 | |
| 240 | job_graph_idx.push(idx); |
| 241 | } |
| 242 | |
| 243 | // next, add node edges based on artifact flow |
| 244 | for ArtifactMeta { |
| 245 | name: _, |
| 246 | published_by_job, |
| 247 | used_by_jobs, |
| 248 | } in artifacts |
| 249 | { |
| 250 | let published_idx = job_graph_idx[published_by_job.expect("checked in loop above")]; |
| 251 | for job in used_by_jobs { |
| 252 | let used_idx = job_graph_idx[job]; |
| 253 | graph.add_edge(published_idx, used_idx, ()); |
| 254 | } |
| 255 | } |
| 256 | |
| 257 | // lastly, add node edges based on any additional explicit dependencies |
| 258 | for (from, to) in extra_deps { |
| 259 | graph.add_edge(job_graph_idx[from], job_graph_idx[to], ()); |
| 260 | } |
| 261 | |
| 262 | // TODO: better error handling |
| 263 | let order = petgraph::algo::toposort(&graph, None) |
| 264 | .map_err(|_| anyhow::anyhow!("detected cycle in pipeline"))?; |
| 265 | |
| 266 | Ok(ResolvedPipeline { |
| 267 | graph, |
| 268 | order, |
| 269 | parameters, |
| 270 | ado_name, |
| 271 | ado_variables, |
| 272 | ado_schedule_triggers, |
| 273 | ado_ci_triggers, |
| 274 | ado_pr_triggers, |
| 275 | ado_bootstrap_template, |
| 276 | ado_resources_repository, |
| 277 | ado_post_process_yaml_cb, |
| 278 | gh_name, |
| 279 | gh_schedule_triggers, |
| 280 | gh_ci_triggers, |
| 281 | gh_pr_triggers, |
| 282 | gh_bootstrap_template, |
| 283 | }) |
| 284 | } |
| 285 | |
| 286 | impl ResolvedPipeline { |
| 287 | /// Trim the pipeline graph to only include the specified jobs (taking care |
| 288 | /// to also preserve any dependant jobs they rely on). |
| 289 | pub fn trim_pipeline_graph(&mut self, preserve_jobs: Vec<petgraph::prelude::NodeIndex>) { |
| 290 | // DEVNOTE: this is a horribly suboptimal way to implement this, but it |
| 291 | // works fine with the graph-sizes we currently have, so we can optimize |
| 292 | // this later... |
| 293 | |
| 294 | let mut jobs_to_delete: BTreeSet<_> = self.graph.node_indices().collect(); |
| 295 | for idx in preserve_jobs { |
| 296 | let g = petgraph::visit::Reversed(&self.graph); |
| 297 | |
| 298 | let mut dfs = petgraph::visit::Dfs::new(g, idx); |
| 299 | while let Some(save_idx) = dfs.next(g) { |
| 300 | jobs_to_delete.remove(&save_idx); |
| 301 | } |
| 302 | } |
| 303 | |
| 304 | let mut jobs_to_delete = jobs_to_delete.into_iter().collect::<Vec<_>>(); |
| 305 | jobs_to_delete.sort(); |
| 306 | |
| 307 | // in petgraph, when you remove a node, it invalidates the node idx of |
| 308 | // all subsequent nodes. |
| 309 | // |
| 310 | // I'm sure there's a better way to do this filtering, but just removing |
| 311 | // nodes in reverse order seems to work fine. |
| 312 | for idx in jobs_to_delete.into_iter().rev() { |
| 313 | self.graph.remove_node(idx).unwrap(); |
| 314 | } |
| 315 | |
| 316 | self.order = petgraph::algo::toposort(&self.graph, None).unwrap(); |
| 317 | } |
| 318 | } |
| 319 | |