Combobulator: Concurrent Processing #342
39
Cargo.lock
generated
39
Cargo.lock
generated
@@ -1484,21 +1484,6 @@ version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
|
||||
|
||||
[[package]]
|
||||
name = "futures"
|
||||
version = "0.3.32"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-executor",
|
||||
"futures-io",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
"futures-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-channel"
|
||||
version = "0.3.32"
|
||||
@@ -1506,7 +1491,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1515,23 +1499,6 @@ version = "0.3.32"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
|
||||
|
||||
[[package]]
|
||||
name = "futures-executor"
|
||||
version = "0.3.32"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-task",
|
||||
"futures-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-io"
|
||||
version = "0.3.32"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718"
|
||||
|
||||
[[package]]
|
||||
name = "futures-macro"
|
||||
version = "0.3.32"
|
||||
@@ -1561,13 +1528,10 @@ version = "0.3.32"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
"futures-macro",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
"memchr",
|
||||
"pin-project-lite",
|
||||
"slab",
|
||||
]
|
||||
@@ -2392,6 +2356,7 @@ dependencies = [
|
||||
"async-nats",
|
||||
"aws-config",
|
||||
"aws-sdk-s3",
|
||||
"futures-util",
|
||||
"map-tool",
|
||||
"rbx_asset",
|
||||
"rbx_binary",
|
||||
@@ -2411,7 +2376,7 @@ name = "maps-validation"
|
||||
version = "0.1.1"
|
||||
dependencies = [
|
||||
"async-nats",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"heck",
|
||||
"rbx_asset",
|
||||
"rbx_binary",
|
||||
|
||||
@@ -8,6 +8,7 @@ resolver = "2"
|
||||
|
||||
[workspace.dependencies]
|
||||
async-nats = "0.46.0"
|
||||
futures-util = "0.3.31"
|
||||
rbx_asset = { version = "0.5.0", features = ["gzip", "rustls-tls"], default-features = false, registry = "strafesnet" }
|
||||
rbx_binary = "2.0.1"
|
||||
rbx_dom_weak = "4.1.0"
|
||||
|
||||
@@ -7,6 +7,7 @@ edition = "2024"
|
||||
async-nats.workspace = true
|
||||
aws-config = { version = "1", features = ["behavior-version-latest"] }
|
||||
aws-sdk-s3 = "1"
|
||||
futures-util.workspace = true
|
||||
map-tool = { version = "3.0.0", registry = "strafesnet", features = ["roblox"], default-features = false }
|
||||
rbx_asset.workspace = true
|
||||
rbx_binary.workspace = true
|
||||
|
||||
@@ -3,8 +3,12 @@ use std::io::Cursor;
|
||||
use crate::nats_types::ReleaseMapfixRequest;
|
||||
use crate::s3::S3Cache;
|
||||
|
||||
use futures_util::stream::iter as stream_iter;
|
||||
use futures_util::{StreamExt,TryStreamExt};
|
||||
use strafesnet_deferred_loader::deferred_loader::LoadFailureMode;
|
||||
|
||||
const CONCURRENT_REQUESTS:usize=16;
|
||||
|
||||
#[expect(dead_code)]
|
||||
#[derive(Debug)]
|
||||
pub enum ConvertError{
|
||||
@@ -121,10 +125,9 @@ impl Processor{
|
||||
let assets=map_tool::roblox::get_unique_assets(&dom);
|
||||
|
||||
// place textures into 'loader'
|
||||
let mut texture_loader=crate::loader::TextureLoader::new();
|
||||
|
||||
let texture_loader=crate::loader::TextureLoader::new();
|
||||
// process textures: download, cache, convert to DDS
|
||||
for &id in &assets.textures{
|
||||
let texture_loader=stream_iter(assets.textures).map(async|id|{
|
||||
let asset_id=id.0;
|
||||
let dds_key=S3Cache::texture_dds_key(asset_id);
|
||||
|
||||
@@ -138,7 +141,9 @@ impl Processor{
|
||||
map_tool::roblox::convert_texture_to_dds(&data)
|
||||
}else{
|
||||
println!("[combobulator] Downloading texture {asset_id}");
|
||||
let Some(data)=self.download_asset(asset_id).await? else{continue};
|
||||
let Some(data)=self.download_asset(asset_id).await? else{
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// decode while we have ownership
|
||||
let dds_result=map_tool::roblox::convert_texture_to_dds(&data);
|
||||
@@ -152,7 +157,7 @@ impl Processor{
|
||||
Ok(dds)=>dds,
|
||||
Err(e)=>{
|
||||
println!("[combobulator] Texture {asset_id} convert error: {e}");
|
||||
continue;
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -162,12 +167,19 @@ impl Processor{
|
||||
};
|
||||
println!("[combobulator] Texture {asset_id} processed");
|
||||
|
||||
texture_loader.insert(id,dds);
|
||||
}
|
||||
Ok(Some((id,dds)))
|
||||
})
|
||||
.buffer_unordered(CONCURRENT_REQUESTS)
|
||||
.try_fold(texture_loader,async|mut texture_loader,maybe_loaded_texture|{
|
||||
if let Some((id,dds))=maybe_loaded_texture{
|
||||
texture_loader.insert(id,dds);
|
||||
}
|
||||
Ok(texture_loader)
|
||||
}).await?;
|
||||
|
||||
let mut mesh_loader=crate::loader::MeshLoader::new();
|
||||
let mesh_loader=crate::loader::MeshLoader::new();
|
||||
// process meshes
|
||||
for &id in &assets.meshes{
|
||||
let mesh_loader=stream_iter(assets.meshes).map(async|id|{
|
||||
let asset_id=id.0;
|
||||
let mesh_key=S3Cache::mesh_key(asset_id);
|
||||
|
||||
@@ -175,7 +187,9 @@ impl Processor{
|
||||
strafesnet_rbx_loader::mesh::convert(&data)
|
||||
}else{
|
||||
println!("[combobulator] Downloading mesh {asset_id}");
|
||||
let Some(data)=self.download_asset(asset_id).await? else{continue};
|
||||
let Some(data)=self.download_asset(asset_id).await? else{
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// decode while we have ownership
|
||||
let mesh_result=strafesnet_rbx_loader::mesh::convert(&data);
|
||||
@@ -187,13 +201,23 @@ impl Processor{
|
||||
|
||||
// handle error after cacheing data
|
||||
match mesh_result{
|
||||
Ok(mesh)=>mesh_loader.insert_mesh(id,mesh),
|
||||
Err(e)=>println!("[combobulator] Mesh {asset_id} convert error: {e}"),
|
||||
Ok(mesh)=>Ok(Some((id,mesh))),
|
||||
Err(e)=>{
|
||||
println!("[combobulator] Mesh {asset_id} convert error: {e}");
|
||||
Ok(None)
|
||||
},
|
||||
}
|
||||
}
|
||||
})
|
||||
.buffer_unordered(CONCURRENT_REQUESTS)
|
||||
.try_fold(mesh_loader,async|mut mesh_loader,maybe_loaded_mesh|{
|
||||
if let Some((id,mesh))=maybe_loaded_mesh{
|
||||
mesh_loader.insert_mesh(id,mesh);
|
||||
}
|
||||
Ok(mesh_loader)
|
||||
}).await?;
|
||||
|
||||
// process unions
|
||||
for &id in &assets.unions{
|
||||
let mesh_loader=stream_iter(assets.unions).map(async|id|{
|
||||
let asset_id=id.0;
|
||||
let union_key=S3Cache::union_key(asset_id);
|
||||
|
||||
@@ -201,7 +225,9 @@ impl Processor{
|
||||
rbx_binary::from_reader(data.as_slice())
|
||||
}else{
|
||||
println!("[combobulator] Downloading union {asset_id}");
|
||||
let Some(data)=self.download_asset(asset_id).await? else{continue};
|
||||
let Some(data)=self.download_asset(asset_id).await? else{
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// decode the data while we have ownership
|
||||
let union_result=rbx_binary::from_reader(data.as_slice());
|
||||
@@ -213,10 +239,20 @@ impl Processor{
|
||||
|
||||
// handle error after cacheing data
|
||||
match union_result{
|
||||
Ok(union)=>mesh_loader.insert_union(id,union),
|
||||
Err(e)=>println!("[combobulator] Union {asset_id} convert error: {e}"),
|
||||
Ok(union)=>Ok(Some((id,union))),
|
||||
Err(e)=>{
|
||||
println!("[combobulator] Union {asset_id} convert error: {e}");
|
||||
Ok(None)
|
||||
},
|
||||
}
|
||||
}
|
||||
})
|
||||
.buffer_unordered(CONCURRENT_REQUESTS)
|
||||
.try_fold(mesh_loader,async|mut mesh_loader,maybe_loaded_union|{
|
||||
if let Some((id,union))=maybe_loaded_union{
|
||||
mesh_loader.insert_union(id,union);
|
||||
}
|
||||
Ok(mesh_loader)
|
||||
}).await?;
|
||||
|
||||
// convert to SNF and upload
|
||||
println!("[combobulator] Converting to SNF");
|
||||
|
||||
@@ -5,7 +5,7 @@ edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
async-nats.workspace = true
|
||||
futures = "0.3.31"
|
||||
futures-util.workspace = true
|
||||
rbx_asset.workspace = true
|
||||
rbx_binary.workspace = true
|
||||
rbx_dom_weak.workspace = true
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use futures::StreamExt;
|
||||
use futures_util::StreamExt;
|
||||
|
||||
mod download;
|
||||
mod grpc;
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use futures::StreamExt;
|
||||
use futures_util::stream::iter as stream_iter;
|
||||
use futures_util::StreamExt;
|
||||
|
||||
use crate::download::download_asset_version;
|
||||
use crate::nats_types::ReleaseSubmissionsBatchRequest;
|
||||
@@ -92,7 +93,7 @@ async fn release_inner(
|
||||
.collect();
|
||||
|
||||
// fut_download
|
||||
let fut_download=futures::stream::iter(asset_versions)
|
||||
let fut_download=stream_iter(asset_versions)
|
||||
.map(|(index,asset_version)|async move{
|
||||
let modes=download_fut(cloud_context,asset_version).await;
|
||||
(index,modes)
|
||||
@@ -137,7 +138,7 @@ async fn release_inner(
|
||||
}
|
||||
|
||||
// concurrently dispatch results
|
||||
let release_results:Vec<_> =futures::stream::iter(
|
||||
let release_results:Vec<_> =stream_iter(
|
||||
release_info
|
||||
.Submissions
|
||||
.into_iter()
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use futures::TryStreamExt;
|
||||
use futures_util::stream::iter as stream_iter;
|
||||
use futures_util::TryStreamExt;
|
||||
use rust_grpc::validator::Policy;
|
||||
|
||||
use crate::download::download_asset_version;
|
||||
@@ -153,7 +154,7 @@ impl crate::message_handler::MessageHandler{
|
||||
}
|
||||
|
||||
// send all script hashes to REST endpoint and retrieve the replacements
|
||||
futures::stream::iter(script_map.iter_mut().map(Ok))
|
||||
stream_iter(script_map.iter_mut().map(Ok))
|
||||
.try_for_each_concurrent(Some(SCRIPT_CONCURRENCY),|(source,NamePolicy{policy,name})|async{
|
||||
// get the hash
|
||||
let hash=hash_source(source.as_str());
|
||||
|
||||
Reference in New Issue
Block a user