|
|
|
@ -119,8 +119,8 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Otherwise, scan the shared list ... for maximum of timeout
|
|
|
|
|
final long retryTimeout = (retries > 0 && timeoutMillis > 0) ? Math.max((timeoutMillis / (retries + 1)), 50) : timeoutMillis;
|
|
|
|
|
// Otherwise, scan the shared list ... for maximum of timeoutMillis
|
|
|
|
|
final long retryTimeoutMs = (retries > 0 && timeoutMillis > 0) ? Math.max((timeoutMillis / (retries + 1)), 50) : timeoutMillis;
|
|
|
|
|
long totalTimeoutNs = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
|
|
|
|
|
boolean tryAddItem = true;
|
|
|
|
|
while (totalTimeoutNs > 0)
|
|
|
|
@ -136,16 +136,17 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
|
|
|
|
|
|
|
|
|
|
if (listener != null && tryAddItem)
|
|
|
|
|
{
|
|
|
|
|
listener.addBagItem(retryTimeout);
|
|
|
|
|
listener.addBagItem(retryTimeoutMs);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
totalTimeoutNs -= (System.nanoTime() - startAttempt);
|
|
|
|
|
final long startTryAcquire = System.nanoTime();
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
long timeoutNs = TimeUnit.MILLISECONDS.toNanos(retryTimeout) - (System.nanoTime() - startAttempt);
|
|
|
|
|
long timeoutNs = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMs) - (System.nanoTime() - startAttempt);
|
|
|
|
|
if (synchronizer.tryAcquireSharedNanos(startAttempt, timeoutNs))
|
|
|
|
|
{
|
|
|
|
|
// Somebody requited and item while we were waiting, rescan the list but don't try to add a new item
|
|
|
|
|
tryAddItem = false;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
@ -319,11 +320,24 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
|
|
|
|
|
private static class Synchronizer extends AbstractQueuedLongSynchronizer
|
|
|
|
|
{
|
|
|
|
|
private static final long serialVersionUID = 104753538004341218L;
|
|
|
|
|
private static boolean JAVA7;
|
|
|
|
|
|
|
|
|
|
static
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
JAVA7 = AbstractQueuedLongSynchronizer.class.getMethod("hasQueuedPredecessors", new Class<?>[0]) != null;
|
|
|
|
|
}
|
|
|
|
|
catch (Exception e)
|
|
|
|
|
{
|
|
|
|
|
// nothing
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected long tryAcquireShared(long startScanTime)
|
|
|
|
|
{
|
|
|
|
|
return getState() > startScanTime ? 1 : -1;
|
|
|
|
|
return getState() >= startScanTime && !java67hasQueuedPredecessors() ? 1 : -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** {@inheritDoc} */
|
|
|
|
@ -334,5 +348,15 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
|
|
|
|
|
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private boolean java67hasQueuedPredecessors()
|
|
|
|
|
{
|
|
|
|
|
if (JAVA7)
|
|
|
|
|
{
|
|
|
|
|
return hasQueuedPredecessors();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|