-
Notifications
You must be signed in to change notification settings - Fork 72
Expand file tree
/
Copy pathadapters.py
More file actions
486 lines (381 loc) · 18.4 KB
/
adapters.py
File metadata and controls
486 lines (381 loc) · 18.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
import logging
import select
import os
import struct
import collections
import time
from errno import EINTR, ENOENT
import inotify.constants
import inotify.calls
# Constants.

# Default number of seconds a single epoll wait is allowed to block.
_DEFAULT_EPOLL_BLOCK_DURATION_S = 1

# struct layout of the fixed-size event header; the four fields are unpacked
# into _INOTIFY_EVENT (wd, mask, cookie, len) below.
_HEADER_STRUCT_FORMAT = 'iIII'

# Event-type names that make Inotify.event_gen raise TerminalEventException.
_DEFAULT_TERMINAL_EVENTS = ('IN_Q_OVERFLOW', 'IN_UNMOUNT')

# Globals.

_LOGGER = logging.getLogger(__name__)

# Parsed form of an event header.
_INOTIFY_EVENT = collections.namedtuple(
    '_INOTIFY_EVENT',
    ['wd', 'mask', 'cookie', 'len'],
)

# Size in bytes of the packed header described by _HEADER_STRUCT_FORMAT.
_STRUCT_HEADER_LENGTH = struct.calcsize(_HEADER_STRUCT_FORMAT)

# Read from the DEBUG environment variable, which must hold an integer
# literal (int() raises ValueError otherwise).
_IS_DEBUG = bool(int(os.environ.get('DEBUG', '0')))
class EventTimeoutException(Exception):
    """Exception type for event time-outs.

    Nothing in this module raises it; it is only defined here.
    """
class TerminalEventException(Exception):
    """Raised by `Inotify.event_gen` when a terminal event type is seen.

    The exception message is the event-type name; the event tuple that
    triggered it is available as the `event` attribute.
    """

    def __init__(self, type_name, event):
        # Only the type name goes into the exception args/message.
        super().__init__(type_name)
        self.event = event
