/*******************************************************************************
* Copyright (c) 2013, Perforce Software
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
*
* Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
******************************************************************************/
package com.perforce.search.manager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.ContentStreamBase.StringStream;
import org.springframework.util.StringUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
import com.perforce.p4java.extension.command.Request;
import com.perforce.p4java.extension.exception.ServerAccessException;
import com.perforce.p4java.extension.exception.ServerConnectionException;
import com.perforce.p4java.extension.exception.ServerErrorException;
import com.perforce.p4java.extension.path.PerforcePath;
import com.perforce.p4java.extension.path.PerforcePathFactory;
import com.perforce.p4java.extension.server.PrincipalCredential;
import com.perforce.p4java.extension.server.Server;
import com.perforce.search.configuration.Configuration;
import com.perforce.search.configuration.internal.ConfigurationImpl;
import com.perforce.search.manager.impl.QueueFile;
import com.perforce.search.server.ServerConnectionPoolFacade;
/**
 * Unit test for {@code DirectoryScannerThread}.
 *
 * Builds in-memory fakes for the Perforce server, the connection pool, the
 * queue manager and the search index manager, then runs the scanner to
 * completion and verifies that every directory and file of the simulated
 * depot tree was visited and queued exactly once.
 */
public class DirectoryScannerThreadTest {

    private static final Logger log = Logger
            .getLogger(DirectoryScannerThreadTest.class);

    private Configuration configuration;
    private ServerConnectionPoolFacade pool;
    private QueueManager queueManager;
    private Server server;
    private SearchIndexManager searchIndexManager;

    // Counters incremented by the fake server / queue manager so the test can
    // measure how much of the simulated tree the scanner actually touched.
    private AtomicInteger dirCount = new AtomicInteger(0);
    private AtomicInteger fileCount = new AtomicInteger(0);
    public AtomicInteger queuedCount = new AtomicInteger(0);

    // Fan-out of the simulated depot: each level contains COMPLEXITY
    // sub-directories, and every directory holds FILES_PER_DIR files.
    final int COMPLEXITY = 3;
    final int FILES_PER_DIR = 10;

