#!/usr/local/bin/perl
#
# This script replicates a local (secondary) server from a source
# (like a primary server).
# It is used in the Perforce replicate command to replay the source journal
# on the local server.
# The Perforce replicate command should be spelled like this:
# p4 -p MASTER:PORT replicate
# -s STATE # local file to track the most recent journal position
# -J PREFIX # journal prefix used on the master
# -i 0 # optionally to disable polling
# PATH/p4admin_replicate # this script
# -v # verbose mode
# -port NNNN # port of the local replica
# -srchost MASTER # master host name where ,v files live
# -srctop DIR # directory where ,v files live on the master
# -log # log everything (that's the default anyway)
#
# Example:
# p4 -p perforce-00-blr:1666 replicate
# -s /export/journal/perforce/1666/replica.state
# -J /export/journal/perforce/1666/journal
# -i 0
# /export/db/perforce/1666/admin.support/p4admin_replicate
# -v
# -port 1666
# -srchost perforce-00-blr
# -srctop /export/data/perforce/1666
# -log
#
# Note: the current code expects the pipe from "replicate" to be closed
# eventually. Only then rsync will copy archive files. This way, it is
# easier to synchronize (and possibly minimize) rsync activity.
# This means that the -k replicate option is not used.
#
# Michael Mirman
# MathWorks, Inc. 2010-2011
use strict;
use warnings;
use Cwd qw(abs_path);
use File::Basename qw(basename dirname fileparse);
use Getopt::Long qw(GetOptions);
use Parallel::ForkManager;
use Pod::Usage qw(pod2usage);
# Work out where this script lives and make two directories searchable for
# modules: the script's own directory and the "lib" directory next to it
# (i.e. <parent of script dir>/lib).  This runs at compile time so that the
# "use" statements further down can see the extended @INC.
our ($Mydir, $Myname);

BEGIN {
    my ($script_name, $script_dir) = fileparse($0);

    $Myname = $script_name;
    $Mydir  = abs_path($script_dir);

    my $sibling_lib = dirname($Mydir) . '/lib';
    unshift @INC, $Mydir, $sibling_lib;
}
use MW::Util::Mail qw(sendmsg);
use p4admin_backup; # settings, P4LOGDIR, msg
# Global variables shared with or defined in p4admin_backup.pm
# Package globals (per the original note, shared with or defined in
# p4admin_backup.pm).
our $HOST = hostname;
our ($logfile, $preview_only);

# Default notification address, chosen by the name of the host we run on.
# Hosts matching neither pattern get no notification unless -mail is given.
our $notify = '';
if ( $HOST =~ /^ perforce /x ) {
    $notify = 'p4-help@mathworks.com';
}
elsif ( $HOST =~ /^ scmtest /x ) {
    $notify = 'perforce-admin@mathworks.com';
}

our $verbose = 0;       # this is the same $verbose as in p4admin_backup.pm

# Command line as we received it, before GetOptions consumes it.
my @orig_args = @ARGV;

# Values filled in by the command-line options.
my $port;               # host:port specifies the target Perforce server
my ($repeat_interval, $srchost, $srctop);
my @target_dirs;        # srchost:srctop will be mirrored to these directories
my $nproc = 10;         # default max number of parallel rsync processes

$logfile = 1;           # default: create log file
#
# Parse arguments
#
# -repeat is given in minutes and stored in seconds.
my $set_repeat = sub {
    my (undef, $minutes) = @_;
    $repeat_interval = $minutes * 60;
};

GetOptions(
    'data2=s'   => \@target_dirs,
    'help'      => sub { pod2usage( -verbose => 2, -exit => 0 ) },
    'log!'      => \$logfile,       # log file to redirect the output
    'mail=s'    => \$notify,        # email result there
    'n'         => \$preview_only,
    'nproc=i'   => \$nproc,
    'port=s'    => \$port,          # port of the local replica
    'repeat=i'  => $set_repeat,     # run continuously, repeating itself no
                                    # more frequently than this many minutes
    'srchost=s' => \$srchost,
    'srctop=s'  => \$srctop,
    'v+'        => \$verbose,
) or die "$Myname: Error parsing arguments\n";

# Mandatory options, checked in this order.  The first missing one prints
# its complaint followed by the full manual, and the script exits with 2.
my @mandatory = (
    [ $port,    "No local port specified.\n" ],
    [ $srchost, "No source host specified.\n" ],
    [ $srctop,  "No source top directory specified.\n" ],
);
for my $check (@mandatory) {
    my ($value, $complaint) = @{$check};
    next if $value;
    print $complaint;
    pod2usage( -verbose => 2,
               -exitval => 2 );
}

