Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions agent/crates/enterprise-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,24 @@ pub mod l7 {
}
}
}

pub mod dameng {
use public::l7_protocol::{L7LogBase, LogMessageType};

pub struct DamengParseConfig;

pub struct DamengParser;

impl DamengParser {
pub fn check_payload(_: &[u8], _: &DamengParseConfig) -> Option<LogMessageType> {
unimplemented!()
}

pub fn parse_payload(_: &[u8], _: &DamengParseConfig) -> Vec<L7LogBase> {
unimplemented!()
}
}
}
}

pub mod mq {
Expand Down
2 changes: 2 additions & 0 deletions agent/crates/public/src/l7_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub enum L7Protocol {
MySQL = 60,
PostgreSQL = 61,
Oracle = 62,
Dameng = 63,

// NoSQL
Redis = 80,
Expand Down Expand Up @@ -145,6 +146,7 @@ impl From<String> for L7Protocol {
"webspheremq" => Self::WebSphereMq,
"dns" => Self::DNS,
"oracle" => Self::Oracle,
"dameng" => Self::Dameng,
"iso8583" | "iso-8583" => Self::Iso8583,
"triple" => Self::Triple,
"tls" => Self::TLS,
Expand Down
1 change: 1 addition & 0 deletions agent/src/common/l7_protocol_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ cfg_if::cfg_if! {
PostgreInfo(PostgreInfo),
OpenWireInfo(OpenWireInfo),
OracleInfo(crate::flow_generator::protocol_logs::OracleInfo),
DamengInfo(crate::flow_generator::protocol_logs::DamengInfo),
SofaRpcInfo(SofaRpcInfo),
TlsInfo(crate::flow_generator::protocol_logs::TlsInfo),
SomeIpInfo(crate::flow_generator::protocol_logs::SomeIpInfo),
Expand Down
1 change: 1 addition & 0 deletions agent/src/common/l7_protocol_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ cfg_if::cfg_if! {
Brpc(BrpcLog),
Tars(TarsLog),
Oracle(crate::flow_generator::protocol_logs::OracleLog),
Dameng(crate::flow_generator::protocol_logs::DamengLog),
Iso8583(crate::flow_generator::protocol_logs::Iso8583Log),
NetSign(crate::flow_generator::protocol_logs::NetSignLog),
MQTT(MqttLog),
Expand Down
20 changes: 20 additions & 0 deletions agent/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2064,6 +2064,7 @@ impl Default for Filters {
("MySQL".to_string(), "1-65535".to_string()),
("PostgreSQL".to_string(), "1-65535".to_string()),
("Oracle".to_string(), "1521".to_string()),
("Dameng".to_string(), "5236".to_string()),
("Redis".to_string(), "1-65535".to_string()),
("MongoDB".to_string(), "1-65535".to_string()),
("Memcached".to_string(), "11211".to_string()),
Expand Down Expand Up @@ -2097,6 +2098,7 @@ impl Default for Filters {
("MySQL".to_string(), vec![]),
("PostgreSQL".to_string(), vec![]),
("Oracle".to_string(), vec![]),
("Dameng".to_string(), vec![]),
("Redis".to_string(), vec![]),
("MongoDB".to_string(), vec![]),
("Memcached".to_string(), vec![]),
Expand Down Expand Up @@ -3145,6 +3147,7 @@ impl UserConfig {
const DEFAULT_DNS_PORTS: &'static str = "53,5353";
const DEFAULT_TLS_PORTS: &'static str = "443,6443";
const DEFAULT_ORACLE_PORTS: &'static str = "1521";
const DEFAULT_DAMENG_PORTS: &'static str = "5236";
const DEFAULT_MEMCACHED_PORTS: &'static str = "11211";
const PACKET_FANOUT_MODE_MAX: u32 = 7;

Expand Down Expand Up @@ -3225,6 +3228,23 @@ impl UserConfig {
Self::DEFAULT_ORACLE_PORTS.to_string(),
);
}
let dameng_str = L7ProtocolParser::Dameng(
crate::flow_generator::protocol_logs::DamengLog::default(),
)
.as_str();
// dameng default only parse 5236 port. when l7_protocol_ports config without DAMENG, need to reserve the dameng default config.
if !self
.processors
.request_log
.filters
.port_number_prefilters
.contains_key(dameng_str)
{
new.insert(
dameng_str.to_string(),
Self::DEFAULT_DAMENG_PORTS.to_string(),
);
}
}
let memcached_str = L7ProtocolParser::Memcached(MemcachedLog::default()).as_str();
// memcached default only parse 11211 port. when l7_protocol_ports config without MEMCACHED, need to reserve the memcached default config.
Expand Down
1 change: 1 addition & 0 deletions agent/src/ebpf/kernel/include/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ enum traffic_protocol {
PROTO_MYSQL = 60,
PROTO_POSTGRESQL = 61,
PROTO_ORACLE = 62,
PROTO_DAMENG = 63,
PROTO_REDIS = 80,
PROTO_MONGO = 81,
PROTO_MEMCACHED = 82,
Expand Down
103 changes: 103 additions & 0 deletions agent/src/ebpf/kernel/include/protocol_inference.h
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,92 @@ static __inline enum message_type infer_oracle_tns_message(const char *buf,
}
}

#define DAMENG_HEADER_LEN 64
#define DAMENG_HEADER_MSG_TYPE_OFFSET 4
#define DAMENG_HEADER_BODY_LEN_OFFSET 6
#define DAMENG_HEADER_RET_CODE_OFFSET 10
#define DAMENG_INFER_HEADER_LEN 14

static __inline __u16 read_u16_le(const char *p)
{
return (__u16)(__u8)p[0] | ((__u16)(__u8)p[1] << 8);
}

static __inline __u32 read_u32_le(const char *p)
{
return (__u32)(__u8)p[0] | ((__u32)(__u8)p[1] << 8)
| ((__u32)(__u8)p[2] << 16) | ((__u32)(__u8)p[3] << 24);
}

static __inline __s32 read_s32_le(const char *p)
{
return (__s32)read_u32_le(p);
}

static __inline bool dameng_is_valid_response_ret_code(__s32 ret_code)
{
/*
* Source: DM Programmer Appendix "DM服务器错误码汇编"
* warning: (520, 0) => 1..=519
* errors: (-1, -14999)
*/
return ret_code == 0 || (ret_code >= 1 && ret_code <= 519)
|| (ret_code >= -14999 && ret_code <= -1);
}

static __inline enum message_type infer_dameng_msg_type(__u16 msg_type,
__s32 ret_code)
{
switch (msg_type) {
case 0x00:
case 0xbb:
if (dameng_is_valid_response_ret_code(ret_code))
return MSG_RESPONSE;
return MSG_UNKNOWN;
case 0x03:
case 0x04:
case 0x05:
case 0x07:
case 0x08:
case 0x0d:
case 0x1d:
case 0x20:
case 0x2c:
case 0xc8:
if (ret_code == 0)
return MSG_REQUEST;
return MSG_UNKNOWN;
default:
return MSG_UNKNOWN;
}
}

static __inline enum message_type infer_dameng_message(const char *buf,
size_t count,
struct conn_info_s *conn_info)
{
(void)buf;
(void)count;

if (!protocol_port_check_2(PROTO_DAMENG, conn_info))
return MSG_UNKNOWN;
if (conn_info->tuple.l4_protocol != IPPROTO_TCP)
return MSG_UNKNOWN;

if (is_infer_socket_valid(conn_info->socket_info_ptr)) {
if (conn_info->socket_info_ptr->l7_proto != PROTO_DAMENG)
return MSG_UNKNOWN;
}

if (count < DAMENG_INFER_HEADER_LEN)
return MSG_UNKNOWN;

__u16 msg_type = read_u16_le(buf + DAMENG_HEADER_MSG_TYPE_OFFSET);
__s32 ret_code = read_s32_le(buf + DAMENG_HEADER_RET_CODE_OFFSET);

return infer_dameng_msg_type(msg_type, ret_code);
}

// https://en.wikipedia.org/wiki/ISO_8583
static __inline enum message_type infer_iso8583_message(const char *buf,
size_t count,
Expand Down Expand Up @@ -4249,6 +4335,15 @@ infer_protocol_2(const char *infer_buf, size_t count,
count,
conn_info)) != MSG_UNKNOWN) {
inferred_message.protocol = PROTO_ORACLE;
#if defined(LINUX_VER_KFUNC) || defined(LINUX_VER_5_2_PLUS)
} else if (skip_proto != PROTO_DAMENG && (inferred_message.type =
#else
} else if ((inferred_message.type =
#endif
infer_dameng_message(infer_buf,
count,
conn_info)) != MSG_UNKNOWN) {
inferred_message.protocol = PROTO_DAMENG;
#if defined(LINUX_VER_KFUNC) || defined(LINUX_VER_5_2_PLUS)
} else if (skip_proto != PROTO_ISO8583 && (inferred_message.type =
#else
Expand Down Expand Up @@ -4605,6 +4700,14 @@ infer_protocol_1(struct ctx_info_s *ctx,
return inferred_message;
}
break;
case PROTO_DAMENG:
if ((inferred_message.type =
infer_dameng_message(infer_buf, count,
conn_info)) != MSG_UNKNOWN) {
inferred_message.protocol = PROTO_DAMENG;
return inferred_message;
}
break;
case PROTO_ISO8583:
if ((inferred_message.type =
infer_iso8583_message(infer_buf, count,
Expand Down
1 change: 1 addition & 0 deletions agent/src/ebpf/user/ctrl_tracer.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ static void datadump_help(void)
fprintf(stderr, " 60: PROTO_MYSQL\n");
fprintf(stderr, " 61: PROTO_POSTGRESQL\n");
fprintf(stderr, " 62: PROTO_ORACLE\n");
fprintf(stderr, " 63: PROTO_DAMENG\n");
fprintf(stderr, " 80: PROTO_REDIS\n");
fprintf(stderr, " 81: PROTO_MONGO\n");
fprintf(stderr, " 82: PROTO_MEMCACHED\n");
Expand Down
2 changes: 2 additions & 0 deletions agent/src/ebpf/user/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ static inline char *get_proto_name(uint16_t proto_id)
return "PgSQL";
case PROTO_ORACLE:
return "Oracle";
case PROTO_DAMENG:
return "Dameng";
case PROTO_FASTCGI:
return "FastCGI";
case PROTO_BRPC:
Expand Down
2 changes: 1 addition & 1 deletion agent/src/flow_generator/protocol_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ cfg_if::cfg_if! {

pub use mq::{WebSphereMqInfo, WebSphereMqLog};
pub use rpc::{Iso8583Info, Iso8583Log, NetSignInfo, NetSignLog, SomeIpInfo, SomeIpLog};
pub use sql::{OracleInfo, OracleLog};
pub use sql::{DamengInfo, DamengLog, OracleInfo, OracleLog};
pub use tls::{TlsInfo, TlsLog};
}
}
Expand Down
Loading
Loading