/******************************************************************************* * Copyright (c) 2013, Perforce Software * All rights reserved. * * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: * * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
******************************************************************************/
package com.perforce.search.manager.impl;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;

import com.perforce.p4java.extension.exception.ServerAccessException;
import com.perforce.p4java.extension.exception.ServerConnectionException;
import com.perforce.p4java.extension.exception.ServerErrorException;
import com.perforce.p4java.extension.operation.ChangesCommand;
import com.perforce.p4java.extension.operation.CounterCommand;
import com.perforce.p4java.extension.operation.DescribeCommand;
import com.perforce.p4java.extension.operation.FilesCommand;
import com.perforce.p4java.extension.operation.FstatCommand;
import com.perforce.p4java.extension.path.PerforcePath;
import com.perforce.p4java.extension.path.PerforcePathFactory;
import com.perforce.p4java.extension.server.Server;
import com.perforce.p4java.extension.utility.CommandUtil;
import com.perforce.p4java.extension.utility.FormatUtility;
import com.perforce.search.configuration.Configuration;
import com.perforce.search.manager.ChangeCache;
import com.perforce.search.manager.DirectoryScannerThread;
import com.perforce.search.manager.FileQueueIndexThread;
import com.perforce.search.manager.QueueManager;
import com.perforce.search.manager.SchemaKey;
import com.perforce.search.manager.SearchIndexManager;
import com.perforce.search.server.ServerConnectionPoolFacade;

/**
 * Default {@link QueueManager}: feeds index work items ({@link QueueFile}) to a
 * pool of {@link FileQueueIndexThread} workers.
 * <p>
 * Work arrives from two sources and is kept in two queues:
 * <ul>
 * <li>trigger files in the configured queue directory, each holding one
 * {@code command,argument} line, which feed the high-priority
 * {@link #filesTriggerQueue};</li>
 * <li>files reported through {@link #addScanFileToQueue}, which feed the
 * low-priority {@link #filesScanQueue}.</li>
 * </ul>
 * A map keyed by depot path (plus {@code #rev} when all revisions are indexed)
 * de-duplicates queued files, so re-queuing a file only bumps its revision.
 * <p>
 * Constructing an instance starts the manager thread ({@link #run()}), which in
 * turn starts the worker threads and the {@link DirectoryScannerThread}.
 */
public class QueueManagerImpl implements QueueManager, Runnable {

    private static final Logger log = Logger.getLogger(QueueManagerImpl.class);

    /** Charset used to decode binary fstat values. */
    private static final Charset UTF8 = Charset.forName("UTF-8");

    /** Number of index worker threads started by {@link #run()}. */
    private static final int WORKER_THREADS = 3; // TODO: configurable

    /** Milliseconds to sleep when there is no work, or after a retryable error. */
    private static final int SLEEP_NO_WORK_MS = 1000;

    /** Perforce decorations stripped from fstat keys before filtering/indexing. */
    private static final String[] FSTAT_KEY_PREFIXES = { "attr-" };

    private final Thread manager;
    private final Configuration configuration;
    private final String queueDirectory;
    private final File queueFile;
    private final ServerConnectionPoolFacade pool;
    private final SearchIndexManager searchIndexManager;
    private final Set<String> neverProcess;
    private final long maxScanQueueSize;
    private final boolean indexAllRevisions;
    private final boolean indexFstats;
    private final String blackFstatRegex;
    private final String whiteFstatRegex;
    private final String changeCatchupKey;

    /** Queued files by queue key; used for lookup/de-duplication only. */
    private final ConcurrentMap<String, QueueFile> filesMap;
    /** High-priority (trigger) work, in processing order. */
    protected BlockingQueue<QueueFile> filesTriggerQueue;
    /** Low-priority (directory scan) work, in processing order. */
    protected BlockingQueue<QueueFile> filesScanQueue;

    protected List<FileQueueIndexThread> threads;
    protected DirectoryScannerThread directoryScanner;
    private final Object lock = new Object();
    private final ChangeCache changeCache;

