/* Copyright (c) 2011, Perforce Software, Inc. All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL PERFORCE SOFTWARE, INC. BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ /* User contributed content on the Perforce Public Depot is not supported by Perforce, although it may be supported by its author. This applies to all contributions even those submitted by Perforce employees. */ /* p4 -> p4broker -> p4d | queuer | lock_file broker_queuer.cc is an example program to illustrate one way to pause a stream of incoming commands when they enter the broker, with the intent that the target server is down for some reason and needs a temporary reprieve before the commands resume - e.g. swapping p4d executables to a new patch release. This program is spawned by the Broker on every incoming command. It looks for a command named 'broker_queuer' and either pauses traffic or resumes it based off of the argument. You can configure various properties via its entry in the Broker's configuration file. There's the number of commands to queue before issuing rejection messages, the timeout for queued commands before rejection, a file containing the rejection message, the name of the lock file to use as a semaphore and a slash-separated list of user names allowed to run the broker_queuer command. See the "USAGE" comment in main() for arguments. Note that you probably don't want to queue too many commands or the Broker machine will run out of resources. Note that a paused command won't necessarily get to the target server ahead of a command issued later (after a 'resume' function) since there's a sleep() call when paused. This program relies on various POSIX functions, and won't work without modification on Windows. To compile, you need a C++ compiler (like g++ or clang++.) Since the end-to-end time it takes for this program to run is important (overhead imposed on every client command!) we want to make it as fast as possible. To make a fast C++ executable in terms of startup speed, the following g++ invokation seems pretty good. It's about ~10% slower than plain C, but that's probably good enough. g++ -static -O2 -ffunction-sections -fdata-sections -s -Wl,--gc-sections -fmerge-all-constants -fno-default-inline -fno-inline -m32 -o cc cc.cc strip --strip-all cc Here's a sample entry for the Broker configuration file: command : * { action = filter; # Be careful here. Bad arguments will cause the queuer to emit an error # which in turn will reject all incoming commands! execute = "/p4/broker_queuer 1000 10 msg_file lock_file super/super1"; } $Id: $ $DateTime: $ $Author: jason_gibson $ $Change: $ */ # include <iostream> # include <string> # include <sstream> # include <map> # include <set> # include <vector> # include <fstream> # include <sys/stat.h> // stat # include <string.h> // strerror # include <errno.h> # include <stdlib.h> // exit # include <sys/file.h> // flock # include <sys/time.h> // gettimeofday const char *version = "$Id:$"; const char *prog_name = NULL; using namespace std; // Report a fatal error with a diagnostic message. // Only works for single-line errors? void bail( const char *err ) { cout << "action: REJECT" << endl << "message: ERROR with broker script '" << prog_name << "': " << err << endl; exit( 1 ); } void pass() { cout << "action: PASS" << endl; exit( 0 ); } // Send a message back to the user. void reject( const char *msg ) { cout << "action: REJECT" << endl << "message: \"" << msg << "\"" << endl; exit( 0 ); } void reject( const string msg ) { reject( msg.c_str() ); } // Read in a whole file. If it's already opened, preserve its offset. const string read_file( const char *file, FILE *handle = NULL ) { bool needs_close = false; FILE *fh = NULL; if( !handle ) { fh = fopen( file, "r" ); needs_close = true; } else fh = handle; const long pos = ftell( fh ); char cbuf[ 1024 ]; memset( cbuf, '\0', 1024 ); string sbuf; rewind( fh ); while( fread( cbuf, sizeof( char ), 1023, fh ) > 0 ) { sbuf += cbuf; if( feof( fh ) ) break; } fseek( fh, pos, SEEK_SET ); if( needs_close ) fclose( fh ); return sbuf; } void pause_cmds( const char *file ) { struct stat sb; if( !stat( file, &sb ) ) reject( "Error: Can't pause already-paused server." ); char tmp[ 512 ]; memset( tmp, '\0', 512 ); strcpy( tmp, "bqbbq-XXXXXX" ); int tmp_ret = mkstemp( tmp ); if( tmp_ret == -1 ) { string err( "Couldn't create temporary lock file name: " ); err += strerror( errno ); reject( err ); } FILE *fh = fopen( tmp, "r+" ); if( !fh ) { string err( "Couldn't open tmp lock file: " ); err += strerror( errno ); reject( err ); } fprintf( fh, "0\n" ); fclose( fh ); if( rename( tmp, file ) == -1 ) { string err( "Couldn't rename tmp lock file: " ); err += strerror( errno ); reject( err ); } } void resume_cmds( const char *file ) { struct stat sb; if( stat( file, &sb ) == -1 ) reject( "Error: Can't unpause un-paused server." ); unlink( file ); } enum QINFO { QUEUEING, PASSING, REJECTING }; QINFO queue_info( const char *file, const int max_queued ) { struct stat sb; if( stat( file, &sb ) == -1 ) return PASSING; FILE *fh = fopen( file, "r" ); if( !fh ) { string err( "Couldn't read queue lock file: " ); err += strerror( errno ); reject( err ); } const int fd = fileno( fh ); if( flock( fd, LOCK_SH ) == -1 ) { string err( "Couldn't lock lock file: " ); err += strerror( errno ); reject( err ); } const int count = atoi( read_file( file, fh ).c_str() ); fclose( fh ); if( count >= max_queued ) return REJECTING; else return QUEUEING; } void queue( const char *file, const int num ) { struct stat sb; if( stat( file, &sb ) == -1 ) reject( "Error: Can't unpause un-paused server." ); FILE *fh = fopen( file, "r+" ); if( !fh ) // todo: consider just pass() in this case? { string err( "Couldn't read queue lock file: " ); err += strerror( errno ); reject( err ); } const int fd = fileno( fh ); if( flock( fd, LOCK_EX ) == -1 ) { string err( "Couldn't lock lock file: " ); err += strerror( errno ); reject( err ); } const int count = atoi( read_file( file, fh ).c_str() ); if( ftruncate( fd, 0 ) == -1 ) { string err( "Couldn't truncate lock file: " ); err += strerror( errno ); reject( err ); } rewind( fh ); // Don't accidentally go below zero. const int new_val = count + num > 0 ? count + num : 0; const int printed = fprintf( fh, "%d\n", new_val ); if( !printed ) { string err( "Couldn't update queue lock file: " ); err += strerror( errno ); reject( err ); } fclose( fh ); } # define DBG if( false ) int main( const int argc, const char **argv ) { if( argc == 2 ) { const string arg( argv[ 1 ] ); if( arg == "-v" ) { cout << "Version: " << version << endl; return 0; } } prog_name = argv[ 0 ]; // USAGE: bq.exe max_queued max_wait msg_file lock_file super1/sup2 // // E.G.: bq.exe 1000 60 /p4/broker/queue_reject.txt \ // /p4/broker/queue_lock bob/alice/trent/walter // // Note that any double-quotes in the message file will be stripped. if( argc != 6 ) bail( "invalid usage: tell your admin!" ); const int max_queued = atoi( argv[ 1 ] ); const int max_wait = atoi( argv[ 2 ] ); const string reject_msg = read_file( argv[ 3 ] ); const char *lock_file = argv[ 4 ]; set < string > super_users; map < string, string > cmd_info; // Split-up the list of super users. { stringstream ss( argv[ 5 ] ); string s; while( getline( ss, s, '/' ) ) super_users.insert( s ); } DBG cout << "action: REJECT" << endl << "message: \"" << "max_queued: " << max_queued << endl << "max_wait : " << max_wait << endl << "lock_file : " << lock_file << endl << "supers : " << argv[ 5 ] << endl << "reject_fil: " << argv[ 3 ] << endl << "reject_msg: " << reject_msg << endl; // Parse the connection details from the broker. while( cin ) { string l, k, v; stringstream ss; getline( cin, l ); ss << l; while( ss >> k, ss >> v ) { k.erase( k.length() - 1, 1 ); DBG cout << "'" << k << "' -> '" << v << "'" << endl; cmd_info[ k ] = v; } } DBG cout << "\"" << endl; // Before queueing, check if the incoming command is for us. if( cmd_info[ "command" ] == "broker_queuer" ) { set< string >::iterator it; it = super_users.find( cmd_info[ "user" ] ); // Silently pass-through with the default "Unknown command..." error // if the request came from a non-superuser. if( it == super_users.end() ) pass(); const string cmd( cmd_info[ "Arg0" ] ); if( cmd == "pause" ) pause_cmds( lock_file ); else if( cmd == "resume" ) resume_cmds( lock_file ); else reject( "broker_queuer: pause | resume" ); reject( "Done." ); } // Handle all other commands. int queued = 0; while( true ) { const QINFO status = queue_info( lock_file, max_queued ); if( status == PASSING ) pass(); if( status == REJECTING && !queued ) reject( reject_msg ); if( status == QUEUEING || queued ) { if( !queued ) queue( lock_file, 1 ); // todo: perhaps sleep a random period? usleep( 250000 ); // 1/4 second queued++; if( queued >= max_wait ) { queue( lock_file, -1 ); reject( reject_msg ); } } } // Fail-safe. pass(); }
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#1 | 7955 | Jason Gibson |
Example program to pause the stream of incoming commands to the broker. |