
Performance Optimization Guide

This guide covers performance best practices for high-frequency trading applications, based on patterns observed in the axiomtrade-rs codebase.

Table of Contents

  1. Async/Await Best Practices
  2. Connection Pooling and Management
  3. Batch Operations
  4. Memory Management
  5. Rate Limiting and Throttling
  6. Benchmarking and Monitoring
  7. High-Frequency Trading Optimizations
  8. Network Optimization

Async/Await Best Practices

1. Efficient Task Spawning

Use tokio::spawn for independent concurrent operations (CPU-bound work belongs on tokio::task::spawn_blocking, covered in the next section):

// Good: Spawn independent tasks
let handles: Vec<_> = wallet_addresses.iter().map(|address| {
    let client = client.clone();
    let address = address.clone();
    tokio::spawn(async move {
        client.get_balance(&address).await
    })
}).collect();

// Wait for all tasks to complete
let results = futures_util::future::try_join_all(handles).await?;

2. Avoid Blocking in Async Context

// Bad: Blocking I/O in async context
async fn bad_example() {
    std::fs::read_to_string("file.json").unwrap(); // Blocks a runtime worker thread
}

// Good: Use async I/O
async fn good_example() {
    tokio::fs::read_to_string("file.json").await.unwrap();
}
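
Blocking is not limited to I/O: CPU-bound work also stalls a worker thread. tokio::task::spawn_blocking moves such jobs onto the dedicated blocking pool. A minimal sketch, where expensive_verify is a hypothetical stand-in for real CPU-heavy work:

// Hypothetical CPU-heavy check; stands in for real signature verification.
fn expensive_verify(payload: &[u8]) -> bool {
    payload.iter().fold(0u64, |acc, b| acc.wrapping_add(*b as u64)) % 2 == 0
}

async fn verify_signature(payload: Vec<u8>) -> bool {
    // Run the CPU-bound work on tokio's blocking thread pool
    tokio::task::spawn_blocking(move || expensive_verify(&payload))
        .await
        .expect("blocking task panicked")
}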

3. Strategic Use of Arc and RwLock

pub struct EnhancedClient {
    auth_client: Arc<RwLock<AuthClient>>,
    rate_limiter: EndpointRateLimiter,
    global_rate_limiter: RateLimiter,
    retry_config: RetryConfig,
}

// Minimize lock contention
impl EnhancedClient {
    pub async fn make_request(
        &self,
        method: Method,
        url: &str,
        body: Option<serde_json::Value>,
    ) -> Result<Response> {
        // Check rate limits first (no lock needed)
        self.global_rate_limiter.wait_if_needed().await;
        
        // Only acquire the write lock inside the retried operation
        let auth_client = Arc::clone(&self.auth_client);
        let response = retry_with_config(self.retry_config.clone(), || {
            let auth_client = Arc::clone(&auth_client);
            // Clone per attempt: the closure may be called more than once
            let method = method.clone();
            let body = body.clone();
            async move {
                auth_client.write().await
                    .make_authenticated_request(method, url, body)
                    .await
            }
        }).await?;
        
        Ok(response)
    }
}

4. Efficient Error Handling

impl RetryableError for EnhancedClientError {
    fn is_retryable(&self) -> bool {
        match self {
            EnhancedClientError::NetworkError(e) => e.is_timeout() || e.is_connect(),
            EnhancedClientError::RateLimitExceeded => true,
            _ => false,
        }
    }
}

// Use custom retry logic for better performance
pub async fn retry_with_backoff<F, Fut, T, E>(
    config: RetryConfig,
    mut operation: F,
) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
    E: RetryableError,
{
    for attempt in 0..=config.max_retries {
        match operation().await {
            Ok(result) => return Ok(result),
            Err(error) => {
                if !error.is_retryable() || attempt == config.max_retries {
                    return Err(error);
                }
                
                let delay = config.calculate_delay(attempt);
                tokio::time::sleep(delay).await;
            }
        }
    }
    
    // The final attempt always returns above, so this point is unreachable
    unreachable!("retry loop exited without returning")
}
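
A hypothetical call site, assuming RetryConfig implements Default:

// Retry a balance fetch under the policy above.
let balance = retry_with_backoff(RetryConfig::default(), || async {
    client.get_balance(&wallet_address).await
})
.await?;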

Connection Pooling and Management

1. HTTP Client Reuse

// Good: Reuse client with connection pooling
lazy_static! {
    static ref HTTP_CLIENT: reqwest::Client = {
        reqwest::Client::builder()
            .pool_max_idle_per_host(10)
            .pool_idle_timeout(Duration::from_secs(30))
            .timeout(Duration::from_secs(30))
            .tcp_keepalive(Duration::from_secs(60))
            .tcp_nodelay(true)
            .build()
            .expect("Failed to create HTTP client")
    };
}
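
lazy_static works, but if a new dependency is unwanted, std::sync::OnceLock (stable since Rust 1.70) provides the same one-time initialization; a minimal sketch:

use std::sync::OnceLock;
use std::time::Duration;

static HTTP_CLIENT: OnceLock<reqwest::Client> = OnceLock::new();

fn http_client() -> &'static reqwest::Client {
    HTTP_CLIENT.get_or_init(|| {
        reqwest::Client::builder()
            .pool_max_idle_per_host(10)
            .timeout(Duration::from_secs(30))
            .build()
            .expect("Failed to create HTTP client")
    })
}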

2. WebSocket Connection Management

pub struct WebSocketClient {
    region: Region,
    handler: Arc<dyn MessageHandler>,
    auth_client: Arc<RwLock<AuthClient>>, // used by the token refresh task below
    is_connected: Arc<RwLock<bool>>,
    reconnect_on_expire: bool,
}

impl WebSocketClient {
    // Use multiple endpoints for redundancy
    fn get_random_url(&self) -> &'static str {
        let urls = match self.region {
            Region::USWest => vec!["socket8.axiom.trade", "cluster-usw2.axiom.trade"],
            Region::USCentral => vec!["cluster3.axiom.trade", "cluster-usc2.axiom.trade"],
            // ...
        };
        urls[fastrand::usize(0..urls.len())]
    }
    
    // Automatic token refresh
    fn spawn_token_refresh_task(&self) {
        let auth_client = Arc::clone(&self.auth_client);
        let is_connected = Arc::clone(&self.is_connected);
        
        tokio::spawn(async move {
            let mut refresh_interval = interval(Duration::from_secs(600));
            
            loop {
                refresh_interval.tick().await;
                
                if !*is_connected.read().await {
                    break;
                }
                
                if let Err(e) = auth_client.write().await.ensure_valid_authentication().await {
                    eprintln!("websocket token refresh failed: {e}");
                    *is_connected.write().await = false;
                    break;
                }
            }
        });
    }
}

3. Session Management

pub struct SessionManager {
    session: Arc<RwLock<Option<AuthSession>>>,
    storage_path: Option<PathBuf>,
    auto_save: bool,
}

impl SessionManager {
    // Efficient session validation
    pub async fn is_session_valid(&self) -> bool {
        let guard = self.session.read().await;
        guard.as_ref().map_or(false, |session| session.is_valid())
    }
    
    // Batch operations for session updates
    pub async fn update_session_data(
        &self, 
        tokens: Option<AuthTokens>,
        cookies: Option<AuthCookies>
    ) -> Result<(), AuthError> {
        {
            let mut guard = self.session.write().await;
            if let Some(session) = guard.as_mut() {
                if let Some(tokens) = tokens {
                    session.update_tokens(tokens);
                }
                if let Some(cookies) = cookies {
                    session.cookies.merge_with(&cookies);
                }
            }
        }
        
        if self.auto_save {
            self.save_session().await?;
        }
        
        Ok(())
    }
}

Batch Operations

1. Efficient Batch Balance Queries

// Instead of individual queries
pub async fn get_balances_inefficient(
    &self, 
    addresses: &[String]
) -> Result<Vec<Balance>, PortfolioError> {
    let mut balances = Vec::new();
    for address in addresses {
        let balance = self.get_balance(address).await?; // N API calls
        balances.push(balance);
    }
    Ok(balances)
}

// Use batch endpoint
pub async fn get_batch_balance(
    &self,
    wallet_addresses: &[String],
) -> Result<BatchBalanceResponse, PortfolioError> {
    let request_body = json!({
        "wallets": wallet_addresses
    });
    
    self.client
        .make_json_request(Method::POST, "/portfolio/batch-balance", Some(request_body))
        .await
        .map_err(PortfolioError::from)
}
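
For very large wallet lists, it can still help to split the batch so individual request bodies stay bounded. A sketch on top of get_batch_balance, with an assumed balances field on the response and an arbitrary chunk size of 100:

// Hypothetical helper on the same client; the `balances` field is assumed.
pub async fn get_batch_balance_chunked(
    &self,
    wallet_addresses: &[String],
) -> Result<Vec<Balance>, PortfolioError> {
    let mut all = Vec::new();
    for chunk in wallet_addresses.chunks(100) {
        let response = self.get_batch_balance(chunk).await?;
        all.extend(response.balances);
    }
    Ok(all)
}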

2. Concurrent Processing

pub async fn process_batch_concurrent<T, R, E, F, Fut>(
    items: Vec<T>,
    concurrency_limit: usize,
    processor: F,
) -> Vec<Result<R, E>>
where
    F: Fn(T) -> Fut + Clone + Send + 'static,
    Fut: Future<Output = Result<R, E>> + Send + 'static,
    T: Send + 'static,
    R: Send + 'static,
    E: Send + 'static,
{
    let semaphore = Arc::new(Semaphore::new(concurrency_limit));
    let tasks: Vec<_> = items.into_iter().map(|item| {
        let semaphore = Arc::clone(&semaphore);
        let processor = processor.clone();
        
        tokio::spawn(async move {
            let _permit = semaphore.acquire().await.unwrap();
            processor(item).await
        })
    }).collect();
    
    futures_util::future::join_all(tasks).await
        .into_iter()
        .map(|result| result.unwrap())
        .collect()
}
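
A hypothetical call site, capping in-flight balance lookups at 25:

let client = Arc::new(client);
let results = process_batch_concurrent(addresses, 25, move |address: String| {
    let client = Arc::clone(&client);
    async move { client.get_balance(&address).await }
})
.await;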

Memory Management

1. Efficient Data Structures

// Use bounded collections for streaming data
struct MarketDataBuffer {
    ticks: VecDeque<MarketTick>,
    max_size: usize,
}

impl MarketDataBuffer {
    fn new() -> Self {
        Self {
            ticks: VecDeque::with_capacity(10000),
            max_size: 10000,
        }
    }
    
    fn add_tick(&mut self, tick: MarketTick) {
        if self.ticks.len() >= self.max_size {
            self.ticks.pop_front(); // Remove oldest
        }
        self.ticks.push_back(tick);
    }
}

2. Zero-Copy Patterns

// Use references instead of cloning
pub fn process_market_data(data: &[MarketTick]) -> MarketState {
    // Guard against division by zero on an empty slice
    if data.is_empty() {
        return MarketState { avg_price: 0.0, total_volume: 0.0 };
    }
    
    let mut total_volume = 0.0;
    let mut price_sum = 0.0;
    
    for tick in data.iter() {  // Iterator over references
        total_volume += tick.quantity;
        price_sum += tick.price;
    }
    
    MarketState {
        avg_price: price_sum / data.len() as f64,
        total_volume,
    }
}

3. Memory Pool Usage

// Pre-allocate frequently used objects
pub struct OrderPool {
    orders: Vec<Order>,
    free_indices: Vec<usize>,
}

impl OrderPool {
    pub fn new(capacity: usize) -> Self {
        let orders = (0..capacity)
            .map(|_| Order::default())
            .collect();
        let free_indices = (0..capacity).collect();
        
        Self { orders, free_indices }
    }
    
    /// Take a free slot out of the pool; the index acts as a handle.
    pub fn acquire(&mut self) -> Option<usize> {
        self.free_indices.pop()
    }
    
    pub fn get_mut(&mut self, idx: usize) -> &mut Order {
        &mut self.orders[idx]
    }
    
    /// Return a slot by handle. Index handles avoid holding a &Order across
    /// the &mut self call, which the borrow checker would reject.
    pub fn release(&mut self, idx: usize) {
        self.orders[idx].reset();
        self.free_indices.push(idx);
    }
}
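
A sketch of how the index-handle API is used (the price field is illustrative):

let mut pool = OrderPool::new(1024);
if let Some(idx) = pool.acquire() {
    let order = pool.get_mut(idx);
    order.price = 101.25; // fill in the reused slot; hypothetical field
    // ... submit the order ...
    pool.release(idx); // slot becomes reusable, no allocation occurred
}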

Rate Limiting and Throttling

1. Token Bucket Rate Limiter

pub struct BucketRateLimiter {
    tokens: Arc<RwLock<f64>>,
    max_tokens: f64,
    refill_rate: f64,
    last_refill: Arc<RwLock<Instant>>,
}

impl BucketRateLimiter {
    pub async fn try_consume(&self, tokens_needed: f64) -> Option<Duration> {
        let mut tokens = self.tokens.write().await;
        let mut last_refill = self.last_refill.write().await;
        let now = Instant::now();
        
        // Refill tokens based on elapsed time
        let elapsed = now.duration_since(*last_refill).as_secs_f64();
        let tokens_to_add = (elapsed * self.refill_rate).min(self.max_tokens - *tokens);
        *tokens = (*tokens + tokens_to_add).min(self.max_tokens);
        *last_refill = now;
        
        if *tokens >= tokens_needed {
            *tokens -= tokens_needed;
            None
        } else {
            let tokens_deficit = tokens_needed - *tokens;
            let wait_time = Duration::from_secs_f64(tokens_deficit / self.refill_rate);
            Some(wait_time)
        }
    }
}
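
A small sleep-and-retry wrapper built on try_consume above:

impl BucketRateLimiter {
    /// Block (asynchronously) until the bucket can satisfy the request.
    pub async fn acquire(&self, tokens_needed: f64) {
        while let Some(wait) = self.try_consume(tokens_needed).await {
            tokio::time::sleep(wait).await;
        }
    }
}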

2. Endpoint-Specific Rate Limiting

pub struct EndpointRateLimiter {
    limiters: Arc<RwLock<HashMap<String, RateLimiter>>>,
    default_limiter: RateLimiter,
}

impl EndpointRateLimiter {
    pub async fn wait_for_endpoint(&self, endpoint: &str) {
        let limiters = self.limiters.read().await;
        
        if let Some(limiter) = limiters.get(endpoint) {
            limiter.wait_if_needed().await;
        } else {
            drop(limiters); // Release read lock
            self.default_limiter.wait_if_needed().await;
        }
    }
}

Benchmarking and Monitoring

1. Performance Metrics Collection

pub struct LatencyTracker {
    execution_latencies: VecDeque<Duration>,
    max_samples: usize,
}

impl LatencyTracker {
    pub fn record_execution_latency(&mut self, latency: Duration) {
        if self.execution_latencies.len() >= self.max_samples {
            self.execution_latencies.pop_front();
        }
        self.execution_latencies.push_back(latency);
    }
    
    pub fn get_average_latency(&self) -> Duration {
        if self.execution_latencies.is_empty() {
            return Duration::ZERO;
        }
        let total: Duration = self.execution_latencies.iter().sum();
        total / self.execution_latencies.len() as u32
    }
    
    pub fn get_percentiles(&self) -> LatencyPercentiles {
        let mut sorted: Vec<_> = self.execution_latencies.iter().copied().collect();
        sorted.sort();
        
        // Guard: percentiles are undefined for an empty sample set
        if sorted.is_empty() {
            return LatencyPercentiles::default(); // assumes LatencyPercentiles: Default
        }
        
        let len = sorted.len();
        LatencyPercentiles {
            p50: sorted[len * 50 / 100],
            p95: sorted[len * 95 / 100],
            p99: sorted[len * 99 / 100],
        }
    }
}

2. Real-time Performance Monitoring

pub async fn monitor_performance(
    &self,
    interval: Duration
) -> tokio::task::JoinHandle<()> {
    let latency_tracker = Arc::clone(&self.latency_tracker);
    
    tokio::spawn(async move {
        let mut monitoring_interval = tokio::time::interval(interval);
        
        loop {
            monitoring_interval.tick().await;
            
            let tracker = latency_tracker.lock().await;
            let avg_latency = tracker.get_average_latency();
            let percentiles = tracker.get_percentiles();
            
            if avg_latency > Duration::from_millis(10) {
                println!("⚠️  High average latency: {:.2}ms", 
                    avg_latency.as_secs_f64() * 1000.0);
            }
            
            if percentiles.p99 > Duration::from_millis(50) {
                println!("🚨 P99 latency spike: {:.2}ms", 
                    percentiles.p99.as_secs_f64() * 1000.0);
            }
        }
    })
}

3. Benchmarking Framework

#[cfg(test)]
mod benchmarks {
    use super::*;
    use std::time::Instant;
    
    #[tokio::test]
    async fn benchmark_batch_vs_individual() {
        let client = setup_test_client().await;
        let addresses = generate_test_addresses(100);
        
        // Benchmark individual requests
        let start = Instant::now();
        for address in &addresses {
            client.get_balance(address).await.unwrap();
        }
        let individual_time = start.elapsed();
        
        // Benchmark batch request
        let start = Instant::now();
        client.get_batch_balance(&addresses).await.unwrap();
        let batch_time = start.elapsed();
        
        println!("Individual: {:.2}ms", individual_time.as_secs_f64() * 1000.0);
        println!("Batch: {:.2}ms", batch_time.as_secs_f64() * 1000.0);
        println!("Speedup: {:.2}x", individual_time.as_secs_f64() / batch_time.as_secs_f64());
        
        assert!(batch_time < individual_time / 5); // At least 5x faster
    }
}

High-Frequency Trading Optimizations

1. Ultra-Low Latency Execution

pub struct ExecutionEngine {
    config: ExecutionConfig,
    metrics: ExecutionMetrics, // used to record latencies below
}

impl ExecutionEngine {
    pub async fn execute_order_ultra_fast(
        &self,
        client: &EnhancedClient,
        signal: &HftSignal,
    ) -> Result<ExecutionResult> {
        let execution_start = Instant::now();
        
        // Pre-validate signal (avoid blocking operations)
        if signal.confidence < self.config.min_confidence {
            return Err(ExecutionError::LowConfidence);
        }
        
        // Use IOC (Immediate or Cancel) orders for speed
        let order_request = self.build_order_request(signal, OrderTimeInForce::IOC);
        
        // Execute with timeout
        let result = tokio::time::timeout(
            self.config.max_latency_tolerance,
            client.submit_order(order_request)
        ).await??;
        
        let execution_latency = execution_start.elapsed();
        
        // Record metrics
        self.metrics.record_execution_latency(execution_latency);
        
        if execution_latency > self.config.max_latency_tolerance {
            println!("⚠️  Execution exceeded latency budget: {:.3}ms", 
                execution_latency.as_secs_f64() * 1000.0);
        }
        
        Ok(ExecutionResult {
            execution_latency,
            ..result
        })
    }
}

2. Market Microstructure Analysis

pub struct MicrostructureAnalyzer {
    order_flow_buffer: VecDeque<OrderFlowEvent>,
    tick_buffer: VecDeque<MarketTick>,
}

impl MicrostructureAnalyzer {
    pub fn analyze_market_impact(&self, order_size: f64) -> f64 {
        let recent_ticks: Vec<_> = self.tick_buffer
            .iter()
            .rev()
            .take(100)
            .collect();
        
        if recent_ticks.is_empty() {
            return 0.0;
        }
        
        // Calculate volume-weighted average price
        let total_volume: f64 = recent_ticks.iter().map(|t| t.quantity).sum();
        let vwap: f64 = recent_ticks.iter()
            .map(|t| t.price * t.quantity)
            .sum::<f64>() / total_volume;
        
        // Estimate impact based on order size vs recent volume
        let avg_volume = total_volume / recent_ticks.len() as f64;
        let impact_factor = (order_size / avg_volume).min(1.0);
        
        impact_factor * 0.001 // Scale to a fractional price impact (at most 0.1%, i.e. 10 bps)
    }
}

3. Smart Order Routing

pub struct SmartOrderRouter {
    venues: Vec<TradingVenue>,
    latency_tracker: HashMap<VenueId, LatencyTracker>,
}

impl SmartOrderRouter {
    pub async fn route_order(&self, order: &Order) -> Result<VenueId> {
        let mut best_venue = None;
        let mut best_score = f64::NEG_INFINITY;
        
        for venue in &self.venues {
            let score = self.calculate_venue_score(venue, order).await;
            if score > best_score {
                best_score = score;
                best_venue = Some(venue.id);
            }
        }
        
        best_venue.ok_or(RoutingError::NoSuitableVenue)
    }
    
    async fn calculate_venue_score(&self, venue: &TradingVenue, order: &Order) -> f64 {
        let liquidity_score = venue.get_liquidity_score(order.symbol()).await;
        let latency_score = self.get_latency_score(venue.id).await;
        let fee_score = 1.0 - venue.get_fee_rate(order.symbol()).await;
        
        // Weighted combination
        liquidity_score * 0.4 + latency_score * 0.4 + fee_score * 0.2
    }
}

Network Optimization

1. TCP Optimization

// Configure HTTP client for optimal performance
fn create_optimized_client() -> reqwest::Client {
    reqwest::Client::builder()
        .tcp_nodelay(true)                    // Disable Nagle's algorithm
        .tcp_keepalive(Duration::from_secs(60)) // Keep connections alive
        .pool_max_idle_per_host(20)           // Connection pooling
        .pool_idle_timeout(Duration::from_secs(30))
        .timeout(Duration::from_secs(10))     // Request timeout
        .connect_timeout(Duration::from_secs(5)) // Connection timeout
        .user_agent("axiomtrade-rs/1.0")
        .build()
        .expect("Failed to create HTTP client")
}

2. WebSocket Optimization

// Configure WebSocket for minimal latency
pub async fn connect_optimized_websocket(url: &str) -> Result<WebSocketStream> {
    let request = http::Request::builder()
        .method("GET")
        .uri(url)
        .header("Connection", "Upgrade")
        .header("Upgrade", "websocket")
        .header("Sec-WebSocket-Version", "13")
        .header("Sec-WebSocket-Key", generate_key())
        .header("Cache-Control", "no-cache")
        .header("Pragma", "no-cache")
        .body(())?;
    
    let (ws_stream, _) = connect_async(request).await?;
    
    // Configure stream for minimal buffering
    // (Platform-specific socket options would go here)
    
    Ok(ws_stream)
}
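
One way to actually set those socket options is to open the TCP stream manually, configure it, and hand it to the handshake. A sketch assuming tokio-tungstenite with a TLS feature enabled:

use tokio::net::TcpStream;
use tokio_tungstenite::client_async_tls;

async fn connect_low_latency(
    host: &str,
    url: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let stream = TcpStream::connect((host, 443)).await?;
    stream.set_nodelay(true)?; // disable Nagle's algorithm before the upgrade
    let (_ws_stream, _response) = client_async_tls(url, stream).await?;
    // ... hand _ws_stream to the message loop ...
    Ok(())
}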

3. Regional Optimization

pub enum Region {
    USWest,
    USCentral, 
    USEast,
    EUWest,
    // ...
}

impl Region {
    pub fn get_optimal_endpoints(&self) -> Vec<&'static str> {
        match self {
            Region::USWest => vec![
                "socket8.axiom.trade",      // Primary
                "cluster-usw2.axiom.trade", // Backup
            ],
            // Select closest endpoints for minimal latency
        }
    }
    
    pub async fn measure_latency(&self, endpoint: &str) -> Result<Duration> {
        let start = Instant::now();
        // reqwest::get builds a throwaway client; fine for an occasional probe,
        // but reuse the pooled client when measuring repeatedly
        let _response = reqwest::get(format!("https://{}/health", endpoint)).await?;
        Ok(start.elapsed())
    }
}

Performance Monitoring Dashboard

Real-time Metrics

Monitor these key performance indicators (a snapshot struct sketch follows the list):

  • Execution Latency: P50, P95, P99 execution times
  • Network Latency: Round-trip times to API endpoints
  • Rate Limit Status: Remaining capacity for each endpoint
  • Memory Usage: Heap usage and allocation rates
  • Connection Health: Active connections and error rates
  • Order Flow: Orders per second and fill rates
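
A minimal snapshot struct a collector task could fill in on each tick (all field names are illustrative):

use std::time::Duration;

#[derive(Debug, Default, Clone)]
pub struct PerfSnapshot {
    pub exec_p50: Duration,
    pub exec_p95: Duration,
    pub exec_p99: Duration,
    pub network_rtt: Duration,
    pub rate_limit_remaining: f64,
    pub heap_bytes: usize,
    pub active_connections: usize,
    pub orders_per_sec: f64,
    pub fill_rate: f64,
}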

Alerting Thresholds

Set up alerts for the following thresholds (a checking sketch follows the list):

  • P99 latency > 50ms
  • Error rate > 1%
  • Memory usage > 80%
  • Rate limit utilization > 90%
  • Connection failures > 5 per minute
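
A sketch encoding those thresholds against the PerfSnapshot above (error_rate and mem_utilization are assumed to come from your own counters):

// Illustrative thresholds from the list above; names are hypothetical.
fn check_alerts(s: &PerfSnapshot, error_rate: f64, mem_utilization: f64) {
    if s.exec_p99 > Duration::from_millis(50) {
        eprintln!("alert: p99 latency {:?}", s.exec_p99);
    }
    if error_rate > 0.01 {
        eprintln!("alert: error rate {:.2}%", error_rate * 100.0);
    }
    if mem_utilization > 0.80 {
        eprintln!("alert: memory utilization {:.0}%", mem_utilization * 100.0);
    }
}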

Conclusion

Effective performance optimization requires:

  1. Proactive Monitoring: Instrument code to measure what matters
  2. Efficient Resource Usage: Pool connections, batch operations, minimize allocations
  3. Network Optimization: Choose optimal endpoints, configure TCP properly
  4. Async Best Practices: Avoid blocking, use appropriate concurrency patterns
  5. Continuous Benchmarking: Measure improvements and catch regressions

The patterns demonstrated in this codebase provide a solid foundation for building high-performance trading applications that can handle the demands of modern financial markets.