# The log file is named after the script with its "p4admin_" part removed.
my $prefix = $Myname;
$prefix =~ s/p4admin_//;
p4admin_begin($port, "$prefix.log");
# One handler for TERM, HUP, QUIT and INT.
#   - Every signal is logged together with a stack trace.
#   - INT aborts the run: a notification is mailed when a notification
#     address is configured, and the script exits with code 9.
#   - Any other signal is only logged; the current batch keeps running.
$SIG{TERM} = $SIG{HUP} = $SIG{QUIT} = $SIG{INT} = sub {
    my ($sig) = @_;

    # This file does not load Carp itself.  Load it on demand so the stack
    # trace below never depends on some other module having pulled it in.
    require Carp;
    msg(0, "Signal $sig received.\n", Carp::longmess(), "\n");

    if ( $sig eq 'INT' ) {
        my $msg = "Replication was interrupted by a $sig signal.\n"
                . "You must verify the integrity of the archive\n"
                . "\n--$Myname on behalf of the Perforce Administrator\n";
        if ( $notify ) {
            sendmsg({Subject => "Replication interrupted on $HOST",
                     Message => $msg,
                     To      => $notify,
                     verbose => $verbose,
                    });
        }
        msg(0, "You must verify the integrity of the archive\n");
        exit 9;
    }

    msg(0, "We need to finish this batch of changes\n");
};
#
# Replicate
# p4 [ -j token ][ -s statefile ][ -i interval ][ -k -x ]
# [ -J prefix ][ -o output ][ command ]
#
# -j token
# Specify a journal number or position token of the form journalnum/byteoffset from which to start replicating metadata. If this flag is specified, it overrides any state file specification.
# -s statefile
# Specify a state file which tracks the most recent journal position.
# -i interval
# Specify a polling interval, in seconds. The default is two seconds. To disable polling (that is, to check once for updated journal entries and then exit), specify an interval of 0.
# -J prefix
# Specifies a filename prefix for the journal, such as that used with p4d -jc prefix
# -k
# Keep the pipe to the command subprocess open between polling intervals.
# -x
# Exit the p4 replicate command when journal rotation is detected.
# -o savefile
# Specify a file for output. If a command subprocess is specified, both the subprocess and the specified savefile are provided with the output.
# "command" in p4 replicate above is *this* script, so we read journal
# records from STDIN and pass them on to "p4d -r ROOT -f -jrc -".
# Per-port settings, obtained from helpers that are not defined in this file.
my $p4        = P4($port);
my $p4d       = P4D($port);
my $logdir    = P4LOGDIR($port);
my $statefile = "$logdir/replica.state";
my $RSYNC     = '/usr/bin/rsync';

# From p4d reference on
# http://www.perforce.com/perforce/doc.092/manuals/p4sag/aa_p4d.html#1043673:
#
# -b bunch -jr file
#     Read bunch lines of journal records, sorting and removing duplicates
#     before updating the database. The default is 5000, but can be set to 1
#     to force serial processing. This combination of flags is intended for
#     use by replica servers started with the p4 replicate command.
#
# -jrc file
#     Journal-restore with integrity-checking. Because this option locks the
#     database, this option is intended only for use by replica servers
#     started with the p4 replicate command.
#
# See
# http://www.perforce.com/perforce/doc.current/manuals/p4sag/10_replication.html
# for the detailed explanation of this command.

# Every journal record we read is piped into this command; p4d terminates
# once the pipe is closed.
my $p4root    = P4ROOT($port);
my $replicate = "$p4d -r $p4root -f -jrc -";

# The data directory of the local replica is always the first mirror target;
# any -data2 directories follow it.
unshift @target_dirs, P4DATA($port);

open my $OUT, '|-', $replicate
    or die "Cannot open pipe to $replicate: $!\n";
# Depot directories whose archive files must be mirrored, mapped to the type
# of the first journal record that asked for them.
my %need2copy;

#
# Ref: http://www.perforce.com/perforce/doc.current/schema/index.html
# We are interested in db.rev records because they determine
# what archive files we need to copy
#
# Example:
# @pv@ 8 @db.rev@ @//sandbox/mmirman/doc/new/foo.2@ 1 0 3 3 1277910491 1277910483 F78464B431A46634558D8B20219B729A 17 0 1 @//sandbox/mmirman/doc/foo@ @1.2@ 0
my $rev_record_re = qr{ ^ \@.v\@ \s \d+ \s \@(db\.rev\w*)\@
                        \s \@ //([^@]+) \@ # The file name
                        \s (\S+) # The revision number
                        \s (\S+) # The file type of the revision
                        \s (\S+) # The action that created the revision
                        \s (\S+) # The changelist that created the revision
                        \s (\S+) # The date/time the changelist that created
                                 # the revision was submitted
                        \s (\S+) # The timestamp on the file in the user's
                                 # workspace when the revision was submitted
                        \s (\S+) # The MD5 digest of the revision
                        \s (\S+) # The size of the file in bytes
                        \s (\S+) # Group of traits (attributes) associated
                                 # with the revision.
                        \s (\S+) # Flag specifying whether or not the revision
                                 # gets its content from another file (i.e.
                                 # whether or not depotFile and lbrFile differ)
                        \s
                      }smx;

