Skip to content

Latest commit

Β 

History

History
1457 lines (1278 loc) Β· 62.2 KB

File metadata and controls

1457 lines (1278 loc) Β· 62.2 KB

// =============================================================================
// APEX HFT V8 — MEAN REVERSION LATERAL REGIME SNIPER
// Engine    : Rust + Tokio | Exchange: BingX Perpetual Futures V2
// Estratégia: Z-Score(20,z=2.0) + OBI(L2 top-5,obi=0.40) + ADX(14,<13) + BBW Squeeze
// Gestão    : ATR-SL(1.5x) | TP Maker 0.15% | Time Stop 300s | DD 15%
// Conta     : $50 | Alavancagem 5x | Hedge Mode (Dual-Side)
// Calibrado : Março 2026
// =============================================================================
//
// Cargo.toml:
// [dependencies]
// tokio = { version = "1", features = ["full"] }
// tokio-tungstenite = { version = "0.21", features = ["rustls-tls-native-roots"] }
// futures-util = "0.3"
// reqwest = { version = "0.12", features = ["json", "rustls-tls"], default-features = false }
// serde_json = "1"
// hmac = "0.12"
// sha2 = "0.10"
// hex = "0.4"
// chrono = "0.4"
// log = "0.4"
// env_logger = "0.11"
// dotenv = "0.15"
// flate2 = "1.0"   ← OBRIGATÓRIO para descomprimir GZIP do WS BingX
// =============================================================================

use chrono::Utc; use flate2::read::GzDecoder; use futures_util::{SinkExt, StreamExt}; use hmac::{Hmac, Mac}; use log::{error, info, warn}; use reqwest::Client; use serde_json::{json, Value}; use sha2::Sha256; use std::collections::{BTreeMap, HashMap, VecDeque}; use std::env; use std::io::Read; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::TcpStream; use tokio::sync::{mpsc, Mutex, RwLock}; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};

type HmacSha256 = Hmac;

// ============================================================================= // CONSTANTES DE CONFIGURAÇÃO // =============================================================================

/// Leverage requested for both the LONG and the SHORT side during warm-up.
const LEVERAGE: u32 = 5;
/// Number of closes kept for the z-score calculation.
const ZSCORE_WINDOW: usize = 20;
/// Initial z-score threshold of the dynamic parameters.
const ZSCORE_THRESHOLD_DEFAULT: f64 = 2.0;
/// Initial order-book-imbalance threshold of the dynamic parameters.
const OBI_THRESHOLD_DEFAULT: f64 = 0.40;
/// Initial time stop, in seconds, of the dynamic parameters.
const TIME_STOP_SEC_DEFAULT: i64 = 300;
/// ADX must be below this value for the regime to count as lateral.
const ADX_THRESHOLD: f64 = 13.0;
/// Period used by the ADX / DI smoothing.
const ADX_PERIOD: usize = 14;
/// Number of closes in the Bollinger-band window.
const BB_PERIOD: usize = 20;
/// Standard-deviation multiplier of the Bollinger bands.
const BB_K: f64 = 2.0;
/// Minimum number of band-width samples required by the squeeze test.
const BBW_MA_PERIOD: usize = 20;
/// Stop-loss distance expressed as a multiple of the ATR.
const ATR_SL_MULT: f64 = 1.5;
/// Period of the ATR used for the stop-loss.
const ATR_PERIOD: usize = 14;
/// Take-profit distance as a fraction of the entry price.
const TAKE_PROFIT_PCT: f64 = 0.0015;
/// Fraction of the balance that, once lost in a session, halts trading.
const MAX_DAILY_DRAWDOWN_PCT: f64 = 0.15;
/// Fraction of the balance used as the notional of a single entry.
const RISK_PER_TRADE_PCT: f64 = 0.02;
/// Cool-down in seconds after a guillotine event (consumer not visible in this part of the file).
const COOLDOWN_AFTER_GUILLOTINE_SEC: i64 = 120;
/// WebSocket ping interval in seconds (consumer not visible in this part of the file).
const WS_PING_INTERVAL_SEC: u64 = 20;
/// Capacity of the outgoing IPC message channel.
const IPC_CHANNEL_BUFFER: usize = 256;
/// Book depth for the imbalance calculation (consumer not visible in this part of the file).
const OBI_DEPTH: usize = 5;

/// Base URL of the BingX REST API.
const BINGX_REST: &str = "https://open-api.bingx.com";
/// BingX perpetual-swap market WebSocket endpoint.
const BINGX_WS: &str = "wss://open-api-swap.bingx.com/swap-market";

// =============================================================================
// AUTENTICAÇÃO BINGX — HMAC-SHA256
// Regra 1: parâmetros ordenados alfabeticamente → assinados → query string da URL
// Cabeçalho: apenas X-BX-APIKEY
// =============================================================================

/// Signs a BingX REST request.
///
/// Appends a millisecond `timestamp` entry to `params`, sorts the entries by
/// key, joins them as `key=value` pairs separated by `&` (no URL encoding is
/// applied) and computes the hex-encoded HMAC-SHA256 of that string with the
/// secret key.
///
/// Returns `(api_key, signed_query)`, where `signed_query` is the joined
/// query string followed by `&signature=<hex digest>`.
///
/// # Panics
/// Panics if `BINGX_API_KEY` or `BINGX_SECRET_KEY` is missing from the
/// environment.
fn build_bingx_signature(params: &mut Vec<(&str, String)>) -> (String, String) {
    let api_key = env::var("BINGX_API_KEY").expect("BINGX_API_KEY ausente no .env");
    let secret = env::var("BINGX_SECRET_KEY").expect("BINGX_SECRET_KEY ausente no .env");

    // The timestamp (milliseconds) takes part in the signature like any other parameter.
    params.push(("timestamp", Utc::now().timestamp_millis().to_string()));

    // Keys are sorted alphabetically before signing.
    params.sort_by(|lhs, rhs| lhs.0.cmp(rhs.0));

    // Plain `k=v&k=v` string, built without any escaping.
    let mut query = String::new();
    for (idx, (key, value)) in params.iter().enumerate() {
        if idx > 0 {
            query.push('&');
        }
        query.push_str(key);
        query.push('=');
        query.push_str(value);
    }

    let mut signer = HmacSha256::new_from_slice(secret.as_bytes())
        .expect("HMAC aceita qualquer tamanho");
    signer.update(query.as_bytes());
    let digest_hex = hex::encode(signer.finalize().into_bytes());

    let signed_query = format!("{}&signature={}", query, digest_hex);
    (api_key, signed_query)
}

// ============================================================================= // DESCOMPRESSÃO GZIP — obrigatória para todas as msgs WS da BingX // Regra 2: TODAS as mensagens chegam comprimidas em GZIP via Message::Binary // =============================================================================

fn decompress_gzip(data: &[u8]) -> Option { let mut decoder = GzDecoder::new(data); let mut s = String::new(); decoder.read_to_string(&mut s).ok()?; Some(s) }

// ============================================================================= // METADADOS DO INSTRUMENTO // =============================================================================

/// Price and quantity formatting rules of one contract.
#[derive(Clone, Debug)]
struct InstrumentMeta {
    /// Number of decimal places used when formatting prices.
    price_decimals: usize,
    /// Number of decimal places used when formatting quantities.
    qty_decimals: usize,
    /// Smallest price increment; prices are rounded to a multiple of it.
    tick_size: f64,
}

/// Busca informaΓ§Γ΅es do contrato na BingX async fn fetch_instrument_meta( client: &Client, symbol: &str, ) -> Result<InstrumentMeta, Box<dyn std::error::Error + Send + Sync>> { let mut params: Vec<(&str, String)> = vec![ ("symbol", symbol.to_string()), ]; let (api_key, signed_query) = build_bingx_signature(&mut params);

let url = format!("{}/openApi/swap/v2/quote/contracts?{}", BINGX_REST, signed_query);
let res: Value = client
    .get(&url)
    .header("X-BX-APIKEY", &api_key)
    .send()
    .await?
    .json()
    .await?;

// Endpoint pΓΊblico tambΓ©m funciona sem assinatura para meta
let url_pub = format!(
    "{}/openApi/swap/v2/quote/contracts?symbol={}",
    BINGX_REST, symbol
);
let res2: Value = client.get(&url_pub).send().await?.json().await?;

let data = res2["data"]
    .as_array()
    .and_then(|a| a.iter().find(|c| c["symbol"] == symbol))
    .cloned()
    .unwrap_or(res["data"][0].clone());

let price_precision = data["pricePrecision"].as_u64().unwrap_or(2) as usize;
let qty_precision   = data["quantityPrecision"].as_u64().unwrap_or(0) as usize;
let tick_size       = 10f64.powi(-(price_precision as i32));

info!("πŸ“ [{}] price_decimals={} qty_decimals={} tick={}", symbol, price_precision, qty_precision, tick_size);

