package VCP::Filter::changesets;

=head1 NAME

VCP::Filter::changesets - Group revs in to changesets

=head1 SYNOPSIS

   ## From the command line:
   vcp <source> changesets: ...options... -- <dest>

   ## In a .vcp file:
   ChangeSets:
      time                    <=60   ## seconds
      user_id                 equal  ## case-sensitive equality
      comment                 equal  ## case-sensitive equality
      branched_rev_branch_id  equal  ## change only one branch at a time

=head1 DESCRIPTION

This filter is automatically loaded when there is no sort filter loaded
(both this and L<VCP::Filter::sort|VCP::Filter::sort> count as sort
filters).

=head2 Sorting by change_id, etc.

When all revs from the source have change numbers, this filter sorts by
change_id, branch_id, and name, regardless of the rules set.  The name
sort is case sensitive, though it should not be for Win32.

This sort by change_id is necessary for sources that supply change_id
because the order of scanning the revisions is not usually (ever, so
far :) in change set order.

=head2 Aggregating changes

If one or more revisions arrives from the source with an empty
change_id, the rules for this filter establish the conditions that
determine which revisions may be grouped in to each change.  In this
case, this filter rewrites all change_id fields so that the (eventual)
destination can use the change_id field to break the revisions in to
changes.

This is sometimes used by non-changeset oriented destinations to
aggregate "changes" as though a user were performing them and to reduce
the number of individual operations the destination driver must perform
(for instance: VCP::Dest::cvs prefers not to call cvs commit all the
time; cvs commit is slow).

If you don't specify any conditions, a set of default conditions is
used.  If you specify any conditions at all, none of the default
conditions are used.

The conditions are specified as a set of rules.  Each rule is a pair of
words: the name of the criterion (usually a data field of the revision)
and the compatibility specification.
There are two compatibility specifications: "equal" and "<=N" (where N
is a number; note that no spaces are allowed before the number unless
the spec is quoted somehow).

"equal" states that all revisions in the same change must have
identical values for the indicated field.  So:

   user_id  equal

states that all revisions in a change must be submitted by the same
user.  The "equal" specification is available for all criteria.

The "<=N" specification is only available for the "time" field.  It
specifies that no gaps larger than N seconds may exist in a change.

The default conditions are:

   time                    <=60   ## seconds
   user_id                 equal  ## case-sensitive equality
   comment                 equal  ## case-sensitive equality
   branched_rev_branch_id  equal  ## change only one branch at a time

The C<time <=60> condition sets a maximum allowable difference between
two revisions; revisions that are more than this number of seconds
apart are considered to be in different changes.

The C<user_id equal> and C<comment equal> conditions assert that two
revisions must be by the same user and have the same comment in order
to be in the same change.

The C<branched_rev_branch_id equal> condition is a special case to
handle repositories like CVS which don't record branch creation times.
This condition kicks in when a user creates several branches before
changing any files on any of them; then all of the branches get created
at the same time.  This condition also kicks in when multiple CVS
branches exist with no changes on them.  In this case, VCP::Source::cvs
groups all of the branch creations after the last "real" edit.  The
C<branched_rev_branch_id> condition only applies to revisions branching
from one branch in to another.

Only C<time> and C<mod_time> can be used with a "<=" condition.  All
fields may be used with the C<equal> condition, which is case-sensitive
equality.

=head1 ALGORITHM

=head2 handle_rev()

As revs are received by handle_rev(), they are stored on disk.
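The grouping that the "equal" rules imply can be sketched outside of
Perl.  The following is a minimal Python illustration, not VCP code:
the field names and the C<group_into_protochanges> helper are
hypothetical, but the key-building mirrors the filter's approach of
concatenating the values of every "equal" field (with missing fields
treated as empty).

```python
# Illustrative sketch (not VCP code): revisions with identical values
# for every "equal" field land in the same protochange.
from collections import defaultdict

def change_key(rev, equal_fields):
    # Missing fields count as empty strings, mirroring the filter's
    # "map defined $_ ? $_ : ''" behavior when packing key values.
    return tuple(rev.get(f, "") for f in equal_fields)

def group_into_protochanges(revs, equal_fields):
    groups = defaultdict(list)
    for rev in revs:
        groups[change_key(rev, equal_fields)].append(rev)
    return list(groups.values())

revs = [
    {"user_id": "alice", "comment": "fix bug", "time": 100},
    {"user_id": "alice", "comment": "fix bug", "time": 130},
    {"user_id": "bob",   "comment": "fix bug", "time": 110},
]
groups = group_into_protochanges(revs, ["user_id", "comment"])
# alice's two revs share a key; bob's rev lands in its own group.
```

Note that this is only the "equal" half of the rules; a "<=N" rule is
applied afterwards to split each group at large time gaps.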
Several RAM-efficient (well, for Perl) data structures are built,
however, that describe each revision's children and its membership in a
changeset.  Some or all of these structures may be moved to disk when
we need to handle truly large data sets.

=head3 The ALL_HAVE_CHANGE_IDS statistic

One statistic that handle_rev() gathers is whether or not all revisions
arrived with a non-empty change_id field.

=head3 The REV_COUNT statistic

How many revisions have been received.  This is used only for UI
feedback; primarily it is to forewarn the downstream filter(s) and
destination of how many revisions will constitute a 100% complete
transfer.

=head3 The CHANGES list

As each rev arrives, it is placed in a "protochange" determined solely
by the revision's fields named in the rules list with an "equal"
condition.  Protochanges are likely to have too many revisions in them,
including revisions that descend from one another and revisions that
are too far apart in time.

=head3 The CHANGES_BY_KEY index

The categorization of each revision in to changes is done by forming a
key string from all the fields in the rules list with the "equal"
condition.  This index maps unique keys to changes.

=head3 The CHILDREN index

This is an index of all revisions that are direct offspring of a
revision.

=head3 The REVS_BY_CHANGE_ID index

If all revs do indeed arrive with change_ids, they need to be sorted
and sent out in order.  This index is gathered until the first rev with
an empty change_id arrives.

=head3 The ROOT_IDS list

This is a list of the IDs of all revisions that have no parent
revisions in this transfer.  This is used as the starting point for
send_changes(), below.

=head2 handle_footer()

All the real work occurs when handle_footer() is called.
handle_footer() glances at the change_id statistic gathered by
handle_rev() and determines whether it can sort by change_id or whether
it has to perform change aggregation.
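When aggregation is chosen, each protochange is later split at time
gaps (see split_protochanges(), described below).  Here is an
illustrative Python rendering of that split, not VCP code; the
60-second window matches the default C<time <=60> rule, and, as in the
implementation, the window is measured from the first rev of the
current change rather than between consecutive revs.

```python
# Illustrative sketch (not VCP code): splitting one protochange into
# smaller changes wherever a rev falls more than max_gap seconds after
# the first rev of the change being accumulated.
def split_by_time_gap(rev_times, max_gap=60):
    # rev_times: (rev_index, time) pairs belonging to one protochange.
    ordered = sorted(rev_times, key=lambda p: p[1])
    changes = []
    current = [ordered[0][0]]
    min_time = ordered[0][1]          # time of the change's first rev
    for r_index, t in ordered[1:]:
        if t > min_time + max_gap:
            changes.append(current)   # too far from the change's start:
            current = [r_index]       # begin a new change here
            min_time = t
        else:
            current.append(r_index)
    changes.append(current)
    return changes

# Revs at t=0, 30, 200: the jump to t=200 starts a second change.
```

The real filter also records, for each rev, which resulting change it
belongs to (the CHANGES_BY_REV index) so send_changes() can tell when a
change is complete.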
If all revisions arrived with a change_id, handle_footer() calls
sort_by_change_id_and_send().  If at least one revision didn't,
handle_footer() decides to perform change aggregation by calling
split_protochanges() and then send_changes().

Any source or upstream filter may perform change aggregation by
assigning change_ids to all revisions.  VCP::Source::p4 does this.  At
the time of this writing no others do.  Likewise, a filter like
VCP::Filter::StringEdit may be used to clear out all the change_ids and
force change aggregation.

=head2 sort_by_change_id_and_send()

If all revisions arrived with a change_id, then they will be sorted by
change_id, branch_id, and name and sent on.  There is no provision in
this filter for ignoring change_id other than that, if any revisions
arrive with an empty change_id, this sort is not done.

=head2 split_protochanges()

Once all revisions have been placed in to protochanges, each
protochange's revisions are sorted in time order and then the
protochange is split in to smaller protochanges at each gap of more
than the number of seconds given in a "<=" rule.

=head3 The CHANGES_BY_REV index

As the large protochanges are split in to smaller ones, the resulting
CHANGES list is indexed by, among other things, which revs are in each
change.  This is so the algorithms can quickly find what change a
revision is in when it's time to consider sending that revision.

=cut

$VERSION = 1;

use strict;

use VCP::Logger qw( lg pr BUG );
use VCP::Debug qw( :debug );
use VCP::Utils qw( empty );
use VCP::Filter;
use VCP::Rev;
use VCP::DB_File;  ## TODO: move pack_values and unpack_values in to Utils
use VCP::DB_File::big_records;

use base qw( VCP::Filter );

## A change handle is a number from 0..N.
## A change key is the catenation of all fields that are configured to
## be "equal".  This is useful until all revs have been received, then
## is discarded as the changes to that point are re-split based on the
## "<=N" rules.
use fields (
   'CHILDREN',            ## An ARRAY, indexed by rev index, of
                          ## ';'-joined lists of the rev's children's
                          ## indexes.
   'REV_COUNT',           ## How many revs we received
   'ALL_HAVE_CHANGE_IDS', ## Set if all incoming revs have change_ids
   'REVS_BY_CHANGE_ID',   ## HASH of change_id => \@rev_ids
   'CHANGES_BY_KEY',      ## A HASH of change keys to change handles
   'CHANGE_KEY_SUB',      ## Returns the change key for a rev
   'CHANGES',             ## An ARRAY of changes: each change is a list
                          ## of packed strings, one per rev.  The first
                          ## field in the pack is the rev_id, the second
                          ## is its time.
   'CHANGES_BY_REV',      ## Which change each revision is a member of.
                          ## This is used when several changes have the
                          ## same timestamp and we want to avoid sending
                          ## a change for which we don't have all the
                          ## revisions ready to go.  It is not valid
                          ## until the initial changes are split by
                          ## time.
   'ROOT_IDS',            ## Indexes of parentless revs.  This is built
                          ## by handle_rev() and send_changes() uses it
                          ## to seed the wavefront clustering algorithm.
   'REVS_DB',             ## A temporary data store of revisions
   'INDEX_COUNT',         ## How many indexes have been assigned to revs
   'INDEXES_BY_ID',       ## What index each rev has
);

sub _compile_change_key_sub {
   my VCP::Filter::changesets $self = shift;
   my ( $rules ) = @_;

   my @code;
   for ( @$rules ) {
      my ( $field, $cond ) = map lc, @$_;
      if ( $cond eq "equal" && $field ne "branched_rev_branch_id" ) {
         push @code, <<CODE;
            \$r->$field,
CODE
      }
   }

   @code = ( <<'PREAMBLE', @code, <<'POSTAMBLE' );
#line 1 VCP::Filter::changesets::add_rev()
sub {
   my ( $r ) = @_;
   VCP::DB_File->pack_values( map defined $_ ? $_ : "",
PREAMBLE
   );
}
POSTAMBLE

   debug @code if debugging;

   unless ( $self->{CHANGE_KEY_SUB} = eval join "", @code ) {
      my $x = $@;
      lg "$x:\n", @code;
      die $x;
   }
}

sub new {
   my $class = ref $_[0] ?
ref shift : shift;
   my $self = $class->SUPER::new( @_ );

   ## Parse the options
   my ( $spec, $options ) = @_;
   $options ||= [];

   $self->_compile_change_key_sub(
      $self->parse_rules_list( $options, "Field", "Condition",
         [  ## default rules
            [qw( time                    <=60  )],
            [qw( user_id                 equal )],
            [qw( comment                 equal )],
            [qw( branched_rev_branch_id  equal )],
         ]
      )
   );

   return $self;
}

sub filter_name { return "ChangeSets" }

sub sort_keys {
   my $self = shift;
   return qw( change_id );
}

sub revs_db {
   my VCP::Filter::changesets $self = shift;
   $self->{REVS_DB};
}

sub r_index {
   my VCP::Filter::changesets $self = shift;
   my ( $id ) = @_;
   return exists $self->{INDEXES_BY_ID}->{$id}
      ? $self->{INDEXES_BY_ID}->{$id}
      : ( $self->{INDEXES_BY_ID}->{$id} = $self->{INDEX_COUNT}++ );
}

sub store_rev {
   my VCP::Filter::changesets $self = shift;
   my ( $r ) = @_;

   my $id = $r->id;
   my $r_index = $self->r_index( $id );
   $self->revs_db->set( [ $r_index ], $r->serialize );
   return $r_index;
}

sub destore_rev {
   my VCP::Filter::changesets $self = shift;
   my ( $r_index ) = @_;

   my $r = VCP::Rev->deserialize( $self->revs_db->get( [ $r_index ] ) );
   BUG "vcp: rev $r_index not found" unless $r;
   return $r;
}

sub handle_header {
   my VCP::Filter::changesets $self = shift;

   $self->{REV_COUNT}           = 0;
   $self->{ALL_HAVE_CHANGE_IDS} = 1;
   $self->{REVS_BY_CHANGE_ID}   = {};
   $self->{CHANGES_BY_KEY}      = {};
   $self->{CHANGES}             = [];
   $self->{INDEX_COUNT}         = 0;
   $self->{INDEXES_BY_ID}       = {};

   my $store_loc = $self->tmp_dir;

   $self->{REVS_DB} = VCP::DB_File::big_records->new(
      StoreLoc  => $store_loc,
      TableName => "revs",
   );

   $self->revs_db->delete_db;
   $self->revs_db->open_db;

   $self->SUPER::handle_header( @_ );
}

sub handle_rev {
   my VCP::Filter::changesets $self = shift;
   my ( $r ) = @_;

   my $r_index = $self->store_rev( $r );

   if ( $self->{ALL_HAVE_CHANGE_IDS} ) {
      if ( empty $r->change_id ) {
         if ( $self->{REV_COUNT} ) {
            pr "only first ", $self->{REV_COUNT},
               " revisions had change_ids";
         }
         $self->{ALL_HAVE_CHANGE_IDS} = 0;
         %{$self->{REVS_BY_CHANGE_ID}} = ();
      }
      else {
         $self->{REVS_BY_CHANGE_ID}->{$r->change_id} .= "$r_index;";
      }
   }

   my $change_key = $self->{CHANGE_KEY_SUB}->( $r );

   my $change_index = exists $self->{CHANGES_BY_KEY}->{$change_key}
      ? $self->{CHANGES_BY_KEY}->{$change_key}
      : do {
         push @{$self->{CHANGES}}, "";
         $self->{CHANGES_BY_KEY}->{$change_key} = $#{$self->{CHANGES}};
      };

   $self->{CHANGES}->[$change_index] .=
      "$r_index," . ( $r->time || 0 ) . ";";

   if ( empty $r->previous_id ) {
      push @{$self->{ROOT_IDS}}, $r_index;
   }
   else {
      ## It's a descendant node; note its parentage and stow it for later
      $self->{CHILDREN}->[ $self->r_index( $r->previous_id ) ] .=
         $r_index . ";";
   }

   ++$self->{REV_COUNT};
}

sub split_protochanges {
   ## Scan through $self->{CHANGES} and split them apart based on time
   ## gaps.  CHANGES coming in are arrays of packed strings; going out,
   ## they're arrays of rev ids.  Also sets CHANGES_BY_REV so that we
   ## can get each rev's change in order to figure out if we have any
   ## complete changes.
   my VCP::Filter::changesets $self = shift;

   ## On entry each change is a list of packed strings with the rev's id
   ## in the first field and the time of the rev in the second.  That's
   ## changed by each loop.

   $self->{CHANGES_BY_REV} = [];

   my @new_changes;

   for ( @{$self->{CHANGES}} ) {
      ## Split each change in to multiple changes, if need be, and make
      ## each change be a list of [ min_time, @ids ], ready for sorting
      my @rev_recs =
         sort { $a->[1] <=> $b->[1] }
         map [ split /,/ ], split /;/, $_;
      ## The in-time order is needed here, but is also taken advantage
      ## of in send_changes().
      my ( $r_index, $t ) = @{shift @rev_recs};
      my $new_change = "$r_index;";
      $self->{CHANGES_BY_REV}->[$r_index] = $#new_changes + 1;

      my $min_time = $t;

      for ( @rev_recs ) {
         my ( $r_index, $t ) = @$_;
         if ( $t > $min_time + 60 ) {  ## TODO: make 60 a param
            push @new_changes, $new_change;
            $new_change = "$r_index;";
            $min_time = $t;
         }
         else {
            $new_change .= "$r_index;";
         }
         $self->{CHANGES_BY_REV}->[$r_index] = $#new_changes + 1;
      }
      push @new_changes, $new_change;
   }

   $self->{CHANGES} = \@new_changes;
}

sub send_changes {
   my VCP::Filter::changesets $self = shift;

   ## Some shortcuts
   my $changes        = $self->{CHANGES};
   my $changes_by_rev = $self->{CHANGES_BY_REV};
   my $children       = $self->{CHILDREN};

   my @cur_indexes = @{$self->{ROOT_IDS}};
      ## The set of all revisions that have no unsent ancestors but
      ## which have not been sorted in to changes to send.

   my %cur_changes;
      ## The set of changes that we are currently growing.  As revs are
      ## consumed from @cur_indexes, they are added to changes.  This is
      ## a HASH rather than an array because it's a very sparse space,
      ## we hope.

   my $change_number = 1;  ## Humans start counting at 1.

   while ( @cur_indexes || keys %cur_changes ) {
      debug "ids to place in changes:\n", map "  $_\n", @cur_indexes
         if debugging;

      debug "Cur changes:\n", map {
         (
            "  $_:\n",
            map( "    " . $_->id . "\n", @{$cur_changes{$_}->{Revs}} ),
            map( "    f:$_\n", @{$cur_changes{$_}->{FutureIndexes}} )
         )
      } sort keys %cur_changes
         if debugging;

      for my $r_index ( splice @cur_indexes ) {
         my $r = $self->destore_rev( $r_index );

         my $change_index = $changes_by_rev->[$r_index];
            ## The 0-based offset of the current change in the changes
            ## array.

         my $change = $cur_changes{$change_index};

         if ( !$change ) {
            my @future_indexes =
               grep $_ != $r_index, split /;/, $changes->[$change_index];
            $changes->[$change_index] = undef;
               ## As a result of split_protochanges(), the ids are
               ## in time order.
               ## We leave CHANGES unmolested, though maybe we should
               ## shrink the array or undef it.
            $cur_changes{$change_index} = $change = {
               Index         => $change_index,
               MinTime       => $r->time || 0,
               Revs          => [ $r ],
               FutureIndexes => \@future_indexes,
            };
         }
         else {
            push @{$change->{Revs}}, $self->destore_rev( $r_index );
            @{$change->{FutureIndexes}} =
               grep $_ != $r_index, @{$change->{FutureIndexes}};
         }
      }

      my $cur_change;
      {
         my $min_time;

         ## Find the oldest complete change
         ## The sort is for repeatability
         for ( sort { $a->{Index} <=> $b->{Index} } values %cur_changes ) {
            next if @{$_->{FutureIndexes}};
            if ( ! defined $min_time || $_->{MinTime} < $min_time ) {
               $min_time = $_->{MinTime};
               $cur_change = $_;
            }
         }
      }

      if ( !$cur_change ) {
         ## No complete changes.  Break one up.
         my $min_time;
         my @oldest_changes;

         ## The sort is for repeatability
         for ( sort { $a->{Index} <=> $b->{Index} } values %cur_changes ) {
            if ( ! defined $min_time || $_->{MinTime} < $min_time ) {
               $min_time = $_->{MinTime};
               @oldest_changes = ( $_ );
            }
            elsif ( $_->{MinTime} == $min_time ) {
               push @oldest_changes, $_;
            }
         }

         BUG "\@oldest_changes empty" unless @oldest_changes;

         ## For now, just grab the first one.
         ## TODO: look through the changes and find the one with the
         ## largest gap after one of the @{$_->{Revs}}.  This will
         ## require loading the first rev in @{$_->{FutureIndexes}} and
         ## getting its time, but that's ok.  We could also choose a
         ## change that will free up some other complete change, but
         ## that's more subtle.
         lg "splitting change $change_number";
         $cur_change = shift @oldest_changes;
         $changes->[$cur_change->{Index}] =
            join ";", @{delete $cur_change->{FutureIndexes}};
            ## Leave all unprocessed revs from this change behind.
      }

      ## Set the change_id for each rev to be sent.
      ## Move children of the change we're sending in to @cur_indexes.
      ## We're sending all their parents, so we'll never have a
      ## chicken-and-egg problem in %cur_changes.
      for my $r ( @{$cur_change->{Revs}} ) {
         $r->change_id( $change_number );
         my $r_index = $self->{INDEXES_BY_ID}->{$r->id};
         push @cur_indexes, split /;/, $children->[$r_index]
            if $children->[$r_index];
      }

      lg "change $change_number: " . @{$cur_change->{Revs}} . " revs";

      ## Do this last just to not send a partial change.  If an error or
      ## segfault arises in an earlier loop, doing this should make the
      ## resulting state cleaner.
      $self->dest->handle_rev( $_ ) for @{$cur_change->{Revs}};

      delete $cur_changes{$cur_change->{Index}};

      ++$change_number;
   }
}

sub _d($) { defined $_[0] ? $_[0] : "" }

sub sort_by_change_id_and_send {
   my VCP::Filter::changesets $self = shift;

   pr "sorting by change_id";

   for my $change_id (
      sort { VCP::Rev->cmp_id( $a, $b ) }
         keys %{$self->{REVS_BY_CHANGE_ID}}
   ) {
      my @rev_indexes =
         split /;/, delete $self->{REVS_BY_CHANGE_ID}->{$change_id};

      lg "change $change_id: " . @rev_indexes . " revs";
      debug "change $change_id:\n", map "  $_\n", @rev_indexes
         if debugging;

      my @revs;
      for ( @rev_indexes ) {
         push @revs, $self->destore_rev( $_ );
      }

      for my $r (
         sort {
            _d $a->branch_id cmp _d $b->branch_id
               || $a->name cmp $b->name
         } @revs
      ) {
         $self->dest->handle_rev( $r );
      }
   }
}

sub handle_footer {
   my VCP::Filter::changesets $self = shift;

   $self->SUPER::rev_count( $self->{REV_COUNT} );

   $self->{ALL_HAVE_CHANGE_IDS}
      ? $self->sort_by_change_id_and_send
      : do {
         pr "aggregating changes";
         $self->split_protochanges;
         $self->send_changes;
      };

   $self->SUPER::handle_footer( @_ );
}

=head1 LIMITATIONS

This filter does not take the source_repo_id in to account: if somehow
you are merging multiple repositories in to one and want to interleave
the commits/submits "properly", ask for advice.

=head1 AUTHOR

Barrie Slaymaker <barries@slaysys.com>

=head1 COPYRIGHT

Copyright (c) 2000, 2001, 2002 Perforce Software, Inc.  All rights
reserved.  See L<VCP::License|VCP::License> (C<vcp help license>) for
the terms of use.

=cut

1;