RECORD:
while ( defined( my $record = <STDIN> ) ) {
    msg(2, "READ: $record");

    # Every record goes to p4d, whether or not it affects archive files.
    print {$OUT} $record;

    # Of the twelve captured fields only these five are used below:
    # record type, file name, digest, size and the lazy-copy flag.
    my ($record_type, $depotFile, $digest, $size, $lbrIsLazy)
        = ( $record =~ $rev_record_re )[ 0, 1, 8, 9, 11 ]
        or next RECORD;

    # dont repeat the same record twice
    if ( $verbose < 2 ) {
        msg(1, "READ: $record");
    }

    # From http://www.perforce.com/perforce/r10.1/schema:
    # db.revcx Secondary index of db.rev
    # db.revdx Revision records for revisions deleted at the head revision.
    # db.revhx Revision records for revisions NOT deleted at the head revision
    # db.revpx Pending revision records.
    # exclude db.revcx, db.revdx, db.revhx, db.revpx records
    if ( $record_type =~ m{^db.rev[cdhp]x$} ) {    # ,v files didn't change
        msg(0, "Record $record_type can be skipped\n");
        next RECORD;
    }

    # exclude lazy copies in integrations (when lbrIsLazy is true in db.rev)
    # exclude those that don't exist in the source -
    # they were lazy copies and then got deleted
    my $nothing_to_fetch = $lbrIsLazy
                         ? 1
                         : ( $digest =~ /^0+$/ && $size < 0 );
    if ( $nothing_to_fetch ) {
        msg(0, "Lazy copy does not require an rsync\n");
        next RECORD;
    }

    # Remember the directory; the first record type seen for it wins.
    $need2copy{ dirname($depotFile) } ||= $record_type;
} # read STDIN to the end
#
# Hypothetically speaking, we could figure out whether we deal with
# binary files, in which case we need to copy *,d subdirectories.
# It's too much trouble for the wrong decision here.
# So, we will copy *recursively* in every case.
# Therefore, we need to exclude those directories that are subdirectories
# of those we will copy anyway.
#
# Mirror everything collected in %need2copy, retrying failures, then report.
my $copy_limit = 15; # max number of rounds of trying to mirror
my $JUST_A_FEW = 1;  # bat/branch/foo will be mirrored, but bat/branch won't
my @failed2copy;

# "> 0" (not ">= 0") so that we really make at most $copy_limit rounds.
while ( %need2copy && ( $copy_limit-- > 0 ) ) {
    # copy_all does the actual copy and removes from %need2copy
    # all directories we successfully copied (or didn't have to copy)
    copy_all(\%need2copy);

    # If we fail to mirror something, try to mirror its parent
    for my $dir ( sort keys %need2copy ) {
        my $parent = dirname($dir);
        if ( ($parent =~ tr{/}{}) > $JUST_A_FEW ) {
            msg(0, "We will try to copy $parent",
                " since we could not copy one of its subdirectories\n");
            $need2copy{$parent} = 'try-again';
        }
        else {
            msg(0, "Parent directory of $dir",
                " is too short to try to mirror it\n");
            push @failed2copy, $dir;
            delete $need2copy{$dir};
        }
    }
}

# copy_all closes the pipe to p4d.  When there was nothing to mirror it was
# never called, so close the pipe here; otherwise a failed journal replay
# would pass without a trace in the log.  As in copy_all, the failure is
# logged only and does not change the exit code.
if ( defined fileno $OUT ) {
    close $OUT
        or msg(0, $! ? "Syserr closing pipe to $replicate:\n $!\n"
                     : "ERROR running $replicate (exit code: "
                       . ( $? > 255 ? $?>>8 : $? ) . ")\n");
}

