/*******************************************************************************
* Copyright (c) 2013, Perforce Software
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
*
* Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
******************************************************************************/
package com.perforce.search.manager;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.util.ContentStream;
import com.perforce.p4java.extension.exception.ServerAccessException;
import com.perforce.p4java.extension.exception.ServerConnectionException;
import com.perforce.p4java.extension.exception.ServerErrorException;
import com.perforce.p4java.extension.operation.FileStreamCommand;
import com.perforce.p4java.extension.operation.FilesCommand;
import com.perforce.p4java.extension.operation.FstatCommand;
import com.perforce.p4java.extension.operation.SizesCommand;
import com.perforce.p4java.extension.path.PathStringFormat;
import com.perforce.p4java.extension.path.PerforcePath;
import com.perforce.p4java.extension.path.PerforcePathFactory;
import com.perforce.p4java.extension.server.Server;
import com.perforce.search.configuration.Configuration;
import com.perforce.search.manager.impl.QueueFile;
import com.perforce.search.manager.impl.QueueManagerImpl;
import com.perforce.search.server.ServerConnectionPoolFacade;
public class FileQueueIndexThread extends Thread {

    /** Class-wide logger (made static: one logger per class, not per thread). */
    private static final Logger log = Logger
            .getLogger(FileQueueIndexThread.class);

    // All configuration is copied out of Configuration at construction time
    // and never changes afterwards, so every field is final.

    /** Pool from which indexer server connections are acquired and released. */
    private final ServerConnectionPoolFacade pool;
    /** Destination index; receives adds, content uploads and commits. */
    private final SearchIndexManager searchIndexManager;
    /** Files whose size is at or above this are indexed by name only. */
    private final long maxFileSize;
    /** When true, earlier revisions are indexed in addition to the head. */
    private final boolean indexAllRevisions;
    /** How far below the base revision to scan (0 means no limit). */
    private final int allRevisionsDepth;
    /** When false, fstat data is never fetched or indexed. */
    private final boolean indexFstats;
    /** Whitelist regex handed to QueueManagerImpl.filterFstat(). */
    private final String whiteFstatRegex;
    /** Blacklist regex handed to QueueManagerImpl.filterFstat(). */
    private final String blackFstatRegex;
    /** Source of files to index. */
    private final QueueManager queueManager;
    /** Extensions (with leading dot) whose content is never indexed. */
    private final Set<String> ignoredExtensions;
    /** Cache mapping change numbers to the submitting user and date. */
    private final ChangeCache changeCache;
    /** Milliseconds to sleep before retrying after a server failure. */
    private final int failRetryTimeout = 10000;

