Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 79 additions & 3 deletions crates/adapters/databento/src/historical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use databento::{
use indexmap::IndexMap;
use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::AtomicTime};
use nautilus_model::{
data::{Bar, Data, InstrumentStatus, QuoteTick, TradeTick},
data::{Bar, Data, InstrumentStatus, OrderBookDepth10, QuoteTick, TradeTick},
enums::BarAggregation,
identifiers::{InstrumentId, Symbol, Venue},
instruments::InstrumentAny,
Expand All @@ -42,8 +42,8 @@ use tokio::sync::Mutex;
use crate::{
common::get_date_time_range,
decode::{
decode_imbalance_msg, decode_instrument_def_msg, decode_record, decode_statistics_msg,
decode_status_msg,
decode_imbalance_msg, decode_instrument_def_msg, decode_mbp10_msg, decode_record,
decode_statistics_msg, decode_status_msg,
},
symbology::{
MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
Expand Down Expand Up @@ -313,6 +313,82 @@ impl DatabentoHistoricalClient {
Ok(result)
}

/// Fetches order book depth10 snapshots for the given parameters.
///
/// # Errors
///
/// Returns an error if the API request or data processing fails.
pub async fn get_range_order_book_depth10(
&self,
params: RangeQueryParams,
depth: Option<usize>,
) -> anyhow::Result<Vec<OrderBookDepth10>> {
let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
check_consistent_symbology(&symbols)?;

let first_symbol = params
.symbols
.first()
.ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
let stype_in = infer_symbology_type(first_symbol);
let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
let time_range = get_date_time_range(params.start, end)?;

// For now, only support MBP_10 schema for depth 10
let _depth = depth.unwrap_or(10);
if _depth != 10 {
anyhow::bail!("Only depth=10 is currently supported for order book depths");
}

let range_params = GetRangeParams::builder()
.dataset(params.dataset)
.date_time_range(time_range)
.symbols(symbols)
.stype_in(stype_in)
.schema(dbn::Schema::Mbp10)
.limit(params.limit.and_then(NonZeroU64::new))
.build();

let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);

let mut client = self.inner.lock().await;
let mut decoder = client
.timeseries()
.get_range(&range_params)
.await
.map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;

let metadata = decoder.metadata().clone();
let mut metadata_cache = MetadataCache::new(metadata);
let mut result: Vec<OrderBookDepth10> = Vec::new();

let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
let sym_map = self
.symbol_venue_map
.read()
.map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
let instrument_id = decode_nautilus_instrument_id(
&record,
&mut metadata_cache,
&self.publisher_venue_map,
&sym_map,
)?;

if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
let depth = decode_mbp10_msg(msg, instrument_id, price_precision, None)?;
result.push(depth);
}

Ok(())
};

while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp10Msg>().await {
process_record(dbn::RecordRef::from(msg))?;
}

Ok(result)
}

/// Fetches trade ticks for the given parameters.
///
/// # Errors
Expand Down
35 changes: 35 additions & 0 deletions crates/adapters/databento/src/python/historical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,41 @@ impl DatabentoHistoricalClient {
})
}

#[pyo3(name = "get_order_book_depth10")]
#[pyo3(signature = (dataset, instrument_ids, start, end=None, depth=None))]
#[allow(clippy::too_many_arguments)]
fn py_get_order_book_depth10<'py>(
&self,
py: Python<'py>,
dataset: String,
instrument_ids: Vec<InstrumentId>,
start: u64,
end: Option<u64>,
depth: Option<usize>,
) -> PyResult<Bound<'py, PyAny>> {
let inner = self.inner.clone();
let symbols = inner
.prepare_symbols_from_instrument_ids(&instrument_ids)
.map_err(to_pyvalue_err)?;

let params = RangeQueryParams {
dataset,
symbols,
start: start.into(),
end: end.map(Into::into),
limit: None,
price_precision: None,
};

pyo3_async_runtimes::tokio::future_into_py(py, async move {
let depths = inner
.get_range_order_book_depth10(params, depth)
.await
.map_err(to_pyvalue_err)?;
Python::attach(|py| depths.into_py_any(py))
})
}

