Merge branch 'issue/1'
* issue/1: fix: Fix running out of file handles
This commit is contained in:
commit
4a834b1687
4 changed files with 232 additions and 171 deletions
|
@ -1,18 +1,14 @@
|
|||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
|
||||
use color_eyre::eyre::{self, Result};
|
||||
use color_eyre::{Help, SectionExt};
|
||||
use clap::{value_parser, Arg, ArgMatches, Command};
|
||||
use color_eyre::eyre::Result;
|
||||
|
||||
use futures::future::try_join_all;
|
||||
use sdk::decompress;
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::io::BufReader;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::cmd::util::collect_bundle_paths;
|
||||
|
||||
pub(crate) fn command_definition() -> Command {
|
||||
Command::new("decompress")
|
||||
.about(
|
||||
|
@ -23,11 +19,9 @@ pub(crate) fn command_definition() -> Command {
|
|||
.arg(
|
||||
Arg::new("bundle")
|
||||
.required(true)
|
||||
.action(ArgAction::Append)
|
||||
.value_parser(value_parser!(PathBuf))
|
||||
.help(
|
||||
"Path to the bundle(s) to read. If this points to a directory instead \
|
||||
of a file, all files in that directory will be checked.",
|
||||
"Path to the bundle to read. Unlike other operations, this only accepts only a single bundle.",
|
||||
),
|
||||
)
|
||||
.arg(
|
||||
|
@ -36,7 +30,7 @@ pub(crate) fn command_definition() -> Command {
|
|||
.value_parser(value_parser!(PathBuf))
|
||||
.help(
|
||||
"The destination to write to. If this points to a directory, the \
|
||||
decompressed bundles will be written there, with their original name. \
|
||||
name of the input bundle will be used. \
|
||||
Parent directories must exist.",
|
||||
),
|
||||
)
|
||||
|
@ -61,10 +55,9 @@ where
|
|||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) -> Result<()> {
|
||||
let bundles = matches
|
||||
.get_many::<PathBuf>("bundle")
|
||||
.unwrap_or_default()
|
||||
.cloned();
|
||||
let bundle = matches
|
||||
.get_one::<PathBuf>("bundle")
|
||||
.expect("required argument 'bundle' is missing");
|
||||
let out_path = matches
|
||||
.get_one::<PathBuf>("destination")
|
||||
.expect("required parameter 'destination' is missing");
|
||||
|
@ -74,46 +67,11 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
|
|||
.map(|meta| meta.is_dir())
|
||||
.unwrap_or(false);
|
||||
|
||||
let paths = collect_bundle_paths(bundles).await;
|
||||
let name = bundle.file_name();
|
||||
|
||||
if paths.is_empty() {
|
||||
return Err(eyre::eyre!("No bundle provided"));
|
||||
}
|
||||
|
||||
if paths.len() == 1 {
|
||||
let bundle = &paths[0];
|
||||
let name = bundle.file_name();
|
||||
|
||||
if is_dir && name.is_some() {
|
||||
decompress_bundle(ctx, bundle, out_path.join(name.unwrap())).await?;
|
||||
} else {
|
||||
decompress_bundle(ctx, bundle, out_path).await?;
|
||||
}
|
||||
if is_dir && name.is_some() {
|
||||
decompress_bundle(ctx, bundle, out_path.join(name.unwrap())).await
|
||||
} else {
|
||||
if !is_dir {
|
||||
return Err(eyre::eyre!(
|
||||
"Multiple bundles provided, but destination is not a directory."
|
||||
))
|
||||
.with_section(|| out_path.display().to_string().header("Path:"))?;
|
||||
}
|
||||
|
||||
let _ = try_join_all(paths.into_iter().map(|p| async {
|
||||
let ctx = ctx.clone();
|
||||
async move {
|
||||
let name = if let Some(name) = p.file_name() {
|
||||
name
|
||||
} else {
|
||||
return Err(eyre::eyre!("Invalid bundle path. No file name."))
|
||||
.with_section(|| p.display().to_string().header("Path:"))?;
|
||||
};
|
||||
|
||||
let dest = out_path.join(name);
|
||||
decompress_bundle(ctx, p, dest).await
|
||||
}
|
||||
.await
|
||||
}))
|
||||
.await?;
|
||||
decompress_bundle(ctx, bundle, out_path).await
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,15 +1,16 @@
|
|||
use std::path::PathBuf;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
|
||||
use color_eyre::eyre::{self, Context, Result};
|
||||
use color_eyre::{Help, Report, SectionExt};
|
||||
use futures::future::try_join_all;
|
||||
use futures::{StreamExt, TryFutureExt};
|
||||
use glob::Pattern;
|
||||
use sdk::Bundle;
|
||||
use sdk::{Bundle, BundleFile};
|
||||
use tokio::{fs, sync::RwLock};
|
||||
|
||||
use crate::cmd::util::collect_bundle_paths;
|
||||
use crate::cmd::util::resolve_bundle_paths;
|
||||
|
||||
fn parse_glob_pattern(s: &str) -> Result<Pattern, String> {
|
||||
match Pattern::new(s) {
|
||||
|
@ -148,29 +149,94 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
|
|||
.unwrap_or_default()
|
||||
.cloned();
|
||||
|
||||
let paths = collect_bundle_paths(bundles).await;
|
||||
let should_decompile = matches.get_flag("decompile");
|
||||
let should_flatten = matches.get_flag("flatten");
|
||||
let is_dry_run = matches.get_flag("dry-run");
|
||||
|
||||
if paths.is_empty() {
|
||||
return Err(eyre::eyre!("No bundle provided"));
|
||||
let dest = matches
|
||||
.get_one::<PathBuf>("destination")
|
||||
.expect("required argument 'destination' missing");
|
||||
|
||||
{
|
||||
let res = match fs::metadata(&dest).await {
|
||||
Ok(meta) if !meta.is_dir() => Err(eyre::eyre!("Destination path is not a directory")),
|
||||
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
|
||||
Err(eyre::eyre!("Destination path does not exist"))
|
||||
.with_suggestion(|| format!("Create the directory '{}'", dest.display()))
|
||||
}
|
||||
Err(err) => Err(Report::new(err)),
|
||||
_ => Ok(()),
|
||||
};
|
||||
|
||||
if res.is_err() {
|
||||
return res.wrap_err(format!(
|
||||
"Failed to open destination directory: {}",
|
||||
dest.display()
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let bundles = try_join_all(paths.into_iter().map(|p| async {
|
||||
let ctx = ctx.clone();
|
||||
let path_display = p.display().to_string();
|
||||
async move { Bundle::open(ctx, &p).await }
|
||||
let mut paths = Box::pin(resolve_bundle_paths(bundles));
|
||||
|
||||
// TODO: Find a way to do this with `for_each_concurrent`. The first attempt
|
||||
// just kept head-butting into a "use of moved value" wall.
|
||||
while let Some(path) = paths.next().await {
|
||||
let res = Bundle::open(ctx.clone(), &path)
|
||||
.and_then(|bundle| {
|
||||
extract_bundle(
|
||||
ctx.clone(),
|
||||
bundle,
|
||||
&dest,
|
||||
ExtractOptions {
|
||||
includes: &includes,
|
||||
excludes: &excludes,
|
||||
decompile: should_decompile,
|
||||
flatten: should_flatten,
|
||||
dry_run: is_dry_run,
|
||||
},
|
||||
)
|
||||
})
|
||||
.await
|
||||
.with_section(|| path_display.header("Bundle Path:"))
|
||||
}))
|
||||
.await?;
|
||||
.wrap_err_with(|| format!("failed to extract from bundle '{}'", path.display()));
|
||||
|
||||
let files: Vec<_> = {
|
||||
let iter = bundles.iter().flat_map(|bundle| bundle.files());
|
||||
if let Err(err) = res {
|
||||
tracing::error!("{:#}", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Short-curcit the iteration if there is nothing to filter by
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct ExtractOptions<'a> {
|
||||
decompile: bool,
|
||||
flatten: bool,
|
||||
dry_run: bool,
|
||||
includes: &'a dyn AsRef<[&'a Pattern]>,
|
||||
excludes: &'a dyn AsRef<[&'a Pattern]>,
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
skip(ctx, bundle, options),
|
||||
fields(decompile = options.decompile, flatten = options.flatten, dry_run = options.dry_run)
|
||||
)]
|
||||
async fn extract_bundle<P>(
|
||||
ctx: Arc<RwLock<sdk::Context>>,
|
||||
bundle: Bundle,
|
||||
dest: P,
|
||||
options: ExtractOptions<'_>,
|
||||
) -> Result<()>
|
||||
where
|
||||
P: AsRef<Path> + std::fmt::Debug,
|
||||
{
|
||||
let includes = options.includes.as_ref();
|
||||
let excludes = options.excludes.as_ref();
|
||||
let dest = dest.as_ref();
|
||||
|
||||
let files: Box<dyn Iterator<Item = &BundleFile>> = {
|
||||
if includes.is_empty() && excludes.is_empty() {
|
||||
iter.collect()
|
||||
Box::new(bundle.files().iter())
|
||||
} else {
|
||||
iter.filter(|file| {
|
||||
let iter = bundle.files().iter().filter(|file| {
|
||||
let name = file.name(false);
|
||||
let decompiled_name = file.name(true);
|
||||
|
||||
|
@ -186,61 +252,31 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
|
|||
.any(|glob| glob.matches(&name) || glob.matches(&decompiled_name));
|
||||
|
||||
is_included && !is_excluded
|
||||
})
|
||||
.collect()
|
||||
});
|
||||
Box::new(iter)
|
||||
}
|
||||
};
|
||||
|
||||
if tracing::enabled!(tracing::Level::DEBUG) {
|
||||
let includes: Vec<_> = includes.iter().map(|pattern| pattern.as_str()).collect();
|
||||
let excludes: Vec<_> = excludes.iter().map(|pattern| pattern.as_str()).collect();
|
||||
let bundle_files: Vec<_> = bundles
|
||||
.iter()
|
||||
.flat_map(|bundle| bundle.files())
|
||||
.map(|file| file.name(false))
|
||||
.collect();
|
||||
let filtered: Vec<_> = files.iter().map(|file| file.name(false)).collect();
|
||||
tracing::debug!(
|
||||
?includes,
|
||||
?excludes,
|
||||
files = ?bundle_files,
|
||||
?filtered,
|
||||
"Built file list to extract"
|
||||
);
|
||||
}
|
||||
// TODO: Disabled for now, as the `files` iterator would be consumed.
|
||||
// if tracing::enabled!(tracing::Level::DEBUG) {
|
||||
// let includes: Vec<_> = includes.iter().map(|pattern| pattern.as_str()).collect();
|
||||
// let excludes: Vec<_> = excludes.iter().map(|pattern| pattern.as_str()).collect();
|
||||
// let bundle_files: Vec<_> = bundle.files().iter().map(|file| file.name(false)).collect();
|
||||
// let filtered: Vec<_> = files.map(|file| file.name(false)).collect();
|
||||
// tracing::debug!(
|
||||
// ?includes,
|
||||
// ?excludes,
|
||||
// files = ?bundle_files,
|
||||
// ?filtered,
|
||||
// "Built file list to extract"
|
||||
// );
|
||||
// }
|
||||
|
||||
let should_decompile = matches.get_flag("decompile");
|
||||
let should_flatten = matches.get_flag("flatten");
|
||||
let is_dry_run = matches.get_flag("dry-run");
|
||||
|
||||
let dest = matches
|
||||
.get_one::<PathBuf>("destination")
|
||||
.expect("required argument 'destination' missing");
|
||||
|
||||
{
|
||||
let res = match fs::metadata(&dest).await {
|
||||
Ok(meta) if !meta.is_dir() => Err(eyre::eyre!("Destination path is not a directory")),
|
||||
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
|
||||
Err(eyre::eyre!("Destination path does not exist"))
|
||||
.with_suggestion(|| "Create the directory")
|
||||
}
|
||||
Err(err) => Err(Report::new(err)),
|
||||
_ => Ok(()),
|
||||
};
|
||||
|
||||
if res.is_err() {
|
||||
return res.wrap_err(format!(
|
||||
"Failed to open destination directory: {}",
|
||||
dest.display()
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let mut tasks = Vec::with_capacity(files.len());
|
||||
let mut tasks = Vec::with_capacity(bundle.files().len());
|
||||
|
||||
for file in files {
|
||||
let name = file.name(should_decompile);
|
||||
let data = if should_decompile {
|
||||
let name = file.name(options.decompile);
|
||||
let data = if options.decompile {
|
||||
file.decompiled(ctx.clone()).await
|
||||
} else {
|
||||
file.raw()
|
||||
|
@ -250,10 +286,7 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
|
|||
Ok(mut files) => {
|
||||
match files.len() {
|
||||
0 => {
|
||||
println!(
|
||||
"Decompilation did not produce any data for file {}",
|
||||
file.name(should_decompile)
|
||||
);
|
||||
tracing::warn!("Decompilation did not produce any data for file {}", name);
|
||||
}
|
||||
// For a single file we want to use the bundle file's name.
|
||||
1 => {
|
||||
|
@ -261,16 +294,16 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
|
|||
let file = files.pop().unwrap();
|
||||
|
||||
let name = file.name().unwrap_or(&name);
|
||||
let name = if should_flatten {
|
||||
let name = if options.flatten {
|
||||
flatten_name(name)
|
||||
} else {
|
||||
name.clone()
|
||||
};
|
||||
|
||||
let mut path = dest.clone();
|
||||
let mut path = dest.to_path_buf();
|
||||
path.push(name);
|
||||
|
||||
if is_dry_run {
|
||||
if options.dry_run {
|
||||
tracing::info!(path = %path.display(), "Writing file");
|
||||
} else {
|
||||
tracing::debug!(path = %path.display(), "Writing file");
|
||||
|
@ -286,12 +319,12 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
|
|||
// by index.
|
||||
_ => {
|
||||
for (i, file) in files.into_iter().enumerate() {
|
||||
let mut path = dest.clone();
|
||||
let mut path = dest.to_path_buf();
|
||||
|
||||
let name = file
|
||||
.name()
|
||||
.map(|name| {
|
||||
if should_flatten {
|
||||
if options.flatten {
|
||||
flatten_name(name)
|
||||
} else {
|
||||
name.clone()
|
||||
|
@ -301,7 +334,7 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
|
|||
|
||||
path.push(name);
|
||||
|
||||
if is_dry_run {
|
||||
if options.dry_run {
|
||||
tracing::info!(path = %path.display(), "Writing file");
|
||||
} else {
|
||||
tracing::debug!(path = %path.display(), "Writing file");
|
||||
|
@ -343,6 +376,8 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
|
|||
};
|
||||
}
|
||||
|
||||
// TODO: Check if this might need buffered execution to avoid
|
||||
// running out of file handles.
|
||||
let results = try_join_all(tasks).await?;
|
||||
|
||||
for res in results {
|
||||
|
|
|
@ -4,11 +4,11 @@ use std::sync::Arc;
|
|||
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
|
||||
use color_eyre::eyre::{self, Result};
|
||||
use color_eyre::{Help, SectionExt};
|
||||
use futures::future::try_join_all;
|
||||
use futures::StreamExt;
|
||||
use sdk::Bundle;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::cmd::util::collect_bundle_paths;
|
||||
use crate::cmd::util::resolve_bundle_paths;
|
||||
|
||||
pub(crate) fn command_definition() -> Command {
|
||||
Command::new("list")
|
||||
|
@ -31,39 +31,23 @@ pub(crate) fn command_definition() -> Command {
|
|||
)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) -> Result<()> {
|
||||
let bundles = matches
|
||||
.get_many::<PathBuf>("bundle")
|
||||
.unwrap_or_default()
|
||||
.cloned();
|
||||
#[derive(Copy, Clone)]
|
||||
enum OutputFormat {
|
||||
Text,
|
||||
}
|
||||
|
||||
let paths = collect_bundle_paths(bundles).await;
|
||||
fn print_bundle_list(bundle: Bundle, fmt: OutputFormat) {
|
||||
match fmt {
|
||||
OutputFormat::Text => {
|
||||
println!("Bundle: {}", bundle.name());
|
||||
|
||||
if paths.is_empty() {
|
||||
return Err(eyre::eyre!("No bundle provided"));
|
||||
}
|
||||
|
||||
let bundles = try_join_all(paths.into_iter().map(|p| async {
|
||||
let ctx = ctx.clone();
|
||||
let path_display = p.display().to_string();
|
||||
async move { Bundle::open(ctx, &p).await }
|
||||
.await
|
||||
.with_section(|| path_display.header("Bundle Path:"))
|
||||
}))
|
||||
.await?;
|
||||
|
||||
if matches.get_flag("json") {
|
||||
unimplemented!("JSON output is not implemented yet");
|
||||
} else {
|
||||
for b in bundles.iter() {
|
||||
println!("Bundle: {}", b.name());
|
||||
|
||||
for f in b.files().iter() {
|
||||
for f in bundle.files().iter() {
|
||||
if f.variants().len() != 1 {
|
||||
return Err(eyre::eyre!("Expected exactly one version for this file."))
|
||||
let err = eyre::eyre!("Expected exactly one version for this file.")
|
||||
.with_section(|| f.variants().len().to_string().header("Bundle:"))
|
||||
.with_section(|| b.name().clone().header("Bundle:"));
|
||||
.with_section(|| bundle.name().clone().header("Bundle:"));
|
||||
|
||||
tracing::error!("{:#}", err);
|
||||
}
|
||||
|
||||
let v = &f.variants()[0];
|
||||
|
@ -75,7 +59,40 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
|
|||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) -> Result<()> {
|
||||
let bundles = matches
|
||||
.get_many::<PathBuf>("bundle")
|
||||
.unwrap_or_default()
|
||||
.cloned();
|
||||
|
||||
let paths = resolve_bundle_paths(bundles);
|
||||
|
||||
let fmt = if matches.get_flag("json") {
|
||||
unimplemented!("JSON output is not implemented yet");
|
||||
} else {
|
||||
OutputFormat::Text
|
||||
};
|
||||
|
||||
paths
|
||||
.for_each_concurrent(10, |p| async {
|
||||
let ctx = ctx.clone();
|
||||
async move {
|
||||
match Bundle::open(ctx, &p).await {
|
||||
Ok(bundle) => {
|
||||
print_bundle_list(bundle, fmt);
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("Failed to open bundle '{}': {:#}", p.display(), err);
|
||||
}
|
||||
}
|
||||
}
|
||||
.await
|
||||
})
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
use std::ffi::OsStr;
|
||||
use std::io;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::pin::Pin;
|
||||
|
||||
use futures::{Stream, StreamExt};
|
||||
use tokio::fs;
|
||||
use tokio_stream::wrappers::ReadDirStream;
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
#[tracing::instrument]
|
||||
pub async fn resolve_bundle_path<P>(path: P) -> Vec<PathBuf>
|
||||
pub async fn foo<P>(path: P) -> Vec<PathBuf>
|
||||
where
|
||||
P: AsRef<Path> + std::fmt::Debug,
|
||||
{
|
||||
|
@ -28,7 +29,7 @@ where
|
|||
|
||||
let stream = ReadDirStream::new(dir);
|
||||
let paths: Vec<PathBuf> = stream
|
||||
.filter_map(|entry| {
|
||||
.filter_map(|entry| async move {
|
||||
if let Ok(path) = entry.map(|e| e.path()) {
|
||||
match path.file_name().and_then(OsStr::to_str) {
|
||||
Some(name) if name.len() == 16 => {
|
||||
|
@ -52,13 +53,52 @@ where
|
|||
paths
|
||||
}
|
||||
|
||||
pub async fn resolve_bundle_path<P>(path: P) -> Pin<Box<dyn Stream<Item = PathBuf>>>
|
||||
where
|
||||
P: AsRef<Path> + std::fmt::Debug,
|
||||
{
|
||||
let dir = match fs::read_dir(path.as_ref()).await {
|
||||
Ok(dir) => {
|
||||
tracing::trace!(is_dir = true);
|
||||
dir
|
||||
}
|
||||
Err(err) => {
|
||||
if err.kind() != io::ErrorKind::NotADirectory {
|
||||
tracing::error!("Failed to read path: {:?}", err);
|
||||
}
|
||||
let paths = vec![PathBuf::from(path.as_ref())];
|
||||
tracing::debug!(is_dir = false, resolved_paths = ?paths);
|
||||
return Box::pin(futures::stream::iter(paths));
|
||||
}
|
||||
};
|
||||
|
||||
let stream = ReadDirStream::new(dir);
|
||||
let stream = stream.filter_map(|entry| async move {
|
||||
if let Ok(path) = entry.map(|e| e.path()) {
|
||||
match path.file_name().and_then(OsStr::to_str) {
|
||||
Some(name) if name.len() == 16 => {
|
||||
if name.chars().all(|c| c.is_ascii_hexdigit()) {
|
||||
Some(path)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
Box::pin(stream)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn collect_bundle_paths<I>(paths: I) -> Vec<PathBuf>
|
||||
where
|
||||
I: Iterator<Item = PathBuf> + std::fmt::Debug,
|
||||
{
|
||||
let tasks = paths.map(|p| async move {
|
||||
match tokio::spawn(async move { resolve_bundle_path(&p).await }).await {
|
||||
match tokio::spawn(async move { foo(&p).await }).await {
|
||||
Ok(paths) => paths,
|
||||
Err(err) => {
|
||||
tracing::error!(%err, "failed to spawn task to resolve bundle paths");
|
||||
|
@ -71,6 +111,17 @@ where
|
|||
results.into_iter().flatten().collect()
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub fn resolve_bundle_paths<I>(paths: I) -> impl Stream<Item = PathBuf>
|
||||
where
|
||||
I: Iterator<Item = PathBuf> + std::fmt::Debug,
|
||||
{
|
||||
let limit = 10;
|
||||
futures::stream::iter(paths)
|
||||
.then(resolve_bundle_path)
|
||||
.flat_map_unordered(limit, |p| p)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::path::PathBuf;
|
||||
|
@ -78,12 +129,12 @@ mod tests {
|
|||
use tempfile::tempdir;
|
||||
use tokio::process::Command;
|
||||
|
||||
use super::resolve_bundle_path;
|
||||
use super::foo;
|
||||
|
||||
#[tokio::test]
|
||||
async fn resolve_single_file() {
|
||||
let path = PathBuf::from("foo");
|
||||
let paths = resolve_bundle_path(&path).await;
|
||||
let paths = foo(&path).await;
|
||||
assert_eq!(paths.len(), 1);
|
||||
assert_eq!(paths[0], path);
|
||||
}
|
||||
|
@ -91,7 +142,7 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn resolve_empty_directory() {
|
||||
let dir = tempdir().expect("failed to create temporary directory");
|
||||
let paths = resolve_bundle_path(dir).await;
|
||||
let paths = foo(dir).await;
|
||||
assert!(paths.is_empty());
|
||||
}
|
||||
|
||||
|
@ -119,7 +170,7 @@ mod tests {
|
|||
.await
|
||||
.expect("failed to create temporary files");
|
||||
|
||||
let paths = resolve_bundle_path(dir).await;
|
||||
let paths = foo(dir).await;
|
||||
|
||||
assert_eq!(bundle_names.len(), paths.len());
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue