rpc.cc #1

  • //
  • guest/
  • perforce_software/
  • p4/
  • 2014-1/
  • rpc/
  • rpc.cc
  • View
  • Commits
  • Open Download .zip Download (26 KB)
/*
 * Copyright 1995, 1996 Perforce Software.  All rights reserved.
 *
 * This file is part of Perforce - the FAST SCM System.
 */

/*
 * Rpc.cc - remote procedure service
 *
 * The general flow of the client/server protocol:
 *
 * 	The client sends a message containing the user's request to the 
 *	server and then dispatches, i.e. reads and executes requests from
 *	the server until the server sends the "release" message.
 *
 *	The server initially dispatches, i.e. reads and executes a
 *	request from the user.  The final act of the request should
 *	either be to release the client (send the "release" message), 
 *	or send a message to the client instructing it to invoke
 *	(eventually) another server request.  In the end, the final
 *	message sent to the client should be "release".
 */

# include <stdhdrs.h>

# include <signaler.h>

# include <debug.h>
# include <tunable.h>
# include <strbuf.h>
# include <strdict.h>
# include <strarray.h>
# include <strtable.h>
# include <error.h>
# include <errorlog.h>
# include <tracker.h>
# include <timer.h>
# include <md5.h>
# include <ticket.h>

# include <keepalive.h>
# include <netportparser.h>
# include <netconnect.h>
# include <netbuffer.h>

# include <rpcdebug.h>
# include <msgrpc.h>
# include <p4tags.h>

# ifdef USE_SSL
extern "C" {	// OpenSSL

# include "openssl/bio.h"
# include "openssl/ssl.h"
# include "openssl/err.h"

}
# endif //USE_SSL
# include "enviro.h"
# include "filesys.h"
# include "pathsys.h"
# include "rpc.h"
# include "rpcbuffer.h"
# include "rpctrans.h"
# include "rpcdispatch.h"
# include "rpcdebug.h"
# include "rpcservice.h"
# include "netsslcredentials.h"

// Printable log prefixes, indexed by the RpcType enumerator noted
// beside each entry (enum declared elsewhere); the trailing 0 ends
// the table.  NOTE(review): entries must stay in enum order --
// confirm the RPC_SANDBOX/RPC_UNKNOWN/RPC_DMRPC tail against rpc.h.
const char *RpcTypeNames[] = {
	"",		//    RPC_CLIENT = 0,
	"rmt: ",	//    RPC_REMOTE,
	"px: ",		//    RPC_PX,
	"",		//    RPC_RH,
	"bgd: ",	//    RPC_BACKGROUND,
	"pxc: ",	//    RPC_PXCLIENT,
	"rpl: ",	//    RPC_RPL,
	"bs: ",		//    RPC_BROKER,
	"bc: ",		//    RPC_BROKER_TO_SERVER,
	"rauth: ",	//    RPC_RMTAUTH,
	"sbx: ",        //    RPC_SANDBOX,
	"ukn: ",        //    RPC_UNKNOWN
	"dmrpc: ",      //    RPC_DMRPC
	0
};

RpcService::RpcService()
{
	// A fresh service defaults to the connecting (client) role and
	// has no endpoint until SetEndpoint() is called.

	openFlag = RPC_CONNECT;
	endPoint = 0;

	protoSendBuffer = new RpcSendBuffer;
	dispatcher = new RpcDispatcher;

	// Register the built-in core dispatch table.

	Dispatcher( rpcServices );
}

RpcService::~RpcService()
{
	// Tear down everything the constructor (and SetEndpoint) built.

	delete protoSendBuffer;
	delete endPoint;
	delete dispatcher;
}

/**
 * RpcService::SetEndpoint
 *
 * @brief Creates an communication's endpoint for either listening or connecting.
 *
 * @param  address for socket
 * @param  error structure pointer
 * @see NetEndPoint::Create
 */
void
RpcService::SetEndpoint( const char *addr, Error *e )
{
	// Drop any previous endpoint before creating the new one;
	// errors from NetEndPoint::Create are reported through e.
	delete endPoint;
	endPoint = NetEndPoint::Create( addr, e );
}

/*
 * RpcService::Dispatcher() - add another list of function to call on received RPC
 */

void		
RpcService::Dispatcher( const RpcDispatch *dispatch )
{
	// Append another table of functions callable by incoming RPCs.
	dispatcher->Add( dispatch );
}

void
RpcService::Listen( Error *e )
{
	// Switch the service into the listening (server) role and ask
	// the endpoint to start listening.

	openFlag = RPC_LISTEN;
	endPoint->Listen( e );

	if( !e->Test() )
	    return;

	// Listen failed: record the address in the error, and clear
	// the open flag so a subsequent Connect() bails out too.

	e->Set( MsgRpc::Listen ) << endPoint->GetAddress();
	openFlag = RPC_NOOPEN;
}

void
RpcService::GetHost( StrPtr *peerAddress, StrBuf &hostBuf, Error *e )
{
	// Resolve the printable host name for a peer address by way
	// of a temporary endpoint.  hostBuf is left empty on error.

	hostBuf.Clear();

	NetEndPoint *tmp = NetEndPoint::Create( peerAddress->Text(), e );

	if( !e->Test() )
	    hostBuf.Set( tmp->GetPrintableHost() );

	delete tmp;
}