#[pyo3(name = "get_range_imbalance")]
#[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
#[allow(clippy::too_many_arguments)]
Expand Down
7 changes: 6 additions & 1 deletion crates/common/src/messages/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub mod unsubscribe;
// Re-exports
pub use request::{
RequestBars, RequestBookSnapshot, RequestCustomData, RequestInstrument, RequestInstruments,
RequestQuotes, RequestTrades,
RequestOrderBookDepth, RequestQuotes, RequestTrades,
};
pub use response::{
BarsResponse, BookResponse, CustomDataResponse, InstrumentResponse, InstrumentsResponse,
Expand Down Expand Up @@ -299,6 +299,7 @@ pub enum RequestCommand {
Instrument(RequestInstrument),
Instruments(RequestInstruments),
BookSnapshot(RequestBookSnapshot),
OrderBookDepth(RequestOrderBookDepth),
Quotes(RequestQuotes),
Trades(RequestTrades),
Bars(RequestBars),
Expand All @@ -322,6 +323,7 @@ impl RequestCommand {
Self::Instrument(cmd) => &cmd.request_id,
Self::Instruments(cmd) => &cmd.request_id,
Self::BookSnapshot(cmd) => &cmd.request_id,
Self::OrderBookDepth(cmd) => &cmd.request_id,
Self::Quotes(cmd) => &cmd.request_id,
Self::Trades(cmd) => &cmd.request_id,
Self::Bars(cmd) => &cmd.request_id,
Expand All @@ -334,6 +336,7 @@ impl RequestCommand {
Self::Instrument(cmd) => cmd.client_id.as_ref(),
Self::Instruments(cmd) => cmd.client_id.as_ref(),
Self::BookSnapshot(cmd) => cmd.client_id.as_ref(),
Self::OrderBookDepth(cmd) => cmd.client_id.as_ref(),
Self::Quotes(cmd) => cmd.client_id.as_ref(),
Self::Trades(cmd) => cmd.client_id.as_ref(),
Self::Bars(cmd) => cmd.client_id.as_ref(),
Expand All @@ -346,6 +349,7 @@ impl RequestCommand {
Self::Instrument(cmd) => Some(&cmd.instrument_id.venue),
Self::Instruments(cmd) => cmd.venue.as_ref(),
Self::BookSnapshot(cmd) => Some(&cmd.instrument_id.venue),
Self::OrderBookDepth(cmd) => Some(&cmd.instrument_id.venue),
Self::Quotes(cmd) => Some(&cmd.instrument_id.venue),
Self::Trades(cmd) => Some(&cmd.instrument_id.venue),
// TODO: Extract the below somewhere
Expand All @@ -362,6 +366,7 @@ impl RequestCommand {
Self::Instrument(cmd) => cmd.ts_init,
Self::Instruments(cmd) => cmd.ts_init,
Self::BookSnapshot(cmd) => cmd.ts_init,
Self::OrderBookDepth(cmd) => cmd.ts_init,
Self::Quotes(cmd) => cmd.ts_init,
Self::Trades(cmd) => cmd.ts_init,
Self::Bars(cmd) => cmd.ts_init,
Expand Down
41 changes: 41 additions & 0 deletions crates/common/src/messages/data/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,47 @@ impl RequestTrades {
}
}

#[derive(Clone, Debug)]
pub struct RequestOrderBookDepth {
pub instrument_id: InstrumentId,
pub start: Option<DateTime<Utc>>,
pub end: Option<DateTime<Utc>>,
pub limit: Option<NonZeroUsize>,
pub depth: Option<NonZeroUsize>,
pub client_id: Option<ClientId>,
pub request_id: UUID4,
pub ts_init: UnixNanos,
pub params: Option<IndexMap<String, String>>,
}

impl RequestOrderBookDepth {
/// Creates a new [`RequestOrderBookDepth`] instance.
#[allow(clippy::too_many_arguments)]
pub fn new(
instrument_id: InstrumentId,
start: Option<DateTime<Utc>>,
end: Option<DateTime<Utc>>,
limit: Option<NonZeroUsize>,
depth: Option<NonZeroUsize>,
client_id: Option<ClientId>,
request_id: UUID4,
ts_init: UnixNanos,
params: Option<IndexMap<String, String>>,
) -> Self {
Self {
instrument_id,
start,
end,
limit,
depth,
client_id,
request_id,
ts_init,
params,
}
}
}

#[derive(Clone, Debug)]
pub struct RequestBars {
pub bar_type: BarType,
Expand Down
36 changes: 28 additions & 8 deletions crates/data/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ use std::{
use ahash::AHashSet;
use nautilus_common::messages::data::{
RequestBars, RequestBookSnapshot, RequestCustomData, RequestInstrument, RequestInstruments,
RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10,
SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData, SubscribeFundingRates,
SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose, SubscribeInstrumentStatus,
SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, UnsubscribeBars,
UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeCommand,
UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrument,
UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
RequestOrderBookDepth, RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas,
SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates,
UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes,
UnsubscribeTrades,
};
#[cfg(feature = "defi")]
use nautilus_common::messages::defi::{
Expand Down Expand Up @@ -558,6 +559,16 @@ pub trait DataClient: Any + Sync + Send {
log_not_implemented(&request);
Ok(())
}

/// Requests historical order book depth data for a specified instrument.
///
/// # Errors
///
/// Returns an error if the order book depths request fails.
fn request_order_book_depth(&self, request: &RequestOrderBookDepth) -> anyhow::Result<()> {
log_not_implemented(&request);
Ok(())
}
}

/// Wraps a [`DataClient`], managing subscription state and forwarding commands.
Expand Down Expand Up @@ -1345,6 +1356,15 @@ impl DataClientAdapter {
pub fn request_bars(&self, req: &RequestBars) -> anyhow::Result<()> {
self.client.request_bars(req)
}

/// Sends an order book depths request for a given instrument.
///
/// # Errors
///
/// Returns an error if the client fails to process the order book depths request.
pub fn request_order_book_depth(&self, req: &RequestOrderBookDepth) -> anyhow::Result<()> {
self.client.request_order_book_depth(req)
}
}

#[inline(always)]
Expand Down
1 change: 1 addition & 0 deletions crates/data/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ impl DataEngine {
RequestCommand::Instrument(req) => client.request_instrument(req),
RequestCommand::Instruments(req) => client.request_instruments(req),
RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
RequestCommand::OrderBookDepth(req) => client.request_order_book_depth(req),
RequestCommand::Quotes(req) => client.request_quotes(req),
RequestCommand::Trades(req) => client.request_trades(req),
RequestCommand::Bars(req) => client.request_bars(req),
Expand Down
40 changes: 40 additions & 0 deletions crates/data/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use nautilus_common::{
RequestCustomData,
RequestInstrument,
RequestInstruments,
RequestOrderBookDepth,
RequestQuotes,
RequestTrades,
// Subscription commands
Expand Down Expand Up @@ -1728,6 +1729,45 @@ fn test_request_bars(
assert_eq!(rec[0], DataCommand::Request(RequestCommand::Bars(req)));
}

#[rstest]
fn test_request_order_book_depth(
clock: Rc<RefCell<TestClock>>,
cache: Rc<RefCell<Cache>>,
client_id: ClientId,
venue: Venue,
) {
let recorder = Rc::new(RefCell::new(Vec::<DataCommand>::new()));
let client = Box::new(MockDataClient::new_with_recorder(
clock,
cache,
client_id,
Some(venue),
Some(recorder.clone()),
));
let adapter = DataClientAdapter::new(client_id, Some(venue), false, false, client);

let inst_id = audusd_sim().id;
let req = RequestOrderBookDepth::new(
inst_id,
None,
None,
None,
Some(NonZeroUsize::new(10).unwrap()),
Some(client_id),
UUID4::new(),
UnixNanos::default(),
None,
);
adapter.request_order_book_depth(&req).unwrap();

let rec = recorder.borrow();
assert_eq!(rec.len(), 1);
assert_eq!(
rec[0],
DataCommand::Request(RequestCommand::OrderBookDepth(req))
);
}

// ------------------------------------------------------------------------------------------------
// DeFi subscription tests
// ------------------------------------------------------------------------------------------------
Expand Down
22 changes: 16 additions & 6 deletions crates/data/tests/common/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ use nautilus_common::{
clock::Clock,
messages::data::{
DataCommand, RequestBars, RequestBookSnapshot, RequestCommand, RequestCustomData,
RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars,
SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand,
SubscribeCustomData, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, UnsubscribeBars,
UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
RequestInstrument, RequestInstruments, RequestOrderBookDepth, RequestQuotes, RequestTrades,
SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
SubscribeCommand, SubscribeCustomData, SubscribeFundingRates, SubscribeIndexPrices,
SubscribeInstrument, SubscribeInstrumentClose, SubscribeInstrumentStatus,
SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades,
UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
Expand Down Expand Up @@ -577,6 +577,16 @@ impl DataClient for MockDataClient {
}
Ok(())
}

fn request_order_book_depth(&self, request: &RequestOrderBookDepth) -> anyhow::Result<()> {
if let Some(rec) = &self.recorder {
rec.borrow_mut()
.push(DataCommand::Request(RequestCommand::OrderBookDepth(
request.clone(),
)));
}
Ok(())
}
}

// SAFETY: Cannot be sent across thread boundaries
Expand Down
Loading
Loading