class Inotify(object):
    """Wrap a single inotify descriptor and poll it through epoll.

    Two dictionaries track the watches: path -> watch descriptor and the
    reverse. Raw bytes read from the descriptor are buffered and parsed into
    `(header, type_names, path, filename)` tuples by `event_gen`.
    """

    def __init__(self, paths=None,
                 block_duration_s=_DEFAULT_EPOLL_BLOCK_DURATION_S):
        """Open the inotify descriptor and start watching `paths`.

        paths: optional iterable of (unicode) paths to watch immediately.
        block_duration_s: seconds each epoll wait may block; either a number
            or a zero-argument callable returning one.
        """
        # Assigned first so that __del__ is safe even if one of the calls
        # below raises before the handles exist.
        self.__inotify_fd = None
        self.__epoll = None

        self.__block_duration = block_duration_s
        self.__watches = {}      # path -> wd
        self.__watches_r = {}    # wd -> path
        self.__buffer = b''
        self.__last_success_return = None

        self.__inotify_fd = inotify.calls.inotify_init()
        _LOGGER.debug("Inotify handle is (%d).", self.__inotify_fd)

        self.__epoll = select.epoll()
        self.__epoll.register(self.__inotify_fd, select.POLLIN)

        if paths is not None:
            for path in paths:
                self.add_watch(path)

    def __get_block_duration(self):
        """Allow the block-duration to be an integer or a function-call."""
        # Test for callability explicitly rather than catching TypeError, so
        # that a TypeError raised *inside* a user-supplied callable is not
        # mistaken for "this is a scalar".
        if callable(self.__block_duration):
            return self.__block_duration()

        # A scalar value describing seconds.
        return self.__block_duration

    def __del__(self):
        """Release the epoll object and the inotify descriptor."""
        _LOGGER.debug("Cleaning-up inotify.")

        if self.__epoll is not None:
            self.__epoll.close()
            self.__epoll = None

        if self.__inotify_fd is not None:
            os.close(self.__inotify_fd)
            self.__inotify_fd = None

    def add_watch(self, path_unicode, mask=inotify.constants.IN_ALL_EVENTS):
        """Start watching `path_unicode` for the events in `mask`.

        Returns the watch descriptor, or None if the path is already being
        watched (a warning is logged in that case).
        """
        _LOGGER.debug("Adding watch: [%s]", path_unicode)

        # Because there might be race-conditions in the recursive handling (see
        # the notes in the documentation), we recommend to add watches using
        # data from a secondary channel, if possible, which means that we might
        # then be adding it, yet again, if we then receive it in the normal
        # fashion afterward.
        if path_unicode in self.__watches:
            _LOGGER.warning("Path already being watched: [%s]", path_unicode)
            return

        path_bytes = path_unicode.encode('utf8')

        wd = inotify.calls.inotify_add_watch(
            self.__inotify_fd, path_bytes, mask)

        try:
            old_path = self.__watches_r.pop(wd)
        except KeyError:
            _LOGGER.debug("Added watch (%d): [%s]", wd, path_unicode)
        else:
            # Already watched under a different path: drop the stale forward
            # entry so both maps describe the new path only.
            self.__watches.pop(old_path)
            _LOGGER.debug("Watch (%d) moved from %s to %s",
                          wd, old_path, path_unicode)

        self.__watches[path_unicode] = wd
        self.__watches_r[wd] = path_unicode

        return wd

    def watches(self):
        """Return a view of the currently-watched paths.

        This is the live key view of the internal dictionary; copy it before
        adding or removing watches while iterating.
        """
        return self.__watches.keys()

    def remove_watch(self, path, superficial=False):
        """Remove our tracking information and call inotify to stop watching
        the given path. When a directory is removed, we'll just have to remove
        our tracking since inotify already cleans-up the watch.

        Does nothing if `path` is not being watched. With `superficial=True`
        only the internal bookkeeping is dropped.
        """
        wd = self.__watches.get(path)
        if wd is None:
            return

        _LOGGER.debug("Removing watch for watch-handle (%d): [%s]",
                      wd, path)

        del self.__watches[path]

        self.remove_watch_with_id(wd, superficial)

    def remove_watch_with_id(self, wd, superficial=False):
        """Stop tracking watch descriptor `wd`.

        Raises KeyError if `wd` is not tracked. Unless `superficial` is True
        the watch is also removed from the inotify descriptor.
        """
        path = self.__watches_r.pop(wd)

        # Keep the forward map in step when this method is called directly
        # (remove_watch has already deleted the entry by the time it gets
        # here). Otherwise a later add_watch() for the same path would be
        # refused as "already being watched".
        if self.__watches.get(path) == wd:
            del self.__watches[path]

        if superficial is False:
            _LOGGER.debug("Removing watch for watch-handle (%d).", wd)

            inotify.calls.inotify_rm_watch(self.__inotify_fd, wd)

    def _get_event_names(self, event_type):
        """Translate the bitmask `event_type` into a list of flag names.

        Names come from `inotify.constants.MASK_LOOKUP`. An assertion fails if
        any bit of the mask is left unresolved.
        """
        names = []
        for bit, name in inotify.constants.MASK_LOOKUP.items():
            if event_type & bit:
                names.append(name)
                event_type -= bit

                if event_type == 0:
                    break

        assert event_type == 0, \
            "We could not resolve all event-types: (%d)" % (event_type,)

        return names

    def _handle_inotify_event(self, wd):
        """Handle a series of events coming-in from inotify.

        NOTE: despite its name, `wd` is the file descriptor to read from
        (`event_gen` passes the descriptor reported by epoll).

        Yields `(header, type_names, path, filename)` for every complete event
        in the buffer whose watch descriptor we are tracking. An incomplete
        trailing event stays buffered until the next read.
        """
        b = os.read(wd, 1024)
        if not b:
            return

        self.__buffer += b

        while 1:
            length = len(self.__buffer)

            if length < _STRUCT_HEADER_LENGTH:
                _LOGGER.debug("Not enough bytes for a header.")
                return

            # We have, at least, a whole-header in the buffer.
            peek_slice = self.__buffer[:_STRUCT_HEADER_LENGTH]

            header_raw = struct.unpack(
                _HEADER_STRUCT_FORMAT,
                peek_slice)

            header = _INOTIFY_EVENT(*header_raw)
            type_names = self._get_event_names(header.mask)
            _LOGGER.debug("Events received in stream: {}".format(type_names))

            event_length = (_STRUCT_HEADER_LENGTH + header.len)
            if length < event_length:
                # The name has not fully arrived yet; wait for more data.
                return

            filename = self.__buffer[_STRUCT_HEADER_LENGTH:event_length]

            # Our filename is 16-byte aligned and right-padded with NULs.
            filename_bytes = filename.rstrip(b'\0')

            self.__buffer = self.__buffer[event_length:]

            # Events for watch descriptors we no longer (or never) tracked
            # are dropped.
            # NOTE(review): inotify(7) says IN_Q_OVERFLOW is delivered with
            # wd == -1, so it would presumably be dropped here and never
            # reach the terminal-event check in event_gen -- confirm.
            path = self.__watches_r.get(header.wd)
            if path is not None:
                filename_unicode = filename_bytes.decode('utf8')
                if filename_unicode:
                    _LOGGER.debug(f"Event filename received for {path}: {filename_unicode}")
                yield (header, type_names, path, filename_unicode)

            buffer_length = len(self.__buffer)
            if buffer_length < _STRUCT_HEADER_LENGTH:
                break

    def event_gen(
            self, timeout_s=None, yield_nones=True, filter_predicate=None,
            terminal_events=_DEFAULT_TERMINAL_EVENTS):
        """Yield one event after another. If `timeout_s` is provided, we'll
        break when no event is received for that many seconds.

        timeout_s: idle time in seconds after which the generator ends.
        yield_nones: when True, yield None after every poll cycle.
        filter_predicate: optional callable `(type_name, event)`; the
            generator stops as soon as it returns exactly False, and the
            `(type_name, event)` pair is stored in `last_success_return`.
        terminal_events: event-type names that raise
            TerminalEventException.
        """
        # We will either return due to the optional filter or because of a
        # timeout. The former will always set this. The latter will never set
        # this.
        self.__last_success_return = None

        last_hit_s = time.time()
        while True:
            block_duration_s = self.__get_block_duration()

            # Poll, but manage signal-related errors.
            try:
                events = self.__epoll.poll(block_duration_s)
            except IOError as e:
                if e.errno != EINTR:
                    raise

                if timeout_s is not None:
                    time_since_event_s = time.time() - last_hit_s
                    if time_since_event_s > timeout_s:
                        break

                continue

            # Process events.
            for fd, event_type in events:
                # (fd) looks to always match the inotify FD.
                names = self._get_event_names(event_type)
                _LOGGER.debug("Events received from epoll: {}".format(names))

                for (header, type_names, path, filename) \
                        in self._handle_inotify_event(fd):
                    last_hit_s = time.time()

                    event = (header, type_names, path, filename)
                    for type_name in type_names:
                        if filter_predicate is not None and \
                                filter_predicate(type_name, event) is False:
                            self.__last_success_return = (type_name, event)
                            return
                        elif type_name in terminal_events:
                            raise TerminalEventException(type_name, event)

                    yield event

            if timeout_s is not None:
                time_since_event_s = time.time() - last_hit_s
                if time_since_event_s > timeout_s:
                    break

            if yield_nones is True:
                yield None

    @property
    def last_success_return(self):
        """`(type_name, event)` that made `filter_predicate` stop the most
        recent `event_gen` run, or None if it did not stop that way."""
        return self.__last_success_return