void
RpcService::ListenCheck( Error *e )
{
	// Forward a listen sanity check to the endpoint.
	endPoint->ListenCheck( e );
}

int
RpcService::CheaterCheck( const char *port )
{
	// Forward the port validation check to the endpoint.
	return endPoint->CheaterCheck( port );
}

void
RpcService::Unlisten()
{
	// Stop listening; safe to call with no endpoint.

	if( !endPoint )
	    return;

	endPoint->Unlisten();
}

void
RpcService::GetExpiration( StrBuf &buf )
{
	// Fetch the endpoint's credential expiration; no-op when no
	// endpoint has been created.

	if( !endPoint )
	    return;

	endPoint->GetExpiration( buf );
}

void
RpcService::SetProtocol( const char *var, const StrRef &value )
{
	protoSendBuffer->SetVar( StrRef( (char*)var ), value );
}

void
RpcService::SetProtocolV( const char *arg )
{
	StrBuf s;
	const char *p;

	if( p = strchr( arg, '=' ) )
	{
	    s.Set( arg, p - arg );
	    protoSendBuffer->SetVar( s, StrRef( (char *)p + 1 ) );
	}
	else
	{
	    protoSendBuffer->SetVar( StrRef( (char *)arg ), StrRef::Null() );
	}
}
void
RpcService::GetMyFingerprint( StrBuf &value )
{
	// Fetch our own SSL certificate fingerprint from the endpoint.
	endPoint->GetMyFingerprint( value );
}

int
RpcService::IsSingle()
{
	// Single-connection endpoint?  0 when no endpoint exists.

	if( !endPoint )
	    return 0;

	return endPoint->IsSingle();
}

// Signal-handler hook registered in Rpc::Connect(): on interrupt,
// push any buffered data out so the peer isn't left hanging.
static void RpcCleanup( Rpc *r )
{
	r->FlushTransport();
}
/*
 * Rpc
 */

Rpc::Rpc( RpcService *s )
{
	// A connection starts unconnected; Connect() builds the transport.

	recvBuffering = 0;

	service = s;
	transport = 0;
	forward = 0;

	sendBuffer = new RpcSendBuffer;
	recvBuffer = new RpcRecvBuffer;
	protoDynamic = new StrBufDict;

	// Duplex flow-control byte counters (see Invoke()/Dispatch()).

	duplexFsend = 0;
	duplexFrecv = 0;
	duplexRsend = 0;
	duplexRrecv = 0;

	dispatchDepth = 0;
	endDispatch = 0;

	protocolSent = 0;
	protocolServer = 0;

	// Both himarks start at the rpc.himark tunable; SetHiMark()
	// may widen them once real buffer sizes are known.

	rpc_hi_mark_rev = 
	rpc_hi_mark_fwd = p4tunable.Get( P4TUNE_RPC_HIMARK );
	rpc_lo_mark = p4tunable.Get( P4TUNE_RPC_LOWMARK );

	// Zero the performance counters.

	TrackStart();

	timer = new Timer;
}

Rpc::~Rpc( void )
{
	// Deregister from the interrupt handler first, so a signal
	// can't touch a half-destroyed object; then flush and close.

	signaler.DeleteOnIntr( this );
	Disconnect();

	delete sendBuffer;
	delete recvBuffer;
	delete protoDynamic;
	delete timer;
}

void
Rpc::DoHandshake( Error *e )
{
	// Run the transport's (SSL) handshake; nothing to do until a
	// transport exists.

	if( !transport )
	    return;

	transport->DoHandshake( e );
}

void 
Rpc::ClientMismatch( Error *e )
{
	// Report a client/server transport mismatch, if connected.

	if( !transport )
	    return;

	transport->ClientMismatch( e );
}

void
Rpc::SetProtocolDynamic( const char *var, const StrRef &value )
{
	// Record a per-connection protocol variable; it is merged into
	// the protocol message when InvokeOne() sends it.
	protoDynamic->ReplaceVar( var, value.Text() );
}

void
Rpc::ClearProtocolDynamic( const char *var )
{
	// Remove a previously-set per-connection protocol variable.
	protoDynamic->RemoveVar( var );
}

