// ============================================================================= // APEX PREDATOR V7 — LIQUIDATION CASCADE SNIPER // Engine: Rust + Tokio | Exchange: OKX Futures (SWAP) // Estratégia: OI Exhaustion + Wick Rejection + Funding Extremo // Auditado: Março 2026 // ============================================================================= // // Cargo.toml necessário: // [dependencies] // tokio = { version = "1", features = ["full"] } // tokio-tungstenite = { version = "0.21", features = ["native-tls"] } // futures-util = "0.3" // reqwest = { version = "0.11", features = ["json"] } // serde_json = "1" // hmac = "0.12" // sha2 = "0.10" // base64 = "0.21" // chrono = "0.4" // log = "0.4" // env_logger = "0.11" // dotenv = "0.15" // =============================================================================
use base64::prelude::*; use chrono::Utc; use futures_util::{SinkExt, StreamExt}; use hmac::{Hmac, Mac}; use log::{error, info, warn}; use reqwest::Client; use serde_json::{json, Value}; use sha2::Sha256; use std::collections::HashMap; use std::env; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::TcpStream; use tokio::sync::{oneshot, Mutex}; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
type HmacSha256 = Hmac; type SharedStream = Arc<Mutextokio::net::TcpStream>;
// ============================================================================= // CONSTANTES DE CONFIGURAÇÃO — AJUSTE AQUI, NÃO NO MEIO DO CÓDIGO // =============================================================================
/// % mínimo de queda do OI na janela de 5min para acionar análise de liquidação const OI_DROP_ALARM_PCT: f64 = 1.5;
/// Janela de referência do OI em segundos (5 minutos) const OI_WINDOW_SEC: i64 = 300;
/// % mínimo do pavio (upper para short, lower para long) em relação ao candle total const MIN_WICK_RATIO: f64 = 35.0;
/// Funding rate mínimo (em %) para considerar euforia/pânico no mercado /// Short setup: funding > +FUNDING_THRESHOLD (mercado overleveraged long) /// Long setup: funding < -FUNDING_THRESHOLD (mercado overleveraged short) const FUNDING_THRESHOLD: f64 = 0.01;
/// % de perda máxima tolerada antes do stop loss hard /// Curto porque se passou do pavio, a tese de reversão falhou const STOP_LOSS_PCT: f64 = 1.5;
/// % de lucro alvo (take profit) const TAKE_PROFIT_PCT: f64 = 2.5;
/// Segundos máximos aguardando o snap-back. Se não veio, sai. const TIME_STOP_SEC: i64 = 90;
/// % do saldo disponível arriscado por trade const RISK_PER_TRADE_PCT: f64 = 0.01; // 1%
/// Cooldown mínimo entre trades no mesmo símbolo (segundos) const COOLDOWN_SEC: i64 = 120;
/// Intervalo do heartbeat para o WS da OKX (segundos) const WS_PING_INTERVAL_SEC: u64 = 20;
/// Intervalo do log de telemetria (segundos) const TELEMETRY_INTERVAL_SEC: u64 = 300;
// ============================================================================= // MÓDULO DE CRIPTOGRAFIA — ASSINATURA HMAC-SHA256 PARA A OKX // =============================================================================
fn generate_okx_headers( method: &str, path: &str, body: &str, ) -> (String, String, String, String) { let api_key = env::var("OKX_API_KEY").expect("OKX_API_KEY não encontrada no .env"); let secret_key = env::var("OKX_SECRET_KEY").expect("OKX_SECRET_KEY não encontrada no .env"); let passphrase = env::var("OKX_PASSPHRASE").expect("OKX_PASSPHRASE não encontrada no .env");
let timestamp = Utc::now()
.format("%Y-%m-%dT%H:%M:%S%.3fZ")
.to_string();
let payload = format!("{}{}{}{}", timestamp, method, path, body);
let mut mac = HmacSha256::new_from_slice(secret_key.as_bytes())
.expect("HMAC aceita qualquer tamanho de chave");
mac.update(payload.as_bytes());
let signature = BASE64_STANDARD.encode(mac.finalize().into_bytes());
(api_key, signature, timestamp, passphrase)
}
// ============================================================================= // MÓDULO DE INSTRUMENTOS — BUSCA TICK SIZE E CONTRACT VALUE DA OKX // =============================================================================
/// Metadados do instrumento necessários para formatar ordens corretamente #[derive(Clone, Debug)] struct InstrumentMeta { /// Tamanho mínimo de variação de preço (ex: 0.1 para BTC, 0.0001 para DOGE) tick_sz: f64, /// Valor em unidades base por contrato (ex: 0.01 BTC por contrato) ct_val: f64, /// Número de casas decimais do tick_sz (calculado uma vez) price_decimals: usize, }
async fn fetch_instrument_meta(symbol: &str) -> Result<InstrumentMeta, Box> { let client = Client::new(); let url = format!( "https://www.okx.com/api/v5/public/instruments?instType=SWAP&instId={}", symbol ); let res = client.get(&url).send().await?; let parsed: Value = res.json().await?;
let inst = &parsed["data"][0];
let tick_sz: f64 = inst["tickSz"].as_str().unwrap_or("0.01").parse()?;
let ct_val: f64 = inst["ctVal"].as_str().unwrap_or("1").parse()?;
// Calcula casas decimais a partir do tickSz
let price_decimals = if tick_sz < 1.0 {
(-tick_sz.log10().floor()) as usize
} else {
0
};
Ok(InstrumentMeta {
tick_sz,
ct_val,
price_decimals,
})
}
/// Arredonda o preço para o tick size correto do instrumento fn round_to_tick(price: f64, tick_sz: f64, decimals: usize) -> String { let factor = 10f64.powi(decimals as i32); let rounded = (price / tick_sz).round() * tick_sz; format!("{:.prec$}", (rounded * factor).round() / factor, prec = decimals) }
// ============================================================================= // MÓDULO DE BALANÇO — RETORNA SALDO USDT DISPONÍVEL // =============================================================================
async fn fetch_usdt_balance() -> Result<f64, Box> { dotenv::dotenv().ok(); let path = "/api/v5/account/balance"; let (api_key, sign, ts, pass) = generate_okx_headers("GET", path, "");
let client = Client::new();
let res = client
.get(format!("https://www.okx.com{}", path))
.header("OK-ACCESS-KEY", &api_key)
.header("OK-ACCESS-SIGN", &sign)
.header("OK-ACCESS-TIMESTAMP", &ts)
.header("OK-ACCESS-PASSPHRASE", &pass)
.send()
.await?;
let parsed: Value = res.json().await?;
if parsed["code"] != "0" {
error!("💀 [AUTH] Falha: {}", parsed);
panic!("Credenciais inválidas. Abortando.");
}
if let Some(details) = parsed["data"][0]["details"].as_array() {
for asset in details {
if asset["ccy"] == "USDT" {
let balance = asset["availEq"]
.as_str()
.unwrap_or("0")
.parse::<f64>()
.unwrap_or(0.0);
info!("🏦 [OKX] Auth OK. Saldo disponível: {:.2} USDT", balance);
return Ok(balance);
}
}
}
warn!("⚠️ [OKX] USDT não encontrado na conta. Verifique se é conta de futuros.");
Ok(0.0)
}
// ============================================================================= // MÓDULO DE EXECUÇÃO — ABERTURA DE POSIÇÃO (POST-ONLY MAKER) // =============================================================================
/// Retorna true se a ordem foi aceita pela OKX (code == "0") async fn execute_entry_order( symbol: &str, side: &str, // "sell" para short, "buy" para long pos_side: &str, // "short" ou "long" price: f64, balance: f64, meta: &InstrumentMeta, ) -> bool { let path = "/api/v5/trade/order";
// Calcula sz em número de contratos baseado no risco definido
let notional = balance * RISK_PER_TRADE_PCT;
// sz = valor em USDT / (preço do ativo * valor do contrato em unidades base)
let sz = (notional / (price * meta.ct_val)).floor().max(1.0) as u64;
let px_str = round_to_tick(price, meta.tick_sz, meta.price_decimals);
let order_payload = json!({
"instId": symbol,
"tdMode": "isolated",
"side": side,
"posSide": pos_side,
"ordType": "post_only",
"sz": sz.to_string(),
"px": px_str
})
.to_string();
let (api_key, sign, ts, pass) = generate_okx_headers("POST", path, &order_payload);
let client = Client::new();
match client
.post(format!("https://www.okx.com{}", path))
.header("OK-ACCESS-KEY", api_key)
.header("OK-ACCESS-SIGN", sign)
.header("OK-ACCESS-TIMESTAMP", ts)
.header("OK-ACCESS-PASSPHRASE", pass)
.header("Content-Type", "application/json")
.body(order_payload)
.send()
.await
{
Ok(res) => {
let text = res.text().await.unwrap_or_default();
let parsed: Value = serde_json::from_str(&text).unwrap_or_default();
let success = parsed["code"] == "0";
if success {
warn!(
"💥 [ENTRADA] {} {} | sz={} | px={} | Aceito pela OKX",
pos_side.to_uppercase(), symbol, sz, price
);
} else {
error!("❌ [ENTRADA REJEITADA] {} | Resposta: {}", symbol, text);
}
success
}
Err(e) => {
error!("❌ [REDE] Falha ao enviar entrada para {}: {}", symbol, e);
false
}
}
}
// ============================================================================= // MÓDULO DE SAÍDA — FECHAMENTO DE POSIÇÃO (MARKET PARA GARANTIR EXECUÇÃO) // =============================================================================
async fn execute_exit_order(symbol: &str, pos_side: &str, sz: u64) { let path = "/api/v5/trade/order";
// Para fechar: lado oposto da posição, mesmo posSide
let close_side = if pos_side == "short" { "buy" } else { "sell" };
let order_payload = json!({
"instId": symbol,
"tdMode": "isolated",
"side": close_side,
"posSide": pos_side,
"ordType": "market",
"sz": sz.to_string()
})
.to_string();
let (api_key, sign, ts, pass) = generate_okx_headers("POST", path, &order_payload);
let client = Client::new();
match client
.post(format!("https://www.okx.com{}", path))
.header("OK-ACCESS-KEY", api_key)
.header("OK-ACCESS-SIGN", sign)
.header("OK-ACCESS-TIMESTAMP", ts)
.header("OK-ACCESS-PASSPHRASE", pass)
.header("Content-Type", "application/json")
.body(order_payload)
.send()
.await
{
Ok(res) => {
let text = res.text().await.unwrap_or_default();
warn!("🛡️ [SAÍDA] {} {} | Resposta: {}", pos_side.to_uppercase(), symbol, text);
}
Err(e) => {
error!("❌ [SAÍDA CRÍTICA] Falha ao fechar {} {}: {}", pos_side, symbol, e);
// Em produção: implementar retry com backoff aqui
}
}
}
// ============================================================================= // ESTADO DE CADA MOEDA MONITORADA // =============================================================================
struct CoinState { // --- Dados de mercado --- highest_oi: f64, highest_oi_ts: i64, // Timestamp do último reset da janela de OI current_oi: f64, // Último OI recebido current_funding: f64, // Em %, ex: 0.015 = 0.015% last_close: f64, // Último preço de fechamento (candle 1m) upper_wick_ratio: f64, // % do pavio superior em relação ao candle total lower_wick_ratio: f64, // % do pavio inferior em relação ao candle total last_candle_ts: i64, // Último ts de candle recebido
// --- Controle de intensidade / “dor” ---
last_oi: f64, // OI do tick anterior
last_oi_ts: i64, // ts do OI anterior
oi_panic_score: f64, // score de “choque” recente de OI
last_signal_ts: i64, // ts do último gatilho calculado
// --- Controle de trades ---
last_shot_ts: i64, // Timestamp do último trade (cooldown)
// --- Estado da posição aberta ---
position_active: bool,
position_side: String, // "short" ou "long"
entry_price: f64, // Preço de entrada CONFIRMADO via API (ou aproximação)
entry_time: i64,
position_sz: u64, // Contratos abertos (necessário para fechar corretamente)
// --- Trailing / scalping ---
max_favorable_pnl: f64, // melhor PnL alcançado desde a entrada (em %)
}
impl CoinState { fn new() -> Self { CoinState { highest_oi: 0.0, highest_oi_ts: 0, current_oi: 0.0, current_funding: 0.0, last_close: 0.0, upper_wick_ratio: 0.0, lower_wick_ratio: 0.0, last_candle_ts: 0, last_oi: 0.0, last_oi_ts: 0, oi_panic_score: 0.0, last_signal_ts: 0, last_shot_ts: 0, position_active: false, position_side: String::new(), entry_price: 0.0, entry_time: 0, position_sz: 0, max_favorable_pnl: 0.0, } } }
enum TradeSignal { Short, Long, None, }
fn evaluate_signal(state: &CoinState, current_time: i64) -> TradeSignal { // 1) Pré-condições mínimas if state.last_close == 0.0 || state.highest_oi == 0.0 || state.last_candle_ts == 0 { return TradeSignal::None; }
// 2) Cooldown por símbolo
if current_time - state.last_shot_ts < COOLDOWN_SEC {
return TradeSignal::None;
}
// 3) “Freshness” do evento: só opera se o drop de OI for MUITO recente
// Isso transforma a lógica em um scalper de choque, não em um mean reversion de 5min.
let age_from_oi_peak = current_time - state.highest_oi_ts;
if age_from_oi_peak > 15 {
// Se o pico de OI é velho (>15s), não tem flush fresco pra scalpar
return TradeSignal::None;
}
// 4) Cálculo de drop de OI bruto
let drop_pct = ((state.highest_oi - state.current_oi) / state.highest_oi) * 100.0;
if drop_pct < OI_DROP_ALARM_PCT {
return TradeSignal::None;
}
// 5) “Panic score” — intensidade do movimento de OI por tempo
// Se o último tick de OI foi recente e a variação foi forte, score aumenta.
let mut panic_score = state.oi_panic_score;
if state.last_oi_ts > 0 && current_time > state.last_oi_ts {
let dt = (current_time - state.last_oi_ts) as f64;
let d_oi = (state.last_oi - state.current_oi).abs();
let speed = if dt > 0.0 { d_oi / dt } else { 0.0 };
// Decaimento exponencial simples + reforço com velocidade atual
panic_score = panic_score * 0.8 + speed * 0.2;
}
// Threshold dinâmico de panic — quanto maior o drop, menor a exigência de speed
let panic_threshold = (OI_DROP_ALARM_PCT / 100.0) * state.highest_oi * 0.01;
if panic_score < panic_threshold {
return TradeSignal::None;
}
// 6) Filtro de contexto do candle: wick forte + candle ainda vivo
// Só opera se o candle atual estiver “no meio da guerra”, não candle morto.
let candle_age = current_time - state.last_candle_ts;
if candle_age > 10 {
// Candle velho demais pra scalper milissegundo/segundos
return TradeSignal::None;
}
// 7) Funding como vento a favor (não gate absoluto)
// Elite não joga fora flush bom porque funding está neutro.
let funding = state.current_funding;
let funding_bias_long = if funding < -FUNDING_THRESHOLD {
1.0
} else if funding < 0.0 {
0.5
} else {
0.0
};
let funding_bias_short = if funding > FUNDING_THRESHOLD {
1.0
} else if funding > 0.0 {
0.5
} else {
0.0
};
// 8) Score final para SHORT / LONG
let short_score = if state.upper_wick_ratio > MIN_WICK_RATIO {
// Drop de OI + wick de topo + funding pró-short
drop_pct * 0.5 + state.upper_wick_ratio * 0.3 + funding_bias_short * 20.0
} else {
0.0
};
let long_score = if state.lower_wick_ratio > MIN_WICK_RATIO {
// Drop de OI + wick de fundo + funding pró-long
drop_pct * 0.5 + state.lower_wick_ratio * 0.3 + funding_bias_long * 20.0
} else {
0.0
};
// 9) Thresholds de scalper: exige score razoável, não sinal qualquer
let min_score = 10.0;
if short_score > long_score && short_score >= min_score {
TradeSignal::Short
} else if long_score > short_score && long_score >= min_score {
TradeSignal::Long
} else {
TradeSignal::None
}
}
// ============================================================================= // LÓGICA DE SAÍDA — DECIDE SE FECHA A POSIÇÃO ABERTA // ============================================================================= enum ExitReason { StopLoss, TakeProfit, TimeStopProfit, TimeStopLoss, ScalperTrail, // novo motivo: trailing acionado }
fn evaluate_exit(state: &mut CoinState, current_time: i64) -> Option { if !state.position_active || state.entry_price == 0.0 { return None; }
// PnL atual em %
let pnl_pct = if state.position_side == "short" {
((state.entry_price - state.last_close) / state.entry_price) * 100.0
} else {
((state.last_close - state.entry_price) / state.entry_price) * 100.0
};
let duration = current_time - state.entry_time;
// Atualiza melhor PnL desde a entrada para trailing
if pnl_pct > state.max_favorable_pnl {
state.max_favorable_pnl = pnl_pct;
}
// 1) Stop loss hard — não negocia com o mercado
if pnl_pct <= -STOP_LOSS_PCT {
return Some(ExitReason::StopLoss);
}
// 2) Scalper: se o trade andou bem e voltou forte, zera sem pensar
// Ex: bateu +2% e voltou para +0.8% rápido => sai.
let trail_trigger = 1.5; // só ativa trailing se já passou disso
let trail_gap = 0.8; // quanto pode devolver antes de sair
if state.max_favorable_pnl >= trail_trigger && pnl_pct <= state.max_favorable_pnl - trail_gap {
return Some(ExitReason::ScalperTrail);
}
// 3) Take Profit nominal (se não bateu trailing mas atingiu alvo cheio)
if pnl_pct >= TAKE_PROFIT_PCT {
return Some(ExitReason::TakeProfit);
}
// 4) Time stop agressivo de scalper
if duration > TIME_STOP_SEC {
if pnl_pct > 0.0 {
return Some(ExitReason::TimeStopProfit);
} else {
return Some(ExitReason::TimeStopLoss);
}
}
None
}
async fn fetch_initial_prices(symbols: &[String]) -> HashMap<String, f64> { let mut initial_prices = HashMap::new(); let client = Client::new();
// Na OKX, o endpoint /tickers retorna os preços de todas as moedas na hora
let url = "https://www.okx.com/api/v5/market/tickers?instType=SWAP";
if let Ok(res) = client.get(url).send().await {
if let Ok(text) = res.text().await {
if let Ok(parsed) = serde_json::from_str::<Value>(&text) {
if let Some(data) = parsed["data"].as_array() {
for inst in data {
let symbol = inst["instId"].as_str().unwrap_or("").to_string();
if symbols.contains(&symbol) {
let last_price = inst["last"].as_str().unwrap_or("0").parse::<f64>().unwrap_or(0.0);
initial_prices.insert(symbol, last_price);
}
}
}
}
}
}
initial_prices
}
// ============================================================================= // MOTOR PRINCIPAL // =============================================================================
#[tokio::main] async fn main() { dotenv::dotenv().ok(); env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
info!("╔══════════════════════════════════════════════╗");
info!("║ APEX PREDATOR V7 — LIQUIDATION SNIPER ║");
info!("║ OKX Futures | Rust + Tokio ║");
info!("╚══════════════════════════════════════════════╝");
// --- 1. Autenticação e saldo ---
// PAPER TRADE: saldo 0 é permitido. Para dinheiro real, reative a checagem abaixo.
let usdt_balance = fetch_usdt_balance().await.unwrap_or(0.0);
if usdt_balance == 0.0 {
warn!("⚠️ [PAPER TRADE] Saldo 0 USDT — bot roda normalmente, ordens logadas mas NÃO enviadas.");
} else {
info!("💰 Saldo disponível: {:.2} USDT", usdt_balance);
}
// --- 2. Recebe alvos do Python Maestro via IPC ---
let maestro_raw = TcpStream::connect("127.0.0.1:9001")
.await
.expect("Python Maestro offline. Inicie o Terminal 1 primeiro.");
// Compartilha o stream com Arc<Mutex> para escrita segura do loop de trades
let maestro_stream: SharedStream = Arc::new(Mutex::new(maestro_raw));
let alvos: Vec<String> = {
let mut guard = maestro_stream.lock().await;
let mut reader = BufReader::new(&mut *guard);
let mut line = String::new();
reader.read_line(&mut line).await.expect("Falha ao ler alvos do Maestro");
let cmd: Value = serde_json::from_str(line.trim()).expect("JSON inválido do Maestro");
let symbols: Vec<String> = cmd["symbols"]
.as_array()
.unwrap_or(&vec![])
.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect();
info!("🎯 {} alvos recebidos do Maestro.", symbols.len());
symbols
};
{
let mut guard = maestro_stream.lock().await;
let _ = guard.write_all(b"Sniper armado. Aguardando liquidacoes.\n").await;
}
// --- 3. Pré-carrega metadados de instrumentos (tickSz, ctVal) ---
let mut instrument_map: HashMap<String, InstrumentMeta> = HashMap::new();
for symbol in &alvos {
match fetch_instrument_meta(symbol).await {
Ok(meta) => {
info!(
"📐 [{}] tickSz={} | ctVal={}",
symbol, meta.tick_sz, meta.ct_val
);
instrument_map.insert(symbol.clone(), meta);
}
Err(e) => {
warn!("⚠️ Falha ao buscar meta de {}. Usando padrão. Erro: {}", symbol, e);
instrument_map.insert(symbol.clone(), InstrumentMeta {
tick_sz: 0.01,
ct_val: 1.0,
price_decimals: 2,
});
}
}
}
// --- 4. Loop principal com reconexão automática ---
let okx_url = "wss://ws.okx.com:8443/ws/v5/public";
let okx_business_url = "wss://ws.okx.com:8443/ws/v5/business";
'reconnect: loop {
info!("🔌 Conectando ao WebSocket público da OKX...");
// Conexão 1: WS público — OI e Funding Rate
let (mut ws_public, _) = match connect_async(okx_url).await {
Ok(s) => s,
Err(e) => {
error!("❌ Falha na conexão WS public: {}. Tentando em 5s...", e);
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
continue 'reconnect;
}
};
// Conexão 2: WS business — Candles (OKX moveu candles para /business em 2023)
let (mut ws_business, _) = match connect_async(okx_business_url).await {
Ok(s) => s,
Err(e) => {
error!("❌ Falha na conexão WS business: {}. Tentando em 5s...", e);
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
continue 'reconnect;
}
};
// Subscreve OI + Funding no WS público
let mut public_args = Vec::new();
for symbol in &alvos {
public_args.push(json!({"channel": "open-interest", "instId": symbol}));
public_args.push(json!({"channel": "funding-rate", "instId": symbol}));
}
if let Err(e) = ws_public.send(Message::Text(
json!({"op": "subscribe", "args": public_args}).to_string()
)).await {
error!("❌ Falha ao subscrever WS público: {}", e);
continue 'reconnect;
}
// Subscreve Candles no WS business
let mut business_args = Vec::new();
for symbol in &alvos {
business_args.push(json!({"channel": "candle1m", "instId": symbol}));
}
if let Err(e) = ws_business.send(Message::Text(
json!({"op": "subscribe", "args": business_args}).to_string()
)).await {
error!("❌ Falha ao subscrever WS business: {}", e);
continue 'reconnect;
}
info!("📡 Subscrições ativas (public + business). Sniper em espera...");
// Inicializa estado de cada moeda
info!("🔥 Aquecendo motores: Buscando preços de largada via REST...");
let initial_prices = fetch_initial_prices(&alvos).await;
// Inicializa estado de cada moeda já com o preço de largada injetado
let mut market_state: HashMap<String, CoinState> = alvos
.iter()
.map(|s| {
let start_price = *initial_prices.get(s).unwrap_or(&0.0);
let mut state = CoinState::new();
state.last_close = start_price; // Fim do Cold Start!
(s.clone(), state)
})
.collect();
info!("✅ Motores aquecidos. Tabela de preços pré-carregada.");
let mut ping_interval =
tokio::time::interval(tokio::time::Duration::from_secs(WS_PING_INTERVAL_SEC));
let mut telemetry_interval =
tokio::time::interval(tokio::time::Duration::from_secs(TELEMETRY_INTERVAL_SEC));
// --- 5. Event loop ---
// --- 5. Event loop ---
loop {
tokio::select! {
// --- Heartbeat para os dois WS ---
_ = ping_interval.tick() => {
let pub_ok = ws_public.send(Message::Ping(vec![])).await.is_ok();
let biz_ok = ws_business.send(Message::Ping(vec![])).await.is_ok();
if !pub_ok || !biz_ok {
error!("💔 Heartbeat falhou (public={} business={}). Reconectando...", pub_ok, biz_ok);
break;
}
}
// --- Telemetria periódica ---
_ = telemetry_interval.tick() => {
let active: Vec<&String> = market_state
.iter()
.filter(|(_, s)| s.position_active)
.map(|(k, _)| k)
.collect();
if active.is_empty() {
info!("🟢 [TELEMETRIA] Nenhuma posição aberta. Aguardando setup.");
} else {
warn!("🔶 [TELEMETRIA] Posições ativas: {:?}", active);
}
}
// --- Mensagens do WS público (OI + Funding) ---
msg_opt = ws_public.next() => {
let message = match msg_opt {
Some(Ok(m)) => m,
Some(Err(e)) => {
error!("❌ Erro no stream WS public: {}. Reconectando...", e);
break;
}
None => {
warn!("📴 Stream WS public encerrado. Reconectando...");
break;
}
};
match message {
Message::Ping(payload) => { let _ = ws_public.send(Message::Pong(payload)).await; }
Message::Pong(_) => {}
Message::Text(text) => {
// Ignora mensagens de controle da OKX (subscribe confirmado, etc.)
if text.contains("\"event\"") { continue; }
let parsed = match serde_json::from_str::<Value>(&text) {
Ok(v) => v,
Err(_) => continue,
};
let data_array = match parsed.get("data").and_then(|d| d.as_array()) {
Some(d) => d,
None => continue,
};
let arg = match parsed.get("arg") {
Some(a) => a,
None => continue,
};
let channel = arg.get("channel").and_then(|c| c.as_str()).unwrap_or("");
let inst_id = arg.get("instId").and_then(|i| i.as_str()).unwrap_or("");
let meta = match instrument_map.get(inst_id) {
Some(m) => m,
None => continue,
};
let state = match market_state.get_mut(inst_id) {
Some(s) => s,
None => continue,
};
let now = Utc::now().timestamp();
// -----------------------------------------------
// PROCESSAMENTO DE DADOS POR CANAL
// -----------------------------------------------
for item in data_array {
// Canal 1: Funding Rate
if channel == "funding-rate" {
if let Some(fr) = item.get("fundingRate")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<f64>().ok())
{
// OKX retorna como decimal, ex: 0.0001 = 0.01%
state.current_funding = fr * 100.0;
}
continue;
}
// Canal 2: Open Interest
if channel == "open-interest" {
let oi_val = match item.get("oi")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<f64>().ok())
{
Some(v) => v,
None => continue,
};
// Guarda histórico anterior
let prev_oi = state.current_oi;
let prev_ts = state.last_oi_ts;
// Atualiza OI atual e ts de OI
state.current_oi = oi_val;
state.last_oi_ts = now;
// Calcula velocidade de mudança de OI (panic_score)
if prev_ts > 0 && now > prev_ts {
let dt = (now - prev_ts) as f64;
let d_oi = (prev_oi - state.current_oi).abs();
let speed = if dt > 0.0 { d_oi / dt } else { 0.0 };
state.oi_panic_score = state.oi_panic_score * 0.8 + speed * 0.2;
}
// Atualiza janela de OI (pico dos últimos 5 minutos)
if state.highest_oi == 0.0 {
state.highest_oi = oi_val;
state.highest_oi_ts = now;
} else if now - state.highest_oi_ts > OI_WINDOW_SEC {
state.highest_oi = oi_val;
state.highest_oi_ts = now;
} else if oi_val > state.highest_oi {
state.highest_oi = oi_val;
state.highest_oi_ts = now;
}
let drop_pct = if state.highest_oi > 0.0 {
((state.highest_oi - oi_val) / state.highest_oi) * 100.0
} else {
0.0
};
if drop_pct > 0.1 || state.last_close > 0.0 {
info!(
"[STATUS] {} | close={:.8} | drop={:.3}% | funding={:.4}% | wick={:.1}% | panic={:.6}",
inst_id,
state.last_close,
drop_pct,
state.current_funding,
state.upper_wick_ratio,
state.oi_panic_score
);
}
// -----------------------------------------------
// GESTÃO DO TRADE ABERTO
// -----------------------------------------------
if state.position_active {
if let Some(reason) = evaluate_exit(state, now) {
let pnl_pct = if state.position_side == "short" {
((state.entry_price - state.last_close) / state.entry_price) * 100.0
} else {
((state.last_close - state.entry_price) / state.entry_price) * 100.0
};
let reason_str = match reason {
ExitReason::StopLoss => "STOP_LOSS",
ExitReason::TakeProfit => "TAKE_PROFIT",
ExitReason::TimeStopProfit => "TIME_STOP_LUCRO",
ExitReason::TimeStopLoss => "TIME_STOP_PREJUIZO",
ExitReason::ScalperTrail => "SCALPER_TRAIL",
};
error!(
"🛡️ [SAÍDA] {} | {} | PnL: {:.2}% | Duração: {}s",
inst_id,
reason_str,
pnl_pct,
now - state.entry_time
);
let sym = inst_id.to_string();
let side = state.position_side.clone();
let sz = state.position_sz;
tokio::spawn(async move {
execute_exit_order(&sym, &side, sz).await;
});
let report = json!({
"action": "TRADE_CLOSED",
"symbol": inst_id,
"side": state.position_side,
"entry_price": state.entry_price,
"exit_price": state.last_close,
"pnl_pct": format!("{:.4}", pnl_pct).parse::<f64>().unwrap_or(0.0),
"duration_sec": now - state.entry_time,
"exit_reason": reason_str,
"timestamp": now
});
let msg = format!("{}\n", report);
let maestro = Arc::clone(&maestro_stream);
tokio::spawn(async move {
let mut guard = maestro.lock().await;
let _ = guard.write_all(msg.as_bytes()).await;
});
state.position_active = false;
state.position_side = String::new();
state.entry_price = 0.0;
state.entry_time = 0;
state.position_sz = 0;
state.last_shot_ts = now;
state.max_favorable_pnl = 0.0;
}
continue;
}
// -----------------------------------------------
// ANÁLISE DE ENTRADA
// -----------------------------------------------
match evaluate_signal(state, now) {
TradeSignal::None => {}
signal @ (TradeSignal::Short | TradeSignal::Long) => {
let is_short = matches!(signal, TradeSignal::Short);
let side = if is_short { "sell" } else { "buy" };
let pos_side = if is_short { "short" } else { "long" };
let drop_pct = ((state.highest_oi - state.current_oi) / state.highest_oi) * 100.0;
warn!(
"🎯 [GATILHO] {} {} | OI drop: {:.2}% | Funding: {:.4}% | Wick: {:.1}% | panic={:.6}",
pos_side.to_uppercase(),
inst_id,
drop_pct,
state.current_funding,
if is_short { state.upper_wick_ratio } else { state.lower_wick_ratio },
state.oi_panic_score
);
let notional = usdt_balance * RISK_PER_TRADE_PCT;
let sz_estimate = (notional / (state.last_close * meta.ct_val))
.floor()
.max(1.0) as u64;
let sym = inst_id.to_string();
let price = state.last_close;
let bal = usdt_balance;
let tick_sz = meta.tick_sz;
let ct_val = meta.ct_val;
let decimals = meta.price_decimals;
let side_s = side.to_string();
let pos_side_s = pos_side.to_string();
let (tx, rx) = oneshot::channel::<bool>();
tokio::spawn(async move {
let meta_local = InstrumentMeta { tick_sz, ct_val, price_decimals: decimals };
let ok = execute_entry_order(
&sym, &side_s, &pos_side_s, price, bal, &meta_local
).await;
let _ = tx.send(ok);
});
match rx.await {
Ok(true) => {
state.position_active = true;
state.position_side = pos_side.to_string();
state.entry_price = state.last_close;
state.entry_time = now;
state.position_sz = sz_estimate;
state.last_shot_ts = now;
state.max_favorable_pnl = 0.0;
state.highest_oi = oi_val;
state.highest_oi_ts = now;
// TRADE_OPENED para o Python
let entry_report = json!({
"action": "TRADE_OPENED",
"symbol": inst_id,
"side": state.position_side,
"entry_price": state.entry_price,
"position_sz": state.position_sz,
"oi_drop_pct": drop_pct,
"oi_panic_score": state.oi_panic_score,
"upper_wick_ratio": state.upper_wick_ratio,
"lower_wick_ratio": state.lower_wick_ratio,
"funding_pct": state.current_funding,
"timestamp": now
});
let msg = format!("{}\n", entry_report);
let maestro = Arc::clone(&maestro_stream);
tokio::spawn(async move {
let mut guard = maestro.lock().await;
let _ = guard.write_all(msg.as_bytes()).await;
});
}
Ok(false) => {
error!("⚠️ [{}] Ordem rejeitada pela OKX. Estado não alterado.", inst_id);
}
Err(_) => {
error!("⚠️ [{}] Channel de confirmação falhou.", inst_id);
}
}
}
}
}
}
}
_ => {}
}
}
// --- Mensagens do WS business (Candles) ---
biz_opt = ws_business.next() => {
let message = match biz_opt {
Some(Ok(m)) => m,
Some(Err(e)) => {
error!("❌ Erro no stream WS business: {}. Reconectando...", e);
break;
}
None => {
warn!("📴 Stream WS business encerrado. Reconectando...");
break;
}
};
match message {
Message::Ping(payload) => { let _ = ws_business.send(Message::Pong(payload)).await; }
Message::Pong(_) => {}
Message::Text(text) => {
if text.contains("\"event\"") { continue; }
let parsed = match serde_json::from_str::<Value>(&text) {
Ok(v) => v,
Err(_) => continue,
};
let data_array = match parsed.get("data").and_then(|d| d.as_array()) {
Some(d) => d,
None => continue,
};
let arg = match parsed.get("arg") {
Some(a) => a,
None => continue,
};
let channel = arg.get("channel").and_then(|c| c.as_str()).unwrap_or("");
let inst_id = arg.get("instId").and_then(|i| i.as_str()).unwrap_or("");
if channel != "candle1m" {
continue;
}
let state = match market_state.get_mut(inst_id) {
Some(s) => s,
None => continue,
};
for item in data_array {
let kd = match item.as_array() {
Some(k) => k,
None => continue,
};
// ts do candle (ms -> s)
let ts = kd[0]
.as_str()
.unwrap_or("0")
.parse::<i64>()
.unwrap_or(0) / 1000;
state.last_candle_ts = ts;
// close
let close = kd[4]
.as_str()
.unwrap_or("0")
.parse::<f64>()
.unwrap_or(0.0);
state.last_close = close;
// wick ratios
let high = kd[2].as_str().unwrap_or("0").parse::<f64>().unwrap_or(0.0);
let low = kd[3].as_str().unwrap_or("0").parse::<f64>().unwrap_or(0.0);
let candle_range = (high - low).max(0.00000001);
let upper_wick = (high - close).max(0.0);
let lower_wick = (close - low).max(0.0);
state.upper_wick_ratio = (upper_wick / candle_range) * 100.0;
state.lower_wick_ratio = (lower_wick / candle_range) * 100.0;
}
}
_ => {}
}
}
} // fim tokio::select!
} // fim loop interno
warn!("🔄 Reconectando em 3 segundos...");
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
} // fim loop 'reconnect
}