|
|
@ -1,22 +1,23 @@
|
|
|
|
package us.codecraft.webmagic.scheduler;
|
|
|
|
package us.codecraft.webmagic.scheduler;
|
|
|
|
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
|
import java.util.Set;
|
|
|
|
|
|
|
|
|
|
|
|
import org.apache.commons.codec.digest.DigestUtils;
|
|
|
|
import org.apache.commons.codec.digest.DigestUtils;
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
|
|
|
|
|
|
|
|
|
import redis.clients.jedis.Jedis;
|
|
|
|
import redis.clients.jedis.Jedis;
|
|
|
|
import redis.clients.jedis.JedisPool;
|
|
|
|
import redis.clients.jedis.JedisPool;
|
|
|
|
import us.codecraft.webmagic.Request;
|
|
|
|
import us.codecraft.webmagic.Request;
|
|
|
|
import us.codecraft.webmagic.Task;
|
|
|
|
import us.codecraft.webmagic.Task;
|
|
|
|
|
|
|
|
|
|
|
|
import java.util.Set;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* the redis scheduler with priority
|
|
|
|
* the redis scheduler with priority
|
|
|
|
* @author sai
|
|
|
|
* @author sai
|
|
|
|
* Created by sai on 16-5-27.
|
|
|
|
* Created by sai on 16-5-27.
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
public class RedisPriorityScheduler extends RedisScheduler
|
|
|
|
public class RedisPriorityScheduler extends RedisScheduler {
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static final String ZSET_PREFIX = "zset_";
|
|
|
|
private static final String ZSET_PREFIX = "zset_";
|
|
|
|
|
|
|
|
|
|
|
@ -37,62 +38,44 @@ public class RedisPriorityScheduler extends RedisScheduler
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
protected void pushWhenNoDuplicate(Request request, Task task)
|
|
|
|
protected void pushWhenNoDuplicate(Request request, Task task) {
|
|
|
|
{
|
|
|
|
try (Jedis jedis = pool.getResource()) {
|
|
|
|
Jedis jedis = pool.getResource();
|
|
|
|
if (request.getPriority() > 0) {
|
|
|
|
try
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
if(request.getPriority() > 0)
|
|
|
|
|
|
|
|
jedis.zadd(getZsetPlusPriorityKey(task), request.getPriority(), request.getUrl());
|
|
|
|
jedis.zadd(getZsetPlusPriorityKey(task), request.getPriority(), request.getUrl());
|
|
|
|
else if(request.getPriority() < 0)
|
|
|
|
} else if (request.getPriority() < 0) {
|
|
|
|
jedis.zadd(getZsetMinusPriorityKey(task), request.getPriority(), request.getUrl());
|
|
|
|
jedis.zadd(getZsetMinusPriorityKey(task), request.getPriority(), request.getUrl());
|
|
|
|
else
|
|
|
|
} else {
|
|
|
|
jedis.lpush(getQueueNoPriorityKey(task), request.getUrl());
|
|
|
|
jedis.lpush(getQueueNoPriorityKey(task), request.getUrl());
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
setExtrasInItem(jedis, request, task);
|
|
|
|
setExtrasInItem(jedis, request, task);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
finally
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
pool.returnResource(jedis);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public synchronized Request poll(Task task)
|
|
|
|
public synchronized Request poll(Task task) {
|
|
|
|
{
|
|
|
|
try (Jedis jedis = pool.getResource()) {
|
|
|
|
Jedis jedis = pool.getResource();
|
|
|
|
|
|
|
|
try
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
String url = getRequest(jedis, task);
|
|
|
|
String url = getRequest(jedis, task);
|
|
|
|
if(StringUtils.isBlank(url))
|
|
|
|
if (StringUtils.isBlank(url)) {
|
|
|
|
return null;
|
|
|
|
return null;
|
|
|
|
|
|
|
|
}
|
|
|
|
return getExtrasInItem(jedis, url, task);
|
|
|
|
return getExtrasInItem(jedis, url, task);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
finally
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
pool.returnResource(jedis);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private String getRequest(Jedis jedis, Task task)
|
|
|
|
private String getRequest(Jedis jedis, Task task) {
|
|
|
|
{
|
|
|
|
|
|
|
|
String url;
|
|
|
|
String url;
|
|
|
|
Set<String> urls = jedis.zrevrange(getZsetPlusPriorityKey(task), 0, 0);
|
|
|
|
Set<String> urls = jedis.zrevrange(getZsetPlusPriorityKey(task), 0, 0);
|
|
|
|
if(urls.isEmpty())
|
|
|
|
if (urls.isEmpty()) {
|
|
|
|
{
|
|
|
|
|
|
|
|
url = jedis.lpop(getQueueNoPriorityKey(task));
|
|
|
|
url = jedis.lpop(getQueueNoPriorityKey(task));
|
|
|
|
if(StringUtils.isBlank(url))
|
|
|
|
if (StringUtils.isBlank(url)) {
|
|
|
|
{
|
|
|
|
|
|
|
|
urls = jedis.zrevrange(getZsetMinusPriorityKey(task), 0, 0);
|
|
|
|
urls = jedis.zrevrange(getZsetMinusPriorityKey(task), 0, 0);
|
|
|
|
if(!urls.isEmpty())
|
|
|
|
if (!urls.isEmpty()) {
|
|
|
|
{
|
|
|
|
|
|
|
|
url = urls.toArray(new String[0])[0];
|
|
|
|
url = urls.toArray(new String[0])[0];
|
|
|
|
jedis.zrem(getZsetMinusPriorityKey(task), url);
|
|
|
|
jedis.zrem(getZsetMinusPriorityKey(task), url);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
else
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
url = urls.toArray(new String[0])[0];
|
|
|
|
url = urls.toArray(new String[0])[0];
|
|
|
|
jedis.zrem(getZsetPlusPriorityKey(task), url);
|
|
|
|
jedis.zrem(getZsetPlusPriorityKey(task), url);
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -100,51 +83,39 @@ public class RedisPriorityScheduler extends RedisScheduler
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void resetDuplicateCheck(Task task)
|
|
|
|
public void resetDuplicateCheck(Task task) {
|
|
|
|
{
|
|
|
|
try (Jedis jedis = pool.getResource()) {
|
|
|
|
Jedis jedis = pool.getResource();
|
|
|
|
|
|
|
|
try
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
jedis.del(getSetKey(task));
|
|
|
|
jedis.del(getSetKey(task));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
finally
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
pool.returnResource(jedis);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private String getZsetPlusPriorityKey(Task task)
|
|
|
|
private String getZsetPlusPriorityKey(Task task) {
|
|
|
|
{
|
|
|
|
|
|
|
|
return ZSET_PREFIX + task.getUUID() + PLUS_PRIORITY_SUFFIX;
|
|
|
|
return ZSET_PREFIX + task.getUUID() + PLUS_PRIORITY_SUFFIX;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private String getQueueNoPriorityKey(Task task)
|
|
|
|
private String getQueueNoPriorityKey(Task task) {
|
|
|
|
{
|
|
|
|
|
|
|
|
return QUEUE_PREFIX + task.getUUID() + NO_PRIORITY_SUFFIX;
|
|
|
|
return QUEUE_PREFIX + task.getUUID() + NO_PRIORITY_SUFFIX;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private String getZsetMinusPriorityKey(Task task)
|
|
|
|
private String getZsetMinusPriorityKey(Task task) {
|
|
|
|
{
|
|
|
|
|
|
|
|
return ZSET_PREFIX + task.getUUID() + MINUS_PRIORITY_SUFFIX;
|
|
|
|
return ZSET_PREFIX + task.getUUID() + MINUS_PRIORITY_SUFFIX;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void setExtrasInItem(Jedis jedis,Request request, Task task)
|
|
|
|
private void setExtrasInItem(Jedis jedis,Request request, Task task) {
|
|
|
|
{
|
|
|
|
if (request.getExtras() != null) {
|
|
|
|
if(request.getExtras() != null)
|
|
|
|
String field = DigestUtils.sha1Hex(request.getUrl());
|
|
|
|
{
|
|
|
|
|
|
|
|
String field = DigestUtils.shaHex(request.getUrl());
|
|
|
|
|
|
|
|
String value = JSON.toJSONString(request);
|
|
|
|
String value = JSON.toJSONString(request);
|
|
|
|
jedis.hset(getItemKey(task), field, value);
|
|
|
|
jedis.hset(getItemKey(task), field, value);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private Request getExtrasInItem(Jedis jedis, String url, Task task)
|
|
|
|
private Request getExtrasInItem(Jedis jedis, String url, Task task) {
|
|
|
|
{
|
|
|
|
|
|
|
|
String key = getItemKey(task);
|
|
|
|
String key = getItemKey(task);
|
|
|
|
String field = DigestUtils.shaHex(url);
|
|
|
|
String field = DigestUtils.sha1Hex(url);
|
|
|
|
byte[] bytes = jedis.hget(key.getBytes(), field.getBytes());
|
|
|
|
byte[] bytes = jedis.hget(key.getBytes(), field.getBytes());
|
|
|
|
if(bytes != null)
|
|
|
|
if (bytes != null) {
|
|
|
|
return JSON.parseObject(new String(bytes), Request.class);
|
|
|
|
return JSON.parseObject(new String(bytes), Request.class);
|
|
|
|
|
|
|
|
}
|
|
|
|
return new Request(url);
|
|
|
|
return new Request(url);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|