Revert " 代理功能扩展,对原代理提供商进行拆分,加入lombok"

This reverts commit 33906e36f4.
pull/993/head
Sutra Zhou 4 years ago
parent aabc5584b8
commit 3f756c9325

@ -1,6 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>us.codecraft</groupId>
<artifactId>webmagic-parent</artifactId>
@ -25,12 +24,6 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>us.codecraft</groupId>

@ -426,7 +426,6 @@ public class Spider implements Runnable, Task {
}
} else if(site.getRefreshCode().contains(page.getStatusCode())) {
logger.info("page status code error, page {} , code: {}, start refresh downloader", request.getUrl(), page.getStatusCode());
downloader.refreshComponent(this);
failHandler(request);
}else {
logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode());
@ -440,6 +439,7 @@ public class Spider implements Runnable, Task {
}
private void failHandler(Request request){
downloader.refreshComponent(this);
if (site.getCycleRetryTimes() == 0) {
sleep(site.getSleepTime());
} else {

@ -13,8 +13,6 @@ import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.proxy.Proxy;
import us.codecraft.webmagic.proxy.ProxyProvider;
import us.codecraft.webmagic.proxy.RefreshableProxyProvider;
import us.codecraft.webmagic.proxy.ReturnableProxyProvider;
import us.codecraft.webmagic.selector.PlainText;
import us.codecraft.webmagic.utils.CharsetUtils;
import us.codecraft.webmagic.utils.HttpClientUtils;
@ -95,8 +93,8 @@ public class HttpClientDownloader extends AbstractDownloader {
} catch (IOException e) {
logger.warn("download page {} error", request.getUrl(), e);
onError(request, e, proxyProvider);
if (proxyProvider != null && proxy != null && proxyProvider instanceof RefreshableProxyProvider && refreshProxyOnError.test(e)) {
((RefreshableProxyProvider)proxyProvider).refreshProxy(task,proxy);
if (proxyProvider != null && refreshProxyOnError.test(e)) {
proxyProvider.refreshProxy(task,proxy);
}
if(refreshClientOnError.test(e)) {
httpClients.remove(task.getSite().getDomain());
@ -107,9 +105,8 @@ public class HttpClientDownloader extends AbstractDownloader {
//ensure the connection is released back to pool
EntityUtils.consumeQuietly(httpResponse.getEntity());
}
if (proxyProvider != null && proxy != null && proxyProvider instanceof ReturnableProxyProvider) {
((ReturnableProxyProvider) proxyProvider).returnProxy(proxy, page, task);
if (proxyProvider != null && proxy != null) {
proxyProvider.returnProxy(proxy, page, task);
}
}
}
@ -117,8 +114,8 @@ public class HttpClientDownloader extends AbstractDownloader {
@Override
public void refreshComponent(Task task) {
if (proxyProvider != null && proxyProvider instanceof RefreshableProxyProvider) {
((RefreshableProxyProvider) proxyProvider).refreshProxy(task, ((RefreshableProxyProvider) proxyProvider).getCurrentProxy(task));
if (proxyProvider != null ) {
proxyProvider.refreshProxy(task,proxyProvider.getCurrentProxy(task));
}
httpClients.remove(task.getSite().getDomain());

@ -1,135 +0,0 @@
package us.codecraft.webmagic.proxy;
import lombok.extern.slf4j.Slf4j;
import us.codecraft.webmagic.Task;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Comparator;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
/**
* @author yaoqiang
*
*/
@Slf4j
public abstract class AbstractRefreshableProxyProvider implements RefreshableProxyProvider {
private final LongAdder totalGet = new LongAdder();
private final LongAdder canUse = new LongAdder();
private final AtomicReference<FutureTask<Proxy>> usedProxyCache = new AtomicReference<>();
private final PriorityBlockingQueue<ExpirableProxy> ipQueue = new PriorityBlockingQueue<>(1000, Comparator.comparing(ExpirableProxy::getExpireTime));
private final int maxHostNum;
public AbstractRefreshableProxyProvider(int maxHostNum) {
this.maxHostNum = maxHostNum;
}
protected void doPut(ExpirableProxy expirableProxy) {
synchronized (ipQueue) {
if (ipQueue.size() <= maxHostNum) {
ipQueue.put(expirableProxy);
}
}
}
@Override
public void refreshProxy(Task task, Proxy proxy) {
if (proxy != null) {
FutureTask<Proxy> proxyFutureTask = usedProxyCache.get();
Proxy currentProxy = getCurrentProxy(task);
// 如果在出错到这里的过程中usedProxyCache被更新过proxy 就不可能相等,如果依然相等,说明没有更新过
// 可能没有使用代理的情况
if (proxy.equals(currentProxy)) {
// 如果此时依然没有更新过,就设置为空
usedProxyCache.compareAndSet(proxyFutureTask, null);
}
}
}
@Override
public Proxy getCurrentProxy(Task task) {
FutureTask<Proxy> cache = usedProxyCache.get();
Proxy currentProxy = null;
try {
if (cache != null)
currentProxy = cache.get(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
log.error(e.getMessage(), e);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
e.printStackTrace();
log.error(e.getCause().getMessage(), e);
} catch (TimeoutException e) {
log.error(e.getMessage(), e);
e.printStackTrace();
}
return currentProxy;
}
private FutureTask<Proxy> buildCacheTask() {
return new FutureTask<>(this::doGet);
}
/**
* cache
*
* @param task
* @return
*/
@Override
public Proxy getProxy(Task task) {
while (!Thread.currentThread().isInterrupted()) {
FutureTask<Proxy> cache = usedProxyCache.get();
if (cache == null) {
FutureTask<Proxy> futureTask = buildCacheTask();
if (usedProxyCache.compareAndSet(null, futureTask)) {
cache = futureTask;
futureTask.run();
} else {
// 交换失败,需要更新到最新数据
cache = usedProxyCache.get();
}
}
try {
if (cache != null) {
ExpirableProxy proxy = (ExpirableProxy) cache.get(5, TimeUnit.SECONDS);
if (!proxy.isExpire())
return proxy;
}
usedProxyCache.compareAndSet(cache, null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error(e.getMessage(), e);
usedProxyCache.compareAndSet(cache, null);
} catch (ExecutionException e) {
log.error(e.getMessage(), e);
usedProxyCache.compareAndSet(cache, null);
} catch (TimeoutException e) {
log.error(e.getMessage(), e);
}
}
return null;
}
private Proxy doGet() throws InterruptedException {
ExpirableProxy proxy;
do {
proxy = ipQueue.take();
} while (proxy.isExpire());
log.info("切换到proxyip:{}port:{}ip可用率:{}", proxy.getHost(), proxy.getPort(), BigDecimal.valueOf(canUse.sum()).divide(BigDecimal.valueOf(totalGet.sum()), 2, RoundingMode.HALF_DOWN).doubleValue());
return proxy;
}
}

@ -1,34 +0,0 @@
package us.codecraft.webmagic.proxy;
import org.apache.http.annotation.Contract;
import org.apache.http.annotation.ThreadingBehavior;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
/**
 * A {@link Proxy} with a limited lifetime. The expiry instant is computed once, at
 * construction, as "now plus {@code ttl} units" and never changes afterwards.
 *
 * @author yaoqiang
 */
@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
public class ExpirableProxy extends Proxy {

    /** Time-to-live amount the proxy was created with, expressed in the constructor's unit. */
    private final int ttl;

    /** Local date-time after which the proxy counts as expired. */
    private final LocalDateTime expireTime;

    /**
     * @param host       proxy host
     * @param port       proxy port
     * @param ttl        lifetime amount
     * @param chronoUnit unit in which {@code ttl} is expressed
     */
    public ExpirableProxy(String host, int port, int ttl, ChronoUnit chronoUnit) {
        super(host, port);
        LocalDateTime createdAt = LocalDateTime.now();
        this.ttl = ttl;
        this.expireTime = createdAt.plus(ttl, chronoUnit);
    }

    /**
     * @return {@code true} once the current local time is strictly past the expiry time
     */
    public boolean isExpire() {
        LocalDateTime now = LocalDateTime.now();
        return expireTime.isBefore(now);
    }

    /**
     * @return the local date-time at which this proxy expires
     */
    public LocalDateTime getExpireTime() {
        return expireTime;
    }
}

@ -7,28 +7,32 @@ import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.annotation.Contract;
import org.apache.http.annotation.ThreadingBehavior;
@Contract(threading = ThreadingBehavior.IMMUTABLE)
public class Proxy {
private final String scheme;
private String scheme;
private final String host;
private String host;
private final int port;
private int port;
private final String username;
private String username;
private final String password;
private String password;
public Proxy(String host, int port, String scheme, String username, String password) {
this.scheme = scheme;
this.host = host;
this.port = port;
this.username = username;
this.password = password;
/**
 * Builds a proxy from a URI, taking host, port and scheme from the URI and the optional
 * credentials from its user-info part ({@code user} or {@code user:password}).
 * Empty user or password parts are stored as {@code null}.
 *
 * @param uri the proxy URI
 * @return a proxy configured from {@code uri}
 */
public static Proxy create(final URI uri) {
    Proxy proxy = new Proxy(uri.getHost(), uri.getPort(), uri.getScheme());
    String userInfo = uri.getUserInfo();
    if (userInfo != null) {
        // Limit 2 keeps trailing empty parts (so ":" yields two elements instead of an empty
        // array, which made up[0] throw) and leaves any further ':' inside the password.
        String[] up = userInfo.split(":", 2);
        proxy.username = up[0].isEmpty() ? null : up[0];
        if (up.length > 1) {
            proxy.password = up[1].isEmpty() ? null : up[1];
        }
    }
    return proxy;
}
public Proxy(String host, int port) {
@ -36,30 +40,27 @@ public class Proxy {
}
public Proxy(String host, int port, String scheme) {
this(host, port, scheme, null, null);
this.host = host;
this.port = port;
this.scheme = scheme;
}
public Proxy(String host, int port, String username, String password) {
this(host, port, null, username, password);
this.host = host;
this.port = port;
this.username = username;
this.password = password;
}
public static Proxy create(final URI uri) {
String userInfo = uri.getUserInfo();
String username = null;
String password = null;
if (userInfo != null) {
String[] up = userInfo.split(":");
if (up.length == 1) {
username = up[0].isEmpty() ? null : up[0];
} else {
username = up[0].isEmpty() ? null : up[0];
password = up[1].isEmpty() ? null : up[1];
}
}
return new Proxy(uri.getHost(), uri.getPort(), uri.getScheme(), username, password);
public String getScheme() {
return scheme;
}
public String getHost() {
public void setScheme(String scheme) {
this.scheme = scheme;
}
public String getHost() {
return host;
}
@ -67,8 +68,6 @@ public class Proxy {
return port;
}
public String getScheme(){return scheme;}
public String getUsername() {
return username;
}

@ -1,5 +1,6 @@
package us.codecraft.webmagic.proxy;
import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Task;
/**
@ -9,6 +10,32 @@ import us.codecraft.webmagic.Task;
*/
public interface ProxyProvider {
/**
*
* Return proxy to Provider when complete a download.
* @param proxy the proxy config contains host,port and identify info
* @param page the download result
* @param task the download task
*/
void returnProxy(Proxy proxy, Page page, Task task);
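/**
* Asks the provider to replace the given proxy for the task.
* NOTE(review): the original description was lost to an encoding problem; this wording is
* inferred from the visible call sites and should be confirmed.
*
* @param task the task the proxy was used for
* @param proxy the proxy to be replaced
*/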
void refreshProxy(Task task,Proxy proxy);
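/**
* Returns the proxy currently in use for the task.
* NOTE(review): the original description was lost to an encoding problem; confirm the
* intended contract.
*
* @param task the task to look up the proxy for
* @return the proxy currently in use
*/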
Proxy getCurrentProxy(Task task);
/**
* Get a proxy for task by some strategy.

@ -1,30 +0,0 @@
package us.codecraft.webmagic.proxy;
import us.codecraft.webmagic.Task;
/**
* A {@link ProxyProvider} whose proxy in use can be queried and replaced on request.
*
* @author yaoqiang
*/
public interface RefreshableProxyProvider extends ProxyProvider{
/**
* Asks the provider to replace the given proxy for the task.
* NOTE(review): the original description was lost to an encoding problem; this wording is
* inferred from the method name and signature and should be confirmed.
*
* @param task the task the proxy was used for
* @param proxy the proxy to be replaced
*/
void refreshProxy(Task task,Proxy proxy);
/**
* Returns the proxy currently in use for the task.
*
* @param task the task to look up the proxy for
* @return the proxy currently in use; NOTE(review): whether this may be null is not
* stated by this interface, confirm against implementations
*/
Proxy getCurrentProxy(Task task);
}

@ -1,22 +0,0 @@
package us.codecraft.webmagic.proxy;
import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Task;
/**
* Callback contract for providers that want a proxy handed back after a download.
* Declares only {@link #returnProxy(Proxy, Page, Task)}; it does not extend any other interface.
*
* @author yaoqiang
*/
public interface ReturnableProxyProvider {
/**
* Return proxy to Provider when complete a download.
*
* @param proxy the proxy config contains host,port and identify info
* @param page the download result
* @param task the download task
*/
void returnProxy(Proxy proxy, Page page, Task task);
}

@ -1,5 +1,6 @@
package us.codecraft.webmagic.proxy;
import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Task;
import java.util.ArrayList;
@ -29,6 +30,15 @@ public class SimpleProxyProvider implements ProxyProvider {
this.pointer = pointer;
}
/**
* Always returns {@code null}.
*
* @param task ignored
* @return {@code null}
*/
@Override
public Proxy getCurrentProxy(Task task) {
return null;
}
/**
* No-op: the body is empty, so both arguments are ignored.
*
* @param task ignored
* @param proxy ignored
*/
@Override
public void refreshProxy(Task task,Proxy proxy) {
}
public static SimpleProxyProvider from(Proxy... proxies) {
List<Proxy> proxiesTemp = new ArrayList<Proxy>(proxies.length);
@ -38,6 +48,11 @@ public class SimpleProxyProvider implements ProxyProvider {
return new SimpleProxyProvider(Collections.unmodifiableList(proxiesTemp));
}
/**
* No-op: nothing is done with the returned proxy.
*
* @param proxy ignored
* @param page ignored
* @param task ignored
*/
@Override
public void returnProxy(Proxy proxy, Page page, Task task) {
// Intentionally empty.
}
@Override
public Proxy getProxy(Task task) {
return proxies.get(incrForLoop());

Loading…
Cancel
Save