Revert " bug修改,对结果提供缓存能力"

This reverts commit f68795d7dd.
pull/993/head
Sutra Zhou 4 years ago
parent 57dfc7cfb3
commit aabc5584b8

@ -1,18 +0,0 @@
package us.codecraft.webmagic.pipeline;
import java.util.Collection;
/**
* @author yaoqiang
*
* pipeline
*
*/
public interface CachePipeline<T> extends Pipeline{
/**
* @param collection
*
*/
void process(Collection<T> collection);
}

@ -1,87 +0,0 @@
package us.codecraft.webmagic.pipeline;
import lombok.extern.slf4j.Slf4j;
import us.codecraft.webmagic.ResultItems;
import us.codecraft.webmagic.Task;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
/**
* @author yaoqiang
*
* <p>
* <p>
* {@link ExecutorService}, {@link ExecutorService}
* @see ExecutorService
*/
@Slf4j
public abstract class CloseableCachePipeline implements CachePipeline<ResultItems>, Closeable {
private final BlockingQueue<ResultItems> cache;
private final ExecutorService executorService;
public CloseableCachePipeline(int max, ExecutorService executorService) {
this.cache = new ArrayBlockingQueue<>(max, false);
this.executorService = executorService;
}
public CloseableCachePipeline(int max) {
this(max, null);
}
/**
* @param resultItems
* @param task
*/
@Override
public final void process(ResultItems resultItems, Task task) {
try {
cache.put(resultItems);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
log.error(e.getMessage(), e);
}
if (cache.remainingCapacity() == 0) {
// set 中的resultItem 使用权依然传递了出去cache的使用全保留考虑到后面也用不上 resultItem所以发布出去问题也不大
// temp的修改权限被发布理由同上想添加删除都可以反正以后也用不上了;
Set<ResultItems> temp = new HashSet<>(cache);
if (executorService != null && !executorService.isShutdown()) {
executorService.execute(() -> process(temp, task));
} else {
process(temp, task);
}
cache.clear();
}
}
protected abstract void process(Collection<ResultItems> resultItems, Task task);
private synchronized void flush(Collection<ResultItems> resultItems) {
process(resultItems, null);
}
/**
* 线
*
* @throws IOException
*/
@Override
public final void close() throws IOException {
if (!cache.isEmpty()) {
flush(cache);
cache.clear();
}
}
}

@ -3,9 +3,12 @@ package us.codecraft.webmagic.proxy;
import lombok.extern.slf4j.Slf4j;
import us.codecraft.webmagic.Task;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Comparator;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
/**
* @author yaoqiang
@ -14,18 +17,26 @@ import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public abstract class AbstractRefreshableProxyProvider implements RefreshableProxyProvider {
private final LongAdder totalGet = new LongAdder();
private final LongAdder canUse = new LongAdder();
private final AtomicReference<FutureTask<Proxy>> usedProxyCache = new AtomicReference<>();
private final PriorityBlockingQueue<ExpirableProxy> ipQueue = new PriorityBlockingQueue<>(1000, Comparator.comparing(ExpirableProxy::getExpireTime));
private final int maxHostNum;
protected void doPut(ExpirableProxy expirableProxy) {
ipQueue.put(expirableProxy);
public AbstractRefreshableProxyProvider(int maxHostNum) {
this.maxHostNum = maxHostNum;
}
protected int hostSize() {
return ipQueue.size();
protected void doPut(ExpirableProxy expirableProxy) {
synchronized (ipQueue) {
if (ipQueue.size() <= maxHostNum) {
ipQueue.put(expirableProxy);
}
}
}
@Override
@ -116,6 +127,7 @@ public abstract class AbstractRefreshableProxyProvider implements RefreshablePro
do {
proxy = ipQueue.take();
} while (proxy.isExpire());
log.info("切换到proxyip:{}port:{}ip可用率:{}", proxy.getHost(), proxy.getPort(), BigDecimal.valueOf(canUse.sum()).divide(BigDecimal.valueOf(totalGet.sum()), 2, RoundingMode.HALF_DOWN).doubleValue());
return proxy;
}

@ -1,6 +1,5 @@
package us.codecraft.webmagic.proxy;
import lombok.Getter;
import org.apache.http.annotation.Contract;
import org.apache.http.annotation.ThreadingBehavior;
@ -14,7 +13,6 @@ import java.time.temporal.ChronoUnit;
*/
@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
public class ExpirableProxy extends Proxy {
@Getter
private final int ttl;
private final LocalDateTime expireTime;

Loading…
Cancel
Save