diff --git a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisPriorityScheduler.java b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisPriorityScheduler.java
new file mode 100644
--- /dev/null
+++ b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisPriorityScheduler.java
@@ -0,0 +1,206 @@
+package us.codecraft.webmagic.scheduler;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang.StringUtils;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import us.codecraft.webmagic.Request;
+import us.codecraft.webmagic.Task;
+
+import java.util.Set;
+
+/**
+ * Redis-backed scheduler that hands out requests in priority order.
+ *
+ * Requests with a positive priority are stored in one sorted set, requests with a
+ * negative priority in a second sorted set, and requests with priority 0 in a list.
+ * {@link #poll(Task)} serves the positive set first, then the list, then the negative
+ * set; within each sorted set the member with the highest score is served first.
+ *
+ * NOTE(review): {@code getLeftRequestsCount} is inherited unchanged; confirm that it
+ * counts the three keys used by this class.
+ *
+ * @author sai
+ * Created by sai on 16-5-27.
+ */
+public class RedisPriorityScheduler extends RedisScheduler
+{
+
+    private static final String ZSET_PREFIX = "zset_";
+
+    private static final String QUEUE_PREFIX = "queue_";
+
+    // The spelling is part of the Redis key name and is kept as-is.
+    private static final String NO_PRIORITY_SUFFIX = "_zore";
+
+    private static final String PLUS_PRIORITY_SUFFIX = "_plus";
+
+    private static final String MINUS_PRIORITY_SUFFIX = "_minus";
+
+    public RedisPriorityScheduler(String host) {
+        super(host);
+    }
+
+    public RedisPriorityScheduler(JedisPool pool) {
+        super(pool);
+    }
+
+    /**
+     * Stores a request in the structure that matches the sign of its priority and
+     * saves its serialized form when it carries extras.
+     *
+     * @param request request to enqueue
+     * @param task    task whose UUID is part of the Redis keys
+     */
+    @Override
+    protected void pushWhenNoDuplicate(Request request, Task task)
+    {
+        Jedis jedis = pool.getResource();
+        try
+        {
+            if(request.getPriority() > 0)
+                jedis.zadd(getZsetPlusPriorityKey(task), request.getPriority(), request.getUrl());
+            else if(request.getPriority() < 0)
+                jedis.zadd(getZsetMinusPriorityKey(task), request.getPriority(), request.getUrl());
+            else
+                // NOTE(review): LPUSH here and LPOP in getRequest serve equal-priority
+                // requests newest-first; confirm that this order is intended.
+                jedis.lpush(getQueueNoPriorityKey(task), request.getUrl());
+
+            setExtrasInItem(jedis, request, task);
+        }
+        finally
+        {
+            pool.returnResource(jedis);
+        }
+    }
+
+    /**
+     * Removes and returns the next request for the task.
+     *
+     * @param task task whose queues are polled
+     * @return the next request, or {@code null} when no URL is available
+     */
+    @Override
+    public synchronized Request poll(Task task)
+    {
+        Jedis jedis = pool.getResource();
+        try
+        {
+            String url = getRequest(jedis, task);
+            if(StringUtils.isBlank(url))
+                return null;
+            return getExtrasInItem(jedis, url, task);
+        }
+        finally
+        {
+            pool.returnResource(jedis);
+        }
+    }
+
+    /**
+     * Takes the next URL: positive priorities first, then priority 0, then negative
+     * priorities.
+     *
+     * @return the URL, or {@code null} when nothing is available
+     */
+    private String getRequest(Jedis jedis, Task task)
+    {
+        String url = popHighest(jedis, getZsetPlusPriorityKey(task));
+        if(url != null)
+            return url;
+
+        url = jedis.lpop(getQueueNoPriorityKey(task));
+        if(!StringUtils.isBlank(url))
+            return url;
+
+        return popHighest(jedis, getZsetMinusPriorityKey(task));
+    }
+
+    /**
+     * Removes and returns the member with the highest score of a sorted set.
+     *
+     * ZREVRANGE followed by ZREM is not atomic, and {@code synchronized} on
+     * {@link #poll(Task)} only serializes callers inside this JVM. The result of ZREM
+     * is therefore checked, so a member that another client removed in between is
+     * skipped instead of being handed out twice.
+     *
+     * @return the removed member, or {@code null} when the sorted set is empty
+     */
+    private String popHighest(Jedis jedis, String key)
+    {
+        while(true)
+        {
+            Set<String> top = jedis.zrevrange(key, 0, 0);
+            if(top.isEmpty())
+                return null;
+
+            String url = top.iterator().next();
+            Long removed = jedis.zrem(key, url);
+            if(removed != null && removed > 0)
+                return url;
+        }
+    }
+
+    /**
+     * Deletes the Redis key returned by {@code getSetKey(task)}.
+     */
+    @Override
+    public void resetDuplicateCheck(Task task)
+    {
+        Jedis jedis = pool.getResource();
+        try
+        {
+            jedis.del(getSetKey(task));
+        }
+        finally
+        {
+            pool.returnResource(jedis);
+        }
+    }
+
+    private String getZsetPlusPriorityKey(Task task)
+    {
+        return ZSET_PREFIX + task.getUUID() + PLUS_PRIORITY_SUFFIX;
+    }
+
+    private String getQueueNoPriorityKey(Task task)
+    {
+        return QUEUE_PREFIX + task.getUUID() + NO_PRIORITY_SUFFIX;
+    }
+
+    private String getZsetMinusPriorityKey(Task task)
+    {
+        return ZSET_PREFIX + task.getUUID() + MINUS_PRIORITY_SUFFIX;
+    }
+
+    /**
+     * Saves the JSON form of a request that has extras, keyed by the
+     * {@code DigestUtils.shaHex} digest of its URL. Requests without extras are not saved.
+     */
+    private void setExtrasInItem(Jedis jedis,Request request, Task task)
+    {
+        if(request.getExtras() != null)
+        {
+            String field = DigestUtils.shaHex(request.getUrl());
+            String value = JSON.toJSONString(request);
+            jedis.hset(getItemKey(task), field, value);
+        }
+    }
+
+    /**
+     * Rebuilds the request for a URL from its saved JSON, or creates a plain request
+     * when nothing was saved.
+     */
+    private Request getExtrasInItem(Jedis jedis, String url, Task task)
+    {
+        String field = DigestUtils.shaHex(url);
+        // The String overload is used for both write and read, so key, field and value
+        // go through the same encoding instead of the platform default charset.
+        String json = jedis.hget(getItemKey(task), field);
+        if(json != null)
+            return JSON.parseObject(json, Request.class);
+        return new Request(url);
+    }
+}
diff --git a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java
--- a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java
+++ b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java
@@ -17,7 +17,7 @@ import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
  */
 public class RedisScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, DuplicateRemover {
 
-    private JedisPool pool;
+    protected JedisPool pool;
 
     private static final String QUEUE_PREFIX = "queue_";
 
@@ -104,6 +104,14 @@ public class RedisScheduler extends DuplicateRemovedScheduler implements Monitor
         return QUEUE_PREFIX + task.getUUID();
     }
 