void
Rpc::Connect( Error *e )
{
	// Establish the network connection for this Rpc: Accept() when
	// the service is listening, Connect() when it is a client.  On
	// failure both the send (se) and receive (re) errors are set
	// so later Invoke()/Dispatch() calls become no-ops.

	// Don't allow double connects.

	if( transport )
	{
	    e->Set( MsgRpc::Reconn );
	    return;
	}

	// If an operation fails, cruft can be left in the sendBuffer.
	// Since we may use this buffer over for the next connection, we
	// need to clear it now.

	sendBuffer->Clear();

	// Reset per-connection duplex metering, dispatch state, and
	// any stale errors from a previous connection.

	duplexFsend = 0;
	duplexFrecv = 0;
	duplexRsend = 0;
	duplexRrecv = 0;
	dispatchDepth = 0;
	endDispatch = 0;
	protocolSent = 0;
	re.Clear();
	se.Clear();

	// Create the transport layer, 
	// Cleanup code is in Disconnect().

	NetTransport *t = 0;

	switch( service->openFlag )
	{
	case RPC_LISTEN:   
	    t = service->endPoint->Accept( e );
	    break;

	case RPC_CONNECT:  
	    t = service->endPoint->Connect( e );
	    break;

	default:    	    
	    // RPC_NOOPEN (e.g. after a failed Listen()) lands here.
	    e->Set( MsgRpc::Unconn );
	}

	if( e->Test() )
	{
	    if( t )
		delete t;

	    // connect errors leave send & receive errors
	    // so as to intercept Invoke() and Dispatch()
	    re = *e;
	    se = *e;
	    return;
	}

	// Pass transport onto the buffering routines.

	transport = new RpcTransport( t );

	// If rpc.himark tuned beyond net.bufsize, must tell NetBuffer.

	transport->SetBufferSizes( rpc_hi_mark_fwd, rpc_hi_mark_rev );

	// Clients flush buffered data on interrupt (see RpcCleanup).

	if( service->openFlag == RPC_CONNECT )
	    signaler.OnIntr( (SignalFunc)RpcCleanup, this );
}

void
Rpc::GetEncryptionType( StrBuf &value )
{ 
	// Report the encryption in use on the connection.  Guard
	// against being called before Connect() established a
	// transport, consistent with DoHandshake()/SetBreak().

	if( transport )
	    transport->GetEncryptionType( value );
}


void
Rpc::GetPeerFingerprint( StrBuf &value )
{
	// Fetch the peer's SSL certificate fingerprint.  Guard against
	// being called before Connect() established a transport,
	// consistent with DoHandshake()/SetBreak().

	if( transport )
	    transport->GetPeerFingerprint( value );
}

void
Rpc::GetExpiration(StrBuf &value)
{
	// Credential expiration comes from the service's endpoint.

	if( !service )
	    return;

	service->GetExpiration( value );
}

/*
 * Rpc::SetHiMark() - move rpc_hi_mark beyond 2000 default
 *
 *	If we get reliable send/receive TCP buffer sizes from the client
 *	(they often differ from each other and from the server's), we
 *	can feel confident about moving our himark off the default 2000.
 *
 *	The thinking as of Feb 2009 is that one direction of a TCP
 *	connection can have data outstanding that amounts to the receiver's
 *	SO_RCVBUF.  Many systems can take more -- we tried the sum of
 *	the sender's SO_SNDBUF and the receiver's SO_RCVBUF -- but some
 *	systems can't.  Solaris, for example, starts to close its window
 *	when it receives more than SO_RCVBUF, keeps accepting data until
 *	the window is 0, and won't reopen it until the application reads
 *	all outstanding data.  Weird and unfortunate, because we generally
 *	read in small chunks.
 *
 *	Sometimes we're metering client->server data, because the
 *	server->client side is clogged with file data (sync uses
 *	InvokeDuplex()).  Other times we're metering server->client data,
 *	because the client->server side is clogged (resolve, submit use
 *	InvokeDuplexRev()).  Since the server and client may have different
 *	SO_RCVBUF sizes, and we want to make the most of the connection,
 *	we keep separate rpc_hi_mark_fwd and rpc_hi_mark_rev limits.  We
 *	miss out when we meter data but neither side is clogged (diff,
 *	open, revert).  Our high mark could be somewhere around the sum
 *	of the SO_RCVBUFs for the client and server in this case, since
 *	the metered data could wait in either buffer.   But we're not
 *	yet that sophisticated.
 *
 *	Note that TCP limits its window size to the minimum of the
 *	sender's SO_SNDBUF and receiver's SO_RCVBUF, and that limits the
 *	amount of data travelling on the network in that direction.  If
 *	our himark were lower than the window size, we would artificially
 *	limit that amount of data further.  Thus we'd like the himark
 *	high.  Fortunately, we seem safe in making it SO_RCVBUF.
 *
 *	A SO_SNDBUF larger than the SO_RCVBUF (and thus the TCP window)
 *	should have no impact.  A SO_RCVBUF larger than the SO_SNDBUF
 *	(and thus a himark larger than the TCP window) is beneficial in
 *	the case where we're metering data on one side but the other
 *	side is not clogged.  It allows more data to be in transit on
 *	the other side of the connection.
 */

