diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java
index dc0102ce..709b6579 100644
--- a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java
+++ b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java
@@ -9,9 +9,12 @@ import us.codecraft.webmagic.pipeline.Pipeline;
 import us.codecraft.webmagic.processor.PageProcessor;
 import us.codecraft.webmagic.schedular.QueueScheduler;
 import us.codecraft.webmagic.schedular.Scheduler;
+import us.codecraft.webmagic.utils.ThreadUtils;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  *
@@ -51,6 +54,21 @@ public class Spider implements Runnable, Task {
 
     private Logger logger = Logger.getLogger(getClass());
 
+    /** Lifecycle state: configured, {@link #run()} has not been called yet. */
+    private static final int STAT_INIT = 0;
+
+    /** Lifecycle state: {@link #run()} is in progress. */
+    private static final int STAT_RUNNING = 1;
+
+    /** Lifecycle state: {@link #run()} has finished. */
+    private static final int STAT_STOPPED = 2;
+
+    /** Worker pool created by {@link #thread(int)}; {@code null} means single-threaded crawling. */
+    private ExecutorService executorService;
+
+    /** Current lifecycle state, one of the {@code STAT_*} constants. */
+    private final AtomicInteger stat = new AtomicInteger(STAT_INIT);
+
     /**
      * 使用已定义的抽取规则新建一个Spider。
      * @param pageProcessor 已定义的抽取规则
@@ -76,6 +94,7 @@ public class Spider implements Runnable, Task {
      * @return this
      */
     public Spider startUrls(List<String> startUrls) {
+        checkIfNotRunning();
         this.startUrls = startUrls;
         return this;
     }
@@ -96,6 +115,7 @@ public class Spider implements Runnable, Task {
      * @return this
      */
     public Spider scheduler(Scheduler scheduler) {
+        checkIfNotRunning();
         this.scheduler = scheduler;
         return this;
     }
@@ -106,6 +126,7 @@ public class Spider implements Runnable, Task {
      * @return this
      */
     public Spider pipeline(Pipeline pipeline) {
+        checkIfNotRunning();
         this.pipelines.add(pipeline);
         return this;
     }
@@ -113,6 +134,9 @@ public class Spider implements Runnable, Task {
 
     @Override
     public void run() {
+        if (!stat.compareAndSet(STAT_INIT, STAT_RUNNING)) {
+            throw new IllegalStateException("Spider is already running!");
+        }
         if (startUrls != null) {
             for (String startUrl : startUrls) {
                 scheduler.push(new Request(startUrl), this);
@@ -122,20 +146,91 @@ public class Spider implements Runnable, Task {
         if (pipelines.isEmpty()) {
             pipelines.add(new ConsolePipeline());
         }
-        while (request != null) {
-            Page page = downloader.download(request, site);
-            if (page == null) {
-                sleep(site.getSleepTime());
-                continue;
-            }
-            pageProcessor.process(page);
-            addRequest(page);
-            for (Pipeline pipeline : pipelines) {
-                pipeline.process(page, this);
-            }
-            sleep(site.getSleepTime());
-            request = scheduler.poll(this);
-        }
-    }
+        // single thread
+        if (executorService == null) {
+            while (request != null) {
+                processRequest(request);
+                request = scheduler.poll(this);
+            }
+        } else {
+            processRequestsInPool(request);
+        }
+        stat.compareAndSet(STAT_RUNNING, STAT_STOPPED);
+    }
+
+    /**
+     * Hands requests to the worker pool until the scheduler is empty and no worker is busy,
+     * then shuts the pool down.
+     *
+     * @param firstRequest request already polled by {@link #run()}, may be {@code null}
+     */
+    private void processRequestsInPool(Request firstRequest) {
+        final AtomicInteger threadAlive = new AtomicInteger(0);
+        Request request = firstRequest;
+        try {
+            while (true) {
+                if (request == null) {
+                    // Sample the counter BEFORE polling. Workers are only started by this
+                    // thread, so if none was busy and the queue is still empty afterwards,
+                    // nothing is left that could produce another request.
+                    boolean noWorkerBusy = threadAlive.get() == 0;
+                    request = scheduler.poll(this);
+                    if (request == null) {
+                        if (noWorkerBusy) {
+                            break;
+                        }
+                        try {
+                            // busy workers may still add requests; check again shortly
+                            Thread.sleep(100);
+                        } catch (InterruptedException e) {
+                            // stop dispatching and keep the interrupt visible to the caller
+                            Thread.currentThread().interrupt();
+                            break;
+                        }
+                        continue;
+                    }
+                }
+                final Request requestFinal = request;
+                threadAlive.incrementAndGet();
+                executorService.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            processRequest(requestFinal);
+                        } finally {
+                            // release the slot even when processing throws, otherwise the
+                            // dispatcher would wait for this worker forever
+                            threadAlive.decrementAndGet();
+                        }
+                    }
+                });
+                request = scheduler.poll(this);
+            }
+        } finally {
+            // shut the pool down on every exit path so its threads are not left behind
+            executorService.shutdown();
+        }
+    }
+
+    /**
+     * Downloads one request, passes the page to the page processor, {@code addRequest} and every
+     * pipeline, then sleeps for the site's configured sleep time.
+     *
+     * @param request the request to crawl
+     */
+    private void processRequest(Request request) {
+        Page page = downloader.download(request, site);
+        if (page == null) {
+            // nothing was downloaded: only wait before the next request is tried
+            sleep(site.getSleepTime());
+            return;
+        }
+        pageProcessor.process(page);
+        addRequest(page);
+        for (Pipeline pipeline : pipelines) {
+            pipeline.process(page, this);
+        }
+        sleep(site.getSleepTime());
+    }
 
     private void sleep(int time) {
@@ -154,6 +249,41 @@ public class Spider implements Runnable, Task {
         }
     }
 
+    /**
+     * Guards configuration methods: they are only legal before {@link #run()} is called.
+     *
+     * @throws IllegalStateException if the spider has already been started
+     */
+    private void checkIfNotRunning() {
+        if (stat.get() != STAT_INIT) {
+            throw new IllegalStateException("Spider is already running!");
+        }
+    }
+
+    /**
+     * Sets the number of threads used for crawling.
+     *
+     * @param threadNum number of threads; {@code 1} selects single-threaded crawling
+     * @return this
+     * @throws IllegalArgumentException if {@code threadNum} is less than one
+     * @throws IllegalStateException if the spider has already been started
+     */
+    public Spider thread(int threadNum) {
+        checkIfNotRunning();
+        if (threadNum < 1) {
+            throw new IllegalArgumentException("threadNum should be at least one, but was " + threadNum);
+        }
+        if (executorService != null) {
+            // a repeated call replaces the pool; release the threads of the old one
+            executorService.shutdown();
+            executorService = null;
+        }
+        if (threadNum > 1) {
+            executorService = ThreadUtils.newFixedThreadPool(threadNum);
+        }
+        return this;
+    }
+
     @Override
     public String getUUID() {
         if (uuid != null) {
diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java b/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java
new file mode 100644
index 00000000..ebe61198
--- /dev/null
+++ b/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java
@@ -0,0 +1,50 @@
+package us.codecraft.webmagic.utils;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Factory for the thread pool used by {@code Spider}.
+ *
+ * @author code4crafer@gmail.com
+ * Date: 13-6-23
+ * Time: 7:11 PM
+ */
+public final class ThreadUtils {
+
+    private ThreadUtils() {
+        // static helpers only, never instantiated
+    }
+
+    /**
+     * Creates a fixed-size pool whose {@code execute} blocks the submitting thread, instead of
+     * rejecting the task, while all workers are busy and the single queue slot is taken.
+     *
+     * @param threadSize number of worker threads
+     * @return a new pool with {@code threadSize} core and maximum threads
+     * @throws IllegalArgumentException if {@code threadSize} is not positive
+     */
+    public static ExecutorService newFixedThreadPool(int threadSize) {
+        return new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<Runnable>(1) {
+
+                    private static final long serialVersionUID = -9028058603126367678L;
+
+                    // ThreadPoolExecutor queues tasks through offer(); routing it to the
+                    // blocking put() makes a full queue throttle the submitter.
+                    @Override
+                    public boolean offer(Runnable e) {
+                        try {
+                            put(e);
+                            return true;
+                        } catch (InterruptedException ie) {
+                            // report failure but keep the interrupt visible to the caller
+                            Thread.currentThread().interrupt();
+                        }
+                        return false;
+                    }
+                });
+    }
+}