# Everything given up on, plus whatever is still pending after the last round.
@failed2copy = sort { $a cmp $b } (@failed2copy, keys %need2copy);
if ( @failed2copy ) {
    my $msg = join '', "Failed to copy the following directories:\n",
        (map { " $_\n" } @failed2copy),
        "This requires an investigation and fixing the data.\n",
        "If\n p4 -p $HOST:$port verify\n",
        "starts failing, the data on this replica will become unreliable.\n",
        "\n--$Myname on behalf of the Perforce Administrator\n";
    if ( $notify ) {
        sendmsg({Subject => "Failure in replication on $HOST",
                 Message => $msg,
                 To      => $notify,
                 verbose => $verbose,
                });
    }
    else {
        print "NOT sending the following message to perforce-admin:\n", $msg;
    }
}

my $rc = @failed2copy;
msg(0, "Finished replaying journal for $port. Errors: $rc\n");
p4admin_end($repeat_interval, $port);

# An exit status is taken modulo 256: cap it so that a large number of
# failures can never wrap around to 0 and look like success.
exit( $rc > 255 ? 255 : $rc );
#
# Mirror given directories.
#
# Argument:
#   $need2copy - reference to a hash keyed by depot directory; the value is
#                only used as a label in log messages.  The hash is modified:
#                subdirectories of other entries are removed up front, and
#                every directory that was mirrored successfully to ALL
#                target directories is removed at the end.
#
# Also closes the pipe to p4d (once) after the rsync children were started.
#
# Returns the number of rsync child processes that failed (one process is
# run per directory and target).  If it returns 0, we are happy.
#
sub copy_all {
    my ($need2copy) = @_;

    # We copy recursively, so a directory below another listed directory
    # does not need a copy of its own.  A parent always sorts before its
    # children, so only earlier entries have to be examined.
    my @sorted_dirs = sort keys %{ $need2copy };
    SKIP_SUBDIR:
    for my $i ( reverse 1 .. $#sorted_dirs ) {
        my $candidate = $sorted_dirs[$i];
        for my $j ( 0 .. $i - 1 ) {
            my $ancestor = $sorted_dirs[$j];
            if ( $candidate =~ m{ ^ \Q$ancestor\E /. }smx ) {
                msg(0, "$candidate does not have to be copied separately:",
                    " it is a subdirectory of $ancestor\n");
                delete $need2copy->{$candidate};
                next SKIP_SUBDIR;
            }
        }
    }

    my $rsync_err = 0;
    my %has_failure;    # directories with at least one failed child
    my $pm = Parallel::ForkManager->new($nproc);
    $pm->run_on_finish(sub {
        my ($pid, $code, $dir, $sig) = @_;
        msg(0,
            "Process $pid finished copying $dir with code $code\n");
        # A child killed by a signal reports exit code 0, so the signal
        # number has to be checked as well.
        if ( $code || $sig ) {
            msg(0, "Process $pid was terminated by signal $sig\n")
                if $sig;
            $rsync_err++;
            $has_failure{$dir} = 1;
        }
    });

    my @launched = sort keys %{ $need2copy };
    for my $depot_dir ( @launched ) {
        my $msg_prefix = "$need2copy->{$depot_dir}: Mirroring $depot_dir";
        for my $target_dir ( @target_dirs ) {
            my $msg_target = (@target_dirs > 1 ? " to $target_dir" : '');
            if ( my $pid = $pm->start($depot_dir) ) { # this does the fork.
                msg(0, "$msg_prefix$msg_target in process $pid\n");
                next;
            }
            # This is a child process
            my $rc = copy_one($depot_dir, $target_dir);
            msg(0, $rc ? "All attempts to copy $depot_dir failed (code=$rc)\n"
                       : "Copying $depot_dir$msg_target succeeded in pid=$$\n");
            $pm->finish($rc); # Terminates the child process
        } # for each target directory
    } # for each directory we need to copy

    # Unclear how critical is to notify $notify about this kind of error.
    # We probably need to collect some statistics about any fall-out's from
    # these errors. - 8/3/2010, MM
    #
    # copy_all is called once per retry round, but the pipe can be closed
    # only once: closing it again would log a bogus "Bad file descriptor".
    if ( defined fileno $OUT ) {
        close $OUT
            or msg(0, $! ? "Syserr closing pipe to $replicate:\n $!\n"
                         : "ERROR running $replicate (exit code: "
                           . ( $? > 255 ? $?>>8 : $? ) . ")\n");
    }

    msg(0, "Waiting for completion...\n");
    $pm->wait_all_children;

    # A directory is done only when every one of its targets succeeded.
    # Deciding this after all children finished keeps a success on one
    # target from hiding a failure on another.
    for my $dir ( @launched ) {
        next if $has_failure{$dir};
        msg(0, "(pid=$$)",
            " We won't try to copy $dir any more\n");
        delete $need2copy->{$dir};
    }

    msg(0, "Total number of errors from this copying: $rsync_err\n")
        if $rsync_err;
    return $rsync_err;
} # copy_all
# Copy one directory (this happens in a child process)
#
# Arguments:
#   $depot_dir  - directory to mirror, relative to $srctop on the source
#                 host and to $target_dir locally
#   $target_dir - local directory to mirror into
#
# Returns the exit code of the last rsync attempt (0 means success).
# A failure to chdir or to start rsync dies; the __DIE__ handler installed
# here logs the reason and makes the child exit with code 13.
sub copy_one {
    my ($depot_dir, $target_dir) = @_;

    # Replace the SIG defined in p4admin_backup.pm, so we would not send
    # email from every thread.
    $SIG{__DIE__} = sub {
        msg(0, "$Myname: Child Process $$ was terminated:\n", @_);
        exit 13;
    };

    # In order to use relative paths, we need to be in the right
    # directories on both machines
    chdir $target_dir
        or die "Unexpected failure to chdir to $target_dir",
               " in the child process $$: $!";
    -d $depot_dir
        or mkpath($depot_dir); # or croak

    # NOTE(review): only spaces are escaped here; a directory name with
    # other shell metacharacters would presumably still break the command.
    (my $dir_nospace = $depot_dir) =~ s/ /\\ /g;
    my $cmd = $RSYNC
        # - copy files recursing the directory (dont try to be too
        #   smart to select specific files);
        # - preserve modification times;
        # - verbose mode;
        # - quote arguments in case of there are spaces;
        # - use --delete because subdirectories and files related
        #   to deleted shelved changes go away and we need to mirror
        #   that (g659635).
        . " -av --delete '$srchost:$srctop/$dir_nospace/' $dir_nospace/";

    # rsync sometimes fails. Make three attempts not counting those
    # where we can identify problems as those we can ignore.
    my $limit = 3;
    msg(0, "Copying: $cmd\n");
    my $attempt = 0;
    my $rc;

    # "<" (not "<="): exactly $limit attempts, i.e. three by default and
    # never more than 100 in total.
    while ( $attempt < $limit ) {
        # Sometimes, we mirror gecks very quickly (and frequently).
        # This may cause temp files to come and vanish while we are running
        # rsync.
        # If we see messages like
        #   file has vanished: "/export/data/perforce/1666/meta/job/tmp.16119.207"
        # or
        #   ssh_exchange_identification: Connection closed by remote host
        # we should re-try rsyncing.
        my $known_error = 0;
        open my $PIPE, '-|', "$cmd 2>&1"
            or die "Cannot start pipe to '$cmd': $!";
        while ( <$PIPE> ) {
            print;
            $known_error = 1
                if m{^file has vanished: \S+/tmp[.\d]+.?$}
                || m{ssh_exchange_identification: Connection closed};
        }
        # The result of close is not checked on purpose: a non-zero rsync
        # exit makes close fail, and that exit status is read from $? below.
        close $PIPE;
        $rc = $? > 255 ? $?>>8 : $?;
        last if $rc == 0;

        $attempt++;
        msg(0, "Attempt $attempt. Copying of $depot_dir failed in pid=",
            "$$ with code $rc\n");
        if ( $known_error # "known" errors don't $limit us, but we'll be
             &&           # reasonable and won't make more than 100 attempts anyway
             $limit < 100 ) {
            $limit++;
        }

        # a small delay is helpful if there is a temporary network problem;
        # there is no point in waiting once we are out of attempts
        sleep 1 if $attempt < $limit;
    }
    return $rc;
} # copy_one
=head1 NAME
p4admin_replicate
=head1 SYNOPSIS
p4admin_replicate -help
p4admin_replicate -port NNNN -srchost HOST -srctop DIR [options]
=head1 DESCRIPTION
Replicate primary server on a local server.
Options:
-data2 dir additional target directory to mirror to
(the source is always srchost:srctop)
(one target directory is always the data directory of
the local replica determined by -port)
-log redirect the output to automatically created log (default)
-nolog send all output to stdout
-mail user send email with the result to the specified user
-n preview: shows what would be done
-nproc N syncing of the data files can be in N parallel processes
-port NNNN this local server is a replica of the source
-srchost host mirror data from this host
-srctop dir data on srchost to mirror are in this directory
-v verbose mode
-v -v even more verbose
=head1 EXAMPLES
Continuously replicate primary server perforce:1666 on the local server at port 1777:
p4 -p perforce:1666 replicate -s DIR1/replicate.state DIR2/p4admin_replicate -v -port 1777 -srchost perforce -srctop DIR3
=cut