void
Rpc::SetHiMark( int sndbuf, int rcvbuf )
{
	// For testing and emergencies setting rpc.himark overrides all.

	if( p4tunable.IsSet( P4TUNE_RPC_HIMARK ) )
	    return;

	// Never drop below the default himark.

	const int minHiMark = p4tunable.Get( P4TUNE_RPC_HIMARK );

	// Forward himark: our own receive buffering.
	// Reverse himark: the peer's advertised receive buffer.
	// NOTE(review): sndbuf is only used in the debug output below;
	// confirm that is intentional (see the Feb 2009 comment above).

	rpc_hi_mark_fwd = transport->GetRecvBuffering();
	rpc_hi_mark_rev = rcvbuf;

	// Since Invoke() needs to Dispatch() _before_ we hit
	// the himark, we reduce the himark by lomark, and hope that
	// no single duplex message is larger than lomark.

	rpc_hi_mark_fwd -= rpc_lo_mark;
	rpc_hi_mark_rev -= rpc_lo_mark;

	if( rpc_hi_mark_fwd < minHiMark ) rpc_hi_mark_fwd = minHiMark;
	if( rpc_hi_mark_rev < minHiMark ) rpc_hi_mark_rev = minHiMark;

	// Put a stake through the heart of himark hanging: set our
	// NetBuffer buffer sizes to handling any outstanding data,
	// in case the lying cheating OS (linux, solaris) renegs
	// on its reported TCP send or receive space.

	transport->SetBufferSizes( rpc_hi_mark_fwd, rpc_hi_mark_rev );

	RPC_DBG_PRINTF( DEBUG_CONNECT,
		"Rpc himark: snd+rcv server %d+%d client %d+%d = %d/%d",
		    transport->GetSendBuffering(), 
		    transport->GetRecvBuffering(), 
		    sndbuf, rcvbuf, 
		    rpc_hi_mark_fwd,
		    rpc_hi_mark_rev );
}

void
Rpc::Disconnect()
{
	// Idempotent teardown: nothing to do without a transport.

	if( !transport )
	    return;

	// Push out any buffered data, then close and free the
	// connection.  Flush errors land in the send error (se).

	transport->Flush( &se );
	transport->Close();

	delete transport;
	transport = 0;
}

StrPtr *
Rpc::GetAddress( int raf_flags )
{
	// Our side of the connection, or 0 when not connected.

	if( !transport )
	    return 0;

	return transport->GetAddress( raf_flags );
}

StrPtr *
Rpc::GetPeerAddress( int raf_flags )
{
	// The peer's side of the connection, or 0 when not connected.

	if( !transport )
	    return 0;

	return transport->GetPeerAddress( raf_flags );
}

int
Rpc::GetPortNum()
{
	// Local port number, or -1 when not connected.

	if( !transport )
	    return -1;

	return transport->GetPortNum();
}

bool
Rpc::IsSockIPv6()
{
	// True when the underlying socket is IPv6; false if unconnected.

	if( !transport )
	    return false;

	return transport->IsSockIPv6();
}

StrBuf *
Rpc::MakeVar( const char *var )
{
	// Reserve variable 'var' in the outgoing buffer and hand back
	// its value StrBuf for the caller to fill in.

	StrRef name( (char *)var );
	return sendBuffer->MakeVar( name );
}

void		
Rpc::VSetVar( const StrPtr &var, const StrPtr &value )
{
	// Copy a variable/value pair into the outgoing buffer.
	sendBuffer->SetVar( var, value );
}

void
Rpc::Release()
{
	this->Invoke( P4Tag::p_release );
}

void
Rpc::ReleaseFinal()
{
	this->Invoke( P4Tag::p_release2 );
}

StrPtr *
Rpc::VGetVar( const StrPtr &var )
{
	// Look up a variable from the received buffer; 0 if unset.
	return recvBuffer->GetVar( var );
}

int
Rpc::VGetVarX( int x, StrRef &var, StrRef &val )
{
	// Fetch the x'th variable/value pair from the received buffer.
	return recvBuffer->GetVar( x, var, val );
}

void
Rpc::VRemoveVar( const StrPtr &var )
{
	// Delete a variable from the received buffer.
	recvBuffer->RemoveVar( var );
}

void
Rpc::VClear()
{
	// Discard everything queued in the outgoing buffer.
	sendBuffer->Clear();
}

StrPtr *
Rpc::GetArgi( int i )
{
	// Positional argument i from the received buffer, or 0 when
	// fewer than i+1 arguments arrived.

	if( i >= recvBuffer->GetArgc() )
	    return 0;

	return &recvBuffer->GetArgv()[i];
}

StrPtr *
Rpc::GetArgi( int i, Error *e )
{
	// As GetArgi(i), but a missing argument sets NoPoss on e.

	StrPtr *arg = GetArgi( i );

	if( !arg )
	    e->Set( MsgRpc::NoPoss );

	return arg;
}

int
Rpc::GetArgc()
{
	// Number of positional arguments in the received buffer.
	return recvBuffer->GetArgc();
}

StrPtr *
Rpc::GetArgv()
{
	// The positional argument array of the received buffer.
	return recvBuffer->GetArgv();
}

void
Rpc::CopyVars()
{
	// Echo: copy all received variables into the outgoing buffer.
	sendBuffer->CopyVars( recvBuffer );
}

KeepAlive *
Rpc::GetKeepAlive()
{
	// The transport doubles as our KeepAlive; 0 when unconnected.
	return transport;
}

void
Rpc::SetBreak( KeepAlive *breakCallback )
{
	// Install a callback the transport polls to break out of
	// blocking operations; no-op when unconnected.

	if( !transport )
	    return;

	transport->SetBreak( breakCallback );
}

/*
 * Invoke()
 * InvokeDuplex()
 * InvokeDuplexRev()
 * FlushDuplex()
 * InvokeFlush()
 * InvokeOne()
 * GotFlushed()
 */

