add delay queue

pull/23/merge
yihua.huang 12 years ago
parent 91dcccf7b5
commit 0b9e0465ed

@ -0,0 +1,82 @@
package us.codecraft.webmagic.samples.scheduler;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.PriorityScheduler;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @author code4crafter@gmail.com
*/
public class DelayQueueScheduler extends PriorityScheduler {
private DelayQueue<RequestWrapper> queue = new DelayQueue<RequestWrapper>();
private Set<String> urls = new HashSet<String>();
private long time;
private TimeUnit timeUnit;
private class RequestWrapper implements Delayed {
private long startTime = System.currentTimeMillis();
private Request request;
private RequestWrapper(Request request) {
this.request = request;
}
private long getStartTime() {
return startTime;
}
private Request getRequest() {
return request;
}
@Override
public long getDelay(TimeUnit unit) {
long convert = unit.convert(TimeUnit.MILLISECONDS.convert(time, timeUnit) - System.currentTimeMillis() + startTime, TimeUnit.MILLISECONDS);
return convert;
}
@Override
public int compareTo(Delayed o) {
return new Long(getDelay(TimeUnit.MILLISECONDS)).compareTo(o.getDelay(TimeUnit.MILLISECONDS));
}
}
public DelayQueueScheduler(long time, TimeUnit timeUnit) {
this.time = time;
this.timeUnit = timeUnit;
}
@Override
public synchronized void push(Request request, Task task) {
if (urls.add(request.getUrl())) {
queue.add(new RequestWrapper(request));
}
}
@Override
public synchronized Request poll(Task task) {
RequestWrapper take = null;
while (take == null) {
try {
take = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(new RequestWrapper(take.getRequest()));
return take.getRequest();
}
}

@ -0,0 +1,24 @@
package us.codecraft.webmagic.samples.scheduler;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.PriorityScheduler;
/**
* @author code4crafter@gmail.com
*/
public class LevelLimitScheduler extends PriorityScheduler {
private int levelLimit = 3;
public LevelLimitScheduler(int levelLimit) {
this.levelLimit = levelLimit;
}
@Override
public synchronized void push(Request request, Task task) {
if (((Integer) request.getExtra("_level")) <= levelLimit) {
super.push(request, task);
}
}
}

@ -18,7 +18,8 @@ import static us.codecraft.webmagic.selector.Selectors.xpath;
*/
public class ZipCodePageProcessor implements PageProcessor {
private Site site = Site.me().setCharset("gb2312").setSleepTime(0).addStartUrl("http://www.ip138.com/post/");
private Site site = Site.me().setCharset("gb2312")
.setSleepTime(100).addStartUrl("http://www.ip138.com/post/");
@Override
public void process(Page page) {
@ -79,5 +80,10 @@ public class ZipCodePageProcessor implements PageProcessor {
public static void main(String[] args) {
Spider.create(new ZipCodePageProcessor()).scheduler(new PriorityScheduler()).run();
PriorityScheduler scheduler = new PriorityScheduler();
Spider spider = Spider.create(new ZipCodePageProcessor()).scheduler(scheduler);
scheduler.push(new Request("http://www.baidu.com/s?wd=webmagic&f=12&rsp=0&oq=webmagix&tn=baiduhome_pg&ie=utf-8"),spider);
spider.run();
}
}

@ -0,0 +1,24 @@
package us.codecraft.webmagic.samples.scheduler;
import org.junit.Ignore;
import org.junit.Test;
import us.codecraft.webmagic.Request;
import java.util.concurrent.TimeUnit;
/**
* @author code4crafter@gmail.com
*/
public class DelayQueueSchedulerTest {
@Ignore("infinite")
@Test
public void test() {
DelayQueueScheduler delayQueueScheduler = new DelayQueueScheduler(1, TimeUnit.SECONDS);
delayQueueScheduler.push(new Request("1"), null);
while (true){
Request poll = delayQueueScheduler.poll(null);
System.out.println(System.currentTimeMillis()+"\t"+poll);
}
}
}
Loading…
Cancel
Save