    /**
     * Creates the indexing thread, snapshotting the settings it needs from
     * {@code config}.
     *
     * @param config             source of all tunable settings
     * @param pool               server connection pool
     * @param searchIndexManager index to write into
     * @param manager            queue supplying files to index
     * @param changeCache        change number to user/date cache
     */
    public FileQueueIndexThread(final Configuration config,
            final ServerConnectionPoolFacade pool,
            final SearchIndexManager searchIndexManager,
            final QueueManager manager, final ChangeCache changeCache) {
        this.pool = pool;
        this.searchIndexManager = searchIndexManager;
        this.maxFileSize = config.getMaxFileSize();
        this.queueManager = manager;
        this.changeCache = changeCache;
        this.ignoredExtensions = config.getIgnoredExtensions();
        this.indexAllRevisions = config.isIndexAllRevisions();
        this.allRevisionsDepth = config.getFileScanRevisionDepth();
        this.indexFstats = config.indexFstats();
        this.blackFstatRegex = config.getBlackFstatRegex();
        this.whiteFstatRegex = config.getWhiteFstatRegex();
    }
/**
 * Indexes a revision's name and metadata only — no file content.
 *
 * @param depotPath depot path of the file
 * @param rev       revision number being indexed
 * @param headRev   whether this revision is the head
 * @param user      user who submitted the revision
 * @param date      submission date
 * @param size      file size in bytes
 * @param fstats    filtered fstat fields for this revision (may be null)
 * @throws ServerConnectionException if the index cannot be reached
 */
private void indexFileName(final PerforcePath depotPath, final int rev,
        final boolean headRev, final String user, final Date date,
        final long size, final Map<String, String> fstats)
        throws ServerConnectionException {
    // Pure delegation: the index manager builds the name-only document.
    searchIndexManager
            .add(depotPath, rev, headRev, user, date, size, fstats);
}
/**
 * Prints the given revision from the server and uploads its content to the
 * indexer.
 *
 * @return true only when the content was uploaded; false means the caller
 *         should fall back to name-only indexing
 * @throws ServerConnectionException if the index upload cannot connect
 */
private boolean indexFileContent(final PerforcePath depotPath,
        final int rev, final boolean headRev, final String user,
        final Date date, final long size, final Map<String, String> fstats,
        final Server server) throws ServerConnectionException {
    // path is for the indexer, leave the indexer un-escaped
    String path = depotPath.asRevisionlessString(null);
    if (indexAllRevisions) {
        path += "#" + rev;
    }
    ContentStream cs = null;
    try {
        // now we (maybe) need to print the file
        final InputStream is = FileStreamCommand.getFileStream(
                PerforcePathFactory.getPathWithNewRevision(depotPath, rev),
                server);
        if (is == null) {
            return false;
        }
        // a facade for solr's ContentStream
        cs = new ContentStreamWrapper(is, path);
    } catch (ServerConnectionException e) {
        log.error(e);
    } catch (ServerAccessException e) {
        log.error(e);
    } catch (ServerErrorException e) {
        log.error(e);
    }
    if (cs == null) {
        return false;
    }
    final InputStream stream = cs.getStream();
    try {
        log.debug("indexing " + path);
        // post to our indexer
        final boolean uploaded = searchIndexManager.upload(
                PerforcePathFactory.getPathWithNewRevision(depotPath, rev),
                cs, headRev, user, date, size, fstats);
        log.debug("done with " + path);
        if (stream != null) {
            stream.close();
        }
        return uploaded;
    } catch (IOException e) {
        log.error(e);
    } finally {
        // FIX: the previous version leaked the print stream whenever
        // upload() returned false or threw. Close it on every exit path;
        // Closeable.close() is a no-op when already closed, so the success
        // path above is not double-closed in any harmful way.
        if (stream != null) {
            try {
                stream.close();
            } catch (IOException e) {
                log.error(e);
            }
        }
    }
    return false;
}
/**
 * Indexes one revision of a file: content plus metadata when the file is
 * eligible, otherwise metadata only. Content indexing is skipped for empty
 * files, files at or over {@code maxFileSize}, and ignored extensions; it
 * also falls back to name-only indexing when the content upload fails.
 *
 * @throws ServerConnectionException if the index cannot be reached
 */
private void indexFile(final PerforcePath depotPath, final String ext,
        final long fileSize, final int rev, final boolean headRev,
        final String user, final Date date,
        final Map<String, String> fstats, final Server server)
        throws ServerConnectionException {
    // Decide up front whether the content is worth fetching at all.
    final boolean contentEligible = fileSize != 0
            && fileSize < maxFileSize && !ignoredExtensions.contains(ext);
    if (contentEligible
            && indexFileContent(depotPath, rev, headRev, user, date,
                    fileSize, fstats, server)) {
        // Content (and metadata) indexed successfully; nothing more to do.
        return;
    }
    // Ineligible or the content upload failed: index the name only.
    indexFileName(depotPath, rev, headRev, user, date, fileSize, fstats);
}
/**
 * Collects the fstat fields to index for a queue entry, keyed by revision
 * number (as a string). Returns an empty map when fstat indexing is off.
 *
 * @param qf     queue entry being indexed
 * @param server connection used to run the fstat command
 * @return map of revision string to filtered fstat key/value pairs
 */
private Map<String, Map<String, String>> getFstatsFor(final QueueFile qf,
        final Server server) throws ServerConnectionException,
        ServerAccessException, ServerErrorException {
    final Map<String, Map<String, String>> retMap = new HashMap<String, Map<String, String>>();
    // do we even bother?
    if (!indexFstats) {
        return retMap;
    }
    // Only doing the head and it's already done? Reuse the queued data.
    // FIX: this previously called the isIndexAllRevisions() stub, which
    // always returned false; test the configured field instead.
    if (!indexAllRevisions && qf.getFstatData() != null) {
        retMap.put(String.valueOf(qf.getRev()), qf.getFstatData());
        return retMap;
    }
    // either do just the head (-Oasl) or all (-Oalf)
    final Map<String, Object>[] fstatResults = FstatCommand.getFstats(
            qf.getDepotPath(), indexAllRevisions, server);
    for (final Map<String, Object> map : fstatResults) {
        final Map<String, String> fstatPerRev = new HashMap<String, String>();
        for (final Map.Entry<String, Object> e : map.entrySet()) {
            if (e.getKey().equals(FstatCommand.tagDigest)) {
                // the digest bypasses the black/white filtering (freebee)
                fstatPerRev.put(e.getKey(), (String) e.getValue());
                continue;
            }
            if (QueueManagerImpl.filterFstat(blackFstatRegex,
                    whiteFstatRegex, e.getKey())) {
                fstatPerRev.put(
                        QueueManagerImpl.mangleFstatKey(e.getKey()),
                        QueueManagerImpl.convertFstatValue(e.getValue()));
            }
        }
        retMap.put((String) map.get(FstatCommand.tagHeadRevision),
                fstatPerRev);
    }
    return retMap;
}
/**
 * Whether every revision should be indexed rather than just the head.
 * FIX: this was an auto-generated stub that always returned {@code false},
 * silently contradicting the configured {@code indexAllRevisions} setting.
 */
private boolean isIndexAllRevisions() {
    return indexAllRevisions;
}
/**
 * Main loop. Repeatedly pulls a QueueFile from the queue, indexes its head
 * revision (and, when requested, earlier revisions), then rectifies stale
 * "head" markers in the index. Index commits are batched: one every
 * commitCount submissions, plus whenever the queue momentarily runs dry.
 * Server-side failures recycle the queue entry and retry after
 * failRetryTimeout ms; InterruptedException ends the thread quietly; any
 * other exception aborts the thread with an error log. The acquired server
 * connection is always returned to the pool on exit.
 */
@Override
public void run() {
log.debug("Starting thread " + this.getName());
Server server = null;
final int commitCount = 10; // commit 10 indexes at a time
try {
int submits = commitCount;
while (true) {
// this will timeout
QueueFile qf = queueManager.removeFileFromQueue(false);
// time to commit things?
if (qf == null || submits <= 0) {
// commit
searchIndexManager.commit();
submits = commitCount;
}
// time to wait?
if (qf == null) {
// release the server connection
pool.release(server);
server = null;
// now block
// NOTE(review): the blocking call presumably never returns null --
// qf is dereferenced below without a further check; confirm against
// the QueueManager contract.
qf = queueManager.removeFileFromQueue(true);
}
while (true) {
// inner try so that server exceptions do not kill the
// thread
try {
// lazily (re)acquire a connection; it is dropped on failure below
if (server == null) {
server = pool.acquireIndexer();
}
// path is used extension and files/sizes query, escape
// it
final String path = qf.getDepotPath()
.asRevisionlessString(
EnumSet.of(PathStringFormat.ESCAPE));
String ext = getExtension(path);
// for scans, fstatData is null; either fetch one or
// everything if we're doing that
Map<String, Map<String, String>> fstatsToIndex = getFstatsFor(
qf, server);
// index the head revision first
indexFile(qf.getDepotPath(), ext, qf.getFileSize(),
qf.getRev(), true, qf.getModifiedUser(),
qf.getModifiedDate(),
fstatsToIndex.get(String.valueOf(qf.getRev())),
server);
--submits;
if (qf.isIndexPreviousRevs() && qf.getRev() > 1) {
// do a files -a 1,rev-1 on this file
Map<String, Object>[] fileHistory = FilesCommand
.getAllRevisions(path, server);
// get all of the change numbers and mass update the
// cache
List<String> changes = new ArrayList<String>();
for (Map<String, Object> map : fileHistory) {
final String change = (String) map
.get(FilesCommand.tagChange);
if (change == null) {
continue;
}
changes.add(change);
}
changeCache.addChanges(changes);
// also run sizes -a to filter out revs that are too
// large
Map<String, Object>[] sizesResult = SizesCommand
.getAllSizes(path, server);
// revision number -> byte size, for the lookup below
Map<Integer, Long> sizes = new HashMap<Integer, Long>();
for (Map<String, Object> revMap : sizesResult) {
String rev = (String) revMap
.get(SizesCommand.tagRevision);
String size = (String) revMap
.get(SizesCommand.tagFileSize);
if (rev == null || size == null) {
continue;
}
sizes.put(Integer.valueOf(rev),
Long.valueOf(size));
}
// walk the history and index each eligible earlier revision
for (Map<String, Object> map : fileHistory) {
String rev = (String) map
.get(FilesCommand.tagRevision);
if (rev == null) {
continue;
}
String change = (String) map
.get(FilesCommand.tagChange);
if (change == null) {
continue;
}
int depotRev = Integer.valueOf(rev);
// never index higher than rev-1
if (depotRev >= qf.getRev()) {
continue;
}
// never go deeper than allRevisionsDepth from
// the base rev
if (allRevisionsDepth > 0
&& qf.getRev() - depotRev >= allRevisionsDepth) {
continue;
}
// skip deletes
if (QueueManagerImpl.badTypeOrAction(map)) {
log.info("skipping bad action or type on "
+ path + "#" + rev);
continue;
}
Long size = sizes.get(depotRev);
if (size == null) {
log.info("skipping null size on " + path
+ "#" + rev);
continue;
}
// user/date come from the change cache primed above
final String user = changeCache.getUser(change);
final Date date = changeCache.getDate(change);
indexFile(qf.getDepotPath(), ext, size,
depotRev, false, user, date,
fstatsToIndex.get(rev), server);
--submits;
// time to commit things?
// NOTE(review): qf cannot be null here (it was already
// dereferenced), so only the submits check is live.
if (qf == null || submits <= 0) {
// commit
searchIndexManager.commit();
submits = commitCount;
}
}
}
// commit before rectifying what the "head" is
searchIndexManager.commit();
submits = commitCount;
// rectify the "head" attribute: query for this filename
// with the headrev attrib, reindex the incorrect ones
rectifyHeadRevisions(qf.getDepotPath(), server);
// break out of the while(true) loop
break;
} catch (ServerConnectionException e) {
log.error("Connection exception processing "
+ qf.getDepotPath().asRevisionlessString(null)
+ ", recycling");
// put the file back so it gets another attempt later
queueManager.recycleQueueFile(qf);
pool.release(server);
server = null;
Thread.sleep(failRetryTimeout); // try again later
} catch (ServerAccessException e) {
log.error("Access exception processing "
+ qf.getDepotPath().asRevisionlessString(null)
+ ", recycling");
queueManager.recycleQueueFile(qf);
pool.release(server);
server = null;
Thread.sleep(failRetryTimeout); // try again later
} catch (ServerErrorException e) {
log.error("Server error processing "
+ qf.getDepotPath().asRevisionlessString(null)
+ ", recycling");
queueManager.recycleQueueFile(qf);
pool.release(server);
server = null;
Thread.sleep(failRetryTimeout); // try again later
}
}
}
} catch (InterruptedException e) {
// sleep() was interrupted -- treated as a shutdown request
log.info("Interrupted", e);
} catch (Exception e) {
log.error(
"Unhandled exception in file index thread, thread aborted",
e);
} finally {
// always hand the connection back to the pool on the way out
pool.release(server);
}
}
/**
 * Fetches the head-revision size of a file via the sizes command.
 *
 * @return the size in bytes, or 0 when the server returns anything other
 *         than exactly one result or no size field
 */
private long getSize(final PerforcePath path, final Server server)
        throws ServerConnectionException, ServerAccessException,
        ServerErrorException {
    // also run sizes to filter out revs that are too large
    final Map<String, Object>[] sizesResult = SizesCommand.getSizes(path,
            server);
    // weird results mean size == 0
    if (sizesResult == null || sizesResult.length != 1) {
        return 0;
    }
    // FIX: use the shared tag constant instead of the bare "fileSize"
    // literal so this stays in sync with SizesCommand's output keys
    // (the revision-history loop already uses SizesCommand.tagFileSize).
    final String size = (String) sizesResult[0].get(SizesCommand.tagFileSize);
    return (size == null) ? 0 : Long.parseLong(size);
}
/**
 * Returns the file's extension including the leading dot, or "" when the
 * final path segment has none.
 *
 * FIX: only look for the dot in the final path segment. Previously a dot
 * in a directory name (e.g. {@code //depot/rel1.2/README}) produced a
 * bogus "extension" such as {@code .2/README}, which corrupted both the
 * indexed extension and the ignoredExtensions check.
 */
private String getExtension(final String path) {
    final int extLoc = path.lastIndexOf('.');
    // No dot at all, or the last dot belongs to a directory segment.
    if (extLoc < 0 || extLoc < path.lastIndexOf('/')) {
        return "";
    }
    return path.substring(extLoc);
}
/**
 * Re-indexes documents under {@code basePath} whose stored head-revision
 * flag is no longer correct. The index is queried for the stale documents,
 * and each one is re-submitted with {@code headRev = false}, carrying its
 * size, extension, revision, user, date, and fstat fields along.
 *
 * @param basePath depot path whose head markers should be rectified
 * @param server   connection used for the sizes lookup
 */
private void rectifyHeadRevisions(final PerforcePath basePath,
        final Server server) throws ServerConnectionException,
        ServerAccessException, ServerErrorException {
    final List<SolrDocument> stale = searchIndexManager
            .getRectifyHeadRevision(basePath);
    for (final SolrDocument doc : stale) {
        // Pull everything a re-index needs back out of the document.
        final PerforcePath depotPath = PerforcePathFactory
                .getPath((String) doc.getFieldValue(SchemaKey.ID));
        final String revision = (String) doc
                .getFieldValue(SchemaKey.REVISION);
        final String modifiedBy = (String) doc
                .getFieldValue(SchemaKey.MODIFIED_BY);
        final Date modifiedAt = (Date) doc
                .getFieldValue(SchemaKey.MODIFIED_TIME);
        final String ext = getExtension(depotPath
                .asRevisionlessString(EnumSet.of(PathStringFormat.ESCAPE)));
        // Carry the digest and every fstat-prefixed field over as-is.
        final Map<String, String> fstatData = new HashMap<String, String>();
        for (final String field : doc.getFieldNames()) {
            final boolean keep = field.equals(SchemaKey.DIGEST)
                    || field.startsWith(SchemaKey.FSTATS_PREFIX);
            if (keep) {
                fstatData.put(field, (String) doc.getFieldValue(field));
            }
        }
        indexFile(depotPath, ext, getSize(depotPath, server),
                Integer.valueOf(revision), false, modifiedBy, modifiedAt,
                fstatData, server);
    }
}
}