netbuffer.cc #1

  • //
  • guest/
  • perforce_software/
  • p4/
  • 2016-1/
  • net/
  • netbuffer.cc
  • View
  • Commits
  • Open Download .zip Download (8 KB)
/*
 * Copyright 1995, 1996 Perforce Software.  All rights reserved.
 *
 * This file is part of Perforce - the FAST SCM System.
 */

/*
 * netbuffer.cc - buffer I/O to transport
 */

# include <stdhdrs.h>

# include <debug.h>
# include <strbuf.h>
# include <strops.h>
# include <error.h>
# include <tunable.h>

# include <zlib.h>
# include <zutil.h>
# include <msgsupp.h>

# include <keepalive.h>
# include "netportparser.h"
# include "netconnect.h"
# include "netbuffer.h"
# include "netdebug.h"
# include <msgrpc.h>

NetBuffer::NetBuffer( NetTransport *t )
{
	int size = p4tunable.Get( P4TUNE_NET_BUFSIZE );
	int rcvsize = p4tunable.Get( P4TUNE_NET_RCVBUFSIZE );

	recvBuf.Alloc( rcvsize );
	sendBuf.Alloc( size );

	ResetRecv();
	ResetSend();

	zin = 0;
	zout = 0;
	compressing = 0;

	transport = t;
}

NetBuffer::~NetBuffer()
{
	if( zin ) inflateEnd( zin );
	if( zout ) deflateEnd( zout );
	delete zin;
	delete zout;
	delete transport;
}

void
NetBuffer::SetBufferSizes( int recvSize, int sendSize )
{
	// Remember offsets

	int recvDone = RecvDone();
	int recvReady = RecvReady();
	int sendDone = SendDone();
	int sendReady = SendReady();

	// Note we only increase sizes.

	if( recvSize > recvBuf.Length() )
	    recvBuf.Alloc( recvSize - recvBuf.Length() );

	if( sendSize > sendBuf.Length() )
	    sendBuf.Alloc( sendSize - sendBuf.Length() );

	// Now fix up pointers

	ResetRecv();
	ResetSend();

	recvPtr 	+= recvDone;
	ioPtrs.recvPtr 	+= recvDone + recvReady;
	ioPtrs.sendPtr 	+= sendDone;
	ioPtrs.sendEnd 	+= sendDone + sendReady;
}

void
NetBuffer::SendCompression( Error *e )
{
	// Sanity!
	// Don't let it be turned on twice.

	if( zout )
	    return;

	DEBUGPRINT( DEBUG_TRANS, "NetBuffer send compressing" );

	// create z_stream and init

	zout = new z_stream;
	zout->zalloc = (alloc_func)0;
	zout->zfree = (free_func)0;
	zout->opaque = (voidpf)0;

	if( deflateInit2(
		zout,
		Z_DEFAULT_COMPRESSION,
		Z_DEFLATED,
		-MAX_WBITS,		// - to suppress zlib header!
		DEF_MEM_LEVEL, 0 ) 
		!= Z_OK )
	{
	    e->Set( MsgSupp::DeflateInit );
	}
}

void
NetBuffer::RecvCompression( Error *e )
{
	// Sanity!
	// Don't let it be turned on twice.

	if( zin )
	    return;

	DEBUGPRINT( DEBUG_TRANS, "NetBuffer recv compressing"  );

	// create z_stream and init

	zin = new z_stream;
	zin->zalloc = (alloc_func)0;
	zin->zfree = (free_func)0;
	zin->opaque = (voidpf)0;

	if( inflateInit2( zin, -DEF_WBITS ) != Z_OK )
	    e->Set( MsgSupp::InflateInit );
}

int
NetBuffer::Receive( char *buffer, int length, Error *e )
{
	return Receive( buffer, length, e, e );
}

int
NetBuffer::Receive( char *buffer, int length, Error *re, Error *se )
{
	/*
	 * Receive buffering:
	 *	If we can receive without buffering, do so, otherwise...
	 *	If recvBuf is empty, fill it
	 *	Fill user buffer from recvBuf
	 */

	char *buf = buffer;
	int len = length;

	while( len )
	{
	    int l = RecvReady();

	    // Fill user buffer if data ready

	    if( zin && l )
	    {
		// Uncompress into user buffer

		zin->next_out = (unsigned char *)buf;
		zin->avail_out = len;
		zin->next_in = (unsigned char*)recvPtr;
		zin->avail_in = l;

		int err = inflate( zin, Z_NO_FLUSH );

		recvPtr = (char *)zin->next_in;
		buf = (char *)zin->next_out;
		len = zin->avail_out;

		if( err == Z_STREAM_END )
		    break;
		else if( err == Z_OK )
		    continue;

		re->Set( MsgSupp::Inflate );
		return 0;
	    }
	    else if( l )
	    {
		// Copy into user buffer

		if( l > len ) l = len;
		memcpy( buf, recvPtr, l );
		recvPtr += l;
		len -= l;
		buf += l;
		continue;
	    }

	    // Nothing ready in receive buffer: we're going to read.

	    // If we can receive without buffering, do so. Since we set the
	    // ioPtrs to point outside our buffer space, we must be sure to
	    // reset them.

	    if( !zin && len >= recvBuf.Length() )
	    {
		// Must set (and then reset) for special purpose
		// Limit it to recvBuf chunks.
		// OS/2 can't handle large read/write.

		ioPtrs.recvPtr = buf;
		ioPtrs.recvEnd = buf + recvBuf.Length();

		// Flush & read

		if( !transport->SendOrReceive( ioPtrs, se, re ) )
		{
		    ResetRecv();
		    return 0;
		}

		l = ioPtrs.recvPtr - buf;
		len -= l,
		buf += l;

		ResetRecv();
		continue;
	    }

	    // If compressing, we'll need to flush that separately.
	    // If that produces readable data, don't need another read.

	    if( zout )
	    {
		Flush( re, se );
		if( RecvReady() )
		    continue;
	    }

	    // Read into our buffer.

	    ResetRecv();

	    if( !transport->SendOrReceive( ioPtrs, se, re ) )
		return 0;
	}

	if( DEBUG_BUFFER )
	{
	    p4debug.printf( "NetBuffer rcv %d: ", length );
	    StrOps::Dump( StrRef( buffer, length ) );
	}

	return length;
}