class _BaseTree(object):
    """Shared machinery for recursive watchers.

    Owns an `Inotify` instance and keeps its watches in step with the
    directory tree as directories are created, deleted or moved.
    """

    def __init__(self, mask=inotify.constants.IN_ALL_EVENTS,
                 block_duration_s=_DEFAULT_EPOLL_BLOCK_DURATION_S,
                 follow_symlinks=False):
        """
        mask: inotify event mask requested by the caller.
        block_duration_s: passed through to `Inotify`.
        follow_symlinks: whether symlinked directories are descended into
            when a tree is loaded.
        """
        # No matter what we actually received as the mask, make sure we have
        # the minimum that we require to curate our list of watches. The
        # move flags are part of that minimum: event_gen relies on
        # IN_MOVED_FROM/IN_MOVED_TO to drop or re-path watches when a
        # directory is renamed or moved.
        self._mask = mask | \
            inotify.constants.IN_ISDIR | \
            inotify.constants.IN_CREATE | \
            inotify.constants.IN_DELETE | \
            inotify.constants.IN_MOVED_FROM | \
            inotify.constants.IN_MOVED_TO

        self._i = Inotify(block_duration_s=block_duration_s)
        self._follow_symlinks = follow_symlinks

    def remove_tree(self, path):
        """Remove the watches on `path` and on everything beneath it."""
        path = path.rstrip(os.path.sep)
        _LOGGER.debug("Removing all watches beneath %s", path)
        prefix = path + os.path.sep

        # Accumulate all paths to remove before removing any to avoid messing
        # with the data while we're iterating through it.
        to_remove = [p for p in self._i.watches()
                     if p == path or p.startswith(prefix)]
        for watch_path in to_remove:
            self._i.remove_watch(watch_path)

    def event_gen(self, ignore_missing_new_folders=False, **kwargs):
        """This is a secondary generator that wraps the principal one, and
        adds/removes watches as directories are added/removed.

        If we're doing anything funky and allowing the events to queue while a
        rename occurs then the folder may no longer exist. In this case, set
        `ignore_missing_new_folders`.

        Remaining keyword arguments are forwarded to `Inotify.event_gen`.
        """
        # We always need the None "end of batch" markers internally (they
        # trigger the deferred MOVED_FROM handling), so request them from the
        # inner generator and only forward them if the caller wanted them.
        user_yield_nones = kwargs.get('yield_nones', True)
        kwargs['yield_nones'] = True

        # cookie -> pending IN_MOVED_FROM event for a directory.
        move_from_events = {}

        for event in self._i.event_gen(**kwargs):
            if event is None:
                if move_from_events:
                    _LOGGER.debug("Handling deferred MOVED_FROM events")
                    for move_event in move_from_events.values():
                        (header, type_names, path, filename) = move_event
                        self.remove_tree(os.path.join(path, filename))
                    move_from_events = {}
            else:
                (header, type_names, path, filename) = event

                if header.mask & inotify.constants.IN_ISDIR:
                    full_path = os.path.join(path, filename)

                    if header.mask & inotify.constants.IN_CREATE and \
                            (
                                os.path.exists(full_path) is True or
                                ignore_missing_new_folders is False
                            ):
                        _LOGGER.debug("A directory has been created. We're "
                                      "adding a watch on it (because we're "
                                      "being recursive): [%s]", full_path)

                        self._load_tree(full_path)

                    elif header.mask & inotify.constants.IN_DELETE:
                        _LOGGER.debug("A directory has been removed. We're "
                                      "being recursive, but it would have "
                                      "automatically been deregistered: [%s]",
                                      full_path)

                        # The watch would've already been cleaned-up internally.
                        self._i.remove_watch(full_path, superficial=True)

                    # If a subdirectory of a directory we're watching is moved,
                    # then there are two scenarios we need to handle:
                    #
                    # 1) If it has been moved out of the directory tree we're
                    #    watching, then we will get only the IN_MOVED_FROM
                    #    event for it. In this case we need to remove our watch
                    #    on the moved directory and on all of the directories
                    #    underneath it. We won't get separate events for those!
                    # 2) If it has been moved somewhere else within the
                    #    tree we're watching, then we'll get both IN_MOVED_FROM
                    #    and IN_MOVED_TO events for it. In this case our
                    #    existing watches on the directory and its
                    #    subdirectories will remain open, but they have new
                    #    paths now so we need to update our internal data to
                    #    match the new paths. This is handled in _load_tree.
                    #
                    # Challenge: when we get the IN_MOVED_FROM event, how we
                    # handle it depends on whether there is a subsequent
                    # IN_MOVED_TO event! We don't want to remove all the
                    # watches if this is an in-tree move, both because it's
                    # inefficient to delete and then soon after recreate those
                    # watches, and because it creates a race condition: if
                    # something happens in one of the directories between when
                    # we remove the watches and when we recreate them, we won't
                    # get notified about it.
                    #
                    # We solve this by waiting to handle the IN_MOVED_FROM
                    # event until we get a None from the primary event_gen
                    # generator. It is reasonable to assume that linked
                    # IN_MOVED_FROM and IN_MOVED_TO events will arrive in a
                    # single batch of events. If there are pending
                    # IN_MOVED_FROM events at the end of the batch, then we
                    # assume they were moved out of tree and remove all the
                    # corresponding watches.
                    #
                    # There's also a third scenario we need to handle below. If
                    # we get an IN_MOVED_TO without a corresponding
                    # IN_MOVED_FROM, then the directory was moved into our tree
                    # from outside our tree, so we need to add watches for that
                    # whole subtree.
                    elif header.mask & inotify.constants.IN_MOVED_FROM:
                        _LOGGER.debug(
                            "A directory has been renamed. Deferring "
                            "handling until we find out whether the target is "
                            "in our tree: [%s]", full_path)
                        move_from_events[header.cookie] = event

                    elif header.mask & inotify.constants.IN_MOVED_TO:
                        try:
                            from_event = move_from_events.pop(header.cookie)
                        except KeyError:
                            _LOGGER.debug(
                                "A directory has been moved into our watch "
                                "area. Adding watches for it and its "
                                "subdirectories: [%s]", full_path)
                            self._load_tree(full_path)
                        else:
                            (_, _, from_path, from_filename) = from_event
                            full_from_path = os.path.join(
                                from_path, from_filename)
                            _LOGGER.debug(
                                "A directory has been moved from %s to %s "
                                "within our watch area. Updating internal "
                                "data to reflect move.", full_from_path,
                                full_path)
                            self._load_tree(full_path)
                            # If part of the _load_tree above fails in part or
                            # in full because the top-level directory or some
                            # of its subdirectories have been removed, then
                            # they won't get cleaned up by _load_tree, so let's
                            # clean them up just in case.
                            self.remove_tree(full_from_path)

            if user_yield_nones or event is not None:
                yield event

    @property
    def inotify(self):
        """The underlying `Inotify` instance."""
        return self._i

    def _load_tree(self, path):
        """Add watches on `path` and every directory beneath it.

        The tree is walked breadth-first and all directories are collected
        before any watch is added. Directories that vanish while being
        listed or watched are skipped with a warning; other errors propagate.
        """
        paths = []

        # deque gives O(1) pops from the front of the work queue.
        q = collections.deque([path])
        while q:
            current_path = q.popleft()

            try:
                direntries = os.scandir(current_path)
            except FileNotFoundError:
                _LOGGER.warning("Path %s disappeared before we could list it", current_path)
                continue

            paths.append(current_path)

            # Close the scandir iterator deterministically so its directory
            # handle is not held until garbage collection.
            with direntries:
                for direntry in direntries:
                    if not direntry.is_dir(follow_symlinks=self._follow_symlinks):
                        continue

                    q.append(direntry.path)

        for watch_path in paths:
            try:
                self._i.add_watch(watch_path, self._mask)
            except inotify.calls.InotifyError as e:
                if e.errno == ENOENT:
                    _LOGGER.warning("Path %s disappeared before we could watch it", watch_path)
                    continue
                raise
