Skip to content

Commit 8b6f9cd

Browse files
committed
feat:控制台注册中心服务订阅页面集群场景支持查询指定节点下的订阅者信息; #231
1 parent 3b7504a commit 8b6f9cd

14 files changed

Lines changed: 194 additions & 17 deletions

File tree

src/common/model/privilege.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ where
178178
}
179179
}
180180

181-
#[derive(Debug, Clone, Default)]
181+
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
182182
pub struct NamespacePrivilegeGroup(pub PrivilegeGroup<Arc<String>>);
183183

184184
impl NamespacePrivilegeGroup {

src/console/api.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,10 @@ pub fn console_api_config_v2(config: &mut web::ServiceConfig) {
284284
web::resource("/service/list")
285285
.route(web::get().to(v2::naming_api::query_service_list)),
286286
)
287+
.service(
288+
web::resource("/service/subscriber/list")
289+
.route(web::get().to(v2::naming_api::query_subscribers_list)),
290+
)
287291
.service(
288292
web::resource("/service/add").route(web::post().to(v2::naming_api::add_service)),
289293
)

src/console/model/naming_model.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,16 @@ pub struct ServiceKeyParam{
2323
}
2424
*/
2525

26-
#[derive(Debug, Serialize, Deserialize, Default)]
26+
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
2727
#[serde(rename_all = "camelCase")]
2828
pub struct ServiceQueryListRequest {
2929
pub page_no: Option<usize>,
3030
pub page_size: Option<usize>,
3131
pub namespace_id: Option<String>,
3232
pub group_name_param: Option<String>,
3333
pub service_name_param: Option<String>,
34+
/// 节点id,在查询指定节点信息时用到
35+
pub node_id: Option<u64>,
3436
}
3537

3638
impl ServiceQueryListRequest {

src/console/v2/naming_api.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@ use crate::console::model::naming_model::{
44
InstanceParams, ServiceDto, ServiceParam, ServiceQueryListRequest,
55
};
66
use crate::console::v2::ERROR_CODE_SYSTEM_ERROR;
7+
use crate::grpc::handler::NAMING_ROUTE_REQUEST;
8+
use crate::grpc::PayloadUtils;
79
use crate::naming::api_model::InstanceVO;
10+
use crate::naming::cluster::model::{NamingRouteRequest, NamingRouterResponse};
811
use crate::naming::core::{NamingActor, NamingCmd, NamingResult};
912
use crate::naming::model::{InstanceUpdateTag, ServiceDetailDto};
13+
use crate::naming::service::SubscriberInfoDto;
14+
use crate::naming::service_index::ServiceQueryParam;
1015
use crate::naming::NamingUtils;
1116
use crate::{user_namespace_privilege, user_no_namespace_permission};
1217
use actix::Addr;
@@ -295,3 +300,59 @@ pub async fn remove_instance(
295300
)),
296301
}
297302
}
303+
304+
pub async fn query_subscribers_list(
305+
app: Data<Arc<AppShareData>>,
306+
req: HttpRequest,
307+
param: web::Query<ServiceQueryListRequest>,
308+
) -> impl Responder {
309+
let node_id = param.node_id.unwrap_or(0);
310+
let service_param = param.0.to_param(&req).unwrap();
311+
if !service_param
312+
.namespace_privilege
313+
.check_option_value_permission(&service_param.namespace_id, true)
314+
{
315+
user_no_namespace_permission!(&service_param.namespace_id);
316+
}
317+
match do_query_subscribers_list(app, service_param, node_id).await {
318+
Ok(res) => HttpResponse::Ok().json(ApiResult::success(Some(res))),
319+
Err(err) => HttpResponse::Ok().json(ApiResult::<()>::error(
320+
ERROR_CODE_SYSTEM_ERROR.to_string(),
321+
Some(err.to_string()),
322+
)),
323+
}
324+
}
325+
326+
async fn do_query_subscribers_list(
327+
app: Data<Arc<AppShareData>>,
328+
param: ServiceQueryParam,
329+
node_id: u64,
330+
) -> anyhow::Result<PageResult<SubscriberInfoDto>> {
331+
if node_id == 0 || node_id == app.sys_config.raft_node_id {
332+
if let NamingResult::ServiceSubscribersPage((total_count, list)) = app
333+
.naming_addr
334+
.send(NamingCmd::QueryServiceSubscribersPageV2(param))
335+
.await??
336+
{
337+
Ok(PageResult { total_count, list })
338+
} else {
339+
Err(anyhow::anyhow!("query subscribers error"))
340+
}
341+
} else {
342+
let addr = app.naming_node_manage.get_node_addr(node_id).await?;
343+
let req = NamingRouteRequest::QueryServiceSubscriberPage(param);
344+
let request = serde_json::to_string(&req).unwrap_or_default();
345+
let payload = PayloadUtils::build_payload(NAMING_ROUTE_REQUEST, request);
346+
let resp_payload = app
347+
.cluster_sender
348+
.send_request(addr.clone(), payload)
349+
.await?;
350+
let body_vec = resp_payload.body.unwrap_or_default().value;
351+
let resp: NamingRouterResponse = serde_json::from_slice(&body_vec)?;
352+
if let NamingRouterResponse::ServiceSubscribersPage((total_count, list)) = resp {
353+
Ok(PageResult { total_count, list })
354+
} else {
355+
Err(anyhow::anyhow!("query subscribers error"))
356+
}
357+
}
358+
}

src/grpc/handler/naming_route.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ impl NamingRouteRequestHandler {
186186
}
187187
args.merge_args(tmp_args);
188188
}
189+
NamingRouteRequest::QueryServiceSubscriberPage(param) => {}
189190
}
190191
Ok(args.to_string())
191192
}

src/naming/cluster/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,15 @@ pub async fn handle_naming_route(
224224
}
225225
}
226226
}
227+
NamingRouteRequest::QueryServiceSubscriberPage(param) => {
228+
let resp = app
229+
.naming_addr
230+
.send(NamingCmd::QueryServiceSubscribersPageV2(param))
231+
.await??;
232+
if let NamingResult::ServiceSubscribersPage((total, list)) = resp {
233+
return Ok(NamingRouterResponse::ServiceSubscribersPage((total, list)));
234+
}
235+
}
227236
};
228237
Ok(NamingRouterResponse::None)
229238
}

src/naming/cluster/model.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use crate::metrics::timeline::model::{TimelineQueryParam, TimelineQueryResponse};
22
use crate::naming::model::{Instance, InstanceKey, InstanceUpdateTag, ServiceDetailDto};
3+
use crate::naming::service::SubscriberInfoDto;
4+
use crate::naming::service_index::ServiceQueryParam;
35
use actix::prelude::*;
46
use serde::{Deserialize, Serialize};
57
use std::collections::{HashMap, HashSet};
@@ -47,6 +49,7 @@ pub enum NamingRouteRequest {
4749
MetricsTimelineQuery(TimelineQueryParam),
4850
SyncDistroClientInstances(HashMap<Arc<String>, HashSet<InstanceKey>>),
4951
QueryDistroInstanceSnapshot(Vec<InstanceKey>),
52+
QueryServiceSubscriberPage(ServiceQueryParam),
5053
}
5154