    // for testing only; written by the test thread and read by worker threads
    private volatile boolean quitWhenEmpty = false;
    private final AtomicInteger filesTotal = new AtomicInteger(0);

    /**
     * Creates the queue manager and starts its thread. Returns once the
     * directory scanner has been started, or once the manager thread has
     * died. The wait re-checks every {@value #SLEEP_NO_WORK_MS} ms.
     *
     * @param config       search configuration
     * @param indexManager index manager handed to workers and the scanner
     * @param pool         server connection pool shared by all threads
     */
    public QueueManagerImpl(final Configuration config,
            final SearchIndexManager indexManager,
            final ServerConnectionPoolFacade pool) {
        this.configuration = config;
        this.neverProcess = config.getNeverProcessList();
        // NOTE(review): these results are discarded; the calls are kept only in
        // case they have side effects (e.g. config validation) -- confirm/remove
        config.getIgnoredExtensions();
        config.getMaxFileSize();
        this.filesTriggerQueue = new LinkedBlockingQueue<QueueFile>();
        this.filesScanQueue = new LinkedBlockingQueue<QueueFile>();
        this.filesMap = new ConcurrentHashMap<String, QueueFile>();
        this.manager = new Thread(this);
        this.searchIndexManager = indexManager;
        this.pool = pool;
        this.changeCache = new ChangeCacheImpl(pool);
        this.queueDirectory = config.getQueueDir();
        this.queueFile = new File(this.queueDirectory);
        this.maxScanQueueSize = config.getMaxScanQueueSize();
        this.indexAllRevisions = config.isIndexAllRevisions();
        this.changeCatchupKey = config.getChangelistCatchupKey();
        this.indexFstats = config.indexFstats();
        this.blackFstatRegex = config.getBlackFstatRegex();
        this.whiteFstatRegex = config.getWhiteFstatRegex();

        log.debug("starting queue manager thread");
        // every field is assigned before start(), so the thread sees them all
        this.manager.start();
        awaitDirectoryScanner();
    }

    /** Blocks until run() has published the directory scanner or has died. */
    private void awaitDirectoryScanner() {
        synchronized (lock) {
            try {
                while (this.directoryScanner == null && this.manager.isAlive()) {
                    lock.wait(SLEEP_NO_WORK_MS);
                }
            } catch (InterruptedException e) {
                log.error(e);
                Thread.currentThread().interrupt();
            }
        }
    }

    /** True when a changelist catch-up counter key is configured. */
    private boolean isChangeCatchupEnabled() {
        return changeCatchupKey != null && !changeCatchupKey.isEmpty();
    }

    /**
     * Parses a changelist/counter value.
     *
     * @return the parsed value, or {@code fallback} for null or non-numeric input
     */
    private static int parseChange(final String value, final int fallback) {
        if (value == null) {
            return fallback;
        }
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    /**
     * Advances the catch-up counter to {@code change}. The counter only moves
     * forward. An unset or non-numeric counter counts as 0. Server failures
     * are logged, not thrown.
     */
    private void updateChangeKey(final String change, final Server server) {
        if (!isChangeCatchupEnabled()) {
            return;
        }
        final int newChange = parseChange(change, -1);
        if (newChange < 0) {
            log.error("Not updating change key, bad changelist number: "
                    + change);
            return;
        }
        try {
            String curChange = CounterCommand.getCounterValueSideTable(
                    changeCatchupKey, server);
            if (parseChange(curChange, 0) >= newChange) {
                return;
            }
            CounterCommand.setCounterValueSideTable(changeCatchupKey, change,
                    server);
        } catch (ServerConnectionException e) {
            log.error("Failed to update change key", e);
        } catch (ServerAccessException e) {
            log.error("Failed to update change key", e);
        } catch (ServerErrorException e) {
            log.error("Failed to update change key", e);
        }
    }

    /**
     * Queues every submitted changelist between the catch-up counter and the
     * server's {@code change} counter. These are changes committed while this
     * service was not consuming triggers.
     *
     * @return the changelist numbers queued here, so matching trigger files
     *         can be skipped
     * @throws InterruptedException if interrupted while queuing files
     */
    private Set<String> catchupOnChanges() throws InterruptedException {
        final Set<String> processed = new HashSet<String>();
        if (!isChangeCatchupEnabled()) {
            return processed;
        }
        Server server = null;
        try {
            server = pool.acquire();
            if (server == null) {
                log.error("Aborting, server = null");
                return processed;
            }
            String changelist = CounterCommand.getCounterValueSideTable(
                    changeCatchupKey, server);
            if (changelist == null || changelist.equals("0")) {
                return processed;
            }
            // also get the current change counter
            String curChange = CounterCommand.getCounterValue("change", server);
            if (curChange == null || curChange.equals(changelist)) {
                return processed;
            }
            final int fromCL = parseChange(changelist, -1);
            final int toCL = parseChange(curChange, -1);
            if (fromCL < 0 || toCL < 0) {
                log.error("Failed, skipping catchup step: bad counter values "
                        + changelist + ", " + curChange);
                return processed;
            }
            if (toCL <= fromCL) {
                return processed; // nothing newer than the catch-up counter
            }
            if (toCL - fromCL > this.configuration.getMaxCatchup()) {
                log.error("Too many changelists to catch up on. Re-scan the server,"
                        + " change the config file limit, or feed changes with curl");
                return processed;
            }
            // ask for enough recent changes to cover every number in the
            // inclusive range [fromCL, toCL] accepted by the filter below
            final String limit = String.valueOf(toCL - fromCL + 1);
            Map<String, Object>[] changes = ChangesCommand.getRecentChanges(
                    limit, server);
            if (changes == null) {
                return processed;
            }
            // iterate in reverse so older changes are queued first
            // (assumes newest-first output, as `p4 changes` produces)
            for (int i = changes.length - 1; i >= 0; --i) {
                String changeNum = (String) changes[i]
                        .get(ChangesCommand.tagChange);
                if (changeNum == null) {
                    // uncommitted changes come back without a change number
                    continue;
                }
                int changeVal = parseChange(changeNum, -1);
                if (changeVal < fromCL || changeVal > toCL) {
                    continue;
                }
                changelistToFileQueue(server, changeNum);
                processed.add(changeNum);
                updateChangeKey(changeNum, server);
            }
        } catch (ServerConnectionException e) {
            log.error("Failed, skipping catchup step", e);
        } catch (ServerAccessException e) {
            log.error("Failed, skipping catchup step", e);
        } catch (ServerErrorException e) {
            log.error("Failed, skipping catchup step", e);
        } finally {
            pool.release(server);
        }
        return processed;
    }

    /**
     * Manager thread body. Starts the workers and the directory scanner, then
     * catches up on missed changelists. After that it polls the queue
     * directory forever, processing trigger files in file-name order and
     * deleting each one once handled. Exits only on interruption or an
     * unexpected exception.
     */
    @Override
    public void run() {
        log.debug("Started thread " + this.manager.getName());
        Server server = null;
        try {
            threads = new ArrayList<FileQueueIndexThread>();
            for (int i = 0; i < WORKER_THREADS; ++i) {
                FileQueueIndexThread thread = new FileQueueIndexThread(
                        configuration, pool, searchIndexManager, this,
                        changeCache);
                threads.add(thread);
                thread.start();
            }
            synchronized (lock) {
                // also start the directory scanner thread
                DirectoryScannerThread dst = new DirectoryScannerThread(
                        configuration, pool, this, searchIndexManager);
                dst.setAndStart();
                this.directoryScanner = dst;
                lock.notifyAll(); // we are running; releases the constructor
            }

            // first check if we need to catch up on missing changelists
            Set<String> changeCatchup = catchupOnChanges();

            while (true) {
                try {
                    File[] listFiles = queueFile.listFiles();
                    if (listFiles == null || listFiles.length == 0) {
                        if (listFiles == null) {
                            // null means "not a directory" or an I/O error;
                            // keep polling instead of letting the thread die
                            log.error("Cannot list queue directory "
                                    + queueFile.getPath());
                        }
                        // let the server connection go away when we sleep
                        pool.release(server);
                        server = null;
                        // no need to keep this around
                        changeCatchup.clear();
                        Thread.sleep(SLEEP_NO_WORK_MS);
                        continue;
                    }
                    Arrays.sort(listFiles);
                    for (File f : listFiles) {
                        String trigger = readFirstLine(f);
                        if (trigger == null) {
                            // presumably still being written; stop this pass
                            // so later files are not handled ahead of it
                            log.info("waiting on empty file " + f.getName());
                            Thread.sleep(SLEEP_NO_WORK_MS);
                            break;
                        }
                        String[] args = trigger.split(",");
                        if (args.length != 2) {
                            log.error("Received bad queue data in "
                                    + f.getPath() + ": " + trigger);
                            Thread.sleep(SLEEP_NO_WORK_MS);
                            continue;
                        }
                        if (server == null) {
                            server = pool.acquire();
                        }
                        if (server == null) {
                            log.error("Skipping, server = null");
                            Thread.sleep(SLEEP_NO_WORK_MS);
                            continue;
                        }
                        processTrigger(server, args[0], args[1], changeCatchup);
                        // clean up the file after successful processing
                        if (!f.delete()) {
                            log.error("Failed to delete queue file "
                                    + f.getPath());
                        }
                    }
                } catch (FileNotFoundException e) {
                    log.error("File not found exception!", e);
                    Thread.sleep(SLEEP_NO_WORK_MS);
                } catch (IOException e) {
                    log.error("IOException!", e);
                    Thread.sleep(SLEEP_NO_WORK_MS);
                }
            }
        } catch (InterruptedException e) {
            log.error("Interrupted!", e);
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            log.error("Unhandled exception: ", e);
        } finally {
            // notifyAll() requires owning the monitor; without it this threw
            // IllegalMonitorStateException and skipped the release below
            synchronized (lock) {
                lock.notifyAll();
            }
            log.info("QueueManager.run() completed, exiting");
            pool.release(server);
        }
    }

    /** Reads the first line of {@code file}, always closing the reader. */
    private static String readFirstLine(final File file) throws IOException {
        BufferedReader reader = new BufferedReader(new FileReader(file));
        try {
            return reader.readLine();
        } finally {
            reader.close();
        }
    }

    /** Dispatches one {@code command,argument} trigger line. */
    private void processTrigger(final Server server, final String command,
            final String argument, final Set<String> changeCatchup)
            throws InterruptedException {
        // TODO: make this more generic
        if (command.equals("reindex")) {
            reindexFile(server, argument);
        } else if (command.equals("change-commit") || command.equals("commit")) {
            // skip changelists already queued during the catchup phase
            if (!changeCatchup.remove(argument)) {
                changelistToFileQueue(server, argument);
                updateChangeKey(argument, server);
            }
        } else {
            log.error("Unexpected command, skipping " + command + "," + argument);
        }
    }

    /**
     * Builds an indexed tagged-output key such as {@code depotFile0}. Uses
     * plain concatenation so the digits never depend on the default locale.
     */
    private String indexedTag(final String tag, final int index) {
        return tag + index;
    }

    /** True when the file name (last path segment) is on the never-process list. */
    private boolean isNeverProcess(final String path) {
        return this.neverProcess.contains(path.substring(path.lastIndexOf("/") + 1));
    }

    /**
     * Queues one file reported by the directory scan at low priority.
     * Expects an entry already filtered by {@link #getOnlyValidNames}.
     *
     * @param fileInfo tagged file data ({@code depotFile}, {@code rev},
     *                 {@code change})
     * @param sizesMap file sizes by depot path; a missing size is treated as
     *                 deleted at head
     * @return false if the file was skipped, true if it was queued
     * @throws InterruptedException if interrupted while the scan queue is full
     */
    @Override
    public boolean addScanFileToQueue(final Map<String, Object> fileInfo,
            final Map<String, Long> sizesMap) throws InterruptedException {
        String path = (String) fileInfo.get("depotFile");
        String rev = (String) fileInfo.get("rev");
        Long fileSize = sizesMap.get(path);
        if (fileSize == null && !indexAllRevisions) {
            // null filesize means it's deleted at head, try anyway?
            log.info("Skipping " + path + ", fileSize is null");
            return false;
        }
        final PerforcePath depotPath = indexAllRevisions
                ? PerforcePathFactory.getPath(path, "#" + rev)
                : PerforcePathFactory.getPath(path);
        final String changeNum = (String) fileInfo.get("change");
        changeCache.addChange(changeNum);
        addToFilesQueue(depotPath, Integer.parseInt(rev),
                fileSize != null ? fileSize : 0, false, fileSize != null,
                changeCache.getUser(changeNum), changeCache.getDate(changeNum),
                null);
        return true;
    }

    /** Placeholder for single-file reindexing; currently does nothing. */
    private void reindexFile(final Server server, final String fileRev) {
        // TODO: like the CL code but for a single file
    }

    /**
     * Converts a raw fstat value to text.
     *
     * @return the value itself for a String (or null), the UTF-8 decoding
     *         of a {@code byte[]}, and null for any other type
     */
    public static String convertFstatValue(final Object o) {
        if (o == null || o instanceof String) {
            return (String) o;
        }
        if (o instanceof byte[]) {
            return new String((byte[]) o, UTF8);
        }
        return null;
    }

    /**
     * Runs fstat on {@code paths} and collects the fields to index, keyed by
     * depot file. The digest is always kept as-is. Other keys must pass
     * {@link #filterFstat} and are stored under {@link #mangleFstatKey}.
     * Returns an empty map when fstat indexing is disabled.
     */
    private Map<String, Map<String, String>> getIndexableFstats(
            final List<PerforcePath> paths, final Server server)
            throws ServerConnectionException, ServerAccessException,
            ServerErrorException {
        Map<String, Map<String, String>> fstatsToIndex = new HashMap<String, Map<String, String>>();
        if (!indexFstats) {
            return fstatsToIndex;
        }
        Map<String, Object>[] fstats = FstatCommand.getFstats(paths, server);
        if (fstats == null) {
            return fstatsToIndex;
        }
        for (Map<String, Object> fstat : fstats) {
            Map<String, String> fileFstats = new HashMap<String, String>();
            for (Map.Entry<String, Object> e : fstat.entrySet()) {
                String key = e.getKey();
                if (key.equals(FstatCommand.tagDigest)) {
                    // freebee: the digest is indexed unfiltered and unmangled
                    fileFstats.put(key, convertFstatValue(e.getValue()));
                    continue;
                }
                if (!filterFstat(blackFstatRegex, whiteFstatRegex, key)) {
                    continue;
                }
                String data = convertFstatValue(e.getValue());
                if (data != null && data.length() > 0) {
                    fileFstats.put(mangleFstatKey(key), data);
                }
            }
            fstatsToIndex.put((String) fstat.get("depotFile"), fileFstats);
        }
        return fstatsToIndex;
    }

    /** Collects indexable fstats for every {@code depotFileN#revN} in a describe result. */
    private Map<String, Map<String, String>> getIndexableFstats(
            final Map<String, Object> describe, final Server server)
            throws ServerConnectionException, ServerAccessException,
            ServerErrorException {
        Map<String, Map<String, String>> fstats = new HashMap<String, Map<String, String>>();
        if (!indexFstats) {
            return fstats;
        }
        List<PerforcePath> paths = new ArrayList<PerforcePath>();
        for (int index = 0; describe.containsKey(indexedTag("depotFile", index)); ++index) {
            String rev = (String) describe.get(indexedTag("rev", index));
            paths.add(PerforcePathFactory.getPath(
                    (String) describe.get(indexedTag("depotFile", index)),
                    "#" + rev));
        }
        if (paths.isEmpty()) {
            return fstats;
        }
        return getIndexableFstats(paths, server);
    }

    /** True for actions that leave no content to index (delete, move/delete, purge, archive). */
    public static boolean isActionDelete(final String action) {
        return action != null
                && (action.equals("delete") || action.equals("move/delete")
                        || action.equals("purge") || action.equals("archive"));
    }

    /**
     * Describes a submitted changelist and queues each indexable file at high
     * priority. Skipped files: never-process names, apple types, deleting
     * actions, and files without a size. Server errors are logged and the
     * changelist is skipped.
     */
    private void changelistToFileQueue(final Server server,
            final String changelist) throws InterruptedException {
        try {
            Map<String, Object>[] describe = DescribeCommand.describeChange(
                    changelist, server);
            CommandUtil.checkError(describe);
            if (describe == null || describe.length == 0) {
                log.error("empty describe output for " + changelist);
                return;
            }
            Map<String, Object> map = describe[0];

            // submitted only
            String status = (String) map.get(DescribeCommand.tagStatus);
            if (status == null || !status.equals(DescribeCommand.tagSubmitted)) {
                log.error("non-submitted status for " + changelist + ": " + status);
                return;
            }
            String user = (String) map.get(DescribeCommand.tagUser);
            Date date = FormatUtility.getDateFromLongInSeconds((String) map
                    .get(DescribeCommand.tagTime));
            Map<String, Map<String, String>> fstats = getIndexableFstats(map, server);

            // walk depotFile0, depotFile1, ... with their matching per-file tags
            for (int index = 0; map.containsKey(
                    indexedTag(DescribeCommand.tagDepotFile, index)); ++index) {
                String path = (String) map.get(indexedTag(
                        DescribeCommand.tagDepotFile, index));
                String type = (String) map.get(indexedTag(
                        DescribeCommand.tagType, index));
                if (isNeverProcess(path)) {
                    log.debug("skipping never process file " + path);
                    continue;
                }
                if (type != null && type.contains(DescribeCommand.tagAppleType)) {
                    log.debug("Skipping apple filetype on file " + path);
                    continue;
                }
                String action = (String) map.get(indexedTag(
                        DescribeCommand.tagAction, index));
                if (isActionDelete(action)) {
                    log.debug("Skipping " + action + " on file " + path);
                    continue; // TODO: no point? delete the index?
                }
                String rev = (String) map.get(indexedTag(
                        DescribeCommand.tagRevision, index));
                String filesize = (String) map.get(indexedTag(
                        DescribeCommand.tagFileSize, index));
                if (filesize == null) {
                    log.error("failed to get filesize for " + path);
                    continue;
                }
                long fileSize = Long.parseLong(filesize);
                final PerforcePath depotPath = indexAllRevisions
                        ? PerforcePathFactory.getPath(path, "#" + rev)
                        : PerforcePathFactory.getPath(path);
                addToFilesQueue(depotPath, Integer.parseInt(rev), fileSize,
                        true, false, user, date, fstats.get(path));
            }
        } catch (ServerConnectionException e) {
            log.error("Failed to queue changelist " + changelist, e);
        } catch (ServerAccessException e) {
            log.error("Failed to queue changelist " + changelist, e);
        } catch (ServerErrorException e) {
            log.error("Failed to queue changelist " + changelist, e);
        }
    }

    /**
     * Map key for a queued file: the revisionless depot path, plus
     * {@code #rev} when all revisions are indexed. The path is only a map
     * key, so it is not escaped.
     */
    private String queueKey(final PerforcePath depotPath, final long rev) {
        String key = depotPath.asRevisionlessString(null);
        return indexAllRevisions ? key + "#" + rev : key;
    }

    /** Appends to the queue matching the priority and bumps the total count. */
    private void enqueue(final QueueFile qf, final boolean highPriority) {
        if (highPriority) {
            filesTriggerQueue.add(qf);
        } else {
            filesScanQueue.add(qf);
        }
        filesTotal.incrementAndGet();
    }

    /**
     * Adds a file to the trigger or scan queue, used by both the trigger and
     * the scanner paths. If the file is already queued, only its revision is
     * raised. If a worker dequeued it in the meantime, it is queued again so
     * the latest revision is always processed. Low-priority callers sleep
     * while the scan queue is over its configured maximum.
     */
    private void addToFilesQueue(final PerforcePath depotPath, final int rev,
            final long fileSize, final boolean highPriority,
            final boolean deletedAtHead, final String userId, final Date date,
            final Map<String, String> fstats) throws InterruptedException {
        // files scan queue too big? then sleep
        while (!highPriority && filesScanQueue.size() > this.maxScanQueueSize) {
            log.debug("File scan queue at maximum, sleeping");
            Thread.sleep(SLEEP_NO_WORK_MS);
        }
        final String dp = queueKey(depotPath, rev);
        log.debug("Adding " + dp);

        QueueFile qf = filesMap.get(dp);
        if (qf == null) {
            QueueFile fresh = new QueueFile(depotPath, rev, fileSize,
                    (indexAllRevisions && !highPriority), deletedAtHead,
                    userId, date, fstats);
            // putIfAbsent: the scanner and the manager thread both add files
            qf = filesMap.putIfAbsent(dp, fresh);
            if (qf == null) {
                enqueue(fresh, highPriority); // add to the queue last
                return;
            }
            // another thread queued it first; fall through to the rev bump
        }
        if (qf.getRev() >= rev) {
            return;
        }
        qf.setRev(rev);
        // a worker may have dequeued it since the lookup; if so, re-add it
        if (filesMap.putIfAbsent(dp, qf) == null) {
            log.debug("Reading " + dp);
            enqueue(qf, highPriority);
        }
    }

    /**
     * Takes the next file for a worker. Trigger work is preferred over scan
     * work, and the file is removed from the lookup map.
     *
     * @param block when true, keep polling until a file is available; when
     *              false, return null if both queues are empty
     * @return the next file, or null (only when {@code block} is false)
     * @throws InterruptedException if interrupted, or when the queues are
     *                              empty and quit-when-empty (testing) is set
     */
    @Override
    public QueueFile removeFileFromQueue(final boolean block)
            throws InterruptedException {
        QueueFile qf = null;
        if (block) {
            // TODO: make this blocking a little more clever
            // there are two queues, so we can't just block on one of them
            while (true) {
                // wait up to a second on the trigger queue, but only when both
                // queues are empty
                int timeout = (filesTriggerQueue.isEmpty() && filesScanQueue
                        .isEmpty()) ? 1 : 0;
                qf = filesTriggerQueue.poll(timeout, TimeUnit.SECONDS);
                if (qf != null) {
                    break;
                }
                // we already paused, so revert to scan mode
                qf = filesScanQueue.poll();
                if (qf != null) {
                    break;
                }
                if (this.quitWhenEmpty) {
                    throw new InterruptedException();
                }
            }
        } else {
            // this will return null if the queue remains empty
            qf = filesTriggerQueue.poll();
            if (qf == null) {
                qf = filesScanQueue.poll();
            }
        }
        if (qf != null) {
            filesMap.remove(queueKey(qf.getDepotPath(), qf.getRev()));
        }
        if (qf == null && this.quitWhenEmpty) {
            throw new InterruptedException();
        }
        return qf;
    }

    /** True when the entry's {@code type} contains "apple". */
    public static boolean badType(final Map<String, Object> map) {
        String type = (String) map.get("type");
        return type.contains("apple");
    }

    /** True when the entry's {@code action} is a deleting action. */
    public static boolean badAction(final Map<String, Object> map) {
        String action = (String) map.get("action");
        return isActionDelete(action);
    }

    /** True when the entry has a bad type or a deleting action. */
    public static boolean badTypeOrAction(final Map<String, Object> map) {
        return badType(map) || badAction(map);
    }

    /**
     * Filters scan results, dropping never-process files, apple types and
     * (unless all revisions are indexed) deleting actions.
     */
    @SuppressWarnings("unchecked")
    @Override
    public Map<String, Object>[] getOnlyValidNames(
            final Map<String, Object>[] files) {
        List<Map<String, Object>> valid = new ArrayList<Map<String, Object>>();
        for (Map<String, Object> file : files) {
            String path = (String) file.get(FilesCommand.tagDepotFile);
            if (isNeverProcess(path)) {
                log.debug("skipping never process file " + path);
                continue;
            }
            if (badType(file)) {
                log.info("skipping bad type " + path);
                continue;
            }
            if (!indexAllRevisions && badAction(file)) {
                log.info("skipping bad action " + path);
                continue;
            }
            valid.add(file);
        }
        return valid.toArray(new Map[0]);
    }

    /** @return number of files waiting in the scan (low-priority) queue */
    @Override
    public int getFileScanCount() {
        return this.filesScanQueue.size();
    }

    /** Called when a directory scan finishes; empties the change cache. */
    @Override
    public void setScanComplete() {
        this.changeCache.clearCache();
    }

    /**
     * Puts a file back for another attempt. Its original priority is
     * unknown here, so it goes on the low-priority scan queue.
     */
    @Override
    public void recycleQueueFile(final QueueFile qf) throws InterruptedException {
        // same key as addToFilesQueue/removeFileFromQueue, so the entry is
        // de-duplicated and removed correctly when all revisions are indexed
        filesMap.put(queueKey(qf.getDepotPath(), qf.getRev()), qf);
        filesScanQueue.add(qf);
    }

    // little util functions for fstats

    /**
     * Decides whether an fstat field should be indexed. Only keys that begin
     * with {@code attr-} are considered, and the prefix is stripped before
     * matching. NOTE(review): an earlier comment also mentioned
     * {@code attrProp-}; such keys do not start with {@code attr-} and are
     * therefore rejected. Confirm this is intended.
     *
     * @param black regex of attribute names to reject; empty rejects nothing
     *              (use ".*" to reject everything)
     * @param white regex of attribute names to accept; empty accepts anything
     * @param key   raw fstat key
     * @return true when the key is an attribute that passes both filters
     */
    public static boolean filterFstat(final String black, final String white,
            final String key) {
        final String trimmedKey = trimFstatKey(key);
        if (trimmedKey.length() == key.length()) {
            return false; // no attribute prefix: not an attribute
        }
        if (!black.isEmpty() && trimmedKey.matches(black)) {
            return false;
        }
        return white.isEmpty() || trimmedKey.matches(white);
    }

    /** Turns a raw fstat key into our schema key name. */
    public static String mangleFstatKey(final String key) {
        return SchemaKey.FSTATS_PREFIX + trimFstatKey(key);
    }

    /** Removes the Perforce prefix (we add our own later); returns other keys unchanged. */
    public static String trimFstatKey(final String key) {
        for (String prefix : FSTAT_KEY_PREFIXES) {
            if (key.startsWith(prefix)) {
                return key.substring(prefix.length());
            }
        }
        return key;
    }

    /** @return how many worker threads are still alive (0 before they are started) */
    @Override
    public int getFileScanActiveThreads() {
        final List<FileQueueIndexThread> workers = this.threads;
        if (workers == null) {
            return 0;
        }
        int active = 0;
        for (Thread t : workers) {
            active += (t.isAlive() ? 1 : 0);
        }
        return active;
    }

    /** @return total number of files ever queued */
    protected int getFilesTotal() {
        return filesTotal.get();
    }

    /** For tests: make workers stop (via InterruptedException) once the queues are empty. */
    protected void setQuitWhenEmpty(final boolean quit) {
        this.quitWhenEmpty = quit;
    }
}
/*
 * Revision history
 *
 * | #  | Change | User          | Description | Committed |
 * |----|--------|---------------|-------------|-----------|
 * | #2 | 8501   | Doug Scheirer | Updating to gradlew for build; make the build process (more) Mac-compatible; default port for a default install is now 8088; updated API doc to include the queryRaw param on POSTs; minor bug fixes for when the p4d connection drops while indexing | |
 * | #1 | 8476   | Doug Scheirer | p4-search copy from //depot/main/p4-search | |
 */