Commit c690000c authored by Michał Woźniak

WIP: handling ingested files implemented

parent 61e9704e
......@@ -14,11 +14,30 @@ import shutil
class Configuration(import_logs.Configuration):
def _sanitize_ingested_output_path(self, p):
# we want the real absolute path of the dirname of path we're looking at
p = os.path.realpath(os.path.dirname(p))
# if it already exists, we need to make sure it's a directory
if os.path.exists(p):
if not os.path.isdir(p):
raise IOError("Ingested logfiles output path is not a directory: %s" % p)
else:
os.makedirs(p)
def __init__(self, argv = None):
# in case we want to add options
self.parser = self._create_parser()
self.parser.add_option(
'--logfiles-glob',
dest='logfiles_glob',
default="*.log",
help="Only files matching this shell glob expression will be ingested. It's "
"important to make sure that the glob does not match ingested files after "
"prefix and suffix is applied!"
)
self.parser.add_option(
'--ingestion-grace-period',
dest='ingestion_grace_period',
......@@ -49,29 +68,46 @@ class Configuration(import_logs.Configuration):
# parse the stuff
self._parse_args(self.parser, argv)
# sanitize the paths (all paths should be existing directories)
# sanitize the watched paths (all paths should be existing directories)
sanitized_filenames = []
for f in self.filenames:
f = os.path.realpath(f)
if not os.path.exists(f):
raise IOError("Path does not exist: " % f)
raise IOError("Watch path does not exist: " % f)
if not os.path.isdir(f):
raise IOError("Path is not a directory: " % f)
raise IOError("Watch path is not a directory: " % f)
sanitized_filenames.append(f)
self.filenames = list(set(sanitized_filenames))
if len(self.filenames) == 0:
raise Exception("No existing directories to watch!")
# sanitize output directories for ingested logfiles, based on prefix_ingested
# we just need to make sure they exist
# remember, prefix does not have to be a directory and does not have to end in a directory
#
# is the prefix an absolute path?
if os.path.isabs(self.options.prefix_ingested):
self._sanitize_ingested_output_path(self.options.prefix_ingested)
# not absolute, we need to handle this for every watched directory
else:
for p in self.filenames:
# we need to sanitize the path made of the watched directory
# with the prefix added at the end
self._sanitize_ingested_output_path(os.path.join(p, self.options.prefix_ingested))
config = Configuration()
inotify = INotify()
watch_flags = flags.CREATE | flags.MODIFY | flags.DELETE_SELF | flags.CLOSE_WRITE | flags.OPEN | flags.MOVED_TO | flags.MOVED_FROM
config = Configuration()
# files to process
logfile_glob = "*.log"
logfiles_busy = [] # this is where newly discovered logfiles are added
logfiles_free = [] # logfiles which we can assume are not busy
# inotify init
inotify = INotify()
watch_flags = flags.CREATE | flags.MODIFY | flags.DELETE_SELF | flags.CLOSE_WRITE | flags.OPEN | flags.MOVED_TO | flags.MOVED_FROM
# active watches
watches = {}
......@@ -86,7 +122,7 @@ def setup_watches():
wd = inotify.add_watch(path, watch_flags)
watches[wd] = path
# glob any files already existing in the directory
for p in glob(os.path.join(path, logfile_glob)):
for p in glob(os.path.join(path, config.options.logfiles_glob)):
if p not in logfiles_busy:
logfiles_busy.append(p)
......@@ -118,7 +154,7 @@ while True:
logfiles_busy.append(f)
print(" - busy...")
elif f not in logfiles_busy:
if fnmatch.fnmatch(f, logfile_glob):
if fnmatch.fnmatch(f, config.options.logfiles_glob):
print(" - new logfile spotted!")
logfiles_busy.append(f)
......@@ -140,6 +176,17 @@ while True:
# processing the logfile (simulation right now)
time.sleep(config.options.ingestion_grace_period / 200)
# once the file is processed, we need to move it to the location specified by
# applying the prefix_ingested and suffix_ingested, as configured
f_dir, f_base = os.path.split(f)
f_base += config.options.suffix_ingested
if os.path.isabs(config.options.prefix_ingested):
new_f = config.options.prefix_ingested + f_base
else:
new_f = os.path.join(f_dir, config.options.prefix_ingested + f_base)
print(" -> %s" % new_f)
shutil.move(f, new_f)
logfiles_free = []
setup_watches()
time.sleep(config.options.ingestion_grace_period / 1000)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment