microsoft/openvmm

Public

mirrored fromhttps://github.com/microsoft/openvmmAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
a68be638250b039ecf68d20791766d471fa7a197

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

flowey/flowey_lib_common/src/cache.rs

527lines · modepreview

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Cache the contents of a particular directory between runs.
//!
//! The contents of the provided `dir` will be saved at the end of a run, using
//! the user-defined `key` string to tag the contents of the cache.
//!
//! Subsequent runs will then use the `key` to restore the contents of the
//! directory.
//!
//! # A note of file sizes
//!
//! This node is backed by the in-box Cache@2 Task on ADO, and the in-box
//! actions/cache@v3 Action on Github Actions.
//!
//! These actions have limits on the size of data they can cache at any given
//! time, and potentially have issues with particularly large artifacts (e.g:
//! gigabytes in size).
//!
//! In cases where you're intending to cache large files, it is recommended to
//! implement caching functionality directly using [`NodeCtx::persistent_dir`],
//! which is guaranteed to be reliable (when running on a system where such
//! persistent storage is available).
//!
//! # Clearing the cache
//!
//! Clearing the cache is done in different ways depending on the backend:
//!
//! - Local: just delete the cache folder on your machine
//! - Github: use the cache tasks's web UI to manage cache entries
//! - ADO: define a pipeline-level variable called `FloweyCacheGeneration`, and set
//!   it to an new arbitrary value.
//!     - This is because ADO doesn't have a native way to flush the cache
//!       outside of updating the cache key in the YAML file itself.

use flowey::node::prelude::*;
use std::collections::BTreeSet;
use std::io::Seek;
use std::io::Write;

/// Status of a cache directory.
#[derive(Debug, Serialize, Deserialize)]
pub enum CacheHit {
    /// Complete miss - cache is empty.
    Miss,
    /// Direct hit - cache is perfectly restored.
    Hit,
    /// Partial hit - cache was partially restored.
    PartialHit,
}

flowey_request! {
    pub struct Request {
        /// Friendly label for the directory being cached
        pub label: String,
        /// Absolute path to the directory that will be cached between runs
        pub dir: ReadVar<PathBuf>,
        /// The key created when saving a cache and the key used to search for a
        /// cache.
        pub key: ReadVar<String>,
        /// An optional set of alternative restore keys.
        ///
        /// If no cache hit occurs for key, these restore keys are used
        /// sequentially in the order provided to find and restore a cache.
        pub restore_keys: Option<ReadVar<Vec<String>>>,
        /// Variable to write the result of trying to restore the cache.
        pub hitvar: WriteVar<CacheHit>,
    }
}

new_flow_node!(struct Node);

impl FlowNode for Node {
    type Request = Request;

