Merge branch 'develop' of github.com:code4craft/webmagic into develop

pull/1006/merge
Sutra Zhou
commit 692605bd75

@@ -106,7 +106,7 @@ public class Spider implements Runnable, Task {
private Date startTime; private Date startTime;
private int emptySleepTime = 30000; private long emptySleepTime = 30000;
/** /**
* create a spider with pageProcessor. * create a spider with pageProcessor.
@@ -305,32 +305,52 @@ public class Spider implements Runnable, Task {
public void run() { public void run() {
checkRunningStat(); checkRunningStat();
initComponent(); initComponent();
logger.info("Spider {} started!",getUUID()); logger.info("Spider {} started!", getUUID());
// interrupt won't be necessarily detected
while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) { while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
final Request request = scheduler.poll(this); Request poll = scheduler.poll(this);
if (request == null) { if (poll == null) {
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { if (threadPool.getThreadAlive() == 0) {
break; //no alive thread anymore , try again
} poll = scheduler.poll(this);
// wait until new url added if (poll == null) {
waitNewUrl(); if (exitWhenComplete) {
} else { break;
threadPool.execute(new Runnable() { } else {
@Override // wait
public void run() { try {
try { Thread.sleep(emptySleepTime);
processRequest(request); continue;
onSuccess(request); } catch (InterruptedException e) {
} catch (Exception e) { break;
onError(request, e); }
logger.error("process request " + request + " error", e);
} finally {
pageCount.incrementAndGet();
signalNewUrl();
} }
} }
}); } else {
// wait until new url added
if (waitNewUrl())
//if interrupted
break;
continue;
}
} }
final Request request = poll;
//this may swallow the interruption
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
processRequest(request);
onSuccess(request);
} catch (Exception e) {
onError(request, e);
logger.error("process request " + request + " error", e);
} finally {
pageCount.incrementAndGet();
signalNewUrl();
}
}
});
} }
stat.set(STAT_STOPPED); stat.set(STAT_STOPPED);
// release some resources // release some resources
@@ -565,16 +585,24 @@ public class Spider implements Runnable, Task {
return this; return this;
} }
private void waitNewUrl() { /**
*
* @return isInterrupted
*/
private boolean waitNewUrl() {
// now there may not be any thread live
newUrlLock.lock(); newUrlLock.lock();
try { try {
//double check //double check, unnecessary, unless very fast concurrent
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { if (threadPool.getThreadAlive() == 0) {
return; return false;
} }
//wait for amount of time
newUrlCondition.await(emptySleepTime, TimeUnit.MILLISECONDS); newUrlCondition.await(emptySleepTime, TimeUnit.MILLISECONDS);
return false;
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.warn("waitNewUrl - interrupted, error {}", e); // logger.warn("waitNewUrl - interrupted, error {}", e);
return true;
} finally { } finally {
newUrlLock.unlock(); newUrlLock.unlock();
} }
@@ -772,7 +800,10 @@ public class Spider implements Runnable, Task {
* *
* @param emptySleepTime In MILLISECONDS. * @param emptySleepTime In MILLISECONDS.
*/ */
public void setEmptySleepTime(int emptySleepTime) { public void setEmptySleepTime(long emptySleepTime) {
if(emptySleepTime<=0){
throw new IllegalArgumentException("emptySleepTime should be more than zero!");
}
this.emptySleepTime = emptySleepTime; this.emptySleepTime = emptySleepTime;
} }
} }

Loading…
Cancel
Save