// -*- mode: C++; tab-width: 4; -*-
// vi:ts=8 sw=4 noexpandtab autoindent
/*
* NetTcpTransport
*
* Copyright 1995, 1996, 2011 Perforce Software. All rights reserved.
*
* This file is part of Perforce - the FAST SCM System.
* - previously part of nettcp.cc
*/
# define NEED_ERRNO
# define NEED_SIGNAL
# ifdef OS_NT
# define NEED_FILE
# endif
# define NEED_FCNTL
# define NEED_IOCTL
# define NEED_TYPES
# define NEED_SLEEP
# define NEED_SOCKET_IO
# ifdef OS_MPEIX
# define _SOCKET_SOURCE /* for sys/types.h */
# endif
# include <stdhdrs.h>
# include <ctype.h>
# include <error.h>
# include <strbuf.h>
# include "netaddrinfo.h"
# include <bitarray.h>
# include <debug.h>
# include <tunable.h>
# include <keepalive.h>
# include <msgrpc.h>
# include <timer.h>
# include "netportparser.h"
# include "netconnect.h"
# include "nettcptransport.h"
# include "netselect.h"
# include "netport.h"
# include "netdebug.h"
# include "netutils.h"
# include "netsupport.h"
# define PEEK_TIMEOUT 200 /* 200 milliseconds */
NetTcpTransport::NetTcpTransport( int t, bool fromClient )
: isAccepted(fromClient)
{
this->t = t;
breakCallback = 0;
lastRead = 0;
selector = new NetTcpSelector( t );
/* SendOrReceive() likes non-blocking I/O. */
/* Without it, it's just synchronous. */
# ifdef OS_NT
u_long u_one = 1;
ioctlsocket( t, FIONBIO, &u_one );
# else
int f = fcntl( t, F_GETFL, 0 );
fcntl( t, F_SETFL, f | O_NONBLOCK );
# endif
SetupKeepAlives( t );
TRANSPORT_PRINTF( DEBUG_CONNECT,
"NetTcpTransport %s connected to %s",
GetAddress( RAF_PORT )->Text(),
GetPeerAddress( RAF_PORT )->Text() );
}
NetTcpTransport::~NetTcpTransport()
{
Close();
delete selector;
}
void
NetTcpTransport::SetupKeepAlives( int t )
{
# ifdef SO_KEEPALIVE
const SOCKOPT_T one = 1;
// turn on tcp keepalives unless user disabled it (in which case turn them off)
int ka_disable = p4tunable.Get( P4TUNE_NET_KEEPALIVE_DISABLE );
if( ka_disable )
{
const SOCKOPT_T zero = 0;
TRANSPORT_PRINT( DEBUG_CONNECT, "NetTcpTransport: disabling TCP keepalives" );
do_setsockopt( "NetTcpTransport", t, SOL_SOCKET, SO_KEEPALIVE, &zero, sizeof( zero ) );
}
else
{
TRANSPORT_PRINT( DEBUG_CONNECT, "NetTcpTransport: enabling TCP keepalives" );
do_setsockopt( "NetTcpTransport", t, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof( one ) );
// set tcp keepalive count if user requested
const SOCKOPT_T ka_keepcount = p4tunable.Get( P4TUNE_NET_KEEPALIVE_COUNT );
if( ka_keepcount )
{
// some systems (eg, Darwin 10.5.x) support keepalive but not the tuning parameters!
# if defined(SOL_TCP) && defined(TCP_KEEPCNT)
TRANSPORT_PRINTF( DEBUG_CONNECT,
"NetTcpTransport: setting TCP keepalive count = %d", ka_keepcount );
do_setsockopt( "NetTcpTransport", t, SOL_TCP, TCP_KEEPCNT, &ka_keepcount, sizeof( ka_keepcount ) );
# else
TRANSPORT_PRINT( DEBUG_CONNECT,
"NetTcpTransport: this system does not support setting TCP keepalive count" );
# endif // TCP_KEEPCNT
}
// set tcp keepalive idle time (in seconds) if user requested
const SOCKOPT_T ka_idlesecs = p4tunable.Get( P4TUNE_NET_KEEPALIVE_IDLE );
if( ka_idlesecs )
{
# if defined(SOL_TCP) && defined(TCP_KEEPIDLE)
TRANSPORT_PRINTF( DEBUG_CONNECT,
"NetTcpTransport: setting TCP keepalive idle secs = %d", ka_idlesecs );
do_setsockopt( "NetTcpTransport", t, SOL_TCP, TCP_KEEPIDLE, &ka_idlesecs, sizeof( ka_idlesecs ) );
# else
TRANSPORT_PRINT( DEBUG_CONNECT,
"NetTcpTransport: this system does not support setting TCP keepalive idle time" );
# endif // TCP_KEEPIDLE
}
// set tcp keepalive interval time (in seconds) if user requested
const int ka_intvlsecs = p4tunable.Get( P4TUNE_NET_KEEPALIVE_INTERVAL );
if( ka_intvlsecs )
{
# if defined(SOL_TCP) && defined(TCP_KEEPINTVL)
TRANSPORT_PRINTF( DEBUG_CONNECT,
"NetTcpTransport: setting TCP keepalive interval secs = %d", ka_intvlsecs );
do_setsockopt( "NetTcpTransport", t, SOL_TCP, TCP_KEEPINTVL, &ka_intvlsecs, sizeof( ka_intvlsecs ) );
# else
TRANSPORT_PRINT( DEBUG_CONNECT,
"NetTcpTransport: this system does not support setting TCP keepalive interval" );
# endif // TCP_KEEPINTVL
}
}
# else
TRANSPORT_PRINTF( DEBUG_CONNECT,
"NetTcpTransport: this system does not support TCP keepalive packets" );
# endif // SO_KEEPALIVE
}
void
NetTcpTransport::GetAddress( int t, int raf_flags, StrBuf &myAddr )
{
struct sockaddr_storage addr;
struct sockaddr *saddrp = reinterpret_cast<struct sockaddr *>(&addr);
TYPE_SOCKLEN addrlen = sizeof addr;
if( getsockname( t, saddrp, &addrlen ) < 0 || addrlen > sizeof addr )
{
myAddr.Set( "unknown" );
}
else
{
NetUtils::GetAddress( addr.ss_family, saddrp, raf_flags, myAddr );
}
}
void
NetTcpTransport::GetPeerAddress( int t, int raf_flags, StrBuf &peerAddr )
{
struct sockaddr_storage addr;
struct sockaddr *saddrp = reinterpret_cast<struct sockaddr *>(&addr);
TYPE_SOCKLEN addrlen = sizeof addr;
if( getpeername( t, saddrp, &addrlen ) < 0 || addrlen > sizeof addr )
{
if( addrlen > sizeof addr )
{
DEBUGPRINT( DEBUG_CONNECT,
"Unable to get peer address since addrlen > sizeof addr.");
}
else
{
StrBuf buf;
Error::StrError(buf, errno);
DEBUGPRINTF( DEBUG_CONNECT,
"Unable to get peer address: %s", buf.Text());
}
peerAddr.Set( "unknown" );
}
else
{
NetUtils::GetAddress( addr.ss_family, saddrp, raf_flags, peerAddr );
}
}
int
NetTcpTransport::GetPortNum( int t )
{
struct sockaddr_storage addr;
struct sockaddr *saddrp = reinterpret_cast<struct sockaddr *>(&addr);
TYPE_SOCKLEN addrlen = sizeof addr;
if( getsockname( t, saddrp, &addrlen ) < 0 || addrlen > sizeof addr )
{
StrBuf buf;
Error::StrError(buf, errno);
DEBUGPRINTF( DEBUG_CONNECT,
"Unable to get sockname: %s", buf.Text());
return -1;
}
else
{
return NetUtils::GetInPort( saddrp );
}
}
bool
NetTcpTransport::IsSockIPv6( int t )
{
struct sockaddr_storage addr;
struct sockaddr *saddrp = reinterpret_cast<struct sockaddr *>(&addr);
TYPE_SOCKLEN addrlen = sizeof addr;
if( getsockname( t, saddrp, &addrlen ) < 0 || addrlen > sizeof addr )
{
StrBuf buf;
Error::StrError(buf, errno);
DEBUGPRINTF( DEBUG_CONNECT,
"Unable to get sockname: %s", buf.Text());
return false;
}
else
{
return NetUtils::IsAddrIPv6( saddrp );
}
}
void
NetTcpTransport::Send( const char *buffer, int length, Error *e )
{
NetIoPtrs io;
io.sendPtr = (char *)buffer;
io.sendEnd = (char *)buffer + length;
io.recvPtr = 0;
io.recvEnd = 0;
while( io.sendPtr != io.sendEnd )
if( !NetTcpTransport::SendOrReceive( io, e, e ) )
return;
}
int
NetTcpTransport::Receive( char *buffer, int length, Error *e )
{
NetIoPtrs io;
io.recvPtr = buffer;
io.recvEnd = buffer + length;
io.sendPtr = 0;
io.sendEnd = 0;
if( NetTcpTransport::SendOrReceive( io, e, e ) )
return io.recvPtr - buffer;
return e->Test() ? -1 : 0;
}
/*
* NetTcpTransport::SendOrReceive() - send or receive data as ready
*
* If data to write and no room to read, block writing.
* If room to read and no data to write, block reading.
* If both data to write and room to read, block until one is ready.
*
* If neither data to write nor room to read, don't call this!
*
* Brain-dead version of NetTcpSelector::Select() indicates both
* read/write are ready. To avoid blocking on read, we only do so
* if not instructed to write.
*
* Returns 1 if either data read or written.
* Returns 0 if neither, meaning EOF or error.
*/
int
NetTcpTransport::SendOrReceive( NetIoPtrs &io, Error *se, Error *re )
{
// if data is waiting to be read, don't let a read error stop us
bool wasReadError = re->Test(); // remember the read error
int doRead = io.recvPtr != io.recvEnd && (!wasReadError || selector->Peek());
int doWrite = io.sendPtr != io.sendEnd && !se->Test();
int dataReady;
int maxwait = p4tunable.Get( P4TUNE_NET_MAXWAIT );
Timer waitTime;
if( t < 0 )
{
// Socket has been closed don't try to use
return 0;
}
if( maxwait )
{
maxwait *= 1000;
waitTime.Start();
}
int readable;
int writable;
// Flush can call us with nothing to do when the connection
// gets broken.
if( !doRead && !doWrite )
return 0;
for( ;; )
{
readable = doRead;
writable = doWrite;
// 500000 is .5 seconds.
int tv = -1;
if( ( readable && breakCallback ) || maxwait )
tv = 500000;
if( ( dataReady = selector->Select( readable, writable, tv ) ) < 0 )
{
re->Sys( "select", "socket" );
return 0;
}
if( !dataReady && maxwait && waitTime.Time() >= maxwait )
{
lastRead = 0;
re->Set( MsgRpc::MaxWait ) <<
( doRead ? "receive" : "send" ) << ( maxwait / 1000 );
return 0;
}
// Before checking for data do the callback isalive test.
// If the user defined IsAlive() call returns a zero
// value then the user wants this request to terminate.
if( doRead && breakCallback && !breakCallback->IsAlive() )
{
lastRead = 0;
re->Set( MsgRpc::Break );
return 0;
}
if( !writable && !readable )
continue;
// Write what we can; read what we can
if( writable )
{
int l = write( t, io.sendPtr, io.sendEnd - io.sendPtr );
if( l > 0 )
TRANSPORT_PRINTF( DEBUG_TRANS, "NetTcpTransport send %d bytes", l );
if( l > 0 )
{
lastRead = 0;
io.sendPtr += l;
return 1;
}
if( l < 0 )
{
# ifdef OS_NT
int errornum = WSAGetLastError();
// don't use switch, because some of these values might be the same
if( (errornum == WSAEWOULDBLOCK) || (errornum == WSATRY_AGAIN) || (errornum == WSAEINTR) )
continue;
# else
// don't use switch, because some of these values might be the same
if( (errno == EWOULDBLOCK) || (errno == EAGAIN) || (errno == EINTR) )
continue;
# endif // OS_NT
se->Net( "write", "socket" );
se->Set( MsgRpc::TcpSend );
}
}
if( readable )
{
int l = read( t, io.recvPtr, io.recvEnd - io.recvPtr );
if( l > 0 )
TRANSPORT_PRINTF( DEBUG_TRANS, "NetTcpTransport recv %d bytes", l );
if( l > 0 )
{
/*
* probably we'll never be able to read the FIN if there
* isn't data waiting already
*/
lastRead = (wasReadError ? selector->Peek() : 1);
io.recvPtr += l;
return 1;
}
if( l < 0 )
{
# ifdef OS_NT
int errornum = WSAGetLastError();
// don't use switch, because some of these values might be the same
if( (errornum == WSAEWOULDBLOCK) || (errornum == WSATRY_AGAIN) || (errornum == WSAEINTR) )
continue;
# else
// don't use switch, because some of these values might be the same
if( (errno == EWOULDBLOCK) || (errno == EAGAIN) || (errno == EINTR) )
continue;
# endif // OS_NT
re->Net( "read", "socket" );
re->Set( MsgRpc::TcpRecv );
}
}
return 0;
}
}
int
NetTcpTransport::GetSendBuffering()
{
int sz = 4096;
# ifdef SO_SNDBUF
TYPE_SOCKLEN rsz = sizeof( sz );
if( getsockopt( t, SOL_SOCKET, SO_SNDBUF, (char *)&sz, &rsz ) < 0 )
sz = 4096;
# endif
# ifdef OS_LINUX
// Linux says one thing but means another, reserving 1/4th
// the space for internal use.
sz = sz * 3 / 4;
# endif
# ifdef OS_DARWIN
// Darwin's window-size adaptive mechanism returns inflated values
// in getsockopt() from what sysctl reports in send and recv spaces,
// at least until net.inet.tcp.sockthreshold sockets are open.
// Our technique of filling the pipe can cause dead-lock in
// extreme cases, so we manually avoid Darwin's mechanism here for
// the send buffer.
if( sz > DARWIN_MAX )
sz = DARWIN_MAX;
# endif
# ifdef SO_SNDLOWAT
// FreeBSD and Darwin (at least) will block the sending process
// when writes are smaller than both the low water mark and space
// available, so don't use that last bit of space.
int sl;
if( getsockopt( t, SOL_SOCKET, SO_SNDLOWAT, (char *)&sl, &rsz ) == 0 )
sz -= sl;
# endif
return sz;
}
int
NetTcpTransport::GetRecvBuffering()
{
int sz = 4096;
# ifdef SO_RCVBUF
TYPE_SOCKLEN rsz = sizeof( sz );
if( getsockopt( t, SOL_SOCKET, SO_RCVBUF, (char *)&sz, &rsz ) < 0 )
sz = 4096;
# endif
# ifdef OS_LINUX
sz = sz * 3 / 4;
# endif
return sz;
}
void
NetTcpTransport::Close( void )
{
if( t < 0 )
return;
TRANSPORT_PRINTF( DEBUG_CONNECT, "NetTcpTransport %s closing %s",
GetAddress( RAF_PORT )->Text(),
GetPeerAddress( RAF_PORT )->Text() );
// Avoid TIME_WAIT on the server by reading the EOF after
// the last message sent by the client. Getting the EOF
// means we've received the TH_FIN packet, which means we
// don't have to send our own on close(). He who sends
// a TH_FIN goes into the 2 minute TIME_WAIT imposed by TCP.
TRANSPORT_PRINTF( DEBUG_CONNECT,
"NetTcpTransport lastRead=%d", lastRead );
if( lastRead )
{
int r = 1;
int w = 0;
char buf[1];
if( selector->Select( r, w, -1 ) >= 0 && r )
read( t, buf, 1 );
}
NET_CLOSE_SOCKET(t);
}
int
NetTcpTransport::IsAlive()
{
int readable = 1;
int writeable = 0;
if( selector->Select( readable, writeable, 0 ) < 0 )
return 0;
// All good if no EOF waiting
return !readable || selector->Peek();
}
void
NetTcpTransport::ClientMismatch( Error *e )
{
if ( CheckForHandshake(t) == PeekSSL)
{
// this is a non-ssl connection
// this is a ssl connection and we are a cleartext server
e->Net( "accept", "socket" );
e->Set( MsgRpc::TcpPeerSsl );
NET_CLOSE_SOCKET(t);
}
}
int
NetTcpTransport::Peek( int fd, char *buffer, int length )
{
int count = 0;
int retval = -1;
# ifdef OS_NT
u_long u_value = 0;
// Set to blocking on windows for peek
ioctlsocket( t, FIONBIO, &u_value );
retval = recv( fd, buffer, length, MSG_PEEK );
// Set back to non blocking, @#!%$#@ stupid windows....
u_value = 1;
ioctlsocket( t, FIONBIO, &u_value );
# else
retval = recv( fd, buffer, length, MSG_PEEK );
// lengthened timeout because found out that timed out early with VPN
while ((retval == -1 ) && (errno == EAGAIN) && (count < PEEK_TIMEOUT))
{
// parent process closing socket can make
// resource temporarily unavailable.
msleep(1);
retval = recv( fd, buffer, length, MSG_PEEK );
count++;
}
# endif
if( retval == -1 && count < 10 )
{
# ifdef OS_NT
TRANSPORT_PRINTF( SSLDEBUG_ERROR,
"Peek error is: %d", WSAGetLastError());
# else
TRANSPORT_PRINTF( SSLDEBUG_ERROR,
"Peek error is: %d", errno);
# endif
}
return retval;
}
const NetPortParser &
NetTcpTransport::GetPortParser() const
{
return portParser;
}
void NetTcpTransport::SetPortParser(const NetPortParser &portParser)
{
this->portParser = portParser;
}