diff --git a/webmagic-core/pom.xml b/webmagic-core/pom.xml index 4b89cac1..0cea05fe 100644 --- a/webmagic-core/pom.xml +++ b/webmagic-core/pom.xml @@ -1,5 +1,6 @@ - + us.codecraft webmagic-parent @@ -24,6 +25,12 @@ org.apache.commons commons-lang3 + + org.projectlombok + lombok + 1.18.10 + provided + us.codecraft diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java index bc07651a..dfca9dd9 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java @@ -421,6 +421,7 @@ public class Spider implements Runnable, Task { } } else if(site.getRefreshCode().contains(page.getStatusCode())) { logger.info("page status code error, page {} , code: {}, start refresh downloader", request.getUrl(), page.getStatusCode()); + downloader.refreshComponent(this); failHandler(request); }else { logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode()); @@ -434,7 +435,6 @@ public class Spider implements Runnable, Task { } private void failHandler(Request request){ - downloader.refreshComponent(this); if (site.getCycleRetryTimes() == 0) { sleep(site.getSleepTime()); } else { diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/downloader/HttpClientDownloader.java b/webmagic-core/src/main/java/us/codecraft/webmagic/downloader/HttpClientDownloader.java index ace81755..8e8676d0 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/downloader/HttpClientDownloader.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/downloader/HttpClientDownloader.java @@ -13,6 +13,8 @@ import us.codecraft.webmagic.Site; import us.codecraft.webmagic.Task; import us.codecraft.webmagic.proxy.Proxy; import us.codecraft.webmagic.proxy.ProxyProvider; +import us.codecraft.webmagic.proxy.RefreshableProxyProvider; +import us.codecraft.webmagic.proxy.ReturnableProxyProvider; import 
us.codecraft.webmagic.selector.PlainText; import us.codecraft.webmagic.utils.CharsetUtils; import us.codecraft.webmagic.utils.HttpClientUtils; @@ -93,8 +95,8 @@ public class HttpClientDownloader extends AbstractDownloader { } catch (IOException e) { logger.warn("download page {} error", request.getUrl(), e); onError(request, e, proxyProvider); - if (proxyProvider != null && refreshProxyOnError.test(e)) { - proxyProvider.refreshProxy(task,proxy); + if (proxyProvider != null && proxy != null && proxyProvider instanceof RefreshableProxyProvider && refreshProxyOnError.test(e)) { + ((RefreshableProxyProvider)proxyProvider).refreshProxy(task,proxy); } if(refreshClientOnError.test(e)) { httpClients.remove(task.getSite().getDomain()); @@ -105,8 +107,9 @@ public class HttpClientDownloader extends AbstractDownloader { //ensure the connection is released back to pool EntityUtils.consumeQuietly(httpResponse.getEntity()); } - if (proxyProvider != null && proxy != null) { - proxyProvider.returnProxy(proxy, page, task); + if (proxyProvider != null && proxy != null && proxyProvider instanceof ReturnableProxyProvider) { + ((ReturnableProxyProvider) proxyProvider).returnProxy(proxy, page, task); + } } } @@ -114,8 +117,8 @@ public class HttpClientDownloader extends AbstractDownloader { @Override public void refreshComponent(Task task) { - if (proxyProvider != null ) { - proxyProvider.refreshProxy(task,proxyProvider.getCurrentProxy(task)); + if (proxyProvider != null && proxyProvider instanceof RefreshableProxyProvider) { + ((RefreshableProxyProvider) proxyProvider).refreshProxy(task, ((RefreshableProxyProvider) proxyProvider).getCurrentProxy(task)); } httpClients.remove(task.getSite().getDomain()); diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/AbstractRefreshableProxyProvider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/AbstractRefreshableProxyProvider.java new file mode 100644 index 00000000..8e7cb08a --- /dev/null +++ 
b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/AbstractRefreshableProxyProvider.java @@ -0,0 +1,135 @@ +package us.codecraft.webmagic.proxy; + +import lombok.extern.slf4j.Slf4j; +import us.codecraft.webmagic.Task; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.Comparator; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; + +/** + * @author yaoqiang + * Abstract base implementation of a refreshable proxy provider: proxies are queued by expiry time and the one currently in use is cached in a FutureTask. + */ +@Slf4j +public abstract class AbstractRefreshableProxyProvider implements RefreshableProxyProvider { + + private final LongAdder totalGet = new LongAdder(); + + private final LongAdder canUse = new LongAdder(); + + private final AtomicReference<FutureTask<Proxy>> usedProxyCache = new AtomicReference<>(); + + private final PriorityBlockingQueue<ExpirableProxy> ipQueue = new PriorityBlockingQueue<>(1000, Comparator.comparing(ExpirableProxy::getExpireTime)); + + private final int maxHostNum; + + public AbstractRefreshableProxyProvider(int maxHostNum) { + this.maxHostNum = maxHostNum; + } + + protected void doPut(ExpirableProxy expirableProxy) { + synchronized (ipQueue) { + if (ipQueue.size() <= maxHostNum) { + ipQueue.put(expirableProxy); + } + } + } + + @Override + public void refreshProxy(Task task, Proxy proxy) { + if (proxy != null) { + FutureTask<Proxy> proxyFutureTask = usedProxyCache.get(); + Proxy currentProxy = getCurrentProxy(task); + // If usedProxyCache was replaced between the failure and this point, the proxies cannot be equal; if they are still equal, it has not been replaced + // (it is also possible that no proxy was in use) + if (proxy.equals(currentProxy)) { + // Clear the cache only if it still holds the task read above, so the next getProxy fetches a fresh proxy + usedProxyCache.compareAndSet(proxyFutureTask, null); + } + } + } + + @Override + public Proxy getCurrentProxy(Task task) { + FutureTask<Proxy> cache = usedProxyCache.get(); + Proxy currentProxy = null; + try { + if (cache != null) + currentProxy = cache.get(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + log.error(e.getMessage(), e); + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { +
e.printStackTrace(); + log.error(e.getCause().getMessage(), e); + } catch (TimeoutException e) { + log.error(e.getMessage(), e); + e.printStackTrace(); + } + return currentProxy; + } + + + private FutureTask<Proxy> buildCacheTask() { + return new FutureTask<>(this::doGet); + } + + + /** + * Returns the cached proxy while it has not expired, otherwise fetches a new one. Beware of livelock: if the cached task always throws, this loops forever and logs an error on every pass. + * + * @param task the download task + * @return the proxy to use, or null if the calling thread is interrupted + */ + @Override + public Proxy getProxy(Task task) { + while (!Thread.currentThread().isInterrupted()) { + FutureTask<Proxy> cache = usedProxyCache.get(); + if (cache == null) { + FutureTask<Proxy> futureTask = buildCacheTask(); + if (usedProxyCache.compareAndSet(null, futureTask)) { + cache = futureTask; + futureTask.run(); + } else { + // CAS lost: another thread installed a task first, so re-read the latest one + cache = usedProxyCache.get(); + } + } + try { + if (cache != null) { + + ExpirableProxy proxy = (ExpirableProxy) cache.get(5, TimeUnit.SECONDS); + if (!proxy.isExpire()) + return proxy; + } + usedProxyCache.compareAndSet(cache, null); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error(e.getMessage(), e); + usedProxyCache.compareAndSet(cache, null); + } catch (ExecutionException e) { + log.error(e.getMessage(), e); + usedProxyCache.compareAndSet(cache, null); + } catch (TimeoutException e) { + log.error(e.getMessage(), e); + } + } + return null; + } + // Blocks until a non-expired proxy can be taken from the queue. The usage ratio is only computed when totalGet is non-zero, because BigDecimal.divide throws ArithmeticException on a zero divisor and that would fail the FutureTask after a proxy was already consumed. + private Proxy doGet() throws InterruptedException { + ExpirableProxy proxy; + do { + proxy = ipQueue.take(); + } while (proxy.isExpire()); + log.info("切换到proxy:ip:{},port:{},ip可用率:{}", proxy.getHost(), proxy.getPort(), totalGet.sum() == 0 ? 0d : BigDecimal.valueOf(canUse.sum()).divide(BigDecimal.valueOf(totalGet.sum()), 2, RoundingMode.HALF_DOWN).doubleValue()); + return proxy; + } + + +} diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ExpirableProxy.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ExpirableProxy.java new file mode 100644 index 00000000..f23caaf5 --- /dev/null +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ExpirableProxy.java @@ -0,0 +1,34 @@
+package us.codecraft.webmagic.proxy; + +import org.apache.http.annotation.Contract; +import org.apache.http.annotation.ThreadingBehavior; + +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; + +/** + * @author yaoqiang + * + * A proxy that can expire: the expiry time is fixed at construction as the current local date-time plus ttl in the given unit. + */ +@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL) +public class ExpirableProxy extends Proxy { + private final int ttl; + private final LocalDateTime expireTime; + + + public ExpirableProxy(String host, int port, int ttl, ChronoUnit chronoUnit) { + super(host, port); + this.ttl = ttl; + this.expireTime = LocalDateTime.now().plus(ttl, chronoUnit); + + } + + public boolean isExpire() { + return LocalDateTime.now().isAfter(expireTime); + } + public LocalDateTime getExpireTime(){ + return expireTime; + } + +} diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/Proxy.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/Proxy.java index 6554fab5..ffae4be4 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/Proxy.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/Proxy.java @@ -6,33 +6,30 @@ import java.net.URISyntaxException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; +import jdk.nashorn.internal.ir.annotations.Immutable; import org.apache.commons.lang3.StringUtils; +import org.apache.http.annotation.Contract; +import org.apache.http.annotation.ThreadingBehavior; +@Contract(threading = ThreadingBehavior.IMMUTABLE) public class Proxy { - private String scheme; + private final String scheme; - private String host; + private final String host; - private int port; + private final int port; - private String username; + private final String username; - private String password; + private final String password; - public static Proxy create(final URI uri) { - Proxy proxy = new Proxy(uri.getHost(), uri.getPort(), uri.getScheme()); - String userInfo = uri.getUserInfo(); - if (userInfo != null) { - String[] up =
userInfo.split(":"); - if (up.length == 1) { - proxy.username = up[0].isEmpty() ? null : up[0]; - } else { - proxy.username = up[0].isEmpty() ? null : up[0]; - proxy.password = up[1].isEmpty() ? null : up[1]; - } - } - return proxy; + public Proxy(String host, int port, String scheme, String username, String password) { + this.scheme = scheme; + this.host = host; + this.port = port; + this.username = username; + this.password = password; } public Proxy(String host, int port) { @@ -40,27 +37,30 @@ public class Proxy { } public Proxy(String host, int port, String scheme) { - this.host = host; - this.port = port; - this.scheme = scheme; + this(host, port, scheme, null, null); } public Proxy(String host, int port, String username, String password) { - this.host = host; - this.port = port; - this.username = username; - this.password = password; + this(host, port, null, username, password); } - public String getScheme() { - return scheme; - } - - public void setScheme(String scheme) { - this.scheme = scheme; + public static Proxy create(final URI uri) { + String userInfo = uri.getUserInfo(); + String username = null; + String password = null; + if (userInfo != null) { + String[] up = userInfo.split(":"); + if (up.length == 1) { + username = up[0].isEmpty() ? null : up[0]; + } else { + username = up[0].isEmpty() ? null : up[0]; + password = up[1].isEmpty() ? 
null : up[1]; + } + } + return new Proxy(uri.getHost(), uri.getPort(), uri.getScheme(), username, password); } - public String getHost() { + public String getHost() { return host; } @@ -68,6 +68,8 @@ public class Proxy { return port; } + public String getScheme(){return scheme;} + public String getUsername() { return username; } diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ProxyProvider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ProxyProvider.java index b4e7b484..b567d582 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ProxyProvider.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ProxyProvider.java @@ -1,6 +1,5 @@ package us.codecraft.webmagic.proxy; -import us.codecraft.webmagic.Page; import us.codecraft.webmagic.Task; /** @@ -10,32 +9,6 @@ import us.codecraft.webmagic.Task; */ public interface ProxyProvider { - /** - * - * Return proxy to Provider when complete a download. - * @param proxy the proxy config contains host,port and identify info - * @param page the download result - * @param task the download task - */ - void returnProxy(Proxy proxy, Page page, Task task); - - /** - * 代理IP是珍贵资源,有可能代理提供者内部代理没有过期,就一直提供某个IP,但这个IP又不可以使用,所以提供一种方式通知提供者,这个代理该刷新了 - * - * @param task 下载任务 - * @param proxy 需要对代理进行验证,如果确实持有的时错误代理,则刷新,否则,继续执行 - */ - void refreshProxy(Task task,Proxy proxy); - - - /** - * - * 获取当前正在提供的代理 - * - * @param task - * @return - */ - Proxy getCurrentProxy(Task task); /** * Get a proxy for task by some strategy. 
diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/RefreshableProxyProvider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/RefreshableProxyProvider.java new file mode 100644 index 00000000..77e1ce2c --- /dev/null +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/RefreshableProxyProvider.java @@ -0,0 +1,30 @@ +package us.codecraft.webmagic.proxy; + +import us.codecraft.webmagic.Task; + +/** + * @author yaoqiang + * + * A proxy provider whose current proxy can be refreshed manually. + */ +public interface RefreshableProxyProvider extends ProxyProvider{ + + /** + * Proxy IPs are a scarce resource: a provider may keep handing out a proxy that has not expired yet but no longer works, so this method lets callers tell the provider that the proxy should be refreshed. + * + * @param task the spider task + * @param proxy the proxy the caller believes is bad; the provider should verify that it really is the one it currently holds and refresh only then, otherwise carry on + */ + void refreshProxy(Task task,Proxy proxy); + + + /** + * + * Get the proxy that is currently being served. + * + * @param task the running spider task + * @return the proxy currently in use + */ + Proxy getCurrentProxy(Task task); + +} diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ReturnableProxyProvider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ReturnableProxyProvider.java new file mode 100644 index 00000000..43b49fc3 --- /dev/null +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ReturnableProxyProvider.java @@ -0,0 +1,22 @@ +package us.codecraft.webmagic.proxy; + +import us.codecraft.webmagic.Page; +import us.codecraft.webmagic.Task; + +/** + * @author yaoqiang + * + * A proxy provider that accepts returned proxies: once a proxy has been handed out and used, it can be given back to the provider. + */ +public interface ReturnableProxyProvider { + + /** + * + * Return proxy to Provider when complete a download.
+ * @param proxy the proxy config contains host,port and identify info + * @param page the download result + * @param task the download task + */ + void returnProxy(Proxy proxy, Page page, Task task); + +} diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/SimpleProxyProvider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/SimpleProxyProvider.java index 8ad9ce7b..fda3e238 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/SimpleProxyProvider.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/SimpleProxyProvider.java @@ -1,6 +1,5 @@ package us.codecraft.webmagic.proxy; -import us.codecraft.webmagic.Page; import us.codecraft.webmagic.Task; import java.util.ArrayList; @@ -30,15 +29,6 @@ public class SimpleProxyProvider implements ProxyProvider { this.pointer = pointer; } - @Override - public Proxy getCurrentProxy(Task task) { - return null; - } - - @Override - public void refreshProxy(Task task,Proxy proxy) { - - } public static SimpleProxyProvider from(Proxy... proxies) { List proxiesTemp = new ArrayList(proxies.length); @@ -48,11 +38,6 @@ public class SimpleProxyProvider implements ProxyProvider { return new SimpleProxyProvider(Collections.unmodifiableList(proxiesTemp)); } - @Override - public void returnProxy(Proxy proxy, Page page, Task task) { - //Donothing - } - @Override public Proxy getProxy(Task task) { return proxies.get(incrForLoop());