Ok(InstrumentMeta {
    price_decimals: price_precision,
    qty_decimals:   qty_precision,
    tick_size,
})

}

/// Rounds `price` to the nearest multiple of `tick` and renders it with
/// exactly `decimals` fractional digits.
fn round_to_tick(price: f64, tick: f64, decimals: usize) -> String {
    let ticks = (price / tick).round();
    let snapped = ticks * tick;
    format!("{:.*}", decimals, snapped)
}

// ============================================================================= // BALANÇO // =============================================================================

/// Queries the signed balance endpoint and returns the available margin.
///
/// `data.balance.availableMargin` is accepted either as a numeric string or as
/// a JSON number; anything else yields `0.0`.
///
/// # Errors
/// Returns the underlying error when the HTTP request or JSON decoding fails.
///
/// # Panics
/// Panics when the response `code` is missing or different from 0.
async fn fetch_usdt_balance(
    client: &Client,
) -> Result<f64, Box<dyn std::error::Error + Send + Sync>> {
    let mut params: Vec<(&str, String)> = Vec::new();
    let (api_key, signed_query) = build_bingx_signature(&mut params);
    let endpoint = format!("{}/openApi/swap/v2/user/balance?{}", BINGX_REST, signed_query);

    let response = client
        .get(&endpoint)
        .header("X-BX-APIKEY", &api_key)
        .send()
        .await?;
    let res: Value = response.json().await?;

    match res["code"].as_i64() {
        Some(0) => {}
        other => {
            let code = other.unwrap_or(-1);
            error!("πŸ’€ [AUTH] BingX retornou code={}: {}", code, res);
            panic!("Credenciais invΓ‘lidas. Abortando.");
        }
    }

    let margin = &res["data"]["balance"]["availableMargin"];
    let bal = match margin.as_str().and_then(|raw| raw.parse::<f64>().ok()) {
        Some(parsed) => parsed,
        None => margin.as_f64().unwrap_or(0.0),
    };

    info!("🏦 [BingX] Auth OK. Saldo disponível: {:.2} USDT", bal);
    Ok(bal)
}

// ============================================================================= // WARM-UP: HEDGE MODE + MARGIN TYPE + ALAVANCAGEM (Regra 4) // =============================================================================

/// Warm-up configuration for `symbol`, sent as signed POST requests:
/// isolated margin, dual-side (hedge) position mode, and the configured
/// leverage for the LONG and SHORT sides (one request per side).
///
/// The `code` of each response is only logged; a non-zero code does not abort
/// the sequence.
///
/// # Errors
/// Returns the underlying error when a request or its JSON decoding fails.
async fn configure_symbol(
    client: &Client,
    symbol: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // 1. Margin type -> ISOLATED
    {
        let mut params: Vec<(&str, String)> = vec![
            ("marginType", "ISOLATED".to_string()),
            ("symbol", symbol.to_string()),
        ];
        let (api_key, signed_query) = build_bingx_signature(&mut params);
        let url = format!("{}/openApi/swap/v2/trade/marginType?{}", BINGX_REST, signed_query);
        let res: Value = client.post(&url).header("X-BX-APIKEY", &api_key).send().await?.json().await?;
        info!("βš™οΈ [{}] Margin ISOLATED: code={}", symbol, res["code"]);
    }

    // 2. Position mode -> dual-side (hedge mode)
    {
        let mut params: Vec<(&str, String)> = vec![
            ("dualSidePosition", "true".to_string()),
        ];
        let (api_key, signed_query) = build_bingx_signature(&mut params);
        let url = format!("{}/openApi/swap/v1/positionSide/dual?{}", BINGX_REST, signed_query);
        let res: Value = client.post(&url).header("X-BX-APIKEY", &api_key).send().await?.json().await?;
        info!("βš™οΈ [{}] Hedge Mode: code={}", symbol, res["code"]);
    }

    // 3. Leverage - two separate POSTs, one for LONG and one for SHORT
    for side in &["LONG", "SHORT"] {
        let mut params: Vec<(&str, String)> = vec![
            ("leverage", LEVERAGE.to_string()),
            ("side",     side.to_string()),
            ("symbol",   symbol.to_string()),
        ];
        let (api_key, signed_query) = build_bingx_signature(&mut params);
        let url = format!("{}/openApi/swap/v2/trade/leverage?{}", BINGX_REST, signed_query);
        let res: Value = client.post(&url).header("X-BX-APIKEY", &api_key).send().await?.json().await?;
        info!("βš™οΈ [{}] Leverage {}x {}: code={}", symbol, LEVERAGE, side, res["code"]);
    }

    Ok(())
}

// ============================================================================= // SNAPSHOT INICIAL DO LIVRO L2 (REST BingX) // =============================================================================

/// Downloads a 50-level depth snapshot for `symbol` over REST.
///
/// Returns `(bids, asks)` keyed by the fixed-point price key, with the level
/// volume as value. Levels whose price or volume is not strictly positive (or
/// cannot be parsed) are dropped.
///
/// # Errors
/// Returns the underlying error when the request or its JSON decoding fails.
async fn fetch_depth_snapshot(
    client: &Client,
    symbol: &str,
) -> Result<(BTreeMap<u64, f64>, BTreeMap<u64, f64>), Box<dyn std::error::Error + Send + Sync>> {
    /// Reads a JSON number or a numeric string; anything else becomes 0.0.
    fn numeric(field: &Value) -> f64 {
        match field.as_f64() {
            Some(n) => n,
            None => field.as_str().and_then(|raw| raw.parse().ok()).unwrap_or(0.0),
        }
    }

    /// Converts one side of the book (`[[price, qty], ...]`) into a price-keyed map.
    fn side_levels(side: &Value) -> BTreeMap<u64, f64> {
        let mut levels: BTreeMap<u64, f64> = BTreeMap::new();
        for entry in side.as_array().into_iter().flatten() {
            let price = numeric(&entry[0]);
            let volume = numeric(&entry[1]);
            if price > 0.0 && volume > 0.0 {
                levels.insert(price_to_key(price), volume);
            }
        }
        levels
    }

    let url = format!(
        "{}/openApi/swap/v2/quote/depth?symbol={}&limit=50",
        BINGX_REST, symbol
    );
    let res: Value = client.get(&url).send().await?.json().await?;

    let bids = side_levels(&res["data"]["bids"]);
    let asks = side_levels(&res["data"]["asks"]);

    Ok((bids, asks))
}

/// Converts a price into an integer map key with 8 implied decimal places.
///
/// The scaled value is rounded to the nearest integer before the cast. A plain
/// truncating cast turns prices such as `0.29` (whose scaled binary value is
/// just below `29_000_000`) into the key one unit too low. Negative or NaN
/// inputs saturate to 0 through the `as` cast.
#[inline]
fn price_to_key(price: f64) -> u64 {
    (price * 1e8).round() as u64
}

/// Converts an integer map key with 8 implied decimal places back into a price.
#[inline]
fn key_to_price(key: u64) -> f64 {
    let scaled = key as f64;
    scaled / 1e8
}

// ============================================================================= // LIVRO L2 // =============================================================================

/// Level-2 order book: volume per price level, keyed by the integer price key.
#[derive(Default)]
struct OrderBook {
    /// Bid levels; the highest key is the best bid.
    bids: BTreeMap<u64, f64>,
    /// Ask levels; the lowest key is the best ask.
    asks: BTreeMap<u64, f64>,
}

impl OrderBook { fn obi(&self, depth: usize) -> f64 { let v_bid: f64 = self.bids.values().rev().take(depth).sum(); let v_ask: f64 = self.asks.values().take(depth).sum(); let total = v_bid + v_ask; if total == 0.0 { return 0.0; } (v_bid - v_ask) / total } fn best_bid(&self) -> Option { self.bids.keys().next_back().map(|&k| key_to_price(k)) } fn best_ask(&self) -> Option { self.asks.keys().next().map(|&k| key_to_price(k)) } }

type SharedBook = Arc<RwLock>;

// =============================================================================
// PARÂMETROS DINÂMICOS (IPC Python)
// =============================================================================

/// Strategy parameters that can be replaced at runtime through the IPC channel.
#[derive(Clone)]
struct DynParams {
    /// Z-score threshold.
    z_threshold: f64,
    /// Order-book-imbalance threshold.
    obi_threshold: f64,
    /// Time stop in seconds.
    ts_seconds: i64,
}

impl Default for DynParams {
    /// Starts from the compile-time default thresholds and time stop.
    fn default() -> Self {
        Self {
            z_threshold: ZSCORE_THRESHOLD_DEFAULT,
            obi_threshold: OBI_THRESHOLD_DEFAULT,
            ts_seconds: TIME_STOP_SEC_DEFAULT,
        }
    }
}

