#
# Python module for common Perforce activities related to the Journal
# daemon.
#

import Setup, cutil, listener, os
import marshal


class IJournalFile(listener.IPoller):
    '''
    Base class for the journal readers in this module.  Subclasses supply
    nextLine(); this class assembles the returned lines into JournalEntry
    objects and knows how to read the journal sequence number from the
    first line of a journal file.
    '''

    def __init__(self):
        listener.IPoller.__init__(self)
        # entry that is still waiting for more lines (multi-line entries)
        self._entry = None
        # sequence number read from the journal's first line; None means
        # the first line could not be understood
        self._sequence = 0
        # sequence number that was in effect before the last change
        self._prevSequence = None

    def nextLine(self):
        '''
        Returns a tuple: the first element is the next line in the journal
        file (according to the file's policy), or None if there isn't
        another line; and the second element is a boolean indicating whether
        the journal was rotated underneath us.

        If it was rotated, then there's a chance that we missed some
        journal data.  A good assumption one can make:
            - if the script was running before the rotation
            - if the returned value is (None, True)
            - then we can assume that we are running actively enough to
              have caught all the data.
            - But otherwise, if the rotation is True, then we can only
              assume that we missed some data, and additional means must
              be taken.

        This base implementation never has any data: it always returns
        (None, False).
        '''
        return (None, False)

    def _readLine(self, f):
        '''
        Reads one line from the file object f.  Returns the line, or None
        if readline() returned an empty string.
        '''
        x = f.readline()
        if len(x) <= 0:
            x = None
        return x

    def next(self):
        '''
        For use by the ListenerRunner.  Returns the next complete journal
        entry, or None if parseNext() returned an incomplete entry.
        '''
        x = self.parseNext()
        if x.incomplete:
            x = None
        return x

    def parseNext(self):
        '''
        Parses the next journal entry, and returns the corresponding
        Journal type.  If there is no more data, then an entry with no
        data associated with it is returned.  If the current entry is
        incomplete (in the case of multi-line entries), then the entry
        object will have no data and the "incomplete" flag will be set.
        The rotate flag will be set in the returned object if the journal
        was rotated at all.  If a possible error due to rotation in the
        journal occurs, then the entry will be marked with the 'badRotate'
        flag set to True.
        '''
        (line, didRotate) = self.nextLine()
        entry = self._entry
        if entry is None:
            entry = JournalEntry()
        entry.addLine(line, didRotate)
        if entry.incomplete:
            if line is not None:
                cutil.log(cutil.DEBUG, ("Added journal line [", line,
                        "], but entry is incomplete"))
            # keep the partial entry (and its rotate flag) for the next call
            self._entry = entry
        else:
            cutil.log(cutil.DEBUG, ("Added journal line [", line,
                    "], completing the entry"))
            self._entry = None
            entry.finalize()
        return entry

    def _readFirstLine(self, f):
        '''
        Special code for reading the first line of the Journal file.

        Updates self._sequence with the sequence number found on the first
        line, or with None if the first line could not be understood.  When
        the sequence number changed, the old value is stored in
        self._prevSequence.

        Returns True if there was a previous (non-None) sequence number and
        the newly read one differs from it, False otherwise.  The file
        position of f is restored before returning.
        '''
        import time

        #cutil.log(cutil.DEBUG, ("Reading first line of journal file [",
        #    f, "], current sequence = ", self._sequence))

        # Read the @vv@ record at the beginning of the journal.  Since the
        # server truncates the journal without locking it first during
        # journal rotation, several attempts at reading the @vv@ record
        # are needed.  The retry logic is needed since it is possible that
        # a single read here could occur between the inode updated as a
        # result of truncating the journal and the inode updated as a
        # result of writing of the @vv@ record.
        oldSeq = self._sequence
        pos = f.tell()
        f.seek(0)
        try:
            for attempts in range(1, 10):
                line = f.readline()
                if len(line) > 0:
                    break
                cutil.log(cutil.DEBUG, ("Did not find any data in file [",
                        f, "], pos = ", f.tell()))
                time.sleep(1)

            if line.startswith('@pv@ 0 @db.counters@ @upgrade@ 1'):
                # special case: on the very first startup of Perforce,
                # if it hasn't created any db files ever, then it will
                # record some "interesting" data points.
                self._sequence = 0
            elif len(line) < 32:
                cutil.log(cutil.ERR, ("First journal line is too short (",
                        line, ")"))
                self._sequence = None
            elif line[0:5] != "@vv@ ":
                cutil.log(cutil.ERR,
                        ("First journal line does not set counter (",
                        line, ")"))
                self._sequence = None
            else:
                # a non-numeric version is reported as an unknown version
                # instead of escaping as a ValueError
                try:
                    version = int(line[5])
                except ValueError:
                    version = line[5]
                if version != 0:
                    cutil.log(cutil.ERR, ("Unknown journal version (",
                            version, ")"))
                    self._sequence = None
                elif line[6:6+25] != " @db.counters@ @journal@ ":
                    cutil.log(cutil.ERR,
                            ("First @vv@ line is not setting the journal (",
                            line, ")"))
                    self._sequence = None
                else:
                    try:
                        # this will ignore the \r\n or \n at the end of
                        # the line
                        self._sequence = int(line[6+25:])
                    except ValueError:
                        cutil.log(cutil.ERR,
                                ("Journal sequence is not a number (",
                                line, ")"))
                        self._sequence = None

            changed = oldSeq is not None and self._sequence != oldSeq
            if changed:
                self._prevSequence = oldSeq
            return changed
        finally:
            # return the file to the read position it was last at before
            # calling this method.
            f.seek(pos)


class JournalEntry:
    '''
    One journal entry, built up from one or more journal lines.

    lines:      the raw lines added so far.
    fields:     the parsed fields, filled in once the entry is complete.
    didRotate:  True if any addLine() call reported a rotation.
    badRotate:  True if a rotation was reported together with a line.
    incomplete: True until a full entry has been parsed.
    '''

    def __init__(self):
        self.lines = []
        self.fields = []
        # text accumulated so far for an entry that spans several lines
        self._remainder = None
        self.didRotate = False
        self.badRotate = False
        self.incomplete = True

    def finalize(self):
        '''
        Called by IJournalFile.parseNext() once the entry is complete.
        Does nothing here.
        '''
        pass

    def addLine(self, line, didRotate):
        '''
        Adds the next journal line (or None when there was no data) to
        this entry.  The entry is considered incomplete while the text
        collected so far has an odd number of '@' characters; once the
        count is even (or zero) the collected text is split into fields
        and the incomplete flag is cleared.
        '''
        # if the didRotate flag is set once, then it will always be set.
        self.didRotate = self.didRotate or didRotate
        if didRotate and line is not None:
            self.badRotate = True
        if line is None:
            return

        self.lines.append(line)

        # split up the fields
        if self._remainder is None:
            self._remainder = ''
        self._remainder += line
        if self._remainder.count('@') % 2 == 1:
            # an '@' wrapped field is still open: another line is expected
            self.incomplete = True
            return

        # Note: lines without any '@' are valid here; they are parsed as
        # plain space separated fields.
        text = self._remainder
        pos = 0
        size = len(text)
        while pos < size:
            pos = self._parseField(text, pos, size)
        self.incomplete = False
        self._remainder = None

    def _parseField(self, line, pos, size):
        '''
        Parses the single field of line that starts at index pos, adds it
        to self.fields, and returns the index where the next field starts
        (an index >= size means the line is finished).

        line: the complete text of the entry.
        pos:  index of the first character of the field.
        size: len(line).

        A field that starts with '@' runs up to its closing '@'; a doubled
        '@@' inside it is an escaped '@' and does not close the field.  The
        field text is stored as found, so doubled '@' are not collapsed.
        Any other field runs up to the next space.
        '''
        if line[pos] != '@':
            # not @ wrapped
            last = line.find(' ', pos)
            if last < 0:
                last = size
            self._addField(line[pos:last])
            return last + 1

        sm1 = size - 1
        first = pos + 1
        pos = first
        while pos < size:
            if line[pos] != '@':
                pos += 1
                continue
            if pos == sm1:
                # closing '@' is the very last character
                self._addField(line[first:pos])
                return size
            if line[pos + 1] == '@':
                # escaped '@': step over both characters so that the
                # second one is not mistaken for the closing '@'
                pos += 2
                continue
            self._addField(line[first:pos])
            # step over the closing '@', the separating space and any
            # line ending that follows the field
            pos += 1
            if pos <= sm1 and line[pos] == ' ':
                pos += 1
            if pos <= sm1 and line[pos] == '\r':
                pos += 1
            if pos <= sm1 and line[pos] == '\n':
                pos += 1
            return pos

        # there was an error in the field - don't add it
        return size

    def _addField(self, field):
        '''
        Appends field to self.fields unless it is None.
        '''
        if field is not None:
            self.fields.append(field)


