|
|
|
@ -75,7 +75,7 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
|
|
|
|
|
/**
|
|
|
|
|
* @param timeout timeout to add item to bag in milliseconds
|
|
|
|
|
*/
|
|
|
|
|
void addBagItem(long timeout);
|
|
|
|
|
void addBagItem();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private ThreadLocal<LinkedList<WeakReference<T>>> threadList;
|
|
|
|
@ -102,7 +102,7 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
|
|
|
|
|
* @return a borrowed instance from the bag or null if a timeout occurs
|
|
|
|
|
* @throws InterruptedException if interrupted while waiting
|
|
|
|
|
*/
|
|
|
|
|
public T borrow(int retries, long timeoutMillis) throws InterruptedException
|
|
|
|
|
public T borrow(long timeout, TimeUnit timeUnit) throws InterruptedException
|
|
|
|
|
{
|
|
|
|
|
// Try the thread-local list first
|
|
|
|
|
LinkedList<WeakReference<T>> list = threadList.get();
|
|
|
|
@ -122,13 +122,10 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Otherwise, scan the shared list ... for maximum of timeoutMillis
|
|
|
|
|
final long retryTimeoutMs = (retries > 0 && timeoutMillis > 0) ? Math.max((timeoutMillis / (retries + 1)), 1000) : timeoutMillis;
|
|
|
|
|
long totalTimeoutNs = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
|
|
|
|
|
boolean tryAddItem = true;
|
|
|
|
|
while (totalTimeoutNs > 0)
|
|
|
|
|
{
|
|
|
|
|
final long startAttempt = System.nanoTime();
|
|
|
|
|
// Otherwise, scan the shared list ... for maximum of timeout
|
|
|
|
|
timeout = timeUnit.toNanos(timeout);
|
|
|
|
|
do {
|
|
|
|
|
final long startScan = System.nanoTime();
|
|
|
|
|
for (T reference : sharedList)
|
|
|
|
|
{
|
|
|
|
|
if (reference.compareAndSetState(STATE_NOT_IN_USE, STATE_IN_USE))
|
|
|
|
@ -137,35 +134,15 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (listener != null && tryAddItem)
|
|
|
|
|
if (listener != null)
|
|
|
|
|
{
|
|
|
|
|
listener.addBagItem(retryTimeoutMs);
|
|
|
|
|
listener.addBagItem();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
totalTimeoutNs -= (System.nanoTime() - startAttempt);
|
|
|
|
|
final long startTryAcquire = System.nanoTime();
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
long timeoutNs = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMs) - (System.nanoTime() - startAttempt);
|
|
|
|
|
if (synchronizer.tryAcquireSharedNanos(startAttempt, timeoutNs))
|
|
|
|
|
{
|
|
|
|
|
// Somebody requited and item while we were waiting, rescan the list but don't try to add a new item
|
|
|
|
|
tryAddItem = false;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
finally
|
|
|
|
|
{
|
|
|
|
|
totalTimeoutNs -= (System.nanoTime() - startTryAcquire);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (retries-- == 0)
|
|
|
|
|
{
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
synchronizer.tryAcquireSharedNanos(startScan, timeout);
|
|
|
|
|
|
|
|
|
|
tryAddItem = true;
|
|
|
|
|
}
|
|
|
|
|
timeout -= (System.nanoTime() - startScan);
|
|
|
|
|
} while (timeout > 0);
|
|
|
|
|
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
@ -317,6 +294,29 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
|
|
|
|
|
return synchronizer.getQueueLength();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public int getCount(int state)
|
|
|
|
|
{
|
|
|
|
|
int count = 0;
|
|
|
|
|
for (T reference : sharedList)
|
|
|
|
|
{
|
|
|
|
|
if (reference.getState() == state)
|
|
|
|
|
{
|
|
|
|
|
count++;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return count;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Get the total number of items in the bag.
|
|
|
|
|
*
|
|
|
|
|
* @return the number of items in the bag
|
|
|
|
|
*/
|
|
|
|
|
public int size()
|
|
|
|
|
{
|
|
|
|
|
return sharedList.size();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Our private synchronizer that handles notify/wait type semantics.
|
|
|
|
|
*/
|
|
|
|
|