    fn imports(_ctx: &mut ImportCtx<'_>) {}

    fn emit(requests: Vec<Self::Request>, ctx: &mut NodeCtx<'_>) -> anyhow::Result<()> {
        // -- end of req processing -- //

        match ctx.backend() {
            FlowBackend::Local => {
                if !ctx.supports_persistent_dir() {
                    ctx.emit_minor_rust_step("Reporting cache misses", |ctx| {
                        let hitvars = requests
                            .into_iter()
                            .map(|v| v.hitvar.claim(ctx))
                            .collect::<Vec<_>>();

                        |rt| {
                            rt.write_all(hitvars, &CacheHit::Miss);
                        }
                    });

                    return Ok(());
                };

                for Request {
                    label,
                    dir,
                    key,
                    restore_keys,
                    hitvar,
                } in requests
                {
                    // work around a bug in how post-job nodes affect stage1 day
                    // culling...
                    let persistent_dir = ctx.persistent_dir().unwrap();

                    // Needed for saving the cache result.
                    let (hitvar_reader, hitvar2) = ctx.new_var();

                    let (resolve_post_job, require_post_job) = ctx.new_post_job_side_effect();

                    ctx.emit_rust_step(format!("Restore cache: {label}"), |ctx| {
                        require_post_job.claim(ctx);
                        let persistent_dir = persistent_dir.clone().claim(ctx);
                        let dir = dir.clone().claim(ctx);
                        let key = key.clone().claim(ctx);
                        let restore_keys = restore_keys.claim(ctx);
                        let hitvar = hitvar.claim(ctx);
                        let hitvar2 = hitvar2.claim(ctx);
                        |rt| {
                            let persistent_dir = rt.read(persistent_dir);
                            let dir = rt.read(dir);
                            let key = rt.read(key);
                            let restore_keys = rt.read(restore_keys);

                            let set_hitvar = move |val| {
                                log::info!("cache status: {val:?}");
                                rt.write(hitvar, &val);
                                rt.write(hitvar2, &val);
                            };

                            // figure out what cache entries are available to us
                            //
                            // (reading this entire file into memory seems fine at
                            // this juncture, given the sort of datasets we're
                            // working with)
                            let available_keys: BTreeSet<String> = if let Ok(s) =
                                fs_err::read_to_string(persistent_dir.join("cache_keys"))
                            {
                                s.split('\n').map(|s| s.trim().to_owned()).collect()
                            } else {
                                BTreeSet::new()
                            };

                            // using the keys the user provided us, check if there's
                            // a match
                            let mut existing_cache_dir = None;
                            for (idx, key) in Some(key)
                                .into_iter()
                                .chain(restore_keys.into_iter().flatten())
                                .enumerate()
                            {
                                if available_keys.contains(&key) {
                                    existing_cache_dir = Some((idx == 0, hash_key_to_dir(&key)));
                                    break;
                                }
                            }

                            let Some((direct_hit, existing_cache_dir)) = existing_cache_dir else {
                                set_hitvar(CacheHit::Miss);
                                return Ok(());
                            };

                            crate::_util::copy_dir_all(
                                persistent_dir.join(existing_cache_dir),
                                dir,
                            )
                            .context("while restoring cache")?;

                            set_hitvar(if direct_hit {
                                CacheHit::Hit
                            } else {
                                CacheHit::PartialHit
                            });

                            Ok(())
                        }
                    });

                    ctx.emit_rust_step(format!("Saving cache: {label}"), |ctx| {
                        resolve_post_job.claim(ctx);
                        let hitvar_reader = hitvar_reader.claim(ctx);
                        let persistent_dir = persistent_dir.clone().claim(ctx);
                        let dir = dir.claim(ctx);
                        let key = key.claim(ctx);
                        move |rt| {
                            let persistent_dir = rt.read(persistent_dir);
                            let dir = rt.read(dir);
                            let key = rt.read(key);
                            let hitvar_reader = rt.read(hitvar_reader);

                            let mut cache_keys_file = fs_err::OpenOptions::new()
                                .append(true)
                                .create(true)
                                .read(true)
                                .open(persistent_dir.join("cache_keys"))?;

                            if matches!(hitvar_reader, CacheHit::Hit) {
                                // no need to update the cache
                                log::info!("was direct hit - no updates needed");
                                return Ok(());
                            }

                            // otherwise, need to update the cache
                            crate::_util::copy_dir_all(
                                dir,
                                persistent_dir.join(hash_key_to_dir(&key)),
                            )?;

                            cache_keys_file.seek(std::io::SeekFrom::End(0))?;
                            writeln!(cache_keys_file, "{}", key)?;

                            log::info!("cache saved");

                            Ok(())
                        }
                    });
                }
            }
            FlowBackend::Ado => {
                for Request {
                    label,
                    dir,
                    key,
                    restore_keys,
                    hitvar,
                } in requests
                {
                    let (resolve_post_job, require_post_job) = ctx.new_post_job_side_effect();

                    let (dir_string, key, restore_keys) = {
                        let (processed_dir, write_processed_dir) = ctx.new_var();
                        let (processed_key, write_processed_key) = ctx.new_var();
                        let (processed_keys, write_processed_keys) = if restore_keys.is_some() {
                            let (a, b) = ctx.new_var();
                            (Some(a), Some(b))
                        } else {
                            (None, None)
                        };

                        ctx.emit_rust_step("Pre-processing cache vars", |ctx| {
                            require_post_job.claim(ctx);
                            let write_processed_dir = write_processed_dir.claim(ctx);
                            let write_processed_key = write_processed_key.claim(ctx);
                            let write_processed_keys = write_processed_keys.claim(ctx);

                            let dir = dir.clone().claim(ctx);
                            let key = key.claim(ctx);
                            let restore_keys = restore_keys.claim(ctx);

                            |rt| {
                                let dir = rt.read(dir);
                                // while we're here, we'll convert dir into a
                                // String, so it can be stuffed into an ADO var
                                rt.write(
                                    write_processed_dir,
                                    &dir.absolute()?.display().to_string(),
                                );

                                // Inject `$(FloweyCacheGeneration)` as part of the
                                // cache key to provide a non-intrusive mechanism to
                                // cycle the ADO cache when it gets into an
                                // inconsistent state.
                                //
                                // Deny cross-os caching by default (matching Github
                                // CI works by default).
                                //
                                // FUTURE: add toggle to request to allow cross-os
                                // caching?
                                let inject_extras = |s| {
                                    format!(r#""$(FloweyCacheGeneration)" | "$(Agent.OS)" | "{s}""#)
                                };

                                let key = rt.read(key);
                                rt.write(write_processed_key, &inject_extras(key));

                                if let Some(write_processed_keys) = write_processed_keys {
                                    let restore_keys = rt.read(restore_keys.unwrap());
                                    rt.write(
                                        write_processed_keys,
                                        &restore_keys
                                            .into_iter()
                                            .map(inject_extras)
                                            .collect::<Vec<_>>()
                                            .join("\\n"),
                                    );
                                }

                                Ok(())
                            }
                        });

                        (processed_dir, processed_key, processed_keys)
                    };

                    let (hitvar_str_reader, hitvar_str_writer) = ctx.new_var();

                    ctx.emit_ado_step(format!("Restore cache: {label}"), |ctx| {
                        let dir_string = dir_string.clone().claim(ctx);
                        let key = key.claim(ctx);
                        let restore_keys = restore_keys.claim(ctx);
                        let hitvar_str_writer = hitvar_str_writer.claim(ctx);
                        |rt| {
                            let dir_string = rt.get_var(dir_string).as_raw_var_name();
                            let key = rt.get_var(key).as_raw_var_name();
                            let restore_keys = if let Some(restore_keys) = restore_keys {
                                format!(
                                    "restore_keys: $({})",
                                    rt.get_var(restore_keys).as_raw_var_name()
                                )
                            } else {
                                String::new()
                            };

                            let hitvar_ado =
                                AdoRuntimeVar::dangerous_from_global("FLOWEY_CACHE_HITVAR", false);
                            // note the _lack_ of $() around the var!
                            let hitvar_input =
                                format!("cacheHitVar: {}", hitvar_ado.as_raw_var_name());
                            rt.set_var(hitvar_str_writer, hitvar_ado);

                            format!(
                                r#"
                                - task: Cache@2
                                  inputs:
                                    key: '$({key})'
                                    path: $({dir_string})
                                    {restore_keys}
                                    {hitvar_input}
                            "#
                            )
                        }
                    });

                    ctx.emit_rust_step("map ADO hitvar to flowey", |ctx| {
                        let label = label.clone();
                        let dir = dir.clone().claim(ctx);

                        let hitvar = hitvar.claim(ctx);
                        let hitvar_str_reader = hitvar_str_reader.claim(ctx);
                        move |rt| {
                            let dir = rt.read(dir);
                            let hitvar_str = rt.read(hitvar_str_reader);
                            let mut var = match hitvar_str.as_str() {
                                "true" => CacheHit::Hit,
                                "false" => CacheHit::Miss,
                                "inexact" => CacheHit::PartialHit,
                                other => anyhow::bail!("unexpected cacheHitVar value: {other}"),
                            };

                            // WORKAROUND: ADO is really cool software, and
                            // sometimes, when it feels like it, i'll get into
                            // an inconsistent state where it reports a cache
                            // hit, but then the cache is actually empty.
                            if matches!(var, CacheHit::Hit | CacheHit::PartialHit) {
                                if dir.read_dir()?.next().is_none() {
                                    log::error!("Detected inconsistent ADO cache entry: {label}");
                                    log::error!("Please define/cycle the `FloweyCacheGeneration` pipeline variable");
                                    var = CacheHit::Miss;
                                }
                            }

                            rt.write(hitvar, &var);
                            Ok(())
                        }
                    });

                    ctx.emit_rust_step(format!("validate cache entry: {label}"), |ctx| {
                        resolve_post_job.claim(ctx);
                        let dir = dir.clone().claim(ctx);
                        move |rt| {
                            let mut dir_contents = rt.read(dir).read_dir()?.peekable();

                            if dir_contents.peek().is_none() {
                                log::error!("Detected empty cache folder for entry: {label}");
                                log::error!("This is a bug - please update the pipeline code");
                                anyhow::bail!("cache error")
                            }

                            for entry in dir_contents {
                                let entry = entry?;
                                log::debug!("uploading: {}", entry.path().display());
                            }

                            Ok(())
                        }
                    });
                }
            }
            FlowBackend::Github => {
                for Request {
                    label,
                    dir,
                    key,
                    restore_keys,
                    hitvar,
                } in requests
                {
                    let (resolve_post_job, require_post_job) = ctx.new_post_job_side_effect();

                    let (dir_string, key, restore_keys) = {
                        let (processed_dir, write_processed_dir) = ctx.new_var();
                        let (processed_key, write_processed_key) = ctx.new_var();
                        let (processed_keys, write_processed_keys) = if restore_keys.is_some() {
                            let (a, b) = ctx.new_var();
                            (Some(a), Some(b))
                        } else {
                            (None, None)
                        };

                        ctx.emit_rust_step("Pre-processing cache vars", |ctx| {
                            require_post_job.claim(ctx);
                            let write_processed_dir = write_processed_dir.claim(ctx);
                            let write_processed_key = write_processed_key.claim(ctx);
                            let write_processed_keys = write_processed_keys.claim(ctx);

                            let dir = dir.clone().claim(ctx);
                            let key = key.claim(ctx);
                            let restore_keys = restore_keys.claim(ctx);

                            |rt| {
                                let dir = rt.read(dir);
                                rt.write(
                                    write_processed_dir,
                                    &dir.absolute()?.display().to_string(),
                                );

                                let key = rt.read(key);
                                let key = format!("{key}-{}-{}", rt.arch(), rt.platform());
                                rt.write(write_processed_key, &key);

                                if let Some(write_processed_keys) = write_processed_keys {
                                    let restore_keys = rt.read(restore_keys.unwrap());
                                    rt.write(
                                        write_processed_keys,
                                        &format!(
                                            r#""[{}]""#,
                                            restore_keys
                                                .into_iter()
                                                .map(|s| format!(
                                                    "'{s}-{}-{}'",
                                                    rt.arch(),
                                                    rt.platform()
                                                ))
                                                .collect::<Vec<_>>()
                                                .join(", ")
                                        ),
                                    );
                                }

                                Ok(())
                            }
                        });

                        (processed_dir, processed_key, processed_keys)
                    };

                    let (hitvar_str_reader, hitvar_str_writer) = ctx.new_var();

                    let mut step = ctx
                        .emit_gh_step(format!("Restore cache: {label}"), "actions/cache@v4")
                        .with("key", key)
                        .with("path", dir_string);
                    if let Some(restore_keys) = restore_keys {
                        step = step.with("restore-keys", restore_keys);
                    }
                    step.output("cache-hit", hitvar_str_writer).finish(ctx);

                    ctx.emit_minor_rust_step("map Github cache-hit to flowey", |ctx| {
                        let hitvar = hitvar.claim(ctx);
                        let hitvar_str_reader = hitvar_str_reader.claim(ctx);
                        // TODO: How do we distinguish between a partial hit and a miss?
                        move |rt| {
                            let hitvar_str = rt.read(hitvar_str_reader);
                            // Github's cache action brilliantly only reports "false" if missing a cache key that exists,
                            // and leaves it blank if its a miss in other cases.
                            let var = match hitvar_str.as_str() {
                                "true" => CacheHit::Hit,
                                _ => CacheHit::Miss,
                            };

                            rt.write(hitvar, &var);
                        }
                    });

                    ctx.emit_rust_step(format!("validate cache entry: {label}"), |ctx| {
                        resolve_post_job.claim(ctx);
                        let dir = dir.clone().claim(ctx);
                        move |rt| {
                            let mut dir_contents = rt.read(dir).read_dir()?.peekable();

                            if dir_contents.peek().is_none() {
                                log::error!("Detected empty cache folder for entry: {label}");
                                log::error!("This is a bug - please update the pipeline code");
                                anyhow::bail!("cache error")
                            }

                            for entry in dir_contents {
                                let entry = entry?;
                                log::debug!("uploading: {}", entry.path().display());
                            }

                            Ok(())
                        }
                    });
                }
            }
        }

        Ok(())
    }
}

// _technically_, if we want to be _super_ sure we're not gonna have a hash
// collision, we should also do a content-hash of the thing we're about to
// cache... but this should be OK for now, given that we don't expect to have a
// massive number of cache entries.
fn hash_key_to_dir(key: &str) -> String {
    let hasher = &mut rustc_hash::FxHasher::default();
    std::hash::Hash::hash(&key, hasher);
    let hash = std::hash::Hasher::finish(hasher);
    format!("{:08x?}", hash)
}