    /**
     * Wires up the test doubles: a configuration parsed from an in-memory
     * stream, a fake {@code Server} that synthesizes responses for the few
     * p4 commands the scanner issues, a pass-through connection pool, a
     * counting queue manager and a no-op search index manager.
     */
    public DirectoryScannerThreadTest() {
        // init config, pool and queueManager with test implementations
        StringStream ss = new StringStream(
                "com.perforce.search.fileScannerThreads=3\n"
                        + "com.perforce.search.fileScannerSleep=0");
        // mostly defaults
        try {
            configuration = new ConfigurationImpl(ss.getStream());
        } catch (IOException e1) {
            // keep the cause; a bare message makes the failure undiagnosable
            log.error("configuration failed", e1);
        }
        server = new TestServer() {
            @SuppressWarnings("unchecked")
            @Override
            public Map<String, Object>[] execute(Request arg0)
                    throws ServerConnectionException, ServerAccessException,
                    ServerErrorException {
                List<Map<String, Object>> resp = new ArrayList<Map<String, Object>>();
                String cmd = arg0.getCommand();
                // simulate server response variance
                double rand = Math.random();
                try {
                    Thread.sleep((long) (rand * 20));
                } catch (InterruptedException e1) {
                    // restore the interrupt flag so callers can observe it
                    Thread.currentThread().interrupt();
                    log.error("Thread sleep interrupted!", e1);
                }
                // luckily we only need to support a few response types
                if (cmd.equals("counter")) {
                    Map<String, Object> r0 = new HashMap<String, Object>();
                    r0.put("counter", configuration.getFileScannerTokenKey());
                    r0.put("value", "0");
                    resp.add(r0);
                } else if (cmd.equals("depots")) {
                    // need name and type in each map
                    for (int i = 0; i < COMPLEXITY; ++i) {
                        Map<String, Object> r0 = new HashMap<String, Object>();
                        r0.put("name", "depot" + String.valueOf(i));
                        r0.put("type", (i % 2) == 1 ? "stream" : "local");
                        resp.add(r0);
                        dirCount.incrementAndGet();
                    }
                } else if (cmd.equals("files")) {
                    // files needs depotFile, rev, action
                    String ca0 = arg0.getCommandArgs()[0];
                    // rev is derived from the path depth purely for variety
                    int rev = StringUtils.countOccurrencesOf(ca0, "/");
                    String rootPath = ca0.substring(0, ca0.length() - 1);
                    for (int i = 0; i < FILES_PER_DIR; ++i) {
                        Map<String, Object> r0 = new HashMap<String, Object>();
                        r0.put("depotFile",
                                rootPath + "file" + String.valueOf(i) + ".txt");
                        r0.put("rev", String.valueOf(rev));
                        r0.put("action", "edit");
                        resp.add(r0);
                        fileCount.incrementAndGet();
                    }
                } else if (cmd.equals("dirs")) {
                    // dir : depot path
                    String ca0 = arg0.getCommandArgs()[0];
                    // terminate if deeper than N directories in
                    if (StringUtils.countOccurrencesOf(ca0, "/") < COMPLEXITY + 3) {
                        String rootPath = ca0.substring(0, ca0.length() - 1);
                        for (int i = 0; i < COMPLEXITY; ++i) {
                            Map<String, Object> r0 = new HashMap<String, Object>();
                            r0.put("dir",
                                    rootPath + "dirnum_" + String.valueOf(i));
                            resp.add(r0);
                            dirCount.incrementAndGet();
                        }
                    }
                } else if (cmd.equals("sizes")) {
                    // need depotFile and fileSize
                    String ca0 = arg0.getCommandArgs()[0];
                    int rev = StringUtils.countOccurrencesOf(ca0, "/");
                    String rootPath = ca0.substring(0, ca0.length() - 1);
                    for (int i = 0; i < FILES_PER_DIR; ++i) {
                        Map<String, Object> r0 = new HashMap<String, Object>();
                        r0.put("depotFile",
                                rootPath + "file" + String.valueOf(i) + ".txt");
                        r0.put("fileSize", String.valueOf(rev));
                        resp.add(r0);
                    }
                }
                return resp.toArray(new Map[0]);
            }
        };
        // Trivial pool: every acquire variant hands back the single fake
        // server; release and the introspection methods are no-ops.
        pool = new ServerConnectionPoolFacade() {
            @Override
            public Server acquire() {
                return server;
            }

            @Override
            public Server acquire(PrincipalCredential arg0)
                    throws ServerAccessException, ServerConnectionException,
                    ServerErrorException {
                return server;
            }

            @Override
            public List<Server> getAquired() {
                return null;
            }

            @Override
            public List<Server> getConnected() {
                return null;
            }

            @Override
            public void release(Server arg0) {
            }

            @Override
            public void releaseAllConnections() {
            }

            @Override
            public Server acquireIndexer() {
                return server;
            }

            @Override
            public Server acquireTicketed(PrincipalCredential user)
                    throws ServerAccessException, ServerConnectionException,
                    ServerErrorException {
                return server;
            }
        };
        // Queue manager that counts every queued file so the test can compare
        // against the expected tree size.
        queueManager = new QueueManager() {
            private BlockingQueue<QueueFile> filesTriggerQueue = new LinkedBlockingQueue<QueueFile>(); // this

            @Override
            public QueueFile removeFileFromQueue(boolean block)
                    throws InterruptedException {
                if (block)
                    return filesTriggerQueue.take();
                else
                    return filesTriggerQueue.poll();
            }

            @Override
            public Map<String, Object>[] getOnlyValidNames(
                    Map<String, Object>[] files) {
                // no filtering in the test double — accept everything
                return files;
            }

            @Override
            public boolean addScanFileToQueue(Map<String, Object> filesMap,
                    Map<String, Long> sizesMap) {
                queuedCount.incrementAndGet();
                String df = (String) filesMap.get("depotFile");
                filesTriggerQueue.add(new QueueFile(PerforcePathFactory
                        .getPath(df), Integer.valueOf((String) filesMap
                        .get("rev")), sizesMap.get(df), false, false, "dummy",
                        new Date(), null));
                return true;
            }

            @Override
            public int getFileScanCount() {
                return 0; // filesTriggerQueue.size();
            }

            @Override
            public void setScanComplete() {
                // not needed by this test
            }

            @Override
            public void recycleQueueFile(QueueFile qf)
                    throws InterruptedException {
                // not needed by this test
            }

            @Override
            public int getFileScanActiveThreads() {
                // keep the directory scanner going as long as possible
                return 3;
            }
        };
        // Index manager stub: all write operations report success, all
        // queries return empty results.
        searchIndexManager = new SearchIndexManager() {
            @Override
            public QueryResponse searchForFiles(String query, int count,
                    int startRow) {
                return new QueryResponse();
            }

            @Override
            public List<String> search(String query) {
                return Collections.emptyList();
            }

            @Override
            public void commit() {
            }

            @Override
            public boolean upload(PerforcePath path, ContentStream cs,
                    boolean headRev, String modifiedUser, Date modifiedDate,
                    long size, Map<String, String> fstats) {
                return true;
            }

            @Override
            public boolean add(PerforcePath path, int rev, boolean headRev,
                    String modifiedUser, Date modifiedDate, long size,
                    Map<String, String> fstats) {
                return true;
            }

            @Override
            public List<SolrSchemaField> getSearchFields() {
                // not needed by this test
                return null;
            }

            @Override
            public List<SolrDocument> getRectifyHeadRevision(PerforcePath path) {
                // not needed by this test
                return null;
            }
        };
    }

    /**
     * Runs the scanner against the simulated depot tree and checks that the
     * number of directories visited, files reported and files queued all
     * match the geometric size of the tree.
     */
    @Test
    public void testScanCounts() {
        DirectoryScannerThread dtt = new DirectoryScannerThread(configuration,
                pool, queueManager, searchIndexManager);
        try {
            dtt.setAndStart();
            // wait until it is done
            dtt.join();
            // check that all the directories and all of the files were picked
            // up
            // calculate the target sum: COMPLEXITY^1 + ... + COMPLEXITY^(COMPLEXITY+1)
            long expectedDirs = 0;
            long lastPow = 1;
            for (int i = 0; i <= COMPLEXITY; ++i) {
                lastPow = lastPow * COMPLEXITY;
                expectedDirs += lastPow;
            }
            // TestNG's assertEquals takes (actual, expected) — keep that
            // order so failure messages read correctly
            Assert.assertEquals(dirCount.get(), expectedDirs);
            Assert.assertEquals(queuedCount.get(), expectedDirs * FILES_PER_DIR);
            Assert.assertEquals(fileCount.get(), expectedDirs * FILES_PER_DIR);
        } catch (InterruptedException e) {
            // restore interrupt status and fail with the cause attached
            Thread.currentThread().interrupt();
            Assert.fail("interrupted while waiting for scanner to finish", e);
        }
    }
}