void
Rpc::Invoke( const char *opName )
{
	// If InvokeDuplexRev() requests are still outstanding, the
	// return pipe may be clogged, so this send must be metered
	// like duplex traffic.  Otherwise a plain one-way send will do.

	if( !duplexRrecv )
	    InvokeOne( opName );
	else
	    InvokeDuplex( opName );
}

void
Rpc::InvokeDuplex( const char *opName )
{
	// This message travels a loop: count its bytes in both the
	// forward-send and forward-receive meters, then dispatch so
	// the pipe gets drained before it overfills.

	const int bytes = InvokeOne( opName );

	duplexFsend += bytes;
	duplexFrecv += bytes;

	Dispatch( DfDuplex, service->dispatcher );
}

void
Rpc::InvokeDuplexRev( const char *opName )
{
	// Forward-only data that fills the return pipe: bump the
	// reverse meters so every later Invoke() is metered until
	// this message is flushed, then send it as duplex.

	duplexRrecv++;
	duplexRsend++;

	InvokeDuplex( opName );
}

void
Rpc::FlushDuplex()
{
	// Wait out all acknowledgements for outstanding duplex data.

	// Flush any outstanding duplex data in the pipe.
	// Do nothing if there is no duplex data expected back

	if( duplexFrecv > 0 )
	{
	    // Proxies and brokers expect a flush1 message
	    // (with himark = 0) when flushing the channel,
	    // so we have to tell Dispatch() to send one.
	    // DfFlush moves the lomark to 0, and ensuring
	    // duplexFsend > 0 forces a flush1 message.

	    ++duplexFrecv;
	    ++duplexFsend;

	    Dispatch( DfFlush, service->dispatcher );
	}
}

void
Rpc::InvokeOver( const char *opName )
{
	// Like InvokeDuplex(), but dispatch with DfOver so control
	// returns to the outer Dispatch() rather than nesting another
	// full dispatch loop (which could nest too deeply, e.g. in
	// 'submit').

	const int bytes = InvokeOne( opName );

	duplexFsend += bytes;
	duplexFrecv += bytes;

	Dispatch( DfOver, service->dispatcher );
}

int		
Rpc::InvokeOne( const char *opName )
{
	// Send one message (the current sendBuffer plus func=opName)
	// to the peer.  Returns the byte count put on the wire,
	// including transport overhead, so duplex callers can meter
	// the pipe; returns 0 on error.

	// Don't pile errors

	if( se.Test() || re.Test() || !transport )
	{
	    sendBuffer->Clear();
	    return 0;
	}

	// Send off protocol if not yet sent.
	// If we're using RpcForward we want to allow it
	// to send its own protocol message.

	if( !protocolSent && strcmp( opName, P4Tag::p_protocol ) )
	{
	    RpcSendBuffer buf;
	    int sz = transport->GetSendBuffering();
	    int rz = transport->GetRecvBuffering();

	    // Start from the service-wide protocol variables, then
	    // overlay any per-connection dynamic ones.

	    buf.CopyBuffer( service->protoSendBuffer );
	    
	    int i = 0;
	    StrRef var, val;
	    while( protoDynamic->GetVar( i++, var, val ) )
	        buf.SetVar( var, val );

	    // Advertise our buffer sizes so the peer can SetHiMark().

	    buf.SetVar( StrRef( P4Tag::v_sndbuf ), StrNum( sz ) );
	    buf.SetVar( StrRef( P4Tag::v_rcvbuf ), StrNum( rz ) );
	    buf.SetVar( StrRef( P4Tag::v_func ), StrRef( P4Tag::p_protocol ) );

	    RPC_DBG_PRINT( DEBUG_FUNCTION,
		    "Rpc invoking protocol" );

	    timer->Start();

	    transport->Send( buf.GetBuffer(), &re, &se );

	    sendTime += timer->Time();
	}

	protocolSent = 1;

	// Set func=opName variable.

	SetVar( P4Tag::v_func, opName );

	// Tracking

	RPC_DBG_PRINTF( DEBUG_FUNCTION,
		"Rpc invoking %s", opName );

	// Send the buffer to peer

	timer->Start();

	transport->Send( sendBuffer->GetBuffer(), &re, &se );

	// time tracking
	sendTime += timer->Time();

	if( se.Test() )
	    return 0;

	// Clear for next call.
	// Get size of buffer so we know how full the pipe is.
	// We must include RpcTransport's overhead.

	int sz = sendBuffer->GetBufferSize() + transport->SendOverhead(); 

	sendBuffer->Clear();

	// tracking

	sendCount++;
	sendBytes += sz;

	return sz;
}

void
Rpc::GotFlushed()
{
	StrPtr *fseq = GetVar( P4Tag::v_fseq );
	StrPtr *rseq = GetVar( P4Tag::v_rseq );

	if( fseq )
	    duplexFrecv -= fseq->Atoi();

	if( rseq )
	    duplexRrecv -= rseq->Atoi();
}

/*
 * Rpc::Dispatch() - dispatch incoming RPC's until 'release' received
 * Rpc::DispatchOne() - just dispatch from the current buffer
 */

