combobulator: use up to 16 parallel requests
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing

This commit is contained in:
2026-03-06 10:07:19 -08:00
parent b6ac6ce47f
commit d26126c9d3
3 changed files with 56 additions and 18 deletions

1
Cargo.lock generated
View File

@@ -2356,6 +2356,7 @@ dependencies = [
"async-nats",
"aws-config",
"aws-sdk-s3",
"futures-util",
"map-tool",
"rbx_asset",
"rbx_binary",

View File

@@ -7,6 +7,7 @@ edition = "2024"
async-nats.workspace = true
aws-config = { version = "1", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1"
futures-util.workspace = true
map-tool = { version = "3.0.0", registry = "strafesnet", features = ["roblox"], default-features = false }
rbx_asset.workspace = true
rbx_binary.workspace = true

View File

@@ -3,8 +3,12 @@ use std::io::Cursor;
use crate::nats_types::ReleaseMapfixRequest;
use crate::s3::S3Cache;
use futures_util::stream::iter as stream_iter;
use futures_util::{StreamExt,TryStreamExt};
use strafesnet_deferred_loader::deferred_loader::LoadFailureMode;
const CONCURRENT_REQUESTS:usize=16;
#[expect(dead_code)]
#[derive(Debug)]
pub enum ConvertError{
@@ -121,10 +125,9 @@ impl Processor{
let assets=map_tool::roblox::get_unique_assets(&dom);
// place textures into 'loader'
let mut texture_loader=crate::loader::TextureLoader::new();
let texture_loader=crate::loader::TextureLoader::new();
// process textures: download, cache, convert to DDS
for &id in &assets.textures{
let texture_loader=stream_iter(assets.textures).map(async|id|{
let asset_id=id.0;
let dds_key=S3Cache::texture_dds_key(asset_id);
@@ -138,7 +141,9 @@ impl Processor{
map_tool::roblox::convert_texture_to_dds(&data)
}else{
println!("[combobulator] Downloading texture {asset_id}");
let Some(data)=self.download_asset(asset_id).await? else{continue};
let Some(data)=self.download_asset(asset_id).await? else{
return Ok(None);
};
// decode while we have ownership
let dds_result=map_tool::roblox::convert_texture_to_dds(&data);
@@ -152,7 +157,7 @@ impl Processor{
Ok(dds)=>dds,
Err(e)=>{
println!("[combobulator] Texture {asset_id} convert error: {e}");
continue;
return Ok(None);
}
};
@@ -162,12 +167,19 @@ impl Processor{
};
println!("[combobulator] Texture {asset_id} processed");
texture_loader.insert(id,dds);
}
Ok(Some((id,dds)))
})
.buffer_unordered(CONCURRENT_REQUESTS)
.try_fold(texture_loader,async|mut texture_loader,maybe_loaded_texture|{
if let Some((id,dds))=maybe_loaded_texture{
texture_loader.insert(id,dds);
}
Ok(texture_loader)
}).await?;
let mut mesh_loader=crate::loader::MeshLoader::new();
let mesh_loader=crate::loader::MeshLoader::new();
// process meshes
for &id in &assets.meshes{
let mesh_loader=stream_iter(assets.meshes).map(async|id|{
let asset_id=id.0;
let mesh_key=S3Cache::mesh_key(asset_id);
@@ -175,7 +187,9 @@ impl Processor{
strafesnet_rbx_loader::mesh::convert(&data)
}else{
println!("[combobulator] Downloading mesh {asset_id}");
let Some(data)=self.download_asset(asset_id).await? else{continue};
let Some(data)=self.download_asset(asset_id).await? else{
return Ok(None);
};
// decode while we have ownership
let mesh_result=strafesnet_rbx_loader::mesh::convert(&data);
@@ -187,13 +201,23 @@ impl Processor{
// handle error after cacheing data
match mesh_result{
Ok(mesh)=>mesh_loader.insert_mesh(id,mesh),
Err(e)=>println!("[combobulator] Mesh {asset_id} convert error: {e}"),
Ok(mesh)=>Ok(Some((id,mesh))),
Err(e)=>{
println!("[combobulator] Mesh {asset_id} convert error: {e}");
Ok(None)
},
}
}
})
.buffer_unordered(CONCURRENT_REQUESTS)
.try_fold(mesh_loader,async|mut mesh_loader,maybe_loaded_mesh|{
if let Some((id,mesh))=maybe_loaded_mesh{
mesh_loader.insert_mesh(id,mesh);
}
Ok(mesh_loader)
}).await?;
// process unions
for &id in &assets.unions{
let mesh_loader=stream_iter(assets.unions).map(async|id|{
let asset_id=id.0;
let union_key=S3Cache::union_key(asset_id);
@@ -201,7 +225,9 @@ impl Processor{
rbx_binary::from_reader(data.as_slice())
}else{
println!("[combobulator] Downloading union {asset_id}");
let Some(data)=self.download_asset(asset_id).await? else{continue};
let Some(data)=self.download_asset(asset_id).await? else{
return Ok(None);
};
// decode the data while we have ownership
let union_result=rbx_binary::from_reader(data.as_slice());
@@ -213,10 +239,20 @@ impl Processor{
// handle error after cacheing data
match union_result{
Ok(union)=>mesh_loader.insert_union(id,union),
Err(e)=>println!("[combobulator] Union {asset_id} convert error: {e}"),
Ok(union)=>Ok(Some((id,union))),
Err(e)=>{
println!("[combobulator] Union {asset_id} convert error: {e}");
Ok(None)
},
}
}
})
.buffer_unordered(CONCURRENT_REQUESTS)
.try_fold(mesh_loader,async|mut mesh_loader,maybe_loaded_union|{
if let Some((id,union))=maybe_loaded_union{
mesh_loader.insert_union(id,union);
}
Ok(mesh_loader)
}).await?;
// convert to SNF and upload
println!("[combobulator] Converting to SNF");