5255
impl NamingRouteRequest {
@@ -65,6 +68,7 @@ impl NamingRouteRequest {
6568
NamingRouteRequest::MetricsTimelineQuery(_) => "MetricsTimelineQuery",
6669
NamingRouteRequest::SyncDistroClientInstances(_) => "SyncDistroClientInstances",
6770
NamingRouteRequest::QueryDistroInstanceSnapshot(_) => "QueryDistroInstanceSnapshot",
71+
NamingRouteRequest::QueryServiceSubscriberPage(_) => "QueryServiceSubscriberPage",
6872
}
6973
}
7074
}
@@ -73,6 +77,7 @@ impl NamingRouteRequest {
7377
pub enum NamingRouterResponse {
7478
None,
7579
MetricsTimeLineResponse(TimelineQueryResponse),
80+
ServiceSubscribersPage((usize, Vec<SubscriberInfoDto>)),
7681
}
7782

7883
#[derive(Message, Debug, Clone)]

src/naming/core.rs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,13 @@ use std::collections::BTreeSet;
5050
use std::collections::HashMap;
5151
use std::collections::HashSet;
5252
use std::collections::LinkedList;
53+
use std::default::Default;
5354
use std::net::SocketAddr;
5455
use std::sync::Arc;
5556
use std::time::Duration;
5657

5758
use crate::common::constant::EMPTY_ARC_STRING;
59+
use crate::common::model::privilege::NamespacePrivilegeGroup;
5860
use crate::metrics::metrics_key::MetricsKey;
5961
use crate::metrics::model::{MetricsItem, MetricsQuery, MetricsRecord};
6062
use crate::namespace::NamespaceActor;
@@ -689,6 +691,8 @@ impl NamingActor {
689691
(size, service_names)
690692
}
691693

694+
///过程生成的不必要的临时对象过多,使用v2版本
695+
#[deprecated]
692696
pub fn get_subscribers_list(
693697
&self,
694698
page_size: usize,
@@ -740,6 +744,35 @@ impl NamingActor {
740744
(total, paginated_result)
741745
}
742746

747+
pub fn get_subscribers_list_v2(
748+
&self,
749+
page_size: usize,
750+
page_index: usize,
751+
key: &ServiceKey,
752+
) -> (usize, Vec<SubscriberInfoDto>) {
753+
let offset = if page_index == 0 {
754+
0
755+
} else {
756+
page_size * (page_index - 1)
757+
};
758+
let param = ServiceQueryParam {
759+
offset,
760+
limit: page_size,
761+
namespace_id: Some(key.namespace_id.clone()),
762+
like_group: Some(key.group_name.as_ref().clone()),
763+
like_service: Some(key.service_name.as_ref().clone()),
764+
..Default::default()
765+
};
766+
self.subscriber.query_service_listener_page(&param)
767+
}
768+
769+
pub fn get_subscribers_list_by_param(
770+
&self,
771+
param: ServiceQueryParam,
772+
) -> (usize, Vec<SubscriberInfoDto>) {
773+
self.subscriber.query_service_listener_page(&param)
774+
}
775+
743776
pub fn get_service_info_page(&self, param: ServiceQueryParam) -> (usize, Vec<ServiceInfoDto>) {
744777
let (size, list) = self.namespace_index.query_service_page(&param);
745778

@@ -1026,6 +1059,7 @@ pub enum NamingCmd {
10261059
QueryServiceInfo(ServiceKey, String, bool),
10271060
QueryServicePage(ServiceKey, usize, usize),
10281061
QueryServiceSubscribersPage(ServiceKey, usize, usize),
1062+
QueryServiceSubscribersPageV2(ServiceQueryParam),
10291063
//查询服务实际信息列表
10301064
QueryServiceInfoPage(ServiceQueryParam),
10311065
//CreateService(ServiceDetailDto),
@@ -1056,7 +1090,7 @@ pub enum NamingResult {
10561090
InstanceListString(String),
10571091
ServiceInfo(ServiceInfo),
10581092
ServicePage((usize, Vec<Arc<String>>)),
1059-
ServiceSubscribersPage((usize, Vec<Arc<SubscriberInfoDto>>)),
1093+
ServiceSubscribersPage((usize, Vec<SubscriberInfoDto>)),
10601094
ServiceInfoPage((usize, Vec<ServiceInfoDto>)),
10611095
ClientInstanceCount(Vec<(Arc<String>, usize)>),
10621096
RewriteToCluster(u64, Instance),
@@ -1158,12 +1192,15 @@ impl Handler<NamingCmd> for NamingActor {
11581192
}
11591193
NamingCmd::QueryServiceSubscribersPage(service_key, page_size, page_index) => {
11601194
Ok(NamingResult::ServiceSubscribersPage(
1161-
self.get_subscribers_list(page_size, page_index, &service_key),
1195+
self.get_subscribers_list_v2(page_size, page_index, &service_key),
11621196
))
11631197
}
11641198
NamingCmd::QueryServiceInfoPage(param) => Ok(NamingResult::ServiceInfoPage(
11651199
self.get_service_info_page(param),
11661200
)),
1201+
NamingCmd::QueryServiceSubscribersPageV2(param) => Ok(
1202+
NamingResult::ServiceSubscribersPage(self.get_subscribers_list_by_param(param)),
1203+
),
11671204
NamingCmd::PeekListenerTimeout => {
11681205
self.time_check();
11691206
//self.notify_check();

src/naming/model.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ impl Default for InstanceUpdateTag {
202202
}
203203
}
204204

205-
#[derive(Debug, Clone, Default, Hash, PartialEq, Eq)]
205+
#[derive(Debug, Clone, Default, Hash, PartialEq, Eq, PartialOrd, Ord)]
206206
pub struct ServiceKey {
207207
pub namespace_id: Arc<String>,
208208
pub group_name: Arc<String>,

src/naming/naming_subscriber.rs

Lines changed: 55 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
#![allow(unused_imports)]
22

3-
use std::{
4-
collections::{HashMap, HashSet},
5-
sync::Arc,
6-
};
7-
8-
use actix::prelude::*;
9-
103
use super::{
114
model::ServiceKey,
125
naming_delay_nofity::{DelayNotifyActor, DelayNotifyCmd},
136
};
7+
use crate::common::constant::EMPTY_ARC_STRING;
8+
use crate::naming::service::SubscriberInfoDto;
9+
use crate::naming::service_index::ServiceQueryParam;
10+
use actix::prelude::*;
11+
use std::collections::BTreeMap;
12+
use std::{
13+
collections::{HashMap, HashSet},
14+
sync::Arc,
15+
};
1416

1517
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
1618
pub enum ListenerClusterType {
@@ -40,7 +42,7 @@ pub struct NamingListenerItem {
4042

4143
#[derive(Default)]
4244
pub struct Subscriber {
43-
listener: HashMap<ServiceKey, HashMap<Arc<String>, Option<HashSet<String>>>>,
45+
listener: BTreeMap<ServiceKey, BTreeMap<Arc<String>, Option<HashSet<String>>>>,
4446
client_keys: HashMap<Arc<String>, HashSet<ServiceKey>>,
4547
notify_addr: Option<Addr<DelayNotifyActor>>,
4648
}
@@ -79,7 +81,7 @@ impl Subscriber {
7981
set.insert(client_id.clone(), item.clusters);
8082
}
8183
None => {
82-
let mut set = HashMap::new();
84+
let mut set = BTreeMap::new();
8385
set.insert(client_id.clone(), item.clusters);
8486
self.listener.insert(item.service_key, set);
8587
}
@@ -191,7 +193,7 @@ impl Subscriber {
191193
group_name: &str,
192194
service_name: &str,
193195
namespace_id: &str,
194-
) -> HashMap<ServiceKey, HashMap<Arc<String>, Option<HashSet<String>>>> {
196+
) -> BTreeMap<ServiceKey, BTreeMap<Arc<String>, Option<HashSet<String>>>> {
195197
self.listener
196198
.iter()
197199
.filter(|(key, _)| {
@@ -202,4 +204,47 @@ impl Subscriber {
202204
.map(|(key, value)| (key.clone(), value.clone()))
203205
.collect()
204206
}
207+
208+
pub fn query_service_listener_page(
209+
&self,
210+
param: &ServiceQueryParam,
211+
) -> (usize, Vec<SubscriberInfoDto>) {
212+
let mut rlist = vec![];
213+
let end_index = param.offset + param.limit;
214+
let mut index = 0;
215+
for (key, value) in self.listener.iter() {
216+
if param.match_namespace_id(&key.namespace_id)
217+
&& param.match_group(&key.group_name)
218+
&& param.match_service(&key.service_name)
219+
{
220+
for (ip_port, _) in value.iter() {
221+
if index >= param.offset && index < end_index {
222+
let (ip, port) = Self::parse_ip_port(ip_port.as_ref());
223+
rlist.push(SubscriberInfoDto {
224+
service_name: key.service_name.clone(),
225+
group_name: key.group_name.clone(),
226+
namespace_id: key.namespace_id.clone(),
227+
ip,
228+
port,
229+
});
230+
}
231+
index += 1;
232+
}
233+
}
234+
}
235+
(index, rlist)
236+
}
237+
238+
fn parse_ip_port(ip_port: &str) -> (Arc<String>, u16) {
239+
let parts: Vec<&str> = ip_port.split(':').collect();
240+
let mut ip = EMPTY_ARC_STRING.clone();
241+
let mut port = 0;
242+
if parts.len() == 2 {
243+
ip = Arc::new(parts[0].to_string());
244+
if let Ok(p) = parts[1].parse::<u16>() {
245+
port = p;
246+
}
247+
}
248+
(ip, port)
249+
}
205250
}

0 commit comments

Comments
 (0)