From 0b9e0465ed0fc18541408aa9d496edc0eb0ecf7d Mon Sep 17 00:00:00 2001 From: "yihua.huang" Date: Wed, 21 Aug 2013 23:49:15 +0800 Subject: [PATCH] add delay queue --- .../scheduler/DelayQueueScheduler.java | 82 +++++++++++++++++++ .../scheduler/LevelLimitScheduler.java | 24 ++++++ .../scheduler/ZipCodePageProcessor.java | 8 +- .../scheduler/DelayQueueSchedulerTest.java | 24 ++++++ 4 files changed, 137 insertions(+), 1 deletion(-) create mode 100644 webmagic-samples/src/main/java/us/codecraft/webmagic/samples/scheduler/DelayQueueScheduler.java create mode 100644 webmagic-samples/src/main/java/us/codecraft/webmagic/samples/scheduler/LevelLimitScheduler.java create mode 100644 webmagic-samples/src/test/java/us/codecraft/webmagic/samples/scheduler/DelayQueueSchedulerTest.java diff --git a/webmagic-samples/src/main/java/us/codecraft/webmagic/samples/scheduler/DelayQueueScheduler.java b/webmagic-samples/src/main/java/us/codecraft/webmagic/samples/scheduler/DelayQueueScheduler.java new file mode 100644 index 00000000..a52b3d4b --- /dev/null +++ b/webmagic-samples/src/main/java/us/codecraft/webmagic/samples/scheduler/DelayQueueScheduler.java @@ -0,0 +1,82 @@ +package us.codecraft.webmagic.samples.scheduler; + +import us.codecraft.webmagic.Request; +import us.codecraft.webmagic.Task; +import us.codecraft.webmagic.scheduler.PriorityScheduler; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +/** + * @author code4crafter@gmail.com + */ +public class DelayQueueScheduler extends PriorityScheduler { + + private DelayQueue queue = new DelayQueue(); + + private Set urls = new HashSet(); + + private long time; + + private TimeUnit timeUnit; + + private class RequestWrapper implements Delayed { + + private long startTime = System.currentTimeMillis(); + + private Request request; + + private RequestWrapper(Request request) { + this.request = request; + } + + private long getStartTime() { + return startTime; + } + + private Request getRequest() { + return request; + } + + @Override + public long getDelay(TimeUnit unit) { + long convert = unit.convert(TimeUnit.MILLISECONDS.convert(time, timeUnit) - System.currentTimeMillis() + startTime, TimeUnit.MILLISECONDS); + return convert; + } + + @Override + public int compareTo(Delayed o) { + return new Long(getDelay(TimeUnit.MILLISECONDS)).compareTo(o.getDelay(TimeUnit.MILLISECONDS)); + } + } + + public DelayQueueScheduler(long time, TimeUnit timeUnit) { + this.time = time; + this.timeUnit = timeUnit; + } + + @Override + public synchronized void push(Request request, Task task) { + if (urls.add(request.getUrl())) { + queue.add(new RequestWrapper(request)); + } + + } + + @Override + public synchronized Request poll(Task task) { + RequestWrapper take = null; + while (take == null) { + try { + take = queue.take(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + queue.add(new RequestWrapper(take.getRequest())); + return take.getRequest(); + } +} diff --git a/webmagic-samples/src/main/java/us/codecraft/webmagic/samples/scheduler/LevelLimitScheduler.java b/webmagic-samples/src/main/java/us/codecraft/webmagic/samples/scheduler/LevelLimitScheduler.java new file mode 100644 index 00000000..79ef209f --- /dev/null +++ b/webmagic-samples/src/main/java/us/codecraft/webmagic/samples/scheduler/LevelLimitScheduler.java @@ -0,0 +1,24 @@ +package us.codecraft.webmagic.samples.scheduler; + +import us.codecraft.webmagic.Request; +import us.codecraft.webmagic.Task; +import us.codecraft.webmagic.scheduler.PriorityScheduler; + +/** + * @author code4crafter@gmail.com + */ +public class LevelLimitScheduler extends PriorityScheduler { + + private int levelLimit = 3; + + public LevelLimitScheduler(int levelLimit) { + this.levelLimit = levelLimit; + } + + @Override + public synchronized void push(Request request, Task task) { + if (((Integer) request.getExtra("_level")) <= levelLimit) { + super.push(request, task); + } + } +} diff --git a/webmagic-samples/src/main/java/us/codecraft/webmagic/samples/scheduler/ZipCodePageProcessor.java b/webmagic-samples/src/main/java/us/codecraft/webmagic/samples/scheduler/ZipCodePageProcessor.java index e6b3f66c..ddbaa088 100644 --- a/webmagic-samples/src/main/java/us/codecraft/webmagic/samples/scheduler/ZipCodePageProcessor.java +++ b/webmagic-samples/src/main/java/us/codecraft/webmagic/samples/scheduler/ZipCodePageProcessor.java @@ -18,7 +18,8 @@ import static us.codecraft.webmagic.selector.Selectors.xpath; */ public class ZipCodePageProcessor implements PageProcessor { - private Site site = Site.me().setCharset("gb2312").setSleepTime(0).addStartUrl("http://www.ip138.com/post/"); + private Site site = Site.me().setCharset("gb2312") + .setSleepTime(100).addStartUrl("http://www.ip138.com/post/"); @Override public void process(Page page) { @@ -79,5 +80,10 @@ public class ZipCodePageProcessor implements PageProcessor { public static void main(String[] args) { Spider.create(new ZipCodePageProcessor()).scheduler(new PriorityScheduler()).run(); + + PriorityScheduler scheduler = new PriorityScheduler(); + Spider spider = Spider.create(new ZipCodePageProcessor()).scheduler(scheduler); + scheduler.push(new Request("http://www.baidu.com/s?wd=webmagic&f=12&rsp=0&oq=webmagix&tn=baiduhome_pg&ie=utf-8"),spider); + spider.run(); } } diff --git a/webmagic-samples/src/test/java/us/codecraft/webmagic/samples/scheduler/DelayQueueSchedulerTest.java b/webmagic-samples/src/test/java/us/codecraft/webmagic/samples/scheduler/DelayQueueSchedulerTest.java new file mode 100644 index 00000000..31af3b2b --- /dev/null +++ b/webmagic-samples/src/test/java/us/codecraft/webmagic/samples/scheduler/DelayQueueSchedulerTest.java @@ -0,0 +1,24 @@ +package us.codecraft.webmagic.samples.scheduler; + +import org.junit.Ignore; +import org.junit.Test; +import us.codecraft.webmagic.Request; + +import java.util.concurrent.TimeUnit; + +/** + * @author code4crafter@gmail.com + */ +public class DelayQueueSchedulerTest { + + @Ignore("infinite") + @Test + public void test() { + DelayQueueScheduler delayQueueScheduler = new DelayQueueScheduler(1, TimeUnit.SECONDS); + delayQueueScheduler.push(new Request("1"), null); + while (true){ + Request poll = delayQueueScheduler.poll(null); + System.out.println(System.currentTimeMillis()+"\t"+poll); + } + } +}