void		
Rpc::Dispatch( DispatchFlag flag, RpcDispatcher *dispatcher )
{
	// Read and execute incoming RPCs.  The flag controls how long
	// we stay: DfComplete runs until released, DfDuplex drains
	// metered duplex data below the himark, DfFlush drains it
	// completely, DfOver hands control back to the outer Dispatch.

	// Don't nest more than once: we only allow a
	// Dispatch()/InvokeDuplex()/FlushDuplex() combo.

	if( dispatchDepth > 1 )
	    return;

	++dispatchDepth;

	RPC_DBG_PRINTF( DEBUG_FLOW,
		">>> Dispatch(%d) %d/%d %d/%d %d",
		dispatchDepth, duplexFsend, duplexFrecv, 
		duplexRsend, duplexRrecv, flag );

	// Use server's recv buffer size as himark for InvokeDuplex()
	// Use client's recv buffer size as himark for InvokeDuplexRev()

	int loMark = rpc_lo_mark;
	int hiMark = duplexRrecv ? rpc_hi_mark_rev : rpc_hi_mark_fwd;

	// Flushing means eveything goes.
	// Complete: lo = N/A, hi = N/A
	// Duplex: lo = LO, hi = HI
	// Flush: lo = 0, hi = 0
	// Over: lo = 0, hi = HI

	if( flag != DfDuplex ) loMark = 0;
	if( flag == DfFlush ) hiMark = 0;

	// Push the recvBuffer, in case we are nesting Dispatch().

	RpcRecvBuffer *savRecvBuffer = recvBuffer;
	recvBuffer = 0;

	// Receive (dispatching) until told to stop.

	while( !endDispatch )
	{
	    // A receive error ends dispatch once nothing is left to read.

	    if( re.Test() && ( !transport || !transport->RecvReady() ) )
	        break;

	    // If more than loMark in pipe, send a marker.
	    // We flush always if Dispatch() called from FlushDuplex()
	    // or if Invoking from such a Dispatch().

	    if( duplexFsend > loMark && !se.Test() )
	    {
		// Send a flush1 through for metering flow control.

		// Note: hardcode the size of the flush1 message
		// As of 2008.1 its 46 bytes (round up to 60).

	        const int flushMessageSize = 60;

	        RPC_DBG_PRINTF( DEBUG_FLOW,
	        	"Rpc flush %d bytes", duplexFsend );

		SetVar( P4Tag::v_himark, loMark ? hiMark : 0 );

		// The flush1 itself travels the loop: meter it too.

		duplexFrecv += flushMessageSize;
		duplexFsend += flushMessageSize;

		if( duplexFsend ) SetVar( P4Tag::v_fseq, duplexFsend );
		if( duplexRsend ) SetVar( P4Tag::v_rseq, duplexRsend );

		duplexFsend = 0;
		duplexRsend = 0;

		InvokeOne( P4Tag::p_flush1 );
	    }

	    // If top level Dispatch(), go forever.
	    // If 2nd level Dispatch(), get below hiMark.
	    // If flushing, go until all flush2's received.
	    // If error sending, go until receive error.

	    else if( flag == DfComplete ||
		     flag == DfDuplex && duplexFrecv > hiMark ||  
		     flag == DfFlush && duplexFrecv ||
		     se.Test() )
	    {
		if( !recvBuffer )
		    recvBuffer = new RpcRecvBuffer;

		DispatchOne( dispatcher );
	    }
	    else break;
	}

	// Pop recvBuffer.

	delete recvBuffer;
	recvBuffer = savRecvBuffer;

	RPC_DBG_PRINTF( DEBUG_FLOW,
		"<<< Dispatch(%d) %d/%d %d/%d %d",
		dispatchDepth, duplexFsend, duplexFrecv, 
		duplexRsend, duplexRrecv, flag );

	// Leaving the outermost Dispatch() clears the end flag.

	if( !--dispatchDepth )
	    endDispatch = 0;
}

