From d475227f043a5d545b4e966a916bc9ab0b98b8e5 Mon Sep 17 00:00:00 2001 From: Moderocky Date: Sun, 6 Feb 2022 13:09:01 +0000 Subject: [PATCH] Adapt handshakes. --- .../stream/impl/HandshakeChunkStream.java | 14 +++---- .../stream/impl/QueuedChunkStream.java | 37 +++++++++++++++---- 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/src/main/java/mx/kenzie/jupiter/stream/impl/HandshakeChunkStream.java b/src/main/java/mx/kenzie/jupiter/stream/impl/HandshakeChunkStream.java index 2c82ca0..d0e28d7 100644 --- a/src/main/java/mx/kenzie/jupiter/stream/impl/HandshakeChunkStream.java +++ b/src/main/java/mx/kenzie/jupiter/stream/impl/HandshakeChunkStream.java @@ -38,6 +38,13 @@ public void feed(Element element) { } } + @Override + public Element[] readAllRemaining(Element[] array) { + final List list = new ArrayList<>(); + while (!this.isClosed()) list.add(this.read()); + return list.toArray(array); + } + @Override public Element read() { boolean read; @@ -61,13 +68,6 @@ public Element read() { return element; } - @Override - public Element[] readAllRemaining(Element[] array) { - final List list = new ArrayList<>(); - while (!this.isClosed()) list.add(this.read()); - return list.toArray(array); - } - @Override public synchronized boolean isClosed() { return closed; diff --git a/src/main/java/mx/kenzie/jupiter/stream/impl/QueuedChunkStream.java b/src/main/java/mx/kenzie/jupiter/stream/impl/QueuedChunkStream.java index 7150497..dee5cbe 100644 --- a/src/main/java/mx/kenzie/jupiter/stream/impl/QueuedChunkStream.java +++ b/src/main/java/mx/kenzie/jupiter/stream/impl/QueuedChunkStream.java @@ -1,19 +1,25 @@ package mx.kenzie.jupiter.stream.impl; +import mx.kenzie.jupiter.iterator.LazyIterator; import mx.kenzie.jupiter.stream.ChunkStream; +import org.jetbrains.annotations.NotNull; import java.util.ArrayList; import java.util.Deque; +import java.util.Iterator; import java.util.List; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicLong; public class QueuedChunkStream extends ChunkStream { protected final Deque queue; + protected final AtomicLong counter; protected volatile boolean closed; public QueuedChunkStream() { this.queue = new LinkedBlockingDeque<>(); + this.counter = new AtomicLong(); } @Override @@ -21,11 +27,6 @@ public void feed(Element element) { this.queue.addLast(element); } - @Override - public Element read() { - return this.queue.getFirst(); - } - @Override public Element[] readAllRemaining(Element[] array) { final List list = new ArrayList<>(); @@ -34,8 +35,18 @@ public Element[] readAllRemaining(Element[] array) { } @Override - public synchronized boolean isClosed() { - return closed; + public @NotNull Iterator iterator() { + return new LazyIterator<>() { + @Override + public boolean hasNext() { + return !closed || counter.get() + 1 < queue.size(); + } + + @Override + public Element next() { + return read(); + } + }; } @Override @@ -43,6 +54,18 @@ public boolean canRead() { return !this.queue.isEmpty(); } + @Override + public Element read() { + final Element element = this.queue.getFirst(); + this.counter.getAndIncrement(); + return element; + } + + @Override + public synchronized boolean isClosed() { + return closed; + } + @Override public synchronized void close() { this.closed = true;