1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| import subprocess import pyinotify import os import shutil mask = pyinotify.ALL_EVENTS rootdir = '/tmp/test-pyinotify'
def _process_watcher_events(event): print("event = %s" % event)
def _(path): return os.path.join(rootdir, path) def commands(): commands = [ ['ls', rootdir], ['touch', _('origin.txt')], ['cp', _('origin.txt'), _('cp.txt')], ['mv', _('cp.txt'), _('mv.txt')], ['mv', _('mv.txt'), '/tmp/mv.txt'], ['rsync', '-a', '/tmp/mv.txt', _('rsync.txt')], ['scp', '/tmp/mv.txt', _('scp.txt')], ['ln', '-s', _('origin.txt'), _('ln.txt')], ['rm', _('ln.txt')], ['mkdir', _('testdir')], ['ln', _('origin.txt'), _('lnhard.txt')], ['python', '-c', "import os; f = open(os.path.join('%s', 'python.txt'), 'w'); f.write('test_python'); f.close()" % rootdir] ] for cmd in commands: print cmd output = subprocess.check_output(cmd) yield if __name__ == "__main__": if os.path.exists(rootdir): shutil.rmtree(rootdir) os.makedirs(rootdir) watcher = pyinotify.WatchManager() watcher.add_watch(rootdir, mask=mask, rec=True, auto_add=True, quiet=False) notifier = pyinotify.Notifier(watcher, _process_watcher_events) commands = commands() while True: notifier.process_events() try: next(commands) while notifier.check_events(timeout=3): notifier.read_events() notifier.process_events() print('===========================================\n') except StopIteration: while notifier.check_events(timeout=3): notifier.read_events() notifier.process_events() print('finished') break shutil.rmtree(rootdir)
|