class FullJournalFile(IJournalFile):
    '''
    Knows how to handle a local Journal file.  It only knows how to
    read the file from start to finish.
    '''

    def __init__(self, filename):
        '''
        filename: the file name of the journal file.  A name ending in
            '.gz' is opened through the gzip module.
        '''
        IJournalFile.__init__(self)
        self._jname = filename
        # set first, so that __del__ is safe even when the open fails
        self._f = None
        if filename[-3:] == '.gz':
            self.compressed = True
            import gzip
            self._f = gzip.open(filename, 'r')
        else:
            self.compressed = False
            self._f = open(filename, 'r')
        cutil.log(cutil.INFO, ("Opening full journal file ", filename,
                ", compressed = ", self.compressed))
        self._firstCall = True
        self._readFirstLine(self._f)

    def __del__(self):
        f = getattr(self, '_f', None)
        if f is not None:
            f.close()

    def nextLine(self):
        '''
        Returns (line, rotated), where line is the next line of the file
        (None at the end) and rotated is True on the first call only.
        '''
        x = self._firstCall
        self._firstCall = False
        return (self._readLine(self._f), x)


class TailJournalFile(IJournalFile):
    '''
    Knows how to handle a local live Journal file.  It only handles the
    reading from the last known position, rather than reading in the whole
    file.  It should correctly handle journal roll-over.
    '''

    def __init__(self, filename, datafile, keepOpen = True):
        '''
        filename: the file name of the journal file.
        datafile: file which contains persistent data about where the
            Journal reader last left off, so that restarts can occur
            without needless or incorrect syncs to the remote server.
        keepOpen: when False, the journal file is closed again at the end
            of every nextLine() call.
        '''
        IJournalFile.__init__(self)
        self._jname = filename
        self._tmpfile = datafile
        self._keepOpen = keepOpen
        self._f = None
        self._prevSequence = None
        cutil.log(cutil.INFO, ("Tailing journal file ", filename))
        self.__deserialize()

    def __del__(self):
        f = getattr(self, '_f', None)
        if f is not None:
            f.close()
            self._f = None

    def nextLine(self):
        '''
        Read the next line from the journal based on where we last were,
        and assume that the file was rolled back if it doesn't look
        the same or if the journal size is less than the last read
        size.

        On any rotate, we need to return (None, True).
        '''
        # discover if the file has changed
        if not os.path.exists(self._jname):
            cutil.log(cutil.WARN,
                    "Journal file (%s) does not exist right now."
                    % self._jname)
            self.rotate()
            self.__serialize()
            if self._sequence is None:
                self._sequence = -1
            return (None, True)

        nextline = None
        didRotate = False
        f = self.getJournal()
        try:
            # get the size of the file
            f.seek(0, 2)
            fileLength = f.tell()

            # check the length of the file against the last discovered
            # size...
            if fileLength == 0:
                # short-hand for all the other code in this method.
                # Early out if the journal is empty.
                if self._pos > 0:
                    self.rotate()
                    self.__serialize()
                else:
                    self.rotate()
                    # NOTE(review): the indentation of this file was lost;
                    # this assignment is assumed to belong to the branch
                    # where nothing had been read yet - confirm against
                    # the original layout.
                    self._sequence = -1
                return (None, True)
            elif self._lastsize > fileLength:
                # the file has shrunk, so assume that the file was rotated.
                cutil.log(cutil.DEBUG, ("File has shrunk since we last ",
                        "saw it. Assume it rotated."))
                self.rotate()
                self._readFirstLine(f)
                self.__serialize()
                return (None, True)
            else:
                # the journal is bigger or the same as we remember.
                # This could be the same journal, or maybe a different one.

                # there's the slim chance that we're not looking at the
                # same file.
                #cutil.log(cutil.DEBUG, ("Check for a changed sequence"))
                changed = self._readFirstLine(f)
                #cutil.log(cutil.DEBUG, ("File is bigger: sequence = ",
                #    self._sequence, ", prev = ", self._prevSequence))
                if changed:
                    # rotate!
                    cutil.log(cutil.DEBUG,
                            ("Rotated but journal is bigger: previous sequence = ",
                            self._prevSequence, ", sequence = ",
                            self._sequence))
                    self.rotate()
                    self.__serialize()
                    return (None, True)
                elif self._lastsize == fileLength \
                        and (self._lastline is None \
                        or self._pos + len(self._lastline) == fileLength):
                    # there was nothing new added to the file.
                    # Note that we don't need to serialize, since none of
                    # our statistics were updated.
                    #cutil.log(cutil.DEBUG, ("previous sequence = ",
                    #    self._prevSequence, ", sequence = ",
                    #    self._sequence))
                    return (None, False)

            self._lastsize = fileLength
            f.seek(self._pos)
            if self._lastline is not None:
                # now ensure that the line at "pos" is in fact the
                # last line we read.  If this isn't the case, then the
                # file has changed underneath us, and we assume it's been
                # a rotation.  If there was a rotation, and the journal is
                # now bigger than the last position we read, and this
                # precise location contains a line of text exactly like
                # before the rotation, then we don't catch the rotate.
                # However, we'll consider this a highly unlikely situation.
                line = self._readLine(f)
                if line != self._lastline:
                    self.rotate()
                    self.__serialize()
                    return (None, True)
            else:
                # we are reading at the head of the new file
                changed = self._readFirstLine(f)
                cutil.log(cutil.DEBUG,
                        ("New file read: previous sequence = ",
                        self._prevSequence, ", sequence = ",
                        self._sequence))
                if changed:
                    # ensure that we're pointing to the beginning of the
                    # file
                    self.rotate()
                    didRotate = True

            # read in the current line
            nextpos = f.tell()
            nextline = self._readLine(f)
            if nextline is not None:
                self._pos = nextpos
                self._lastline = nextline
                self.__serialize()
                cutil.log(cutil.DEBUG, ("Read line [", self._lastline, "]"))
        finally:
            self.closeJournal()

        # now, remember where we were for future runs.  Don't put this
        # in the finally clause
        return (nextline, didRotate)

    def rotate(self, keepSequence = True):
        '''
        Update internal records to represent the rotation of the
        journal file: the read position, last line and last size are
        reset.  The sequence number is cleared as well when keepSequence
        is False.
        '''
        cutil.log(cutil.INFO, "Detected a rotated journal.")
        self._lastline = None
        self._lastsize = 0
        self._pos = 0
        if not keepSequence:
            self._sequence = None

    def getJournal(self):
        '''
        Returns the open journal file object, opening the file (binary
        mode) if it is not currently open.
        '''
        if self._f is None:
            self._f = open(self._jname, 'rb')
        return self._f

    def closeJournal(self):
        '''
        Closes the journal file, unless this object was created with
        keepOpen set.
        '''
        if self._f is not None and not self._keepOpen:
            self._f.close()
            self._f = None

    def _getSerialStruct(self):
        '''
        Returns the current read state as a dictionary.
        '''
        return {
            'pos': self._pos,
            'lastline': self._lastline,
            'lastsize': self._lastsize,
            'sequence': self._sequence,
            'prevSequence': self._prevSequence
            }

    def _putSerialStruct(self, struct):
        '''
        Restores the read state from a dictionary with the keys produced
        by _getSerialStruct().
        '''
        self._pos = struct['pos']
        self._lastline = struct['lastline']
        self._lastsize = struct['lastsize']
        self._sequence = struct['sequence']
        self._prevSequence = struct['prevSequence']

    def __serialize(self):
        '''
        Writes the current read state to the data file using marshal.
        '''
        f = open(self._tmpfile, 'w+b')
        try:
            marshal.dump(self._getSerialStruct(), f)
        finally:
            f.close()

    def __deserialize(self):
        '''
        Loads the read state from the data file.  If the file does not
        exist or cannot be read, the default state (start of the journal)
        is used; keys missing from the stored state keep their defaults.
        '''
        o = {
            'pos': 0,
            'lastline': None,
            'lastsize': 0,
            'sequence': 0,
            'prevSequence': None
            }
        if os.path.exists(self._tmpfile):
            loaded = None
            f = open(self._tmpfile, 'rb')
            try:
                try:
                    loaded = marshal.load(f)
                except Exception:
                    # best effort: an unreadable position file must not
                    # stop the reader, it just starts from the defaults
                    loaded = None
            finally:
                f.close()
            if isinstance(loaded, dict):
                o.update(loaded)
            else:
                cutil.log(cutil.ERR,
                        "Problem reading journal position file (%s)"
                        % self._tmpfile)
        self._putSerialStruct(o)