+    /**
+     * Returns the Redis key built from {@code ITEM_PREFIX} and the task UUID.
+     */
+    protected String getItemKey(Task task)
+    {
+        return ITEM_PREFIX + task.getUUID();
+    }
+
     @Override
     public int getLeftRequestsCount(Task task) {
         Jedis jedis = pool.getResource();
diff --git a/webmagic-extension/src/test/java/us/codecraft/webmagic/scheduler/RedisPrioritySchedulerTest.java b/webmagic-extension/src/test/java/us/codecraft/webmagic/scheduler/RedisPrioritySchedulerTest.java
new file mode 100644
--- /dev/null
+++ b/webmagic-extension/src/test/java/us/codecraft/webmagic/scheduler/RedisPrioritySchedulerTest.java
@@ -0,0 +1,74 @@
+package us.codecraft.webmagic.scheduler;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import us.codecraft.webmagic.Request;
+import us.codecraft.webmagic.Site;
+import us.codecraft.webmagic.Task;
+
+/**
+ * Checks that {@link RedisPriorityScheduler} returns requests in priority order
+ * together with their extras. The scheduler is created for host {@code localhost}.
+ *
+ * @author sai
+ * Created by sai on 16-7-5.
+ */
+public class RedisPrioritySchedulerTest
+{
+
+    private RedisPriorityScheduler scheduler;
+
+    @Before
+    public void setUp()
+    {
+        scheduler = new RedisPriorityScheduler("localhost");
+    }
+
+    @Ignore("environment depended")
+    @Test
+    public void test()
+    {
+        Task task = new Task() {
+            @Override
+            public String getUUID() {
+                return "TestTask";
+            }
+
+            @Override
+            public Site getSite() {
+                return null;
+            }
+        };
+
+        scheduler.resetDuplicateCheck(task);
+
+        Request request = new Request("https://www.google.com");
+        Request request1 = new Request("https://www.facebook.com/");
+        Request request2 = new Request("https://twitter.com");
+
+        request.setPriority(1).putExtra("name", "google");
+        request1.setPriority(0).putExtra("name", "facebook");
+        request2.setPriority(-1).putExtra("name", "twitter");
+
+        scheduler.push(request, task);
+        scheduler.push(request1, task);
+        scheduler.push(request2, task);
+
+        Request polledFirst = scheduler.poll(task);
+        Request polledSecond = scheduler.poll(task);
+        Request polledThird = scheduler.poll(task);
+
+        // Each polled request is compared with the request pushed at the same priority.
+        Assert.assertEquals(request.getUrl(), polledFirst.getUrl());
+        Assert.assertEquals(request.getExtra("name"), polledFirst.getExtra("name"));
+
+        Assert.assertEquals(request1.getUrl(), polledSecond.getUrl());
+        Assert.assertEquals(request1.getExtra("name"), polledSecond.getExtra("name"));
+
+        Assert.assertEquals(request2.getUrl(), polledThird.getUrl());
+        Assert.assertEquals(request2.getExtra("name"), polledThird.getExtra("name"));
+    }
+
+}