void
Rpc::DispatchOne( RpcDispatcher *dispatcher )
{
	// Receive a single message, parse it, and invoke the function
	// named by its "func" variable via the dispatcher.  Errors are
	// routed to the registered errorHandler, or reported directly.

	StrPtr *func;

	// Flush any data due the other end: we can't expect
	// anything if the other end is awaiting us!

	// Note that a broken send pipe doesn't stop dispatching:
	// we want what's in the receive pipe (they may be important
	// acks), so we read until the receive pipe is broken too.

	// Receive sender's buffer and then parse the variables out.
	
	timer->Start();

	int sz = transport->Receive( recvBuffer->GetBuffer(), &re, &se );

	// time tracking
	recvTime += timer->Time();

	if( sz <= 0 )
	{
	    // EOF doesn't set re, so we will.

	    if( !re.Test() )
		re.Set( MsgRpc::Closed );

	    return;
	}

	// tracking

	recvCount++;
	recvBytes += recvBuffer->GetBufferSize();

	// Parse failures poison the receive side (re) and end dispatch.

	Error e;
	recvBuffer->Parse( &e );

	if( e.Test() )
	{
	    re = e;
	    return;
	}

	// Find the function to dispatch.  The protocol mandates that
	// this must be set: how else do we know what to do with the 
	// buffer?

	func = GetVar( P4Tag::v_func, &e );

	if( e.Test() )
	{
	    re = e;
	    return;
	}

	RPC_DBG_PRINTF( DEBUG_FUNCTION,
		"Rpc dispatch %s", func->Text() );

	// Find the registered function as given with 'func'.
	// If no such function is found, call 'funcHandler'.
	// If any error occurs, call 'errorHandler'.
	// If no 'errorHandler' just report the error and return.

	ue.Clear();

	const RpcDispatch *disp = dispatcher->Find( func->Text() );

	if( !disp && !( disp = dispatcher->Find( P4Tag::p_funcHandler ) ) )
	{
	    ue.Set( MsgRpc::UnReg ) << *func;
	    goto error;
	}

	// Invoke requested function.

	(*disp->function)( this, &ue );

	// If an error occurred, we'll call the caller's errorHandle
	// function.  If the error was fatal, we'll tack on some tracing
	// information.

	if( !ue.Test() )
	    return;

	if( ue.IsFatal() )
	    ue.Set( MsgRpc::Operat ) << disp->opName;

    error:
	// If a user error occurred, invoke errorHandler to deal with it.
	// In this case, the Error is acutally passed in to the dispatched 
	// function, rather than being just an output parameter.

	if( disp = dispatcher->Find( P4Tag::p_errorHandler ) )
	{
	    (*disp->function)( this, &ue );
	    return;
	}

	AssertLog.Report( &ue );
}

void
Rpc::Loopback( Error *e )
{
	// Feed our own send buffer straight into the receive side, as
	// if the peer had echoed it back, then clear the send buffer.
	recvBuffer->CopyBuffer( sendBuffer->GetBuffer() );
	recvBuffer->Parse( e );
	sendBuffer->Clear();
}

/*
 * Rpc::StartCompression() -- initiate full link compression
 * Rpc::GotRecvCompress() -- turn on recv half compression
 * Rpc::GotSendCompress() -- turn on send half compression
 *
 * Notes on compression.
 *
 * Both ends must support the compression protocol.  Use "client" >= 6
 * or "server2" >= "6" ? protocol level to be sure.
 *
 * Compression can be turned on any time; in practice, it happens
 * after protocol exchange has happened (otherwise, how do you know?).
 * To orchestrate this, the follow sequence is obeyed:
 *
 * 1.  StartCompression() send "compress1" to the other end, and then turns 
 *     on send compression.   All data sent afterwards are compressed.  
 *
 * 2.  When the other end receives "compress1", it sends back "compress2"
 *     and then turns on both send and receieve compression.
 *
 * 3.  When this end receives "compress2", it turns on receive compression.
 */

void
Rpc::StartCompression( Error *e )
{
	// send the "compress1" flag, then compress the send link.
	// When we get "compress2", we'll compress the recv link.

	Invoke( P4Tag::p_compress1 );
	transport->SendCompression( e );
}

void
Rpc::GotSendCompressed(  Error *e )
{
	// Peer sent "compress1": start compressing our send link.
	transport->SendCompression( e );
}

void
Rpc::GotRecvCompressed(  Error *e )
{
	// Peer sent "compress2": start decompressing our recv link.
	transport->RecvCompression( e );
}

void
Rpc::FlushTransport()
{
	// Push any buffered outgoing data; errors land in se.

	if( !transport )
	    return;

	transport->Flush( &se );
}

int	
Rpc::GetRecvBuffering() 
{
	// Bytes the transport buffers on receive; 0 when unconnected.

	return transport ? transport->GetRecvBuffering() : 0;
}


/*
 * Performance tracking
 *
 * Rpc::TrackStart() - reset tracking counters
 * Rpc::Trackable() - is any track data interesting?
 * Rpc::TrackReport() - report interesting track data
 */

void
Rpc::TrackStart()
{
	// Zero all of the performance-tracking counters.

	sendCount = recvCount = 0;
	sendBytes = recvBytes = 0;
	sendTime = recvTime = 0;
}

int
Rpc::Trackable( int level )
{
	// Is any tracking data interesting at this tracking level?
	// True if either direction errored, or the message count or
	// megabyte volume exceeds the Tracker thresholds.

	Tracker t( level );

	return t.Over( TT_RPC_ERRORS, se.Test() || re.Test() ) ||
	       t.Over( TT_RPC_MSGS, sendCount + recvCount ) ||
	       t.Over( TT_RPC_MBYTES, ( sendBytes + recvBytes ) / 1024 / 1024 );
}

