/** * Copyright (c) 2010 Perforce Software. All rights reserved. */ package com.perforce.p4java.impl.mapbased.rpc.stream; import com.perforce.p4java.Log; import com.perforce.p4java.impl.mapbased.rpc.stream.helper.RpcSocketHelper; import java.io.IOException; import java.net.Socket; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Properties; import java.util.Queue; /** * @author Kevin Sawicki (ksawicki@perforce.com) */ public class RpcSocketPool { /** * Shutdown handler for cleaning up before a socket is closed */ public static interface ShutdownHandler { /** * Callback for before the socket is closed to do any pre-close work. * Implementors should not directly close the socket parameter. * * @param socket */ void shutdown(Socket socket); } /** * Pool manager that closes sockets that have been left open for more than * the idle time allowed. * */ private static class PoolManager implements Runnable { /** * Socket idle time system property in milliseconds */ private static final String RPC_SOCKET_IDLE_TIME = "com.perforce.p4java.RPC_SOCKET_IDLE_TIME"; /** * Default idle time to close sockets - 30 seconds */ private static final int DEFAULT_SOCKET_IDLE_TIME = 30000; private int idleTime; private List<RpcSocketPool> pools; private boolean started = false; /** * Create a new pool manager */ public PoolManager() { this.pools = new ArrayList<RpcSocketPool>(); int time = DEFAULT_SOCKET_IDLE_TIME; String configuredTime = System.getProperty(RPC_SOCKET_IDLE_TIME); if (configuredTime != null) { try { time = Integer.parseInt(configuredTime); } catch (NumberFormatException nfe) { time = DEFAULT_SOCKET_IDLE_TIME; } } this.idleTime = time; } public void register(RpcSocketPool pool) { if (pool != null) { synchronized (this) { pools.add(pool); } if (started) { synchronized (this.pools) { this.pools.notify(); } } else { start(); } } } public void start() { started = true; Thread thread = new Thread(this); thread.setName("P4Java Socket Pool Manager"); thread.setPriority(Thread.MIN_PRIORITY); thread.setDaemon(true); thread.start(); } public void unregister(RpcSocketPool pool) { if (pool != null) { synchronized (this) { pools.remove(pool); } } } /** * @see java.lang.Runnable#run() */ public void run() { while (true) { while (this.pools.isEmpty()) { synchronized (this.pools) { try { this.pools.wait(); } catch (InterruptedException e) { break; } } } RpcSocketPool[] pools = null; synchronized (this) { pools = this.pools.toArray(new RpcSocketPool[this.pools .size()]); } for (RpcSocketPool pool : pools) { pool.timeout(this.idleTime); } try { Thread.sleep(this.idleTime); } catch (InterruptedException e) { break; } } } } private static PoolManager MANAGER = new PoolManager(); private class SocketEntry { Socket socket; long releaseTime; /** * Create a new socket entry with the specified socket with a release * time of the current system time * * @param socket */ public SocketEntry(Socket socket) { this.socket = socket; this.releaseTime = System.currentTimeMillis(); } } private Properties socketProperties; private String host; private int port; private int size; private ShutdownHandler shutdownHandler; private Queue<SocketEntry> pool; private boolean secure = false; /** * Create a new socket pool indicating whether it is secure (SSL) or not. * * @param poolSize * @param host * @param port * @param socketProperties * @param shutdownHandler * @param secureServer */ public RpcSocketPool(int poolSize, String host, int port, Properties socketProperties, ShutdownHandler shutdownHandler, boolean secure) { this(poolSize, host, port, socketProperties, shutdownHandler); this.secure = secure; } /** * Create a new socket pool with a max pool size, host, port, and socket * properties, and an optional shutdown handler * * @param poolSize * @param host * @param port * @param socketProperties * @param shutdownHandler */ public RpcSocketPool(int poolSize, String host, int port, Properties socketProperties, ShutdownHandler shutdownHandler) { this.size = poolSize; this.host = host; this.port = port; this.socketProperties = socketProperties; this.pool = new LinkedList<SocketEntry>(); this.shutdownHandler = shutdownHandler; MANAGER.register(this); } /** * Acquire a socket to the configured server address * * @return - socket * @throws IOException */ public Socket acquire() throws IOException { Socket socket = null; synchronized (this.pool) { SocketEntry entry = this.pool.poll(); if (entry != null) { socket = entry.socket; } } if (!isAlive(socket)) { quietClose(socket); socket = RpcSocketHelper.createSocket(this.host, this.port, this.socketProperties, this.secure); } return socket; } private void quietClose(Socket socket) { if (socket != null) { try { socket.getInputStream().close(); } catch (IOException e) { } try { socket.getOutputStream().close(); } catch (IOException e) { } try { socket.close(); } catch (IOException e) { } } } private boolean isAlive(Socket socket) { return socket != null && socket.isBound() && !socket.isClosed() && socket.isConnected() && !socket.isInputShutdown() && !socket.isOutputShutdown(); } /** * Release a socket back to the pool as no longer using * * @param socket * @param shutdownHandler * @throws IOException */ public void release(Socket socket, ShutdownHandler shutdownHandler) throws IOException { if (isAlive(socket)) { boolean close = false; synchronized (this.pool) { if (this.pool.size() < size) { this.pool.add(new SocketEntry(socket)); } else { close = true; } } if (close) { if (shutdownHandler != null) { shutdownHandler.shutdown(socket); } if (!socket.isClosed()) { socket.getInputStream().close(); } if (!socket.isClosed()) { socket.getOutputStream().close(); } socket.close(); } } } private void close(Socket socket) throws IOException { if (socket != null) { if (!socket.isClosed()) { socket.getInputStream().close(); } if (!socket.isClosed()) { socket.getOutputStream().close(); } socket.close(); } } /** * Disconnect all sockets from the specified host and port */ public void disconnect() { Socket[] sockets = null; try { synchronized (this.pool) { sockets = new Socket[this.pool.size()]; int count = 0; for (SocketEntry entry : this.pool) { sockets[count] = entry.socket; count++; } this.pool.clear(); } for (Socket socket : sockets) { if (this.shutdownHandler != null) { this.shutdownHandler.shutdown(socket); } try { close(socket); } catch (IOException e) { Log.exception(e); } } } finally { MANAGER.unregister(this); } } /** * Timeout any sockets idle for greater than or equal to the milliseconds * value specified * * @param idleDuration */ public void timeout(int idleDuration) { synchronized (this.pool) { long openTime; List<SocketEntry> closed = new ArrayList<SocketEntry>(); for (SocketEntry entry : this.pool) { openTime = System.currentTimeMillis() - entry.releaseTime; if (openTime >= idleDuration) { if (this.shutdownHandler != null) { this.shutdownHandler.shutdown(entry.socket); } quietClose(entry.socket); closed.add(entry); } } this.pool.removeAll(closed); } } }
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#1 | 19903 | stuartrowe |
Branching //guest/perforce_software/p4java/... to //guest/stuartrowe/p4java/... |
||
//guest/perforce_software/p4java/r14.1/src/main/java/com/perforce/p4java/impl/mapbased/rpc/stream/RpcSocketPool.java | |||||
#1 | 12541 | Matt Attaway | Initial add of the 14.1 p4java source code |