# Performance Optimization Guide
This comprehensive guide covers performance best practices for high-frequency trading applications, based on patterns observed in the axiomtrade-rs codebase.
## Table of Contents
- Async/Await Best Practices
- Connection Pooling and Management
- Batch Operations
- Memory Management
- Rate Limiting and Throttling
- Benchmarking and Monitoring
- High-Frequency Trading Optimizations
- Network Optimization
## Async/Await Best Practices

### 1. Efficient Task Spawning

Use `tokio::spawn` for independent, concurrent operations:

```rust
// Good: spawn independent tasks so the balance queries run concurrently
let handles: Vec<_> = wallet_addresses.iter().map(|address| {
    let client = client.clone();
    let address = address.clone();
    tokio::spawn(async move {
        client.get_balance(&address).await
    })
}).collect();

// Wait for all tasks to complete, failing fast on the first join error
let results = futures_util::future::try_join_all(handles).await?;
```
### 2. Avoid Blocking in Async Context

```rust
// Bad: blocking I/O in an async context
async fn bad_example() {
    std::fs::read_to_string("file.json").unwrap(); // Blocks the entire runtime worker thread
}

// Good: use async I/O
async fn good_example() {
    tokio::fs::read_to_string("file.json").await.unwrap();
}
```
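When blocking work is unavoidable (a synchronous library, heavy parsing), move it onto Tokio's dedicated blocking thread pool with `tokio::task::spawn_blocking`. A minimal sketch (the file name is illustrative):

```rust
// Offload unavoidable blocking work so async worker threads stay responsive
async fn load_snapshot() -> std::io::Result<String> {
    tokio::task::spawn_blocking(|| {
        // Runs on the blocking pool, not on a runtime worker thread
        std::fs::read_to_string("snapshot.json")
    })
    .await
    .expect("blocking task panicked")
}
```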
### 3. Strategic Use of Arc and RwLock

```rust
pub struct EnhancedClient {
    auth_client: Arc<RwLock<AuthClient>>,
    rate_limiter: EndpointRateLimiter,
    global_rate_limiter: RateLimiter,
    retry_config: RetryConfig,
}

// Minimize lock contention
impl EnhancedClient {
    pub async fn make_request(
        &self,
        method: Method,
        url: &str,
        body: Option<Value>,
    ) -> Result<Response> {
        // Check rate limits first (no lock needed)
        self.global_rate_limiter.wait_if_needed().await;

        // Only acquire the write lock inside the retried operation,
        // and only for the duration of the request
        let auth_client = Arc::clone(&self.auth_client);
        let result = retry_with_config(self.retry_config.clone(), || {
            let auth_client = Arc::clone(&auth_client);
            let method = method.clone();
            let body = body.clone();
            async move {
                auth_client.write().await
                    .make_authenticated_request(method, url, body)
                    .await
            }
        }).await?;
        Ok(result)
    }
}
```
### 4. Efficient Error Handling

```rust
impl RetryableError for EnhancedClientError {
    fn is_retryable(&self) -> bool {
        match self {
            EnhancedClientError::NetworkError(e) => e.is_timeout() || e.is_connect(),
            EnhancedClientError::RateLimitExceeded => true,
            _ => false,
        }
    }
}

// Custom retry logic for better performance
pub async fn retry_with_backoff<F, Fut, T, E>(
    config: RetryConfig,
    mut operation: F,
) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
    E: RetryableError,
{
    for attempt in 0..=config.max_retries {
        match operation().await {
            Ok(result) => return Ok(result),
            Err(error) => {
                if !error.is_retryable() || attempt == config.max_retries {
                    return Err(error);
                }
                let delay = config.calculate_delay(attempt);
                tokio::time::sleep(delay).await;
            }
        }
    }
    unreachable!("the final attempt always returns above")
}
```
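`calculate_delay` is not shown in this guide; a typical choice is exponential backoff with jitter. A sketch under that assumption (the field names are illustrative):

```rust
use std::time::Duration;

pub struct RetryConfig {
    pub max_retries: u32,
    pub base_delay: Duration, // e.g. 100ms
    pub max_delay: Duration,  // cap, e.g. 10s
}

impl RetryConfig {
    /// Exponential backoff with full jitter:
    /// a random delay in 0..=min(base * 2^attempt, max_delay).
    pub fn calculate_delay(&self, attempt: u32) -> Duration {
        let base_ms = self.base_delay.as_millis() as u64;
        let exp_ms = base_ms.saturating_mul(1u64 << attempt.min(16));
        let capped_ms = exp_ms.min(self.max_delay.as_millis() as u64);
        // Jitter spreads concurrent retries out to avoid thundering-herd behavior
        Duration::from_millis(fastrand::u64(0..=capped_ms))
    }
}
```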
## Connection Pooling and Management

### 1. HTTP Client Reuse

```rust
// Good: reuse a single client so connections are pooled across requests
lazy_static! {
    static ref HTTP_CLIENT: reqwest::Client = reqwest::Client::builder()
        .pool_max_idle_per_host(10)
        .pool_idle_timeout(Duration::from_secs(30))
        .timeout(Duration::from_secs(30))
        .tcp_keepalive(Duration::from_secs(60))
        .tcp_nodelay(true)
        .build()
        .expect("Failed to create HTTP client");
}
```
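On Rust 1.70+, the same one-time initialization is available in the standard library via `std::sync::OnceLock`, dropping the `lazy_static` dependency; a minimal sketch:

```rust
use std::sync::OnceLock;
use std::time::Duration;

static HTTP_CLIENT: OnceLock<reqwest::Client> = OnceLock::new();

fn http_client() -> &'static reqwest::Client {
    // Initialized exactly once on first use, then shared by all callers
    HTTP_CLIENT.get_or_init(|| {
        reqwest::Client::builder()
            .pool_max_idle_per_host(10)
            .pool_idle_timeout(Duration::from_secs(30))
            .tcp_nodelay(true)
            .build()
            .expect("Failed to create HTTP client")
    })
}
```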
### 2. WebSocket Connection Management

```rust
pub struct WebSocketClient {
    region: Region,
    handler: Arc<dyn MessageHandler>,
    auth_client: Arc<RwLock<AuthClient>>,
    is_connected: Arc<RwLock<bool>>,
    reconnect_on_expire: bool,
}

impl WebSocketClient {
    // Use multiple endpoints per region for redundancy
    fn get_random_url(&self) -> &'static str {
        let urls = match self.region {
            Region::USWest => vec!["socket8.axiom.trade", "cluster-usw2.axiom.trade"],
            Region::USCentral => vec!["cluster3.axiom.trade", "cluster-usc2.axiom.trade"],
            // ...
        };
        urls[fastrand::usize(0..urls.len())]
    }

    // Automatic token refresh keeps long-lived connections authenticated
    fn spawn_token_refresh_task(&self) {
        let auth_client = Arc::clone(&self.auth_client);
        let is_connected = Arc::clone(&self.is_connected);

        tokio::spawn(async move {
            let mut refresh_interval = interval(Duration::from_secs(600));
            loop {
                refresh_interval.tick().await;
                if !*is_connected.read().await {
                    break;
                }
                if let Err(_e) = auth_client.write().await.ensure_valid_authentication().await {
                    *is_connected.write().await = false;
                    break;
                }
            }
        });
    }
}
```
### 3. Session Management

```rust
pub struct SessionManager {
    session: Arc<RwLock<Option<AuthSession>>>,
    storage_path: Option<PathBuf>,
    auto_save: bool,
}

impl SessionManager {
    // Efficient session validation: a short-lived read lock
    pub async fn is_session_valid(&self) -> bool {
        let guard = self.session.read().await;
        guard.as_ref().map_or(false, |session| session.is_valid())
    }

    // Batch session updates under a single write lock
    pub async fn update_session_data(
        &self,
        tokens: Option<AuthTokens>,
        cookies: Option<AuthCookies>,
    ) -> Result<(), AuthError> {
        {
            let mut guard = self.session.write().await;
            if let Some(session) = guard.as_mut() {
                if let Some(tokens) = tokens {
                    session.update_tokens(tokens);
                }
                if let Some(cookies) = cookies {
                    session.cookies.merge_with(&cookies);
                }
            }
        } // Write lock is released before the (potentially slow) disk save

        if self.auto_save {
            self.save_session().await?;
        }
        Ok(())
    }
}
```
## Batch Operations

### 1. Efficient Batch Balance Queries

```rust
// Inefficient: one API call per address
pub async fn get_balances_inefficient(
    &self,
    addresses: &[String],
) -> Result<Vec<Balance>, PortfolioError> {
    let mut balances = Vec::new();
    for address in addresses {
        let balance = self.get_balance(address).await?; // N round-trips
        balances.push(balance);
    }
    Ok(balances)
}

// Better: use the batch endpoint (one round-trip)
pub async fn get_batch_balance(
    &self,
    wallet_addresses: &[String],
) -> Result<BatchBalanceResponse, PortfolioError> {
    let request_body = json!({
        "wallets": wallet_addresses
    });

    self.client
        .make_json_request(Method::POST, "/portfolio/batch-balance", Some(request_body))
        .await
        .map_err(PortfolioError::from)
}
```
### 2. Concurrent Processing

```rust
pub async fn process_batch_concurrent<T, F, Fut, O, E>(
    items: Vec<T>,
    concurrency_limit: usize,
    processor: F,
) -> Vec<Result<O, E>>
where
    F: Fn(T) -> Fut + Clone + Send + 'static,
    Fut: Future<Output = Result<O, E>> + Send + 'static,
    T: Send + 'static,
    O: Send + 'static,
    E: Send + 'static,
{
    let semaphore = Arc::new(Semaphore::new(concurrency_limit));

    let tasks: Vec<_> = items.into_iter().map(|item| {
        let semaphore = Arc::clone(&semaphore);
        let processor = processor.clone();
        tokio::spawn(async move {
            // Hold a permit for the duration of the call to bound concurrency
            let _permit = semaphore.acquire().await.unwrap();
            processor(item).await
        })
    }).collect();

    futures_util::future::join_all(tasks).await
        .into_iter()
        .map(|result| result.unwrap()) // Propagate panics from spawned tasks
        .collect()
}
```
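A hypothetical usage example, assuming the client type from the earlier snippets is `Clone`: fetch balances for many wallets with at most 10 requests in flight:

```rust
// Bounded-concurrency batch fetch; `client` must be cheap to clone (e.g. Arc-based)
let client = client.clone();
let results = process_batch_concurrent(
    wallet_addresses, // Vec<String>
    10,               // concurrency limit
    move |address| {
        let client = client.clone();
        async move { client.get_balance(&address).await }
    },
).await;
```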
## Memory Management

### 1. Efficient Data Structures

```rust
// Use bounded collections for streaming data so memory stays constant
struct MarketDataBuffer {
    ticks: VecDeque<MarketTick>,
    max_size: usize,
}

impl MarketDataBuffer {
    fn new() -> Self {
        Self {
            ticks: VecDeque::with_capacity(10_000),
            max_size: 10_000,
        }
    }

    fn add_tick(&mut self, tick: MarketTick) {
        if self.ticks.len() >= self.max_size {
            self.ticks.pop_front(); // Drop the oldest tick
        }
        self.ticks.push_back(tick);
    }
}
```
### 2. Zero-Copy Patterns

```rust
// Borrow the data instead of cloning it
pub fn process_market_data(data: &[MarketTick]) -> MarketState {
    if data.is_empty() {
        // Guard against division by zero below (assumes MarketState: Default)
        return MarketState::default();
    }

    let mut total_volume = 0.0;
    let mut price_sum = 0.0;

    for tick in data.iter() { // Iterates over references; no copies
        total_volume += tick.quantity;
        price_sum += tick.price;
    }

    MarketState {
        avg_price: price_sum / data.len() as f64,
        total_volume,
    }
}
```
### 3. Memory Pool Usage

```rust
// Pre-allocate frequently used objects to avoid per-order allocations
pub struct OrderPool {
    orders: Vec<Order>,
    free_indices: Vec<usize>,
}

impl OrderPool {
    pub fn new(capacity: usize) -> Self {
        let orders = (0..capacity).map(|_| Order::default()).collect();
        let free_indices = (0..capacity).collect();
        Self { orders, free_indices }
    }

    pub fn acquire(&mut self) -> Option<&mut Order> {
        let idx = self.free_indices.pop()?;
        Some(&mut self.orders[idx])
    }

    pub fn release(&mut self, order: &Order) {
        // Locate the slot by address and return it to the free list
        if let Some(idx) = self.orders.iter().position(|o| std::ptr::eq(o, order)) {
            self.orders[idx].reset();
            self.free_indices.push(idx);
        }
    }
}
```
## Rate Limiting and Throttling

### 1. Token Bucket Rate Limiter

```rust
pub struct BucketRateLimiter {
    tokens: Arc<RwLock<f64>>,
    max_tokens: f64,
    refill_rate: f64, // tokens added per second
    last_refill: Arc<RwLock<Instant>>,
}

impl BucketRateLimiter {
    /// Returns None if the tokens were consumed, or Some(wait) with the
    /// time to wait until enough tokens will be available.
    pub async fn try_consume(&self, tokens_needed: f64) -> Option<Duration> {
        let mut tokens = self.tokens.write().await;
        let mut last_refill = self.last_refill.write().await;

        // Refill tokens based on elapsed time
        let now = Instant::now();
        let elapsed = now.duration_since(*last_refill).as_secs_f64();
        let tokens_to_add = (elapsed * self.refill_rate).min(self.max_tokens - *tokens);
        *tokens = (*tokens + tokens_to_add).min(self.max_tokens);
        *last_refill = now;

        if *tokens >= tokens_needed {
            *tokens -= tokens_needed;
            None
        } else {
            let tokens_deficit = tokens_needed - *tokens;
            Some(Duration::from_secs_f64(tokens_deficit / self.refill_rate))
        }
    }
}
```
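Callers can turn `try_consume` into a blocking wait with a small loop; a sketch:

```rust
// Block until one token is available, sleeping for the suggested wait time
async fn acquire_token(limiter: &BucketRateLimiter) {
    while let Some(wait) = limiter.try_consume(1.0).await {
        tokio::time::sleep(wait).await;
    }
}
```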
### 2. Endpoint-Specific Rate Limiting

```rust
pub struct EndpointRateLimiter {
    limiters: Arc<RwLock<HashMap<String, RateLimiter>>>,
    default_limiter: RateLimiter,
}

impl EndpointRateLimiter {
    pub async fn wait_for_endpoint(&self, endpoint: &str) {
        let limiters = self.limiters.read().await;
        if let Some(limiter) = limiters.get(endpoint) {
            limiter.wait_if_needed().await;
        } else {
            drop(limiters); // Release the read lock before the default wait
            self.default_limiter.wait_if_needed().await;
        }
    }
}
```
## Benchmarking and Monitoring

### 1. Performance Metrics Collection

```rust
pub struct LatencyTracker {
    execution_latencies: VecDeque<Duration>,
    max_samples: usize,
}

impl LatencyTracker {
    pub fn record_execution_latency(&mut self, latency: Duration) {
        if self.execution_latencies.len() >= self.max_samples {
            self.execution_latencies.pop_front();
        }
        self.execution_latencies.push_back(latency);
    }

    pub fn get_percentiles(&self) -> LatencyPercentiles {
        let mut sorted: Vec<_> = self.execution_latencies.iter().collect();
        sorted.sort();

        let len = sorted.len();
        if len == 0 {
            // No samples yet (assumes LatencyPercentiles: Default)
            return LatencyPercentiles::default();
        }
        LatencyPercentiles {
            p50: *sorted[len * 50 / 100],
            p95: *sorted[len * 95 / 100],
            p99: *sorted[len * 99 / 100],
        }
    }
}
```
### 2. Real-time Performance Monitoring

```rust
pub async fn monitor_performance(
    &self,
    interval: Duration,
) -> tokio::task::JoinHandle<()> {
    let latency_tracker = Arc::clone(&self.latency_tracker);

    tokio::spawn(async move {
        let mut monitoring_interval = tokio::time::interval(interval);
        loop {
            monitoring_interval.tick().await;

            let tracker = latency_tracker.lock().await;
            let avg_latency = tracker.get_average_latency();
            let percentiles = tracker.get_percentiles();

            if avg_latency > Duration::from_millis(10) {
                println!("⚠️ High average latency: {:.2}ms",
                    avg_latency.as_secs_f64() * 1000.0);
            }
            if percentiles.p99 > Duration::from_millis(50) {
                println!("🚨 P99 latency spike: {:.2}ms",
                    percentiles.p99.as_secs_f64() * 1000.0);
            }
        }
    })
}
```
### 3. Benchmarking Framework

```rust
#[cfg(test)]
mod benchmarks {
    use super::*;
    use std::time::Instant;

    #[tokio::test]
    async fn benchmark_batch_vs_individual() {
        let client = setup_test_client().await;
        let addresses = generate_test_addresses(100);

        // Benchmark individual requests
        let start = Instant::now();
        for address in &addresses {
            client.get_balance(address).await.unwrap();
        }
        let individual_time = start.elapsed();

        // Benchmark the batch request
        let start = Instant::now();
        client.get_batch_balance(&addresses).await.unwrap();
        let batch_time = start.elapsed();

        println!("Individual: {:.2}ms", individual_time.as_secs_f64() * 1000.0);
        println!("Batch: {:.2}ms", batch_time.as_secs_f64() * 1000.0);
        println!("Speedup: {:.2}x",
            individual_time.as_secs_f64() / batch_time.as_secs_f64());

        assert!(batch_time < individual_time / 5); // At least 5x faster
    }
}
```
## High-Frequency Trading Optimizations

### 1. Ultra-Low Latency Execution

```rust
pub struct ExecutionEngine {
    config: ExecutionConfig,
    metrics: ExecutionMetrics, // assumes interior mutability for recording
}

impl ExecutionEngine {
    pub async fn execute_order_ultra_fast(
        &self,
        client: &EnhancedClient,
        signal: &HftSignal,
    ) -> Result<ExecutionResult> {
        let execution_start = Instant::now();

        // Pre-validate the signal (no blocking operations on the hot path)
        if signal.confidence < self.config.min_confidence {
            return Err(ExecutionError::LowConfidence);
        }

        // Use IOC (Immediate or Cancel) orders for speed
        let order_request = self.build_order_request(signal, OrderTimeInForce::IOC);

        // Execute with a hard timeout
        let result = tokio::time::timeout(
            self.config.max_latency_tolerance,
            client.submit_order(order_request),
        ).await??;

        let execution_latency = execution_start.elapsed();

        // Record metrics
        self.metrics.record_execution_latency(execution_latency);

        if execution_latency > self.config.max_latency_tolerance {
            println!("⚠️ Execution exceeded latency budget: {:.3}ms",
                execution_latency.as_secs_f64() * 1000.0);
        }

        Ok(ExecutionResult {
            execution_latency,
            ..result
        })
    }
}
```
### 2. Market Microstructure Analysis

```rust
pub struct MicrostructureAnalyzer {
    order_flow_buffer: VecDeque<OrderFlowEvent>,
    tick_buffer: VecDeque<MarketTick>,
}

impl MicrostructureAnalyzer {
    pub fn analyze_market_impact(&self, order_size: f64) -> f64 {
        let recent_ticks: Vec<_> = self.tick_buffer
            .iter()
            .rev()
            .take(100)
            .collect();

        if recent_ticks.is_empty() {
            return 0.0;
        }

        // Volume-weighted average price over the recent window
        let total_volume: f64 = recent_ticks.iter().map(|t| t.quantity).sum();
        let vwap: f64 = recent_ticks.iter()
            .map(|t| t.price * t.quantity)
            .sum::<f64>() / total_volume;

        // Estimate impact from order size relative to recent volume
        let avg_volume = total_volume / recent_ticks.len() as f64;
        let impact_factor = (order_size / avg_volume).min(1.0);

        impact_factor * 0.001 // Scale to a fractional price move (0.1% = 10 bps at full impact)
    }
}
```
### 3. Smart Order Routing

```rust
pub struct SmartOrderRouter {
    venues: Vec<TradingVenue>,
    latency_tracker: HashMap<VenueId, LatencyTracker>,
}

impl SmartOrderRouter {
    pub async fn route_order(&self, order: &Order) -> Result<VenueId> {
        let mut best_venue = None;
        let mut best_score = f64::NEG_INFINITY;

        for venue in &self.venues {
            let score = self.calculate_venue_score(venue, order).await;
            if score > best_score {
                best_score = score;
                best_venue = Some(venue.id);
            }
        }

        best_venue.ok_or(RoutingError::NoSuitableVenue)
    }

    async fn calculate_venue_score(&self, venue: &TradingVenue, order: &Order) -> f64 {
        let liquidity_score = venue.get_liquidity_score(order.symbol()).await;
        let latency_score = self.get_latency_score(venue.id).await;
        let fee_score = 1.0 - venue.get_fee_rate(order.symbol()).await;

        // Weighted combination of liquidity, latency, and fees
        liquidity_score * 0.4 + latency_score * 0.4 + fee_score * 0.2
    }
}
```
## Network Optimization

### 1. TCP Optimization

```rust
// Configure the HTTP client for optimal performance
fn create_optimized_client() -> reqwest::Client {
    reqwest::Client::builder()
        .tcp_nodelay(true)                       // Disable Nagle's algorithm
        .tcp_keepalive(Duration::from_secs(60))  // Keep connections alive
        .pool_max_idle_per_host(20)              // Connection pooling
        .pool_idle_timeout(Duration::from_secs(30))
        .timeout(Duration::from_secs(10))        // Request timeout
        .connect_timeout(Duration::from_secs(5)) // Connection timeout
        .user_agent("axiomtrade-rs/1.0")
        .build()
        .expect("Failed to create HTTP client")
}
```
### 2. WebSocket Optimization

```rust
// Configure the WebSocket handshake for minimal latency
pub async fn connect_optimized_websocket(url: &str) -> Result<WebSocketStream> {
    let request = http::Request::builder()
        .method("GET")
        .uri(url)
        .header("Connection", "Upgrade")
        .header("Upgrade", "websocket")
        .header("Sec-WebSocket-Version", "13")
        .header("Sec-WebSocket-Key", generate_key())
        .header("Cache-Control", "no-cache")
        .header("Pragma", "no-cache")
        .body(())?;

    let (ws_stream, _) = connect_async(request).await?;

    // Configure the stream for minimal buffering
    // (platform-specific socket options would go here)

    Ok(ws_stream)
}
```
### 3. Regional Optimization

```rust
pub enum Region {
    USWest,
    USCentral,
    USEast,
    EUWest,
    // ...
}

impl Region {
    pub fn get_optimal_endpoints(&self) -> Vec<&'static str> {
        // Select the closest endpoints for minimal latency
        match self {
            Region::USWest => vec![
                "socket8.axiom.trade",      // Primary
                "cluster-usw2.axiom.trade", // Backup
            ],
            // ... other regions
            _ => vec![],
        }
    }

    pub async fn measure_latency(&self, endpoint: &str) -> Result<Duration> {
        let start = Instant::now();
        let _response = reqwest::get(format!("https://{}/health", endpoint)).await?;
        Ok(start.elapsed())
    }
}
```
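Combining the two methods above, a sketch of choosing the lowest-latency endpoint at startup (the function name is illustrative):

```rust
// Probe each candidate endpoint once and keep the fastest responder
pub async fn select_fastest_endpoint(region: &Region) -> Option<&'static str> {
    let mut best: Option<(&'static str, Duration)> = None;
    for endpoint in region.get_optimal_endpoints() {
        // Endpoints that fail the health check are skipped entirely
        if let Ok(latency) = region.measure_latency(endpoint).await {
            if best.map_or(true, |(_, d)| latency < d) {
                best = Some((endpoint, latency));
            }
        }
    }
    best.map(|(endpoint, _)| endpoint)
}
```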
## Performance Monitoring Dashboard

### Real-time Metrics

Monitor these key performance indicators (a snapshot struct sketch follows the list):
- Execution Latency: P50, P95, P99 execution times
- Network Latency: Round-trip times to API endpoints
- Rate Limit Status: Remaining capacity for each endpoint
- Memory Usage: Heap usage and allocation rates
- Connection Health: Active connections and error rates
- Order Flow: Orders per second and fill rates
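One way to aggregate these indicators is a snapshot struct that the monitoring task fills on each tick; a sketch with illustrative field names, not types from the codebase:

```rust
use std::time::Duration;

/// Aggregated KPI snapshot produced by the monitoring task.
#[derive(Debug, Clone, Default)]
pub struct PerformanceSnapshot {
    pub p50_execution: Duration,
    pub p95_execution: Duration,
    pub p99_execution: Duration,
    pub network_rtt: Duration,     // round-trip time to the primary endpoint
    pub rate_limit_remaining: f64, // fraction of capacity left (0.0..=1.0)
    pub heap_utilization: f64,     // fraction of the memory budget in use
    pub active_connections: usize,
    pub error_rate: f64,           // errors / requests over the window
    pub orders_per_second: f64,
    pub fill_rate: f64,            // fills / orders over the window
}
```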
### Alerting Thresholds

Set up alerts for the following conditions (a threshold-check sketch follows the list):
- P99 latency > 50ms
- Error rate > 1%
- Memory usage > 80%
- Rate limit utilization > 90%
- Connection failures > 5 per minute
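A sketch of evaluating those thresholds against the hypothetical `PerformanceSnapshot` above (the fractional fields are the assumptions noted there):

```rust
fn check_alerts(s: &PerformanceSnapshot, conn_failures_per_min: u32) -> Vec<String> {
    let mut alerts = Vec::new();
    if s.p99_execution > Duration::from_millis(50) {
        alerts.push(format!("P99 latency {:.1}ms > 50ms",
            s.p99_execution.as_secs_f64() * 1000.0));
    }
    if s.error_rate > 0.01 {
        alerts.push(format!("Error rate {:.2}% > 1%", s.error_rate * 100.0));
    }
    if s.heap_utilization > 0.80 {
        alerts.push(format!("Memory usage {:.0}% > 80%", s.heap_utilization * 100.0));
    }
    if s.rate_limit_remaining < 0.10 {
        alerts.push("Rate limit utilization > 90%".to_string());
    }
    if conn_failures_per_min > 5 {
        alerts.push(format!("{} connection failures in the last minute",
            conn_failures_per_min));
    }
    alerts
}
```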
## Conclusion
Effective performance optimization requires:
- Proactive Monitoring: Instrument code to measure what matters
- Efficient Resource Usage: Pool connections, batch operations, minimize allocations
- Network Optimization: Choose optimal endpoints, configure TCP properly
- Async Best Practices: Avoid blocking, use appropriate concurrency patterns
- Continuous Benchmarking: Measure improvements and catch regressions
The patterns demonstrated in this codebase provide a solid foundation for building high-performance trading applications that can handle the demands of modern financial markets.