fix getProxy to make poll and offer atomic

pull/819/head
延之 7 years ago
parent f272796de6
commit 5e863d28d4

@ -27,6 +27,8 @@ public abstract class EffectiveProxyProvider implements ProxyProvider {
private final ReentrantLock addProxyLock = new ReentrantLock();
private final ReentrantLock pollAndOfferLock = new ReentrantLock();
private int expandPoolSize = DEFAULT_EXPAND_POOL_SIZE;
private ProxyPageValidator proxyPageValidator;
@ -59,32 +61,43 @@ public abstract class EffectiveProxyProvider implements ProxyProvider {
@Override
public Proxy getProxy(Task task) {
//async addProxy and avoid invoke extra times
if (validProxyQueue.size() <= expandPoolSize) {
addProxyPool.submit(new Runnable() {
@Override public void run() {
if (addProxyLock.tryLock()) {
try {
List<Proxy> newProxies = addProxies();
if (CollectionUtils.isNotEmpty(newProxies)) {
validProxyQueue.addAll(newProxies);
}
}finally {
addProxyLock.unlock();
}
}
}
});
//make atomic poll and offer
pollAndOfferLock.lock();
try {
Proxy proxy = validProxyQueue.poll();
if (proxy != null) {
//put tail realize loop
validProxyQueue.offer(proxy);
}
//get more proxies when queue capacity less than expect
if (validProxyQueue.size() <= expandPoolSize) {
expand();
}
return proxy;
}finally {
pollAndOfferLock.unlock();
}
}
Proxy proxy = validProxyQueue.poll();
if (proxy == null) {
return null;
}
//put tail realize loop
validProxyQueue.offer(proxy);
//async addProxy and avoid invoke extra times
public void expand(){
return proxy;
if (addProxyLock.tryLock()) {
try {
addProxyPool.submit(new Runnable() {
@Override public void run() {
List<Proxy> newProxies = addProxies();
if (CollectionUtils.isNotEmpty(newProxies)) {
validProxyQueue.addAll(newProxies);
}
}
});
} finally {
addProxyLock.unlock();
}
}
}
@Override public void returnProxy(Proxy proxy, Page page, Task task) {

Loading…
Cancel
Save