Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork252
Expand file tree
/
Copy pathfilewatch.py
More file actions
87 lines (74 loc) · 2.96 KB
/
filewatch.py
File metadata and controls
87 lines (74 loc) · 2.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
importos
fromcollectionsimportdefaultdict
fromcollections.abcimportCallable,Iterable,Sequence
from ..importimportcompletion
try:
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler, FileSystemEvent
except ImportError:

    def ModuleChangedEventHandler(*args, **kwargs):
        """Fallback used when watchdog cannot be imported.

        Accepts any positional or keyword arguments (so it can be called
        exactly like the real handler class), ignores them and returns
        ``None``.
        """
        return None

else:

    class ModuleChangedEventHandler(FileSystemEventHandler):  # type: ignore [no-redef]
        """Watchdog handler that reports changes to tracked module files.

        Modules are grouped by directory in ``self.dirs``; each directory is
        watched non-recursively, and ``on_change`` is called when an event
        arrives for the ``.py`` file of a tracked module.
        """

        def __init__(
            self,
            paths: Iterable[str],
            on_change: Callable[[Sequence[str]], None],
        ) -> None:
            """Create the handler and register ``paths`` for tracking.

            :param paths: module file paths to track from the start.
            :param on_change: callback invoked with a one-element tuple
                holding the path of the changed file.

            The observer is created here but only started by
            :meth:`activate`.
            """
            # Maps a directory to the suffix-stripped absolute paths of the
            # tracked modules that live in it.
            self.dirs: dict[str, set[str]] = defaultdict(set)
            self.on_change = on_change
            # Paths passed to track_module() while the handler is inactive.
            self.modules_to_add_later: list[str] = []
            self.observer = Observer()
            self.started = False
            self.activated = False
            for path in paths:
                self._add_module(path)
            super().__init__()

        def reset(self) -> None:
            """Forget all tracked and deferred modules and drop every watch.

            The ``started`` and ``activated`` flags are left untouched.
            """
            self.dirs.clear()
            self.modules_to_add_later.clear()
            self.observer.unschedule_all()

        def _add_module(self, path: str) -> None:
            """Add a python module to track changes.

            The path is made absolute and the first matching suffix from
            ``importcompletion.SUFFIXES`` is stripped before it is stored.
            A watch is scheduled the first time a directory is seen.
            """
            path = os.path.abspath(path)
            for suff in importcompletion.SUFFIXES:
                if path.endswith(suff):
                    path = path[: -len(suff)]
                    break
            dirname = os.path.dirname(path)
            # Membership in self.dirs means "already scheduled", so nothing
            # else may insert keys into it (see on_any_event).
            if dirname not in self.dirs:
                self.observer.schedule(self, dirname, recursive=False)
            self.dirs[dirname].add(path)

        def _add_module_later(self, path: str) -> None:
            """Remember ``path`` so that activate() can start tracking it."""
            self.modules_to_add_later.append(path)

        def track_module(self, path: str) -> None:
            """
            Begin tracking ``path`` now if the handler is activated,
            otherwise remember it to be tracked on the next activation.
            """
            if self.activated:
                self._add_module(path)
            else:
                self._add_module_later(path)

        def activate(self) -> None:
            """Start watching all known directories.

            The observer is started on the first activation only. Every
            directory in ``self.dirs`` is (re)scheduled, then the modules
            deferred by track_module() are added.

            :raises ValueError: if the handler is already activated.
            """
            if self.activated:
                raise ValueError(f"{self!r} is already activated.")
            if not self.started:
                self.started = True
                self.observer.start()
            for dirname in self.dirs:
                self.observer.schedule(self, dirname, recursive=False)
            for module in self.modules_to_add_later:
                self._add_module(module)
            self.modules_to_add_later.clear()
            self.activated = True

        def deactivate(self) -> None:
            """Remove every watch but keep the tracked modules.

            :raises ValueError: if the handler is not activated.
            """
            if not self.activated:
                raise ValueError(f"{self!r} is not activated.")
            self.observer.unschedule_all()
            self.activated = False

        def on_any_event(self, event: FileSystemEvent) -> None:
            """Call ``on_change`` if ``event`` concerns a tracked module.

            Only the ``.py`` file of a tracked module counts as a match; the
            callback receives ``(event.src_path,)``.
            """
            dirpath = os.path.dirname(event.src_path)
            # Look up with .get() instead of indexing: self.dirs is a
            # defaultdict, so indexing would insert an empty entry for any
            # untracked directory an event happens to mention. Such a phantom
            # key would later make _add_module() believe the directory is
            # already scheduled, and activate() would watch it needlessly.
            tracked = self.dirs.get(dirpath, ())
            if any(event.src_path == f"{path}.py" for path in tracked):
                self.on_change((event.src_path,))