-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBuffer.groovy
91 lines (76 loc) · 2.43 KB
/
Buffer.groovy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.BlockingQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Supplier
/**
* @author Jonatan Ivanov
*/
class Buffer<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(Buffer.class)
private final BlockingQueue<T> queue = new LinkedBlockingQueue<>()
private final AtomicInteger inProgressCounter = new AtomicInteger()
private final String name
private final Supplier<T> supplier
private final int desiredSize
// This should be absolutely not necessary, it is more like a hack to make the spring-boot-cli happy o.O
private Buffer() {
this(UUID.randomUUID().toString(),{ null }, 0)
}
Buffer(String name, Supplier<T> supplier, int desiredSize) {
this.name = name
this.supplier = supplier
this.desiredSize = desiredSize
}
T poll() {
return queue.poll()
}
void clear() {
queue.clear()
}
int size() {
return queue.size()
}
int inProgressCount() {
return inProgressCounter.get()
}
def stats() {
return [
name: name,
supplier: supplier.getClass().getSimpleName(),
desiredSize: desiredSize,
size: queue.size(),
inProgress: this.inProgressCount()
]
}
@Override
String toString() {
return stats().toString()
}
synchronized void fill() {
int currentSize = queue.size()
int inProgress = inProgressCounter.get()
int goal = desiredSize - currentSize - inProgress
if (goal > 0) {
LOGGER.info("buffer: $name, currentSize: $currentSize, inProgress: $inProgress, goal: $goal")
for (int i = 0; i < goal; i++) {
inProgressCounter.incrementAndGet()
CompletableFuture.runAsync({ fetchAndAdd() })
}
}
}
private void fetchAndAdd() {
try {
queue.add(supplier.get())
}
catch (Exception e) {
LOGGER.warn("buffer: $name is not able to fetch item", e)
}
finally {
int remainingCount = inProgressCounter.decrementAndGet()
LOGGER.info("buffer: $name fetched an item, $remainingCount remaining")
}
}
}