Skip to content

Commit 003bfe0

Browse files
committed
Support streaming in SQLQuery/TxSQLQuery
Signed-off-by: Stefano Scafiti <stefano.scafiti96@gmail.com>
1 parent 032334c commit 003bfe0

File tree

5 files changed

+28
-14
lines changed

5 files changed

+28
-14
lines changed

.github/workflows/gradle.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ jobs:
2121
with:
2222
java-version: 1.8
2323
- name: Start immudb container
24-
run: docker run -d --health-cmd "immuadmin status" --health-interval 10s --health-timeout 5s --health-retries 5 -p 3322:3322 codenotary/immudb:1.4.0
24+
run: docker run -d --health-cmd "immuadmin status" --health-interval 10s --health-timeout 5s --health-retries 5 -p 3322:3322 codenotary/immudb:1.9.4
2525
- name: Grant execute permission for gradlew
2626
run: chmod +x gradlew
2727
- name: Build with Gradle

src/main/java/io/codenotary/immudb4j/ImmuClient.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -369,9 +369,11 @@ public synchronized SQLQueryResult sqlQuery(String stmt, Map<String, SQLValue> p
369369
final ImmudbProto.SQLQueryRequest req = ImmudbProto.SQLQueryRequest.newBuilder()
370370
.setSql(stmt)
371371
.addAllParams(sqlEncodeParams(params))
372+
.setAcceptStream(true)
372373
.build();
373374

374-
return new SQLQueryResult(blockingStub.txSQLQuery(req));
375+
Iterator<io.codenotary.immudb.ImmudbProto.SQLQueryResult> it = blockingStub.txSQLQuery(req);
376+
return new SQLQueryResult(it);
375377
}
376378

377379
private Map<String, SQLValue> sqlNameParams(SQLValue... params) {

src/main/java/io/codenotary/immudb4j/sql/SQLQueryResult.java

+18-8
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,27 @@
1717
package io.codenotary.immudb4j.sql;
1818

1919
import java.util.Date;
20+
import java.util.NoSuchElementException;
2021
import java.util.concurrent.TimeUnit;
22+
import java.util.Iterator;
2123

2224
import io.codenotary.immudb.ImmudbProto;
2325

2426
public class SQLQueryResult {
25-
27+
28+
private final Iterator<ImmudbProto.SQLQueryResult> it;
2629
private ImmudbProto.SQLQueryResult res;
2730
private int currRow = -1;
2831

2932
private boolean closed;
3033

31-
public SQLQueryResult(ImmudbProto.SQLQueryResult res) {
32-
if (res == null) {
34+
public SQLQueryResult(Iterator<ImmudbProto.SQLQueryResult> it) {
35+
if (it == null) {
3336
throw new RuntimeException("illegal arguments");
3437
}
3538

36-
this.res = res;
39+
this.it = it;
40+
this.res = it.next();
3741
}
3842

3943
public synchronized void close() throws SQLException {
@@ -45,12 +49,18 @@ public synchronized boolean next() throws SQLException {
4549
throw new SQLException("already closed");
4650
}
4751

48-
if (currRow + 1 >= res.getRowsCount()) {
52+
if (res != null && currRow+1 < res.getRowsCount()) {
53+
currRow++;
54+
return true;
55+
}
56+
57+
try {
58+
res = this.it.next();
59+
} catch (NoSuchElementException e) {
4960
return false;
5061
}
62+
currRow = 0;
5163

52-
currRow++;
53-
5464
return true;
5565
}
5666

@@ -76,7 +86,7 @@ public synchronized int getColumnsCount() throws SQLException {
7686
if (closed) {
7787
throw new SQLException("already closed");
7888
}
79-
89+
8090
return res.getColumnsCount();
8191
}
8292

src/main/proto/schema.proto

+3-2
Original file line numberDiff line numberDiff line change
@@ -685,6 +685,7 @@ message SQLQueryRequest {
685685
string sql = 1;
686686
repeated NamedParam params = 2;
687687
bool reuseSnapshot = 3;
688+
bool acceptStream = 4;
688689
}
689690

690691
message NamedParam {
@@ -783,7 +784,7 @@ service ImmuService {
783784
rpc Rollback (google.protobuf.Empty) returns (google.protobuf.Empty){};
784785

785786
rpc TxSQLExec(SQLExecRequest) returns (google.protobuf.Empty) {};
786-
rpc TxSQLQuery(SQLQueryRequest) returns (SQLQueryResult) {};
787+
rpc TxSQLQuery(SQLQueryRequest) returns (stream SQLQueryResult) {};
787788

788789
rpc Set (SetRequest) returns (TxHeader){
789790
};
@@ -894,7 +895,7 @@ service ImmuService {
894895
rpc SQLExec(SQLExecRequest) returns (SQLExecResult) {
895896
};
896897

897-
rpc SQLQuery(SQLQueryRequest) returns (SQLQueryResult) {
898+
rpc SQLQuery(SQLQueryRequest) returns (stream SQLQueryResult) {
898899
};
899900

900901
rpc ListTables(google.protobuf.Empty) returns (SQLQueryResult) {

src/test/java/io/codenotary/immudb4j/SQLTransactionsTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ public void t1() throws VerificationException, InterruptedException, SQLExceptio
4242
new SQLValue(String.format("title%d", i)),
4343
new SQLValue(i % 2 == 0));
4444
}
45+
immuClient.commitTransaction();
46+
47+
immuClient.beginTransaction();
4548

4649
SQLQueryResult res = immuClient.sqlQuery("SELECT id, title, active FROM mytable");
4750

@@ -71,8 +74,6 @@ public void t1() throws VerificationException, InterruptedException, SQLExceptio
7174
res.close();
7275

7376
immuClient.commitTransaction();
74-
75-
immuClient.closeSession();
7677
}
7778

7879
}

0 commit comments

Comments
 (0)