@ -4,9 +4,9 @@
package queue
import (
"os"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
@ -16,10 +16,7 @@ import (
)
func TestPersistableChannelUniqueQueue ( t * testing . T ) {
if os . Getenv ( "CI" ) != "" {
t . Skip ( "Skipping because test is flaky on CI" )
}
// Create a temporary directory for the queue
tmpDir := t . TempDir ( )
_ = log . NewLogger ( 1000 , "console" , "console" , ` { "level":"warn","stacktracelevel":"NONE","stderr":true} ` )
@ -100,7 +97,7 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
executedInitial := map [ string ] [ ] string { }
hasInitial := map [ string ] [ ] string { }
fillQueue := func ( name string , done chan struct { } ) {
fillQueue := func ( name string , done chan int64 ) {
t . Run ( "Initial Filling: " + name , func ( t * testing . T ) {
lock := sync . Mutex { }
@ -157,33 +154,39 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
assert . Equal ( t , 101 , len ( executedInitial [ name ] ) + len ( hasInitial [ name ] ) )
mapLock . Unlock ( )
} )
mapLock . Lock ( )
count := int64 ( len ( hasInitial [ name ] ) )
mapLock . Unlock ( )
done <- count
close ( done )
}
doneA := make ( chan struct { } )
doneB := make ( chan struct { } )
hasQueueAChan := make ( chan int64 )
hasQueueBChan := make ( chan int64 )
go fillQueue ( "QueueA" , doneA )
go fillQueue ( "QueueB" , doneB )
go fillQueue ( "QueueA" , hasQueueAChan )
go fillQueue ( "QueueB" , hasQueueBChan )
<- doneA
<- doneB
hasA := <- hasQueueAChan
hasB := <- hasQueueBChan
executedEmpty := map [ string ] [ ] string { }
hasEmpty := map [ string ] [ ] string { }
emptyQueue := func ( name string , done chan struct { } ) {
emptyQueue := func ( name string , numInQueue int64 , done chan struct { } ) {
t . Run ( "Empty Queue: " + name , func ( t * testing . T ) {
lock := sync . Mutex { }
stop := make ( chan struct { } )
// collect the tasks that have been executed
atomicCount := int64 ( 0 )
handle := func ( data ... Data ) [ ] Data {
lock . Lock ( )
for _ , datum := range data {
mapLock . Lock ( )
executedEmpty [ name ] = append ( executedEmpty [ name ] , datum . ( string ) )
mapLock . Unlock ( )
if datum . ( string ) == "final" {
count := atomic . AddInt64 ( & atomicCount , 1 )
if count >= numInQueue {
close ( stop )
}
}
@ -217,11 +220,11 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
close ( done )
}
doneA = make ( chan struct { } )
doneB = make ( chan struct { } )
doneA : = make ( chan struct { } )
doneB : = make ( chan struct { } )
go emptyQueue ( "QueueA" , doneA)
go emptyQueue ( "QueueB" , doneB)
go emptyQueue ( "QueueA" , hasA, doneA)
go emptyQueue ( "QueueB" , hasB, doneB)
<- doneA
<- doneB
@ -237,20 +240,20 @@ func TestPersistableChannelUniqueQueue(t *testing.T) {
hasEmpty = map [ string ] [ ] string { }
mapLock . Unlock ( )
doneA = make ( chan struct { } )
doneB = make ( chan struct { } )
hasQueueAChan = make ( chan int64 )
hasQueueBChan = make ( chan int64 )
go fillQueue ( "QueueA" , doneA )
go fillQueue ( "QueueB" , doneB )
go fillQueue ( "QueueA" , hasQueueAChan )
go fillQueue ( "QueueB" , hasQueueBChan )
<- doneA
<- doneB
hasA = <- hasQueueAChan
hasB = <- hasQueueBChan
doneA = make ( chan struct { } )
doneB = make ( chan struct { } )
go emptyQueue ( "QueueA" , doneA)
go emptyQueue ( "QueueB" , doneB)
go emptyQueue ( "QueueA" , hasA, doneA)
go emptyQueue ( "QueueB" , hasB, doneB)
<- doneA
<- doneB