Commit 5fc86955 authored by Michał Woźniak's avatar Michał Woźniak

WIP: we're handling the errors in a sane-ish way now

parent 6d49d511
......@@ -18,13 +18,13 @@ class IngestionException(Exception):
class Configuration(import_logs.Configuration):
def _sanitize_ingested_output_path(self, p):
def _sanitize_prefixed_output_path(self, p):
# we want the real absolute path of the dirname of path we're looking at
p = os.path.realpath(os.path.dirname(p))
# if it already exists, we need to make sure it's a directory
if os.path.exists(p):
if not os.path.isdir(p):
raise IOError("Ingested logfiles output path is not a directory: %s" % p)
raise IOError("Output location is not a directory: %s" % p)
else:
os.makedirs(p)
......@@ -99,15 +99,6 @@ class Configuration(import_logs.Configuration):
"contain any '/' characters."
)
self.parser.add_option(
'--suffix-failed-error',
dest='suffix_failed_error',
default=".error",
help="Files containing information on the error encountered when ingesting a "
"logfile that failed to be ingested will be named using this suffix (and the "
"--prefix-failed prefix); it cannot contain any '/' characters."
)
# parse the stuff
self._parse_args(self.parser, argv)
......@@ -124,23 +115,26 @@ class Configuration(import_logs.Configuration):
if len(self.filenames) == 0:
raise Exception("No existing directories to watch!")
# sanitize output directories for ingested logfiles, based on prefix_ingested
# we just need to make sure they exist
# remember, prefix does not have to be a directory and does not have to end in a directory
# sanitize output directories for ingested and failed logfiles, based on
# prefix_ingested and prefix_failed; we just need to make sure they exist
#
# either prefix does not have to be a directory and does not have to end in a directory
#
# is the prefix an absolute path?
if os.path.isabs(self.options.prefix_ingested):
self._sanitize_ingested_output_path(self.options.prefix_ingested)
# not absolute, we need to handle this for every watched directory
else:
for p in self.filenames:
# we need to sanitize the path made of the watched directory
# with the prefix added at the end
self._sanitize_ingested_output_path(os.path.join(p, self.options.prefix_ingested))
for prefix_path in [self.options.prefix_ingested, self.options.prefix_failed]:
if os.path.isabs(prefix_path):
self._sanitize_prefixed_output_path(prefix_path)
# not absolute, we need to handle this for every watched directory
else:
for p in self.filenames:
# we need to sanitize the path made of the watched directory
# with the prefix added at the end
self._sanitize_prefixed_output_path(os.path.join(p, prefix_path))
# monkey-patching fatal_error() so that we can handle ingestion errors gracefully
#
# first, define a replacement
def non_fatal_error(error, filename=None, lineno=None):
# if we have running Recorders, the error might not be fatal
......@@ -162,8 +156,8 @@ def non_fatal_error(error, filename=None, lineno=None):
# (since we do need to get out of the particular block we were called in)
else:
raise IngestionException(errmsg, filename, lineno)
orig_fatal_error = import_logs.fatal_error
# now we can actually monkey patch the thing
import_logs.fatal_error = non_fatal_error
# set up configuration, parse command line args, etc
......@@ -263,33 +257,50 @@ while True:
recorders = import_logs.Recorder.launch(config.options.recorders)
print(" +-- ingesting logfiles...")
try:
for f in logfiles_free:
print(" - %s" % f)
for f in logfiles_free:
print(" - %s" % f)
try:
# processing the logfile
parser.parse(f)
import_logs.Recorder.wait_empty()
except KeyboardInterrupt:
pass
except IngestionException as e:
errmsg, filename, lineno = e.args
logging.error(errmsg)
# move the failed file to the location specified by
# applying the prefix_failed and suffix_failed, as configured
f_dir, f_base = os.path.split(f)
f_base += config.options.suffix_failed
if os.path.isabs(config.options.prefix_failed):
# not using os.path.join() here since prefix does not have to end in a directory
new_f = config.options.prefix_failed + f_base
else:
new_f = os.path.join(f_dir, config.options.prefix_failed + f_base)
print(" error, moving to: %s" % new_f)
shutil.move(f, new_f)
else:
# once the file is processed, we need to move it to the location specified by
# applying the prefix_ingested and suffix_ingested, as configured
f_dir, f_base = os.path.split(f)
f_base += config.options.suffix_ingested
if os.path.isabs(config.options.prefix_ingested):
# not using os.path.join() here since prefix does not have to end in a directory
new_f = config.options.prefix_ingested + f_base
else:
new_f = os.path.join(f_dir, config.options.prefix_ingested + f_base)
print(" done, moving to: %s" % new_f)
shutil.move(f, new_f)
logfiles_free = []
except KeyboardInterrupt:
pass
except IngestionException as e:
errmsg, filename, lineno = e.args
logging.error(errmsg)
# clear files to be processed
# all have either been processed, or errored out
logfiles_free = []
# get the watches going again
setup_watches()
# done with stats
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment