/*******************************************************************************
* Copyright (c) 2013, Perforce Software
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
*
* Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
******************************************************************************/
package com.perforce.p4java.extension.server.internal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import com.perforce.p4java.extension.exception.ServerAccessException;
import com.perforce.p4java.extension.exception.ServerConnectionException;
import com.perforce.p4java.extension.exception.ServerErrorException;
import com.perforce.p4java.extension.server.ConnectionPool;
import com.perforce.p4java.extension.server.PrincipalCredential;
import com.perforce.p4java.extension.server.Server;
import com.perforce.p4java.extension.server.ServerLocation;
import com.perforce.p4java.extension.server.internal.ServerImpl.ProgramInfo;
import com.perforce.p4java.extension.utility.Validate;
import com.perforce.p4java.impl.mapbased.rpc.RpcPropertyDefs;
/**
* @since 2012.2
*/
public class ServerConnectionPool implements ConnectionPool<Server> {
public static final String REAPER_INTERVAL_IN_MS_KEY = "yetiReaperIntervalInMs";
public static final String CONNECTION_EXPIRY_IN_MS_KEY = "yetiConnectionExpiryInMs";
public static final String P4D_SOCKET_TIMEOUT_KEY = RpcPropertyDefs.RPC_SOCKET_SO_TIMEOUT_NICK;
public static final long DEFAULT_REAPER_INTERVAL = 60000L;
public static final long DEFAULT_CONNECTION_EXPIRY = 120000L;
private final Logger log = Logger.getLogger(ServerConnectionPool.class);
private final Map<Server, Long> acquired, available;
private final int capacity;
private final ServerLocation serverLocation;
private final ProgramInfo programInfo;
private final Properties properties;
private long reaperIntervalInMillis = DEFAULT_REAPER_INTERVAL;
private long connectionExpiryInMillis = DEFAULT_CONNECTION_EXPIRY;
public ServerConnectionPool(int capacity, ServerLocation serverLocation) {
this(capacity, serverLocation, null, null, null);
}
public ServerConnectionPool(int capacity, ServerLocation serverLocation,
String programName, String version) {
this(capacity, serverLocation, programName, version, null);
}
public ServerConnectionPool(int capacity, ServerLocation serverLocation,
String programName, String version, Properties properties) {
Validate.greaterThanZero(capacity,
"capacity must be greater than zero to ServerConnectionPool");
Validate.notNull(serverLocation,
"serverLocation must not be null to ServerConnectionPool");
this.capacity = capacity;
this.serverLocation = serverLocation;
this.acquired = new ConcurrentHashMap<Server, Long>(capacity);
this.available = new ConcurrentHashMap<Server, Long>(capacity);
if (programName == null || version == null)
programInfo = null;
else
programInfo = new ProgramInfo(programName, version);
this.properties = properties;
extractLocalProperties();
setUpReaper();
}
private void extractLocalProperties() {
if (properties == null)
return;
String reaperIntervalString = (String) properties
.remove(REAPER_INTERVAL_IN_MS_KEY);
if (reaperIntervalString != null) {
try {
reaperIntervalInMillis = Long.parseLong(reaperIntervalString);
} catch (NumberFormatException e) {
log.warn("ServerConnectionPool was configured with a non-numeric Reaper Interval property. Ignoring.");
}
}
String connectionExpiryString = (String) properties
.remove(CONNECTION_EXPIRY_IN_MS_KEY);
if (connectionExpiryString != null) {
try {
connectionExpiryInMillis = Long
.parseLong(connectionExpiryString);
} catch (NumberFormatException e) {
log.warn("ServerConnectionPool was configured with a non-numeric Connection Expiry property. Ignoring.");
}
}
}
private void setUpReaper() {
Runnable reaper = new Runnable() {
public void run() {
try {
reap();
} catch (Throwable t) {
log.error("Error generated while reaping server connection"
+ " pool of expired connections. ", t);
}
}
};
Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(reaper,
reaperIntervalInMillis, reaperIntervalInMillis,
TimeUnit.MILLISECONDS);
}
private synchronized void reap() {
log.trace("Starting reaping of expired Server connections.");
Set<Server> servers = available.keySet();
for (Server server : servers) {
if (isExpired(available.get(server))) {
available.remove(server);
server.disconnect();
available.put(server, (long) 0);
}
}
log.trace("Completed reaping of expired Server connections.");
}
/**
* @see com.perforce.p4java.extension.server.ConnectionPool#acquire()
*/
public synchronized Server acquire() {
if (available.size() > 0) {
Entry<Server, Long> entry = available.entrySet().iterator().next();
long created = available.remove(entry.getKey());
if (created == 0)
created = System.currentTimeMillis();
acquired.put(entry.getKey(), created);
return entry.getKey();
}
log.info("creating a new connecting to the server...");
Server server = new ServerImpl(serverLocation, programInfo, properties);
acquired.put(server, System.currentTimeMillis());
return server;
}
/**
* @see com.perforce.p4java.extension.server.ConnectionPool#acquire(com.perforce.p4java.extension.server.PrincipalCredential)
*/
public Server acquire(PrincipalCredential user)
throws ServerAccessException, ServerConnectionException,
ServerErrorException {
Server server = acquire();
server.connectAs(user);
return server;
}
/**
* @see com.perforce.p4java.extension.server.ConnectionPool#release(com.perforce.p4java.extension.server.Server)
*/
public synchronized void release(Server server) {
if (server == null)
return;
if (!acquired.containsKey(server)) {
try {
throw new Exception();
} catch (Exception e) {
log.debug(
"Attempt to release a server that has already been released. Release is ignored.",
e);
}
return;
}
server.clean();
long created = acquired.remove(server);
if (isExpired(created)) {
server.disconnect();
created = 0;
}
if (available.size() < capacity)
available.put(server, created);
else if (created > 0)
server.disconnect();
}
private boolean isExpired(long created) {
return System.currentTimeMillis() > created + connectionExpiryInMillis;
}
public long getConnectionExpiryInMillis() {
return connectionExpiryInMillis;
}
public void setConnectionExpiryInMillis(long connectionExpiryInMillis) {
this.connectionExpiryInMillis = connectionExpiryInMillis;
}
// This is for test only
public synchronized void releaseAllConnections() {
for (Server server : acquired.keySet()) {
if (server.isConnected()) {
server.disconnect();
}
}
acquired.clear();
for (Server server : available.keySet()) {
if (server.isConnected()) {
server.disconnect();
}
}
available.clear();
}
@Override
public List<Server> getConnected() {
List<Server> liveList = new ArrayList<Server>();
for (Server server : acquired.keySet()) {
if (server.isConnected()) {
liveList.add(server);
}
}
for (Server server : available.keySet()) {
if (server.isConnected()) {
liveList.add(server);
}
}
return liveList;
}
@Override
public List<Server> getAquired() {
List<Server> liveList = new ArrayList<Server>();
for (Server server : acquired.keySet()) {
if (server.isConnected()) {
liveList.add(server);
}
}
return liveList;
}
}