Introduce Redis for distributed scheduling

pull/17/head
yihua.huang 12 years ago
parent 619aab4399
commit 55d80129bf

@ -25,6 +25,11 @@
<artifactId>freemarker</artifactId>
<version>2.3.15</version>
</dependency>
<!-- Jedis (Redis client); RedisScheduler imports redis.clients.jedis.Jedis, JedisPool and JedisPoolConfig from it. -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
<build>

@ -0,0 +1,45 @@
package us.codecraft.webmagic.scheduler;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.schedular.Scheduler;
/**
 * Scheduler that keeps the pending URL queue in Redis.<br>
 * <p>
 * Two Redis keys are used per task, both suffixed with {@link Task#getUUID()}:
 * a list ({@code queue_<uuid>}) holding the URLs still to be polled, and a
 * sorted set ({@code set_<uuid>}) recording every URL that has been pushed,
 * scored with the push time in milliseconds, which is used to skip duplicates.
 * <p>
 * {@code push} and {@code poll} are {@code synchronized} on this instance; the
 * check-then-add in {@code push} is not a single Redis operation.
 *
 * @author yihua.huang@dianping.com <br>
 * @date: 13-7-25 <br>
 * Time: 7:07 <br>
 */
public class RedisScheduler implements Scheduler {

    private static final String QUEUE_PREFIX = "queue_";

    private static final String SET_PREFIX = "set_";

    private final JedisPool pool;

    /**
     * Creates a scheduler backed by a connection pool to the given Redis host,
     * using a default {@link JedisPoolConfig}.
     *
     * @param host host name of the Redis server
     */
    public RedisScheduler(String host) {
        pool = new JedisPool(new JedisPoolConfig(), host);
    }

    /**
     * Appends the request's URL to the task's queue unless that URL is already
     * present in the task's de-duplication set.
     *
     * @param request request whose URL should be scheduled
     * @param task    task whose UUID selects the Redis keys
     */
    @Override
    public synchronized void push(Request request, Task task) {
        Jedis jedis = pool.getResource();
        try {
            String url = request.getUrl();
            String seenKey = SET_PREFIX + task.getUUID();
            // zrank yields null when the member is absent, i.e. the URL is new.
            if (jedis.zrank(seenKey, url) == null) {
                jedis.rpush(QUEUE_PREFIX + task.getUUID(), url);
                jedis.zadd(seenKey, System.currentTimeMillis(), url);
            }
        } finally {
            // Always hand the connection back, even when a Redis command throws,
            // so the pool is not drained by failures.
            pool.returnResource(jedis);
        }
    }

    /**
     * Removes and returns the request at the head of the task's queue.
     *
     * @param task task whose UUID selects the Redis queue
     * @return a request for the next queued URL, or {@code null} when the
     *         queue has no URL to hand out
     */
    @Override
    public synchronized Request poll(Task task) {
        Jedis jedis = pool.getResource();
        String url;
        try {
            url = jedis.lpop(QUEUE_PREFIX + task.getUUID());
        } finally {
            pool.returnResource(jedis);
        }
        // An absent head element must not be wrapped in a Request with a null
        // URL; report "nothing to poll" instead.
        // NOTE(review): assumes the Scheduler contract allows a null return - confirm.
        if (url == null) {
            return null;
        }
        return new Request(url);
    }
}

@ -0,0 +1,41 @@
package us.codecraft.webmagic.scheduler;
import org.junit.Before;
import org.junit.Test;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.Task;
/**
* @author yihua.huang@dianping.com <br>
* @date: 13-7-25 <br>
* Time: 7:51 <br>
*/
public class RedisSchedulerTest {
private RedisScheduler redisScheduler;
@Before
public void setUp() {
redisScheduler = new RedisScheduler("localhost");
}
@Test
public void test() {
Task task = new Task() {
@Override
public String getUUID() {
return "1";
}
@Override
public Site getSite() {
return null;
}
};
redisScheduler.push(new Request("http://www.ibm.com/developerworks/cn/java/j-javadev2-22/"), task);
Request poll = redisScheduler.poll(task);
System.out.println(poll.getUrl());
}
}

@ -3,11 +3,9 @@ package us.codecraft.webmagic.samples;
import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.Spider;
import us.codecraft.webmagic.downloader.FileDownloader;
import us.codecraft.webmagic.downloader.HttpClientDownloader;
import us.codecraft.webmagic.pipeline.FilePipeline;
import us.codecraft.webmagic.processor.PageProcessor;
import us.codecraft.webmagic.schedular.FileCacheQueueScheduler;
import us.codecraft.webmagic.scheduler.RedisScheduler;
import java.util.List;
@ -40,9 +38,12 @@ public class GlobalProcessor implements PageProcessor {
/**
 * Sample entry point: builds two spiders over {@code GlobalProcessor}, each
 * with 10 threads; the first is started with {@code runAsync()}, the second
 * with {@code run()}.
 */
public static void main(String[] args) {
// NOTE(review): this body looks like a diff hunk whose removed and added lines
// were merged without +/- markers; scheduler(...) and pipeline(...) are each
// called twice in the first chain. Presumably the FileCacheQueueScheduler,
// FileDownloader and "/data/webmagic/test" pipeline lines are the removed
// ones - confirm against the repository before relying on this text.
Spider.create(new GlobalProcessor()).thread(10)
.scheduler(new FileCacheQueueScheduler("/data/webmagic/test"))
.downloader(new FileDownloader("/data/webmagic/test", new HttpClientDownloader()))
.pipeline(new FilePipeline("/data/webmagic/test"))
.scheduler(new RedisScheduler("localhost"))
.pipeline(new FilePipeline("/data/webmagic/test/"))
.runAsync();
// Second spider: same processor, RedisScheduler host and pipeline path as
// above, but each spider gets its own RedisScheduler instance.
Spider.create(new GlobalProcessor()).thread(10)
.scheduler(new RedisScheduler("localhost"))
.pipeline(new FilePipeline("/data/webmagic/test/"))
.run();
}
}

Loading…
Cancel
Save