Skip to content

Commit 5b7eea7

Browse files
author
Alexander Lavrukov
committed
better-spliterator: Better spliterator
1 parent 58cba6e commit 5b7eea7

File tree

12 files changed

+411
-31
lines changed

12 files changed

+411
-31
lines changed

repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -357,13 +357,13 @@ public void streamAll() {
357357
assertThatExceptionOfType(IllegalArgumentException.class)
358358
.isThrownBy(() -> db.tx(() -> db.projects().streamAll(5001)));
359359
}
360-
360+
361361
private static <ID extends Entity.Id<?>> ReadTableParams<ID> defaultReadTableParamsNonLegacy() {
362362
return RepositoryTest.<ID>buildReadTableParamsNonLegacy().build();
363363
}
364364

365365
private static <ID extends Entity.Id<?>> ReadTableParams.ReadTableParamsBuilder<ID> buildReadTableParamsNonLegacy() {
366-
return ReadTableParams.<ID>builder().useNewSpliterator(true);
366+
return ReadTableParams.<ID>builder().useNewSpliterator2(true);
367367
}
368368

369369
@Test

repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java

+47-6
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
import tech.ydb.core.Result;
1212
import tech.ydb.core.Status;
1313
import tech.ydb.core.StatusCode;
14+
import tech.ydb.core.grpc.GrpcReadStream;
1415
import tech.ydb.proto.ValueProtos;
1516
import tech.ydb.table.Session;
1617
import tech.ydb.table.query.DataQueryResult;
1718
import tech.ydb.table.query.Params;
19+
import tech.ydb.table.query.ReadTablePart;
1820
import tech.ydb.table.result.ResultSetReader;
1921
import tech.ydb.table.settings.BulkUpsertSettings;
2022
import tech.ydb.table.settings.CommitTxSettings;
@@ -54,14 +56,23 @@
5456
import tech.ydb.yoj.repository.ydb.exception.YdbRepositoryException;
5557
import tech.ydb.yoj.repository.ydb.merge.QueriesMerger;
5658
import tech.ydb.yoj.repository.ydb.readtable.ReadTableMapper;
59+
import tech.ydb.yoj.repository.ydb.spliterator.ClosableSpliterator;
60+
import tech.ydb.yoj.repository.ydb.spliterator.ResultSetIterator;
61+
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliterator;
62+
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliteratorQueue;
63+
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliteratorQueueWrapper;
64+
import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbLegacySpliterator;
65+
import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbNewLegacySpliterator;
5766
import tech.ydb.yoj.repository.ydb.statement.Statement;
5867
import tech.ydb.yoj.repository.ydb.table.YdbTable;
5968
import tech.ydb.yoj.util.lang.Interrupts;
6069

6170
import java.time.Duration;
6271
import java.util.ArrayList;
72+
import java.util.Iterator;
6373
import java.util.List;
6474
import java.util.Map;
75+
import java.util.concurrent.CompletableFuture;
6576
import java.util.concurrent.TimeUnit;
6677
import java.util.function.Supplier;
6778
import java.util.stream.Collectors;
@@ -78,7 +89,7 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
7889
private static final Logger log = LoggerFactory.getLogger(YdbRepositoryTransaction.class);
7990

8091
private final List<YdbRepository.Query<?>> pendingWrites = new ArrayList<>();
81-
private final List<YdbSpliterator<?>> spliterators = new ArrayList<>();
92+
private final List<ClosableSpliterator<?>> spliterators = new ArrayList<>();
8293

8394
@Getter
8495
private final TxOptions options;
@@ -102,8 +113,8 @@ public YdbRepositoryTransaction(REPO repo, @NonNull TxOptions options) {
102113
this.cache = options.isFirstLevelCache() ? new RepositoryCacheImpl() : RepositoryCache.empty();
103114
}
104115

