Skip to content

Commit 3fb0afc

Browse files
authored
feat(stats): Add linked secondary stats service support (#1652)
* Implement linking 2nd stats service * Added max_hops param to prevent request cycles. Readme updated, tests improved. * Improve tests coverage * Fixed format and envs issues * Fix unit test * Fixed path normalization issue * Added format&check just actions * Fixed code review issues * Added strict path builder
1 parent a289ea3 commit 3fb0afc

10 files changed

Lines changed: 1596 additions & 35 deletions

File tree

stats/README.md

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,15 @@ Some variables are hidden in a disclosure widget below the table.
6868
| Variable | Req​uir​ed | Description | Default value |
6969
| -------------------------------------------------------------------- | ------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------- |
7070
| `STATS__DB_URL` | | Postgres URL to stats db | `` |
71-
| `STATS__MODE` | | Service mode: `blockscout`, `multichain_aggregator`, `zetachain`, or `interchain`. Modes are mutually exclusive. `blockscout` implies no zetachain CCTX and no multichain behavior. | `blockscout` |
72-
| `STATS__MULTICHAIN_FILTER` | | An array of chain IDs used to filter input data in `multichain_aggregator` mode. Specified as a comma-separated list of identifiers without spaces (e.g. `10,130`) | `null` |
73-
| `STATS__INTERCHAIN_PRIMARY_ID` | | If the primary chain is set, send/receive counters and charts will be built around it (interchain mode only) | `null` |
71+
| `STATS__MODE` | | Service mode: `blockscout`, `multichain_aggregator`, `zetachain`, or `interchain`. Modes are mutually exclusive. `blockscout` implies no zetachain CCTX and no multichain behavior. | `blockscout` |
72+
| `STATS__MULTICHAIN_FILTER` | | An array of chain IDs used to filter input data in `multichain_aggregator` mode. Specified as a comma-separated list of identifiers without spaces (e.g. `10,130`) | `null` |
73+
| `STATS__INTERCHAIN_PRIMARY_ID` | | If the primary chain is set, send/receive counters and charts will be built around it (interchain mode only) | `null` |
7474
| `STATS__INDEXER_DB_URL` | | Postgres URL to indexer db; renamed from `*_BLOCKSCOUT_DB_URL` | `null` |
7575
| `STATS__BLOCKSCOUT_DB_URL` | | Postgres URL to blockscout db. Renamed to `*_INDEXER_DB_URL` but left for backwards-compatibility | `null` |
7676
| `STATS__SECOND_INDEXER_DB_URL` | | Postgres URL to second indexer db; i.e. zetachain cctx indexer db. Required if `STATS__MODE=zetachain` | `null` |
77+
| `STATS__LINKED_STATS__BASE_URL` | | Base URL of an optional linked secondary stats service used to fill gaps in read responses. | `null` |
78+
| `STATS__LINKED_STATS__TIMEOUT` | | Timeout for linked secondary requests, in milliseconds | `3000` |
79+
| `STATS__LINKED_STATS__MAX_HOPS` | | Remaining hop budget propagated to linked secondary stats services. Missing incoming hop header starts with this value. Values above the hard cap of `4` are truncated. | `1` |
7780
| `STATS__CREATE_DATABASE` | | Create database on start | `false` |
7881
| `STATS__RUN_MIGRATIONS` | | Run migrations on start | `false` |
7982
| `STATS__CHARTS_CONFIG` | | Path to config file for charts | `config/blockscout_instance/charts.json` |
@@ -100,6 +103,10 @@ Some variables are hidden in a disclosure widget below the table.
100103

101104
[anchor]: <> (anchors.envs.end.service)
102105

106+
##### Chaining services
107+
108+
When `linked_stats` is configured, the intended topology is one primary stats service with at most one linked secondary service. Chaining is technically possible because a secondary service can also be configured with its own `linked_stats`, but this should be avoided unless there is a clear operational reason: it increases latency and complexity, and careless configuration can still create cycles.
109+
103110
##### Conditional start
104111
In order to prevent incorrect statistics from being collected, there is an option to automatically delay chart update. This is controlled by `STATS_CONDITIONAL_START_*` environmental variables.
105112

stats/justfile

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,21 @@ start-postgres-and-build-tests:
5252
wait
5353
echo "finished postgres wait"
5454

55+
# apply the basic formatting
56+
format:
57+
if cargo sort --version >/dev/null 2>&1; then cargo sort --workspace; else echo "cargo-sort is not installed, skipping cargo sort"; fi
58+
cargo fmt --all -- --config imports_granularity=Crate
59+
60+
# check and lint the code
61+
check:
62+
cargo check
63+
cargo clippy --all --all-targets --all-features -- -D warnings
64+
65+
# run formatting and checks together
66+
format-check:
67+
just format
68+
just check
69+
5570
check-envs:
5671
cargo run --bin env-docs-generation -- --validate-only
5772

stats/stats-server/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ pub mod auth;
22
pub mod blockscout_waiter;
33
mod config;
44
mod health;
5+
mod linked_stats;
6+
mod linked_stats_merge;
57
mod read_service;
68
mod runtime_setup;
79
mod server;
@@ -13,6 +15,6 @@ pub use config::env as config_env;
1315
pub use read_service::ReadService;
1416
pub use runtime_setup::RuntimeSetup;
1517
pub use server::stats;
16-
pub use settings::{Mode, Settings};
18+
pub use settings::{LinkedStatsSettings, Mode, Settings};
1719
pub use update_service::UpdateService;
1820
pub use update_tracker::InitialUpdateTracker;
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
use reqwest::StatusCode;
2+
use serde::de::DeserializeOwned;
3+
use stats_proto::blockscout::stats::v1 as proto_v1;
4+
use thiserror::Error;
5+
use url::{ParseError, Url};
6+
7+
use crate::settings::LinkedStatsSettings;
8+
9+
pub const LINK_HOP_HEADER: &str = "x-stats-link-hop";
10+
11+
#[derive(Debug, Clone)]
12+
pub struct LinkedStatsClient {
13+
client: reqwest::Client,
14+
base_url: Url,
15+
}
16+
17+
#[derive(Debug, Error)]
18+
pub enum LinkedStatsError {
19+
#[error("linked stats service returned not found")]
20+
NotFound,
21+
#[error("linked stats request failed: {0}")]
22+
Request(#[from] reqwest::Error),
23+
#[error("linked stats returned unexpected status {0}")]
24+
UnexpectedStatus(StatusCode),
25+
#[error("failed to construct linked stats URL: {0}")]
26+
InvalidUrl(#[from] ParseError),
27+
}
28+
29+
impl LinkedStatsClient {
30+
/// Returns `None` when [`LinkedStatsSettings::base_url`] is not set.
31+
pub fn try_new(settings: &LinkedStatsSettings) -> Result<Option<Self>, reqwest::Error> {
32+
let Some(base_url) = settings.base_url.clone() else {
33+
return Ok(None);
34+
};
35+
let base_url = normalize_base_url(base_url);
36+
let client = reqwest::Client::builder()
37+
.timeout(settings.timeout())
38+
.build()?;
39+
Ok(Some(Self { client, base_url }))
40+
}
41+
42+
pub async fn get_counters(&self, hop: u32) -> Result<proto_v1::Counters, LinkedStatsError> {
43+
self.get_json("api/v1/counters", hop).await
44+
}
45+
46+
pub async fn get_line_charts(
47+
&self,
48+
hop: u32,
49+
) -> Result<proto_v1::LineCharts, LinkedStatsError> {
50+
self.get_json("api/v1/lines", hop).await
51+
}
52+
53+
pub async fn get_line_chart(
54+
&self,
55+
request: &proto_v1::GetLineChartRequest,
56+
hop: u32,
57+
) -> Result<proto_v1::LineChart, LinkedStatsError> {
58+
let mut url = self.endpoint_with_path_segments(["api", "v1", "lines", &request.name])?;
59+
{
60+
let mut query = url.query_pairs_mut();
61+
if let Some(from) = request.from.as_deref() {
62+
query.append_pair("from", from);
63+
}
64+
if let Some(to) = request.to.as_deref() {
65+
query.append_pair("to", to);
66+
}
67+
let resolution = request.resolution();
68+
if resolution != proto_v1::Resolution::Unspecified {
69+
query.append_pair("resolution", resolution.as_str_name());
70+
}
71+
}
72+
self.get_json_by_url(url, hop, true).await
73+
}
74+
75+
pub async fn get_main_page_stats(
76+
&self,
77+
hop: u32,
78+
) -> Result<proto_v1::MainPageStats, LinkedStatsError> {
79+
self.get_json("api/v1/pages/main", hop).await
80+
}
81+
82+
pub async fn get_transactions_page_stats(
83+
&self,
84+
hop: u32,
85+
) -> Result<proto_v1::TransactionsPageStats, LinkedStatsError> {
86+
self.get_json("api/v1/pages/transactions", hop).await
87+
}
88+
89+
pub async fn get_contracts_page_stats(
90+
&self,
91+
hop: u32,
92+
) -> Result<proto_v1::ContractsPageStats, LinkedStatsError> {
93+
self.get_json("api/v1/pages/contracts", hop).await
94+
}
95+
96+
pub async fn get_main_page_multichain_stats(
97+
&self,
98+
hop: u32,
99+
) -> Result<proto_v1::MainPageMultichainStats, LinkedStatsError> {
100+
self.get_json("api/v1/pages/multichain/main", hop).await
101+
}
102+
103+
pub async fn get_main_page_interchain_stats(
104+
&self,
105+
hop: u32,
106+
) -> Result<proto_v1::MainPageInterchainStats, LinkedStatsError> {
107+
self.get_json("api/v1/pages/interchain/main", hop).await
108+
}
109+
110+
pub async fn get_update_status(
111+
&self,
112+
hop: u32,
113+
) -> Result<proto_v1::UpdateStatus, LinkedStatsError> {
114+
self.get_json("api/v1/update-status", hop).await
115+
}
116+
117+
fn endpoint(&self, path: &str) -> Result<Url, LinkedStatsError> {
118+
Ok(self.base_url.join(path)?)
119+
}
120+
121+
fn endpoint_with_path_segments<'a>(
122+
&self,
123+
segments: impl IntoIterator<Item = &'a str>,
124+
) -> Result<Url, LinkedStatsError> {
125+
build_url_with_path_segments(self.base_url.clone(), segments)
126+
}
127+
128+
async fn get_json<T: DeserializeOwned>(
129+
&self,
130+
path: &str,
131+
hop: u32,
132+
) -> Result<T, LinkedStatsError> {
133+
let url = self.endpoint(path)?;
134+
self.get_json_by_url(url, hop, false).await
135+
}
136+
137+
async fn get_json_by_url<T: DeserializeOwned>(
138+
&self,
139+
url: Url,
140+
hop: u32,
141+
allow_not_found: bool,
142+
) -> Result<T, LinkedStatsError> {
143+
let response = self
144+
.client
145+
.get(url)
146+
.header(LINK_HOP_HEADER, hop.to_string())
147+
.send()
148+
.await?;
149+
let status = response.status();
150+
if status.is_success() {
151+
return Ok(response.json().await?);
152+
}
153+
if allow_not_found && status == StatusCode::NOT_FOUND {
154+
return Err(LinkedStatsError::NotFound);
155+
}
156+
Err(LinkedStatsError::UnexpectedStatus(status))
157+
}
158+
}
159+
160+
fn normalize_base_url(mut base_url: Url) -> Url {
161+
let trimmed_path = base_url.path().trim_end_matches('/');
162+
let normalized_path = if trimmed_path.is_empty() {
163+
"/".to_string()
164+
} else {
165+
format!("{trimmed_path}/")
166+
};
167+
base_url.set_path(&normalized_path);
168+
base_url
169+
}
170+
171+
fn build_url_with_path_segments<'a>(
172+
mut base_url: Url,
173+
segments: impl IntoIterator<Item = &'a str>,
174+
) -> Result<Url, LinkedStatsError> {
175+
let mut path_segments = base_url
176+
.path_segments_mut()
177+
.map_err(|_| ParseError::RelativeUrlWithoutBase)?;
178+
path_segments.pop_if_empty();
179+
path_segments.extend(segments);
180+
drop(path_segments);
181+
Ok(base_url)
182+
}

0 commit comments

Comments
 (0)