void
Rpc::TrackReport( int level, StrBuf &out )
{
	// Append a human-readable performance summary to out: message
	// counts, megabytes, himarks, and send/receive times; plus a
	// second line on errors and outstanding duplex data when
	// either direction failed.  Silent if not Trackable().

	if( !Trackable( level ) )
	    return;

	out 
	    << "--- rpc msgs/size in+out "
	    << recvCount << "+"
	    << sendCount << "/"
	    << recvBytes / 1024 / 1024 << "mb+"
	    << sendBytes / 1024 / 1024 << "mb "
	    << "himarks " 
	    << rpc_hi_mark_fwd << "/" 
	    << rpc_hi_mark_rev << " snd/rcv "
	    << StrMs( sendTime ) << "s/"
	    << StrMs( recvTime ) << "s\n";

	if( !se.Test() && !re.Test() )
	    return;

	out << "--- rpc ";

	if( se.Test() ) out << "send ";
	if( re.Test() ) out << "receive ";

	out << "errors, duplexing F/R " 
	    << duplexFrecv << "/"
	    << duplexRrecv << "\n";
}

void
Rpc::GetTrack( int level, RpcTrack *track )
{
	// Fill in the track record only when this connection is
	// interesting enough to report at the given level.

	track->trackable = Trackable( level );

	if( track->trackable )
	    ForceGetTrack( track );
}

void
Rpc::ForceGetTrack( RpcTrack *track )
{
	// Unconditionally copy the performance counters out; tolerate
	// a null destination.

	if( !track )
	    return;

	track->sendCount = sendCount;
	track->recvCount = recvCount;
	track->sendBytes = sendBytes;
	track->recvBytes = recvBytes;
	track->sendTime = sendTime;
	track->recvTime = recvTime;
	track->rpc_hi_mark_fwd = rpc_hi_mark_fwd;
	track->rpc_hi_mark_rev = rpc_hi_mark_rev;
}

void
Rpc::CheckKnownHost( Error *e, const StrRef & trustfile )
{
	// Verify the SSL peer's public-key fingerprint against the
	// trust file; sets HostKeyMismatch/HostKeyUnknown on failure.
	// The trust file stores fingerprints as tickets keyed by peer
	// address under two pseudo-users: a primary ("**++**") and an
	// alternate ("++++++") apparently used for key rollover --
	// confirm against the trust command implementation.

	StrBuf	pubkey;
	StrPtr  *peer;

	GetPeerFingerprint( pubkey );
	
	// if not ssl we are done
	if( !pubkey.Length() )
		return;

	peer = GetPeerAddress( RAF_PORT );

	RPC_DBG_PRINTF( DEBUG_CONNECT,
		"Checking host %s pubkey %s",
		peer->Text(), pubkey.Text() );

	StrRef dummyuser( "**++**" );
	StrRef altuser( "++++++" );
	StrBuf trustkey;
	int doreplace = 0;

	char *keystr;
	{
	    // Check the primary entry; an exact match means trusted.

	    Ticket hostfile( &trustfile );
	    keystr = hostfile.GetTicket( *peer, dummyuser );
	    if( keystr )
	    {
		if( pubkey == keystr )
		    return;
		trustkey.Set( keystr );
	    }
	}
	{
	    // No primary match: if the alternate entry matches, we
	    // promote it to primary below.

	    Ticket hostfile( &trustfile );
	    keystr = hostfile.GetTicket( *peer, altuser );
	    if( keystr && pubkey == keystr )
		doreplace = 1;
	}
	if( doreplace )
	{
	    // Promote: overwrite the primary, then drop the alternate.

	    {
		Ticket hostfile( &trustfile );
		hostfile.ReplaceTicket( *peer, dummyuser, pubkey, e );
	    }
	    if( !e->Test() )
	    {
		Ticket hostfile( &trustfile );
		hostfile.DeleteTicket( *peer, altuser, e );
	    }
	    return;
	}

	// Known host with a different key, or an unknown host.

	e->Set( trustkey.Length() ?
		MsgRpc::HostKeyMismatch : MsgRpc::HostKeyUnknown );
	*e << *peer;
	*e << pubkey;
}

void
RpcUtility::Generate(RpcUtilityType type,
		     Error *e )
{
	// SSL credential utility: create a new certificate/key pair
	// (Generate_Credentials) or print the fingerprint of the
	// existing one (Generate_Fingerprint).  Builds without SSL
	// support always set SslNoSsl.
# ifdef USE_SSL
	NetSslCredentials credentials;

	switch( type )
	{
	case Generate_Credentials:
	    credentials.GenerateCredentials( e );
	    break;
	case Generate_Fingerprint:
	    credentials.ReadCredentials( e );
	    if ( !e->Test() )
	    {
		const StrPtr *fingerprint = credentials.GetFingerprint();
		if( fingerprint )
		    printf("Fingerprint: %s\n", fingerprint->Text());
	    }
	    break;
	}
	return;
# else
	e->Set( MsgRpc::SslNoSsl );
# endif //USE_SSL

}
# Change User Description Committed
#1 15902 Matt Attaway A second renaming that I will not obliterate as a badge of shame
//guest/perforce_software/p4/2014_1/rpc/rpc.cc
#1 15901 Matt Attaway Clean up code to fit modern Workshop naming standards
//guest/perforce_software/p4/2014.1/rpc/rpc.cc
#1 12188 Matt Attaway Move 'main' p4 into a release specific directory in prep for new releases
//guest/perforce_software/p4/rpc/rpc.cc
#1 9129 Matt Attaway Initial commit of the 2014.1 p4/p4api source code