105-
private <V> YdbSpliterator<V> createSpliterator(String request, boolean isOrdered) {
106-
YdbSpliterator<V> spliterator = new YdbSpliterator<>(request, isOrdered);
116+
private <V> YdbNewLegacySpliterator<V> createSpliterator(String request, boolean isOrdered) {
117+
YdbNewLegacySpliterator<V> spliterator = new YdbNewLegacySpliterator<>(request, isOrdered);
107118
spliterators.add(spliterator);
108119
return spliterator;
109120
}
@@ -153,7 +164,7 @@ private void doCommit() {
153164

154165
private void closeStreams() {
155166
Exception summaryException = null;
156-
for (YdbSpliterator<?> spliterator : spliterators) {
167+
for (ClosableSpliterator<?> spliterator : spliterators) {
157168
try {
158169
spliterator.close();
159170
} catch (Exception e) {
@@ -387,7 +398,7 @@ public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT
387398
String yql = getYql(statement);
388399
Params sdkParams = getSdkParams(statement, params);
389400

390-
YdbSpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);
401+
YdbNewLegacySpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);
391402

392403
initSession();
393404
session.executeScanQuery(
@@ -489,7 +500,7 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
489500
}
490501

491502
if (params.isUseNewSpliterator()) {
492-
YdbSpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());
503+
YdbNewLegacySpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());
493504

494505
initSession();
495506
session.readTable(
@@ -500,6 +511,36 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
500511
return spliterator.createStream();
501512
}
502513

514+
if (params.isUseNewSpliterator2()) {
515+
initSession();
516+
517+
// TODO: configure stream timeout
518+
// TODO: rename wrapper to something other
519+
YdbSpliteratorQueue<Iterator<RESULT>> queue = new YdbSpliteratorQueue<>(1, Duration.ofMinutes(5));
520+
YdbSpliteratorQueueWrapper<Iterator<RESULT>> wrapper = new YdbSpliteratorQueueWrapper<>(
521+
"readTable: " + tableName, queue
522+
);
523+
524+
GrpcReadStream<ReadTablePart> grpcStream = session.executeReadTable(tableName, settings.build());
525+
526+
CompletableFuture<Status> future = grpcStream.start(readTablePart -> {
527+
ResultSetIterator<RESULT> iterator = new ResultSetIterator<>(
528+
readTablePart.getResultSetReader(),
529+
mapper::mapResult
530+
);
531+
wrapper.onNext(iterator);
532+
});
533+
future.whenComplete(wrapper::onSupplierThreadComplete);
534+
535+
// TODO: do we have to close grpcStream??
536+
537+
YdbSpliterator<RESULT> spliterator = new YdbSpliterator<>(queue, params.isOrdered());
538+
539+
spliterators.add(spliterator);
540+
541+
return spliterator.createStream();
542+
}
543+
503544
try {
504545
YdbLegacySpliterator<RESULT> spliterator = new YdbLegacySpliterator<>(params.isOrdered(), action ->
505546
doCall("read table " + mapper.getTableName(""), () -> {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import java.util.Spliterator;
4+
5+
public interface ClosableSpliterator<V> extends Spliterator<V> {
6+
void close();
7+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import tech.ydb.proto.ValueProtos;
4+
5+
import java.util.List;
6+
7+
@FunctionalInterface
8+
public interface ResultConverter<V> {
9+
V convert(List<ValueProtos.Column> columns, ValueProtos.Value value);
10+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import tech.ydb.proto.ValueProtos;
4+
import tech.ydb.table.result.ResultSetReader;
5+
import tech.ydb.yoj.repository.ydb.client.YdbConverter;
6+
7+
import java.util.ArrayList;
8+
import java.util.Iterator;
9+
import java.util.List;
10+
import java.util.NoSuchElementException;
11+
12+
public final class ResultSetIterator<V> implements Iterator<V> {
13+
private final ResultSetReader resultSet;
14+
private final ResultConverter<V> converter;
15+
private final List<ValueProtos.Column> columns;
16+
17+
private int position = 0;
18+
19+
public ResultSetIterator(ResultSetReader resultSet, ResultConverter<V> converter) {
20+
List<ValueProtos.Column> columns;
21+
if (resultSet.getRowCount() > 0) {
22+
resultSet.setRowIndex(0);
23+
columns = getColumns(resultSet);
24+
} else {
25+
columns = new ArrayList<>();
26+
}
27+
28+
this.resultSet = resultSet;
29+
this.converter = converter;
30+
this.columns = columns;
31+
}
32+
33+
@Override
34+
public boolean hasNext() {
35+
return position < resultSet.getRowCount();
36+
}
37+
38+
@Override
39+
public V next() {
40+
if (!hasNext()) {
41+
throw new NoSuchElementException();
42+
}
43+
44+
ValueProtos.Value value = buildValue(position++);
45+
46+
return converter.convert(columns, value);
47+
}
48+
49+
private ValueProtos.Value buildValue(int rowIndex) {
50+
resultSet.setRowIndex(rowIndex);
51+
ValueProtos.Value.Builder value = ValueProtos.Value.newBuilder();
52+
for (int i = 0; i < columns.size(); i++) {
53+
value.addItems(YdbConverter.convertValueToProto(resultSet.getColumn(i)));
54+
}
55+
return value.build();
56+
}
57+
58+
private static List<ValueProtos.Column> getColumns(ResultSetReader resultSet) {
59+
List<ValueProtos.Column> columns = new ArrayList<>();
60+
for (int i = 0; i < resultSet.getColumnCount(); i++) {
61+
columns.add(ValueProtos.Column.newBuilder()
62+
.setName(resultSet.getColumnName(i))
63+
.build()
64+
);
65+
}
66+
return columns;
67+
}
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import tech.ydb.yoj.ExperimentalApi;
4+
5+
import java.util.Iterator;
6+
import java.util.Spliterator;
7+
import java.util.function.Consumer;
8+
import java.util.stream.Stream;
9+
import java.util.stream.StreamSupport;
10+
11+
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42")
12+
public final class YdbSpliterator<V> implements ClosableSpliterator<V> {
13+
private final YdbSpliteratorQueue<Iterator<V>> queue;
14+
private final int flags;
15+
16+
private Iterator<V> valueIterator;
17+
18+
private boolean closed = false;
19+
20+
public YdbSpliterator(YdbSpliteratorQueue<Iterator<V>> queue, boolean isOrdered) {
21+
this.queue = queue;
22+
this.flags = (isOrdered ? ORDERED : 0) | NONNULL;
23+
}
24+
25+
// Correct way to create stream with YdbSpliterator. onClose call is important for avoid supplier thread leak.
26+
public Stream<V> createStream() {
27+
return StreamSupport.stream(this, false).onClose(this::close);
28+
}
29+
30+
@Override
31+
public boolean tryAdvance(Consumer<? super V> action) {
32+
if (closed) {
33+
return false;
34+
}
35+
36+
if (valueIterator == null || !valueIterator.hasNext()) {
37+
valueIterator = queue.poll();
38+
if (valueIterator == null || !valueIterator.hasNext()) {
39+
close();
40+
return false;
41+
}
42+
}
43+
44+
V value = valueIterator.next();
45+
46+
action.accept(value);
47+
48+
return true;
49+
}
50+
51+
@Override
52+
public void close() {
53+
closed = true;
54+
queue.close();
55+
}
56+
57+
@Override
58+
public Spliterator<V> trySplit() {
59+
return null;
60+
}
61+
62+
@Override
63+
public long estimateSize() {
64+
return Long.MAX_VALUE;
65+
}
66+
67+
@Override
68+
public long getExactSizeIfKnown() {
69+
return -1;
70+
}
71+
72+
@Override
73+
public int characteristics() {
74+
return flags;
75+
}
76+
77+
}

0 commit comments

Comments
 (0)