1- # Copyright 2020 The StackStorm Authors.
1+ # Copyright 2020-2026 The StackStorm Authors.
22# Copyright 2019 Extreme Networks, Inc.
33#
44# Licensed under the Apache License, Version 2.0 (the "License");
1414# limitations under the License.
1515
1616import os
17- import eventlet
18-
19- from logshipper .tail import Tail
17+ import time
18+ import threading
2019
2120from st2reactor .sensor .base import Sensor
2221
@@ -27,25 +26,19 @@ def __init__(self, sensor_service, config=None):
2726 sensor_service = sensor_service , config = config
2827 )
2928 self .log = self ._sensor_service .get_logger (__name__ )
30- self .tail = None
29+ self ._watchers = {} # file_path -> (thread, stop_event)
3130 self .file_ref = {}
3231
3332 def setup (self ):
34- self .tail = Tail (filenames = [])
35- self .tail .handler = self ._handle_line
36- self .tail .should_run = True
33+ pass
3734
3835 def run (self ):
39- self .tail .run ()
36+ while True :
37+ time .sleep (1 )
4038
4139 def cleanup (self ):
42- if self .tail :
43- self .tail .should_run = False
44-
45- try :
46- self .tail .notifier .stop ()
47- except Exception :
48- self .log .exception ("Unable to stop the tail notifier" )
40+ for file_path in list (self ._watchers ):
41+ self ._stop_watcher (file_path )
4942
5043 def add_trigger (self , trigger ):
5144 file_path = trigger ["parameters" ].get ("file_path" , None )
@@ -54,18 +47,19 @@ def add_trigger(self, trigger):
5447 self .log .error ('Received trigger type without "file_path" field.' )
5548 return
5649
57- trigger = trigger .get ("ref" , None )
50+ ref = trigger .get ("ref" , None )
5851
59- if not trigger :
52+ if not ref :
6053 raise Exception (f"Trigger { trigger } did not contain a ref." )
6154
62- # Wait a bit to avoid initialization race in logshipper library
63- eventlet .sleep (1.0 )
55+ self .file_ref [file_path ] = ref
6456
65- self .tail .add_file (filename = file_path )
66- self .file_ref [file_path ] = trigger
57+ stop_event = threading .Event ()
58+ t = threading .Thread (target = self ._tail , args = (file_path , stop_event ), daemon = True )
59+ self ._watchers [file_path ] = (t , stop_event )
60+ t .start ()
6761
68- self .log .info (f"Added file '{ file_path } ' ({ trigger } ) to watch list." )
62+ self .log .info (f"Added file '{ file_path } ' ({ ref } ) to watch list." )
6963
7064 def update_trigger (self , trigger ):
7165 pass
@@ -77,10 +71,25 @@ def remove_trigger(self, trigger):
7771 self .log .error ("Received trigger type without 'file_path' field." )
7872 return
7973
80- self .tail .remove_file (filename = file_path )
81- self .file_ref .pop (file_path )
82-
83- self .log .info (f"Removed file '{ file_path } ' ({ trigger } ) from watch list." )
74+ self ._stop_watcher (file_path )
75+ self .file_ref .pop (file_path , None )
76+
77+ self .log .info (f"Removed file '{ file_path } ' from watch list." )
78+
79+ def _stop_watcher (self , file_path ):
80+ if file_path in self ._watchers :
81+ _ , stop_event = self ._watchers .pop (file_path )
82+ stop_event .set ()
83+
84+ def _tail (self , file_path , stop_event ):
85+ with open (file_path , "r" ) as f :
86+ f .seek (0 , 2 ) # seek to EOF
87+ while not stop_event .is_set ():
88+ line = f .readline ()
89+ if line :
90+ self ._handle_line (file_path , line .strip ())
91+ else :
92+ time .sleep (0.1 )
8493
8594 def _handle_line (self , file_path , line ):
8695 if file_path not in self .file_ref :
0 commit comments