diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java index 5940e738..bc8bb94c 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java @@ -106,7 +106,7 @@ public class Spider implements Runnable, Task { private Date startTime; - private int emptySleepTime = 30000; + private long emptySleepTime = 30000; /** * create a spider with pageProcessor. @@ -305,32 +305,52 @@ public class Spider implements Runnable, Task { public void run() { checkRunningStat(); initComponent(); - logger.info("Spider {} started!",getUUID()); + logger.info("Spider {} started!", getUUID()); + // interrupt won't be necessarily detected while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) { - final Request request = scheduler.poll(this); - if (request == null) { - if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { - break; - } - // wait until new url added - waitNewUrl(); - } else { - threadPool.execute(new Runnable() { - @Override - public void run() { - try { - processRequest(request); - onSuccess(request); - } catch (Exception e) { - onError(request, e); - logger.error("process request " + request + " error", e); - } finally { - pageCount.incrementAndGet(); - signalNewUrl(); + Request poll = scheduler.poll(this); + if (poll == null) { + if (threadPool.getThreadAlive() == 0) { + //no alive thread anymore , try again + poll = scheduler.poll(this); + if (poll == null) { + if (exitWhenComplete) { + break; + } else { + // wait + try { + Thread.sleep(emptySleepTime); + continue; + } catch (InterruptedException e) { + break; + } } } - }); + } else { + // wait until new url added, + if (waitNewUrl()) + //if interrupted + break; + continue; + } } + final Request request = poll; + //this may swallow the interruption + threadPool.execute(new Runnable() { + @Override + public void run() { + try { + processRequest(request); + onSuccess(request); + } catch (Exception e) { + onError(request, e); + logger.error("process request " + request + " error", e); + } finally { + pageCount.incrementAndGet(); + signalNewUrl(); + } + } + }); } stat.set(STAT_STOPPED); // release some resources @@ -565,16 +585,24 @@ public class Spider implements Runnable, Task { return this; } - private void waitNewUrl() { + /** + * + * @return isInterrupted + */ + private boolean waitNewUrl() { + // now there may not be any thread live newUrlLock.lock(); try { - //double check - if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { - return; + //double check,unnecessary, unless very fast concurrent + if (threadPool.getThreadAlive() == 0) { + return false; } + //wait for amount of time newUrlCondition.await(emptySleepTime, TimeUnit.MILLISECONDS); + return false; } catch (InterruptedException e) { - logger.warn("waitNewUrl - interrupted, error {}", e); + // logger.warn("waitNewUrl - interrupted, error {}", e); + return true; } finally { newUrlLock.unlock(); } @@ -772,7 +800,10 @@ public class Spider implements Runnable, Task { * * @param emptySleepTime In MILLISECONDS. */ - public void setEmptySleepTime(int emptySleepTime) { + public void setEmptySleepTime(long emptySleepTime) { + if(emptySleepTime<=0){ + throw new IllegalArgumentException("emptySleepTime should be more than zero!"); + } this.emptySleepTime = emptySleepTime; } }