|
|
@ -7,8 +7,12 @@ import us.codecraft.webmagic.Request;
|
|
|
|
import us.codecraft.webmagic.Task;
|
|
|
|
import us.codecraft.webmagic.Task;
|
|
|
|
import us.codecraft.webmagic.schedular.Scheduler;
|
|
|
|
import us.codecraft.webmagic.schedular.Scheduler;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import java.util.concurrent.locks.Condition;
|
|
|
|
|
|
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* 使用redis管理url,构建一个分布式的爬虫。<br>
|
|
|
|
* 使用redis管理url,构建一个分布式的爬虫。<br>
|
|
|
|
|
|
|
|
*
|
|
|
|
* @author yihua.huang@dianping.com <br>
|
|
|
|
* @author yihua.huang@dianping.com <br>
|
|
|
|
* @date: 13-7-25 <br>
|
|
|
|
* @date: 13-7-25 <br>
|
|
|
|
* Time: 上午7:07 <br>
|
|
|
|
* Time: 上午7:07 <br>
|
|
|
@ -21,6 +25,10 @@ public class RedisScheduler implements Scheduler{
|
|
|
|
|
|
|
|
|
|
|
|
private static final String SET_PREFIX = "set_";
|
|
|
|
private static final String SET_PREFIX = "set_";
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private ReentrantLock lock = new ReentrantLock();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private Condition condition = lock.newCondition();
|
|
|
|
|
|
|
|
|
|
|
|
public RedisScheduler(String host) {
|
|
|
|
public RedisScheduler(String host) {
|
|
|
|
pool = new JedisPool(new JedisPoolConfig(), host);
|
|
|
|
pool = new JedisPool(new JedisPoolConfig(), host);
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -30,9 +38,15 @@ public class RedisScheduler implements Scheduler{
|
|
|
|
Jedis jedis = pool.getResource();
|
|
|
|
Jedis jedis = pool.getResource();
|
|
|
|
//使用SortedSet进行url去重
|
|
|
|
//使用SortedSet进行url去重
|
|
|
|
if (jedis.zrank(SET_PREFIX + task.getUUID(), request.getUrl()) == null) {
|
|
|
|
if (jedis.zrank(SET_PREFIX + task.getUUID(), request.getUrl()) == null) {
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
lock.lock();
|
|
|
|
//使用List保存队列
|
|
|
|
//使用List保存队列
|
|
|
|
jedis.rpush(QUEUE_PREFIX + task.getUUID(), request.getUrl());
|
|
|
|
jedis.rpush(QUEUE_PREFIX + task.getUUID(), request.getUrl());
|
|
|
|
jedis.zadd(SET_PREFIX + task.getUUID(), System.currentTimeMillis(), request.getUrl());
|
|
|
|
jedis.zadd(SET_PREFIX + task.getUUID(), System.currentTimeMillis(), request.getUrl());
|
|
|
|
|
|
|
|
condition.signal();
|
|
|
|
|
|
|
|
} finally {
|
|
|
|
|
|
|
|
lock.unlock();
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
pool.returnResource(jedis);
|
|
|
|
pool.returnResource(jedis);
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -41,6 +55,20 @@ public class RedisScheduler implements Scheduler{
|
|
|
|
public synchronized Request poll(Task task) {
|
|
|
|
public synchronized Request poll(Task task) {
|
|
|
|
Jedis jedis = pool.getResource();
|
|
|
|
Jedis jedis = pool.getResource();
|
|
|
|
String url = jedis.lpop(QUEUE_PREFIX + task.getUUID());
|
|
|
|
String url = jedis.lpop(QUEUE_PREFIX + task.getUUID());
|
|
|
|
|
|
|
|
if (url == null) {
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
lock.lock();
|
|
|
|
|
|
|
|
while (url == null) {
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
condition.await();
|
|
|
|
|
|
|
|
url = jedis.lpop(QUEUE_PREFIX + task.getUUID());
|
|
|
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
} finally {
|
|
|
|
|
|
|
|
lock.unlock();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
pool.returnResource(jedis);
|
|
|
|
pool.returnResource(jedis);
|
|
|
|
return new Request(url);
|
|
|
|
return new Request(url);
|
|
|
|
}
|
|
|
|
}
|
|
|
|