Merge pull request #343 from Salon-sai/master

add: redis scheduler with priority
pull/254/head
Yihua Huang 9 years ago committed by GitHub
commit e22d6426fc

@ -0,0 +1,151 @@
package us.codecraft.webmagic.scheduler;
import com.alibaba.fastjson.JSON;
import com.sun.org.apache.regexp.internal.RE;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import java.util.Set;
/**
 * A Redis scheduler that hands out requests by priority.
 * <p>
 * Requests are spread over three Redis keys per task: a sorted set for
 * positive priorities, a list for priority zero and a sorted set for negative
 * priorities. {@link #poll(Task)} drains them in that order, taking the
 * highest score first inside each sorted set.
 * <p>
 * NOTE(review): queue-size reporting inherited from {@link RedisScheduler} is
 * not overridden here; confirm it accounts for the three keys used by this class.
 *
 * @author sai
 * Created by sai on 16-5-27.
 */
public class RedisPriorityScheduler extends RedisScheduler
{
    private static final String ZSET_PREFIX = "zset_";
    private static final String QUEUE_PREFIX = "queue_";
    // "_zore" is a misspelling of "zero"; it is kept byte-for-byte because it is
    // part of the Redis key name and renaming it would orphan existing data.
    private static final String NO_PRIORITY_SUFFIX = "_zore";
    private static final String PLUS_PRIORITY_SUFFIX = "_plus";
    private static final String MINUS_PRIORITY_SUFFIX = "_minus";

    /**
     * @param host Redis host, passed straight to the parent constructor
     */
    public RedisPriorityScheduler(String host) {
        super(host);
    }

    /**
     * @param pool Jedis connection pool, passed straight to the parent constructor
     */
    public RedisPriorityScheduler(JedisPool pool) {
        super(pool);
    }

    /**
     * Stores the request URL under the key matching the sign of its priority
     * and then persists the serialized request if it carries extras.
     *
     * @param request request to enqueue
     * @param task    task whose UUID namespaces the Redis keys
     */
    @Override
    protected void pushWhenNoDuplicate(Request request, Task task)
    {
        Jedis jedis = pool.getResource();
        try
        {
            if (request.getPriority() > 0)
            {
                jedis.zadd(getZsetPlusPriorityKey(task), request.getPriority(), request.getUrl());
            }
            else if (request.getPriority() < 0)
            {
                jedis.zadd(getZsetMinusPriorityKey(task), request.getPriority(), request.getUrl());
            }
            else
            {
                // lpush here + lpop in getRequest: the zero-priority list is consumed LIFO.
                jedis.lpush(getQueueNoPriorityKey(task), request.getUrl());
            }
            setExtrasInItem(jedis, request, task);
        }
        finally
        {
            pool.returnResource(jedis);
        }
    }

    /**
     * Removes and returns the next request for the task.
     *
     * @param task task whose UUID namespaces the Redis keys
     * @return the next request, or {@code null} when no URL is available
     */
    @Override
    public synchronized Request poll(Task task)
    {
        Jedis jedis = pool.getResource();
        try
        {
            String url = getRequest(jedis, task);
            if (StringUtils.isBlank(url))
            {
                return null;
            }
            return getExtrasInItem(jedis, url, task);
        }
        finally
        {
            pool.returnResource(jedis);
        }
    }

    /**
     * Picks the next URL: positive-priority set first, then the zero-priority
     * list, then the negative-priority set.
     *
     * @return the URL, or {@code null}/blank when nothing is queued
     */
    private String getRequest(Jedis jedis, Task task)
    {
        String url = pollHighestScore(jedis, getZsetPlusPriorityKey(task));
        if (url != null)
        {
            return url;
        }
        url = jedis.lpop(getQueueNoPriorityKey(task));
        if (!StringUtils.isBlank(url))
        {
            return url;
        }
        return pollHighestScore(jedis, getZsetMinusPriorityKey(task));
    }

    /**
     * Removes and returns the member with the highest score of a sorted set.
     * <p>
     * Reading the top member and removing it are two separate Redis commands,
     * so another client can take the same member in between. The member is only
     * handed out when this caller's ZREM actually removed it; otherwise the
     * lookup is repeated.
     *
     * @return the removed member, or {@code null} when the set is empty
     */
    private String pollHighestScore(Jedis jedis, String zsetKey)
    {
        while (true)
        {
            Set<String> top = jedis.zrevrange(zsetKey, 0, 0);
            if (top.isEmpty())
            {
                return null;
            }
            String candidate = top.iterator().next();
            Long removed = jedis.zrem(zsetKey, candidate);
            if (removed != null && removed > 0)
            {
                return candidate;
            }
        }
    }

    /**
     * Deletes the duplicate-check set of the task.
     *
     * @param task task whose duplicate-check key is deleted
     */
    @Override
    public void resetDuplicateCheck(Task task)
    {
        Jedis jedis = pool.getResource();
        try
        {
            jedis.del(getSetKey(task));
        }
        finally
        {
            pool.returnResource(jedis);
        }
    }

    /** Key of the sorted set holding URLs with priority &gt; 0. */
    private String getZsetPlusPriorityKey(Task task)
    {
        return ZSET_PREFIX + task.getUUID() + PLUS_PRIORITY_SUFFIX;
    }

    /** Key of the list holding URLs with priority 0. */
    private String getQueueNoPriorityKey(Task task)
    {
        return QUEUE_PREFIX + task.getUUID() + NO_PRIORITY_SUFFIX;
    }

    /** Key of the sorted set holding URLs with priority &lt; 0. */
    private String getZsetMinusPriorityKey(Task task)
    {
        return ZSET_PREFIX + task.getUUID() + MINUS_PRIORITY_SUFFIX;
    }

    /**
     * Stores the JSON-serialized request in the task's item hash, keyed by the
     * SHA-1 hex of its URL. Requests without extras are not stored.
     */
    private void setExtrasInItem(Jedis jedis, Request request, Task task)
    {
        if (request.getExtras() != null)
        {
            String field = DigestUtils.shaHex(request.getUrl());
            String value = JSON.toJSONString(request);
            jedis.hset(getItemKey(task), field, value);
        }
    }

    /**
     * Restores the request stored for the URL, or builds a bare request when
     * nothing was stored.
     * <p>
     * The hash is read through the same String-based Jedis API that wrote it,
     * so the value is not re-encoded with the platform default charset.
     */
    private Request getExtrasInItem(Jedis jedis, String url, Task task)
    {
        String stored = jedis.hget(getItemKey(task), DigestUtils.shaHex(url));
        if (stored != null)
        {
            return JSON.parseObject(stored, Request.class);
        }
        return new Request(url);
    }
}

@ -17,7 +17,7 @@ import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
*/
public class RedisScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, DuplicateRemover {
private JedisPool pool;
protected JedisPool pool;
private static final String QUEUE_PREFIX = "queue_";
@ -89,7 +89,7 @@ public class RedisScheduler extends DuplicateRemovedScheduler implements Monitor
Request o = JSON.parseObject(new String(bytes), Request.class);
return o;
}
Request request = new Request(url);
Request request = new Request(url);
return request;
} finally {
pool.returnResource(jedis);
@ -104,6 +104,11 @@ public class RedisScheduler extends DuplicateRemovedScheduler implements Monitor
return QUEUE_PREFIX + task.getUUID();
}
/**
 * Builds the Redis key of the per-task item hash.
 *
 * @param task task whose UUID is appended to the item prefix
 * @return {@code ITEM_PREFIX} followed by the task UUID
 */
protected String getItemKey(Task task)
{
    final String taskId = task.getUUID();
    return ITEM_PREFIX + taskId;
}
@Override
public int getLeftRequestsCount(Task task) {
Jedis jedis = pool.getResource();

@ -0,0 +1,70 @@
package us.codecraft.webmagic.scheduler;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.Task;
/**
 * Round-trip test for {@link RedisPriorityScheduler}: pushes one request per
 * priority class and checks that they are polled back in priority order with
 * their extras intact.
 * <p>
 * The scheduler is created against "localhost", so the test is ignored by
 * default ("environment depended").
 *
 * @author sai
 * Created by sai on 16-7-5.
 */
public class RedisPrioritySchedulerTest
{
    private RedisPriorityScheduler scheduler;

    @Before
    public void setUp()
    {
        scheduler = new RedisPriorityScheduler("localhost");
    }

    @Ignore("environment depended")
    @Test
    public void test()
    {
        Task task = new Task() {
            @Override
            public String getUUID() {
                return "TestTask";
            }

            @Override
            public Site getSite() {
                return null;
            }
        };
        // Clear the duplicate-check set so the three pushes below are accepted.
        scheduler.resetDuplicateCheck(task);

        Request google = new Request("https://www.google.com");
        Request facebook = new Request("https://www.facebook.com/");
        Request twitter = new Request("https://twitter.com");
        google.setPriority(1).putExtra("name", "google");
        facebook.setPriority(0).putExtra("name", "facebook");
        twitter.setPriority(-1).putExtra("name", "twitter");

        scheduler.push(google, task);
        scheduler.push(facebook, task);
        scheduler.push(twitter, task);

        // Expected poll order: positive priority, then zero, then negative.
        Request first = scheduler.poll(task);
        Request second = scheduler.poll(task);
        Request third = scheduler.poll(task);

        // Each polled request is compared with the request it should correspond to.
        Assert.assertEquals(google.getUrl(), first.getUrl());
        Assert.assertEquals(google.getExtra("name"), first.getExtra("name"));
        Assert.assertEquals(facebook.getUrl(), second.getUrl());
        Assert.assertEquals(facebook.getExtra("name"), second.getExtra("name"));
        Assert.assertEquals(twitter.getUrl(), third.getUrl());
        Assert.assertEquals(twitter.getExtra("name"), third.getExtra("name"));
    }
}
Loading…
Cancel
Save