Commit d879220a authored by Michał Woźniak's avatar Michał Woźniak

WIP: #7 (threading is hard, especially when monkey-patching)

parent 1d5a2be3
......@@ -13,10 +13,23 @@ import time
import fnmatch
import shutil
import logging
import threading
"""
def excepthook(t, val, traceback):
logging.debug("Exception type: %s" % t)
sys.__excepthook__(t, val, traceback)
sys.excepthook = excepthook
"""
# we have an issue with ingestion
class IngestionException(Exception):
pass
# only used to cleanly bail from things
class BailException(Exception):
pass
class Configuration(import_logs.Configuration):
def _sanitize_prefixed_output_path(self, p):
......@@ -244,6 +257,18 @@ class Statistics(import_logs.Statistics):
for sline in summary.split('\n'):
logging.info(sline)
class EmptyQueue(object):
def put(self, arg):
raise BailException()
def task_done(self):
raise BailException()
def empty(self):
return True
def join(self):
raise BailException()
# monkey-patching fatal_error() so that we can handle ingestion errors gracefully
#
......@@ -269,7 +294,23 @@ def non_fatal_error(error, filename=None, lineno=None):
# if not fatal, throw an exception
# (since we do need to get out of the particular block we were called in)
else:
raise IngestionException(errmsg, filename, lineno)
logging.debug('fatal_error(): not fatal')
for r in import_logs.Recorder.recorders:
logging.debug('fatal_error(): in for')
r.queue.task_done()
while not r.queue.empty():
logging.debug('fatal_error(): in while')
try:
r.queue.get(False)
except Empty:
continue
logging.debug('fatal_error(): task_done()')
r.queue.task_done()
logging.debug('fatal_error(): after while')
r.queue = EmptyQueue()
import_logs.Recorder.exc = IngestionException(errmsg, filename, lineno)
# this is to bail from Thread.run()
raise BailException()
# now we can actually monkey patch the thing
import_logs.fatal_error = non_fatal_error
......@@ -369,18 +410,32 @@ while True:
stats.set_time_start()
if config.options.show_progress:
stats.start_monitor()
# launch recorders
recorders = import_logs.Recorder.launch(config.options.recorders)
if len(import_logs.Recorder.recorders) < config.options.recorders:
import_logs.Recorder.launch(config.options.recorders - len(import_logs.Recorder.recorders))
logging.debug('Active threads:')
for t in threading.enumerate():
logging.debug("+-- thread: %s, %s" % (t.name, t.ident, ))
# ingest files one by one
for f in logfiles_free:
logging.info("- %s..." % f)
try:
# processing the logfile
parser.parse(f)
import_logs.Recorder.wait_empty()
try:
parser.parse(f)
import_logs.Recorder.wait_empty()
except BailException:
pass
logging.debug("after import_logs.Recorder.wait_empty()")
if hasattr(import_logs.Recorder, "exc"):
exc = import_logs.Recorder.exc
del import_logs.Recorder.exc
raise exc
except KeyboardInterrupt:
pass
......@@ -408,6 +463,18 @@ while True:
logging.warning("ingesting the file failed, moving it to:")
logging.warning("- %s" % new_f)
shutil.move(f, new_f)
# remove dead recorders
i = 0
while i < len(import_logs.Recorder.recorders):
if isinstance(import_logs.Recorder.recorders[i].queue, EmptyQueue):
del(import_logs.Recorder.recorders[i])
else:
i+=1
"""
# launch recorders
if len(import_logs.Recorder.recorders) < config.options.recorders:
import_logs.Recorder.launch(config.options.recorders)
"""
else:
# do we want to delete or move/rename the file?
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment