Skip to content

Commit f24e928

Browse files
committed
feat: 优化gRPC命名空间处理逻辑,增强默认命名空间兼容性
1 parent 2730040 commit f24e928

3 files changed

Lines changed: 81 additions & 4 deletions

File tree

src/grpc/bistream_conn.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,50 @@ use super::{api_model::ClientDetectionRequest, nacos_proto::Payload};
1313
type SenderType = tokio::sync::mpsc::Sender<Result<Payload, tonic::Status>>;
1414
type ReceiverStreamType = tonic::Streaming<Payload>;
1515

16+
#[derive(Debug)]
17+
pub enum NamespaceType {
18+
Empty,
19+
Public,
20+
Other(Arc<String>),
21+
Unknown,
22+
}
23+
24+
impl NamespaceType {
25+
pub fn from_option(namespace: Option<String>) -> Self {
26+
if let Some(namespace) = namespace {
27+
NamespaceType::get_namespace_type(&namespace)
28+
} else {
29+
NamespaceType::Unknown
30+
}
31+
}
32+
pub fn get_namespace_type(namespace: &str) -> Self {
33+
if namespace.is_empty() {
34+
NamespaceType::Empty
35+
} else if namespace == "public" {
36+
NamespaceType::Public
37+
} else {
38+
NamespaceType::Other(Arc::new(namespace.to_owned()))
39+
}
40+
}
41+
42+
pub fn is_default(&self) -> bool {
43+
match self {
44+
NamespaceType::Empty => true,
45+
NamespaceType::Public => true,
46+
_ => false,
47+
}
48+
}
49+
50+
pub fn to_str(&self) -> &str {
51+
match self {
52+
NamespaceType::Empty => "",
53+
NamespaceType::Public => "public",
54+
NamespaceType::Other(namespace) => namespace.as_str(),
55+
NamespaceType::Unknown => "",
56+
}
57+
}
58+
}
59+
1660
pub struct BiStreamConn {
1761
sender: SenderType,
1862
client_id: Arc<String>,

src/grpc/bistream_manage.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ use super::{
2222
};
2323
use crate::common::constant::EMPTY_CLIENT_VERSION;
2424
use crate::common::model::ClientVersion;
25+
use crate::config::ConfigUtils;
2526
use crate::grpc::api_model::ConnectionSetupRequest;
27+
use crate::grpc::bistream_conn::NamespaceType;
28+
use crate::namespace::DEFAULT_NAMESPACE;
2629
use actix::prelude::*;
2730
use bean_factory::{bean, Inject};
2831
use inner_mem_cache::TimeoutSet;
@@ -31,6 +34,7 @@ pub(crate) struct ConnCacheItem {
3134
last_active_time: u64,
3235
conn: Addr<BiStreamConn>,
3336
pub(crate) client_version: Arc<ClientVersion>,
37+
pub(crate) namespace: NamespaceType,
3438
}
3539

3640
impl ConnCacheItem {
@@ -39,6 +43,7 @@ impl ConnCacheItem {
3943
last_active_time,
4044
conn,
4145
client_version: EMPTY_CLIENT_VERSION.clone(),
46+
namespace: NamespaceType::Unknown,
4247
}
4348
}
4449
}
@@ -236,6 +241,7 @@ impl Handler<BiStreamManageCmd> for BiStreamManage {
236241
item.client_version =
237242
Arc::new(ClientVersion::from_string(&client_version));
238243
}
244+
item.namespace = NamespaceType::from_option(request.tenant);
239245
}
240246
}
241247
self.active_client(client_id).ok();
@@ -260,7 +266,8 @@ impl Handler<BiStreamManageCmd> for BiStreamManage {
260266
return Ok(BiStreamManageResult::ClientInfo(client_version));
261267
}
262268
BiStreamManageCmd::NotifyConfig(config_key, client_id_set) => {
263-
let request = ConfigChangeNotifyRequest {
269+
let tenant = config_key.tenant.clone();
270+
let mut request = ConfigChangeNotifyRequest {
264271
group: config_key.group,
265272
data_id: config_key.data_id,
266273
tenant: config_key.tenant,
@@ -270,10 +277,27 @@ impl Handler<BiStreamManageCmd> for BiStreamManage {
270277
};
271278
let payload = Arc::new(PayloadUtils::build_payload(
272279
"ConfigChangeNotifyRequest",
273-
serde_json::to_string(&request).unwrap(),
280+
serde_json::to_string(&request)?,
274281
));
282+
let mut other_default_payload = None;
275283
for item in &client_id_set {
276284
if let Some(item) = self.conn_cache.get(item) {
285+
// 默认命名空间有两个值,需要把通知的命令空间值调整为监听的值,以兼容不同版本的客户端
286+
if item.namespace.is_default() && item.namespace.to_str() != tenant.as_str()
287+
{
288+
if other_default_payload.is_none() {
289+
request.tenant = Arc::new(item.namespace.to_str().to_string());
290+
other_default_payload =
291+
Some(Arc::new(PayloadUtils::build_payload(
292+
"ConfigChangeNotifyRequest",
293+
serde_json::to_string(&request)?,
294+
)));
295+
}
296+
if let Some(payload) = other_default_payload.as_ref() {
297+
item.conn.do_send(BiStreamSenderCmd::Send(payload.clone()));
298+
continue;
299+
}
300+
}
277301
item.conn.do_send(BiStreamSenderCmd::Send(payload.clone()));
278302
}
279303
}

src/grpc/handler/config_change_batch_listen.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
use std::sync::Arc;
44

5-
use crate::config::ConfigUtils;
5+
use crate::config::{ConfigUtils, DEFAULT_TENANT};
66
use crate::grpc::HandlerResult;
77
use crate::{
88
common::appdata::AppShareData,
@@ -40,7 +40,11 @@ impl PayloadHandler for ConfigChangeBatchListenRequestHandler {
4040
let body_vec = request_payload.body.unwrap_or_default().value;
4141
let request: ConfigBatchListenRequest = serde_json::from_slice(&body_vec)?;
4242
let mut listener_items = vec![];
43+
let mut use_public = false;
4344
for item in request.config_listen_contexts {
45+
if item.tenant == DEFAULT_TENANT {
46+
use_public = true;
47+
}
4448
let key = ConfigKey::new(
4549
&item.data_id,
4650
&item.group,
@@ -65,10 +69,15 @@ impl PayloadHandler for ConfigChangeBatchListenRequestHandler {
6569
ConfigResult::ChangeKey(keys) => {
6670
response.result_code = SUCCESS_CODE;
6771
for key in keys {
72+
let tenant = if key.tenant.is_empty() && use_public {
73+
Arc::new(DEFAULT_TENANT.to_owned())
74+
} else {
75+
key.tenant.clone()
76+
};
6877
let obj = ConfigContext {
6978
data_id: key.data_id,
7079
group: key.group,
71-
tenant: key.tenant,
80+
tenant,
7281
};
7382
response.changed_configs.push(obj);
7483
}

0 commit comments

Comments
 (0)