void
NetBuffer::Send( const char *buffer, int length, Error *e )
{
	Send( buffer, length, e, e );
}

void
NetBuffer::Send( const char *buffer, int length, Error *re, Error *se )
{
	if( DEBUG_BUFFER )
	{
	    p4debug.printf( "NetBuffer snd %d: ", length );
	    StrOps::Dump( StrRef( buffer, length ) );
	}

	/*
	 * Send buffering:
	 *	If sendBuf is full, send it
	 *	If we can send without buffering, do so
	 *	Otherwise, buffer what we can
	 *	loop
	 */

	while( length )
	{
	    // If we can send without buffering, do so. Since we set the ioPtrs
	    // to point outside our buffer space, we must be sure to reset them.

	    if( !SendReady() && length >= sendBuf.Length() && !zout )
	    {
		ioPtrs.sendPtr = (char *)buffer;
		ioPtrs.sendEnd = (char *)buffer + length;

		PackRecv();

		if( !transport->SendOrReceive( ioPtrs, se, re ) )
		{
		    ResetSend();
		    return;
		}

		int l = ioPtrs.sendPtr - buffer;
		buffer += l;
		length -= l;

		ResetSend();
		continue;
	    }

	    //  If sendBuf is of sendable size, do it

	    if( SendReady() >= sendBuf.Length() )
	    {
		PackRecv();

		if( !transport->SendOrReceive( ioPtrs, se, re ) )
		    return;

		continue;
	    }

	    // Copy what we can into our buffer
	    // If no room and less than whole sendBuf ready, move
	    // it to the beginning of the sendbuf.

	    PackSend();

	    if( zout )
	    {
		// Compress into SendRoom()

		zout->next_in = (unsigned char *)buffer;
		zout->avail_in = length;
		zout->next_out = (unsigned char *)ioPtrs.sendEnd;
		zout->avail_out = SendRoom();

		if( deflate( zout, Z_NO_FLUSH ) != Z_OK )
		{
		    se->Set( MsgSupp::Deflate );
		    return;
		}

		ioPtrs.sendEnd = (char *)zout->next_out;
		buffer = (char *)zout->next_in;
		length = zout->avail_in;
		compressing = 1;
	    }
	    else
	    {
		// buffer into SendRoom()

		int l = SendRoom();
		if( l > length ) l = length;
		memcpy( ioPtrs.sendEnd, buffer, l );
		ioPtrs.sendEnd += l;
		buffer += l;
		length -= l;
	    }
	}
}

void
NetBuffer::Flush( Error *re, Error *se )
{
	DEBUGPRINT( DEBUG_TRANS, "NetBuffer flush"  );

	while( compressing || SendReady() )
	{
	    // Anything to purge from compressor?

	    PackSend();

	    if( compressing && SendRoom() )
	    {
		// Flush compress into SendRoom()

		zout->next_in = 0;
		zout->avail_in = 0;
		zout->next_out = (unsigned char *)ioPtrs.sendEnd;
		zout->avail_out = SendRoom();

		if( deflate( zout, Z_FULL_FLUSH ) != Z_OK )
		{
		    se->Set( MsgSupp::Deflate );
		    return;
		}

		ioPtrs.sendEnd = (char *)zout->next_out;
		compressing = !SendRoom();
	    }

	    // Flush what we've got or read if available.

	    PackRecv();
	    
	    if( !transport->SendOrReceive( ioPtrs, se, re ) )
		return;
	}
}

int
NetBuffer::IsAlive()
{
	int isAlive = transport->IsAlive(); 

	// Connection has been closed by the client, remove data from
	// the receive buffer while we are at it (client may have
	// been using RunTag() which means there could be a bunch of
	// commands sitting in the buffer to execute).

	if( !isAlive )
	    ResetRecv();

	return( isAlive );
}
# Change User Description Committed
#1 19472 Liz Lam Initial add of the 2016.1 p4/p4api source code.