@@ -30,12 +30,15 @@ use std::time::{Duration, Instant, SystemTime};
3030use arc_swap:: access:: Access ;
3131use lazy_static:: lazy_static;
3232use log:: { debug, error, info, warn} ;
33- use public:: sender:: { SendMessageType , Sendable } ;
33+ use public:: {
34+ leaky_bucket:: LeakyBucket ,
35+ sender:: { SendMessageType , Sendable } ,
36+ } ;
3437use rand:: { thread_rng, RngCore } ;
3538
3639use super :: { get_sender_id, QUEUE_BATCH_SIZE } ;
3740
38- use crate :: config:: handler:: SenderAccess ;
41+ use crate :: config:: { handler:: SenderAccess , TrafficOverflowAction } ;
3942use crate :: exception:: ExceptionHandler ;
4043use crate :: trident:: SenderEncoder ;
4144use crate :: utils:: stats:: {
@@ -53,6 +56,7 @@ pub struct SenderCounter {
5356 pub tx : AtomicU64 ,
5457 pub tx_bytes : AtomicU64 ,
5558 pub dropped : AtomicU64 ,
59+ pub waited : AtomicU64 ,
5660}
5761
5862impl RefCountable for SenderCounter {
@@ -89,6 +93,11 @@ impl RefCountable for SenderCounter {
8993 CounterType :: Counted ,
9094 CounterValue :: Unsigned ( self . dropped. swap( 0 , Ordering :: Relaxed ) ) ,
9195 ) ,
96+ (
97+ "waited" ,
98+ CounterType :: Counted ,
99+ CounterValue :: Unsigned ( self . waited. swap( 0 , Ordering :: Relaxed ) ) ,
100+ ) ,
92101 ]
93102 }
94103}
@@ -153,10 +162,10 @@ impl<T: Sendable> Encoder<T> {
153162 version : HEADER_VESION ,
154163 team_id : 0 ,
155164 organization_id : 0 ,
156- agent_id : agent_id ,
165+ agent_id,
157166 reserved_1 : 0 ,
158167 reserved_2 : 0 ,
159- encoder : encoder ,
168+ encoder,
160169 } ,
161170 _marker : PhantomData ,
162171 }
@@ -340,6 +349,7 @@ lazy_static! {
340349 static ref TOTAL_SENT_BYTES : Arc <AtomicU64 > = Arc :: new( AtomicU64 :: new( 0 ) ) ;
341350 static ref SENT_START_DURATION : Arc <AtomicU64 > = Arc :: new( AtomicU64 :: new( 0 ) ) ;
342351 static ref LAST_LOGGING_DURATION : Arc <AtomicU64 > = Arc :: new( AtomicU64 :: new( 0 ) ) ;
352+ static ref LEAKY_BUCKET : LeakyBucket = LeakyBucket :: new( Some ( 0 ) ) ;
343353}
344354
345355#[ derive( Debug , Clone , Copy , PartialEq , Eq ) ]
@@ -380,6 +390,7 @@ pub struct UniformSender<T> {
380390
381391 input : Arc < Receiver < T > > ,
382392 counter : Arc < SenderCounter > ,
393+ overwritten_count : u64 ,
383394
384395 encoder : Encoder < T > ,
385396 private_conn : Mutex < Connection > ,
@@ -389,6 +400,8 @@ pub struct UniformSender<T> {
389400 multiple_sockets_to_ingester : bool ,
390401 dest_ip : String ,
391402 dest_port : u16 ,
403+ max_throughput_mbps : u64 ,
404+ ingester_traffic_overflow_action : TrafficOverflowAction ,
392405
393406 config : SenderAccess ,
394407
@@ -426,6 +439,7 @@ impl<T: Sendable> UniformSender<T> {
426439 name,
427440 input,
428441 counter : Arc :: new ( SenderCounter :: default ( ) ) ,
442+ overwritten_count : 0 ,
429443 encoder : Encoder :: new (
430444 0 ,
431445 SendMessageType :: TaggedFlow ,
@@ -440,6 +454,8 @@ impl<T: Sendable> UniformSender<T> {
440454 multiple_sockets_to_ingester : false ,
441455 dest_ip : "127.0.0.1" . to_string ( ) ,
442456 dest_port : cfg. dest_port ,
457+ max_throughput_mbps : 0 ,
458+ ingester_traffic_overflow_action : TrafficOverflowAction :: Waiting ,
443459
444460 running,
445461 stats,
@@ -522,6 +538,11 @@ impl<T: Sendable> UniformSender<T> {
522538 }
523539
524540 fn send_buffer ( & mut self ) {
541+ if self . is_traffic_overflow ( )
542+ && self . ingester_traffic_overflow_action == TrafficOverflowAction :: Dropping
543+ {
544+ return ;
545+ }
525546 let mut conn = match self . connection_type {
526547 ConnectionType :: Global => self . global_shared_conn . lock ( ) . unwrap ( ) ,
527548 ConnectionType :: PrivateShared => {
@@ -626,37 +647,51 @@ impl<T: Sendable> UniformSender<T> {
626647 }
627648 }
628649
629- fn is_exceed_max_throughput ( & mut self , max_throughput_mbps : u64 ) -> bool {
630- if max_throughput_mbps == 0 {
631- return false ;
650+ fn log_when_traffic_overflow ( & mut self ) {
651+ let now = SystemTime :: now ( )
652+ . duration_since ( SystemTime :: UNIX_EPOCH )
653+ . unwrap ( ) ;
654+ // to prevent frequent log printing, print at least once every 10 seconds
655+ if now - Duration :: from_nanos ( LAST_LOGGING_DURATION . load ( Ordering :: Relaxed ) )
656+ > Duration :: from_secs ( 10 )
657+ {
658+ warn ! (
659+ "{} sender dropping message, throughput exceed setting value 'max_throughput_to_ingester' {}Mbps, action {:?}, total overwrittern count {}" ,
660+ self . name, self . max_throughput_mbps, self . ingester_traffic_overflow_action, self . overwritten_count
661+ ) ;
662+ LAST_LOGGING_DURATION . store ( now. as_nanos ( ) as u64 , Ordering :: Relaxed ) ;
632663 }
633- let max_throughput_bytes = max_throughput_mbps << 20 >> 3 ;
634- if TOTAL_SENT_BYTES . load ( Ordering :: Relaxed ) > max_throughput_bytes {
635- let now = SystemTime :: now ( )
636- . duration_since ( SystemTime :: UNIX_EPOCH )
637- . unwrap ( ) ;
664+ }
638665
639- let used = now - Duration :: from_nanos ( SENT_START_DURATION . load ( Ordering :: Relaxed ) ) ;
640- if used > Duration :: from_secs ( 1 ) {
641- SENT_START_DURATION . store ( now. as_nanos ( ) as u64 , Ordering :: Relaxed ) ;
642- TOTAL_SENT_BYTES . store ( 0 , Ordering :: Relaxed ) ;
643- } else {
644- // to prevent frequent log printing, print at least once every 5 seconds
645- if now - Duration :: from_nanos ( LAST_LOGGING_DURATION . load ( Ordering :: Relaxed ) )
646- > Duration :: from_secs ( 5 )
647- {
648- warn ! (
649- "{} sender dropping message, throughput execeed setting value 'max_throughput_to_ingester' {}Mbps" ,
650- self . name, max_throughput_mbps
651- ) ;
652- LAST_LOGGING_DURATION . store ( now. as_nanos ( ) as u64 , Ordering :: Relaxed ) ;
666+ fn is_traffic_overflow ( & mut self ) -> bool {
667+ if self . max_throughput_mbps == 0 {
668+ return false ;
669+ }
670+ let mut overflow = false ;
671+ if self . ingester_traffic_overflow_action == TrafficOverflowAction :: Waiting {
672+ while !LEAKY_BUCKET . acquire ( self . encoder . buffer_len ( ) as u64 ) {
673+ // LEAKY_BUCKET token is updated every 100ms by default,
674+ // wait 10ms each time until the token is acquired
675+ thread:: sleep ( Duration :: from_millis ( 10 ) ) ;
676+ if self . input . total_overwritten_count ( ) > self . overwritten_count {
677+ overflow = true ;
678+ self . overwritten_count = self . input . total_overwritten_count ( ) ;
653679 }
654- self . exception_handler
655- . set ( Exception :: DataBpsThresholdExceeded ) ;
656- return true ;
680+ self . counter . waited . fetch_add ( 1 , Ordering :: Relaxed ) ;
657681 }
682+ } else {
683+ if !LEAKY_BUCKET . acquire ( self . encoder . buffer_len ( ) as u64 ) {
684+ overflow = true ;
685+ self . counter . dropped . fetch_add ( 1 , Ordering :: Relaxed ) ;
686+ }
687+ }
688+
689+ if overflow {
690+ self . exception_handler
691+ . set ( Exception :: DataBpsThresholdExceeded ) ;
692+ self . log_when_traffic_overflow ( ) ;
658693 }
659- return false ;
694+ overflow
660695 }
661696
662697 fn check_or_register_counterable ( & mut self , message_type : SendMessageType ) {
@@ -677,7 +712,12 @@ impl<T: Sendable> UniformSender<T> {
677712 while self . running . load ( Ordering :: Relaxed ) {
678713 let config = self . config . load ( ) ;
679714 let socket_type = config. collector_socket_type ;
680- let max_throughput_mpbs = config. max_throughput_to_ingester ;
715+ let max_throughput_mbps = config. max_throughput_to_ingester ;
716+ if self . max_throughput_mbps != max_throughput_mbps {
717+ LEAKY_BUCKET . set_rate ( Some ( max_throughput_mbps << 17 ) ) ; // Mbit -> byte
718+ self . max_throughput_mbps = max_throughput_mbps;
719+ }
720+ self . ingester_traffic_overflow_action = config. ingester_traffic_overflow_action ;
681721 match self . input . recv_all (
682722 & mut batch,
683723 Some ( Duration :: from_secs ( Self :: QUEUE_READ_TIMEOUT ) ) ,
@@ -688,13 +728,6 @@ impl<T: Sendable> UniformSender<T> {
688728 start_cached = Instant :: now ( ) ;
689729 self . cached = false ;
690730 }
691- if self . is_exceed_max_throughput ( max_throughput_mpbs) {
692- self . counter
693- . dropped
694- . fetch_add ( batch. len ( ) as u64 , Ordering :: Relaxed ) ;
695- batch. clear ( ) ;
696- continue ;
697- }
698731 for send_item in batch. drain ( ..) {
699732 if !self . running . load ( Ordering :: Relaxed ) {
700733 break ;
0 commit comments