Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions agent/plugins/packet_sequence_block/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ impl PacketSequenceBlock {
unimplemented!();
}

pub fn with_capacity(_: usize, _: u32) -> Self {
unimplemented!();
}

pub fn is_available(&self, _: usize, _: u32) -> bool {
unimplemented!();
}
Expand Down
220 changes: 99 additions & 121 deletions agent/src/flow_generator/flow_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,43 @@ impl stats::Module for AllocatorStats {
}
}

struct BufferedSender<T: std::fmt::Debug> {
queue: DebugSender<T>,

limit: usize,
buffer: Vec<T>,
}

impl<T: std::fmt::Debug> BufferedSender<T> {
fn new(queue: DebugSender<T>, limit: usize) -> Self {
Self {
queue,
limit,
buffer: Vec::with_capacity(limit),
}
}

fn send(&mut self, item: T) {
if self.buffer.len() >= self.limit {
self.flush();
}
self.buffer.push(item);
}

fn flush(&mut self) {
if self.buffer.is_empty() {
return;
}
if let Err(e) = self.queue.send_all(&mut self.buffer) {
warn!(
"FlowMap push to {} queue failed: {:?}",
std::any::type_name::<T>(),
e
);
self.buffer.clear();
}
}
}
// not thread-safe
pub struct FlowMap {
// The original std HashMap uses SipHash-1-3 and is slow.
Expand Down Expand Up @@ -177,18 +214,17 @@ pub struct FlowMap {

tagged_flow_allocator: Allocator<TaggedFlow>,
l7_stats_allocator: Allocator<L7Stats>,
output_queue: Option<DebugSender<Arc<BatchedBox<TaggedFlow>>>>,
l7_stats_output_queue: DebugSender<BatchedBox<L7Stats>>,
out_log_queue: DebugSender<AppProto>,
output_buffer: Vec<Arc<BatchedBox<TaggedFlow>>>,
l7_stats_buffer: Vec<BatchedBox<L7Stats>>,
protolog_buffer: Vec<AppProto>,

tflow_output: Option<BufferedSender<Arc<BatchedBox<TaggedFlow>>>>,
l7_stats_output: BufferedSender<BatchedBox<L7Stats>>,
l7_log_output: BufferedSender<AppProto>,
pseq_output: Option<BufferedSender<Box<PacketSequenceBlock>>>, // Enterprise Edition Feature: packet-sequence

last_queue_flush: Duration,

perf_cache: Rc<RefCell<L7PerfCache>>,
flow_perf_counter: Arc<FlowPerfCounter>,
ntp_diff: Arc<AtomicI64>,
packet_sequence_queue: Option<DebugSender<Box<PacketSequenceBlock>>>, // Enterprise Edition Feature: packet-sequence
packet_sequence_enabled: bool,
stats_counter: Arc<FlowMapCounter>,
system_time: Duration,

Expand Down Expand Up @@ -294,17 +330,19 @@ impl FlowMap {
);
allocator
},
output_queue,
out_log_queue: app_proto_log_queue,
output_buffer: Vec::with_capacity(QUEUE_BATCH_SIZE),
l7_stats_buffer: Vec::with_capacity(QUEUE_BATCH_SIZE),
protolog_buffer: Vec::with_capacity(QUEUE_BATCH_SIZE),
tflow_output: output_queue.map(|q| BufferedSender::new(q, QUEUE_BATCH_SIZE)),
l7_stats_output: BufferedSender::new(l7_stats_output_queue, QUEUE_BATCH_SIZE),
l7_log_output: BufferedSender::new(app_proto_log_queue, QUEUE_BATCH_SIZE),
pseq_output: match packet_sequence_queue {
Some(q) if packet_sequence_enabled => {
Some(BufferedSender::new(q, QUEUE_BATCH_SIZE))
}
_ => None,
},
last_queue_flush: Duration::ZERO,
perf_cache: Rc::new(RefCell::new(L7PerfCache::new(config.capacity as usize))),
flow_perf_counter,
ntp_diff,
packet_sequence_queue, // Enterprise Edition Feature: packet-sequence
packet_sequence_enabled,
stats_counter,
system_time,
l7_protocol_checker: L7ProtocolChecker::new(
Expand All @@ -326,7 +364,6 @@ impl FlowMap {
so_plugin: Default::default(),
tcp_perf_pool: MemoryPool::new(config.memory_pool_size),
flow_node_pool: MemoryPool::new(config.memory_pool_size),
l7_stats_output_queue,
obfuscate_cache: if config.obfuscate_enabled_protocols != L7ProtocolBitmap::default() {
Some(Rc::new(RefCell::new(LruCache::new(
NonZeroUsize::new(OBFUSCATE_CACHE_SIZE).unwrap(),
Expand Down Expand Up @@ -618,13 +655,10 @@ impl FlowMap {
// 未超时Flow的统计信息发送到队列下游
self.node_updated_aftercare(&flow_config, node, timestamp, None);
// Enterprise Edition Feature: packet-sequence
if self.packet_sequence_enabled {
if let Some(q) = self.pseq_output.as_mut() {
if let Some(block) = node.packet_sequence_block.take() {
// flush the packet_sequence_block at the regular time
if let Err(e) = self.packet_sequence_queue.as_ref().unwrap().send(block)
{
warn!("packet sequence block to queue failed, because {:?}", e);
}
q.send(block);
}
}

Expand Down Expand Up @@ -654,8 +688,6 @@ impl FlowMap {
self.start_time_in_unit = next_start_time_in_unit;
self.flush_queue(&flow_config, timestamp);

self.flush_app_protolog();

true
}

Expand Down Expand Up @@ -812,32 +844,31 @@ impl FlowMap {
// rust 版本用了std的hashmap自动处理扩容,所以无需执行policy_gettelr
}

fn append_to_block(&self, config: &FlowConfig, node: &mut FlowNode, meta_packet: &MetaPacket) {
const MINUTE: u64 = 60;
fn append_to_block(
config: &FlowConfig,
output: &mut BufferedSender<Box<PacketSequenceBlock>>,
node: &mut FlowNode,
meta_packet: &MetaPacket,
) {
let packet_sequence_start_time = node.tagged_flow.flow.start_time_in_minute();
if node.packet_sequence_block.is_some() {
if !node.packet_sequence_block.as_ref().unwrap().is_available(
config.packet_sequence_block_size,
packet_sequence_start_time as u32,
) {
match node.packet_sequence_block.as_ref() {
Some(block)
if !block.is_available(
config.packet_sequence_block_size,
packet_sequence_start_time as u32,
) =>
{
// if the packet_sequence_block is no enough to push one more packet, then send it to the queue
if let Err(e) = self
.packet_sequence_queue
.as_ref()
.unwrap()
.send(node.packet_sequence_block.take().unwrap())
{
warn!("packet sequence block to queue failed, because {:?}", e);
}

node.packet_sequence_block = Some(Box::new(PacketSequenceBlock::new(
output.send(node.packet_sequence_block.take().unwrap());
}
_ => (),
}
if node.packet_sequence_block.is_none() {
node.packet_sequence_block
.replace(Box::new(PacketSequenceBlock::with_capacity(
config.packet_sequence_block_size,
packet_sequence_start_time as u32,
)));
}
} else {
node.packet_sequence_block = Some(Box::new(PacketSequenceBlock::new(
packet_sequence_start_time as u32,
)));
}

if let ProtocolData::TcpHeader(tcp_data) = &meta_packet.protocol_data {
Expand Down Expand Up @@ -914,8 +945,8 @@ impl FlowMap {
}

// Enterprise Edition Feature: packet-sequence
if self.packet_sequence_enabled {
self.append_to_block(flow_config, node, meta_packet);
if let Some(output) = self.pseq_output.as_mut() {
Self::append_to_block(flow_config, output, node, meta_packet);
}

flow_closed
Expand Down Expand Up @@ -1320,7 +1351,7 @@ impl FlowMap {
// tag
(self.policy_getter).lookup(meta_packet, self.id as usize, local_epc_id);
self.init_endpoint_and_policy_data(&mut node, meta_packet);
node.tagged_flow.flow.need_to_store = (self.packet_sequence_enabled
node.tagged_flow.flow.need_to_store = (self.pseq_output.is_some()
&& meta_packet.lookup_key.proto == IpProtocol::TCP)
|| node.contain_pcap_policy();
// Currently, only virtual traffic's tap_side is counted
Expand Down Expand Up @@ -1514,7 +1545,7 @@ impl FlowMap {
node.tagged_flow
.flow
.set_tap_side(config.flow.agent_type, config.flow.cloud_gateway_traffic);
node.tagged_flow.flow.need_to_store = (self.packet_sequence_enabled
node.tagged_flow.flow.need_to_store = (self.pseq_output.is_some()
&& meta_packet.lookup_key.proto == IpProtocol::TCP)
|| node.contain_pcap_policy();
} else {
Expand Down Expand Up @@ -1826,8 +1857,8 @@ impl FlowMap {
}

// Enterprise Edition Feature: packet-sequence
if self.packet_sequence_enabled {
self.append_to_block(flow_config, &mut node, meta_packet);
if let Some(output) = self.pseq_output.as_mut() {
Self::append_to_block(flow_config, output, &mut node, meta_packet);
}
node
}
Expand Down Expand Up @@ -1913,60 +1944,26 @@ impl FlowMap {

fn flush_queue(&mut self, config: &FlowConfig, now: Duration) {
if now > config.flush_interval + self.last_queue_flush {
if self.l7_stats_buffer.len() > 0 {
if let Err(e) = self
.l7_stats_output_queue
.send_all(&mut self.l7_stats_buffer)
{
warn!("flow-map push l7 stats to queue failed, because {:?}", e);
self.l7_stats_buffer.clear();
}
self.l7_stats_output.flush();
if let Some(o) = self.tflow_output.as_mut() {
o.flush();
}
if self.output_queue.is_some() && self.output_buffer.len() > 0 {
if let Err(e) = self
.output_queue
.as_ref()
.unwrap()
.send_all(&mut self.output_buffer)
{
warn!(
"flow-map push tagged flows to queue failed, because {:?}",
e
);
self.output_buffer.clear();
}
self.l7_log_output.flush();
if let Some(o) = self.pseq_output.as_mut() {
o.flush();
}
self.last_queue_flush = now;
}
}

fn push_to_flow_stats_queue(&mut self, tagged_flow: Arc<BatchedBox<TaggedFlow>>) {
if self.l7_stats_buffer.len() >= QUEUE_BATCH_SIZE {
if let Err(e) = self
.l7_stats_output_queue
.send_all(&mut self.l7_stats_buffer)
{
warn!("flow-map push l7 stats to queue failed, because {:?}", e);
self.l7_stats_buffer.clear();
}
}

self.stats_counter.flow_delay.fetch_max(
self.system_time.as_nanos() as i64 - tagged_flow.flow.flow_stat_time.as_nanos() as i64,
Ordering::Relaxed,
);

if let Some(output_queue) = &mut self.output_queue {
self.output_buffer.push(tagged_flow);
if self.output_buffer.len() >= QUEUE_BATCH_SIZE {
if let Err(e) = output_queue.send_all(&mut self.output_buffer) {
warn!(
"flow-map push tagged flows to queue failed, because {:?}",
e
);
self.output_buffer.clear();
}
}
if let Some(output) = self.tflow_output.as_mut() {
output.send(tagged_flow);
}
}

Expand Down Expand Up @@ -1994,8 +1991,8 @@ impl FlowMap {
} else {
l7_stats.flow = None;
}
self.l7_stats_buffer
.push(self.l7_stats_allocator.allocate_one_with(l7_stats));
self.l7_stats_output
.send(self.l7_stats_allocator.allocate_one_with(l7_stats));
}
}
}
Expand Down Expand Up @@ -2042,10 +2039,10 @@ impl FlowMap {
);

// Enterprise Edition Feature: packet-sequence
if self.packet_sequence_enabled && flow.flow_key.proto == IpProtocol::TCP {
if let Some(block) = node.packet_sequence_block.take() {
if let Err(e) = self.packet_sequence_queue.as_ref().unwrap().send(block) {
warn!("packet sequence block to queue failed, because {:?}", e);
if flow.flow_key.proto == IpProtocol::TCP {
if let Some(output) = self.pseq_output.as_mut() {
if let Some(block) = node.packet_sequence_block.take() {
output.send(block);
}
}
}
Expand Down Expand Up @@ -2143,11 +2140,7 @@ impl FlowMap {
None,
node.last_cap_seq,
);
self.protolog_buffer
.push(AppProto::SocketClosed(session_key));
if self.protolog_buffer.len() >= QUEUE_BATCH_SIZE {
self.flush_app_protolog();
}
self.l7_log_output.send(AppProto::SocketClosed(session_key));
}
}
_ => {}
Expand All @@ -2174,23 +2167,8 @@ impl FlowMap {
if let Some(app_proto) =
MetaAppProto::new(&node.tagged_flow, meta_packet, l7_info, head)
{
self.protolog_buffer
.push(AppProto::MetaAppProto(Box::new(app_proto)));
if self.protolog_buffer.len() >= QUEUE_BATCH_SIZE {
self.flush_app_protolog();
}
}
}
}

fn flush_app_protolog(&mut self) {
if self.protolog_buffer.len() > 0 {
if let Err(e) = self.out_log_queue.send_all(&mut self.protolog_buffer) {
warn!(
"flow-map push MetaAppProto to queue failed, because {:?}",
e
);
self.protolog_buffer.clear();
self.l7_log_output
.send(AppProto::MetaAppProto(Box::new(app_proto)));
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions agent/src/rpc/remote_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ impl Interior {
let mut stream = match client.remote_execute(responser).await {
Ok(stream) => stream,
Err(e) => {
warn!("remote_execute failed: {:?}", e);
warn!("calling server remote_execute rpc failed: {:?}", e);
self.exc.set(pb::Exception::ControllerSocketError);
tokio::time::sleep(RPC_RETRY_INTERVAL).await;
continue;
Expand All @@ -273,7 +273,7 @@ impl Interior {
break;
}
Err(e) => {
warn!("remote_execute failed: {:?}", e);
warn!("receiving server remote_execute rpc has error: {:?}", e);
self.exc.set(pb::Exception::ControllerSocketError);
break;
}
Expand Down