diff --git a/crates/rbuilder-operator/src/clickhouse.rs b/crates/rbuilder-operator/src/clickhouse.rs index f1f845e84..bcc7cd1bf 100644 --- a/crates/rbuilder-operator/src/clickhouse.rs +++ b/crates/rbuilder-operator/src/clickhouse.rs @@ -337,7 +337,7 @@ impl BidObserver for BuiltBlocksWriter { let mut used_bundle_hashes = Vec::new(); let mut used_bundle_uuids = Vec::new(); for res in &built_block_trace.included_orders { - if let Order::Bundle(bundle) = &res.order { + if let Order::Bundle(bundle) = res.order.as_ref() { used_bundle_hashes .push(bundle.external_hash.unwrap_or(bundle.hash).to_string()); used_bundle_uuids.push(bundle.uuid.to_string()); diff --git a/crates/rbuilder-primitives/src/lib.rs b/crates/rbuilder-primitives/src/lib.rs index 4970da9e5..cbe3bb004 100644 --- a/crates/rbuilder-primitives/src/lib.rs +++ b/crates/rbuilder-primitives/src/lib.rs @@ -1011,13 +1011,25 @@ impl SimValue { /// Order simulated (usually on top of block) + SimValue #[derive(Debug, Clone, PartialEq, Eq)] pub struct SimulatedOrder { - pub order: Order, + pub order: Arc, pub sim_value: SimValue, /// Info about read/write slots during the simulation to help figure out what the Order is doing. pub used_state_trace: Option, } impl SimulatedOrder { + pub fn new( + order: Arc, + sim_value: SimValue, + used_state_trace: Option, + ) -> Self { + Self { + order, + sim_value, + used_state_trace, + } + } + pub fn id(&self) -> OrderId { self.order.id() } diff --git a/crates/rbuilder/src/backtest/build_block/backtest_build_block.rs b/crates/rbuilder/src/backtest/build_block/backtest_build_block.rs index 228435c22..8574acbd7 100644 --- a/crates/rbuilder/src/backtest/build_block/backtest_build_block.rs +++ b/crates/rbuilder/src/backtest/build_block/backtest_build_block.rs @@ -219,7 +219,7 @@ fn print_orders_with_tx_hash( .iter() .map(|order_with_timestamp| &order_with_timestamp.order) .filter(|order| order.list_txs().iter().any(|(tx, _)| tx.hash() == tx_hash)) - .for_each(print_order); + .for_each(|order| print_order(order)); println!("\nSIM ORDERS:"); sim_orders .iter() @@ -241,7 +241,7 @@ fn print_order_execution_result(order_result: &ExecutionResult) { order_result.space_used.gas, format_ether(order_result.coinbase_profit), ); - if let Order::Bundle(_) = order_result.order { + if let Order::Bundle(_) = order_result.order.as_ref() { for tx in order_result.tx_infos.iter().map(|info| &info.tx) { println!(" ↳ {:?}", tx.hash()); } diff --git a/crates/rbuilder/src/backtest/build_block/synthetic_orders.rs b/crates/rbuilder/src/backtest/build_block/synthetic_orders.rs index 22cbe4578..214a02bcf 100644 --- a/crates/rbuilder/src/backtest/build_block/synthetic_orders.rs +++ b/crates/rbuilder/src/backtest/build_block/synthetic_orders.rs @@ -5,6 +5,7 @@ use rbuilder_primitives::{ Bundle, MempoolTx, Metadata, Order, TransactionSignedEcRecoveredWithBlobs, LAST_BUNDLE_VERSION, }; use reth_provider::test_utils::MockNodeTypesWithDB; +use std::sync::Arc; use uuid::Uuid; use super::backtest_build_block::{run_backtest_build_block, BuildBlockCfg, OrdersSource}; @@ -75,7 +76,7 @@ impl SyntheticOrdersSource { ))); orders.push(OrdersWithTimestamp { timestamp_ms: 0, - order, + order: Arc::new(order), }); } @@ -106,7 +107,7 @@ impl SyntheticOrdersSource { bundle.hash_slow(); orders.push(OrdersWithTimestamp { timestamp_ms: 0, - order: Order::Bundle(bundle), + order: Arc::new(Order::Bundle(bundle)), }); } diff --git a/crates/rbuilder/src/backtest/execute.rs b/crates/rbuilder/src/backtest/execute.rs index e4a63da08..caf475ca7 100644 --- a/crates/rbuilder/src/backtest/execute.rs +++ b/crates/rbuilder/src/backtest/execute.rs @@ -10,7 +10,7 @@ use crate::{ }; use alloy_eips::BlockNumHash; use alloy_primitives::U256; -use rbuilder_primitives::{OrderId, SimulatedOrder}; +use rbuilder_primitives::{Order, OrderId, SimulatedOrder}; use reth_chainspec::ChainSpec; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -95,10 +95,10 @@ pub fn backtest_prepare_orders_from_building_context

( where P: StateProviderFactory + Clone + 'static, { - let orders = available_orders + let orders: Vec> = available_orders .iter() - .map(|order| order.order.clone()) - .collect::>(); + .map(|order| Arc::clone(&order.order)) + .collect(); for order in &orders { ctx.mempool_tx_detector.add_tx(order); } diff --git a/crates/rbuilder/src/backtest/fetch/data_source.rs b/crates/rbuilder/src/backtest/fetch/data_source.rs index ae00d54af..d4f79505a 100644 --- a/crates/rbuilder/src/backtest/fetch/data_source.rs +++ b/crates/rbuilder/src/backtest/fetch/data_source.rs @@ -7,6 +7,7 @@ use crate::{ }; use alloy_primitives::B256; use async_trait::async_trait; +use std::sync::Arc; #[derive(Debug, Clone)] pub struct DatasourceData { @@ -45,7 +46,7 @@ pub async fn get_full_slot_data_from_data( .into_iter() .map(|o| ReplaceableOrderPoolCommandWithTimestamp { timestamp_ms: o.timestamp_ms, - command: ReplaceableOrderPoolCommand::Order(o.order), + command: ReplaceableOrderPoolCommand::Order(Arc::clone(&o.order)), }) .collect(), built_block_data: data.built_block_data, diff --git a/crates/rbuilder/src/backtest/fetch/mempool.rs b/crates/rbuilder/src/backtest/fetch/mempool.rs index 2c9fd9754..fd7ab556f 100644 --- a/crates/rbuilder/src/backtest/fetch/mempool.rs +++ b/crates/rbuilder/src/backtest/fetch/mempool.rs @@ -14,6 +14,7 @@ use rbuilder_primitives::{ Order, }; use sqlx::types::chrono::DateTime; +use std::sync::Arc; use std::{ fs::create_dir_all, path::{Path, PathBuf}, @@ -52,7 +53,7 @@ pub fn get_mempool_transactions( Some(OrdersWithTimestamp { timestamp_ms, - order, + order: Arc::new(order), }) }) .collect()) diff --git a/crates/rbuilder/src/backtest/full_slot_block_data.rs b/crates/rbuilder/src/backtest/full_slot_block_data.rs index a5af10cd9..3fd49bd1d 100644 --- a/crates/rbuilder/src/backtest/full_slot_block_data.rs +++ b/crates/rbuilder/src/backtest/full_slot_block_data.rs @@ -29,7 +29,7 @@ impl From for ReplaceableOrderPoolCommandWithTimestamp { fn from(order: OrdersWithTimestamp) -> Self { ReplaceableOrderPoolCommandWithTimestamp { timestamp_ms: order.timestamp_ms, - command: ReplaceableOrderPoolCommand::Order(order.order), + command: ReplaceableOrderPoolCommand::Order(Arc::clone(&order.order)), } } } @@ -116,7 +116,7 @@ impl FullSlotBlockData { if included_orders_ids.remove(&order.id()) { Some(OrdersWithTimestamp { timestamp_ms: command_ts.timestamp_ms, - order: order.clone(), + order: Arc::clone(order), }) } else { None @@ -167,7 +167,7 @@ impl FullSlotBlockData { match command_ts.command { ReplaceableOrderPoolCommand::Order(order) => { order_id_to_timestamp.insert(order.id(), command_ts.timestamp_ms); - order_manager.insert_order(order); + order_manager.insert_order(Arc::clone(&order)); } ReplaceableOrderPoolCommand::CancelBundle(replacement_data) => { order_manager.remove_bundle(replacement_data); @@ -181,7 +181,7 @@ impl FullSlotBlockData { .iter() .map(|o| OrdersWithTimestamp { timestamp_ms: *order_id_to_timestamp.get(&o.id()).unwrap(), - order: o.clone(), + order: Arc::clone(o), }) .collect(); diff --git a/crates/rbuilder/src/backtest/mod.rs b/crates/rbuilder/src/backtest/mod.rs index a36125e2c..59e23c213 100644 --- a/crates/rbuilder/src/backtest/mod.rs +++ b/crates/rbuilder/src/backtest/mod.rs @@ -12,6 +12,7 @@ mod store; use ahash::HashMap; pub use backtest_build_range::run_backtest_build_range; use std::collections::HashSet; +use std::sync::Arc; use crate::{mev_boost::BuilderBlockReceived, utils::offset_datetime_to_timestamp_ms}; use alloy_consensus::Transaction as TransactionTrait; @@ -38,7 +39,7 @@ impl From for RawOrdersWithTimestamp { fn from(orders: OrdersWithTimestamp) -> Self { Self { timestamp_ms: orders.timestamp_ms, - order: orders.order.into(), + order: (*orders.order).clone().into(), } } } @@ -46,7 +47,7 @@ impl From for RawOrdersWithTimestamp { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct OrdersWithTimestamp { pub timestamp_ms: u64, - pub order: Order, + pub order: Arc, } /// Historic data for a block. @@ -171,7 +172,7 @@ impl BlockData { let mempool_txs = self .available_orders .iter() - .filter_map(|o| match &o.order { + .filter_map(|o| match o.order.as_ref() { Order::Tx(tx) => Some(tx.tx_with_blobs.hash()), _ => None, }) diff --git a/crates/rbuilder/src/backtest/redistribute/mod.rs b/crates/rbuilder/src/backtest/redistribute/mod.rs index 948ae23a3..d7b892ce2 100644 --- a/crates/rbuilder/src/backtest/redistribute/mod.rs +++ b/crates/rbuilder/src/backtest/redistribute/mod.rs @@ -312,7 +312,7 @@ where let mut txs = 0; let mut bundles = 0; for ts_order in &block_data.available_orders { - match &ts_order.order { + match ts_order.order.as_ref() { Order::Bundle(_) => bundles += 1, Order::Tx(_) => txs += 1, } @@ -449,7 +449,7 @@ struct AvailableOrders { included_orders_by_address: Vec<(Address, Vec)>, all_orders_by_address: HashMap>, orders_id_to_address: HashMap, - all_orders_by_id: HashMap, + all_orders_by_id: HashMap>, bundle_hash_by_id: HashMap, order_sender_by_id: HashMap, } @@ -550,7 +550,7 @@ fn split_orders_by_identities( for order in &block_data.available_orders { let id = order.order.id(); - if let Order::Bundle(bundle) = &order.order { + if let Order::Bundle(bundle) = order.order.as_ref() { bundle_hash_by_id.insert(id, bundle.external_hash.unwrap_or(bundle.hash)); }; order_sender_by_id.insert(id, order_sender(&order.order)); @@ -603,7 +603,7 @@ fn split_orders_by_identities( all_orders_by_id: block_data .available_orders .iter() - .map(|order| (order.order.id(), order.order.clone())) + .map(|order| (order.order.id(), Arc::clone(&order.order))) .collect(), bundle_hash_by_id, order_sender_by_id, diff --git a/crates/rbuilder/src/backtest/store.rs b/crates/rbuilder/src/backtest/store.rs index eaec978f9..6e86417eb 100644 --- a/crates/rbuilder/src/backtest/store.rs +++ b/crates/rbuilder/src/backtest/store.rs @@ -29,6 +29,7 @@ use std::{ ffi::OsString, path::{Path, PathBuf}, str::FromStr, + sync::Arc, }; /// Version of the data/format on the DB. @@ -638,7 +639,7 @@ impl From for RawReplaceableOrderPoolCommand { fn from(command: ReplaceableOrderPoolCommand) -> Self { match command { ReplaceableOrderPoolCommand::Order(order) => { - RawReplaceableOrderPoolCommand::Order(order.into()) + RawReplaceableOrderPoolCommand::Order((*order).clone().into()) } ReplaceableOrderPoolCommand::CancelBundle(replacement_data) => { RawReplaceableOrderPoolCommand::CancelBundle(replacement_data) @@ -673,7 +674,7 @@ impl RawReplaceableOrderPoolCommandWithTimestamp { timestamp_ms: self.timestamp_ms, command: match self.command { RawReplaceableOrderPoolCommand::Order(raw_order) => { - ReplaceableOrderPoolCommand::Order(raw_order.decode(encoding)?) + ReplaceableOrderPoolCommand::Order(Arc::new(raw_order.decode(encoding)?)) } RawReplaceableOrderPoolCommand::CancelBundle(replacement_data) => { ReplaceableOrderPoolCommand::CancelBundle(replacement_data) diff --git a/crates/rbuilder/src/bin/run-bundle-on-prefix.rs b/crates/rbuilder/src/bin/run-bundle-on-prefix.rs index 57d783b24..6aec4baa2 100644 --- a/crates/rbuilder/src/bin/run-bundle-on-prefix.rs +++ b/crates/rbuilder/src/bin/run-bundle-on-prefix.rs @@ -215,11 +215,8 @@ async fn main() -> eyre::Result<()> { break; } let order = Order::Tx(MempoolTx::new(tx.clone())); - let sim_order = SimulatedOrder { - order, - sim_value: Default::default(), - used_state_trace: Default::default(), - }; + let sim_order = + SimulatedOrder::new(Arc::new(order), Default::default(), Default::default()); let res = builder.commit_order(&mut block_info.local_ctx, &sim_order, &|_| Ok(()))?; println!("{:?} {:?}", tx.hash(), res.is_ok()); } @@ -301,11 +298,11 @@ fn execute_orders_on_tob( ) -> eyre::Result<()> { for order_ts in target_orders { let mut builder = block_info.create_building_helper(false)?; - let sim_order = SimulatedOrder { - order: order_ts.order.clone(), - sim_value: Default::default(), - used_state_trace: Default::default(), - }; + let sim_order = SimulatedOrder::new( + Arc::clone(&order_ts.order), + Default::default(), + Default::default(), + ); let res = builder.commit_order(&mut block_info.local_ctx, &sim_order, &|_| Ok(()))?; let profit = res .as_ref() diff --git a/crates/rbuilder/src/building/block_orders/order_priority.rs b/crates/rbuilder/src/building/block_orders/order_priority.rs index 73c882c95..5d1dcf356 100644 --- a/crates/rbuilder/src/building/block_orders/order_priority.rs +++ b/crates/rbuilder/src/building/block_orders/order_priority.rs @@ -325,15 +325,11 @@ mod test { gas: u64, order_type: OrderType, ) -> Arc { - Arc::new(SimulatedOrder { - order: self.create_order(order_type), - sim_value: SimValue::new_test( - U256::from(full_profit), - U256::from(non_mempool_profit), - gas, - ), - used_state_trace: None, - }) + Arc::new(SimulatedOrder::new( + Arc::new(self.create_order(order_type)), + SimValue::new_test(U256::from(full_profit), U256::from(non_mempool_profit), gas), + None, + )) } } diff --git a/crates/rbuilder/src/building/block_orders/test_data_generator.rs b/crates/rbuilder/src/building/block_orders/test_data_generator.rs index c792a439d..ad29a4c18 100644 --- a/crates/rbuilder/src/building/block_orders/test_data_generator.rs +++ b/crates/rbuilder/src/building/block_orders/test_data_generator.rs @@ -27,10 +27,6 @@ impl TestDataGenerator { let sim_value = SimValue::new_test_no_gas(U256::from(coinbase_profit), U256::from(mev_gas_price)); - Arc::new(SimulatedOrder { - order, - sim_value, - used_state_trace: None, - }) + Arc::new(SimulatedOrder::new(Arc::new(order), sim_value, None)) } } diff --git a/crates/rbuilder/src/building/builders/parallel_builder/conflict_resolvers.rs b/crates/rbuilder/src/building/builders/parallel_builder/conflict_resolvers.rs index 8e06e596b..2d8caac83 100644 --- a/crates/rbuilder/src/building/builders/parallel_builder/conflict_resolvers.rs +++ b/crates/rbuilder/src/building/builders/parallel_builder/conflict_resolvers.rs @@ -529,11 +529,11 @@ mod tests { external_hash: None, }; - Arc::new(SimulatedOrder { - order: Order::Bundle(bundle), - used_state_trace: None, + Arc::new(SimulatedOrder::new( + Arc::new(Order::Bundle(bundle)), sim_value, - }) + None, + )) } } diff --git a/crates/rbuilder/src/building/builders/parallel_builder/conflict_task_generator.rs b/crates/rbuilder/src/building/builders/parallel_builder/conflict_task_generator.rs index dfc6f62ed..a00eea7c8 100644 --- a/crates/rbuilder/src/building/builders/parallel_builder/conflict_task_generator.rs +++ b/crates/rbuilder/src/building/builders/parallel_builder/conflict_task_generator.rs @@ -487,16 +487,16 @@ mod tests { let sim_value = SimValue::new_test_no_gas(coinbase_profit, U256::ZERO); - Arc::new(SimulatedOrder { - order: Order::Tx(MempoolTx { + Arc::new(SimulatedOrder::new( + Arc::new(Order::Tx(MempoolTx { tx_with_blobs: TransactionSignedEcRecoveredWithBlobs::new_no_blobs( self.create_tx(), ) .unwrap(), - }), + })), sim_value, - used_state_trace: Some(trace), - }) + Some(trace), + )) } } diff --git a/crates/rbuilder/src/building/builders/parallel_builder/groups.rs b/crates/rbuilder/src/building/builders/parallel_builder/groups.rs index a0b6d8800..f0ebede3c 100644 --- a/crates/rbuilder/src/building/builders/parallel_builder/groups.rs +++ b/crates/rbuilder/src/building/builders/parallel_builder/groups.rs @@ -470,16 +470,16 @@ mod tests { trace.destructed_contracts.push(*contract_address); } - Arc::new(SimulatedOrder { - order: Order::Tx(MempoolTx { + Arc::new(SimulatedOrder::new( + Arc::new(Order::Tx(MempoolTx { tx_with_blobs: TransactionSignedEcRecoveredWithBlobs::new_no_blobs( self.create_tx(), ) .unwrap(), - }), - used_state_trace: Some(trace), - sim_value: SimValue::default(), - }) + })), + SimValue::default(), + Some(trace), + )) } } diff --git a/crates/rbuilder/src/building/built_block_trace.rs b/crates/rbuilder/src/building/built_block_trace.rs index b03ca06e7..86adeb7b0 100644 --- a/crates/rbuilder/src/building/built_block_trace.rs +++ b/crates/rbuilder/src/building/built_block_trace.rs @@ -143,7 +143,7 @@ impl BuiltBlockTrace { pub fn used_order_count(&self) -> (usize, usize, usize) { self.included_orders .iter() - .fold((0, 0, 0), |acc, order| match order.order { + .fold((0, 0, 0), |acc, order| match order.order.as_ref() { Order::Tx(_) => (acc.0 + 1, acc.1, acc.2), Order::Bundle(_) => (acc.0, acc.1 + 1, acc.2), }) diff --git a/crates/rbuilder/src/building/mod.rs b/crates/rbuilder/src/building/mod.rs index acc3b44cb..ec7eedfe3 100644 --- a/crates/rbuilder/src/building/mod.rs +++ b/crates/rbuilder/src/building/mod.rs @@ -540,7 +540,7 @@ pub struct ExecutionResult { pub coinbase_profit: U256, pub inplace_sim: SimValue, pub space_used: BlockSpace, - pub order: Order, + pub order: Arc, /// Landed txs execution info. pub tx_infos: Vec, pub nonces_updated: Vec<(Address, u64)>, @@ -777,7 +777,7 @@ impl, unsatisfied_nonces: usize, } @@ -58,15 +58,15 @@ pub type SimulationId = u64; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct SimulationRequest { pub id: SimulationId, - pub order: Order, - pub parents: Vec, + pub order: Arc, + pub parents: Vec>, } #[derive(Debug, Clone, PartialEq, Eq)] pub struct SimulatedResult { pub id: SimulationId, pub simulated_order: Arc, - pub previous_orders: Vec, + pub previous_orders: Vec>, pub nonces_after: Vec, pub simulation_time: Duration, } @@ -90,7 +90,7 @@ pub struct SimTree { enum OrderNonceState { Invalid, PendingNonces(Vec), - Ready(Vec), + Ready(Vec>), } impl SimTree { @@ -105,7 +105,7 @@ impl SimTree { } } - fn push_order(&mut self, order: Order) -> Result<(), ProviderError> { + fn push_order(&mut self, order: Arc) -> Result<(), ProviderError> { if self.pending_orders.contains_key(&order.id()) { return Ok(()); } @@ -208,7 +208,7 @@ impl SimTree { } } - pub fn push_orders(&mut self, orders: Vec) -> Result<(), ProviderError> { + pub fn push_orders(&mut self, orders: Vec>) -> Result<(), ProviderError> { for order in orders { self.push_order(order)?; } @@ -316,7 +316,7 @@ impl SimTree { pub fn simulate_all_orders_with_sim_tree

( provider: P, ctx: &BlockBuildingContext, - orders: &[Order], + orders: &[Arc], randomize_insertion: bool, ) -> Result<(Vec>, Vec), CriticalCommitOrderError> where @@ -413,8 +413,8 @@ where /// Prepares context (fork + tracer) and calls simulate_order_using_fork pub fn simulate_order( - parent_orders: Vec, - order: Order, + parent_orders: Vec>, + order: Arc, ctx: &BlockBuildingContext, local_ctx: &mut ThreadBlockBuildingContext, state: &mut BlockState, @@ -434,8 +434,8 @@ pub fn simulate_order( /// Simulates order (including parent (those needed to reach proper nonces) orders) using a precreated fork pub fn simulate_order_using_fork( - parent_orders: Vec, - order: Order, + parent_orders: Vec>, + order: Arc, fork: &mut PartialBlockFork<'_, '_, '_, '_, Tracer, NullPartialBlockForkExecutionTracer>, mempool_tx_detector: &MempoolTxsDetector, ) -> Result { @@ -445,8 +445,8 @@ pub fn simulate_order_using_fork( // We use empty combined refunds because the value of the bundle will // not change from batching. let combined_refunds = std::collections::HashMap::default(); - for parent in parent_orders { - let result = fork.commit_order(&parent, space_state, true, &combined_refunds)?; + for parent in &parent_orders { + let result = fork.commit_order(parent, space_state, true, &combined_refunds)?; match result { Ok(res) => { space_state.use_space(res.space_used); @@ -471,11 +471,7 @@ pub fn simulate_order_using_fork( } let new_nonces = res.nonces_updated.into_iter().collect::>(); Ok(OrderSimResult::Success( - Arc::new(SimulatedOrder { - order, - sim_value, - used_state_trace: res.used_state_trace, - }), + Arc::new(SimulatedOrder::new(order, sim_value, res.used_state_trace)), new_nonces, )) } diff --git a/crates/rbuilder/src/building/testing/bundle_tests/mod.rs b/crates/rbuilder/src/building/testing/bundle_tests/mod.rs index 33816e546..0735e1892 100644 --- a/crates/rbuilder/src/building/testing/bundle_tests/mod.rs +++ b/crates/rbuilder/src/building/testing/bundle_tests/mod.rs @@ -435,6 +435,7 @@ fn test_bundle_ok_inner_tx_profits() -> eyre::Result<()> { #[test] fn test_bundle_consistency_check() -> eyre::Result<()> { + use std::sync::Arc; let block_number = BlockArgs::MIN_BLOCK_NUMBER; let mut test_setup = TestSetup::gen_test_setup(BlockArgs::default().with_number(block_number))?; @@ -454,11 +455,12 @@ fn test_bundle_consistency_check() -> eyre::Result<()> { )?; let mut res = test_setup.commit_order_ok(); + // break bundle by removing revertible tx hashes if let Order::Bundle(Bundle { reverting_tx_hashes, .. - }) = &mut res.order + }) = Arc::make_mut(&mut res.order) { reverting_tx_hashes.clear(); } else { diff --git a/crates/rbuilder/src/building/testing/bundle_tests/setup.rs b/crates/rbuilder/src/building/testing/bundle_tests/setup.rs index 69046ac08..9dfe9fa91 100644 --- a/crates/rbuilder/src/building/testing/bundle_tests/setup.rs +++ b/crates/rbuilder/src/building/testing/bundle_tests/setup.rs @@ -199,11 +199,11 @@ impl TestSetup { Arc::from(self.test_chain.provider_factory().latest()?); let mut local_ctx = ThreadBlockBuildingContext::default(); - let sim_order = SimulatedOrder { - order: self.order_builder.build_order(), - sim_value: Default::default(), - used_state_trace: Default::default(), - }; + let sim_order = SimulatedOrder::new( + Arc::new(self.order_builder.build_order()), + Default::default(), + Default::default(), + ); // we commit order twice to test evm caching let initial_partial_block = self.partial_block.clone(); diff --git a/crates/rbuilder/src/live_builder/block_output/best_block_from_algorithms.rs b/crates/rbuilder/src/live_builder/block_output/best_block_from_algorithms.rs index cebf6888a..946416ec0 100644 --- a/crates/rbuilder/src/live_builder/block_output/best_block_from_algorithms.rs +++ b/crates/rbuilder/src/live_builder/block_output/best_block_from_algorithms.rs @@ -42,6 +42,8 @@ impl BestBlockFromAlgorithms { #[cfg(test)] mod tests { + use std::sync::Arc; + use alloy_primitives::{I256, U256}; use crate::building::{ @@ -81,7 +83,7 @@ mod tests { coinbase_profit: Default::default(), inplace_sim: Default::default(), space_used: Default::default(), - order: Order::Tx(order), + order: Arc::new(Order::Tx(order)), tx_infos: vec![TransactionExecutionInfo { tx, receipt: Default::default(), diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index ebbd7ba1c..f215a88c0 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -524,7 +524,7 @@ async fn try_send_to_orderpool( match TransactionSignedEcRecoveredWithBlobs::try_from_tx_without_blobs_and_pool(tx, pool) { Ok(tx) => { let order = Order::Tx(MempoolTx::new(tx)); - let command = ReplaceableOrderPoolCommand::Order(order); + let command = ReplaceableOrderPoolCommand::Order(Arc::new(order)); if let Err(e) = orderpool_sender.send(command).await { error!("Error sending order to orderpool: {:#}", e); } diff --git a/crates/rbuilder/src/live_builder/order_flow_tracing/order_flow_tracer.rs b/crates/rbuilder/src/live_builder/order_flow_tracing/order_flow_tracer.rs index 0b315e386..3c6c39128 100644 --- a/crates/rbuilder/src/live_builder/order_flow_tracing/order_flow_tracer.rs +++ b/crates/rbuilder/src/live_builder/order_flow_tracing/order_flow_tracer.rs @@ -128,7 +128,7 @@ impl ReplaceableOrderSniffer { } impl ReplaceableOrderSink for ReplaceableOrderSniffer { - fn insert_order(&mut self, order: Order) -> bool { + fn insert_order(&mut self, order: Arc) -> bool { self.tracer.insert_order(&order); self.sink.insert_order(order) } diff --git a/crates/rbuilder/src/live_builder/order_input/blob_type_order_filter.rs b/crates/rbuilder/src/live_builder/order_input/blob_type_order_filter.rs index 106f91264..fb11d8d39 100644 --- a/crates/rbuilder/src/live_builder/order_input/blob_type_order_filter.rs +++ b/crates/rbuilder/src/live_builder/order_input/blob_type_order_filter.rs @@ -1,5 +1,6 @@ use crate::live_builder::order_input::replaceable_order_sink::ReplaceableOrderSink; use rbuilder_primitives::{BundleReplacementData, Order, TransactionSignedEcRecoveredWithBlobs}; +use std::sync::Arc; use tracing::trace; /// Filters out [`Order`]s based on their blob type. [`Order`]s that do not @@ -77,7 +78,7 @@ impl BlobTypeOrderFilter { } impl ReplaceableOrderSink for BlobTypeOrderFilter { - fn insert_order(&mut self, order: Order) -> bool { + fn insert_order(&mut self, order: Arc) -> bool { if order .list_txs() .iter() diff --git a/crates/rbuilder/src/live_builder/order_input/mempool_txs_detector.rs b/crates/rbuilder/src/live_builder/order_input/mempool_txs_detector.rs index 5f1350912..c47f2c4a2 100644 --- a/crates/rbuilder/src/live_builder/order_input/mempool_txs_detector.rs +++ b/crates/rbuilder/src/live_builder/order_input/mempool_txs_detector.rs @@ -26,7 +26,7 @@ impl ReplaceableOrderStreamSniffer { } impl ReplaceableOrderSink for ReplaceableOrderStreamSniffer { - fn insert_order(&mut self, order: Order) -> bool { + fn insert_order(&mut self, order: Arc) -> bool { self.detector.add_tx(&order); self.sink.insert_order(order) } diff --git a/crates/rbuilder/src/live_builder/order_input/mod.rs b/crates/rbuilder/src/live_builder/order_input/mod.rs index 015cd8b2b..2651f02cd 100644 --- a/crates/rbuilder/src/live_builder/order_input/mod.rs +++ b/crates/rbuilder/src/live_builder/order_input/mod.rs @@ -214,7 +214,7 @@ impl OrderInputConfig { #[allow(clippy::large_enum_variant)] pub enum ReplaceableOrderPoolCommand { /// New or update order - Order(Order), + Order(Arc), CancelBundle(BundleReplacementData), } diff --git a/crates/rbuilder/src/live_builder/order_input/order_replacement_manager.rs b/crates/rbuilder/src/live_builder/order_input/order_replacement_manager.rs index 78f123861..010594e48 100644 --- a/crates/rbuilder/src/live_builder/order_input/order_replacement_manager.rs +++ b/crates/rbuilder/src/live_builder/order_input/order_replacement_manager.rs @@ -1,4 +1,5 @@ use ahash::HashMap; +use std::sync::Arc; use rbuilder_primitives::{BundleReplacementData, BundleReplacementKey, Order, OrderId}; @@ -34,7 +35,7 @@ impl OrderReplacementManager { } impl ReplaceableOrderSink for OrderReplacementManager { - fn insert_order(&mut self, order: Order) -> bool { + fn insert_order(&mut self, order: Arc) -> bool { if let Some((rep_key, sequence_number)) = order.replacement_key_and_sequence_number() { match self.replacement_states.entry(rep_key) { std::collections::hash_map::Entry::Occupied(mut e) => { @@ -47,11 +48,11 @@ impl ReplaceableOrderSink for OrderReplacementManager { sequence_number, order_id: order.id(), })); - self.sink.insert_order(order) + self.sink.insert_order(Arc::clone(&order)) } } } else { - self.sink.insert_order(order) + self.sink.insert_order(Arc::clone(&order)) } } @@ -107,7 +108,7 @@ impl BundleReplacementState { /// returns false if some operation on the sink returned false fn insert_order( &mut self, - order: Order, + order: Arc, sequence_number: u64, sink: &mut Box, ) -> bool { @@ -116,7 +117,7 @@ impl BundleReplacementState { } let mut res = self.send_remove_order_if_needed(sink); let order_id = order.id(); - if !sink.insert_order(order) { + if !sink.insert_order(Arc::clone(&order)) { res = false; } *self = BundleReplacementState::Valid(ValidBundleState { @@ -150,6 +151,7 @@ mod test { //use super::*; use mockall::predicate::eq; + use std::sync::Arc; use uuid::Uuid; use crate::live_builder::order_input::{ @@ -210,7 +212,7 @@ mod test { .withf(move |o| o.id() == bundle_id) .return_const(true); let mut manager = OrderReplacementManager::new(Box::new(order_sink)); - manager.insert_order(bundle); + manager.insert_order(Arc::new(bundle)); } /// simple insert followed by a cancellation of the order @@ -238,7 +240,7 @@ mod test { .return_const(true); let mut manager = OrderReplacementManager::new(Box::new(order_sink)); - manager.insert_order(bundle); + manager.insert_order(Arc::new(bundle)); let cancel_bundle_replacement_data = BundleReplacementData { key: replacement_data.key, sequence_number: replacement_data.sequence_number + 1, @@ -263,7 +265,7 @@ mod test { .return_const(true); let mut manager = OrderReplacementManager::new(Box::new(order_sink)); - manager.insert_order(bundle); + manager.insert_order(Arc::new(bundle)); manager.remove_bundle(replacement_data); } @@ -287,7 +289,7 @@ mod test { let mut manager = OrderReplacementManager::new(Box::new(order_sink)); manager.remove_bundle(replacement_data); - manager.insert_order(bundle); + manager.insert_order(Arc::new(bundle)); } /// replacement with sequence increase should show both versions. @@ -326,8 +328,8 @@ mod test { .return_const(true); let mut manager = OrderReplacementManager::new(Box::new(order_sink)); - manager.insert_order(old_bundle); - manager.insert_order(new_bundle); + manager.insert_order(Arc::new(old_bundle)); + manager.insert_order(Arc::new(new_bundle)); } /// replacement with sequence decrease should ignore the older version. @@ -350,7 +352,7 @@ mod test { .return_const(true); let mut manager = OrderReplacementManager::new(Box::new(order_sink)); - manager.insert_order(new_bundle); - manager.insert_order(old_bundle); + manager.insert_order(Arc::new(new_bundle)); + manager.insert_order(Arc::new(old_bundle)); } } diff --git a/crates/rbuilder/src/live_builder/order_input/order_sink.rs b/crates/rbuilder/src/live_builder/order_input/order_sink.rs index e19333921..b61d79527 100644 --- a/crates/rbuilder/src/live_builder/order_input/order_sink.rs +++ b/crates/rbuilder/src/live_builder/order_input/order_sink.rs @@ -15,7 +15,7 @@ use std::sync::Arc; /// This bool allows the source to cancel notifications on errors if needed. #[automock] pub trait OrderSink: Debug + Send { - fn insert_order(&mut self, order: Order) -> bool; + fn insert_order(&mut self, order: Arc) -> bool; fn remove_order(&mut self, id: OrderId) -> bool; /// @Pending remove this ugly hack to check if we can stop sending data. /// It should be replaced for a better control over object destruction @@ -27,7 +27,7 @@ pub trait OrderSink: Debug + Send { pub struct OrderPrinter {} impl OrderSink for OrderPrinter { - fn insert_order(&mut self, order: Order) -> bool { + fn insert_order(&mut self, order: Arc) -> bool { info!(order_id = ?order.id() ,"New order"); true } @@ -54,7 +54,7 @@ impl Drop for OrderPrinter { #[allow(clippy::large_enum_variant)] pub enum OrderPoolCommand { //OrderSink::insert_order - Insert(Order), + Insert(Arc), //OrderSink::remove_order Remove(OrderId), } @@ -74,7 +74,7 @@ impl OrderSender2OrderSink { } impl OrderSink for OrderSender2OrderSink { - fn insert_order(&mut self, order: Order) -> bool { + fn insert_order(&mut self, order: Arc) -> bool { self.sender.send(OrderPoolCommand::Insert(order)).is_ok() } @@ -91,7 +91,7 @@ impl OrderSink for OrderSender2OrderSink { /// Usage: create, call OrderSink funcs and call into_orders/orders to get the current orders. #[derive(Debug)] pub struct OrderStore { - orders: HashMap, + orders: HashMap>, } impl OrderStore { @@ -101,11 +101,11 @@ impl OrderStore { } } - pub fn into_orders(self) -> Vec { + pub fn into_orders(self) -> Vec> { self.orders.into_values().collect() } - pub fn orders(&self) -> Vec { + pub fn orders(&self) -> Vec> { self.orders.values().cloned().collect() } } @@ -117,7 +117,7 @@ impl Default for OrderStore { } impl OrderSink for OrderStore { - fn insert_order(&mut self, order: Order) -> bool { + fn insert_order(&mut self, order: Arc) -> bool { if let Some(old_order) = self.orders.insert(order.id(), order) { warn!(id =?old_order.id(), "Replacing an already inserted order"); } @@ -149,7 +149,7 @@ impl ShareableOrderSink { } impl OrderSink for ShareableOrderSink { - fn insert_order(&mut self, order: Order) -> bool { + fn insert_order(&mut self, order: Arc) -> bool { self.sink.lock().insert_order(order) } diff --git a/crates/rbuilder/src/live_builder/order_input/orderpool.rs b/crates/rbuilder/src/live_builder/order_input/orderpool.rs index 487ac9d47..a3360d383 100644 --- a/crates/rbuilder/src/live_builder/order_input/orderpool.rs +++ b/crates/rbuilder/src/live_builder/order_input/orderpool.rs @@ -11,6 +11,7 @@ use reth_primitives_traits::InMemorySize; use std::{ collections::VecDeque, num::NonZeroUsize, + sync::Arc, time::{Duration, Instant}, }; use tokio::sync::mpsc::{self}; @@ -51,7 +52,7 @@ impl OrdersForBlock { #[repr(transparent)] struct BundleBlockStore { /// The stored orders - bundles: Vec, + bundles: Vec>, } /// A sink for [`ReplaceableOrderPoolCommand`] for a specific block. This sink @@ -80,14 +81,14 @@ pub struct OrderPoolSubscriptionId(u64); /// progress sinks. #[derive(Debug)] pub struct OrderPool { - mempool_txs: Vec<(Order, Instant)>, + mempool_txs: Vec<(Arc, Instant)>, /// Sum of measure_tx(order) for all mempool_txs mempool_txs_size: usize, /// cancelled bundle, cancellation arrival time bundle_cancellations: VecDeque<(BundleReplacementData, Instant)>, bundles_by_target_block: HashMap, /// Bundles with block == None. Always returned and cleaned on head_updated. - bundles_for_current_block: Vec, + bundles_for_current_block: Vec>, known_orders: LruCache<(OrderId, u64), ()>, sinks: HashMap, next_sink_id: u64, @@ -119,7 +120,7 @@ impl OrderPool { commands.into_iter().for_each(|oc| self.process_command(oc)); } - fn process_order(&mut self, order: &Order) { + fn process_order(&mut self, order: &Arc) { let target_block = order.target_block(); let order_id = order.id(); if self @@ -131,11 +132,11 @@ impl OrderPool { } trace!(?order_id, "Adding order"); - let (order, target_block) = match &order { + let target_block = match order.as_ref() { Order::Tx(..) => { - self.mempool_txs.push((order.clone(), Instant::now())); + self.mempool_txs.push((Arc::clone(order), Instant::now())); self.mempool_txs_size += Self::measure_tx(order); - (order, None) + None } Order::Bundle(bundle) => { let target_block = bundle.block; @@ -145,13 +146,13 @@ impl OrderPool { .bundles_by_target_block .entry(target_block) .or_default(); - bundles_store.bundles.push(order.clone()); + bundles_store.bundles.push(Arc::clone(order)); } None => { - self.bundles_for_current_block.push(order.clone()); + self.bundles_for_current_block.push(Arc::clone(order)); } }; - (order, target_block) + target_block } }; self.known_orders @@ -174,7 +175,7 @@ impl OrderPool { } if target_block.is_none() || target_block == Some(sub.block_number) { let send_ok = match command.clone() { - ReplaceableOrderPoolCommand::Order(o) => sub.sink.insert_order(o), + ReplaceableOrderPoolCommand::Order(o) => sub.sink.insert_order(Arc::clone(&o)), ReplaceableOrderPoolCommand::CancelBundle(replacement_data) => { sub.sink.remove_bundle(replacement_data) } @@ -193,7 +194,7 @@ impl OrderPool { block_number: u64, mut sink: Box, ) -> OrderPoolSubscriptionId { - for order in self.mempool_txs.iter().map(|(order, _)| order.clone()) { + for order in self.mempool_txs.iter().map(|(order, _)| Arc::clone(order)) { sink.insert_order(order); } for replacement_data in self.bundle_cancellations.iter().map(|(key, _)| key) { @@ -201,13 +202,13 @@ impl OrderPool { } if let Some(bundle_store) = self.bundles_by_target_block.get(&block_number) { - for order in bundle_store.bundles.iter().cloned() { - sink.insert_order(order); + for order in bundle_store.bundles.iter() { + sink.insert_order(Arc::clone(order)); } } - for bundle in self.bundles_for_current_block.iter().cloned() { - sink.insert_order(bundle); + for bundle in &self.bundles_for_current_block { + sink.insert_order(Arc::clone(bundle)); } let res = OrderPoolSubscriptionId(self.next_sink_id); diff --git a/crates/rbuilder/src/live_builder/order_input/replaceable_order_sink.rs b/crates/rbuilder/src/live_builder/order_input/replaceable_order_sink.rs index 88aa86699..61cb354f6 100644 --- a/crates/rbuilder/src/live_builder/order_input/replaceable_order_sink.rs +++ b/crates/rbuilder/src/live_builder/order_input/replaceable_order_sink.rs @@ -2,6 +2,7 @@ use tracing::info; use core::fmt::Debug; use rbuilder_primitives::{BundleReplacementData, Order}; +use std::sync::Arc; /// Receiver of order commands in a low level order stream (mempool + RPC calls). /// Orders are assumed to be immutable so there is no update. @@ -11,7 +12,7 @@ use rbuilder_primitives::{BundleReplacementData, Order}; /// Due to source problems insert_order/remove_bundle can arrive out of order so Orders also have a sequence number /// so we can identify the newest. pub trait ReplaceableOrderSink: Debug + Send { - fn insert_order(&mut self, order: Order) -> bool; + fn insert_order(&mut self, order: Arc) -> bool; fn remove_bundle(&mut self, replacement_data: BundleReplacementData) -> bool; /// @Pending remove this ugly hack to check if we can stop sending data. /// It should be replaced for a better control over object destruction @@ -23,7 +24,7 @@ pub trait ReplaceableOrderSink: Debug + Send { pub struct ReplaceableOrderPrinter {} impl ReplaceableOrderSink for ReplaceableOrderPrinter { - fn insert_order(&mut self, order: Order) -> bool { + fn insert_order(&mut self, order: Arc) -> bool { info!( order_id = ?order.id(), order_rep_info = ?order.replacement_key_and_sequence_number(), @@ -52,7 +53,7 @@ impl Drop for ReplaceableOrderPrinter { pub struct NullReplaceableOrderSink {} impl ReplaceableOrderSink for NullReplaceableOrderSink { - fn insert_order(&mut self, _order: Order) -> bool { + fn insert_order(&mut self, _order: Arc) -> bool { true } diff --git a/crates/rbuilder/src/live_builder/order_input/rpc_server.rs b/crates/rbuilder/src/live_builder/order_input/rpc_server.rs index 899177b8b..e04e2fd53 100644 --- a/crates/rbuilder/src/live_builder/order_input/rpc_server.rs +++ b/crates/rbuilder/src/live_builder/order_input/rpc_server.rs @@ -223,7 +223,7 @@ async fn send_order( received_at: OffsetDateTime, ) { send_command( - ReplaceableOrderPoolCommand::Order(order), + ReplaceableOrderPoolCommand::Order(Arc::new(order)), channel, timeout, received_at, diff --git a/crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs b/crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs index e62032df4..d3b794fed 100644 --- a/crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs +++ b/crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs @@ -7,7 +7,7 @@ use rbuilder_primitives::{ serialize::TxEncoding, MempoolTx, Order, RawTransactionDecodable, TransactionSignedEcRecoveredWithBlobs, }; -use std::{pin::pin, time::Instant}; +use std::{pin::pin, sync::Arc, time::Instant}; use time::OffsetDateTime; use tokio::{ sync::{mpsc, mpsc::error::SendTimeoutError}, @@ -76,7 +76,7 @@ pub async fn subscribe_to_txpool_with_blobs( trace!(order = ?order.id(), parse_duration_mus = parse_duration.as_micros(), "Mempool transaction received with blobs"); add_txfetcher_time_to_query(parse_duration); - let orderpool_command = ReplaceableOrderPoolCommand::Order(order); + let orderpool_command = ReplaceableOrderPoolCommand::Order(Arc::new(order)); mark_command_received(&orderpool_command, received_at); match results .send_timeout(orderpool_command, config.results_channel_timeout) @@ -175,9 +175,10 @@ mod test { let recv_tx = receiver.recv().await.unwrap(); let tx_with_blobs = match recv_tx { - ReplaceableOrderPoolCommand::Order(Order::Tx(MempoolTx { tx_with_blobs })) => { - Some(tx_with_blobs) - } + ReplaceableOrderPoolCommand::Order(order) => match order.as_ref() { + Order::Tx(MempoolTx { tx_with_blobs }) => Some(tx_with_blobs.clone()), + _ => None, + }, _ => None, } .unwrap(); @@ -197,9 +198,10 @@ mod test { let recv_tx = receiver.recv().await.unwrap(); let tx_without_blobs = match recv_tx { - ReplaceableOrderPoolCommand::Order(Order::Tx(MempoolTx { tx_with_blobs })) => { - Some(tx_with_blobs) - } + ReplaceableOrderPoolCommand::Order(order) => match order.as_ref() { + Order::Tx(MempoolTx { tx_with_blobs }) => Some(tx_with_blobs.clone()), + _ => None, + }, _ => None, } .unwrap(); diff --git a/crates/rbuilder/src/live_builder/simulation/mod.rs b/crates/rbuilder/src/live_builder/simulation/mod.rs index 1dfa36514..4c2047da6 100644 --- a/crates/rbuilder/src/live_builder/simulation/mod.rs +++ b/crates/rbuilder/src/live_builder/simulation/mod.rs @@ -244,7 +244,9 @@ mod tests { let tx = test_context.sign_tx(tx_args).unwrap(); let tx = TransactionSignedEcRecoveredWithBlobs::new_no_blobs(tx).unwrap(); order_sender - .send(OrderPoolCommand::Insert(Order::Tx(MempoolTx::new(tx)))) + .send(OrderPoolCommand::Insert(Arc::new(Order::Tx( + MempoolTx::new(tx), + )))) .unwrap(); // We expect to receive the simulation giving a profit of coinbase_profit since that's what we sent directly to coinbase. diff --git a/crates/rbuilder/src/live_builder/simulation/simulation_job.rs b/crates/rbuilder/src/live_builder/simulation/simulation_job.rs index 2fc950272..22b2eca05 100644 --- a/crates/rbuilder/src/live_builder/simulation/simulation_job.rs +++ b/crates/rbuilder/src/live_builder/simulation/simulation_job.rs @@ -268,7 +268,7 @@ impl SimulationJob { } /// feeding the sim tree. - fn process_new_order(&mut self, order: Order) -> bool { + fn process_new_order(&mut self, order: Arc) -> bool { self.orders_received.accumulate(&order); if let Some(repl_key) = order.replacement_key() { self.unique_replacement_key_bundles.insert(repl_key); @@ -289,7 +289,7 @@ impl SimulationJob { match new_commnad { OrderPoolCommand::Insert(order) => { // This is not unrecoverable error, so if it fails, we ignore it and try processing next order - let _success = self.process_new_order(order.clone()); + let _success = self.process_new_order(Arc::clone(order)); } OrderPoolCommand::Remove(order_id) => { // Returns false if channel is closed, diff --git a/crates/rbuilder/src/telemetry/metrics/tracing_metrics.rs b/crates/rbuilder/src/telemetry/metrics/tracing_metrics.rs index ba1bdacbc..de6db4316 100644 --- a/crates/rbuilder/src/telemetry/metrics/tracing_metrics.rs +++ b/crates/rbuilder/src/telemetry/metrics/tracing_metrics.rs @@ -96,7 +96,7 @@ pub fn mark_command_received(command: &ReplaceableOrderPoolCommand, received_at: let kind = match command { ReplaceableOrderPoolCommand::Order(order) => { mark_order_received(order.id(), received_at); - match order { + match order.as_ref() { Order::Bundle(_) => "bundle", Order::Tx(_) => "tx", }