diff --git a/pom.xml b/pom.xml index 8d25a666..8f9837f1 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,11 @@ httpclient 4.2.4 + + com.google.guava + guava + 15.0 + us.codecraft xsoup diff --git a/webmagic-core/pom.xml b/webmagic-core/pom.xml index f68114a3..3d89e5c7 100644 --- a/webmagic-core/pom.xml +++ b/webmagic-core/pom.xml @@ -20,6 +20,12 @@ junit + + com.google.guava + guava + 15.0 + + org.apache.commons commons-lang3 diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java index 829546bb..149f0a8d 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java @@ -42,7 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger; * Spider.create(new SimplePageProcessor("http://my.oschina.net/", * "http://my.oschina.net/*blog/*"))
* .scheduler(new FileCacheQueueScheduler("/data/temp/webmagic/cache/")).run();
- * + * * @author code4crafter@gmail.com
* @see Downloader * @see Scheduler @@ -52,381 +52,380 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class Spider implements Runnable, Task { - protected Downloader downloader; - - protected List pipelines = new ArrayList(); - - protected PageProcessor pageProcessor; - - protected List startUrls; - - protected Site site; - - protected String uuid; - - protected Scheduler scheduler = new QueueScheduler(); - - protected Logger logger = Logger.getLogger(getClass()); - - protected ExecutorService executorService; - - protected int threadNum = 1; - - protected AtomicInteger stat = new AtomicInteger(STAT_INIT); - - protected final static int STAT_INIT = 0; - - protected final static int STAT_RUNNING = 1; - - protected final static int STAT_STOPPED = 2; - - /** - * create a spider with pageProcessor. - * - * @param pageProcessor - * @return new spider - * @see PageProcessor - */ - public static Spider create(PageProcessor pageProcessor) { - return new Spider(pageProcessor); - } - - /** - * create a spider with pageProcessor. - * - * @param pageProcessor - */ - public Spider(PageProcessor pageProcessor) { - this.pageProcessor = pageProcessor; - this.site = pageProcessor.getSite(); - this.startUrls = pageProcessor.getSite().getStartUrls(); - } - - /** - * Set startUrls of Spider.
- * Prior to startUrls of Site. - * - * @param startUrls - * @return this - */ - public Spider startUrls(List startUrls) { - checkIfRunning(); - this.startUrls = startUrls; - return this; - } - - /** - * Set an uuid for spider.
- * Default uuid is domain of site.
- * - * @param uuid - * @return this - */ - public Spider setUUID(String uuid) { - this.uuid = uuid; - return this; - } - - /** - * set scheduler for Spider - * - * @param scheduler - * @return this - * @Deprecated - * @see #setScheduler(us.codecraft.webmagic.scheduler.Scheduler) - */ - public Spider scheduler(Scheduler scheduler) { - return setScheduler(scheduler); - } - - /** - * set scheduler for Spider - * - * @param scheduler - * @return this - * @see Scheduler - * @since 0.2.1 - */ - public Spider setScheduler(Scheduler scheduler) { - checkIfRunning(); - this.scheduler = scheduler; - return this; - } - - /** - * add a pipeline for Spider - * - * @param pipeline - * @return this - * @see #setPipeline(us.codecraft.webmagic.pipeline.Pipeline) - * @deprecated - */ - public Spider pipeline(Pipeline pipeline) { - return addPipeline(pipeline); - } - - /** - * add a pipeline for Spider - * - * @param pipeline - * @return this - * @see Pipeline - * @since 0.2.1 - */ - public Spider addPipeline(Pipeline pipeline) { - checkIfRunning(); - this.pipelines.add(pipeline); - return this; - } - - /** - * clear the pipelines set - * - * @return this - */ - public Spider clearPipeline() { - pipelines = new ArrayList(); - return this; - } - - /** - * set the downloader of spider - * - * @param downloader - * @return this - * @see #setDownloader(us.codecraft.webmagic.downloader.Downloader) - * @deprecated - */ - public Spider downloader(Downloader downloader) { - return setDownloader(downloader); - } - - /** - * set the downloader of spider - * - * @param downloader - * @return this - * @see Downloader - */ - public Spider setDownloader(Downloader downloader) { - checkIfRunning(); - this.downloader = downloader; - return this; - } - - protected void checkComponent() { - if (downloader == null) { - this.downloader = new HttpClientDownloader(); - } - if (pipelines.isEmpty()) { - pipelines.add(new ConsolePipeline()); - } - downloader.setThread(threadNum); - } - - @Override - public void run() { - if (!stat.compareAndSet(STAT_INIT, STAT_RUNNING) && !stat.compareAndSet(STAT_STOPPED, STAT_RUNNING)) { - throw new IllegalStateException("Spider is already running!"); - } - checkComponent(); - if (startUrls != null) { - for (String startUrl : startUrls) { - scheduler.push(new Request(startUrl), this); - } - startUrls.clear(); - } - Request request = scheduler.poll(this); + protected Downloader downloader; + + protected List pipelines = new ArrayList(); + + protected PageProcessor pageProcessor; + + protected List startUrls; + + protected Site site; + + protected String uuid; + + protected Scheduler scheduler = new QueueScheduler(); + + protected Logger logger = Logger.getLogger(getClass()); + + protected ExecutorService executorService; + + protected int threadNum = 1; + + protected AtomicInteger stat = new AtomicInteger(STAT_INIT); + + protected final static int STAT_INIT = 0; + + protected final static int STAT_RUNNING = 1; + + protected final static int STAT_STOPPED = 2; + + /** + * create a spider with pageProcessor. + * + * @param pageProcessor + * @return new spider + * @see PageProcessor + */ + public static Spider create(PageProcessor pageProcessor) { + return new Spider(pageProcessor); + } + + /** + * create a spider with pageProcessor. + * + * @param pageProcessor + */ + public Spider(PageProcessor pageProcessor) { + this.pageProcessor = pageProcessor; + this.site = pageProcessor.getSite(); + this.startUrls = pageProcessor.getSite().getStartUrls(); + } + + /** + * Set startUrls of Spider.
+ * Prior to startUrls of Site. + * + * @param startUrls + * @return this + */ + public Spider startUrls(List startUrls) { + checkIfRunning(); + this.startUrls = startUrls; + return this; + } + + /** + * Set an uuid for spider.
+ * Default uuid is domain of site.
+ * + * @param uuid + * @return this + */ + public Spider setUUID(String uuid) { + this.uuid = uuid; + return this; + } + + /** + * set scheduler for Spider + * + * @param scheduler + * @return this + * @Deprecated + * @see #setScheduler(us.codecraft.webmagic.scheduler.Scheduler) + */ + public Spider scheduler(Scheduler scheduler) { + return setScheduler(scheduler); + } + + /** + * set scheduler for Spider + * + * @param scheduler + * @return this + * @see Scheduler + * @since 0.2.1 + */ + public Spider setScheduler(Scheduler scheduler) { + checkIfRunning(); + this.scheduler = scheduler; + return this; + } + + /** + * add a pipeline for Spider + * + * @param pipeline + * @return this + * @see #setPipeline(us.codecraft.webmagic.pipeline.Pipeline) + * @deprecated + */ + public Spider pipeline(Pipeline pipeline) { + return addPipeline(pipeline); + } + + /** + * add a pipeline for Spider + * + * @param pipeline + * @return this + * @see Pipeline + * @since 0.2.1 + */ + public Spider addPipeline(Pipeline pipeline) { + checkIfRunning(); + this.pipelines.add(pipeline); + return this; + } + + /** + * clear the pipelines set + * + * @return this + */ + public Spider clearPipeline() { + pipelines = new ArrayList(); + return this; + } + + /** + * set the downloader of spider + * + * @param downloader + * @return this + * @see #setDownloader(us.codecraft.webmagic.downloader.Downloader) + * @deprecated + */ + public Spider downloader(Downloader downloader) { + return setDownloader(downloader); + } + + /** + * set the downloader of spider + * + * @param downloader + * @return this + * @see Downloader + */ + public Spider setDownloader(Downloader downloader) { + checkIfRunning(); + this.downloader = downloader; + return this; + } + + protected void initComponent() { + if (downloader == null) { + this.downloader = new HttpClientDownloader(); + } + if (pipelines.isEmpty()) { + pipelines.add(new ConsolePipeline()); + } + downloader.setThread(threadNum); + executorService = ThreadUtils.newFixedThreadPool(threadNum); + if (startUrls != null) { + for (String startUrl : startUrls) { + scheduler.push(new Request(startUrl), this); + } + startUrls.clear(); + } + } + + @Override + public void run() { + checkRunningStat(); + initComponent(); logger.info("Spider " + getUUID() + " started!"); - // single thread - if (threadNum <= 1) { - while (request != null && stat.compareAndSet(STAT_RUNNING, STAT_RUNNING)) { - processRequest(request); - request = scheduler.poll(this); - } - } else { - synchronized (this) { - this.executorService = ThreadUtils.newFixedThreadPool(threadNum); - } - // multi thread - final AtomicInteger threadAlive = new AtomicInteger(0); - while (true && stat.compareAndSet(STAT_RUNNING, STAT_RUNNING)) { - if (request == null) { - // when no request found but some thread is alive, sleep a - // while. - try { - Thread.sleep(100); - } catch (InterruptedException e) { - } - } else { - final Request requestFinal = request; - threadAlive.incrementAndGet(); - executorService.execute(new Runnable() { - @Override - public void run() { - processRequest(requestFinal); - threadAlive.decrementAndGet(); - } - }); - } - request = scheduler.poll(this); - if (threadAlive.get() == 0) { - request = scheduler.poll(this); - if (request == null) { - break; - } - } - } - executorService.shutdown(); - } - stat.compareAndSet(STAT_RUNNING, STAT_STOPPED); - // release some resources - destroy(); - } - - protected void destroy() { - destroyEach(downloader); - destroyEach(pageProcessor); - for (Pipeline pipeline : pipelines) { - destroyEach(pipeline); - } - } - - private void destroyEach(Object object) { - if (object instanceof Closeable) { - try { - ((Closeable) object).close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - /** - * Process specific urls without url discovering. - * - * @param urls - * urls to process - */ - public void test(String... urls) { - checkComponent(); - if (urls.length > 0) { - for (String url : urls) { - processRequest(new Request(url)); - } - } - } - - protected void processRequest(Request request) { - Page page = downloader.download(request, this); - if (page == null) { - sleep(site.getSleepTime()); - return; - } - // for cycle retry - if (page.getHtml() == null) { - addRequest(page); - sleep(site.getSleepTime()); - return; - } - pageProcessor.process(page); - addRequest(page); - if (!page.getResultItems().isSkip()) { - for (Pipeline pipeline : pipelines) { - pipeline.process(page.getResultItems(), this); - } - } - sleep(site.getSleepTime()); - } - - protected void sleep(int time) { - try { - Thread.sleep(time); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - protected void addRequest(Page page) { - if (CollectionUtils.isNotEmpty(page.getTargetRequests())) { - for (Request request : page.getTargetRequests()) { - scheduler.push(request, this); - } - } - } - - protected void checkIfRunning() { - if (!stat.compareAndSet(STAT_INIT, STAT_INIT) && !stat.compareAndSet(STAT_STOPPED, STAT_STOPPED)) { - throw new IllegalStateException("Spider is already running!"); - } - } - - public void runAsync() { - Thread thread = new Thread(this); - thread.setDaemon(false); - thread.start(); - } - - public void start() { - runAsync(); - } - - public void stop() { - if (stat.compareAndSet(STAT_RUNNING, STAT_STOPPED)) { - if (executorService != null) { - executorService.shutdown(); - } - logger.info("Spider " + getUUID() + " stop success!"); - } else { - logger.info("Spider " + getUUID() + " stop fail!"); - } - } - - public void stopAndDestroy() { - stop(); - destroy(); - } - - /** - * start with more than one threads - * - * @param threadNum - * @return this - */ - public Spider thread(int threadNum) { - checkIfRunning(); - this.threadNum = threadNum; - if (threadNum <= 0) { - throw new IllegalArgumentException("threadNum should be more than one!"); - } - if (threadNum == 1) { - return this; - } - return this; - } - - /** - * switch off xsoup - * - * @return - */ - public static void xsoupOff() { - EnvironmentUtil.setUseXsoup(false); - } - - @Override - public String getUUID() { - if (uuid != null) { - return uuid; - } - if (site != null) { - return site.getDomain(); - } - return null; - } - - @Override - public Site getSite() { - return site; - } + final AtomicInteger threadAlive = new AtomicInteger(0); + while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) { + Request request = scheduler.poll(this); + if (request == null) { + if (threadAlive.get() == 0) { + break; + } + // when no request found but some thread is alive, sleep a + // while. + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + } else { + final Request requestFinal = request; + threadAlive.incrementAndGet(); + executorService.execute(new Runnable() { + @Override + public void run() { + try { + processRequest(requestFinal); + } catch (Exception e) { + logger.error("download "+requestFinal+" error",e); + } finally { + threadAlive.decrementAndGet(); + } + } + }); + } + } + executorService.shutdown(); + stat.set(STAT_STOPPED); + // release some resources + destroy(); + } + + private void checkRunningStat() { + while (true) { + int statNow = stat.get(); + if (statNow == STAT_RUNNING) { + throw new IllegalStateException("Spider is already running!"); + } + if (stat.compareAndSet(statNow, STAT_RUNNING)) { + break; + } + } + } + + protected void destroy() { + destroyEach(downloader); + destroyEach(pageProcessor); + for (Pipeline pipeline : pipelines) { + destroyEach(pipeline); + } + } + + private void destroyEach(Object object) { + if (object instanceof Closeable) { + try { + ((Closeable) object).close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + /** + * Process specific urls without url discovering. + * + * @param urls urls to process + */ + public void test(String... urls) { + initComponent(); + if (urls.length > 0) { + for (String url : urls) { + processRequest(new Request(url)); + } + } + } + + protected void processRequest(Request request) { + Page page = downloader.download(request, this); + if (page == null) { + sleep(site.getSleepTime()); + return; + } + // for cycle retry + if (page.getHtml() == null) { + addRequest(page); + sleep(site.getSleepTime()); + return; + } + pageProcessor.process(page); + addRequest(page); + if (!page.getResultItems().isSkip()) { + for (Pipeline pipeline : pipelines) { + pipeline.process(page.getResultItems(), this); + } + } + sleep(site.getSleepTime()); + } + + protected void sleep(int time) { + try { + Thread.sleep(time); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + protected void addRequest(Page page) { + if (CollectionUtils.isNotEmpty(page.getTargetRequests())) { + for (Request request : page.getTargetRequests()) { + scheduler.push(request, this); + } + } + } + + protected void checkIfRunning() { + if (stat.get() == STAT_RUNNING) { + throw new IllegalStateException("Spider is already running!"); + } + } + + public void runAsync() { + Thread thread = new Thread(this); + thread.setDaemon(false); + thread.start(); + } + + public void start() { + runAsync(); + } + + public void stop() { + if (stat.compareAndSet(STAT_RUNNING, STAT_STOPPED)) { + if (executorService != null) { + executorService.shutdown(); + } + logger.info("Spider " + getUUID() + " stop success!"); + } else { + logger.info("Spider " + getUUID() + " stop fail!"); + } + } + + public void stopAndDestroy() { + stop(); + destroy(); + } + + /** + * start with more than one threads + * + * @param threadNum + * @return this + */ + public Spider thread(int threadNum) { + checkIfRunning(); + this.threadNum = threadNum; + if (threadNum <= 0) { + throw new IllegalArgumentException("threadNum should be more than one!"); + } + if (threadNum == 1) { + return this; + } + return this; + } + + /** + * switch off xsoup + * + * @return + */ + public static void xsoupOff() { + EnvironmentUtil.setUseXsoup(false); + } + + @Override + public String getUUID() { + if (uuid != null) { + return uuid; + } + if (site != null) { + return site.getDomain(); + } + return null; + } + + @Override + public Site getSite() { + return site; + } } diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java b/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java index ba9774db..cdfe6d01 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java @@ -1,5 +1,7 @@ package us.codecraft.webmagic.utils; +import com.google.common.util.concurrent.MoreExecutors; + import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -11,11 +13,15 @@ import java.util.concurrent.TimeUnit; */ public class ThreadUtils { - public static ExecutorService newFixedThreadPool(int threadSize) { - if (threadSize <= 1) { - throw new IllegalArgumentException("ThreadSize must be greater than 1!"); - } - return new ThreadPoolExecutor(threadSize - 1, threadSize - 1, 0L, TimeUnit.MILLISECONDS, - new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); - } + public static ExecutorService newFixedThreadPool(int threadSize) { + if (threadSize <= 0) { + throw new IllegalArgumentException("ThreadSize must be greater than 0!"); + } + if (threadSize == 1) { + return MoreExecutors.sameThreadExecutor(); + + } + return new ThreadPoolExecutor(threadSize - 1, threadSize - 1, 0L, TimeUnit.MILLISECONDS, + new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); + } } diff --git a/webmagic-core/src/test/java/us/codecraft/webmagic/SpiderTest.java b/webmagic-core/src/test/java/us/codecraft/webmagic/SpiderTest.java index 75c1ba11..3add86cc 100644 --- a/webmagic-core/src/test/java/us/codecraft/webmagic/SpiderTest.java +++ b/webmagic-core/src/test/java/us/codecraft/webmagic/SpiderTest.java @@ -18,7 +18,7 @@ public class SpiderTest { public void process(ResultItems resultItems, Task task) { System.out.println(1); } - }).thread(2); + }).thread(1); spider.start(); Thread.sleep(10000); spider.stop();