add multithread support

pull/17/head
yihua.huang 12 years ago
parent 5a6a68a318
commit cad2594a08

@ -9,9 +9,12 @@ import us.codecraft.webmagic.pipeline.Pipeline;
import us.codecraft.webmagic.processor.PageProcessor;
import us.codecraft.webmagic.schedular.QueueScheduler;
import us.codecraft.webmagic.schedular.Scheduler;
import us.codecraft.webmagic.utils.ThreadUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
/**
* <pre>
@ -51,6 +54,16 @@ public class Spider implements Runnable, Task {
private Logger logger = Logger.getLogger(getClass());
// Worker pool for multi-threaded crawling. In the visible code only thread(int) assigns it;
// while it is null, run() processes requests one by one on the calling thread.
private ExecutorService executorService;
// Lifecycle state of this spider: starts at STAT_INIT, run() switches it to STAT_RUNNING
// on entry and to STAT_STOPPED when it finishes.
private AtomicInteger stat = new AtomicInteger(STAT_INIT);
// Spider has been created but run() has not been entered yet; configuration is still allowed.
private final static int STAT_INIT = 0;
// run() is in progress.
private final static int STAT_RUNNING = 1;
// run() has completed.
private final static int STAT_STOPPED = 2;
/**
* 使Spider
* @param pageProcessor
@ -76,6 +89,7 @@ public class Spider implements Runnable, Task {
* @return this
*/
public Spider startUrls(List<String> startUrls) {
// Configuration is only accepted while the spider is still in its initial state;
// checkIfNotRunning() throws IllegalStateException otherwise.
checkIfNotRunning();
// The list is stored as-is (no copy); run() pushes each entry to the scheduler as a Request.
this.startUrls = startUrls;
return this;
}
@ -96,6 +110,7 @@ public class Spider implements Runnable, Task {
* @return this
*/
public Spider scheduler(Scheduler scheduler) {
// Configuration is only accepted while the spider is still in its initial state;
// checkIfNotRunning() throws IllegalStateException otherwise.
checkIfNotRunning();
// Replaces the scheduler that run() pushes start URLs to and polls requests from.
this.scheduler = scheduler;
return this;
}
@ -106,6 +121,7 @@ public class Spider implements Runnable, Task {
* @return this
*/
public Spider pipeline(Pipeline pipeline) {
// Configuration is only accepted while the spider is still in its initial state;
// checkIfNotRunning() throws IllegalStateException otherwise.
checkIfNotRunning();
// Pipelines accumulate: each call appends one more. If none was added,
// run() falls back to a ConsolePipeline.
this.pipelines.add(pipeline);
return this;
}
@ -113,6 +129,9 @@ public class Spider implements Runnable, Task {
@Override
public void run() {
if (!stat.compareAndSet(STAT_INIT, STAT_RUNNING)) {
throw new IllegalStateException("Spider is already running!");
}
if (startUrls != null) {
for (String startUrl : startUrls) {
scheduler.push(new Request(startUrl), this);
@ -122,20 +141,56 @@ public class Spider implements Runnable, Task {
if (pipelines.isEmpty()) {
pipelines.add(new ConsolePipeline());
}
while (request != null) {
Page page = downloader.download(request, site);
if (page == null) {
sleep(site.getSleepTime());
continue;
//single thread
if (executorService==null){
while (request != null) {
processRequest(request);
request = scheduler.poll(this);
}
pageProcessor.process(page);
addRequest(page);
for (Pipeline pipeline : pipelines) {
pipeline.process(page, this);
} else {
final AtomicInteger threadAlive = new AtomicInteger(0);
while (true) {
if (request == null) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
} else {
final Request requestFinal = request;
threadAlive.incrementAndGet();
executorService.execute(new Runnable() {
@Override
public void run() {
processRequest(requestFinal);
threadAlive.decrementAndGet();
}
});
}
request = scheduler.poll(this);
if (threadAlive.get() == 0) {
request = scheduler.poll(this);
if (request == null) {
break;
}
}
}
executorService.shutdown();
}
stat.compareAndSet(STAT_RUNNING, STAT_STOPPED);
}
/**
 * Handles a single request: downloads the page, runs the page processor on it,
 * registers follow-up requests via {@code addRequest}, and feeds the page to every
 * configured pipeline. The site's sleep time is honored after every request,
 * whether or not the download produced a page.
 *
 * @param request the request to download and process
 */
private void processRequest(Request request) {
    Page page = downloader.download(request, site);
    if (page == null) {
        // Nothing was downloaded: wait the configured delay and give up on this request.
        // The scheduler must NOT be polled here: the polling loop belongs to the caller,
        // and a request polled into this method's local parameter would be discarded
        // on return and therefore never crawled.
        sleep(site.getSleepTime());
        return;
    }
    pageProcessor.process(page);
    addRequest(page);
    for (Pipeline pipeline : pipelines) {
        pipeline.process(page, this);
    }
    sleep(site.getSleepTime());
}
private void sleep(int time) {
@ -154,6 +209,28 @@ public class Spider implements Runnable, Task {
}
}
/**
 * Guard used by the configuration methods: passes only while the spider is
 * still in its initial state.
 *
 * @throws IllegalStateException if {@code stat} is no longer {@code STAT_INIT}
 */
private void checkIfNotRunning() {
    final boolean stillInitial = stat.get() == STAT_INIT;
    if (stillInitial) {
        return;
    }
    throw new IllegalStateException("Spider is already running!");
}
/**
 * Switches the spider to multi-threaded crawling by creating a fixed-size
 * thread pool that {@code run()} will submit requests to.
 *
 * @param threadNum number of worker threads; must be greater than one
 * @return this
 * @throws IllegalStateException if the spider has already left its initial state
 * @throws IllegalArgumentException if {@code threadNum} is one or less
 */
public Spider thread(int threadNum) {
    checkIfNotRunning();
    if (threadNum > 1) {
        synchronized (this) {
            executorService = ThreadUtils.newFixedThreadPool(threadNum);
        }
        return this;
    }
    throw new IllegalArgumentException("threadNum should be more than one!");
}
@Override
public String getUUID() {
if (uuid != null) {

@ -0,0 +1,33 @@
package us.codecraft.webmagic.utils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Factory for the thread pools used to run work on several threads.
 *
 * @author code4crafer@gmail.com
 * Date: 13-6-23
 * Time: 7:11
 */
public class ThreadUtils {

    /**
     * Creates a pool whose core and maximum size are both {@code threadSize},
     * backed by a one-slot work queue whose {@code offer} waits for space.
     *
     * @param threadSize number of threads in the pool
     * @return the newly created executor
     */
    public static ExecutorService newFixedThreadPool(int threadSize) {
        final LinkedBlockingQueue<Runnable> workQueue = new BlockingOfferQueue();
        return new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS, workQueue);
    }

    /**
     * Single-slot queue whose {@code offer} delegates to the blocking {@code put},
     * so an offering thread waits for a free slot instead of being turned away.
     */
    private static final class BlockingOfferQueue extends LinkedBlockingQueue<Runnable> {
        private static final long serialVersionUID = -9028058603126367678L;

        BlockingOfferQueue() {
            super(1);
        }

        /**
         * Blocks until the task has been enqueued.
         *
         * @return {@code true} once enqueued; {@code false} if the waiting thread was
         *         interrupted, in which case its interrupt flag is set again
         */
        @Override
        public boolean offer(Runnable task) {
            boolean queued = false;
            try {
                put(task);
                queued = true;
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
            return queued;
        }
    }
}
Loading…
Cancel
Save