class RotateJournal(IJournalFile):
    '''
    A combination of a TailJournalFile and a FullJournalFile.  It handles
    the return code of rotation, correctly reading from the archived journal
    files.
    '''

    def __init__(self, lastCheckpoint, tailJournal, rotateJournal):
        '''
        By providing the lastCheckpoint value, you tell the RotateJournal
        the farthest back it can check for earlier journal files.  The
        use-case is where a checkpoint was restored on the backup server,
        then you want to start the hot backup from there.  In this case,
        you would want to remove the journal position file, and set the
        lastCheckpoint to the number that was just restored.

        lastCheckpoint: lowest journal sequence number to search for;
            None is treated as 0.
        tailJournal: the TailJournalFile reading the live journal.
        rotateJournal: file name prefix of the rotated journals; they are
            looked up as "<prefix>.jnl.<sequence>" (optionally ".gz").
        '''
        IJournalFile.__init__(self)
        self.tail = tailJournal
        self.rotatedTail = False
        self.rotateSpec = rotateJournal
        # rotated journals still to be read; the last item is read first
        self.rotatedStack = []
        if lastCheckpoint is None:
            lastCheckpoint = 0
        self.earliestJournal = lastCheckpoint

    def nextLine(self):
        '''
        Knows how to load the rotated journals.

        Lines are taken from the pending rotated journals first.  Once
        those are used up the live journal is read; if it reports a
        rotation, the rotated journal files between the last known
        position and the live journal are located and queued.

        Raises Exception when the live journal is not recognizable or when
        the rotated journals needed to continue cannot be found or do not
        match the stored position.
        '''
        if len(self.rotatedStack) > 0:
            current = self.rotatedStack[-1]
            (line, rotated) = current.nextLine()
            while line is None or len(line) == 0:
                # at this point, the last item in the rotated stack
                # is at the end, so go to the next journal in the stack
                # and read from it.
                self.rotatedStack.pop()
                if len(self.rotatedStack) == 0:
                    break
                current = self.rotatedStack[-1]
                cutil.log(cutil.INFO, ("Switching to journal file [",
                        current._f, "]"))
                (line, rotated) = current.nextLine()
            if line is not None and len(line) > 0:
                # at least one of the rotated journals had an entry we
                # could read.
                return (line, False)
            # else, fall through to use the tail journal

        # remember where the tail was before this read; it is needed to
        # find the same spot in the rotated copy of the journal
        state = self.tail._getSerialStruct()
        (line, rotated) = self.tail.nextLine()
        if rotated:
            # get the previous file, and jump to where we last left off
            cutil.log(cutil.VERBOSE,
                    "Detected a rotation, so finding the previous journal.")
            if self.tail._sequence is None:
                # an error in the journal file
                raise Exception(
                        "Unrecoverable error: not a recognizable journal file")
            if self.tail._sequence > self.earliestJournal:
                seq = self.tail._sequence
                jf = None
                if state['sequence'] is None \
                        or state['sequence'] < self.earliestJournal:
                    state['sequence'] = self.earliestJournal
                if seq < state['sequence']:
                    # is this the right thing, or is it a hack?
                    cutil.log(cutil.DEBUG, ("resetting the seq hack"))
                    seq = state['sequence']
                cutil.log(cutil.INFO,
                        ("Searching for journal with sequence number ",
                        state['sequence'], ", current = ", seq))
                isFirstJournal = True
                while seq >= self.earliestJournal and jf is None:
                    jf = self.getJournalForSequence(seq)
                    if jf is None:
                        if isFirstJournal:
                            # the very first journal: may not have a backup
                            # in the archives, in which case we ignore this
                            # missing file.
                            seq -= 1
                            isFirstJournal = False
                            continue
                        else:
                            # a missing file in between the live journal
                            # and the one we're looking for.  This is
                            # definitely wrong.
                            break
                    isFirstJournal = False
                    cutil.log(cutil.DEBUG,
                            ("Found journal with sequence number ",
                            jf._sequence))
                    if jf._sequence == state['sequence']:
                        # Continuing from where we last left off
                        self.rotatedStack.append(jf)
                        if state['lastline'] is not None:
                            jf._f.seek(state['pos'])
                            line = self._readLine(jf._f)
                            if line != state['lastline']:
                                cutil.log(cutil.FATAL, ("Journal file ",
                                    jf._f,
                                    " has same sequence number as the ",
                                    "previous journal we were tailing, but ",
                                    "doesn't look the same. Must continue ",
                                    "from a checkpoint (read prev line = [",
                                    line, "], stored prev line = [",
                                    state['lastline'],"])."))
                                raise Exception("Unrecoverable error: Must continue from a checkpoint.")
                        # keep jf != None
                    #elif jf._sequence == None and seq == self.earliestJournal:
                    #    # There's one situation where this isn't applicable:
                    #    # if you're restoring the server from a checkpoint,
                    #    # and the original journal isn't used.
                    else:
                        # go back further

                        # make sure that when we get to this journal, we
                        # read it from the start.
                        jf._f.seek(0)
                        self.rotatedStack.append(jf)
                        jf = None
                    # do the subtraction at the end, so that we can
                    # search for the sequence matching our number, not just
                    # those before our number
                    seq -= 1
                if jf is None:
                    cutil.log(cutil.FATAL,
                            "Could not find a rollback file far enough back.")
                    raise Exception("Unrecoverable error: Must continue from a checkpoint.")
                else:
                    (line, x) = jf.nextLine()
                    rotated = False
        return (line, rotated)

    def getJournalForSequence(self, sequence):
        '''
        Looks for the rotated journal file with the given sequence number,
        named "<rotateSpec>.jnl.<sequence>" or the same name with ".gz"
        appended.

        Returns a FullJournalFile for the file that was found (gzip
        compressed files are read through the gzip module), or None if
        neither file exists.
        '''
        lastFile = "%s.jnl.%d" % (self.rotateSpec, sequence)
        if not os.path.exists(lastFile):
            lastFile = lastFile + ".gz"
            if not os.path.exists(lastFile):
                # report the prefix that was actually searched
                cutil.log(cutil.ERR,
                        ("Could not find journal file with sequence ",
                        sequence, " at [", self.rotateSpec, "]"))
                return None
        cutil.log(cutil.INFO, ("Loading journal file [", lastFile, "]"))
        #try:
        return FullJournalFile(lastFile)
        #except:
        #    import sys
        #    cutil.log(cutil.ERR
        #    return None