class InotifyTree(_BaseTree):
    """Watch one directory and, recursively, everything beneath it."""

    def __init__(self, path, mask=inotify.constants.IN_ALL_EVENTS,
                 block_duration_s=_DEFAULT_EPOLL_BLOCK_DURATION_S,
                 follow_symlinks=False):
        """Set up the base watcher, then register watches for `path`."""
        super().__init__(
            mask=mask,
            block_duration_s=block_duration_s,
            follow_symlinks=follow_symlinks,
        )

        self._load_tree(path)

    def _load_tree(self, path):
        """Log the tree being loaded, then defer to the base implementation."""
        _LOGGER.debug("Adding initial watches on tree: [%s]", path)
        return super(InotifyTree, self)._load_tree(path)
class InotifyTrees(_BaseTree):
    """Recursively watch over a list of trees."""

    def __init__(self, paths, mask=inotify.constants.IN_ALL_EVENTS,
                 block_duration_s=_DEFAULT_EPOLL_BLOCK_DURATION_S,
                 follow_symlinks=False):
        """
        paths: iterable of root directories; each one is loaded recursively.
        mask, block_duration_s, follow_symlinks: passed to the base class.
        """
        super(InotifyTrees, self).__init__(
            mask=mask, block_duration_s=block_duration_s,
            follow_symlinks=follow_symlinks)

        self._load_trees(paths)

    def _load_trees(self, paths):
        """Add watches for every tree rooted at one of `paths`."""
        # Materialise once: building the log message iterates `paths`, which
        # would exhaust a generator or other one-shot iterable before the
        # loop below could load anything.
        paths = list(paths)

        _LOGGER.debug("Adding initial watches on trees: [%s]", ",".join(map(str, paths)))

        for path in paths:
            self._load_tree(path)