Commit 579cae8f authored by Michał Woźniak

WIP: initial implementation of the main loop (with some stubs here and there)

parent 00590590
......@@ -8,11 +8,8 @@ import sys
import os.path
from glob import glob
from enum import Enum
# Lifecycle states attached to a tracked logfile (stored under its "state" key).
# Files found by the initial glob start out as NEW; members are numbered 1, 2, 3
# in declaration order.
# NOTE(review): the exact meaning of OPEN and READY is not visible here - confirm.
states = Enum("states", ["NEW", "OPEN", "READY"])
import time
import fnmatch
class Configuration(import_logs.Configuration):
......@@ -23,7 +20,7 @@ class Configuration(import_logs.Configuration):
'--ingestion-start-delay',
dest='ingestion_start_delay',
type='int',
default=200,
default=1000,
help="Delay (in ms) between noticing a logfile to be processed and starting ingesting it."
"This is part of the built-in heuristic for determining that a file is not being modified "
"or moved anymore and can be safely ingested."
......@@ -39,7 +36,7 @@ class Configuration(import_logs.Configuration):
# Only directories can be watched; reject anything else up front.
if not os.path.isdir(f):
    # The message needs a "%s" placeholder: without it the "%" operator raises
    # TypeError ("not all arguments converted") instead of the intended IOError.
    raise IOError("Path is not a directory: %s" % f)
sanitized_filenames.append(f)
self.filenames = sanitized_filenames
self.filenames = list(set(sanitized_filenames))
if len(self.filenames) == 0:
raise Exception("No existing directories to watch!")
......@@ -50,28 +47,74 @@ watch_flags = flags.CREATE | flags.MODIFY | flags.DELETE_SELF | flags.CLOSE_WRIT
# --- files to process -------------------------------------------------------
# Pattern a filename must match to be treated as a logfile; it is joined onto
# each watched directory for glob().
logfile_glob = "*.log"
# List of dicts, one per logfile, each with a "path" and a "state" key.
logfiles = []
# Newly discovered logfiles are appended here.
logfiles_busy = []
# Logfiles which we can assume are not busy.
logfiles_free = []
# --- set up watches ---------------------------------------------------------
# NOTE(review): this inline loop does the same add_watch + glob work as
# setup_watches(); it looks like the earlier inline version - confirm which
# of the two is meant to remain.
print("+-- watching:")
# Active watches: maps the watch descriptor returned by inotify.add_watch()
# to the directory path it was registered for.
watches = {}
for path in config.filenames:
print(" - %s" % path)
# register the directory and remember which descriptor belongs to it
wd = inotify.add_watch(path, watch_flags)
watches[wd] = path
# glob any files already existing in the directory; each one is recorded
# with the initial state NEW
for p in glob(os.path.join(path, logfile_glob)):
logfiles.append({
"path": p,
"state": states.NEW
})
# debug output: dump everything found so far
print(logfiles)
# set up watches
def setup_watches():
    """Watch every configured directory and queue the logfiles already in it.

    For each directory in ``config.filenames`` an inotify watch is added with
    ``watch_flags`` and the returned descriptor is recorded in the module-level
    ``watches`` dict (descriptor -> directory). Files in that directory that
    match ``logfile_glob`` are appended to the module-level ``logfiles_busy``
    list unless they are already in it.
    """
    global watches
    global logfiles_busy
    print(" +-- watching:")
    for directory in config.filenames:
        print(" - %s" % directory)
        # remember which directory each watch descriptor stands for
        descriptor = inotify.add_watch(directory, watch_flags)
        watches[descriptor] = directory
        # pick up logfiles that existed before the watch was in place
        pattern = os.path.join(directory, logfile_glob)
        for existing in glob(pattern):
            if existing in logfiles_busy:
                continue
            logfiles_busy.append(existing)
# Register the watches and queue the logfiles that already exist.
print("+-- setting up watches...")
setup_watches()
if logfiles_busy:
    print("+-- initial logfiles:")
    for f in logfiles_busy:
        print(" - %s" % f)

# Reference point for the per-iteration timing printed by the main loop.
timestamp = time.time()
# ingestion_start_delay is an integer number of milliseconds. Dividing by a
# float keeps sub-second delays intact under Python 2, where int / int floors
# (e.g. 200 / 1000 == 0, i.e. no delay at all).
time.sleep(config.options.ingestion_start_delay / 1000.0)
# Main loop: each iteration promotes quiet logfiles to "free", demotes the ones
# that produced events back to "busy", then processes whatever stayed free.
while True:
    # Report how long the previous iteration took.
    new_timestamp = time.time()
    print("+-- loop... %0.5fs" % (new_timestamp - timestamp))
    timestamp = new_timestamp

    # Every file that was busy during the previous iteration becomes a
    # candidate for processing; it is moved back to busy below if an event
    # for it arrives during this iteration's read.
    logfiles_free = list(set(logfiles_free + logfiles_busy))
    logfiles_busy = []

    # Events are read exactly once per iteration. A separate print-only read
    # before this loop would drain the queue and leave nothing to classify.
    # Both arguments are the configured delay in milliseconds.
    for event in inotify.read(config.options.ingestion_start_delay, config.options.ingestion_start_delay):
        # ignore events for watches that are no longer registered
        if event.wd not in watches:
            continue
        f = os.path.join(watches[event.wd], event.name)
        print(' +-- event! %s: %s' % (f, ', '.join([str(flag) for flag in flags.from_mask(event.mask)])))
        if f in logfiles_free:
            # the file is still being touched: not safe to process yet
            logfiles_free.remove(f)
            logfiles_busy.append(f)
            print(" - busy...")
        elif f not in logfiles_busy:
            # unknown path: only track it if it looks like a logfile
            if fnmatch.fnmatch(f, logfile_glob):
                print(" - new logfile spotted!")
                logfiles_busy.append(f)

    if logfiles_free:
        print(" +-- free logfiles:")
        for f in logfiles_free:
            print(" - %s" % f)

        # Watches are dropped before processing and re-created afterwards by
        # setup_watches(), which also re-globs the watched directories.
        print(" +-- removing watches")
        for wd, watched_path in watches.items():
            print(" - %s" % watched_path)
            inotify.rm_watch(wd)
        watches = {}

        # Stub: a sleep stands in for the real ingestion of each file.
        print(" +-- simulating log processing of free logfiles...")
        for f in logfiles_free:
            print(" - %s" % f)
            time.sleep(config.options.ingestion_start_delay / 200.0)
        logfiles_free = []

        setup_watches()
        # Same delay as after the initial setup_watches() call. Float division
        # so a sub-second delay is not floored to zero under Python 2.
        time.sleep(config.options.ingestion_start_delay / 1000.0)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment