/*******************************************************************************
 * Copyright (c) 2013, Perforce Software
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * Redistributions of source code must retain the above copyright notice, this
 * list of conditions and the following disclaimer.
 * Redistributions in binary form must reproduce the above copyright notice,
 * this list of conditions and the following disclaimer in the documentation
 * and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 ******************************************************************************/
package com.perforce.search.manager;

import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;

import com.perforce.p4java.extension.exception.ServerAccessException;
import com.perforce.p4java.extension.exception.ServerConnectionException;
import com.perforce.p4java.extension.exception.ServerErrorException;
import com.perforce.p4java.extension.operation.CounterCommand;
import com.perforce.p4java.extension.operation.DepotsCommand;
import com.perforce.p4java.extension.operation.DirsCommand;
import com.perforce.p4java.extension.operation.FilesCommand;
import com.perforce.p4java.extension.operation.SizesCommand;
import com.perforce.p4java.extension.path.PathStringFormat;
import com.perforce.p4java.extension.path.PerforcePath;
import com.perforce.p4java.extension.path.PerforcePathFactory;
import com.perforce.p4java.extension.server.Server;
import com.perforce.p4java.extension.utility.CommandUtil;
import com.perforce.p4java.extension.utility.ErrorIds;
import com.perforce.search.configuration.Configuration;
import com.perforce.search.manager.impl.QueueManagerImpl;
import com.perforce.search.model.impl.SearchModelImpl;
import com.perforce.search.server.ServerConnectionPoolFacade;

public class DirectoryScannerThread extends Thread {

    private static final Logger log = Logger
            .getLogger(DirectoryScannerThread.class);

    private Configuration config;
    private ServerConnectionPoolFacade pool;
    private QueueManager queueManager;
    private SearchIndexManager searchIndexer;
    private boolean indexAllRevisions;
    private int sleepBetweenCalls;
    private BlockingQueue<PerforcePath> directoryScanQueue;
    private List<WorkerThread> threads;
    private AtomicInteger directoryCount;
    private ConcurrentHashMap<Thread, Thread> waiting = new ConcurrentHashMap<Thread, Thread>();
    private final int failedSleepTime = 10000;

    public DirectoryScannerThread(final Configuration configuration,
            final ServerConnectionPoolFacade pool,
            final QueueManager queueManager,
            final SearchIndexManager searchIndexer) {
        this.config = configuration;
        this.pool = pool;
        this.queueManager = queueManager;
        this.searchIndexer = searchIndexer;
        this.sleepBetweenCalls = config.getFileScanSleep();
        this.indexAllRevisions = config.isIndexAllRevisions();
    }

    /**
     * Set the base depot directories up for the worker threads, launch the
     * workers, then launch this thread to watch for completion.
     */
    public void setAndStart() {
        // compare the scan token; if they are the same, do not scan
        Server server = null;
        try {
            String tokenKey = config.getFileScannerTokenKey();
            if (tokenKey == null || tokenKey.isEmpty()) {
                log.info("Configured to not scan, skipping");
                return;
            }
            directoryScanQueue = new LinkedBlockingQueue<PerforcePath>();
            directoryCount = new AtomicInteger(0);
            threads = new ArrayList<WorkerThread>();
            server = pool.acquireIndexer();
            if (server == null) {
                log.error("Aborting, server = null");
                return;
            }
            String token = CounterCommand.getCounterValueSideTable(tokenKey,
                    server);
            if (token.equals(config.getFileScannerTokenValue())) {
                log.info("Scanner token " + tokenKey + " == " + token
                        + ", matches configuration, skipping");
                return;
            }
            // is there a list of paths to scan? if not, it's every depot
            Set<String> scanPaths = config.getScanPaths();
            if (scanPaths.isEmpty()) {
                // get a depot list, then start walking directories and files,
                // adding them to the queueManager
                Map<String, Object>[] depots = DepotsCommand.getDepots(server);
                for (Map<String, Object> d : depots) {
                    String type = (String) d.get(DepotsCommand.tagType);
                    if (type == null) {
                        continue;
                    }
                    if (type.equals(DepotsCommand.tagTypeLocal)
                            || type.equals(DepotsCommand.tagTypeStream)) {
                        scanPaths.add(String.format("//%s",
                                d.get(DepotsCommand.tagName)));
                    }
                }
            }
            // queue them up
            for (String s : scanPaths) {
                directoryScanQueue.add(PerforcePathFactory.getFolderPath(s));
                directoryCount.incrementAndGet();
            }
            if (this.directoryScanQueue.isEmpty()) {
                log.error("Could not find any depots to scan, aborting");
                return;
            }
            for (int i = 0; i < config.getMaxScannerThreads(); ++i) {
                WorkerThread wt = new WorkerThread(this);
                threads.add(wt);
                log.debug("Starting thread " + wt.getName());
                wt.start();
            }
            // now run "me" to watch for completion
            this.start();
        } catch (ServerConnectionException e) {
            log.error(e);
        } catch (ServerAccessException e) {
            log.error(e);
        } catch (ServerErrorException e) {
            log.error(e);
        } finally {
            pool.release(server);
        }
    }

    private class WorkerThread extends Thread {

        private DirectoryScannerThread thread;

        WorkerThread(final DirectoryScannerThread fileScannerThread) {
            thread = fileScannerThread;
        }

        private void checkErrors(final Map<String, Object>[] maps)
                throws ServerConnectionException, ServerAccessException,
                ServerErrorException {
            try {
                CommandUtil.checkError(maps);
            } catch (ServerErrorException e) {
                // a "no such file(s)" error just means the path is empty
                if (!ErrorIds.MsgDm_ExFILE.match(e.getCode())) {
                    throw e;
                }
            }
        }

        @SuppressWarnings("unchecked")
        private Map<String, Object>[] filterNewRevisions(
                final Map<String, Object>[] files) {
            // query solr: id:(base path) AND (filename:... OR filename:...)
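            // For illustration only (hypothetical paths, and ignoring the
            // exact escaping applied by escapeSolrString()), one chunk built
            // below might look like:
            //   id://depot/proj/src/* AND (filename:Foo.java OR filename:Bar.java)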
            // then get the depotrev for each doc and return the ones that
            // need to be indexed
            if (files.length == 0
                    || files[0].get(FilesCommand.tagDepotFile) == null) {
                return files;
            }
            String dp0 = (String) files[0].get(FilesCommand.tagDepotFile);
            PerforcePath dp0Path = PerforcePathFactory.getPath(dp0);
            // TODO: do not create a new one
            StringBuilder sb = new StringBuilder();
            Map<String, Map<String, Object>> pathToRev = new HashMap<String, Map<String, Object>>();
            // search for "id:(depotpath)/* AND (filename:... OR filename:...)"
            sb.append(SchemaKey.ID)
                    .append(":")
                    .append(SearchModelImpl.escapeSolrString(dp0Path
                            .getBasePath())).append("* AND (");
            // chunk out requests, solr can overflow if we ask too much of it;
            // we will go just over the "max", so don't run up to the edge.
            // solr (jetty) can be configured with a bigger buffer; the
            // default is 4096
            final int maxSearchSize = 2048; // TODO: configurable
            List<String> requests = new ArrayList<String>();
            List<Integer> requestSizes = new ArrayList<Integer>();
            StringBuilder current = null;
            int curCount = 0;
            for (int i = 0; i < files.length; ++i, ++curCount) {
                if (current == null) {
                    current = new StringBuilder(sb);
                }
                String dp = (String) files[i].get(FilesCommand.tagDepotFile);
                pathToRev.put(dp, files[i]);
                current.append(SchemaKey.FILENAME)
                        .append(":")
                        .append(SearchModelImpl
                                .escapeSolrString(PerforcePathFactory.getPath(
                                        dp).getFileName()));
                if (current.length() > maxSearchSize && i < files.length - 1) {
                    current.append(")");
                    requests.add(current.toString());
                    requestSizes.add(curCount);
                    curCount = 0;
                    current = new StringBuilder(sb);
                } else if (i < files.length - 1) {
                    current.append(" OR ");
                }
            }
            current.append(")");
            requests.add(current.toString());
            requestSizes.add(curCount);
            // it's possible this would turn up identical files, so we ask
            // once, get the "real" count of matches, then ask again in pages.
            // Yes, this is a degenerate case, but it's only for "scan", which
            // should only happen once.
            Map<String, Integer> docs = new HashMap<String, Integer>();
            for (int i = 0; i < requests.size(); ++i) { // iterate the chunks
                SolrDocumentList curList = null;
                // page results on an upper bound
                final int maxDocsPerRequest = 1000;
                int curPage = 0;
                QueryResponse resp = searchIndexer.searchForFiles(
                        requests.get(i), maxDocsPerRequest, curPage);
                // this is bad, the server failed
                if (resp == null) {
                    return null;
                }
                curList = resp.getResults();
                long numFound = (curList == null) ? 0 : curList.size();
                while (numFound > 0) {
                    // process this amount
                    for (SolrDocument doc : curList) {
                        String id = (String) doc.get(SchemaKey.ID);
                        int indexedRev = Integer.valueOf((String) doc
                                .get(SchemaKey.REVISION));
                        docs.put(id, indexedRev);
                    }
                    curPage += numFound;
                    // search again?
                    if (numFound < maxDocsPerRequest) {
                        break;
                    }
                    resp = searchIndexer.searchForFiles(requests.get(i),
                            maxDocsPerRequest, curPage);
                    // this is bad, the server failed
                    if (resp == null) {
                        return null;
                    }
                    curList = resp.getResults();
                    numFound = (curList == null) ? 0 : curList.size();
                }
            }
            // we will still process after hitting the limit, but it means we
            // probably missed some (duplicate processing)
            List<Map<String, Object>> retMapList = new ArrayList<Map<String, Object>>();
            for (Map<String, Object> file : files) {
                String rev = (String) file.get(FilesCommand.tagRevision);
                int toIndexRev = Integer.valueOf(rev);
                String lookupId = PathStringFormat.UNESCAPE
                        .modify((String) file.get(FilesCommand.tagDepotFile));
                String action = (String) file.get(FilesCommand.tagAction);
                if (QueueManagerImpl.isActionDelete(action)) {
                    // back the revision off by one; the deleted head
                    // revision itself has no content to index
                    toIndexRev--;
                    if (toIndexRev == 0) {
                        // yup, it can happen
                        continue;
                    }
                    rev = String.valueOf(toIndexRev);
                }
                if (indexAllRevisions) {
                    lookupId += "#" + rev;
                }
                Integer indexRev = docs.get(lookupId);
                if (indexRev == null || indexRev < toIndexRev) {
                    retMapList.add(file);
                }
            }
            return retMapList.toArray(new Map[0]);
        }
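        /**
         * Worker loop: pull one directory from the shared queue, queue the
         * files directly under it for indexing, then push its subdirectories
         * back onto the queue. Registers itself in {@code waiting} while
         * blocked on an empty queue so the watcher thread can detect when
         * the whole scan is done.
         */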
        @Override
        public void run() {
            log.debug("Started thread " + this.getName());
            Server server = null;
            try {
                while (true) {
                    Thread.sleep(sleepBetweenCalls);
                    if (server == null) {
                        server = pool.acquireIndexer();
                    }
                    if (server == null) {
                        log.error("Waiting, server = null");
                        // double sleep
                        Thread.sleep(sleepBetweenCalls);
                        continue;
                    }
                    // while there are things in the queue,
                    // pull a directory out of the queue
                    PerforcePath dp = thread.directoryScanQueue.poll();
                    if (dp == null) {
                        log.debug("directory queue is empty, waiting...");
                        // if the queue is empty, report idle
                        thread.waiting.put(this, this);
                        dp = thread.directoryScanQueue.take();
                        thread.waiting.remove(this);
                    }
                    addFilesFor(dp, server);
                    addDirsFor(dp, server);
                    // recycle the connection
                    pool.release(server);
                    server = null;
                }
            } catch (InterruptedException e) {
                log.info("Interrupted");
            } catch (Exception e) {
                log.error("Exception", e);
            } finally {
                pool.release(server);
                log.info("Directory scan completed: " + directoryCount.get());
            }
        }

        private void addDirsFor(final PerforcePath dp, final Server server)
                throws InterruptedException {
            try {
                // inner try, don't abort the whole thread for one bad
                // server response
                List<PerforcePath> paths = null;
                // run p4 dirs, put the results back into the queue
                // (escape first)
                paths = DirsCommand.getDirs(
                        dp.asRevisionlessString(EnumSet
                                .of(PathStringFormat.ESCAPE)) + "*", server);
                if (paths != null && paths.size() > 0) {
                    thread.directoryScanQueue.addAll(paths);
                    directoryCount.addAndGet(paths.size());
                }
                return;
            } catch (ServerErrorException e) {
                // be robust: if the server goes down, just try again later
                log.error("Server error exception, recycling "
                        + dp.asRevisionlessString(null));
            } catch (ServerAccessException e) {
                // be robust: if the server goes down, just try again later
                log.error("Server access exception, recycling "
                        + dp.asRevisionlessString(null));
            } catch (ServerConnectionException e) {
                // be robust: if the server goes down, just try again later
                log.error("Server connection exception, recycling "
                        + dp.asRevisionlessString(null));
            }
            // failed, try again later
            thread.directoryScanQueue.add(dp);
            Thread.sleep(failedSleepTime);
        }
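        /**
         * Run "p4 files" on one directory, drop files that should never be
         * indexed, drop revisions that are already indexed, attach file
         * sizes from "p4 sizes", and hand the remainder to the queue
         * manager. On a server error the directory is re-queued and the
         * thread backs off for {@code failedSleepTime} milliseconds.
         */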
        private void addFilesFor(final PerforcePath dp, final Server server)
                throws InterruptedException {
            try {
                // run p4 files, send the results to the main file queue
                // manager
                Map<String, Object>[] files = FilesCommand.getFileByPath(dp,
                        server);
                try {
                    CommandUtil.checkError(files);
                } catch (ServerErrorException e) {
                    if (!ErrorIds.MsgDm_ExFILE.match(e.getCode())) {
                        throw e;
                    }
                    return;
                }
                // remove "never index" files, apple types, etc.
                Map<String, Object>[] onlyValidNames = queueManager
                        .getOnlyValidNames(files);
                // last step: check if the latest revision is already indexed
                Map<String, Object>[] newFiles = filterNewRevisions(onlyValidNames);
                if (newFiles == null) {
                    // something went wrong, try again later
                    log.error("Search server is down, try again later");
                    throw new ServerConnectionException("Search server down");
                }
                // nothing left? skip
                if (newFiles.length == 0) {
                    return;
                }
                // also run sizes, turn that into a depot-path-to-long map
                Map<String, Object>[] sizes = SizesCommand.getSizesAtPath(dp,
                        server);
                checkErrors(sizes);
                Map<String, Long> sizesMap = new HashMap<String, Long>();
                for (Map<String, Object> size : sizes) {
                    String depotFile = (String) size
                            .get(SizesCommand.tagDepotFile);
                    if (depotFile == null) {
                        continue; // empty directory
                    }
                    String fileSize = (String) size
                            .get(SizesCommand.tagFileSize);
                    if (fileSize == null) {
                        log.error("Failed to get size for " + depotFile
                                + ", skipping");
                        continue;
                    }
                    sizesMap.put(depotFile, Long.valueOf(fileSize));
                }
                for (Map<String, Object> map : newFiles) {
                    queueManager.addScanFileToQueue(map, sizesMap);
                }
                return;
            } catch (ServerErrorException e) {
                // be robust: if the server goes down, just try again later
                log.error("Server error exception, recycling "
                        + dp.asRevisionlessString(null));
            } catch (ServerAccessException e) {
                // be robust: if the server goes down, just try again later
                log.error("Server access exception, recycling "
                        + dp.asRevisionlessString(null));
            } catch (ServerConnectionException e) {
                // be robust: if the server goes down, just try again later
                log.error("Server connection exception, recycling "
                        + dp.asRevisionlessString(null));
            }
            // failed, try again later
            thread.directoryScanQueue.add(dp);
            Thread.sleep(failedSleepTime);
        }
    }
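    /**
     * Watcher loop: periodically log memory use, confirm the indexing
     * threads are still alive, and end the scan once every worker is idle
     * and the queue manager has drained its file-scan queue. On a clean
     * finish, write the scan-complete token so the next startup can skip
     * the scan.
     */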
    @Override
    public void run() {
        final int pollIdleQueue = 5000; // 5s, TODO: make configurable
        final int oneMeg = 1024 * 1024;
        try {
            while (true) {
                Thread.sleep(pollIdleQueue);
                Runtime rt = Runtime.getRuntime();
                // used / free / total / max, in megabytes
                log.debug(String.format("%d / %d / %d / %d",
                        (rt.totalMemory() - rt.freeMemory()) / oneMeg,
                        rt.freeMemory() / oneMeg, rt.totalMemory() / oneMeg,
                        rt.maxMemory() / oneMeg));
                int checkSize = 0;
                for (Thread t : threads) {
                    if (t.isAlive()) {
                        checkSize++;
                    } else {
                        log.debug("Dead thread detected [" + t + "]");
                    }
                }
                int waitCount = waiting.size();
                log.debug("waiting threads: " + waitCount
                        + ", filescancount: "
                        + queueManager.getFileScanCount());
                // double-check that at least one of the
                // FileQueueIndexThreads is still running
                if (queueManager.getFileScanActiveThreads() == 0) {
                    log.error("All file scanners are dead, aborting");
                    return;
                }
                // also wait until the queuemanager completes its fileScans
                if (waitCount == checkSize
                        && queueManager.getFileScanCount() == 0) {
                    log.info("All scanner threads idle, ending scan");
                    for (Map.Entry<Thread, Thread> e : waiting.entrySet()) {
                        e.getValue().interrupt();
                    }
                    break;
                }
            }
            writeScanCompleteToken();
            queueManager.setScanComplete();
            log.info("File scanner complete: " + this.directoryCount.get());
        } catch (InterruptedException e) {
            log.error(e);
        }
    }

    public synchronized void writeScanCompleteToken() {
        Server server = null;
        try {
            server = pool.acquireIndexer();
            if (server == null) {
                log.error("Aborting, server = null");
                return;
            }
            // if this command fails, we might try to write again
            CounterCommand.setCounterValueSideTable(
                    config.getFileScannerTokenKey(),
                    config.getFileScannerTokenValue(), server);
        } catch (ServerConnectionException e) {
            log.error(e);
        } catch (ServerAccessException e) {
            log.error(e);
        } catch (ServerErrorException e) {
            if (ErrorIds.MsgDm_NoPerms.match(e.getCode())) {
                log.error("Permission problem writing completion token,"
                        + " will not be written", e);
            }
        } finally {
            pool.release(server);
        }
    }
}
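/*
 * Usage sketch (illustrative only; how the Configuration, connection pool,
 * and managers are constructed is assumed here, not shown in this file):
 *
 *   DirectoryScannerThread scanner = new DirectoryScannerThread(config,
 *           pool, queueManager, searchIndexer);
 *   // seeds the queue with depot roots, starts the worker threads, and
 *   // then starts the scanner itself as the completion watcher
 *   scanner.setAndStart();
 */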