microsoft/openvmm

Public

mirrored fromhttps://github.com/microsoft/openvmmAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
93af13fed5d5fc7a8a08fbf37c0ea1e155c4160a

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

flowey/flowey_lib_common/src/cache.rs

589lines · modecode

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Cache the contents of a particular directory between runs.
5//!
6//! The contents of the provided `dir` will be saved at the end of a run, using
7//! the user-defined `key` string to tag the contents of the cache.
8//!
9//! Subsequent runs will then use the `key` to restore the contents of the
10//! directory.
11//!
12//! # A note on file sizes
13//!
14//! This node is backed by the in-box Cache@2 Task on ADO, and the in-box
15//! actions/cache@v4 Action on Github Actions.
16//!
17//! These actions have limits on the size of data they can cache at any given
18//! time, and potentially have issues with particularly large artifacts (e.g:
19//! gigabytes in size).
20//!
21//! In cases where you're intending to cache large files, it is recommended to
22//! implement caching functionality directly using [`NodeCtx::persistent_dir`],
23//! which is guaranteed to be reliable (when running on a system where such
24//! persistent storage is available).
25//!
26//! # Clearing the cache
27//!
28//! Clearing the cache is done in different ways depending on the backend:
29//!
30//! - Local: just delete the cache folder on your machine
31//! - Github: use the cache task's web UI to manage cache entries
32//! - ADO: define a pipeline-level variable called `FloweyCacheGeneration`, and set
33//! it to a new arbitrary value.
34//! - This is because ADO doesn't have a native way to flush the cache
35//! outside of updating the cache key in the YAML file itself.
36
37use flowey::node::prelude::*;
38use std::collections::BTreeSet;
39use std::io::Seek;
40use std::io::Write;
41
42/// Status of a cache directory.
43#[derive(Debug, Serialize, Deserialize)]
44pub enum CacheHit {
45 /// Complete miss - cache is empty.
46 Miss,
47 /// Direct hit - cache is perfectly restored.
48 Hit,
49 /// Partial hit - cache was partially restored.
50 PartialHit,
51}
52
53/// How the result of the cache task should be reported.
54#[derive(Serialize, Deserialize)]
55pub enum CacheResult {
56 /// Don't care about the details, only care that the task was run.
57 SideEffect(WriteVar<SideEffect>),
58 /// Get details on the result of the cache restore.
59 HitVar(WriteVar<CacheHit>),
60}
61
62flowey_request! {
63 pub struct Request {
64 /// Friendly label for the directory being cached
65 pub label: String,
66 /// Absolute path to the directory that will be cached between runs
67 pub dir: ReadVar<PathBuf>,
68 /// The key created when saving a cache and the key used to search for a
69 /// cache.
70 pub key: ReadVar<String>,
71 /// An optional set of alternative restore keys.
72 ///
73 /// If no cache hit occurs for key, these restore keys are used
74 /// sequentially in the order provided to find and restore a cache.
75 pub restore_keys: Option<ReadVar<Vec<String>>>,
76 /// Variable to write the result of trying to restore the cache.
77 pub hitvar: CacheResult,
78 }
79}
80
81enum ClaimedCacheResult {
82 SideEffect,
83 HitVar(ClaimedWriteVar<CacheHit>),
84}
85
86new_flow_node!(struct Node);
87
88impl FlowNode for Node {
89 type Request = Request;
90
91 fn imports(_ctx: &mut ImportCtx<'_>) {}
92
93 fn emit(requests: Vec<Self::Request>, ctx: &mut NodeCtx<'_>) -> anyhow::Result<()> {
94 // -- end of req processing -- //
95
96 match ctx.backend() {
97 FlowBackend::Local => {
98 if !ctx.supports_persistent_dir() {
99 ctx.emit_rust_step("Reporting cache misses", |ctx| {
100 let hitvars = requests
101 .into_iter()
102 .map(|v| match v.hitvar {
103 CacheResult::SideEffect(v) => {
104 v.claim(ctx);
105 ClaimedCacheResult::SideEffect
106 }
107 CacheResult::HitVar(v) => ClaimedCacheResult::HitVar(v.claim(ctx)),
108 })
109 .collect::<Vec<_>>();
110
111 |rt| {
112 for var in hitvars {
113 match var {
114 ClaimedCacheResult::SideEffect => {}
115 ClaimedCacheResult::HitVar(var) => {
116 rt.write(var, &CacheHit::Miss)
117 }
118 }
119 }
120 Ok(())
121 }
122 });
123
124 return Ok(());
125 };
126
127 for Request {
128 label,
129 dir,
130 key,
131 restore_keys,
132 hitvar,
133 } in requests
134 {
135 // work around a bug in how post-job nodes affect stage1 day
136 // culling...
137 let persistent_dir = ctx.persistent_dir().unwrap();
138
139 // regardless if we're reporting the hit back to the user, we'll
140 // want to record the hit status so we can efficiently skip
141 // saving to the cache in the post-job step
142 let (side_effect, (hitvar_reader, hitvar)) = match hitvar {
143 CacheResult::HitVar(hitvar) => (None, (hitvar.new_reader(), hitvar)),
144 CacheResult::SideEffect(var) => (Some(var), ctx.new_var()),
145 };
146
147 let (resolve_post_job, require_post_job) = ctx.new_post_job_side_effect();
148
149 ctx.emit_rust_step(format!("Restore cache: {label}"), |ctx| {
150 require_post_job.claim(ctx);
151 side_effect.claim(ctx);
152 let persistent_dir = persistent_dir.clone().claim(ctx);
153 let dir = dir.clone().claim(ctx);
154 let key = key.clone().claim(ctx);
155 let restore_keys = restore_keys.claim(ctx);
156 let hitvar = hitvar.claim(ctx);
157 |rt| {
158 let persistent_dir = rt.read(persistent_dir);
159 let dir = rt.read(dir);
160 let key = rt.read(key);
161 let restore_keys = restore_keys.map(|x| rt.read(x));
162
163 let set_hitvar = move |val| {
164 log::info!("cache status: {val:?}");
165 rt.write(hitvar, &val);
166 };
167
168 // figure out what cache entries are available to us
169 //
170 // (reading this entire file into memory seems fine at
171 // this juncture, given the sort of datasets we're
172 // working with)
173 let available_keys: BTreeSet<String> = if let Ok(s) =
174 fs_err::read_to_string(persistent_dir.join("cache_keys"))
175 {
176 s.split('\n').map(|s| s.trim().to_owned()).collect()
177 } else {
178 BTreeSet::new()
179 };
180
181 // using the keys the user provided us, check if there's
182 // a match
183 let mut existing_cache_dir = None;
184 for (idx, key) in Some(key)
185 .into_iter()
186 .chain(restore_keys.into_iter().flatten())
187 .enumerate()
188 {
189 if available_keys.contains(&key) {
190 existing_cache_dir = Some((idx == 0, hash_key_to_dir(&key)));
191 break;
192 }
193 }
194
195 let Some((direct_hit, existing_cache_dir)) = existing_cache_dir else {
196 set_hitvar(CacheHit::Miss);
197 return Ok(());
198 };
199
200 crate::_util::copy_dir_all(
201 persistent_dir.join(existing_cache_dir),
202 dir,
203 )
204 .context("while restoring cache")?;
205
206 set_hitvar(if direct_hit {
207 CacheHit::Hit
208 } else {
209 CacheHit::PartialHit
210 });
211
212 Ok(())
213 }
214 });
215
216 ctx.emit_rust_step(format!("Saving cache: {label}"), |ctx| {
217 resolve_post_job.claim(ctx);
218 let hitvar_reader = hitvar_reader.claim(ctx);
219 let persistent_dir = persistent_dir.clone().claim(ctx);
220 let dir = dir.claim(ctx);
221 let key = key.claim(ctx);
222 move |rt| {
223 let persistent_dir = rt.read(persistent_dir);
224 let dir = rt.read(dir);
225 let key = rt.read(key);
226 let hitvar_reader = rt.read(hitvar_reader);
227
228 let mut cache_keys_file = fs_err::OpenOptions::new()
229 .append(true)
230 .create(true)
231 .read(true)
232 .open(persistent_dir.join("cache_keys"))?;
233
234 if matches!(hitvar_reader, CacheHit::Hit) {
235 // no need to update the cache
236 log::info!("was direct hit - no updates needed");
237 return Ok(());
238 }
239
240 // otherwise, need to update the cache
241 crate::_util::copy_dir_all(
242 dir,
243 persistent_dir.join(hash_key_to_dir(&key)),
244 )?;
245
246 cache_keys_file.seek(std::io::SeekFrom::End(0))?;
247 writeln!(cache_keys_file, "{}", key)?;
248
249 log::info!("cache saved");
250
251 Ok(())
252 }
253 });
254 }
255 }
256 FlowBackend::Ado => {
257 for Request {
258 label,
259 dir,
260 key,
261 restore_keys,
262 hitvar,
263 } in requests
264 {
265 let (resolve_post_job, require_post_job) = ctx.new_post_job_side_effect();
266
267 let (dir_string, key, restore_keys) = {
268 let (processed_dir, write_processed_dir) = ctx.new_var();
269 let (processed_key, write_processed_key) = ctx.new_var();
270 let (processed_keys, write_processed_keys) = if restore_keys.is_some() {
271 let (a, b) = ctx.new_var();
272 (Some(a), Some(b))
273 } else {
274 (None, None)
275 };
276
277 ctx.emit_rust_step("Pre-processing cache vars", |ctx| {
278 require_post_job.claim(ctx);
279 let write_processed_dir = write_processed_dir.claim(ctx);
280 let write_processed_key = write_processed_key.claim(ctx);
281 let write_processed_keys = write_processed_keys.claim(ctx);
282
283 let dir = dir.clone().claim(ctx);
284 let key = key.claim(ctx);
285 let restore_keys = restore_keys.claim(ctx);
286
287 |rt| {
288 let dir = rt.read(dir);
289 // while we're here, we'll convert dir into a
290 // String, so it can be stuffed into an ADO var
291 rt.write(
292 write_processed_dir,
293 &dir.absolute()?.display().to_string(),
294 );
295
296 // Inject `$(FloweyCacheGeneration)` as part of the
297 // cache key to provide a non-intrusive mechanism to
298 // cycle the ADO cache when it gets into an
299 // inconsistent state.
300 //
301 // Deny cross-os caching by default (matching Github
302 // CI works by default).
303 //
304 // FUTURE: add toggle to request to allow cross-os
305 // caching?
306 let inject_extras = |s| {
307 format!(r#""$(FloweyCacheGeneration)" | "$(Agent.OS)" | "{s}""#)
308 };
309
310 let key = rt.read(key);
311 rt.write(write_processed_key, &inject_extras(key));
312
313 if let Some(write_processed_keys) = write_processed_keys {
314 let restore_keys = rt.read(restore_keys.unwrap());
315 rt.write(
316 write_processed_keys,
317 &restore_keys
318 .into_iter()
319 .map(inject_extras)
320 .collect::<Vec<_>>()
321 .join("\\n"),
322 );
323 }
324
325 Ok(())
326 }
327 });
328
329 (processed_dir, processed_key, processed_keys)
330 };
331
332 let (hitvar_str_reader, hitvar_str_writer) =
333 if matches!(hitvar, CacheResult::HitVar(..)) {
334 let (r, w) = ctx.new_var();
335 (Some(r), Some(w))
336 } else {
337 (None, None)
338 };
339
340 ctx.emit_ado_step(format!("Restore cache: {label}"), |ctx| {
341 let dir_string = dir_string.clone().claim(ctx);
342 let key = key.claim(ctx);
343 let restore_keys = restore_keys.claim(ctx);
344 let hitvar_str_writer = hitvar_str_writer.claim(ctx);
345 |rt| {
346 let dir_string = rt.get_var(dir_string).as_raw_var_name();
347 let key = rt.get_var(key).as_raw_var_name();
348 let restore_keys = if let Some(restore_keys) = restore_keys {
349 format!(
350 "restore_keys: $({})",
351 rt.get_var(restore_keys).as_raw_var_name()
352 )
353 } else {
354 String::new()
355 };
356
357 let hitvar_input = if let Some(hitvar_str_writer) = hitvar_str_writer {
358 let hitvar_ado = AdoRuntimeVar::dangerous_from_global(
359 "FLOWEY_CACHE_HITVAR",
360 false,
361 );
362 // note the _lack_ of $() around the var!
363 let input =
364 format!("cacheHitVar: {}", hitvar_ado.as_raw_var_name());
365 rt.set_var(hitvar_str_writer, hitvar_ado);
366 input
367 } else {
368 String::new()
369 };
370
371 format!(
372 r#"
373 - task: Cache@2
374 inputs:
375 key: '$({key})'
376 path: $({dir_string})
377 {restore_keys}
378 {hitvar_input}
379 "#
380 )
381 }
382 });
383
384 if let Some(hitvar_str_reader) = hitvar_str_reader {
385 ctx.emit_rust_step("map ADO hitvar to flowey", |ctx| {
386 let label = label.clone();
387 let dir = dir.clone().claim(ctx);
388
389 let CacheResult::HitVar(hitvar) = hitvar else {
390 unreachable!()
391 };
392
393 let hitvar = hitvar.claim(ctx);
394 let hitvar_str_reader = hitvar_str_reader.claim(ctx);
395 move |rt| {
396 let dir = rt.read(dir);
397 let hitvar_str = rt.read(hitvar_str_reader);
398 let mut var = match hitvar_str.as_str() {
399 "true" => CacheHit::Hit,
400 "false" => CacheHit::Miss,
401 "inexact" => CacheHit::PartialHit,
402 other => anyhow::bail!("unexpected cacheHitVar value: {other}"),
403 };
404
405 // WORKAROUND: ADO is really cool software, and
406 // sometimes, when it feels like it, i'll get into
407 // an inconsistent state where it reports a cache
408 // hit, but then the cache is actually empty.
409 if matches!(var, CacheHit::Hit | CacheHit::PartialHit) {
410 if dir.read_dir()?.next().is_none() {
411 log::error!("Detected inconsistent ADO cache entry: {label}");
412 log::error!("Please define/cycle the `FloweyCacheGeneration` pipeline variable");
413 var = CacheHit::Miss;
414 }
415 }
416
417 rt.write(hitvar, &var);
418 Ok(())
419 }
420 });
421 }
422
423 ctx.emit_rust_step(format!("validate cache entry: {label}"), |ctx| {
424 resolve_post_job.claim(ctx);
425 let dir = dir.clone().claim(ctx);
426 move |rt| {
427 let mut dir_contents = rt.read(dir).read_dir()?.peekable();
428
429 if dir_contents.peek().is_none() {
430 log::error!("Detected empty cache folder for entry: {label}");
431 log::error!("This is a bug - please update the pipeline code");
432 anyhow::bail!("cache error")
433 }
434
435 for entry in dir_contents {
436 let entry = entry?;
437 log::debug!("uploading: {}", entry.path().display());
438 }
439
440 Ok(())
441 }
442 });
443 }
444 }
445 FlowBackend::Github => {
446 for Request {
447 label,
448 dir,
449 key,
450 restore_keys,
451 hitvar,
452 } in requests
453 {
454 let (resolve_post_job, require_post_job) = ctx.new_post_job_side_effect();
455
456 let (dir_string, key, restore_keys) = {
457 let (processed_dir, write_processed_dir) = ctx.new_var();
458 let (processed_key, write_processed_key) = ctx.new_var();
459 let (processed_keys, write_processed_keys) = if restore_keys.is_some() {
460 let (a, b) = ctx.new_var();
461 (Some(a), Some(b))
462 } else {
463 (None, None)
464 };
465
466 ctx.emit_rust_step("Pre-processing cache vars", |ctx| {
467 require_post_job.claim(ctx);
468 let write_processed_dir = write_processed_dir.claim(ctx);
469 let write_processed_key = write_processed_key.claim(ctx);
470 let write_processed_keys = write_processed_keys.claim(ctx);
471
472 let dir = dir.clone().claim(ctx);
473 let key = key.claim(ctx);
474 let restore_keys = restore_keys.claim(ctx);
475
476 |rt| {
477 let dir = rt.read(dir);
478 rt.write(
479 write_processed_dir,
480 &dir.absolute()?.display().to_string(),
481 );
482
483 let key = rt.read(key);
484 rt.write(write_processed_key, &key);
485
486 if let Some(write_processed_keys) = write_processed_keys {
487 let restore_keys = rt.read(restore_keys.unwrap());
488 rt.write(
489 write_processed_keys,
490 &format!(
491 r#""[{}]""#,
492 restore_keys
493 .into_iter()
494 .map(|s| format!("'{s}'"))
495 .collect::<Vec<_>>()
496 .join(", ")
497 ),
498 );
499 }
500
501 Ok(())
502 }
503 });
504
505 (processed_dir, processed_key, processed_keys)
506 };
507
508 let (hitvar_str_reader, hitvar_str_writer) =
509 if matches!(hitvar, CacheResult::HitVar(..)) {
510 let (r, w) = ctx.new_var();
511 (Some(r), Some(w))
512 } else {
513 (None, None)
514 };
515
516 let mut step = ctx
517 .emit_gh_step(format!("Restore cache: {label}"), "actions/cache@v4")
518 .with("key", key)
519 .with("path", dir_string);
520 if let Some(restore_keys) = restore_keys {
521 step = step.with("restore-keys", restore_keys);
522 }
523 if let Some(hitvar_str_writer) = hitvar_str_writer {
524 step = step.output("cache-hit", hitvar_str_writer);
525 }
526 step.finish(ctx);
527
528 if let Some(hitvar_str_reader) = hitvar_str_reader {
529 ctx.emit_rust_step("map Github cache-hit to flowey", |ctx| {
530 let CacheResult::HitVar(hitvar) = hitvar else {
531 unreachable!()
532 };
533
534 let hitvar = hitvar.claim(ctx);
535 let hitvar_str_reader = hitvar_str_reader.claim(ctx);
536 // TODO: How do we distinguish between a partial hit and a miss?
537 move |rt| {
538 let hitvar_str = rt.read(hitvar_str_reader);
539 // Github's cache action brilliantly only reports "false" if missing a cache key that exists,
540 // and leaves it blank if its a miss in other cases.
541 let var = match hitvar_str.as_str() {
542 "true" => CacheHit::Hit,
543 _ => CacheHit::Miss,
544 };
545
546 rt.write(hitvar, &var);
547 Ok(())
548 }
549 });
550 }
551
552 ctx.emit_rust_step(format!("validate cache entry: {label}"), |ctx| {
553 resolve_post_job.claim(ctx);
554 let dir = dir.clone().claim(ctx);
555 move |rt| {
556 let mut dir_contents = rt.read(dir).read_dir()?.peekable();
557
558 if dir_contents.peek().is_none() {
559 log::error!("Detected empty cache folder for entry: {label}");
560 log::error!("This is a bug - please update the pipeline code");
561 anyhow::bail!("cache error")
562 }
563
564 for entry in dir_contents {
565 let entry = entry?;
566 log::debug!("uploading: {}", entry.path().display());
567 }
568
569 Ok(())
570 }
571 });
572 }
573 }
574 }
575
576 Ok(())
577 }
578}
579
580// _technically_, if we want to be _super_ sure we're not gonna have a hash
581// collision, we should also do a content-hash of the thing we're about to
582// cache... but this should be OK for now, given that we don't expect to have a
583// massive number of cache entries.
584fn hash_key_to_dir(key: &str) -> String {
585 let hasher = &mut rustc_hash::FxHasher::default();
586 std::hash::Hash::hash(&key, hasher);
587 let hash = std::hash::Hasher::finish(hasher);
588 format!("{:08x?}", hash)
589}
590