From d1140b9e29bd5bfcc1205a4306bd6f553249b924 Mon Sep 17 00:00:00 2001 From: "yihua.huang" Date: Fri, 2 May 2014 20:20:22 +0800 Subject: [PATCH] add bloom filter for scheduler #118 --- ...er.java => DuplicateRemovedScheduler.java} | 26 ++++---- .../webmagic/scheduler/PriorityScheduler.java | 7 ++- .../webmagic/scheduler/QueueScheduler.java | 7 ++- .../BloomFilterDuplicateRemover.java | 61 +++++++++++++++++++ .../scheduler/component/DuplicateRemover.java | 35 +++++++++++ .../HashSetDuplicateRemover.java} | 17 +++--- .../webmagic/scheduler/component/package.html | 5 ++ .../BloomFilterDuplicateRemoverTest.java | 27 ++++++++ .../scheduler/FileCacheQueueScheduler.java | 11 ++-- .../webmagic/scheduler/RedisScheduler.java | 5 +- 10 files changed, 169 insertions(+), 32 deletions(-) rename webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/{DuplicatedRemoveScheduler.java => DuplicateRemovedScheduler.java} (63%) create mode 100644 webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/component/BloomFilterDuplicateRemover.java create mode 100644 webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/component/DuplicateRemover.java rename webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/{LocalDuplicatedRemoveScheduler.java => component/HashSetDuplicateRemover.java} (60%) create mode 100644 webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/component/package.html create mode 100644 webmagic-core/src/test/java/us/codecraft/webmagic/scheduler/BloomFilterDuplicateRemoverTest.java diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/DuplicatedRemoveScheduler.java b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/DuplicateRemovedScheduler.java similarity index 63% rename from webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/DuplicatedRemoveScheduler.java rename to webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/DuplicateRemovedScheduler.java index 4b70b83c..558ffdb0 100644 --- 
a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/DuplicatedRemoveScheduler.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/DuplicateRemovedScheduler.java @@ -4,6 +4,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import us.codecraft.webmagic.Request; import us.codecraft.webmagic.Task; +import us.codecraft.webmagic.scheduler.component.DuplicateRemover; /** * Remove duplicate urls and only push urls which are not duplicate.

@@ -11,30 +12,29 @@ import us.codecraft.webmagic.Task; * @author code4crafer@gmail.com * @since 0.5.0 */ -public abstract class DuplicatedRemoveScheduler implements Scheduler { +public abstract class DuplicateRemovedScheduler implements Scheduler { protected Logger logger = LoggerFactory.getLogger(getClass()); + private DuplicateRemover duplicatedRemover; + + public DuplicateRemover getDuplicateRemover() { + return duplicatedRemover; + } + + public void setDuplicateRemover(DuplicateRemover duplicatedRemover) { + this.duplicatedRemover = duplicatedRemover; + } + @Override public void push(Request request, Task task) { logger.trace("get a candidate url {}", request.getUrl()); - if (!isDuplicate(request, task) || shouldReserved(request)) { + if (!duplicatedRemover.isDuplicate(request, task) || shouldReserved(request)) { logger.debug("push to queue {}", request.getUrl()); pushWhenNoDuplicate(request, task); } } - /** - * Reset duplicate check. - */ - public abstract void resetDuplicateCheck(Task task); - - /** - * @param request - * @return - */ - protected abstract boolean isDuplicate(Request request, Task task); - protected boolean shouldReserved(Request request) { return request.getExtra(Request.CYCLE_TRIED_TIMES) != null; } diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/PriorityScheduler.java b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/PriorityScheduler.java index 38c9b6cc..8fa1b9ea 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/PriorityScheduler.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/PriorityScheduler.java @@ -17,7 +17,7 @@ import java.util.concurrent.PriorityBlockingQueue; * @since 0.2.1 */ @ThreadSafe -public class PriorityScheduler extends LocalDuplicatedRemoveScheduler { +public class PriorityScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler { public static final int INITIAL_CAPACITY = 5; @@ -65,4 +65,9 @@ public class 
PriorityScheduler extends LocalDuplicatedRemoveScheduler { public int getLeftRequestsCount(Task task) { return noPriorityQueue.size(); } + + @Override + public int getTotalRequestsCount(Task task) { + return getDuplicateRemover().getTotalRequestsCount(task); + } } diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/QueueScheduler.java b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/QueueScheduler.java index 511d8a0a..c38311f2 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/QueueScheduler.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/QueueScheduler.java @@ -16,7 +16,7 @@ import java.util.concurrent.LinkedBlockingQueue; * @since 0.1.0 */ @ThreadSafe -public class QueueScheduler extends LocalDuplicatedRemoveScheduler { +public class QueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler { private BlockingQueue queue = new LinkedBlockingQueue(); @@ -34,4 +34,9 @@ public class QueueScheduler extends LocalDuplicatedRemoveScheduler { public int getLeftRequestsCount(Task task) { return queue.size(); } + + @Override + public int getTotalRequestsCount(Task task) { + return getDuplicateRemover().getTotalRequestsCount(task); + } } diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/component/BloomFilterDuplicateRemover.java b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/component/BloomFilterDuplicateRemover.java new file mode 100644 index 00000000..d16c3add --- /dev/null +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/component/BloomFilterDuplicateRemover.java @@ -0,0 +1,61 @@ +package us.codecraft.webmagic.scheduler.component; + +import com.google.common.hash.BloomFilter; +import com.google.common.hash.Funnels; +import us.codecraft.webmagic.Request; +import us.codecraft.webmagic.Task; + +import java.nio.charset.Charset; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * BloomFilterDuplicateRemover for 
huge number of urls.
+ * Urls are kept in a fixed-size Guava BloomFilter, so a url that was never seen may occasionally be reported as duplicate.
+ * @author code4crafer@gmail.com
+ * @since 0.5.1
+ */
+public class BloomFilterDuplicateRemover implements DuplicateRemover {
+    /** Number of urls the filter is sized for. */
+    private int expectedInsertions;
+    /** Desired false positive probability passed to BloomFilter.create. */
+    private double fpp;
+    /** Urls accepted as new since the filter was last (re)built. */
+    private AtomicInteger counter;
+    /** Sizes the filter for expectedInsertions urls with a false positive probability of 0.03. */
+    public BloomFilterDuplicateRemover(int expectedInsertions) {
+        this(expectedInsertions, 0.03);
+    }
+    /** Sizes the filter for expectedInsertions urls with the given false positive probability. */
+    public BloomFilterDuplicateRemover(int expectedInsertions, double fpp) {
+        this.expectedInsertions = expectedInsertions;
+        this.fpp = fpp;
+        this.bloomFilter = rebuildBloomFilter();
+    }
+    /** Resets the counter and returns a new, empty filter; the caller must store the returned filter. */
+    protected BloomFilter<CharSequence> rebuildBloomFilter() {
+        counter = new AtomicInteger(0);
+        return BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), expectedInsertions, fpp);
+    }
+    // Not final: resetDuplicateCheck() replaces it with a fresh filter; volatile so other threads see the swap.
+    private volatile BloomFilter<CharSequence> bloomFilter;
+    /** Reports whether the request url was already recorded; a url not yet recorded is recorded and counted. */
+    @Override
+    public boolean isDuplicate(Request request, Task task) {
+        boolean isDuplicate = bloomFilter.mightContain(request.getUrl());
+        if (!isDuplicate) {
+            bloomFilter.put(request.getUrl()); // put() records the url; apply() only delegates to mightContain()
+            counter.incrementAndGet();
+        }
+        return isDuplicate;
+    }
+    /** Forgets every recorded url and resets the counter. */
+    @Override
+    public void resetDuplicateCheck(Task task) {
+        bloomFilter = rebuildBloomFilter(); // keep the new filter, otherwise the old entries survive the reset
+    }
+    /** Returns how many urls were accepted as new since the last (re)build. */
+    @Override
+    public int getTotalRequestsCount(Task task) {
+        return counter.get();
+    }
+}
diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/component/DuplicateRemover.java b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/component/DuplicateRemover.java
new file mode 100644
index 00000000..fa88976b
--- /dev/null
+++ b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/component/DuplicateRemover.java
@@ -0,0 +1,35 @@
+package us.codecraft.webmagic.scheduler.component;
+
+import us.codecraft.webmagic.Request;
+import us.codecraft.webmagic.Task;
+
+/**
+ * Remove duplicate requests.
+ * @author code4crafer@gmail.com
+ * @since 0.5.1
+ */
+public interface DuplicateRemover {
+    /**
+     *
+     * Check whether the request is duplicate.
+ * + * @param request + * @param task + * @return + */ + public boolean isDuplicate(Request request, Task task); + + /** + * Reset duplicate check. + * @param task + */ + public void resetDuplicateCheck(Task task); + + /** + * Get TotalRequestsCount for monitor. + * @param task + * @return + */ + public int getTotalRequestsCount(Task task); + +} diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/LocalDuplicatedRemoveScheduler.java b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/component/HashSetDuplicateRemover.java similarity index 60% rename from webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/LocalDuplicatedRemoveScheduler.java rename to webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/component/HashSetDuplicateRemover.java index a1b0cabd..f8bcf268 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/LocalDuplicatedRemoveScheduler.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/component/HashSetDuplicateRemover.java @@ -1,4 +1,4 @@ -package us.codecraft.webmagic.scheduler; +package us.codecraft.webmagic.scheduler.component; import com.google.common.collect.Sets; import us.codecraft.webmagic.Request; @@ -8,23 +8,20 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** - * Base Scheduler with duplicated urls removed by hash set.

- * - * @author code4crafter@gmail.com - * @since 0.5.0 + * @author code4crafer@gmail.com */ -public abstract class LocalDuplicatedRemoveScheduler extends DuplicatedRemoveScheduler implements MonitorableScheduler { +public class HashSetDuplicateRemover implements DuplicateRemover { private Set urls = Sets.newSetFromMap(new ConcurrentHashMap()); @Override - public void resetDuplicateCheck(Task task) { - urls.clear(); + public boolean isDuplicate(Request request, Task task) { + return !urls.add(request.getUrl()); } @Override - protected boolean isDuplicate(Request request, Task task) { - return !urls.add(request.getUrl()); + public void resetDuplicateCheck(Task task) { + urls.clear(); } @Override diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/component/package.html b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/component/package.html new file mode 100644 index 00000000..213707c1 --- /dev/null +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/component/package.html @@ -0,0 +1,5 @@ + + +Component of scheduler. 
+
+
diff --git a/webmagic-core/src/test/java/us/codecraft/webmagic/scheduler/BloomFilterDuplicateRemoverTest.java b/webmagic-core/src/test/java/us/codecraft/webmagic/scheduler/BloomFilterDuplicateRemoverTest.java
new file mode 100644
index 00000000..b6fc5e08
--- /dev/null
+++ b/webmagic-core/src/test/java/us/codecraft/webmagic/scheduler/BloomFilterDuplicateRemoverTest.java
@@ -0,0 +1,27 @@
+package us.codecraft.webmagic.scheduler;
+
+import org.junit.Test;
+import us.codecraft.webmagic.Request;
+import us.codecraft.webmagic.scheduler.component.BloomFilterDuplicateRemover;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * @author code4crafer@gmail.com
+ */
+public class BloomFilterDuplicateRemoverTest {
+    /** A url must be reported as new the first time it is offered and as duplicate the second time. */
+    @Test
+    public void testRemove() throws Exception {
+        BloomFilterDuplicateRemover bloomFilterDuplicateRemover = new BloomFilterDuplicateRemover(10);
+        boolean isDuplicate = bloomFilterDuplicateRemover.isDuplicate(new Request("a"), null);
+        assertThat(isDuplicate).isFalse();
+        isDuplicate = bloomFilterDuplicateRemover.isDuplicate(new Request("a"), null);
+        assertThat(isDuplicate).isTrue(); // a bare assertThat(...) checks nothing; the second sighting must be a duplicate
+        isDuplicate = bloomFilterDuplicateRemover.isDuplicate(new Request("b"), null);
+        assertThat(isDuplicate).isFalse();
+        isDuplicate = bloomFilterDuplicateRemover.isDuplicate(new Request("b"), null);
+        assertThat(isDuplicate).isTrue();
+
+    }
+}
diff --git a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java
index 4215ab83..211b6989 100644
--- a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java
+++ b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java
@@ -2,8 +2,6 @@ package us.codecraft.webmagic.scheduler;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.math.NumberUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.Request; import us.codecraft.webmagic.Task; @@ -23,9 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger; * @author code4crafter@gmail.com
* @since 0.2.0 */ -public class FileCacheQueueScheduler extends LocalDuplicatedRemoveScheduler { - - private Logger logger = LoggerFactory.getLogger(getClass()); +public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler { private String filePath = System.getProperty("java.io.tmpdir"); @@ -166,4 +162,9 @@ public class FileCacheQueueScheduler extends LocalDuplicatedRemoveScheduler { public int getLeftRequestsCount(Task task) { return queue.size(); } + + @Override + public int getTotalRequestsCount(Task task) { + return getDuplicateRemover().getTotalRequestsCount(task); + } } diff --git a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java index 338f5af1..067ba639 100644 --- a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java +++ b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java @@ -7,6 +7,7 @@ import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import us.codecraft.webmagic.Request; import us.codecraft.webmagic.Task; +import us.codecraft.webmagic.scheduler.component.DuplicateRemover; /** * Use Redis as url scheduler for distributed crawlers.
@@ -14,7 +15,7 @@ import us.codecraft.webmagic.Task; * @author code4crafter@gmail.com
* @since 0.2.0 */ -public class RedisScheduler extends DuplicatedRemoveScheduler implements MonitorableScheduler { +public class RedisScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, DuplicateRemover { private JedisPool pool; @@ -43,7 +44,7 @@ public class RedisScheduler extends DuplicatedRemoveScheduler implements Monitor } @Override - protected boolean isDuplicate(Request request, Task task) { + public boolean isDuplicate(Request request, Task task) { Jedis jedis = pool.getResource(); try { boolean isDuplicate = jedis.sismember(getSetKey(task), request.getUrl());