diff --git a/validation/src/main.rs b/validation/src/main.rs index f36cacc..226ba71 100644 --- a/validation/src/main.rs +++ b/validation/src/main.rs @@ -13,6 +13,9 @@ mod check_submission; mod create; mod create_mapfix; mod create_submission; +mod release; +mod release_mapfix; +mod release_submissions_batch; mod upload_mapfix; mod upload_submission; mod validator; @@ -69,13 +72,16 @@ async fn main()->Result<(),StartupError>{ let scripts=crate::grpc::scripts::Service::new(crate::grpc::scripts::ValidatorScriptsServiceClient::new(channel.clone())); let script_policy=crate::grpc::script_policy::Service::new(crate::grpc::script_policy::ValidatorScriptPolicyServiceClient::new(channel.clone())); let submissions=crate::grpc::submissions::Service::new(crate::grpc::submissions::ValidatorSubmissionsServiceClient::new(channel.clone())); + let load_asset_version_runtime=rbx_asset::cloud::LuauSessionLatestRequest{ + place_id:load_asset_version_place_id, + universe_id:load_asset_version_universe_id, + }; let message_handler=message_handler::MessageHandler{ cloud_context, cookie_context, cloud_context_luau_execution, group_id, - load_asset_version_place_id, - load_asset_version_universe_id, + load_asset_version_runtime, mapfixes, operations, scripts, diff --git a/validation/src/message_handler.rs b/validation/src/message_handler.rs index d62d153..bcb8065 100644 --- a/validation/src/message_handler.rs +++ b/validation/src/message_handler.rs @@ -9,6 +9,8 @@ pub enum HandleMessageError{ CreateSubmission(tonic::Status), CheckMapfix(crate::check_mapfix::Error), CheckSubmission(crate::check_submission::Error), + ReleaseMapfix(crate::release_mapfix::Error), + ReleaseSubmissionsBatch(crate::release_submissions_batch::Error), UploadMapfix(crate::upload_mapfix::Error), UploadSubmission(crate::upload_submission::Error), ValidateMapfix(crate::validate_mapfix::Error), @@ -32,8 +34,7 @@ pub struct MessageHandler{ pub(crate) cookie_context:rbx_asset::cookie::Context, pub(crate) cloud_context_luau_execution:rbx_asset::cloud::Context, pub(crate) group_id:Option, - pub(crate) load_asset_version_place_id:u64, - pub(crate) load_asset_version_universe_id:u64, + pub(crate) load_asset_version_runtime:rbx_asset::cloud::LuauSessionLatestRequest, pub(crate) mapfixes:crate::grpc::mapfixes::Service, pub(crate) operations:crate::grpc::operations::Service, pub(crate) scripts:crate::grpc::scripts::Service, @@ -50,6 +51,8 @@ impl MessageHandler{ "maptest.submissions.create"=>self.create_submission(from_slice(&message.payload)?).await.map_err(HandleMessageError::CreateSubmission), "maptest.mapfixes.check"=>self.check_mapfix(from_slice(&message.payload)?).await.map_err(HandleMessageError::CheckMapfix), "maptest.submissions.check"=>self.check_submission(from_slice(&message.payload)?).await.map_err(HandleMessageError::CheckSubmission), + "maptest.mapfixes.release"=>self.release_mapfix(from_slice(&message.payload)?).await.map_err(HandleMessageError::ReleaseMapfix), + "maptest.submissions.batchrelease"=>self.release_submissions_batch(from_slice(&message.payload)?).await.map_err(HandleMessageError::ReleaseSubmissionsBatch), "maptest.mapfixes.upload"=>self.upload_mapfix(from_slice(&message.payload)?).await.map_err(HandleMessageError::UploadMapfix), "maptest.submissions.upload"=>self.upload_submission(from_slice(&message.payload)?).await.map_err(HandleMessageError::UploadSubmission), "maptest.mapfixes.validate"=>self.validate_mapfix(from_slice(&message.payload)?).await.map_err(HandleMessageError::ValidateMapfix), diff --git a/validation/src/nats_types.rs b/validation/src/nats_types.rs index 2c29cd3..19f2a99 100644 --- a/validation/src/nats_types.rs +++ b/validation/src/nats_types.rs @@ -81,3 +81,34 @@ pub struct UploadMapfixRequest{ pub ModelVersion:u64, pub TargetAssetID:u64, } + +// Release a new map +#[allow(nonstandard_style)] +#[derive(serde::Deserialize)] +pub struct ReleaseSubmissionRequest{ + pub SubmissionID:u64, + pub ReleaseDate:i64, + pub ModelID:u64, + pub ModelVersion:u64, + pub UploadedAssetID:u64, + pub DisplayName:String, + pub Creator:String, + pub GameID:u32, + pub Submitter:u64, +} + +#[allow(nonstandard_style)] +#[derive(serde::Deserialize)] +pub struct ReleaseSubmissionsBatchRequest{ + pub Submissions:Vec, + pub OperationID:u32, +} + +#[allow(nonstandard_style)] +#[derive(serde::Deserialize)] +pub struct ReleaseMapfixRequest{ + pub MapfixID:u64, + pub ModelID:u64, + pub ModelVersion:u64, + pub TargetAssetID:u64, +} diff --git a/validation/src/release.rs b/validation/src/release.rs new file mode 100644 index 0000000..e315e60 --- /dev/null +++ b/validation/src/release.rs @@ -0,0 +1,104 @@ +use crate::rbx_util::read_dom; + +#[expect(unused)] +#[derive(Debug)] +pub enum ModesError{ + ApiActionMapfixReleased(tonic::Status), + ModelFileDecode(crate::rbx_util::ReadDomError), + GetRootInstance(crate::rbx_util::GetRootInstanceError), + NonSequentialModes, + TooManyModes(usize), +} +impl std::fmt::Display for ModesError{ + fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{ + write!(f,"{self:?}") + } +} +impl std::error::Error for ModesError{} + +// decode and get modes function +pub fn count_modes(maybe_gzip:rbx_asset::types::MaybeGzippedBytes)->Result{ + // decode dom (slow!) + let dom=maybe_gzip.read_with(read_dom,read_dom).map_err(ModesError::ModelFileDecode)?; + + // extract the root instance + let model_instance=crate::rbx_util::get_root_instance(&dom).map_err(ModesError::GetRootInstance)?; + + // extract information from the model + let model_info=crate::check::get_model_info(&dom,model_instance); + + // count modes + let modes=model_info.count_modes().ok_or(ModesError::NonSequentialModes)?; + + // hard limit LOL + let modes=if modes), + LuauSession(rbx_asset::cloud::LuauSessionError), +} +impl std::fmt::Display for LoadAssetVersionsError{ + fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{ + write!(f,"{self:?}") + } +} +impl std::error::Error for LoadAssetVersionsError{} + +// get asset versions in bulk using Roblox Luau API +pub async fn load_asset_versions>( + context:&rbx_asset::cloud::Context, + runtime:&rbx_asset::cloud::LuauSessionLatestRequest, + assets:I, +)->Result,LoadAssetVersionsError>{ + // construct script with inline IDs + // TODO: concurrent execution + let mut script="local InsertService=game:GetService(\"InsertService\")\n".to_string(); + script+="return {\n"; + for asset in assets{ + script+="InsertService:GetLatestAssetVersionAsync("; + script+=&asset.to_string(); + script+="),\n"; + } + script+="}\n"; + + let session=rbx_asset::cloud::LuauSessionCreate{ + script:&script, + user:None, + timeout:None, + binaryInput:None, + enableBinaryOutput:None, + binaryOutputUri:None, + }; + let session_response=context.create_luau_session(runtime,session).await.map_err(LoadAssetVersionsError::CreateSession)?; + + let result=crate::rbx_util::get_luau_result_exp_backoff(&context,&session_response).await; + + // * Note that only one mapfix can be active per map + // * so it's theoretically impossible for the map to be updated unexpectedly. + // * This means that the incremental asset version does not + // * need to be checked before and after the load asset version is checked. + + match result{ + Ok(Ok(rbx_asset::cloud::LuauResults{results}))=>{ + results.into_iter().map(|load_asset_version| + load_asset_version.as_u64().ok_or_else(||LoadAssetVersionsError::NonPositiveNumber(load_asset_version.clone())) + ).collect() + }, + Ok(Err(e))=>Err(LoadAssetVersionsError::Script(e)), + Err(e)=>Err(LoadAssetVersionsError::LuauSession(e)), + } + + // * Don't need to check asset version to make sure it hasn't been updated +} diff --git a/validation/src/release_mapfix.rs b/validation/src/release_mapfix.rs index e69de29..0940a49 100644 --- a/validation/src/release_mapfix.rs +++ b/validation/src/release_mapfix.rs @@ -0,0 +1,101 @@ +use crate::download::download_asset_version; +use crate::nats_types::ReleaseMapfixRequest; +use crate::release::{count_modes,load_asset_versions}; + +#[expect(unused)] +#[derive(Debug)] +pub enum InnerError{ + Download(crate::download::Error), + Modes(crate::release::ModesError), + LoadAssetVersions(crate::release::LoadAssetVersionsError), + LoadAssetVersionsListLength, +} +impl std::fmt::Display for InnerError{ + fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{ + write!(f,"{self:?}") + } +} +impl std::error::Error for InnerError{} + +async fn release_inner( + cloud_context:&rbx_asset::cloud::Context, + cloud_context_luau_execution:&rbx_asset::cloud::Context, + load_asset_version_runtime:&rbx_asset::cloud::LuauSessionLatestRequest, + release_info:ReleaseMapfixRequest, +)->Result{ + // download the map model + let maybe_gzip=download_asset_version(cloud_context,rbx_asset::cloud::GetAssetVersionRequest{ + asset_id:release_info.ModelID, + version:release_info.ModelVersion, + }).await.map_err(InnerError::Download)?; + + // count modes + let modes=count_modes(maybe_gzip).map_err(InnerError::Modes)?; + + // fetch load asset version + let load_asset_versions=load_asset_versions( + cloud_context_luau_execution, + load_asset_version_runtime, + [release_info.TargetAssetID], + ).await.map_err(InnerError::LoadAssetVersions)?; + + // exactly one value in the results + let &[load_asset_version]=load_asset_versions.as_slice()else{ + return Err(InnerError::LoadAssetVersionsListLength); + }; + + Ok(rust_grpc::validator::MapfixReleaseRequest{ + mapfix_id:release_info.MapfixID, + target_asset_id:release_info.TargetAssetID, + asset_version:load_asset_version, + modes:modes, + }) +} + +#[expect(unused)] +#[derive(Debug)] +pub enum Error{ + ApiActionMapfixRelease(tonic::Status), +} +impl std::fmt::Display for Error{ + fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{ + write!(f,"{self:?}") + } +} +impl std::error::Error for Error{} + +impl crate::message_handler::MessageHandler{ + pub async fn release_mapfix(&self,release_info:ReleaseMapfixRequest)->Result<(),Error>{ + let mapfix_id=release_info.MapfixID; + let result=release_inner( + &self.cloud_context, + &self.cloud_context_luau_execution, + &self.load_asset_version_runtime, + release_info, + ).await; + + match result{ + Ok(request)=>{ + // update map metadata + self.mapfixes.set_status_released(request).await.map_err(Error::ApiActionMapfixRelease)?; + }, + Err(e)=>{ + // log error + println!("[release_mapfix] Error: {e}"); + + // post an error message to the audit log + self.mapfixes.create_audit_error(rust_grpc::validator::AuditErrorRequest{ + id:mapfix_id, + error_message:e.to_string(), + }).await.map_err(Error::ApiActionMapfixRelease)?; + + // update the mapfix model status to uploaded + self.mapfixes.set_status_not_released(rust_grpc::validator::MapfixId{ + id:mapfix_id, + }).await.map_err(Error::ApiActionMapfixRelease)?; + }, + } + + Ok(()) + } +} diff --git a/validation/src/release_submissions_batch.rs b/validation/src/release_submissions_batch.rs new file mode 100644 index 0000000..5eed427 --- /dev/null +++ b/validation/src/release_submissions_batch.rs @@ -0,0 +1,225 @@ +use futures::StreamExt; + +use crate::download::download_asset_version; +use crate::nats_types::ReleaseSubmissionsBatchRequest; +use crate::release::{count_modes,load_asset_versions}; + + +#[expect(unused)] +#[derive(Debug)] +pub enum DownloadFutError{ + Download(crate::download::Error), + Join(tokio::task::JoinError), + Modes(crate::release::ModesError), +} +impl std::fmt::Display for DownloadFutError{ + fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{ + write!(f,"{self:?}") + } +} +impl std::error::Error for DownloadFutError{} + +#[derive(Debug)] +pub struct ErrorContext{ + submission_id:u64, + error:E, +} +impl std::fmt::Display for ErrorContext{ + fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{ + write!(f,"SubmissionID({})={:?}",self.submission_id,self.error) + } +} +impl std::error::Error for ErrorContext{} + +async fn download_fut( + cloud_context:&rbx_asset::cloud::Context, + asset_version:rbx_asset::cloud::GetAssetVersionRequest, +)->Result{ + // download + let maybe_gzip=download_asset_version(cloud_context,asset_version) + .await + .map_err(DownloadFutError::Download)?; + + // count modes in a green thread + let modes=tokio::task::spawn_blocking(|| + count_modes(maybe_gzip) + ) + .await + .map_err(DownloadFutError::Join)? + .map_err(DownloadFutError::Modes)?; + + Ok::<_,DownloadFutError>(modes) +} + +#[expect(unused)] +#[derive(Debug)] +pub enum InnerError{ + Io(std::io::Error), + LoadAssetVersions(crate::release::LoadAssetVersionsError), + LoadAssetVersionsListLength, + DownloadFutErrors(Vec>), + ReleaseErrors(Vec>), +} +impl std::fmt::Display for InnerError{ + fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{ + write!(f,"{self:?}") + } +} +impl std::error::Error for InnerError{} + +const MAX_PARALLEL_DECODE:usize=6; +const MAX_CONCURRENT_RELEASE:usize=16; + +async fn release_inner( + release_info:ReleaseSubmissionsBatchRequest, + cloud_context:&rbx_asset::cloud::Context, + cloud_context_luau_execution:&rbx_asset::cloud::Context, + load_asset_version_runtime:&rbx_asset::cloud::LuauSessionLatestRequest, + submissions:&crate::grpc::submissions::Service, +)->Result<(),InnerError>{ + let available_parallelism=std::thread::available_parallelism().map_err(InnerError::Io)?.get(); + // set up futures + + // fut_download + let fut_download=futures::stream::iter( + release_info + .Submissions + .iter() + .enumerate() + .map(|(index,submission)|async move{ + let asset_version=rbx_asset::cloud::GetAssetVersionRequest{ + asset_id:submission.ModelID, + version:submission.ModelVersion, + }; + let modes=download_fut(cloud_context,asset_version).await; + (index,modes) + }) + ) + .buffer_unordered(available_parallelism.min(MAX_PARALLEL_DECODE)) + .collect::)>>(); + + // fut_luau + let fut_load_asset_versions=load_asset_versions( + cloud_context_luau_execution, + load_asset_version_runtime, + release_info.Submissions.iter().map(|submission|submission.UploadedAssetID), + ); + + // execute futures + let (mut modes_unordered,load_asset_versions_result)=tokio::join!(fut_download,fut_load_asset_versions); + + let load_asset_versions=load_asset_versions_result.map_err(InnerError::LoadAssetVersions)?; + + // sanity check roblox output + if load_asset_versions.len()!=release_info.Submissions.len(){ + return Err(InnerError::LoadAssetVersionsListLength); + }; + + // rip asymptotic complexity (hash map would be better) + modes_unordered.sort_by_key(|&(index,_)|index); + + // check modes calculations for all success + let mut modes=Vec::with_capacity(modes_unordered.len()); + let mut errors=Vec::with_capacity(modes_unordered.len()); + for (index,result) in modes_unordered{ + match result{ + Ok(value)=>modes.push(value), + Err(error)=>errors.push(ErrorContext{ + submission_id:release_info.Submissions[index].SubmissionID, + error:error, + }), + } + } + if !errors.is_empty(){ + return Err(InnerError::DownloadFutErrors(errors)); + } + + // concurrently dispatch results + let release_results:Vec<_> =futures::stream::iter( + release_info + .Submissions + .into_iter() + .zip(modes) + .zip(load_asset_versions) + .map(|((submission,modes),asset_version)|async move{ + let result=submissions.set_status_released(rust_grpc::validator::SubmissionReleaseRequest{ + submission_id:submission.SubmissionID, + map_create:Some(rust_grpc::maps_extended::MapCreate{ + id:submission.UploadedAssetID as i64, + display_name:submission.DisplayName, + creator:submission.Creator, + game_id:submission.GameID, + date:submission.ReleaseDate, + submitter:submission.Submitter, + thumbnail:0, + asset_version, + modes, + }), + }).await; + (submission.SubmissionID,result) + }) + ) + .buffer_unordered(MAX_CONCURRENT_RELEASE) + .collect().await; + + // check for errors + let errors:Vec<_> = + release_results + .into_iter() + .filter_map(|(submission_id,result)| + result.err().map(|e|ErrorContext{ + submission_id, + error:e, + }) + ) + .collect(); + + if !errors.is_empty(){ + return Err(InnerError::ReleaseErrors(errors)); + } + + Ok(()) +} + +#[allow(dead_code)] +#[derive(Debug)] +pub enum Error{ + UpdateOperation(tonic::Status), +} +impl std::fmt::Display for Error{ + fn fmt(&self,f:&mut std::fmt::Formatter<'_>)->std::fmt::Result{ + write!(f,"{self:?}") + } +} +impl std::error::Error for Error{} + +impl crate::message_handler::MessageHandler{ + pub async fn release_submissions_batch(&self,release_info:ReleaseSubmissionsBatchRequest)->Result<(),Error>{ + let operation_id=release_info.OperationID; + let result=release_inner( + release_info, + &self.cloud_context, + &self.cloud_context_luau_execution, + &self.load_asset_version_runtime, + &self.submissions, + ).await; + + match result{ + Ok(())=>{ + // operation success + self.operations.success(rust_grpc::validator::OperationSuccessRequest{ + operation_id, + path:String::new(), + }).await.map_err(Error::UpdateOperation)?; + }, + Err(e)=>{ + // operation error + self.operations.fail(rust_grpc::validator::OperationFailRequest{ + operation_id, + status_message:e.to_string(), + }).await.map_err(Error::UpdateOperation)?; + }, + } + Ok(()) + } +}