diff --git a/Cargo.lock b/Cargo.lock index f0fad33..0a87795 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2356,6 +2356,7 @@ dependencies = [ "async-nats", "aws-config", "aws-sdk-s3", + "futures-util", "map-tool", "rbx_asset", "rbx_binary", diff --git a/combobulator/Cargo.toml b/combobulator/Cargo.toml index c532b4a..83c4452 100644 --- a/combobulator/Cargo.toml +++ b/combobulator/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" async-nats.workspace = true aws-config = { version = "1", features = ["behavior-version-latest"] } aws-sdk-s3 = "1" +futures-util.workspace = true map-tool = { version = "3.0.0", registry = "strafesnet", features = ["roblox"], default-features = false } rbx_asset.workspace = true rbx_binary.workspace = true diff --git a/combobulator/src/process.rs b/combobulator/src/process.rs index a229fd8..b2b3a50 100644 --- a/combobulator/src/process.rs +++ b/combobulator/src/process.rs @@ -3,8 +3,12 @@ use std::io::Cursor; use crate::nats_types::ReleaseMapfixRequest; use crate::s3::S3Cache; +use futures_util::stream::iter as stream_iter; +use futures_util::{StreamExt,TryStreamExt}; use strafesnet_deferred_loader::deferred_loader::LoadFailureMode; +const CONCURRENT_REQUESTS:usize=16; + #[expect(dead_code)] #[derive(Debug)] pub enum ConvertError{ @@ -121,10 +125,9 @@ impl Processor{ let assets=map_tool::roblox::get_unique_assets(&dom); // place textures into 'loader' - let mut texture_loader=crate::loader::TextureLoader::new(); - + let texture_loader=crate::loader::TextureLoader::new(); // process textures: download, cache, convert to DDS - for &id in &assets.textures{ + let texture_loader=stream_iter(assets.textures).map(async|id|{ let asset_id=id.0; let dds_key=S3Cache::texture_dds_key(asset_id); @@ -138,7 +141,9 @@ impl Processor{ map_tool::roblox::convert_texture_to_dds(&data) }else{ println!("[combobulator] Downloading texture {asset_id}"); - let Some(data)=self.download_asset(asset_id).await? else{continue}; + let Some(data)=self.download_asset(asset_id).await? else{ + return Ok(None); + }; // decode while we have ownership let dds_result=map_tool::roblox::convert_texture_to_dds(&data); @@ -152,7 +157,7 @@ impl Processor{ Ok(dds)=>dds, Err(e)=>{ println!("[combobulator] Texture {asset_id} convert error: {e}"); - continue; + return Ok(None); } }; @@ -162,12 +167,19 @@ impl Processor{ }; println!("[combobulator] Texture {asset_id} processed"); - texture_loader.insert(id,dds); - } + Ok(Some((id,dds))) + }) + .buffer_unordered(CONCURRENT_REQUESTS) + .try_fold(texture_loader,async|mut texture_loader,maybe_loaded_texture|{ + if let Some((id,dds))=maybe_loaded_texture{ + texture_loader.insert(id,dds); + } + Ok(texture_loader) + }).await?; - let mut mesh_loader=crate::loader::MeshLoader::new(); + let mesh_loader=crate::loader::MeshLoader::new(); // process meshes - for &id in &assets.meshes{ + let mesh_loader=stream_iter(assets.meshes).map(async|id|{ let asset_id=id.0; let mesh_key=S3Cache::mesh_key(asset_id); @@ -175,7 +187,9 @@ impl Processor{ strafesnet_rbx_loader::mesh::convert(&data) }else{ println!("[combobulator] Downloading mesh {asset_id}"); - let Some(data)=self.download_asset(asset_id).await? else{continue}; + let Some(data)=self.download_asset(asset_id).await? else{ + return Ok(None); + }; // decode while we have ownership let mesh_result=strafesnet_rbx_loader::mesh::convert(&data); @@ -187,13 +201,23 @@ impl Processor{ // handle error after cacheing data match mesh_result{ - Ok(mesh)=>mesh_loader.insert_mesh(id,mesh), - Err(e)=>println!("[combobulator] Mesh {asset_id} convert error: {e}"), + Ok(mesh)=>Ok(Some((id,mesh))), + Err(e)=>{ + println!("[combobulator] Mesh {asset_id} convert error: {e}"); + Ok(None) + }, } - } + }) + .buffer_unordered(CONCURRENT_REQUESTS) + .try_fold(mesh_loader,async|mut mesh_loader,maybe_loaded_mesh|{ + if let Some((id,mesh))=maybe_loaded_mesh{ + mesh_loader.insert_mesh(id,mesh); + } + Ok(mesh_loader) + }).await?; // process unions - for &id in &assets.unions{ + let mesh_loader=stream_iter(assets.unions).map(async|id|{ let asset_id=id.0; let union_key=S3Cache::union_key(asset_id); @@ -201,7 +225,9 @@ impl Processor{ rbx_binary::from_reader(data.as_slice()) }else{ println!("[combobulator] Downloading union {asset_id}"); - let Some(data)=self.download_asset(asset_id).await? else{continue}; + let Some(data)=self.download_asset(asset_id).await? else{ + return Ok(None); + }; // decode the data while we have ownership let union_result=rbx_binary::from_reader(data.as_slice()); @@ -213,10 +239,20 @@ impl Processor{ // handle error after cacheing data match union_result{ - Ok(union)=>mesh_loader.insert_union(id,union), - Err(e)=>println!("[combobulator] Union {asset_id} convert error: {e}"), + Ok(union)=>Ok(Some((id,union))), + Err(e)=>{ + println!("[combobulator] Union {asset_id} convert error: {e}"); + Ok(None) + }, } - } + }) + .buffer_unordered(CONCURRENT_REQUESTS) + .try_fold(mesh_loader,async|mut mesh_loader,maybe_loaded_union|{ + if let Some((id,union))=maybe_loaded_union{ + mesh_loader.insert_union(id,union); + } + Ok(mesh_loader) + }).await?; // convert to SNF and upload println!("[combobulator] Converting to SNF");