Skip to content

Commit

Permalink
Adapt handshakes.
Browse files Browse the repository at this point in the history
  • Loading branch information
Moderocky committed Feb 6, 2022
1 parent 57650a7 commit d475227
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ public void feed(Element element) {
}
}

@Override
public Element[] readAllRemaining(Element[] array) {
final List<Element> list = new ArrayList<>();
while (!this.isClosed()) list.add(this.read());
return list.toArray(array);
}

@Override
public Element read() {
boolean read;
Expand All @@ -61,13 +68,6 @@ public Element read() {
return element;
}

@Override
public Element[] readAllRemaining(Element[] array) {
final List<Element> list = new ArrayList<>();
while (!this.isClosed()) list.add(this.read());
return list.toArray(array);
}

@Override
public synchronized boolean isClosed() {
return closed;
Expand Down
37 changes: 30 additions & 7 deletions src/main/java/mx/kenzie/jupiter/stream/impl/QueuedChunkStream.java
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
package mx.kenzie.jupiter.stream.impl;

import mx.kenzie.jupiter.iterator.LazyIterator;
import mx.kenzie.jupiter.stream.ChunkStream;
import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;

public class QueuedChunkStream<Element> extends ChunkStream<Element> {

protected final Deque<Element> queue;
protected final AtomicLong counter;
protected volatile boolean closed;

public QueuedChunkStream() {
this.queue = new LinkedBlockingDeque<>();
this.counter = new AtomicLong();
}

@Override
public void feed(Element element) {
this.queue.addLast(element);
}

@Override
public Element read() {
return this.queue.getFirst();
}

@Override
public Element[] readAllRemaining(Element[] array) {
final List<Element> list = new ArrayList<>();
Expand All @@ -34,15 +35,37 @@ public Element[] readAllRemaining(Element[] array) {
}

@Override
public synchronized boolean isClosed() {
return closed;
public @NotNull Iterator<Element> iterator() {
return new LazyIterator<>() {
@Override
public boolean hasNext() {
return !closed || counter.get() + 1 < queue.size();
}

@Override
public Element next() {
return read();
}
};
}

@Override
public boolean canRead() {
return !this.queue.isEmpty();
}

@Override
public Element read() {
final Element element = this.queue.getFirst();
this.counter.getAndIncrement();
return element;
}

@Override
public synchronized boolean isClosed() {
return closed;
}

@Override
public synchronized void close() {
this.closed = true;
Expand Down

0 comments on commit d475227

Please sign in to comment.