#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
# logutils.py - Utilities for logging etc
#
#####################################################
# OVERVIEW:
#
# Logging utilities
# For enhancement ideas, see http://astropy.readthedocs.org/en/latest/logging.html
######################################################
"""

import sys
import re
import os
import time
import stat
import logging


def python3():
    """Return True when running under Python 3 or later."""
    return sys.version_info[0] >= 3


if python3():
    import urllib.parse
    import urllib.request
else:
    import urllib

import traceback


def notify_users(mail_form_url, subject, msg):
    """POST ``subject`` and ``msg`` to a simple mail form.

    Returns the object returned by ``urlopen`` on success. Returns None
    when ``mail_form_url`` is empty or when the request fails; a failure is
    reported on stdout rather than raised, so notification problems never
    break the caller.
    """
    if not mail_form_url:
        return None
    response = None
    try:
        if python3():
            data = urllib.parse.urlencode({'subject': subject, 'message': msg})
            response = urllib.request.urlopen(mail_form_url,
                                              data.encode('ascii'))
        else:
            data = urllib.urlencode({'subject': subject, 'message': msg})
            response = urllib.urlopen(mail_form_url, data)
    except Exception as e:
        # Best effort only. A single formatted string prints the same text
        # on Python 2 and 3 (a two-argument print() is a tuple on Python 2).
        print("Failed to notify: %s" % str(e))
        sys.stdout.flush()
    return response


def save_existing_file(file_name):
    """Rename an existing file so that it will not be overwritten.

    The file is renamed in place (same directory) to
    ``<stem>-<YYYYmmddHHMMSS><ext>``.

    Returns the new path, or None when ``file_name`` does not exist.
    """
    if not os.path.exists(file_name):
        return None
    directory, base_name = os.path.split(file_name)
    # splitext copes with names that have no extension, where unpacking
    # rsplit(".", 1) would raise ValueError.
    stem, ext = os.path.splitext(base_name)
    stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
    # Keep the directory part so the file stays where it was instead of
    # being moved into the current working directory.
    new_name = os.path.join(directory, "%s-%s%s" % (stem, stamp, ext))
    os.rename(file_name, new_name)
    return new_name


def get_unique_file_name(file_path):
    """Return ``file_path``, or a numbered variant of it that does not exist.

    Candidates are ``<stem>-1<ext>``, ``<stem>-2<ext>``, ... in the same
    directory; the first one that does not exist is returned.
    """
    ldir = os.path.dirname(file_path)
    # splitext also handles file names without any extension.
    stem, ext = os.path.splitext(os.path.basename(file_path))
    i = 1
    while os.path.exists(file_path):
        file_path = os.path.join(ldir, "%s-%d%s" % (stem, i, ext))
        i += 1
    return file_path


def get_log_file_name():
    """Return a unique log file path in the current working directory.

    The name has the form ``log-<script>-<YYYYmmddHHMMSS>.log`` where
    ``<script>`` is derived from ``sys.argv[0]``.
    """
    prefix = sys.argv[0].split(".py")[0]
    stamp = time.strftime('%Y%m%d%H%M%S', time.localtime())
    log_name = "log-%s-%s.log" % (os.path.basename(prefix), stamp)
    return get_unique_file_name(os.path.join(os.getcwd(), log_name))


class ArgLogRecord(logging.LogRecord):
    """Custom formatting - just prints out any arguments passed"""

    def __init__(self, name, level, pathname, lineno, msg, args, exc_info,
                 func=None, extra=None, sinfo=None):
        if sys.version_info < (3, 2):
            # Older LogRecord signatures have no sinfo parameter.
            logging.LogRecord.__init__(self, name, level, pathname, lineno,
                                       msg, args, exc_info, func)
        else:
            logging.LogRecord.__init__(self, name, level, pathname, lineno,
                                       msg, args, exc_info, func=func,
                                       extra=extra, sinfo=sinfo)

    def getMessage(self):
        """Return the message for this LogRecord.

        Arguments are applied with ``%`` formatting when that works;
        otherwise they are appended to the message, comma separated.
        """
        msg = str(self.msg)
        if self.args:
            try:
                msg = msg % self.args
            except (TypeError, ValueError, KeyError):
                # The message is not a matching format string. This method
                # is called from ArgLogger.makeRecord, so an exception here
                # would propagate out of the logging call itself.
                leftover = ", ".join([str(x) for x in self.args])
                if msg and not msg[-1].isspace():
                    # Separate the message from the appended arguments.
                    msg += " "
                msg += leftover
        return msg


class ArgLogger(logging.getLoggerClass()):
    """Specific logging class to use our logrecord.

    Keeps the most recent messages in memory (DEBUG lines in ``saved_log``,
    everything else in ``saved_output``) and sends ``saved_output`` through
    ``notify_users`` at most once per NOTIFY_INTERVAL_SECS.
    """

    MAX_SAVED_LOG = 100             # DEBUG lines retained
    MAX_SAVED_OUTPUT = 50           # non-DEBUG lines retained
    NOTIFY_INTERVAL_SECS = 30 * 60  # 30 mins

    def __init__(self, name, **kwargs):
        super(ArgLogger, self).__init__(name, **kwargs)
        self.saved_output = []
        self.saved_log = []
        self.mail_form_url = None
        self.time_last_notified = time.time()

    def _saveRecord(self, record):
        """Save to circular buffers and send the periodic update when due."""
        line = record.getMessage()
        if record.levelno == logging.DEBUG:
            self.saved_log.append(line)
            if len(self.saved_log) > self.MAX_SAVED_LOG:
                del self.saved_log[0]
        else:
            self.saved_output.append(line)
            if len(self.saved_output) > self.MAX_SAVED_OUTPUT:
                del self.saved_output[0]
        if time.time() - self.time_last_notified > self.NOTIFY_INTERVAL_SECS:
            self.time_last_notified = time.time()
            notify_users(self.mail_form_url,
                         "Regular update for %s" % sys.argv[0],
                         "\n".join(self.saved_output))

    def makeRecord(self, name, level, fn, lno, msg, args, exc_info,
                   func=None, extra=None, sinfo=None):
        """Create an ArgLogRecord, apply ``extra`` and remember the message.

        Raises KeyError when ``extra`` would overwrite an existing record
        attribute, matching the standard ``Logger.makeRecord``.
        """
        if sys.version_info < (3, 2):
            record = ArgLogRecord(name, level, fn, lno, msg, args, exc_info,
                                  func, extra)
        else:
            record = ArgLogRecord(name, level, fn, lno, msg, args, exc_info,
                                  func=func, extra=extra, sinfo=sinfo)
        if extra is not None:
            # Copy user supplied attributes onto the record so formatters
            # and filters can use them, as the stdlib implementation does.
            for key in extra:
                if key in ("message", "asctime") or key in record.__dict__:
                    raise KeyError(
                        "Attempt to overwrite %r in LogRecord" % key)
                record.__dict__[key] = extra[key]
        self._saveRecord(record)
        return record

    def report_exception(self):
        """Notify users that we have had a problem (sends the DEBUG buffer)."""
        notify_users(self.mail_form_url, "Exception in %s" % sys.argv[0],
                     "\n".join(self.saved_log))

    def notify(self, subject, body):
        """Notify users of a message; ``body`` is followed by saved output.

        Returns whatever ``notify_users`` returns.
        """
        return notify_users(self.mail_form_url,
                            "%s %s" % (subject, sys.argv[0]),
                            body + "\n".join([str(x) for x in self.saved_output]))


def getLogger(logger_name, stream=sys.stdout):
    """Register our logger class and return a configured logger.

    On first use for ``logger_name`` a DEBUG level file handler (see
    ``get_log_file_name``) is attached, plus an INFO level handler on
    ``stream`` when ``stream`` is truthy. A logger that already has
    handlers is returned untouched.

    NOTE: ``logging.setLoggerClass`` is process wide, so loggers created
    after this call are ArgLogger instances too. The default ``stream`` is
    the ``sys.stdout`` object that existed when this function was defined.
    """
    logging.setLoggerClass(ArgLogger)
    logger = logging.getLogger(logger_name)
    if logger.handlers:
        # Only set them up once!
        return logger
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter(
        '%(asctime)s:%(name)s:%(levelname)s:%(message)s')
    fh = logging.FileHandler(filename=get_log_file_name())
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    if stream:
        ch = logging.StreamHandler(stream)
        ch.setLevel(logging.INFO)
        ch.setFormatter(formatter)
        logger.addHandler(ch)
    return logger


def test():
    """Test the above"""
    logger = getLogger('testlogger')
    logger.debug('debug message')
    logger.info('info message')
    # Logger.warn is a deprecated alias that newer Python versions removed.
    logger.warning('warn message')
    logger.error('error message')
    logger.critical('critical message')
    logger.info("Some", "text", "params")
    logger.info([1, 2, "three", 1.2], "more")
    logger.info("Unicode text", u"file1uåäö")
    logger.info("Unicode text", '\ufffd')
    logging.shutdown()


if __name__ == "__main__":
    test()