type SharedParams = Arc<RwLock>;

// ============================================================================= // FSM // =============================================================================

/// States of the per-symbol trading state machine.
///
/// Exit checks are only evaluated in `PositionOpen` and `RiskManagement`.
#[derive(Debug, Clone, PartialEq)]
enum FsmState {
    Idle,
    SignalAcquired,
    PendingMaker,
    PositionOpen,
    RiskManagement,
    GuillotineTriggered,
    Suspended,
}

// ============================================================================= // WELFORD ONLINE // =============================================================================

/// Running mean and variance accumulator (Welford's online algorithm).
#[derive(Default)]
struct WelfordOnline {
    /// Number of samples seen so far.
    count: usize,
    /// Running mean of the samples.
    mean: f64,
    /// Running sum of squared deviations from the mean.
    m2: f64,
}

impl WelfordOnline {
    /// Folds one sample into the running statistics.
    fn update(&mut self, x: f64) {
        self.count += 1;
        let delta_before = x - self.mean;
        self.mean += delta_before / self.count as f64;
        let delta_after = x - self.mean;
        self.m2 += delta_before * delta_after;
    }

    /// Sample standard deviation (n - 1 denominator); `0.0` with fewer than two samples.
    fn std_dev(&self) -> f64 {
        if self.count >= 2 {
            let variance = self.m2 / (self.count - 1) as f64;
            variance.sqrt()
        } else {
            0.0
        }
    }
}

// ============================================================================= // CANDLE // =============================================================================

/// One OHLC price bar.
#[derive(Clone, Default)]
#[allow(dead_code)]
struct Candle {
    open: f64,
    high: f64,
    low: f64,
    close: f64,
}

// ============================================================================= // ESTADO DO SÍMBOLO // =============================================================================

struct CoinState { fsm: FsmState, candles: VecDeque, last_candle_ts: i64, zscore_closes: VecDeque, adx: f64, plus_di: f64, minus_di: f64, atr_wilder: f64, prev_adx_dx: VecDeque, atr_sl: f64, atr_sl_initialized: bool, bbw_history: VecDeque, position_is_short: bool, entry_price: f64, entry_time: i64, position_vol: f64, // BingX usa qty em float pending_order_id: Option, last_guillotine_ts: i64, session_loss_usd: f64, session_start_ts: i64, daily_halt: bool, }

impl CoinState {
/// Builds an idle state with empty indicator buffers, no position and a
/// loss session that starts at the current time.
fn new() -> Self {
    let session_start_ts = Utc::now().timestamp();
    Self {
        // State machine and candle history
        fsm: FsmState::Idle,
        candles: VecDeque::with_capacity(ADX_PERIOD * 3),
        last_candle_ts: 0,
        // Indicator buffers and accumulators
        zscore_closes: VecDeque::with_capacity(ZSCORE_WINDOW + 1),
        adx: 0.0,
        plus_di: 0.0,
        minus_di: 0.0,
        atr_wilder: 0.0,
        prev_adx_dx: VecDeque::with_capacity(ADX_PERIOD + 1),
        atr_sl: 0.0,
        atr_sl_initialized: false,
        bbw_history: VecDeque::with_capacity(BBW_MA_PERIOD + 1),
        // Position
        position_is_short: false,
        entry_price: 0.0,
        entry_time: 0,
        position_vol: 0.0,
        pending_order_id: None,
        // Risk accounting
        last_guillotine_ts: 0,
        session_loss_usd: 0.0,
        session_start_ts,
        daily_halt: false,
    }
}

/// Appends a closed candle, trims the history to its cap and refreshes every
/// indicator (stop-loss ATR, ADX, band width, z-score window) in that order.
fn push_candle(&mut self, c: Candle) {
    let history_cap = BBW_MA_PERIOD + ADX_PERIOD + 10;

    self.candles.push_back(c.clone());
    let excess = self.candles.len().saturating_sub(history_cap);
    for _ in 0..excess {
        self.candles.pop_front();
    }

    // The indicator updates read `self.candles`, so they run after the push.
    self.update_atr_sl(&c);
    self.update_adx(&c);
    self.update_bbw();
    self.update_zscore(c.close);
}

/// Updates the ATR used for the stop-loss with the true range of `c`.
///
/// Until initialised, true ranges are summed; once the history holds at least
/// `ATR_PERIOD` candles the sum is divided by `ATR_PERIOD`. Afterwards the
/// value follows Wilder's recursive smoothing.
fn update_atr_sl(&mut self, c: &Candle) {
    let held = self.candles.len();
    // `c` is already the last element; the previous close is the one before it.
    let prev_close = if held >= 2 { self.candles[held - 2].close } else { c.close };
    let tr = true_range(c.high, c.low, prev_close);

    if self.atr_sl_initialized {
        self.atr_sl = (self.atr_sl * (ATR_PERIOD as f64 - 1.0) + tr) / ATR_PERIOD as f64;
        return;
    }

    self.atr_sl += tr;
    if held >= ATR_PERIOD {
        self.atr_sl /= ATR_PERIOD as f64;
        self.atr_sl_initialized = true;
    }
}

/// Updates the Wilder accumulators and the ADX with candle `c`, which must
/// already be the last element of `self.candles`.
///
/// While the history holds at most `ADX_PERIOD` candles, true range and
/// +DM/-DM are summed; when it reaches `ADX_PERIOD` the first DX is recorded.
/// After that the accumulators follow Wilder smoothing, a DX is appended per
/// candle, and the ADX is seeded from the mean of the stored DX values and
/// then smoothed recursively.
///
/// `plus_di` / `minus_di` hold the smoothed +DM / -DM accumulators throughout.
/// The seeding step must leave them untouched: replacing them with DI
/// percentages would feed values of a different unit into the next smoothing
/// step and distort every later DI/DX value.
fn update_adx(&mut self, c: &Candle) {
    // DX from the smoothed +DM, -DM and true-range accumulators.
    fn directional_index(plus_dm: f64, minus_dm: f64, tr_sum: f64) -> f64 {
        let pdi = if tr_sum > 0.0 { (plus_dm / tr_sum) * 100.0 } else { 0.0 };
        let mdi = if tr_sum > 0.0 { (minus_dm / tr_sum) * 100.0 } else { 0.0 };
        if pdi + mdi > 0.0 { ((pdi - mdi).abs() / (pdi + mdi)) * 100.0 } else { 0.0 }
    }

    let n = self.candles.len();
    if n < 2 { return; }
    let prev = &self.candles[n - 2];
    let tr        = true_range(c.high, c.low, prev.close);
    let up_move   = c.high - prev.high;
    let down_move = prev.low - c.low;
    let plus_dm   = if up_move > down_move && up_move > 0.0 { up_move } else { 0.0 };
    let minus_dm  = if down_move > up_move && down_move > 0.0 { down_move } else { 0.0 };

    if n <= ADX_PERIOD {
        // Warm-up: plain sums.
        self.atr_wilder += tr;
        self.plus_di    += plus_dm;
        self.minus_di   += minus_dm;
        if n == ADX_PERIOD {
            // First DX; the accumulators stay as sums for the smoothing below.
            let dx = directional_index(self.plus_di, self.minus_di, self.atr_wilder);
            self.prev_adx_dx.push_back(dx);
        }
        return;
    }

    let p = ADX_PERIOD as f64;
    self.atr_wilder = self.atr_wilder - (self.atr_wilder / p) + tr;
    self.plus_di    = self.plus_di  * (p - 1.0) / p + plus_dm;
    self.minus_di   = self.minus_di * (p - 1.0) / p + minus_dm;
    let dx = directional_index(self.plus_di, self.minus_di, self.atr_wilder);
    self.prev_adx_dx.push_back(dx);

    if self.prev_adx_dx.len() >= ADX_PERIOD {
        // `adx == 0.0` marks "not seeded yet".
        self.adx = if self.adx == 0.0 {
            self.prev_adx_dx.iter().sum::<f64>() / ADX_PERIOD as f64
        } else {
            (self.adx * (p - 1.0) + dx) / p
        };
        while self.prev_adx_dx.len() > ADX_PERIOD + 5 { self.prev_adx_dx.pop_front(); }
    }
}

/// Appends the current Bollinger band width, `2 * BB_K * sigma / sma`, computed
/// over the last `BB_PERIOD` closes with a population standard deviation.
///
/// Does nothing until `BB_PERIOD` candles are available; records `0.0` when
/// the mean is not positive. The history is capped at `BBW_MA_PERIOD + 5` values.
fn update_bbw(&mut self) {
    if self.candles.len() < BB_PERIOD { return; }

    let period = BB_PERIOD as f64;
    // Most recent closes first; both passes use the same order.
    let recent_closes = || self.candles.iter().rev().take(BB_PERIOD).map(|k| k.close);

    let sma: f64 = recent_closes().sum::<f64>() / period;
    let var: f64 = recent_closes().map(|x| (x - sma).powi(2)).sum::<f64>() / period;
    let width = if sma > 0.0 { (2.0 * BB_K * var.sqrt()) / sma } else { 0.0 };

    self.bbw_history.push_back(width);
    let surplus = self.bbw_history.len().saturating_sub(BBW_MA_PERIOD + 5);
    for _ in 0..surplus {
        self.bbw_history.pop_front();
    }
}

/// `true` when the latest band width is below the mean of the stored
/// band-width history; `false` until `BBW_MA_PERIOD` samples exist.
fn bbw_squeeze(&self) -> bool {
    let samples = self.bbw_history.len();
    if samples < BBW_MA_PERIOD {
        return false;
    }
    let latest = self.bbw_history.back().copied().unwrap_or(0.0);
    let average = self.bbw_history.iter().sum::<f64>() / samples as f64;
    latest < average
}

/// Appends `close` to the z-score window and drops the oldest entries beyond
/// `ZSCORE_WINDOW`.
fn update_zscore(&mut self, close: f64) {
    self.zscore_closes.push_back(close);
    let surplus = self.zscore_closes.len().saturating_sub(ZSCORE_WINDOW);
    for _ in 0..surplus {
        self.zscore_closes.pop_front();
    }
}

/// Z-score of `tick` against the stored closes (sample standard deviation).
///
/// Returns `0.0` while the window is not full or when the deviation is zero.
fn zscore(&self, tick: f64) -> f64 {
    if self.zscore_closes.len() < ZSCORE_WINDOW {
        return 0.0;
    }
    let mut stats = WelfordOnline::default();
    self.zscore_closes.iter().for_each(|&close| stats.update(close));

    let sd = stats.std_dev();
    if sd == 0.0 { 0.0 } else { (tick - stats.mean) / sd }
}

/// `true` when the ADX has been computed, is below `ADX_THRESHOLD`, and the
/// band width is in a squeeze.
fn regime_is_lateral(&self) -> bool {
    let adx_ready = self.adx > 0.0;
    let trendless = self.adx < ADX_THRESHOLD;
    adx_ready && trendless && self.bbw_squeeze()
}

/// Stop-loss price: `ATR_SL_MULT` ATRs above the entry for a short position,
/// below it for a long one.
fn sl_price(&self) -> f64 {
    let distance = self.atr_sl * ATR_SL_MULT;
    match self.position_is_short {
        true => self.entry_price + distance,
        false => self.entry_price - distance,
    }
}

/// Take-profit price: `TAKE_PROFIT_PCT` below the entry for a short position,
/// above it for a long one.
fn tp_price(&self) -> f64 {
    let factor = if self.position_is_short {
        1.0 - TAKE_PROFIT_PCT
    } else {
        1.0 + TAKE_PROFIT_PCT
    };
    self.entry_price * factor
}

/// Decides whether the open position must be closed at `price`.
///
/// Only evaluated in `PositionOpen` / `RiskManagement` with a non-zero entry
/// price. Precedence: stop-loss, then take-profit, then the time stop
/// (`now - entry_time >= ts`), which is classified as profit or loss by the
/// sign of the current PnL.
fn check_exit(&self, price: f64, now: i64, ts: i64) -> Option<ExitReason> {
    let in_position = matches!(self.fsm, FsmState::PositionOpen | FsmState::RiskManagement);
    if !in_position || self.entry_price == 0.0 {
        return None;
    }

    let (sl, tp) = (self.sl_price(), self.tp_price());
    let (hit_sl, hit_tp) = if self.position_is_short {
        (price >= sl, price <= tp)
    } else {
        (price <= sl, price >= tp)
    };

    if hit_sl {
        Some(ExitReason::StopLoss)
    } else if hit_tp {
        Some(ExitReason::TakeProfit)
    } else if now - self.entry_time >= ts {
        if self.pnl_pct(price) >= 0.0 {
            Some(ExitReason::TimeStopProfit)
        } else {
            Some(ExitReason::TimeStopLoss)
        }
    } else {
        None
    }
}

/// Unleveraged PnL of the position at `price`, as a fraction of the entry price.
fn pnl_pct(&self, price: f64) -> f64 {
    let gain = match self.position_is_short {
        true => self.entry_price - price,
        false => price - self.entry_price,
    };
    gain / self.entry_price
}

/// Adds the absolute value of `loss_usd` to the session loss.
///
/// When more than 86 400 seconds have passed since the session started, the
/// session is restarted first: the loss is zeroed and the halt flag cleared.
fn register_loss(&mut self, loss_usd: f64) {
    let now = Utc::now().timestamp();
    let session_expired = now - self.session_start_ts > 86_400;
    if session_expired {
        self.session_start_ts = now;
        self.session_loss_usd = 0.0;
        self.daily_halt       = false;
    }
    self.session_loss_usd += loss_usd.abs();
}

/// Returns whether trading is halted for the session, latching the halt flag
/// when the session loss reaches `MAX_DAILY_DRAWDOWN_PCT` of `balance`.
///
/// A previously latched halt always returns `true`; a non-positive balance
/// never triggers a new halt.
fn check_daily_halt(&mut self, balance: f64) -> bool {
    if self.daily_halt {
        return true;
    }
    if balance <= 0.0 {
        return false;
    }

    let limit = balance * MAX_DAILY_DRAWDOWN_PCT;
    let breached = self.session_loss_usd > 0.0 && self.session_loss_usd >= limit;
    if breached {
        self.daily_halt = true;
        error!("🚨 [CIRCUIT BREAKER] Drawdown ${:.2} >= limite ${:.2}. HALT.", self.session_loss_usd, limit);
    }
    breached
}

}

/// True range of a bar: the largest of high-low, |high - previous close| and
/// |low - previous close|.
#[inline]
fn true_range(h: f64, l: f64, pc: f64) -> f64 {
    let gaps = [(h - pc).abs(), (l - pc).abs()];
    gaps.iter().fold(h - l, |widest, &gap| widest.max(gap))
}

// =============================================================================
// RAZÃO DE SAÍDA
// =============================================================================

/// Why a position is being closed.
#[derive(Debug)]
enum ExitReason {
    StopLoss,
    TakeProfit,
    TimeStopProfit,
    TimeStopLoss,
}

impl ExitReason {
    /// Fixed label used to identify the exit reason.
    fn label(&self) -> &'static str {
        match *self {
            ExitReason::StopLoss => "STOP_LOSS",
            ExitReason::TakeProfit => "TAKE_PROFIT",
            ExitReason::TimeStopProfit => "TIME_STOP_LUCRO",
            ExitReason::TimeStopLoss => "TIME_STOP_PREJUIZO",
        }
    }

    /// Whether the exit is a taker exit; every reason except the take-profit is.
    fn is_taker(&self) -> bool {
        match *self {
            ExitReason::TakeProfit => false,
            ExitReason::StopLoss
            | ExitReason::TimeStopProfit
            | ExitReason::TimeStopLoss => true,
        }
    }
}

// =============================================================================
// EXECUÇÃO — ENTRADA (Regra 3: params na query string, POST sem body)
// BingX Hedge Mode: side=BUY/SELL + positionSide=LONG/SHORT
// =============================================================================

/// Outcome of an entry-order attempt, sent back over the result channel.
struct EntryResult {
    /// Symbol the order was placed for.
    symbol: String,
    /// `true` when the exchange answered with `code == 0`.
    success: bool,
    /// Requested entry price (before tick rounding).
    price: f64,
    /// `true` for a short entry.
    is_short: bool,
    /// Order quantity that was requested.
    vol: f64,
    /// Order id returned by the exchange, when present.
    order_id: Option<String>,
}

async fn execute_entry_order( client: Client, symbol: String, is_short: bool, price: f64, balance: f64, meta: InstrumentMeta, result_tx: mpsc::Sender, ) { let notional = balance * RISK_PER_TRADE_PCT; // BingX quantity em contratos (geralmente 1 contrato = 1 unidade do ativo) let qty_raw = notional / price; let factor = 10f64.powi(meta.qty_decimals as i32); let qty = (qty_raw * factor).floor().max(1.0) / factor;

let side          = if is_short { "SELL" } else { "BUY" };
let position_side = if is_short { "SHORT" } else { "LONG" };
let px_str        = round_to_tick(price, meta.tick_size, meta.price_decimals);

let mut params: Vec<(&str, String)> = vec![
    ("positionSide", position_side.to_string()),
    ("price",        px_str.clone()),
    ("quantity",     format!("{:.prec$}", qty, prec = meta.qty_decimals)),
    ("side",         side.to_string()),
    ("symbol",       symbol.clone()),
    ("timeInForce",  "PostOnly".to_string()),  // Maker garantido
    ("type",         "LIMIT".to_string()),
];
let (api_key, signed_query) = build_bingx_signature(&mut params);

// Regra 3: POST com params na URL, sem body JSON
let url = format!("{}/openApi/swap/v2/trade/order?{}", BINGX_REST, signed_query);

match client
    .post(&url)
    .header("X-BX-APIKEY", &api_key)
    .send()
    .await
{
    Ok(res) => {
        let text   = res.text().await.unwrap_or_default();
        let parsed: Value = serde_json::from_str(&text).unwrap_or_default();
        let code    = parsed["code"].as_i64().unwrap_or(-1);
        let success = code == 0;
        let order_id = parsed["data"]["order"]["orderId"]
            .as_str()
            .or_else(|| parsed["data"]["orderId"].as_str())
            .map(String::from);

        if success {
            info!("πŸ“¬ [ENTRADA] {} {} | qty={:.prec$} | px={} | 5x",
                if is_short { "SHORT" } else { "LONG" }, symbol, qty,
                px_str, prec = meta.qty_decimals);
        } else {
            warn!("⚠️ [ENTRADA REJEITADA] {} | code={} | {}", symbol, code, text);
        }
        let _ = result_tx.send(EntryResult { symbol, success, price, is_short, vol: qty, order_id }).await;
    }
    Err(e) => {
        error!("❌ [REDE] Entrada falhou {}: {}", symbol, e);
        let _ = result_tx.send(EntryResult { symbol, success: false, price, is_short, vol: qty, order_id: None }).await;
    }
}

}

