diff --git a/crates/dtmt/src/cmd/bundle/decompress.rs b/crates/dtmt/src/cmd/bundle/decompress.rs index 9f3ed24..c5fa024 100644 --- a/crates/dtmt/src/cmd/bundle/decompress.rs +++ b/crates/dtmt/src/cmd/bundle/decompress.rs @@ -1,18 +1,14 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; -use clap::{value_parser, Arg, ArgAction, ArgMatches, Command}; -use color_eyre::eyre::{self, Result}; -use color_eyre::{Help, SectionExt}; +use clap::{value_parser, Arg, ArgMatches, Command}; +use color_eyre::eyre::Result; -use futures::future::try_join_all; use sdk::decompress; use tokio::fs::{self, File}; use tokio::io::BufReader; use tokio::sync::RwLock; -use crate::cmd::util::collect_bundle_paths; - pub(crate) fn command_definition() -> Command { Command::new("decompress") .about( @@ -23,11 +19,9 @@ pub(crate) fn command_definition() -> Command { .arg( Arg::new("bundle") .required(true) - .action(ArgAction::Append) .value_parser(value_parser!(PathBuf)) .help( - "Path to the bundle(s) to read. If this points to a directory instead \ - of a file, all files in that directory will be checked.", + "Path to the bundle to read. Unlike other operations, this only accepts only a single bundle.", ), ) .arg( @@ -36,7 +30,7 @@ pub(crate) fn command_definition() -> Command { .value_parser(value_parser!(PathBuf)) .help( "The destination to write to. If this points to a directory, the \ - decompressed bundles will be written there, with their original name. \ + name of the input bundle will be used. \ Parent directories must exist.", ), ) @@ -61,10 +55,9 @@ where #[tracing::instrument(skip_all)] pub(crate) async fn run(ctx: Arc>, matches: &ArgMatches) -> Result<()> { - let bundles = matches - .get_many::("bundle") - .unwrap_or_default() - .cloned(); + let bundle = matches + .get_one::("bundle") + .expect("required argument 'bundle' is missing"); let out_path = matches .get_one::("destination") .expect("required parameter 'destination' is missing"); @@ -74,46 +67,11 @@ pub(crate) async fn run(ctx: Arc>, matches: &ArgMatches) -> .map(|meta| meta.is_dir()) .unwrap_or(false); - let paths = collect_bundle_paths(bundles).await; + let name = bundle.file_name(); - if paths.is_empty() { - return Err(eyre::eyre!("No bundle provided")); - } - - if paths.len() == 1 { - let bundle = &paths[0]; - let name = bundle.file_name(); - - if is_dir && name.is_some() { - decompress_bundle(ctx, bundle, out_path.join(name.unwrap())).await?; - } else { - decompress_bundle(ctx, bundle, out_path).await?; - } + if is_dir && name.is_some() { + decompress_bundle(ctx, bundle, out_path.join(name.unwrap())).await } else { - if !is_dir { - return Err(eyre::eyre!( - "Multiple bundles provided, but destination is not a directory." - )) - .with_section(|| out_path.display().to_string().header("Path:"))?; - } - - let _ = try_join_all(paths.into_iter().map(|p| async { - let ctx = ctx.clone(); - async move { - let name = if let Some(name) = p.file_name() { - name - } else { - return Err(eyre::eyre!("Invalid bundle path. No file name.")) - .with_section(|| p.display().to_string().header("Path:"))?; - }; - - let dest = out_path.join(name); - decompress_bundle(ctx, p, dest).await - } - .await - })) - .await?; + decompress_bundle(ctx, bundle, out_path).await } - - Ok(()) } diff --git a/crates/dtmt/src/cmd/bundle/extract.rs b/crates/dtmt/src/cmd/bundle/extract.rs index 39aa09f..95643ac 100644 --- a/crates/dtmt/src/cmd/bundle/extract.rs +++ b/crates/dtmt/src/cmd/bundle/extract.rs @@ -1,15 +1,16 @@ -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; use clap::{value_parser, Arg, ArgAction, ArgMatches, Command}; use color_eyre::eyre::{self, Context, Result}; use color_eyre::{Help, Report, SectionExt}; use futures::future::try_join_all; +use futures::{StreamExt, TryFutureExt}; use glob::Pattern; -use sdk::Bundle; +use sdk::{Bundle, BundleFile}; use tokio::{fs, sync::RwLock}; -use crate::cmd::util::collect_bundle_paths; +use crate::cmd::util::resolve_bundle_paths; fn parse_glob_pattern(s: &str) -> Result { match Pattern::new(s) { @@ -148,29 +149,94 @@ pub(crate) async fn run(ctx: Arc>, matches: &ArgMatches) -> .unwrap_or_default() .cloned(); - let paths = collect_bundle_paths(bundles).await; + let should_decompile = matches.get_flag("decompile"); + let should_flatten = matches.get_flag("flatten"); + let is_dry_run = matches.get_flag("dry-run"); - if paths.is_empty() { - return Err(eyre::eyre!("No bundle provided")); + let dest = matches + .get_one::("destination") + .expect("required argument 'destination' missing"); + + { + let res = match fs::metadata(&dest).await { + Ok(meta) if !meta.is_dir() => Err(eyre::eyre!("Destination path is not a directory")), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + Err(eyre::eyre!("Destination path does not exist")) + .with_suggestion(|| format!("Create the directory '{}'", dest.display())) + } + Err(err) => Err(Report::new(err)), + _ => Ok(()), + }; + + if res.is_err() { + return res.wrap_err(format!( + "Failed to open destination directory: {}", + dest.display() + )); + } } - let bundles = try_join_all(paths.into_iter().map(|p| async { - let ctx = ctx.clone(); - let path_display = p.display().to_string(); - async move { Bundle::open(ctx, &p).await } + let mut paths = Box::pin(resolve_bundle_paths(bundles)); + + // TODO: Find a way to do this with `for_each_concurrent`. The first attempt + // just kept head-butting into a "use of moved value" wall. + while let Some(path) = paths.next().await { + let res = Bundle::open(ctx.clone(), &path) + .and_then(|bundle| { + extract_bundle( + ctx.clone(), + bundle, + &dest, + ExtractOptions { + includes: &includes, + excludes: &excludes, + decompile: should_decompile, + flatten: should_flatten, + dry_run: is_dry_run, + }, + ) + }) .await - .with_section(|| path_display.header("Bundle Path:")) - })) - .await?; + .wrap_err_with(|| format!("failed to extract from bundle '{}'", path.display())); - let files: Vec<_> = { - let iter = bundles.iter().flat_map(|bundle| bundle.files()); + if let Err(err) = res { + tracing::error!("{:#}", err) + } + } - // Short-curcit the iteration if there is nothing to filter by + Ok(()) +} + +struct ExtractOptions<'a> { + decompile: bool, + flatten: bool, + dry_run: bool, + includes: &'a dyn AsRef<[&'a Pattern]>, + excludes: &'a dyn AsRef<[&'a Pattern]>, +} + +#[tracing::instrument( + skip(ctx, bundle, options), + fields(decompile = options.decompile, flatten = options.flatten, dry_run = options.dry_run) +)] +async fn extract_bundle

( + ctx: Arc>, + bundle: Bundle, + dest: P, + options: ExtractOptions<'_>, +) -> Result<()> +where + P: AsRef + std::fmt::Debug, +{ + let includes = options.includes.as_ref(); + let excludes = options.excludes.as_ref(); + let dest = dest.as_ref(); + + let files: Box> = { if includes.is_empty() && excludes.is_empty() { - iter.collect() + Box::new(bundle.files().iter()) } else { - iter.filter(|file| { + let iter = bundle.files().iter().filter(|file| { let name = file.name(false); let decompiled_name = file.name(true); @@ -186,61 +252,31 @@ pub(crate) async fn run(ctx: Arc>, matches: &ArgMatches) -> .any(|glob| glob.matches(&name) || glob.matches(&decompiled_name)); is_included && !is_excluded - }) - .collect() + }); + Box::new(iter) } }; - if tracing::enabled!(tracing::Level::DEBUG) { - let includes: Vec<_> = includes.iter().map(|pattern| pattern.as_str()).collect(); - let excludes: Vec<_> = excludes.iter().map(|pattern| pattern.as_str()).collect(); - let bundle_files: Vec<_> = bundles - .iter() - .flat_map(|bundle| bundle.files()) - .map(|file| file.name(false)) - .collect(); - let filtered: Vec<_> = files.iter().map(|file| file.name(false)).collect(); - tracing::debug!( - ?includes, - ?excludes, - files = ?bundle_files, - ?filtered, - "Built file list to extract" - ); - } + // TODO: Disabled for now, as the `files` iterator would be consumed. + // if tracing::enabled!(tracing::Level::DEBUG) { + // let includes: Vec<_> = includes.iter().map(|pattern| pattern.as_str()).collect(); + // let excludes: Vec<_> = excludes.iter().map(|pattern| pattern.as_str()).collect(); + // let bundle_files: Vec<_> = bundle.files().iter().map(|file| file.name(false)).collect(); + // let filtered: Vec<_> = files.map(|file| file.name(false)).collect(); + // tracing::debug!( + // ?includes, + // ?excludes, + // files = ?bundle_files, + // ?filtered, + // "Built file list to extract" + // ); + // } - let should_decompile = matches.get_flag("decompile"); - let should_flatten = matches.get_flag("flatten"); - let is_dry_run = matches.get_flag("dry-run"); - - let dest = matches - .get_one::("destination") - .expect("required argument 'destination' missing"); - - { - let res = match fs::metadata(&dest).await { - Ok(meta) if !meta.is_dir() => Err(eyre::eyre!("Destination path is not a directory")), - Err(err) if err.kind() == std::io::ErrorKind::NotFound => { - Err(eyre::eyre!("Destination path does not exist")) - .with_suggestion(|| "Create the directory") - } - Err(err) => Err(Report::new(err)), - _ => Ok(()), - }; - - if res.is_err() { - return res.wrap_err(format!( - "Failed to open destination directory: {}", - dest.display() - )); - } - } - - let mut tasks = Vec::with_capacity(files.len()); + let mut tasks = Vec::with_capacity(bundle.files().len()); for file in files { - let name = file.name(should_decompile); - let data = if should_decompile { + let name = file.name(options.decompile); + let data = if options.decompile { file.decompiled(ctx.clone()).await } else { file.raw() @@ -250,10 +286,7 @@ pub(crate) async fn run(ctx: Arc>, matches: &ArgMatches) -> Ok(mut files) => { match files.len() { 0 => { - println!( - "Decompilation did not produce any data for file {}", - file.name(should_decompile) - ); + tracing::warn!("Decompilation did not produce any data for file {}", name); } // For a single file we want to use the bundle file's name. 1 => { @@ -261,16 +294,16 @@ pub(crate) async fn run(ctx: Arc>, matches: &ArgMatches) -> let file = files.pop().unwrap(); let name = file.name().unwrap_or(&name); - let name = if should_flatten { + let name = if options.flatten { flatten_name(name) } else { name.clone() }; - let mut path = dest.clone(); + let mut path = dest.to_path_buf(); path.push(name); - if is_dry_run { + if options.dry_run { tracing::info!(path = %path.display(), "Writing file"); } else { tracing::debug!(path = %path.display(), "Writing file"); @@ -286,12 +319,12 @@ pub(crate) async fn run(ctx: Arc>, matches: &ArgMatches) -> // by index. _ => { for (i, file) in files.into_iter().enumerate() { - let mut path = dest.clone(); + let mut path = dest.to_path_buf(); let name = file .name() .map(|name| { - if should_flatten { + if options.flatten { flatten_name(name) } else { name.clone() @@ -301,7 +334,7 @@ pub(crate) async fn run(ctx: Arc>, matches: &ArgMatches) -> path.push(name); - if is_dry_run { + if options.dry_run { tracing::info!(path = %path.display(), "Writing file"); } else { tracing::debug!(path = %path.display(), "Writing file"); @@ -343,6 +376,8 @@ pub(crate) async fn run(ctx: Arc>, matches: &ArgMatches) -> }; } + // TODO: Check if this might need buffered execution to avoid + // running out of file handles. let results = try_join_all(tasks).await?; for res in results { diff --git a/crates/dtmt/src/cmd/bundle/list.rs b/crates/dtmt/src/cmd/bundle/list.rs index 06c1f12..bdfb268 100644 --- a/crates/dtmt/src/cmd/bundle/list.rs +++ b/crates/dtmt/src/cmd/bundle/list.rs @@ -4,11 +4,11 @@ use std::sync::Arc; use clap::{value_parser, Arg, ArgAction, ArgMatches, Command}; use color_eyre::eyre::{self, Result}; use color_eyre::{Help, SectionExt}; -use futures::future::try_join_all; +use futures::StreamExt; use sdk::Bundle; use tokio::sync::RwLock; -use crate::cmd::util::collect_bundle_paths; +use crate::cmd::util::resolve_bundle_paths; pub(crate) fn command_definition() -> Command { Command::new("list") @@ -31,39 +31,23 @@ pub(crate) fn command_definition() -> Command { ) } -#[tracing::instrument(skip_all)] -pub(crate) async fn run(ctx: Arc>, matches: &ArgMatches) -> Result<()> { - let bundles = matches - .get_many::("bundle") - .unwrap_or_default() - .cloned(); +#[derive(Copy, Clone)] +enum OutputFormat { + Text, +} - let paths = collect_bundle_paths(bundles).await; +fn print_bundle_list(bundle: Bundle, fmt: OutputFormat) { + match fmt { + OutputFormat::Text => { + println!("Bundle: {}", bundle.name()); - if paths.is_empty() { - return Err(eyre::eyre!("No bundle provided")); - } - - let bundles = try_join_all(paths.into_iter().map(|p| async { - let ctx = ctx.clone(); - let path_display = p.display().to_string(); - async move { Bundle::open(ctx, &p).await } - .await - .with_section(|| path_display.header("Bundle Path:")) - })) - .await?; - - if matches.get_flag("json") { - unimplemented!("JSON output is not implemented yet"); - } else { - for b in bundles.iter() { - println!("Bundle: {}", b.name()); - - for f in b.files().iter() { + for f in bundle.files().iter() { if f.variants().len() != 1 { - return Err(eyre::eyre!("Expected exactly one version for this file.")) + let err = eyre::eyre!("Expected exactly one version for this file.") .with_section(|| f.variants().len().to_string().header("Bundle:")) - .with_section(|| b.name().clone().header("Bundle:")); + .with_section(|| bundle.name().clone().header("Bundle:")); + + tracing::error!("{:#}", err); } let v = &f.variants()[0]; @@ -75,7 +59,40 @@ pub(crate) async fn run(ctx: Arc>, matches: &ArgMatches) -> ); } } - - Ok(()) } } + +#[tracing::instrument(skip_all)] +pub(crate) async fn run(ctx: Arc>, matches: &ArgMatches) -> Result<()> { + let bundles = matches + .get_many::("bundle") + .unwrap_or_default() + .cloned(); + + let paths = resolve_bundle_paths(bundles); + + let fmt = if matches.get_flag("json") { + unimplemented!("JSON output is not implemented yet"); + } else { + OutputFormat::Text + }; + + paths + .for_each_concurrent(10, |p| async { + let ctx = ctx.clone(); + async move { + match Bundle::open(ctx, &p).await { + Ok(bundle) => { + print_bundle_list(bundle, fmt); + } + Err(err) => { + tracing::error!("Failed to open bundle '{}': {:#}", p.display(), err); + } + } + } + .await + }) + .await; + + Ok(()) +} diff --git a/crates/dtmt/src/cmd/util.rs b/crates/dtmt/src/cmd/util.rs index e35ee60..c783ed1 100644 --- a/crates/dtmt/src/cmd/util.rs +++ b/crates/dtmt/src/cmd/util.rs @@ -1,13 +1,14 @@ use std::ffi::OsStr; use std::io; use std::path::{Path, PathBuf}; +use std::pin::Pin; +use futures::{Stream, StreamExt}; use tokio::fs; use tokio_stream::wrappers::ReadDirStream; -use tokio_stream::StreamExt; #[tracing::instrument] -pub async fn resolve_bundle_path

(path: P) -> Vec +pub async fn foo

(path: P) -> Vec where P: AsRef + std::fmt::Debug, { @@ -28,7 +29,7 @@ where let stream = ReadDirStream::new(dir); let paths: Vec = stream - .filter_map(|entry| { + .filter_map(|entry| async move { if let Ok(path) = entry.map(|e| e.path()) { match path.file_name().and_then(OsStr::to_str) { Some(name) if name.len() == 16 => { @@ -52,13 +53,52 @@ where paths } +pub async fn resolve_bundle_path

(path: P) -> Pin>> +where + P: AsRef + std::fmt::Debug, +{ + let dir = match fs::read_dir(path.as_ref()).await { + Ok(dir) => { + tracing::trace!(is_dir = true); + dir + } + Err(err) => { + if err.kind() != io::ErrorKind::NotADirectory { + tracing::error!("Failed to read path: {:?}", err); + } + let paths = vec![PathBuf::from(path.as_ref())]; + tracing::debug!(is_dir = false, resolved_paths = ?paths); + return Box::pin(futures::stream::iter(paths)); + } + }; + + let stream = ReadDirStream::new(dir); + let stream = stream.filter_map(|entry| async move { + if let Ok(path) = entry.map(|e| e.path()) { + match path.file_name().and_then(OsStr::to_str) { + Some(name) if name.len() == 16 => { + if name.chars().all(|c| c.is_ascii_hexdigit()) { + Some(path) + } else { + None + } + } + _ => None, + } + } else { + None + } + }); + Box::pin(stream) +} + #[tracing::instrument(skip_all)] pub async fn collect_bundle_paths(paths: I) -> Vec where I: Iterator + std::fmt::Debug, { let tasks = paths.map(|p| async move { - match tokio::spawn(async move { resolve_bundle_path(&p).await }).await { + match tokio::spawn(async move { foo(&p).await }).await { Ok(paths) => paths, Err(err) => { tracing::error!(%err, "failed to spawn task to resolve bundle paths"); @@ -71,6 +111,17 @@ where results.into_iter().flatten().collect() } +#[tracing::instrument(skip_all)] +pub fn resolve_bundle_paths(paths: I) -> impl Stream +where + I: Iterator + std::fmt::Debug, +{ + let limit = 10; + futures::stream::iter(paths) + .then(resolve_bundle_path) + .flat_map_unordered(limit, |p| p) +} + #[cfg(test)] mod tests { use std::path::PathBuf; @@ -78,12 +129,12 @@ mod tests { use tempfile::tempdir; use tokio::process::Command; - use super::resolve_bundle_path; + use super::foo; #[tokio::test] async fn resolve_single_file() { let path = PathBuf::from("foo"); - let paths = resolve_bundle_path(&path).await; + let paths = foo(&path).await; assert_eq!(paths.len(), 1); assert_eq!(paths[0], path); } @@ -91,7 +142,7 @@ mod tests { #[tokio::test] async fn resolve_empty_directory() { let dir = tempdir().expect("failed to create temporary directory"); - let paths = resolve_bundle_path(dir).await; + let paths = foo(dir).await; assert!(paths.is_empty()); } @@ -119,7 +170,7 @@ mod tests { .await .expect("failed to create temporary files"); - let paths = resolve_bundle_path(dir).await; + let paths = foo(dir).await; assert_eq!(bundle_names.len(), paths.len());