/* * Copyright 2009 Perforce Software Inc., All Rights Reserved. */ package com.perforce.p4java.impl.mapbased.rpc.stream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Inet4Address; import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.nio.charset.Charset; import java.security.NoSuchAlgorithmException; import java.security.PublicKey; import java.security.cert.Certificate; import java.security.cert.CertificateEncodingException; import java.security.cert.CertificateExpiredException; import java.security.cert.CertificateNotYetValidException; import java.security.cert.X509Certificate; import java.text.MessageFormat; import java.util.Map; import java.util.Properties; import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSocket; import com.perforce.p4java.Log; import com.perforce.p4java.exception.ConnectionException; import com.perforce.p4java.exception.NullPointerError; import com.perforce.p4java.exception.P4JavaError; import com.perforce.p4java.exception.ProtocolError; import com.perforce.p4java.impl.mapbased.rpc.ServerStats; import com.perforce.p4java.impl.mapbased.rpc.connection.RpcConnection; import com.perforce.p4java.impl.mapbased.rpc.func.RpcFunctionMapKey; import com.perforce.p4java.impl.mapbased.rpc.func.RpcFunctionSpec; import com.perforce.p4java.impl.mapbased.rpc.func.client.ClientTrust; import com.perforce.p4java.impl.mapbased.rpc.packet.RpcPacket; import com.perforce.p4java.impl.mapbased.rpc.packet.RpcPacketDispatcher; import com.perforce.p4java.impl.mapbased.rpc.packet.RpcPacketPreamble; import com.perforce.p4java.impl.mapbased.rpc.packet.helper.RpcPacketFieldRule; import com.perforce.p4java.impl.mapbased.rpc.stream.RpcSocketPool.ShutdownHandler; import com.perforce.p4java.impl.mapbased.rpc.stream.helper.RpcSocketHelper; import com.perforce.p4java.server.callback.IFilterCallback; /** * Socket stream I/O based implementation of the RpcConnection class.<p> * * The implementation here uses a small stack of input and output * streams based on socket streams at the lowest level, with (at least) an * optional connection compression stream on top of that layer, and with * charset conversion where necessary.<p> */ public class RpcStreamConnection extends RpcConnection { public static final String TRACE_PREFIX = "RpcStreamConnection"; /** * Number of bytes we allocate for initial byte arrays for sending RPC packets. * In general we don't know how big the final buffer is, so this figure is a bit * of a guessed compromise between over-allocation and frequent resizing. */ protected static final int INITIAL_SENDBUF_SIZE = 2048; /** * When we run out of send buffer space in putPacket, we allocate another, * larger, buffer; this constant determines how much larger than the existing * buffer the new one should be, or, alternatively, how much bigger than the * incoming field length the new buffer should be. Should probably be more * tunable... */ protected static final int SENDBUF_REALLOC_INCR = 1024; private RpcSocketPool pool = null; private Socket socket = null; private InputStream sockInputStream = null; private OutputStream sockOutputStream = null; private InputStream topInputStream = null; private OutputStream topOutputStream = null; /** * Construct a new Perforce RPC connection to the named Perforce server * using java.io socket streams at the lowest level. This constructor sets * up the default non-compressed stack; in general this means just a couple * of simple socket streams. * * @param serverHost * @param serverPort * @param props * @param stats * @param charset * @throws ConnectionException */ public RpcStreamConnection(String serverHost, int serverPort, Properties props, ServerStats stats, Charset charset) throws ConnectionException { this(serverHost, serverPort, props, stats, charset, (Socket) null); } /** * Construct a new Perforce RPC connection to the named Perforce server * using java.io socket streams at the lowest level. This constructor sets * up the default non-compressed stack; in general this means just a couple * of simple socket streams. * * @param serverHost * @param serverPort * @param props * @param stats * @param charset * @throws ConnectionException */ public RpcStreamConnection(String serverHost, int serverPort, Properties props, ServerStats stats, Charset charset, boolean secure) throws ConnectionException { this(serverHost, serverPort, props, stats, charset, (Socket) null, secure); } /** * Construct a new Perforce RPC connection to the named Perforce server using * java.io socket streams at the lowest level. This constructor sets up the default * non-compressed stack; in general this means just a couple of simple socket streams. * * @param serverHost * @param serverPort * @param props * @param stats * @param charset * @param socket * @throws ConnectionException */ public RpcStreamConnection(String serverHost, int serverPort, Properties props, ServerStats stats, Charset charset, Socket socket) throws ConnectionException { this(serverHost, serverPort, props, stats, charset, socket, false); } /** * Construct a new Perforce RPC connection to the named Perforce server using * java.io socket streams at the lowest level. This constructor sets up the default * non-compressed stack; in general this means just a couple of simple socket streams. * * @param serverHost * @param serverPort * @param props * @param stats * @param charset * @param socket * @param secure * @throws ConnectionException */ public RpcStreamConnection(String serverHost, int serverPort, Properties props, ServerStats stats, Charset charset, Socket socket, boolean secure) throws ConnectionException { super(serverHost, serverPort, props, stats, charset, secure); try { this.socket = socket; if( this.socket == null) { this.socket = RpcSocketHelper.createSocket(serverHost, serverPort, props, secure); } init(); } catch (UnknownHostException exc) { throw new ConnectionException("Unable to resolve Perforce server host name '" + hostName + "' for RPC connection"); } catch (IOException exc) { throw new ConnectionException("Unable to connect to Perforce server at " + hostName + ":" + hostPort); } catch (Throwable thr) { Log.error("Unexpected exception: " + thr.getLocalizedMessage()); Log.exception(thr); throw new ConnectionException(thr.getLocalizedMessage()); } } /** * Construct a new Perforce RPC connection to the named Perforce server using * java.io socket streams at the lowest level. This constructor sets up the default * non-compressed stack; in general this means just a couple of simple socket streams. * * @param serverHost * @param serverPort * @param props * @param stats * @param charset * @param pool * @throws ConnectionException */ public RpcStreamConnection(String serverHost, int serverPort, Properties props, ServerStats stats, Charset charset, RpcSocketPool pool) throws ConnectionException { this(serverHost, serverPort, props, stats, charset, pool, false); } /** * Construct a new Perforce RPC connection to the named Perforce server using * java.io socket streams at the lowest level. This constructor sets up the default * non-compressed stack; in general this means just a couple of simple socket streams. * * @param serverHost * @param serverPort * @param props * @param stats * @param charset * @param pool * @param secure * @throws ConnectionException */ public RpcStreamConnection(String serverHost, int serverPort, Properties props, ServerStats stats, Charset charset, RpcSocketPool pool, boolean secure) throws ConnectionException { super(serverHost, serverPort, props, stats, charset, secure); try { this.pool = pool; if( this.pool != null) { this.socket = this.pool.acquire(); } else { this.socket = RpcSocketHelper.createSocket(serverHost, serverPort, props, secure); } init(); } catch (UnknownHostException exc) { throw new ConnectionException("Unable to resolve Perforce server host name '" + hostName + "' for RPC connection"); } catch (IOException exc) { throw new ConnectionException("Unable to connect to Perforce server at " + hostName + ":" + hostPort); } catch (Throwable thr) { Log.error("Unexpected exception: " + thr.getLocalizedMessage()); Log.exception(thr); throw new ConnectionException(thr.getLocalizedMessage()); } } private void init() throws ConnectionException { // Get IP address from socket connection if (this.socket != null) { if (socket.getInetAddress() != null) { InetAddress address = socket.getInetAddress(); // Check if it is an IPv6 address if (Inet6Address.class.isAssignableFrom(address.getClass())) { // Add the square brackets for IPv6 address this.hostIp = "[" + socket.getInetAddress().getHostAddress() + "]"; } else { this.hostIp = socket.getInetAddress().getHostAddress(); } } } // Initialize SSL connection if (this.secure) { initSSL(); } try { this.sockInputStream = new RpcSocketInputStream(this.socket, this.stats); this.sockOutputStream = new RpcSocketOutputStream(this.socket, this.stats); this.topInputStream = this.sockInputStream; this.topOutputStream = this.sockOutputStream; } catch (Throwable thr) { Log.error("Unexpected exception: " + thr.getLocalizedMessage()); Log.exception(thr); throw new ConnectionException(thr.getLocalizedMessage()); } } private void initSSL() throws ConnectionException { // Start SSL handshake if (this.socket != null) { try { // The SSLSocket.getSession() method will initiate the initial // handshake if necessary. Thus, the SSLSocket.startHandshake() // call is not necessary. SSLSession sslSession = ((SSLSocket)this.socket).getSession(); if (!sslSession.isValid()) { // If an error occurs during the initial handshake, // this method returns an invalid session object which // reports an invalid cipher suite of "SSL_NULL_WITH_NULL_NULL". throw new ConnectionException("Error occurred during the SSL handshake: invalid SSL session"); } // Get the certificates Certificate[] serverCerts = sslSession.getPeerCertificates(); if (serverCerts == null || serverCerts.length == 0 || serverCerts[0] == null) { throw new ConnectionException("Error occurred during the SSL handshake: no certificate retrieved from SSL session"); } // Check that the certificate is currently valid. Check the // current date and time are within the validity period given // in the certificate. try { ((X509Certificate)serverCerts[0]).checkValidity(); } catch (CertificateExpiredException e) { throw new ConnectionException("Error occurred during the SSL handshake: certificate expired: " + e.toString(), e); } catch (CertificateNotYetValidException e) { throw new ConnectionException("Error occurred during the SSL handshake: certificate not yet valid: " + e.toString(), e); } // Get the public key from the first certificate PublicKey serverPubKey = serverCerts[0].getPublicKey(); if (serverPubKey == null) { throw new ConnectionException("Error occurred during the SSL handshake: no public key retrieved from server certificate"); } // Generate the fingerprint try { this.fingerprint = ClientTrust.generateFingerprint(serverPubKey); //this.fingerprint = ClientTrust.generateFingerprint((X509Certificate)this.serverCerts[0]); } catch (NoSuchAlgorithmException e) { throw new ConnectionException("Error occurred while generating the fingerprint for the Perforce SSL connection", e); //} catch (CertificateEncodingException e) { // throw new ConnectionException("Error occurred while generating the fingerprint for the Perforce SSL connection", e); } } catch (IOException e) { String message = "Error occurred during SSL hankshake. " + "Please check the release notes for known SSL issues: " + e.getLocalizedMessage(); Log.error(message); Log.exception(e); throw new ConnectionException(message); } } } /** * @see com.perforce.p4java.impl.mapbased.rpc.connection.RpcConnection#getServerIpPort() */ public String getServerIpPort() { String serverIpPort = null; if (this.hostIp != UNKNOWN_SERVER_HOST) { serverIpPort = this.hostIp; if (this.hostPort != UNKNOWN_SERVER_PORT) { serverIpPort += ":" + Integer.toString(this.hostPort); } } else if (this.hostPort != UNKNOWN_SERVER_PORT) { serverIpPort = Integer.toString(this.hostPort); } return serverIpPort; } /** * @see com.perforce.p4java.impl.mapbased.rpc.connection.RpcConnection#disconnect() */ public void disconnect(final RpcPacketDispatcher dispatcher) throws ConnectionException { try { // NOTE: don't do gratuitous (any) flushes here -- this has all been // handled already and will often cause errors in compressed client // connection setups -- HR. ShutdownHandler handler = new ShutdownHandler() { public void shutdown(Socket socket) { if (dispatcher != null) { try { dispatcher.shutdown(RpcStreamConnection.this); } catch (ConnectionException e) { Log.exception(e); } } } }; if (this.pool != null) { this.pool.release(this.socket, handler); } else { handler.shutdown(this.socket); this.topInputStream.close(); this.topOutputStream.close(); this.socket.close(); } } catch (IOException exc) { throw new ConnectionException( "RPC disconnection error: " + exc.getLocalizedMessage(), exc); } } /** * @see com.perforce.p4java.impl.mapbased.rpc.connection.RpcConnection#useConnectionCompression() */ @Override public void useConnectionCompression() throws ConnectionException { if (!this.usingCompression) { super.useConnectionCompression(); try { this.topOutputStream.flush(); // We do this here immediately to avoid having the compress2 itself // compressed... this.putRpcPacket(RpcPacket.constructRpcPacket( RpcFunctionSpec.PROTOCOL_COMPRESS2, "compress2", (String[]) null, null)); this.topOutputStream.flush(); this.topOutputStream = new RpcGZIPOutputStream(this.sockOutputStream); this.topInputStream = new RpcGZIPInputStream(this.sockInputStream); } catch (IOException exc) { Log.error("I/O exception encountered while setting up GZIP streaming: " + exc.getLocalizedMessage()); Log.exception(exc); throw new ConnectionException( "unable to set up client compression streaming to Perforce server: " + exc.getLocalizedMessage(), exc); } } } /** * Get a Perforce RPC packet from the underlying stream. If we're talking * to a Unicode-enabled Perforce server, we attempt to translate the incoming * bytes to the relevant client-side charsets where appropriate based on packet * field type, etc. */ public RpcPacket getRpcPacket() throws ConnectionException { return getRpcPacket(null, null); } /** * Get a Perforce RPC packet from the underlying stream with an optional * rule to handle the RPC packet fields. */ public RpcPacket getRpcPacket(RpcPacketFieldRule fieldRule, IFilterCallback filterCallback) throws ConnectionException { byte[] preambleBytes = new byte[RpcPacketPreamble.RPC_PREAMBLE_SIZE]; RpcPacket packet = null; try { int bytesRead = this.topInputStream.read(preambleBytes); this.stats.streamRecvs.incrementAndGet(); if (bytesRead < 0) { throw new ConnectionException("server connection unexpectedly closed"); } // If we get a partial read, try again until something goes wrong... while ((bytesRead >= 0) && (bytesRead < preambleBytes.length)) { int moreBytesRead = this.topInputStream.read(preambleBytes, bytesRead, preambleBytes.length - bytesRead); this.stats.streamRecvs.incrementAndGet(); if (moreBytesRead < 0) { throw new ConnectionException("server connection unexpectedly closed"); } else { bytesRead += moreBytesRead; } } this.stats.totalBytesRecv.getAndAdd(bytesRead); if (bytesRead != preambleBytes.length) { throw new ConnectionException( "Incomplete RPC packet preamble read from Perforce server; connection probably broken." + " bytes read: " + bytesRead); } RpcPacketPreamble preamble = RpcPacketPreamble.retrievePreamble(preambleBytes); if (preamble == null) { throw new ProtocolError("Null RPC packet preamble in byte buffer"); } else if (!preamble.isValidChecksum()) { throw new ProtocolError("Bad checksum in RPC preamble"); } int payloadLength = preamble.getPayloadSize(); // Note: size is for the *rest of the packet*... // FIXME: really should sanity check the size better here -- HR. if (payloadLength <= 0) { throw new ProtocolError("Bad payload size in RPC preamble: " + payloadLength); } // We know how many bytes to expect for the rest of this packet, so // try to read this in. This can be a ginormous packet in some pathological // cases, so we need to be flexible... byte[] packetBytes = new byte[payloadLength]; int packetBytesRead = this.topInputStream.read(packetBytes, 0, payloadLength); this.stats.streamRecvs.incrementAndGet(); this.stats.totalBytesRecv.getAndAdd(packetBytesRead); if (packetBytesRead <=0) { throw new ConnectionException( "Perforce server network connection closed unexpectedly"); } else { while (packetBytesRead < payloadLength) { // Incomplete read; just try until we get a complete or something goes wrong... this.stats.incompleteReads.incrementAndGet(); int moreBytesRead = this.topInputStream.read(packetBytes, packetBytesRead, payloadLength - packetBytesRead); this.stats.streamRecvs.incrementAndGet(); this.stats.totalBytesRecv.getAndAdd(moreBytesRead); if (moreBytesRead < 0) { throw new ConnectionException( "Perforce server network connection closed unexpectedly"); } packetBytesRead += moreBytesRead; } } if (packetBytesRead != payloadLength) { throw new P4JavaError("RPC packet payload read size mismatch; expected: " + payloadLength + "; got: " + packetBytesRead); } packet = RpcPacket.constructRpcPacket(preamble, packetBytes, this.unicodeServer, this.clientCharset, fieldRule, filterCallback); this.stats.packetsRecv.incrementAndGet(); this.stats.largestRpcPacketRecv.set(Math.max(this.stats.largestRpcPacketRecv.get(), packet.getPacketLength())); } catch (IOException exc) { throw new ConnectionException(exc); } catch (ConnectionException p4jexc) { // Just passing through... throw p4jexc; } catch (P4JavaError p4je) { // Just passing through... throw p4je; } catch (Throwable thr) { // Never a good sign; typically a buffer overflow or positioning // problem, and almost always unrecoverable. Log.error("Unexpected exception: " + thr.getLocalizedMessage()); Log.exception(thr); throw new P4JavaError(thr.getLocalizedMessage(), thr); } return packet; } /** * Put a Perforce RPC packet onto the output stream. In some cases this * may require considerable processing and things like charset translation * here and downstream, but it's normally fairly straightforward. */ public long putRpcPacket(RpcPacket packet) throws ConnectionException { // Note that in general, we don't know how large the packet's output byte // buffer is going to have to be until we've finished the packet contents // marshaling, so we implement buffer resizing when needed. Our initial // guess is INITIAL_SENDBUF_SIZE bytes; we grow the buffer by increasing // it SENDBUF_REALLOC_INCR times each buffer increase. byte[] sendBytes = new byte[INITIAL_SENDBUF_SIZE]; int sendPos = 0; if (packet == null) { throw new NullPointerError( "null RPC packet passed to RpcStreamConnection.putPacket"); } if (packet.getFuncNameString() == null) { throw new P4JavaError("Unmapped / unmappable function in RpcPacket.put()"); } // Skip over the first few bytes for the preamble, which we'll // come back to fill in later when we know the marshaled length. sendPos += RpcPacketPreamble.RPC_PREAMBLE_SIZE; Map<String, Object> mapArgs = packet.getMapArgs(); String[] strArgs = packet.getStrArgs(); if (mapArgs != null) { for (Map.Entry<String, Object> entry : mapArgs.entrySet()) { byte[] fieldBytes = marshalPacketField(entry.getKey(), entry.getValue()); if ((sendBytes.length - sendPos) <= fieldBytes.length) { this.stats.bufferCompacts.getAndIncrement(); // We're overloading the meaning here... int newBytesLength = sendBytes.length + fieldBytes.length + SENDBUF_REALLOC_INCR; byte[] newBytes = new byte[newBytesLength]; System.arraycopy(sendBytes, 0, newBytes, 0, sendPos); sendBytes = newBytes; } System.arraycopy(fieldBytes, 0, sendBytes, sendPos, fieldBytes.length); sendPos += fieldBytes.length; } } if (strArgs != null) { for (String arg : strArgs) { if (arg != null) { byte[] fieldBytes = marshalPacketField(null, arg); if ((sendBytes.length - sendPos) <= fieldBytes.length) { this.stats.bufferCompacts.getAndIncrement(); // We're overloading the meaning here... int newBytesLength = sendBytes.length + fieldBytes.length + SENDBUF_REALLOC_INCR; byte[] newBytes = new byte[newBytesLength]; System.arraycopy(sendBytes, 0, newBytes, 0, sendPos); sendBytes = newBytes; } System.arraycopy(fieldBytes, 0, sendBytes, sendPos, fieldBytes.length); sendPos += fieldBytes.length; } } } if (packet.getEnv() != null) { byte[] envBytes = packet.getEnv().marshal(); if ((sendBytes.length - sendPos) <= envBytes.length) { this.stats.bufferCompacts.getAndIncrement(); // We're overloading the meaning here... int newBytesLength = sendBytes.length + envBytes.length + SENDBUF_REALLOC_INCR; byte[] newBytes = new byte[newBytesLength]; System.arraycopy(sendBytes, 0, newBytes, 0, sendPos); sendBytes = newBytes; } System.arraycopy(envBytes, 0, sendBytes, sendPos, envBytes.length); sendPos += envBytes.length; } byte[] nameBytes = marshalPacketField(RpcFunctionMapKey.FUNCTION, packet.getFuncNameString()); if ((sendBytes.length - sendPos) <= nameBytes.length) { this.stats.bufferCompacts.getAndIncrement(); // We're overloading the meaning here... int newBytesLength = sendBytes.length + nameBytes.length; byte[] newBytes = new byte[newBytesLength]; System.arraycopy(sendBytes, 0, newBytes, 0, sendPos); sendBytes = newBytes; } System.arraycopy(nameBytes, 0, sendBytes, sendPos, nameBytes.length); sendPos += nameBytes.length; // Now go back and calculate the preamble bytes... byte[] preambleBytes = RpcPacketPreamble.constructPreamble( sendPos - RpcPacketPreamble.RPC_PREAMBLE_SIZE).marshalAsBytes(); System.arraycopy(preambleBytes, 0, sendBytes, 0, preambleBytes.length); // Now let's try sending it downstream and see what happens... try { this.topOutputStream.write(sendBytes, 0, sendPos); this.topOutputStream.flush(); this.stats.streamSends.incrementAndGet(); this.stats.totalBytesSent.getAndAdd(sendPos); this.stats.packetsSent.incrementAndGet(); if (this.stats.largestRpcPacketSent.get() < sendPos) { this.stats.largestRpcPacketSent.set(sendPos); } } catch (IOException exc) { Log.exception(exc); StringBuilder message = new StringBuilder(); if (exc instanceof SocketTimeoutException && this.secure) { message.append( MessageFormat .format("SSL connect to ssl:{0}:{1,number,#} failed.\nRemove SSL protocol prefix.\n", this.hostName, this.hostPort)); } else { message.append("Unable to send command to Perforce server: "); } message.append(exc.getMessage()); throw new ConnectionException(message.toString(), exc); } return 0; } /** * @see com.perforce.p4java.impl.mapbased.rpc.connection.RpcConnection#putRpcPackets(com.perforce.p4java.impl.mapbased.rpc.packet.RpcPacket[]) */ public long putRpcPackets(RpcPacket[] packets) throws ConnectionException { int retVal = 0; if (packets == null) { throw new NullPointerError( "Null RPC packets passed to RpcStreamConnection.putPacket"); } for (RpcPacket packet : packets) { if (packet != null) { retVal += putRpcPacket(packet); } } return retVal; } /** * @see com.perforce.p4java.impl.mapbased.rpc.connection.RpcConnection#getSystemSendBufferSize() */ public int getSystemSendBufferSize() { if (this.socket != null) { try { return this.socket.getSendBufferSize(); } catch (SocketException exc) { Log.error("unexpected exception: " + exc.getLocalizedMessage()); Log.exception(exc); } } return 0; } /** * @see com.perforce.p4java.impl.mapbased.rpc.connection.RpcConnection#getSystemRecvBufferSize() */ public int getSystemRecvBufferSize() { if (this.socket != null) { try { return this.socket.getReceiveBufferSize(); } catch (SocketException exc) { Log.error("unexpected exception: " + exc.getLocalizedMessage()); Log.exception(exc); } } return 0; } }
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#1 | 19903 | stuartrowe |
Branching //guest/perforce_software/p4java/... to //guest/stuartrowe/p4java/... |
||
//guest/perforce_software/p4java/r14.1/src/main/java/com/perforce/p4java/impl/mapbased/rpc/stream/RpcStreamConnection.java | |||||
#1 | 12541 | Matt Attaway | Initial add of the 14.1 p4java source code |