// =============================================================================
// EXECUÇÃO — CANCEL
// =============================================================================

async fn cancel_order(client: &Client, symbol: &str, order_id: &str) { let mut params: Vec<(&str, String)> = vec![ ("orderId", order_id.to_string()), ("symbol", symbol.to_string()), ]; let (api_key, signed_query) = build_bingx_signature(&mut params); let url = format!("{}/openApi/swap/v2/trade/order?{}", BINGX_REST, signed_query); match client.delete(&url).header("X-BX-APIKEY", &api_key).send().await { Ok(res) => { let t = res.text().await.unwrap_or_default(); warn!("🚫 [CANCEL] {} | {}", symbol, t); } Err(e) => error!("❌ [CANCEL] {} falhou: {}", symbol, e), } }

// =============================================================================
// EXECUÇÃO — SAÍDA (Regra 3: params na query string)
// =============================================================================

async fn execute_exit_order( client: Client, symbol: String, is_short: bool, vol: f64, is_taker: bool, tp_price: Option, meta: InstrumentMeta, ) { // Para fechar: side inverso, positionSide igual Γ  posiΓ§Γ£o let side = if is_short { "BUY" } else { "SELL" }; let position_side = if is_short { "SHORT" } else { "LONG" }; let qty_str = format!("{:.prec$}", vol, prec = meta.qty_decimals);

let mut params: Vec<(&str, String)> = if is_taker {
    vec![
        ("positionSide", position_side.to_string()),
        ("quantity",     qty_str),
        ("reduceOnly",   "true".to_string()),
        ("side",         side.to_string()),
        ("symbol",       symbol.clone()),
        ("type",         "MARKET".to_string()),
    ]
} else {
    let px = tp_price.unwrap_or(0.0);
    let px_str = round_to_tick(px, meta.tick_size, meta.price_decimals);
    vec![
        ("positionSide", position_side.to_string()),
        ("price",        px_str),
        ("quantity",     qty_str),
        ("reduceOnly",   "true".to_string()),
        ("side",         side.to_string()),
        ("symbol",       symbol.clone()),
        ("timeInForce",  "PostOnly".to_string()),
        ("type",         "LIMIT".to_string()),
    ]
};

let (api_key, signed_query) = build_bingx_signature(&mut params);
let url = format!("{}/openApi/swap/v2/trade/order?{}", BINGX_REST, signed_query);

for attempt in 1u8..=3 {
    match client.post(&url).header("X-BX-APIKEY", &api_key).send().await {
        Ok(res) => {
            let text = res.text().await.unwrap_or_default();
            warn!("πŸ›‘οΈ [SAÍDA {}] {} tentativa={} | {}",
                if is_taker { "MARKET" } else { "LIMIT" }, symbol, attempt, text);
            return;
        }
        Err(e) => {
            error!("❌ [SAÍDA] {} tentativa={}: {}", symbol, attempt, e);
            if attempt < 3 { tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; }
        }
    }
}
error!("πŸ’€ [SAÍDA CRÍTICA] {} β€” 3 tentativas falharam!", symbol);

}

// =============================================================================
// IPC — ESCRITA PARA PYTHON MAESTRO
// =============================================================================

async fn ipc_writer_task(stream: Arc<Mutex>, mut rx: mpsc::Receiver) { while let Some(msg) = rx.recv().await { let mut g = stream.lock().await; if let Err(e) = g.write_all(msg.as_bytes()).await { error!("[IPC-TX] Falha: {}", e); } } }

// =============================================================================
// IPC — LEITURA DE PARÂMETROS DO PYTHON
// =============================================================================

async fn ipc_reader_task( stream: Arc<Mutex>, shared_params: SharedParams, asri_halt_tx: mpsc::Sender, ) { loop { tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; let mut guard = match stream.try_lock() { Ok(g) => g, Err(_) => continue }; let mut reader = BufReader::new(&mut *guard); let mut line = String::new(); match tokio::time::timeout( tokio::time::Duration::from_millis(10), reader.read_line(&mut line), ).await { Ok(Ok(n)) if n > 0 => { let trimmed = line.trim(); if let Ok(cmd) = serde_json::from_str::(trimmed) { if let (Some(z), Some(obi), Some(ts)) = ( cmd.get("z_threshold").and_then(|v| v.as_f64()), cmd.get("obi_threshold").and_then(|v| v.as_f64()), cmd.get("ts_seconds").and_then(|v| v.as_i64()), ) { let mut p = shared_params.write().await; p.z_threshold = z; p.obi_threshold = obi; p.ts_seconds = ts; info!("πŸ”§ [IPC] ParΓ’metros: z={} obi={} ts={}s", z, obi, ts); } if let Some(halt) = cmd.get("asri_halt").and_then(|v| v.as_bool()) { let _ = asri_halt_tx.send(halt).await; if halt { warn!("🚨 [ASRI] HALT ativado."); } else { info!("βœ… [ASRI] Retomada."); } } } } _ => {} } } }

// =============================================================================
// TASK DEPTH — livro L2 via WS BingX (Regra 5)
// Canal: symbol@depth20 → envia snapshot completo dos top-20 níveis
// =============================================================================

/// Replaces the shared order book with every depth message received on `rx`.
///
/// Each message is treated as a full snapshot: both sides are rebuilt from
/// `data.bids` / `data.asks` and swapped in under the write lock. Levels with
/// a non-positive or unparsable price or volume are dropped, and a message
/// that yields no level on either side leaves the book untouched. The task
/// ends when all senders of `rx` are dropped.
async fn depth_maintenance_task(
    mut rx: mpsc::Receiver<Value>,
    book: SharedBook,
) {
    while let Some(msg) = rx.recv().await {
        // BingX depth: {"dataType":"BTC-USDT@depth20","data":{"bids":[[p,q]...],"asks":[[p,q]...]}}
        let data = &msg["data"];
        let mut bids: BTreeMap<u64, f64> = BTreeMap::new();
        let mut asks: BTreeMap<u64, f64> = BTreeMap::new();

        // Each level is [price, qty], given either as numbers or as numeric strings.
        let parse = |arr: &Value, map: &mut BTreeMap<u64, f64>| {
            if let Some(levels) = arr.as_array() {
                for lvl in levels {
                    let p = lvl[0].as_f64().or_else(|| lvl[0].as_str().and_then(|s| s.parse().ok())).unwrap_or(0.0);
                    let v = lvl[1].as_f64().or_else(|| lvl[1].as_str().and_then(|s| s.parse().ok())).unwrap_or(0.0);
                    if p > 0.0 && v > 0.0 { map.insert(price_to_key(p), v); }
                }
            }
        };

        parse(&data["bids"], &mut bids);
        parse(&data["asks"], &mut asks);

        if !bids.is_empty() || !asks.is_empty() {
            let mut b = book.write().await;
            b.bids = bids;
            b.asks = asks;
        }
    }
}

// ============================================================================= // MOTOR PRINCIPAL // =============================================================================

/// Engine entry point.
///
/// Start-up sequence (all visible in this function):
/// 1. Loads `.env`, initialises `env_logger` (default filter `info`) and builds the HTTP client.
/// 2. Fetches the USDT balance; a failed fetch or a zero balance selects paper-trade mode
///    with a simulated balance of 50.0.
/// 3. Connects to the Maestro process over TCP at `127.0.0.1:9001` (5 attempts, 3 s apart),
///    reads one JSON line containing `"symbols"` (10 s timeout) and falls back to
///    `BTC-USDT` + `ETH-USDT` when the list is missing, empty or unparsable.
/// 4. Spawns the IPC writer/reader tasks, fetches per-symbol instrument metadata, calls
///    `configure_symbol` when trading live, loads a REST depth snapshot per symbol and
///    spawns one depth-maintenance task per symbol.
/// 5. Runs the reconnect loop: subscribes to `@kline_1m` and `@depth20` for every symbol and
///    drives the per-symbol FSM from kline events until the socket drops, then reconnects.
///
/// # Panics
/// Panics if the HTTP client cannot be built, if Maestro is unreachable after 5 attempts,
/// or if instrument metadata cannot be fetched for any target symbol.
#[tokio::main]
async fn main() {
    dotenv::dotenv().ok();
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

    info!("╔══════════════════════════════════════════════════════╗");
    info!("β•‘  APEX HFT V8.3 β€” MEAN REVERSION | BingX Perp V2    β•‘");
    info!("β•‘  Z-Score(20,Β±2.0) + OBI(0.40) + ADX(<13) + BBW    β•‘");
    info!("β•‘  TP: 0.15% Maker | SL: 1.5Γ—ATR | CD: 120s | 5x   β•‘");
    info!("β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•");

    let http_client = Client::builder()
        .user_agent("Mozilla/5.0")
        .build()
        .expect("HTTP client");

    // Real balance or paper trade: a fetch error is folded into 0.0, and a zero balance
    // switches the engine to paper mode with a simulated 50.0 balance.
    let real_balance = fetch_usdt_balance(&http_client).await.unwrap_or(0.0);
    let paper_trade = real_balance == 0.0;
    let usdt_balance = if paper_trade { 50.0 } else { real_balance };

    if paper_trade {
        warn!("πŸ“‹ [PAPER TRADE] Simulando ${:.2}. Ordens NΓƒO enviadas.", usdt_balance);
    } else {
        info!("πŸ’° [LIVE] Saldo: ${:.2} USDT", usdt_balance);
    }

    // IPC: connect to the Python Maestro, retrying up to 5 times with a 3 s pause.
    let maestro_raw = {
        let mut attempt = 0u8;
        loop {
            attempt += 1;
            match TcpStream::connect("127.0.0.1:9001").await {
                Ok(s) => {
                    info!("πŸ”— [IPC] Maestro conectado (tentativa {}).", attempt);
                    break s;
                }
                Err(e) => {
                    if attempt >= 5 {
                        panic!("πŸ’€ Maestro inacessΓ­vel: {}", e);
                    }
                    warn!("⏳ [IPC] Tentativa {}. Aguardando 3s...", attempt);
                    tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
                }
            }
        }
    };

    let maestro_stream = Arc::new(Mutex::new(maestro_raw));

    // Single source for the fallback target list used by every failure path below.
    let default_symbols = || vec!["BTC-USDT".to_string(), "ETH-USDT".to_string()];

    // Read the symbol list from Maestro: one JSON line with a "symbols" array of strings.
    // NOTE(review): the temporary BufReader may buffer bytes beyond the first line and
    // discard them when dropped — fine only if Maestro sends exactly one line here.
    let alvos: Vec<String> = {
        let mut guard = maestro_stream.lock().await;
        let mut reader = BufReader::new(&mut *guard);
        let mut line = String::new();
        let read = tokio::time::timeout(
            tokio::time::Duration::from_secs(10),
            reader.read_line(&mut line),
        )
        .await;
        match read {
            Ok(Ok(n)) if n > 0 => match serde_json::from_str::<Value>(line.trim()) {
                Ok(cmd) => {
                    let symbols: Vec<String> = cmd["symbols"]
                        .as_array()
                        .map(|arr| {
                            arr.iter()
                                .filter_map(|v| v.as_str().map(String::from))
                                .collect()
                        })
                        .unwrap_or_default();
                    if symbols.is_empty() { default_symbols() } else { symbols }
                }
                Err(_) => default_symbols(),
            },
            _ => {
                warn!("⚠️ [IPC] Timeout/vazio. Fallback BTC-USDT + ETH-USDT.");
                default_symbols()
            }
        }
    };

    info!("🎯 {} alvos: {:?}", alvos.len(), alvos);
    {
        // Best-effort handshake; a write failure is deliberately ignored.
        let mut g = maestro_stream.lock().await;
        let _ = g.write_all(b"HFT V8 BingX armado.\n").await;
    }

    let shared_params: SharedParams = Arc::new(RwLock::new(DynParams::default()));
    let (ipc_tx, ipc_rx) = mpsc::channel::<String>(IPC_CHANNEL_BUFFER);
    tokio::spawn(ipc_writer_task(Arc::clone(&maestro_stream), ipc_rx));

    let (asri_halt_tx, mut asri_halt_rx) = mpsc::channel::<bool>(8);
    tokio::spawn(ipc_reader_task(
        Arc::clone(&maestro_stream),
        Arc::clone(&shared_params),
        asri_halt_tx,
    ));

    let (order_tx, mut order_rx) = mpsc::channel::<EntryResult>(32);

    // Warm-up: instrument metadata, then per-symbol exchange configuration when live
    // (presumably hedge mode + leverage, per the log line — see `configure_symbol`).
    info!("πŸ”₯ Warm-up: metadados, Hedge Mode, alavancagem...");
    let mut instrument_map: HashMap<String, InstrumentMeta> = HashMap::new();

    for symbol in &alvos {
        match fetch_instrument_meta(&http_client, symbol).await {
            Ok(meta) => {
                instrument_map.insert(symbol.clone(), meta);
            }
            // Metadata is mandatory: later code unwraps `instrument_map` lookups.
            Err(e) => panic!("πŸ’€ Meta de {} falhou: {}", symbol, e),
        }
        if !paper_trade {
            if let Err(e) = configure_symbol(&http_client, symbol).await {
                warn!("⚠️ configure_symbol {}: {}", symbol, e);
            }
        }
    }

    // L2 books seeded from REST snapshots; a failed snapshot leaves the book empty.
    let mut book_map: HashMap<String, SharedBook> = HashMap::new();
    for symbol in &alvos {
        let book: SharedBook = Arc::new(RwLock::new(OrderBook::default()));
        match fetch_depth_snapshot(&http_client, symbol).await {
            Ok((bids, asks)) => {
                let mut b = book.write().await;
                b.bids = bids;
                b.asks = asks;
                let nb = b.bids.len();
                if nb > 0 {
                    info!("πŸ“š [{}] Snapshot REST: {} bids | {} asks", symbol, nb, b.asks.len());
                } else {
                    info!("πŸ“š [{}] Snapshot REST vazio β€” WS popularΓ‘.", symbol);
                }
            }
            Err(e) => warn!("⚠️ Snapshot {} falhou: {}", symbol, e),
        }
        book_map.insert(symbol.clone(), book);
    }

    // One depth-maintenance task per symbol, fed through a bounded channel.
    let mut depth_tx_map: HashMap<String, mpsc::Sender<Value>> = HashMap::new();
    for symbol in &alvos {
        let (dtx, drx) = mpsc::channel::<Value>(512);
        let book = Arc::clone(book_map.get(symbol).unwrap());
        tokio::spawn(depth_maintenance_task(drx, book));
        depth_tx_map.insert(symbol.clone(), dtx);
    }

    let mut market_state: HashMap<String, CoinState> = alvos
        .iter()
        .map(|s| (s.clone(), CoinState::new()))
        .collect();

    let mut asri_suspended = false;

    info!("βœ… Warm-up completo. Motor BingX ativado.");

    'reconnect: loop {
        info!("πŸ”Œ Conectando ao WebSocket BingX...");

        let (mut ws, _) = match connect_async(BINGX_WS).await {
            Ok(s) => s,
            Err(e) => {
                error!("❌ WS falhou: {}. Tentando em 5s...", e);
                tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
                continue 'reconnect;
            }
        };

        // Subscriptions, one kline and one depth stream per symbol:
        //   kline: dataType "BTC-USDT@kline_1m"
        //   depth: dataType "BTC-USDT@depth20"
        for symbol in &alvos {
            let sub_kline = json!({
                "id":   format!("kline_{}", symbol),
                "reqType": "sub",
                "dataType": format!("{}@kline_1m", symbol)
            });
            let sub_depth = json!({
                "id":   format!("depth_{}", symbol),
                "reqType": "sub",
                "dataType": format!("{}@depth20", symbol)
            });
            for sub in [sub_kline, sub_depth] {
                if ws.send(Message::Text(sub.to_string())).await.is_err() {
                    error!("❌ Sub falhou para {}. Reconectando...", symbol);
                    continue 'reconnect;
                }
            }
        }

        info!("πŸ“‘ SubscriΓ§Γ΅es BingX ativas. Aguardando dados...");

        let mut ping_iv = tokio::time::interval(tokio::time::Duration::from_secs(WS_PING_INTERVAL_SEC));
        let mut tele_iv = tokio::time::interval(tokio::time::Duration::from_secs(60));
        // Refreshed by protocol-level Pong frames and by the text "Ping" keep-alive.
        let mut last_pong = Utc::now().timestamp_millis();

        loop {
            tokio::select! {

                // Heartbeat: drop the connection when no liveness signal arrived within
                // one ping interval plus 2 s of slack, otherwise send a protocol Ping.
                _ = ping_iv.tick() => {
                    let now_ms = Utc::now().timestamp_millis();
                    if now_ms - last_pong > (WS_PING_INTERVAL_SEC * 1000 + 2000) as i64 {
                        error!("πŸ’” Pong timeout BingX. Reconectando...");
                        break;
                    }
                    if ws.send(Message::Ping(vec![])).await.is_err() {
                        error!("πŸ’” Ping falhou. Reconectando...");
                        break;
                    }
                }

                // Telemetry every 60 s: prints the current state of every symbol.
                _ = tele_iv.tick() => {
                    info!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
                    info!("🟒 [VIVO] BingX WS ativo | paper={} | alvos={}", paper_trade, alvos.len());
                    for sym in &alvos {
                        if let Some(s) = market_state.get(sym) {
                            let fsm_str = match &s.fsm {
                                FsmState::Idle                => "IDLE",
                                FsmState::SignalAcquired      => "SIGNAL_ACQ",
                                FsmState::PendingMaker        => "PENDING_MAKER",
                                FsmState::PositionOpen        => "POSITION_OPEN",
                                FsmState::RiskManagement      => "RISK_MGMT",
                                FsmState::GuillotineTriggered => "GUILLOTINE",
                                FsmState::Suspended           => "SUSPENDED",
                            };
                            info!(
                                "   {} | {:13} | candles={:>3} | adx={:>5.1} | bbw={} | lateral={} | z_buf={:>2}",
                                sym, fsm_str,
                                s.candles.len(),
                                s.adx,
                                s.bbw_squeeze(),
                                s.regime_is_lateral(),
                                s.zscore_closes.len()
                            );
                            if s.fsm == FsmState::PositionOpen || s.fsm == FsmState::RiskManagement {
                                info!(
                                    "   {} | πŸ“ˆ {} | entry={:.4} | SL={:.4} | TP={:.4} | dur={}s",
                                    sym,
                                    if s.position_is_short { "SHORT" } else { "LONG" },
                                    s.entry_price, s.sl_price(), s.tp_price(),
                                    Utc::now().timestamp() - s.entry_time
                                );
                            }
                        }
                    }
                    let max_c = market_state.values().map(|s| s.candles.len()).max().unwrap_or(0);
                    if max_c < BBW_MA_PERIOD {
                        // Subtraction cannot underflow: guarded by `max_c < BBW_MA_PERIOD`.
                        info!("   ⏳ Warm-up: {}/{} candles (faltam ~{} min)", max_c, BBW_MA_PERIOD, BBW_MA_PERIOD - max_c);
                    } else {
                        info!("   βœ… Warm-up OK β€” sinais sendo avaliados a cada tick");
                    }
                    info!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
                }

                // ASRI halt flag from the IPC reader task.
                Some(halt) = asri_halt_rx.recv() => {
                    asri_suspended = halt;
                    if halt {
                        // Only resting (pending) orders are cancelled here; idle symbols are
                        // suspended lazily on their next kline event further below.
                        for (sym, state) in &mut market_state {
                            if state.fsm == FsmState::PendingMaker {
                                if let Some(oid) = state.pending_order_id.take() {
                                    cancel_order(&http_client, sym, &oid).await;
                                }
                                state.fsm = FsmState::Suspended;
                                warn!("πŸ”΄ [ASRI] {} β†’ SUSPENDED", sym);
                            }
                        }
                    } else {
                        for state in market_state.values_mut() {
                            if state.fsm == FsmState::Suspended { state.fsm = FsmState::Idle; }
                        }
                    }
                }

                // Result of an entry order (sent by the paper path below or by the
                // spawned `execute_entry_order` task).
                Some(result) = order_rx.recv() => {
                    if let Some(state) = market_state.get_mut(&result.symbol) {
                        if result.success {
                            state.entry_price       = result.price;
                            state.entry_time        = Utc::now().timestamp();
                            state.position_is_short = result.is_short;
                            state.position_vol      = result.vol;
                            state.pending_order_id  = result.order_id;
                            if paper_trade {
                                // Paper orders are treated as filled immediately.
                                state.fsm = FsmState::PositionOpen;
                                info!("πŸ“‹ [PAPER] {} β†’ POSITION_OPEN | px={:.4} qty={:.4} | SL={:.4} TP={:.4}",
                                    result.symbol, result.price, result.vol,
                                    state.sl_price(), state.tp_price());
                            } else {
                                state.fsm = FsmState::PendingMaker;
                                info!("⏳ [FSM] {} β†’ PENDING_MAKER | px={:.4}", result.symbol, result.price);
                            }
                        } else {
                            state.fsm = FsmState::Idle;
                        }
                    }
                }

                // WebSocket messages
                msg_opt = ws.next() => {
                    let message = match msg_opt {
                        Some(Ok(m))  => m,
                        Some(Err(e)) => { error!("❌ WS erro: {}. Reconectando...", e); break; }
                        None         => { warn!("πŸ“΄ WS encerrado. Reconectando..."); break; }
                    };

                    // Per the original author's note, BingX delivers payloads GZIP-compressed
                    // in Binary frames (including the plain "Ping" keep-alive), so binary
                    // frames are decompressed before any inspection.
                    let text = match message {
                        Message::Binary(data) => {
                            match decompress_gzip(&data) {
                                Some(s) => s,
                                None    => continue,
                            }
                        }
                        Message::Ping(p) => { let _ = ws.send(Message::Pong(p)).await; continue; }
                        Message::Pong(_) => { last_pong = Utc::now().timestamp_millis(); continue; }
                        Message::Text(t) => t,
                        _ => continue,
                    };

                    // Intercept the text keep-alive BEFORE attempting to parse JSON.
                    if text.trim() == "Ping" {
                        last_pong = Utc::now().timestamp_millis();
                        if ws.send(Message::Text("Pong".to_string())).await.is_err() {
                            error!("πŸ’” Pong falhou. Reconectando...");
                            break;
                        }
                        continue;
                    }

                    let parsed = match serde_json::from_str::<Value>(&text) {
                        Ok(v)  => v,
                        Err(e) => {
                            // Log at most 200 bytes of the payload. The cut is moved back to
                            // a char boundary because slicing a `str` inside a multi-byte
                            // UTF-8 character panics (index 0 is always a boundary, so the
                            // loop terminates).
                            let mut end = text.len().min(200);
                            while !text.is_char_boundary(end) {
                                end -= 1;
                            }
                            warn!("πŸ” [RAW] Parse falhou: {} | raw='{}'", e, &text[..end]);
                            continue;
                        }
                    };

                    // The stream identifier is read from "dataType", falling back to "e".
                    let data_type = parsed["dataType"].as_str()
                        .or_else(|| parsed["e"].as_str())
                        .unwrap_or("")
                        .to_string();

                    if data_type.is_empty() {
                        // No stream identifier (e.g. a bare subscription ACK): ignore silently.
                        continue;
                    }

                    // Extract the symbol from the dataType: "BTC-USDT@kline_1m" -> "BTC-USDT".
                    let symbol = data_type.split('@').next().unwrap_or("").to_string();
                    if symbol.is_empty() || !alvos.contains(&symbol) { continue; }

                    // ---- DEPTH ----
                    if data_type.contains("@depth") {
                        if let Some(dtx) = depth_tx_map.get(&symbol) {
                            // Deliberately lossy: when the channel is full the update is dropped.
                            let _ = dtx.try_send(parsed);
                        }
                        continue;
                    }

                    // Entry/exit logic only runs on kline events.
                    if !data_type.contains("@kline_") { continue; }

                    let now = Utc::now().timestamp();

                    // ---- KLINE ----
                    // Payload shape documented by the original author:
                    //   {"dataType":"BTC-USDT@kline_1m","s":"BTC-USDT",
                    //    "data":[{"c":"74315.5","o":"74270.7","h":"74320.0","l":"74257.8","T":1773801540000}]}
                    // `data` is an ARRAY; the first element is used.
                    let kd = &parsed["data"][0];

                    // Accepts both JSON numbers and numeric strings; anything else yields 0.0.
                    let parse_f = |key: &str| -> f64 {
                        let v = &kd[key];
                        v.as_f64().or_else(|| v.as_str().and_then(|s| s.parse().ok())).unwrap_or(0.0)
                    };

                    let o = parse_f("o");
                    let h = parse_f("h");
                    let l = parse_f("l");
                    let c = parse_f("c");

                    // Candle timestamp in milliseconds, field "T" (number or numeric string).
                    let candle_ts_ms: i64 = kd["T"].as_i64()
                        .or_else(|| kd["T"].as_str().and_then(|s| s.parse().ok()))
                        .unwrap_or(0);

                    let candle_min = (candle_ts_ms / 1000) / 60;

                    // 0.0 doubles as the "missing/unparsable field" marker: skip such events.
                    if o == 0.0 || h == 0.0 || l == 0.0 || c == 0.0 { continue; }

                    if let Some(state) = market_state.get_mut(&symbol) {
                        // Record at most one candle per minute bucket.
                        if candle_min > state.last_candle_ts {
                            state.last_candle_ts = candle_min;
                            state.push_candle(Candle { open: o, high: h, low: l, close: c });

                            let n = state.candles.len();
                            if n % 5 == 0 || n <= 5 {
                                info!("πŸ“Š [{}] candles_1m={} adx={:.1} bbw_sq={} z_buf={} c={:.4}",
                                    symbol, n, state.adx, state.bbw_squeeze(),
                                    state.zscore_closes.len(), c);
                            }
                        }
                    }

                    // Current tick price is the close of this kline update; it is already
                    // validated as non-zero above, so no second parse is needed.
                    let tick_price = c;

                    let params = shared_params.read().await.clone();

                    // Read OBI and best bid/ask under a short-lived read lock.
                    let (obi, best_bid, best_ask): (f64, Option<f64>, Option<f64>) =
                        if let Some(book) = book_map.get(&symbol) {
                            let b = book.read().await;
                            (b.obi(OBI_DEPTH), b.best_bid(), b.best_ask())
                        } else {
                            (0.0, None, None)
                        };

                    let state = match market_state.get_mut(&symbol) { Some(s) => s, None => continue };

                    if state.check_daily_halt(usdt_balance) {
                        if state.fsm != FsmState::Suspended {
                            state.fsm = FsmState::Suspended;
                            warn!("πŸ”΄ [DD HALT] {} bloqueado.", symbol);
                        }
                        continue;
                    }

                    if asri_suspended && state.fsm == FsmState::Idle {
                        state.fsm = FsmState::Suspended;
                        continue;
                    }

                    match state.fsm.clone() {

                        FsmState::Idle => {
                            if now - state.last_guillotine_ts < COOLDOWN_AFTER_GUILLOTINE_SEC { continue; }
                            if !state.regime_is_lateral() { continue; }

                            let z = state.zscore(tick_price);
                            // Signal encoding: Some(false) = LONG, Some(true) = SHORT.
                            let signal: Option<bool> = if z <= -params.z_threshold && obi > params.obi_threshold {
                                Some(false) // LONG
                            } else if z >= params.z_threshold && obi < -params.obi_threshold {
                                Some(true)  // SHORT
                            } else {
                                None
                            };

                            if let Some(is_short) = signal {
                                // Quote on the near side of the book (ask for shorts, bid for
                                // longs), falling back to the tick price if the book is empty.
                                let order_price = if is_short {
                                    best_ask.unwrap_or(tick_price)
                                } else {
                                    best_bid.unwrap_or(tick_price)
                                };

                                info!("🎯 [SIGNAL] {} {} | Z={:.3} OBI={:.3} ADX={:.1} BBW={}",
                                    if is_short { "SHORT" } else { "LONG" },
                                    symbol, z, obi, state.adx, state.bbw_squeeze());

                                state.fsm = FsmState::SignalAcquired;

                                // Invariant: warm-up panics unless every target has metadata.
                                let meta = instrument_map.get(&symbol).unwrap().clone();
                                let sym  = symbol.clone();
                                let tx   = order_tx.clone();

                                if paper_trade {
                                    // Quantity floored to the instrument's precision, with a
                                    // minimum of one smallest quantity step.
                                    let vol = {
                                        let notional = usdt_balance * RISK_PER_TRADE_PCT;
                                        let raw = notional / order_price;
                                        let f   = 10f64.powi(meta.qty_decimals as i32);
                                        (raw * f).floor().max(1.0) / f
                                    };
                                    info!("πŸ“‹ [PAPER] {} {} | px={:.prec$} | qty={:.qprec$}",
                                        if is_short {"SHORT"} else {"LONG"}, sym,
                                        order_price, vol,
                                        prec  = meta.price_decimals,
                                        qprec = meta.qty_decimals);
                                    let _ = tx.send(EntryResult {
                                        symbol: sym, success: true, price: order_price,
                                        is_short, vol, order_id: Some("PAPER".to_string()),
                                    }).await;
                                } else {
                                    let cli = http_client.clone();
                                    tokio::spawn(async move {
                                        execute_entry_order(cli, sym, is_short, order_price, usdt_balance, meta, tx).await;
                                    });
                                }
                            }
                        }

                        FsmState::PendingMaker => {
                            // Cancel the resting order once the z-score falls back inside the
                            // entry threshold for the pending side.
                            let z = state.zscore(tick_price);
                            let lost = if state.position_is_short { z < params.z_threshold }
                                       else { z > -params.z_threshold };
                            if lost {
                                if let Some(oid) = state.pending_order_id.take() {
                                    let cli = http_client.clone();
                                    let s   = symbol.clone();
                                    tokio::spawn(async move { cancel_order(&cli, &s, &oid).await; });
                                }
                                state.fsm = FsmState::Idle;
                                info!("↩️ [FSM] {} PENDING β†’ IDLE (sinal expirou)", symbol);
                            }
                        }

                        FsmState::PositionOpen | FsmState::RiskManagement => {
                            state.fsm = FsmState::RiskManagement;
                            if let Some(reason) = state.check_exit(tick_price, now, params.ts_seconds) {
                                let pnl      = state.pnl_pct(tick_price);
                                let is_taker = reason.is_taker();
                                let label    = reason.label();
                                let tp_px    = state.tp_price();

                                warn!("πŸ›‘οΈ [SAÍDA] {} | {} | PnL: {:.3}% | {}s | SL={:.4} TP={:.4}",
                                    symbol, label, pnl * 100.0,
                                    now - state.entry_time,
                                    state.sl_price(), state.tp_price());

                                if pnl < 0.0 {
                                    state.register_loss(pnl.abs() * state.entry_price * state.position_vol);
                                }

                                // Invariant: warm-up panics unless every target has metadata.
                                let meta  = instrument_map.get(&symbol).unwrap().clone();
                                let sym   = symbol.clone();
                                let short = state.position_is_short;
                                let vol   = state.position_vol;

                                if paper_trade {
                                    info!("πŸ“‹ [PAPER] SaΓ­da {} {} | {} | PnL: {:.3}%",
                                        if short {"SHORT"} else {"LONG"}, sym, label, pnl * 100.0);
                                } else {
                                    let cli = http_client.clone();
                                    tokio::spawn(async move {
                                        execute_exit_order(cli, sym, short, vol, is_taker, Some(tp_px), meta).await;
                                    });
                                }

                                // Trade report for Maestro; must be built before the position
                                // fields are reset below.
                                let report = json!({
                                    "action":       "TRADE_CLOSED",
                                    "symbol":       symbol,
                                    "side":         if state.position_is_short {"short"} else {"long"},
                                    "entry_price":  state.entry_price,
                                    "exit_price":   tick_price,
                                    // Round-trip through a string to round to 5 decimals.
                                    "pnl_pct":      format!("{:.5}", pnl).parse::<f64>().unwrap_or(0.0),
                                    "duration_sec": now - state.entry_time,
                                    "exit_reason":  label,
                                    "atr_sl":       state.atr_sl,
                                    "adx":          state.adx,
                                    "timestamp":    now
                                });
                                let _ = ipc_tx.send(format!("{}\n", report)).await;

                                state.fsm               = FsmState::GuillotineTriggered;
                                state.last_guillotine_ts = now;
                                state.entry_price       = 0.0;
                                state.entry_time        = 0;
                                state.position_vol      = 0.0;
                                state.pending_order_id  = None;
                            }
                        }

                        FsmState::GuillotineTriggered => {
                            if now - state.last_guillotine_ts >= COOLDOWN_AFTER_GUILLOTINE_SEC {
                                state.fsm = FsmState::Idle;
                                info!("πŸ”„ [FSM] {} COOLDOWN β†’ IDLE", symbol);
                            }
                        }

                        // SignalAcquired waits for the order result; Suspended waits for an
                        // ASRI resume. Neither reacts to ticks.
                        FsmState::SignalAcquired | FsmState::Suspended => {}
                    }
                }
            }
        }

        warn!("πŸ”„ Reconectando em 3s...");
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
    }
}