fix: Fix running out of file handles

Fixes #1.
This commit is contained in:
Lucas Schwiderski 2022-12-10 12:37:35 +01:00
parent 4ac9c88dfc
commit a76ea165ac
Signed by: lucas
GPG key ID: AA12679AAA6DF4D8
4 changed files with 232 additions and 171 deletions

View file

@ -1,18 +1,14 @@
use std::path::{Path, PathBuf};
use std::sync::Arc;
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use color_eyre::eyre::{self, Result};
use color_eyre::{Help, SectionExt};
use clap::{value_parser, Arg, ArgMatches, Command};
use color_eyre::eyre::Result;
use futures::future::try_join_all;
use sdk::decompress;
use tokio::fs::{self, File};
use tokio::io::BufReader;
use tokio::sync::RwLock;
use crate::cmd::util::collect_bundle_paths;
pub(crate) fn command_definition() -> Command {
Command::new("decompress")
.about(
@ -23,11 +19,9 @@ pub(crate) fn command_definition() -> Command {
.arg(
Arg::new("bundle")
.required(true)
.action(ArgAction::Append)
.value_parser(value_parser!(PathBuf))
.help(
"Path to the bundle(s) to read. If this points to a directory instead \
of a file, all files in that directory will be checked.",
"Path to the bundle to read. Unlike other operations, this only accepts only a single bundle.",
),
)
.arg(
@ -36,7 +30,7 @@ pub(crate) fn command_definition() -> Command {
.value_parser(value_parser!(PathBuf))
.help(
"The destination to write to. If this points to a directory, the \
decompressed bundles will be written there, with their original name. \
name of the input bundle will be used. \
Parent directories must exist.",
),
)
@ -61,10 +55,9 @@ where
#[tracing::instrument(skip_all)]
pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) -> Result<()> {
let bundles = matches
.get_many::<PathBuf>("bundle")
.unwrap_or_default()
.cloned();
let bundle = matches
.get_one::<PathBuf>("bundle")
.expect("required argument 'bundle' is missing");
let out_path = matches
.get_one::<PathBuf>("destination")
.expect("required parameter 'destination' is missing");
@ -74,46 +67,11 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
.map(|meta| meta.is_dir())
.unwrap_or(false);
let paths = collect_bundle_paths(bundles).await;
let name = bundle.file_name();
if paths.is_empty() {
return Err(eyre::eyre!("No bundle provided"));
}
if paths.len() == 1 {
let bundle = &paths[0];
let name = bundle.file_name();
if is_dir && name.is_some() {
decompress_bundle(ctx, bundle, out_path.join(name.unwrap())).await?;
} else {
decompress_bundle(ctx, bundle, out_path).await?;
}
if is_dir && name.is_some() {
decompress_bundle(ctx, bundle, out_path.join(name.unwrap())).await
} else {
if !is_dir {
return Err(eyre::eyre!(
"Multiple bundles provided, but destination is not a directory."
))
.with_section(|| out_path.display().to_string().header("Path:"))?;
}
let _ = try_join_all(paths.into_iter().map(|p| async {
let ctx = ctx.clone();
async move {
let name = if let Some(name) = p.file_name() {
name
} else {
return Err(eyre::eyre!("Invalid bundle path. No file name."))
.with_section(|| p.display().to_string().header("Path:"))?;
};
let dest = out_path.join(name);
decompress_bundle(ctx, p, dest).await
}
.await
}))
.await?;
decompress_bundle(ctx, bundle, out_path).await
}
Ok(())
}

View file

@ -1,15 +1,16 @@
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use color_eyre::eyre::{self, Context, Result};
use color_eyre::{Help, Report, SectionExt};
use futures::future::try_join_all;
use futures::{StreamExt, TryFutureExt};
use glob::Pattern;
use sdk::Bundle;
use sdk::{Bundle, BundleFile};
use tokio::{fs, sync::RwLock};
use crate::cmd::util::collect_bundle_paths;
use crate::cmd::util::resolve_bundle_paths;
fn parse_glob_pattern(s: &str) -> Result<Pattern, String> {
match Pattern::new(s) {
@ -148,29 +149,94 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
.unwrap_or_default()
.cloned();
let paths = collect_bundle_paths(bundles).await;
let should_decompile = matches.get_flag("decompile");
let should_flatten = matches.get_flag("flatten");
let is_dry_run = matches.get_flag("dry-run");
if paths.is_empty() {
return Err(eyre::eyre!("No bundle provided"));
let dest = matches
.get_one::<PathBuf>("destination")
.expect("required argument 'destination' missing");
{
let res = match fs::metadata(&dest).await {
Ok(meta) if !meta.is_dir() => Err(eyre::eyre!("Destination path is not a directory")),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
Err(eyre::eyre!("Destination path does not exist"))
.with_suggestion(|| format!("Create the directory '{}'", dest.display()))
}
Err(err) => Err(Report::new(err)),
_ => Ok(()),
};
if res.is_err() {
return res.wrap_err(format!(
"Failed to open destination directory: {}",
dest.display()
));
}
}
let bundles = try_join_all(paths.into_iter().map(|p| async {
let ctx = ctx.clone();
let path_display = p.display().to_string();
async move { Bundle::open(ctx, &p).await }
let mut paths = Box::pin(resolve_bundle_paths(bundles));
// TODO: Find a way to do this with `for_each_concurrent`. The first attempt
// just kept head-butting into a "use of moved value" wall.
while let Some(path) = paths.next().await {
let res = Bundle::open(ctx.clone(), &path)
.and_then(|bundle| {
extract_bundle(
ctx.clone(),
bundle,
&dest,
ExtractOptions {
includes: &includes,
excludes: &excludes,
decompile: should_decompile,
flatten: should_flatten,
dry_run: is_dry_run,
},
)
})
.await
.with_section(|| path_display.header("Bundle Path:"))
}))
.await?;
.wrap_err_with(|| format!("failed to extract from bundle '{}'", path.display()));
let files: Vec<_> = {
let iter = bundles.iter().flat_map(|bundle| bundle.files());
if let Err(err) = res {
tracing::error!("{:#}", err)
}
}
// Short-curcit the iteration if there is nothing to filter by
Ok(())
}
struct ExtractOptions<'a> {
decompile: bool,
flatten: bool,
dry_run: bool,
includes: &'a dyn AsRef<[&'a Pattern]>,
excludes: &'a dyn AsRef<[&'a Pattern]>,
}
#[tracing::instrument(
skip(ctx, bundle, options),
fields(decompile = options.decompile, flatten = options.flatten, dry_run = options.dry_run)
)]
async fn extract_bundle<P>(
ctx: Arc<RwLock<sdk::Context>>,
bundle: Bundle,
dest: P,
options: ExtractOptions<'_>,
) -> Result<()>
where
P: AsRef<Path> + std::fmt::Debug,
{
let includes = options.includes.as_ref();
let excludes = options.excludes.as_ref();
let dest = dest.as_ref();
let files: Box<dyn Iterator<Item = &BundleFile>> = {
if includes.is_empty() && excludes.is_empty() {
iter.collect()
Box::new(bundle.files().iter())
} else {
iter.filter(|file| {
let iter = bundle.files().iter().filter(|file| {
let name = file.name(false);
let decompiled_name = file.name(true);
@ -186,61 +252,31 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
.any(|glob| glob.matches(&name) || glob.matches(&decompiled_name));
is_included && !is_excluded
})
.collect()
});
Box::new(iter)
}
};
if tracing::enabled!(tracing::Level::DEBUG) {
let includes: Vec<_> = includes.iter().map(|pattern| pattern.as_str()).collect();
let excludes: Vec<_> = excludes.iter().map(|pattern| pattern.as_str()).collect();
let bundle_files: Vec<_> = bundles
.iter()
.flat_map(|bundle| bundle.files())
.map(|file| file.name(false))
.collect();
let filtered: Vec<_> = files.iter().map(|file| file.name(false)).collect();
tracing::debug!(
?includes,
?excludes,
files = ?bundle_files,
?filtered,
"Built file list to extract"
);
}
// TODO: Disabled for now, as the `files` iterator would be consumed.
// if tracing::enabled!(tracing::Level::DEBUG) {
// let includes: Vec<_> = includes.iter().map(|pattern| pattern.as_str()).collect();
// let excludes: Vec<_> = excludes.iter().map(|pattern| pattern.as_str()).collect();
// let bundle_files: Vec<_> = bundle.files().iter().map(|file| file.name(false)).collect();
// let filtered: Vec<_> = files.map(|file| file.name(false)).collect();
// tracing::debug!(
// ?includes,
// ?excludes,
// files = ?bundle_files,
// ?filtered,
// "Built file list to extract"
// );
// }
let should_decompile = matches.get_flag("decompile");
let should_flatten = matches.get_flag("flatten");
let is_dry_run = matches.get_flag("dry-run");
let dest = matches
.get_one::<PathBuf>("destination")
.expect("required argument 'destination' missing");
{
let res = match fs::metadata(&dest).await {
Ok(meta) if !meta.is_dir() => Err(eyre::eyre!("Destination path is not a directory")),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
Err(eyre::eyre!("Destination path does not exist"))
.with_suggestion(|| "Create the directory")
}
Err(err) => Err(Report::new(err)),
_ => Ok(()),
};
if res.is_err() {
return res.wrap_err(format!(
"Failed to open destination directory: {}",
dest.display()
));
}
}
let mut tasks = Vec::with_capacity(files.len());
let mut tasks = Vec::with_capacity(bundle.files().len());
for file in files {
let name = file.name(should_decompile);
let data = if should_decompile {
let name = file.name(options.decompile);
let data = if options.decompile {
file.decompiled(ctx.clone()).await
} else {
file.raw()
@ -250,10 +286,7 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
Ok(mut files) => {
match files.len() {
0 => {
println!(
"Decompilation did not produce any data for file {}",
file.name(should_decompile)
);
tracing::warn!("Decompilation did not produce any data for file {}", name);
}
// For a single file we want to use the bundle file's name.
1 => {
@ -261,16 +294,16 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
let file = files.pop().unwrap();
let name = file.name().unwrap_or(&name);
let name = if should_flatten {
let name = if options.flatten {
flatten_name(name)
} else {
name.clone()
};
let mut path = dest.clone();
let mut path = dest.to_path_buf();
path.push(name);
if is_dry_run {
if options.dry_run {
tracing::info!(path = %path.display(), "Writing file");
} else {
tracing::debug!(path = %path.display(), "Writing file");
@ -286,12 +319,12 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
// by index.
_ => {
for (i, file) in files.into_iter().enumerate() {
let mut path = dest.clone();
let mut path = dest.to_path_buf();
let name = file
.name()
.map(|name| {
if should_flatten {
if options.flatten {
flatten_name(name)
} else {
name.clone()
@ -301,7 +334,7 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
path.push(name);
if is_dry_run {
if options.dry_run {
tracing::info!(path = %path.display(), "Writing file");
} else {
tracing::debug!(path = %path.display(), "Writing file");
@ -343,6 +376,8 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
};
}
// TODO: Check if this might need buffered execution to avoid
// running out of file handles.
let results = try_join_all(tasks).await?;
for res in results {

View file

@ -4,11 +4,11 @@ use std::sync::Arc;
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use color_eyre::eyre::{self, Result};
use color_eyre::{Help, SectionExt};
use futures::future::try_join_all;
use futures::StreamExt;
use sdk::Bundle;
use tokio::sync::RwLock;
use crate::cmd::util::collect_bundle_paths;
use crate::cmd::util::resolve_bundle_paths;
pub(crate) fn command_definition() -> Command {
Command::new("list")
@ -31,39 +31,23 @@ pub(crate) fn command_definition() -> Command {
)
}
#[tracing::instrument(skip_all)]
pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) -> Result<()> {
let bundles = matches
.get_many::<PathBuf>("bundle")
.unwrap_or_default()
.cloned();
#[derive(Copy, Clone)]
enum OutputFormat {
Text,
}
let paths = collect_bundle_paths(bundles).await;
fn print_bundle_list(bundle: Bundle, fmt: OutputFormat) {
match fmt {
OutputFormat::Text => {
println!("Bundle: {}", bundle.name());
if paths.is_empty() {
return Err(eyre::eyre!("No bundle provided"));
}
let bundles = try_join_all(paths.into_iter().map(|p| async {
let ctx = ctx.clone();
let path_display = p.display().to_string();
async move { Bundle::open(ctx, &p).await }
.await
.with_section(|| path_display.header("Bundle Path:"))
}))
.await?;
if matches.get_flag("json") {
unimplemented!("JSON output is not implemented yet");
} else {
for b in bundles.iter() {
println!("Bundle: {}", b.name());
for f in b.files().iter() {
for f in bundle.files().iter() {
if f.variants().len() != 1 {
return Err(eyre::eyre!("Expected exactly one version for this file."))
let err = eyre::eyre!("Expected exactly one version for this file.")
.with_section(|| f.variants().len().to_string().header("Bundle:"))
.with_section(|| b.name().clone().header("Bundle:"));
.with_section(|| bundle.name().clone().header("Bundle:"));
tracing::error!("{:#}", err);
}
let v = &f.variants()[0];
@ -75,7 +59,40 @@ pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) ->
);
}
}
Ok(())
}
}
#[tracing::instrument(skip_all)]
pub(crate) async fn run(ctx: Arc<RwLock<sdk::Context>>, matches: &ArgMatches) -> Result<()> {
let bundles = matches
.get_many::<PathBuf>("bundle")
.unwrap_or_default()
.cloned();
let paths = resolve_bundle_paths(bundles);
let fmt = if matches.get_flag("json") {
unimplemented!("JSON output is not implemented yet");
} else {
OutputFormat::Text
};
paths
.for_each_concurrent(10, |p| async {
let ctx = ctx.clone();
async move {
match Bundle::open(ctx, &p).await {
Ok(bundle) => {
print_bundle_list(bundle, fmt);
}
Err(err) => {
tracing::error!("Failed to open bundle '{}': {:#}", p.display(), err);
}
}
}
.await
})
.await;
Ok(())
}

View file

@ -1,13 +1,14 @@
use std::ffi::OsStr;
use std::io;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use futures::{Stream, StreamExt};
use tokio::fs;
use tokio_stream::wrappers::ReadDirStream;
use tokio_stream::StreamExt;
#[tracing::instrument]
pub async fn resolve_bundle_path<P>(path: P) -> Vec<PathBuf>
pub async fn foo<P>(path: P) -> Vec<PathBuf>
where
P: AsRef<Path> + std::fmt::Debug,
{
@ -28,7 +29,7 @@ where
let stream = ReadDirStream::new(dir);
let paths: Vec<PathBuf> = stream
.filter_map(|entry| {
.filter_map(|entry| async move {
if let Ok(path) = entry.map(|e| e.path()) {
match path.file_name().and_then(OsStr::to_str) {
Some(name) if name.len() == 16 => {
@ -52,13 +53,52 @@ where
paths
}
pub async fn resolve_bundle_path<P>(path: P) -> Pin<Box<dyn Stream<Item = PathBuf>>>
where
P: AsRef<Path> + std::fmt::Debug,
{
let dir = match fs::read_dir(path.as_ref()).await {
Ok(dir) => {
tracing::trace!(is_dir = true);
dir
}
Err(err) => {
if err.kind() != io::ErrorKind::NotADirectory {
tracing::error!("Failed to read path: {:?}", err);
}
let paths = vec![PathBuf::from(path.as_ref())];
tracing::debug!(is_dir = false, resolved_paths = ?paths);
return Box::pin(futures::stream::iter(paths));
}
};
let stream = ReadDirStream::new(dir);
let stream = stream.filter_map(|entry| async move {
if let Ok(path) = entry.map(|e| e.path()) {
match path.file_name().and_then(OsStr::to_str) {
Some(name) if name.len() == 16 => {
if name.chars().all(|c| c.is_ascii_hexdigit()) {
Some(path)
} else {
None
}
}
_ => None,
}
} else {
None
}
});
Box::pin(stream)
}
#[tracing::instrument(skip_all)]
pub async fn collect_bundle_paths<I>(paths: I) -> Vec<PathBuf>
where
I: Iterator<Item = PathBuf> + std::fmt::Debug,
{
let tasks = paths.map(|p| async move {
match tokio::spawn(async move { resolve_bundle_path(&p).await }).await {
match tokio::spawn(async move { foo(&p).await }).await {
Ok(paths) => paths,
Err(err) => {
tracing::error!(%err, "failed to spawn task to resolve bundle paths");
@ -71,6 +111,17 @@ where
results.into_iter().flatten().collect()
}
#[tracing::instrument(skip_all)]
pub fn resolve_bundle_paths<I>(paths: I) -> impl Stream<Item = PathBuf>
where
I: Iterator<Item = PathBuf> + std::fmt::Debug,
{
let limit = 10;
futures::stream::iter(paths)
.then(resolve_bundle_path)
.flat_map_unordered(limit, |p| p)
}
#[cfg(test)]
mod tests {
use std::path::PathBuf;
@ -78,12 +129,12 @@ mod tests {
use tempfile::tempdir;
use tokio::process::Command;
use super::resolve_bundle_path;
use super::foo;
#[tokio::test]
async fn resolve_single_file() {
let path = PathBuf::from("foo");
let paths = resolve_bundle_path(&path).await;
let paths = foo(&path).await;
assert_eq!(paths.len(), 1);
assert_eq!(paths[0], path);
}
@ -91,7 +142,7 @@ mod tests {
#[tokio::test]
async fn resolve_empty_directory() {
let dir = tempdir().expect("failed to create temporary directory");
let paths = resolve_bundle_path(dir).await;
let paths = foo(dir).await;
assert!(paths.is_empty());
}
@ -119,7 +170,7 @@ mod tests {
.await
.expect("failed to create temporary files");
let paths = resolve_bundle_path(dir).await;
let paths = foo(dir).await;
assert_eq!(bundle_names.len(), paths.len());