Skip to content

Commit 5c61803

Browse files
feat: add protobuf integration (#198)
* Added protobuf integration * feat: protobuf impl refactor * fix: pr suggestions --------- Co-authored-by: Rok Černič <rok.cernic@gmail.com>
1 parent 10c9160 commit 5c61803

27 files changed

Lines changed: 204 additions & 90 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,4 @@ rust_decimal_macros = { version = "1.29.1" }
7171
bytes = { version = "1.5.0" }
7272
spin_sleep = { version = "1.3.0 "}
7373
criterion = { version = "0.5.1" }
74+
prost = { version = "0.12.4" }

barter-data/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,4 @@ derive_more = { workspace = true }
5353
itertools = { workspace = true }
5454
vecmap-rs = { workspace = true }
5555
fnv = { workspace = true }
56+
prost = { workspace = true }

barter-data/src/exchange/binance/futures/mod.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
use self::liquidation::BinanceLiquidation;
22
use super::{Binance, ExchangeServer};
33
use crate::{
4-
ExchangeWsStream, NoInitialSnapshots,
4+
NoInitialSnapshots,
55
exchange::{
66
StreamSelector,
7-
binance::futures::l2::{
8-
BinanceFuturesUsdOrderBooksL2SnapshotFetcher, BinanceFuturesUsdOrderBooksL2Transformer,
7+
binance::{
8+
BinanceWsStream,
9+
futures::l2::{
10+
BinanceFuturesUsdOrderBooksL2SnapshotFetcher,
11+
BinanceFuturesUsdOrderBooksL2Transformer,
12+
},
913
},
1014
},
1115
instrument::InstrumentData,
@@ -46,15 +50,15 @@ where
4650
Instrument: InstrumentData,
4751
{
4852
type SnapFetcher = BinanceFuturesUsdOrderBooksL2SnapshotFetcher;
49-
type Stream = ExchangeWsStream<BinanceFuturesUsdOrderBooksL2Transformer<Instrument::Key>>;
53+
type Stream = BinanceWsStream<BinanceFuturesUsdOrderBooksL2Transformer<Instrument::Key>>;
5054
}
5155

5256
impl<Instrument> StreamSelector<Instrument, Liquidations> for BinanceFuturesUsd
5357
where
5458
Instrument: InstrumentData,
5559
{
5660
type SnapFetcher = NoInitialSnapshots;
57-
type Stream = ExchangeWsStream<
61+
type Stream = BinanceWsStream<
5862
StatelessTransformer<Self, Instrument::Key, Liquidations, BinanceLiquidation>,
5963
>;
6064
}

barter-data/src/exchange/binance/mod.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ use crate::{
1111
transformer::stateless::StatelessTransformer,
1212
};
1313
use barter_instrument::exchange::ExchangeId;
14-
use barter_integration::{error::SocketError, protocol::websocket::WsMessage};
14+
use barter_integration::{
15+
error::SocketError,
16+
protocol::websocket::{WebSocketSerdeParser, WsMessage},
17+
};
1518
use std::{fmt::Debug, marker::PhantomData};
1619
use url::Url;
1720

@@ -44,6 +47,9 @@ pub mod subscription;
4447
/// [`BinanceFuturesUsd`](futures::BinanceFuturesUsd).
4548
pub mod trade;
4649

50+
/// Convenient type alias for a Binance [`ExchangeWsStream`] using [`WebSocketSerdeParser`].
51+
pub type BinanceWsStream<Transformer> = ExchangeWsStream<WebSocketSerdeParser, Transformer>;
52+
4753
/// Generic [`Binance<Server>`](Binance) exchange.
4854
///
4955
/// ### Notes
@@ -106,7 +112,7 @@ where
106112
{
107113
type SnapFetcher = NoInitialSnapshots;
108114
type Stream =
109-
ExchangeWsStream<StatelessTransformer<Self, Instrument::Key, PublicTrades, BinanceTrade>>;
115+
BinanceWsStream<StatelessTransformer<Self, Instrument::Key, PublicTrades, BinanceTrade>>;
110116
}
111117

112118
impl<Instrument, Server> StreamSelector<Instrument, OrderBooksL1> for Binance<Server>
@@ -115,7 +121,7 @@ where
115121
Server: ExchangeServer + Debug + Send + Sync,
116122
{
117123
type SnapFetcher = NoInitialSnapshots;
118-
type Stream = ExchangeWsStream<
124+
type Stream = BinanceWsStream<
119125
StatelessTransformer<Self, Instrument::Key, OrderBooksL1, BinanceOrderBookL1>,
120126
>;
121127
}

barter-data/src/exchange/binance/spot/mod.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
use super::{Binance, ExchangeServer};
22
use crate::{
3-
ExchangeWsStream,
43
exchange::{
54
StreamSelector,
6-
binance::spot::l2::{
7-
BinanceSpotOrderBooksL2SnapshotFetcher, BinanceSpotOrderBooksL2Transformer,
5+
binance::{
6+
BinanceWsStream,
7+
spot::l2::{
8+
BinanceSpotOrderBooksL2SnapshotFetcher, BinanceSpotOrderBooksL2Transformer,
9+
},
810
},
911
},
1012
instrument::InstrumentData,
@@ -41,7 +43,7 @@ where
4143
Instrument: InstrumentData,
4244
{
4345
type SnapFetcher = BinanceSpotOrderBooksL2SnapshotFetcher;
44-
type Stream = ExchangeWsStream<BinanceSpotOrderBooksL2Transformer<Instrument::Key>>;
46+
type Stream = BinanceWsStream<BinanceSpotOrderBooksL2Transformer<Instrument::Key>>;
4547
}
4648

4749
impl Display for BinanceSpot {

barter-data/src/exchange/bitfinex/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ use crate::{
3232
transformer::stateless::StatelessTransformer,
3333
};
3434
use barter_instrument::exchange::ExchangeId;
35-
use barter_integration::{error::SocketError, protocol::websocket::WsMessage};
35+
use barter_integration::{
36+
error::SocketError,
37+
protocol::websocket::{WebSocketSerdeParser, WsMessage},
38+
};
3639
use barter_macro::{DeExchange, SerExchange};
3740
use derive_more::Display;
3841
use serde_json::json;
@@ -64,6 +67,9 @@ pub mod validator;
6467
/// See docs: <https://docs.bitfinex.com/docs/ws-general>
6568
pub const BASE_URL_BITFINEX: &str = "wss://api-pub.bitfinex.com/ws/2";
6669

70+
/// Convenient type alias for a Bitfinex [`ExchangeWsStream`] using [`WebSocketSerdeParser`](barter_integration::protocol::websocket::WebSocketSerdeParser).
71+
pub type BitfinexWsStream<Transformer> = ExchangeWsStream<WebSocketSerdeParser, Transformer>;
72+
6773
/// [`Bitfinex`] exchange.
6874
///
6975
/// See docs: <https://docs.bitfinex.com/docs/ws-general>
@@ -117,7 +123,7 @@ where
117123
Instrument: InstrumentData,
118124
{
119125
type SnapFetcher = NoInitialSnapshots;
120-
type Stream = ExchangeWsStream<
126+
type Stream = BitfinexWsStream<
121127
StatelessTransformer<Self, Instrument::Key, PublicTrades, BitfinexMessage>,
122128
>;
123129
}

barter-data/src/exchange/bitfinex/validator.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use barter_integration::{
1111
error::SocketError,
1212
protocol::{
1313
StreamParser,
14-
websocket::{WebSocket, WebSocketParser, WsMessage},
14+
websocket::{WebSocket, WebSocketSerdeParser, WsMessage},
1515
},
1616
subscription::SubscriptionId,
1717
};
@@ -36,7 +36,7 @@ pub struct BitfinexWebSocketSubValidator;
3636

3737
#[async_trait]
3838
impl SubscriptionValidator for BitfinexWebSocketSubValidator {
39-
type Parser = WebSocketParser;
39+
type Parser = WebSocketSerdeParser;
4040

4141
async fn validate<Exchange, Instrument, Kind>(
4242
mut instrument_map: Map<Instrument>,
@@ -82,7 +82,7 @@ impl SubscriptionValidator for BitfinexWebSocketSubValidator {
8282
None => break Err(SocketError::Subscribe("WebSocket stream terminated unexpectedly".to_string()))
8383
};
8484

85-
match Self::Parser::parse::<BitfinexPlatformEvent>(response) {
85+
match <WebSocketSerdeParser as StreamParser<BitfinexPlatformEvent>>::parse(response) {
8686
Some(Ok(response)) => match response.validate() {
8787
// Bitfinex server is online
8888
Ok(BitfinexPlatformEvent::PlatformStatus(status)) => {

barter-data/src/exchange/bitmex/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ use crate::{
1414
transformer::stateless::StatelessTransformer,
1515
};
1616
use barter_instrument::exchange::ExchangeId;
17-
use barter_integration::{error::SocketError, protocol::websocket::WsMessage};
17+
use barter_integration::{
18+
error::SocketError,
19+
protocol::websocket::{WebSocketSerdeParser, WsMessage},
20+
};
1821
use derive_more::Display;
1922
use serde::de::{Error, Unexpected};
2023
use std::fmt::Debug;
@@ -38,6 +41,9 @@ pub mod subscription;
3841
/// Public trade types for [`Bitmex`].
3942
pub mod trade;
4043

44+
/// Convenient type alias for a Bitmex [`ExchangeWsStream`] using [`WebSocketSerdeParser`](barter_integration::protocol::websocket::WebSocketSerdeParser).
45+
pub type BitmexWsStream<Transformer> = ExchangeWsStream<WebSocketSerdeParser, Transformer>;
46+
4147
/// [`Bitmex`] server base url.
4248
///
4349
/// See docs: <https://www.bitmex.com/app/wsAPI>
@@ -84,7 +90,7 @@ where
8490
{
8591
type SnapFetcher = NoInitialSnapshots;
8692
type Stream =
87-
ExchangeWsStream<StatelessTransformer<Self, Instrument::Key, PublicTrades, BitmexTrade>>;
93+
BitmexWsStream<StatelessTransformer<Self, Instrument::Key, PublicTrades, BitmexTrade>>;
8894
}
8995

9096
impl<'de> serde::Deserialize<'de> for Bitmex {

barter-data/src/exchange/bybit/mod.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@ use crate::{
1515
transformer::stateless::StatelessTransformer,
1616
};
1717
use barter_instrument::exchange::ExchangeId;
18-
use barter_integration::{error::SocketError, protocol::websocket::WsMessage};
18+
use barter_integration::{
19+
error::SocketError,
20+
protocol::websocket::{WebSocketSerdeParser, WsMessage},
21+
};
1922
use book::{BybitOrderBookMessage, l2::BybitOrderBooksL2Transformer};
2023
use serde::de::{Error, Unexpected};
2124
use std::{fmt::Debug, marker::PhantomData, time::Duration};
@@ -56,6 +59,9 @@ pub mod trade;
5659
/// [`BybitFuturesUsd`](futures::BybitPerpetualsUsd).
5760
pub mod book;
5861

62+
/// Convenient type alias for a Bybit [`ExchangeWsStream`] using [`WebSocketSerdeParser`](barter_integration::protocol::websocket::WebSocketSerdeParser).
63+
pub type BybitWsStream<Transformer> = ExchangeWsStream<WebSocketSerdeParser, Transformer>;
64+
5965
/// Generic [`Bybit<Server>`](Bybit) exchange.
6066
///
6167
/// ### Notes
@@ -122,7 +128,7 @@ where
122128
{
123129
type SnapFetcher = NoInitialSnapshots;
124130
type Stream =
125-
ExchangeWsStream<StatelessTransformer<Self, Instrument::Key, PublicTrades, BybitTrade>>;
131+
BybitWsStream<StatelessTransformer<Self, Instrument::Key, PublicTrades, BybitTrade>>;
126132
}
127133

128134
impl<Instrument, Server> StreamSelector<Instrument, OrderBooksL1> for Bybit<Server>
@@ -131,7 +137,7 @@ where
131137
Server: ExchangeServer + Debug + Send + Sync,
132138
{
133139
type SnapFetcher = NoInitialSnapshots;
134-
type Stream = ExchangeWsStream<
140+
type Stream = BybitWsStream<
135141
StatelessTransformer<Self, Instrument::Key, OrderBooksL1, BybitOrderBookMessage>,
136142
>;
137143
}
@@ -142,7 +148,7 @@ where
142148
Server: ExchangeServer + Debug + Send + Sync,
143149
{
144150
type SnapFetcher = NoInitialSnapshots;
145-
type Stream = ExchangeWsStream<BybitOrderBooksL2Transformer<Instrument::Key>>;
151+
type Stream = BybitWsStream<BybitOrderBooksL2Transformer<Instrument::Key>>;
146152
}
147153

148154
impl<'de, Server> serde::Deserialize<'de> for Bybit<Server>

barter-data/src/exchange/coinbase/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ use crate::{
1111
transformer::stateless::StatelessTransformer,
1212
};
1313
use barter_instrument::exchange::ExchangeId;
14-
use barter_integration::{error::SocketError, protocol::websocket::WsMessage};
14+
use barter_integration::{
15+
error::SocketError,
16+
protocol::websocket::{WebSocketSerdeParser, WsMessage},
17+
};
1518
use barter_macro::{DeExchange, SerExchange};
1619
use derive_more::Display;
1720
use serde_json::json;
@@ -37,6 +40,9 @@ pub mod trade;
3740
/// See docs: <https://docs.cloud.coinbase.com/exchange/docs/websocket-overview>
3841
pub const BASE_URL_COINBASE: &str = "wss://ws-feed.exchange.coinbase.com";
3942

43+
/// Convenient type alias for a Coinbase [`ExchangeWsStream`] using [`WebSocketSerdeParser`](barter_integration::protocol::websocket::WebSocketSerdeParser).
44+
pub type CoinbaseWsStream<Transformer> = ExchangeWsStream<WebSocketSerdeParser, Transformer>;
45+
4046
/// [`Coinbase`] exchange.
4147
///
4248
/// See docs: <https://docs.cloud.coinbase.com/exchange/docs/websocket-overview>
@@ -91,5 +97,5 @@ where
9197
{
9298
type SnapFetcher = NoInitialSnapshots;
9399
type Stream =
94-
ExchangeWsStream<StatelessTransformer<Self, Instrument::Key, PublicTrades, CoinbaseTrade>>;
100+
CoinbaseWsStream<StatelessTransformer<Self, Instrument::Key, PublicTrades, CoinbaseTrade>>;
95101
}

0 commit comments

Comments
 (0)