""" # logutils.py - Utilities for logging etc # ##################################################### # OVERVIEW: # # Logging utilities # ###################################################### """ import sys import re import os import time import stat def python3(): return sys.version_info[0] >= 3 if python3(): import urllib.parse import urllib.request else: import urllib import traceback NOTIFY_URL = "" # If blank no emails are sent def notify_users(subject, msg): "Uses simple form to send an email to fixed set of users" if not NOTIFY_URL: return request = None try: if python3(): request = urllib.request.Request(NOTIFY_URL, urllib.parse.urlencode({'subject': subject, 'message': msg})) else: request = urllib.urlopen(NOTIFY_URL, urllib.urlencode({'subject': subject, 'message':msg})) except Exception as e: print("Failed to notify:", str(e)) sys.stdout.flush() return request def save_existing_file(file_name): "Ensures we don't overwrite existing files by renaming them" if os.path.exists(file_name): new_name = os.path.basename(file_name) (new_name, ext) = new_name.rsplit(".", 1) new_name = "%s-%s.%s" % (new_name, time.strftime("%Y%m%d%H%M%S", time.localtime()), ext) os.rename(file_name, new_name) return new_name return None class Logger(object): "Do logging" def __init__(self, options=None): self.options = options prefix = sys.argv[0].split(".py")[0] self.logfile_name = os.path.join(os.getcwd(), "log-%s-%s.log" % (os.path.basename(prefix), time.strftime('%Y%m%d%H%M%S', time.localtime()))) self.logfile = open(self.logfile_name, "w") self.warning_count = 0 self.error_count = 0 self.p4warning_count = 0 self.p4error_count = 0 self.time_last_notified = time.time() self.saved_output = [] self.saved_log = [] self.output("Logging to: %s" % self.logfile_name) def save_log(self, line): "Save to circular buffer" self.saved_log.append(line) if len(self.saved_log) > 100: del self.saved_log[0] def save_output(self, line): "Save to circular buffer" self.saved_output.append(line) if len(self.saved_output) > 50: del self.saved_output[0] def log(self, *params): "Write to log file" line = ", ".join([str(x) for x in params]) self.logfile.write(line) self.logfile.write("\n") self.logfile.flush() self.save_log(line) def output(self, *params): "Print to stdout and log" timestamp = time.strftime('%Y/%m/%d %H:%M:%S', time.localtime()) self.log(timestamp, params) msg = "%s: %s" % (timestamp, ", ".join([str(x) for x in params])) print(msg) sys.stdout.flush() self.save_output(msg) if time.time() - self.time_last_notified > 30 * 60: # 30 mins self.time_last_notified = time.time() notify_users("Regular update for %s" % sys.argv[0], "\n".join(self.saved_output)) def report_exception(self, msg=None): "Notify users that we have had a problem" msg = "Exception caught:\n" + traceback.format_exc() for line in msg.split("\n"): self.output(line) notify_users("Exception in %s" % sys.argv[0], "\n".join(self.saved_log)) def log_warning(self, *params): "Logs warning messages" self.log("+++Warning:", params) self.warning_count += 1 def log_error(self, *params): "Logs error messages" self.log("@@@Error:", params) self.error_count += 1 def p4log(self, p4, *params): "Logs p4 warnings or errors" if len(p4.warnings) > 0: self.log_warning("p4:", p4.warnings, params) self.p4warning_count += 1 if len(p4.errors) > 0: self.log_error("p4:", p4.errors, params) self.p4error_count += 1 def notify(self, subject, body): "Notify users of a message" notify_users("%s %s" % (subject, sys.argv[0]), body + "\n".join(self.saved_output)) def report(self): "Print summary" self.output("Summary:") self.output("Warnings: ", self.warning_count) self.output("Errors:", self.error_count) self.output("P4Warnings:", self.p4warning_count) self.output("P4Errors:", self.p4error_count) class TimeRecorder(object): "Does basic timing of steps/phases" def __init__(self, logger): self.steps = [] self.logger = logger def add(self, msg): "Record a timer step" self.steps.append((msg, time.time())) num = len(self.steps) if num > 1: self.elapsed(self.steps[-1][0], num - 2, num - 1) def elapsed(self, msg, i, j): "Compute elapsed time between steps" text = "%s %.2f seconds %.2f minutes" % (msg, self.steps[j][1] - self.steps[i][1], (self.steps[j][1] - self.steps[i][1]) / 60) self.logger.output(text) def report(self): "Report timing summary" self.logger.output("Time summary") for i in range(1, len(self.steps)): self.elapsed(self.steps[i][0], i-1, i) self.elapsed("Total elapsed time:", 0, -1) def test(): "Test the above" options = None log = Logger(options) print(log.logfile_name) log.log("Some", "text", "params") log.log([1, 2, "three", 1.2], "more") if __name__ == "__main__": test()