Commit

add LargeObjectCacheImpl and its tests

t-horikawa committed Feb 7, 2025
1 parent 3d4b39b commit 6fa02c2
Showing 6 changed files with 274 additions and 7 deletions.
@@ -20,6 +20,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -190,6 +192,26 @@ public InputStream openSubResponse(String id, long timeout, TimeUnit unit) throw
throw new NoSuchElementException("illegal SubResponse id");
}

/**
* Returns the Path of the file containing the subResponse.
* @param id the channel name
* @return the Path of the file containing the subResponse,
*     or null if the subResponse is not associated with a file
* @throws IOException if an I/O error occurred while retrieving the main response body
* @throws ServerException if a server error occurred while retrieving the main response body
*/
public Path subResponseFilePath(String id) throws IOException, ServerException {
waitForMainResponse();
var entry = blobs.get(id);
if (entry != null) {
var path = entry.getLeft();
if (path != null) {
return Paths.get(path);
}
}
return null;
}

@Override
public void close() throws IOException, InterruptedException {
closed.set(true);
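A minimal caller-side sketch of the new subResponseFilePath accessor (not part of the commit). The helper class, its name, and the import path assumed for ChannelResponse are illustrative only; the accessor itself is used the same way by the GetLargeObjectCacheProcessor added later in this commit.

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.ChannelResponse;

final class SubResponseFileSketch {
    private SubResponseFileSketch() {
    }

    // returns the sub-response bytes only when the server delivered them as a file
    static Optional<byte[]> readMaterializedSubResponse(ChannelResponse response, String channel) throws Exception {
        Path file = response.subResponseFilePath(channel); // null when no file backs this channel
        if (file == null) {
            return Optional.empty();
        }
        return Optional.of(Files.readAllBytes(file));
    }
}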
@@ -272,6 +272,17 @@ default FutureResponse<Reader> send(@Nonnull SqlRequest.GetLargeObjectData reque
throw new UnsupportedOperationException();
}

/**
* Requests {@code GetLargeObjectData} to the SQL service.
* @param request the request
* @return the future response of the request,
*     which may raise an error if the request fails
* @throws IOException if an I/O error occurred while sending the request
*/
default FutureResponse<LargeObjectCache> send(@Nonnull SqlRequest.GetLargeObjectData request) throws IOException {
throw new UnsupportedOperationException();
}

/**
* Requests {@code DisposeTransaction} to SQL service.
* @param request the request
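A hedged usage sketch of the new send overload (not part of the commit). The helper class and the assumption that the GetLargeObjectData request is already built are illustrative; the send(request).await() pattern mirrors the calls in the tests at the end of this commit.

import com.tsurugidb.sql.proto.SqlRequest;
import com.tsurugidb.tsubakuro.sql.SqlService;

final class GetLargeObjectDataSketch {
    private GetLargeObjectDataSketch() {
    }

    // sends a pre-built GetLargeObjectData request and prints the cached file location, if any
    static void printLargeObjectPath(SqlService service, SqlRequest.GetLargeObjectData request) throws Exception {
        var cache = service.send(request).await();
        try {
            cache.find().ifPresent(path -> System.out.println("large object file: " + path));
        } finally {
            cache.close();
        }
    }
}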
@@ -0,0 +1,77 @@
/*
* Copyright 2023-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.tsurugidb.tsubakuro.sql.impl;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import com.tsurugidb.tsubakuro.sql.LargeObjectCache;

/**
* An implementation of {@link LargeObjectCache}.
*/
public class LargeObjectCacheImpl implements LargeObjectCache {

private final Path path;
private final boolean exists;
private final AtomicBoolean closed = new AtomicBoolean();

/**
* Creates a new instance associated with the given file.
* @param path the path of the file containing the large object data, or {@code null} if no file is associated
*/
public LargeObjectCacheImpl(@Nullable Path path) {
this.path = path;
exists = (path != null) && new File(path.toString()).exists();
}

/**
* Creates a new instance that is not associated with any file.
*/
public LargeObjectCacheImpl() {
this.path = null;
exists = false;
}

@Override
public Optional<Path> find() {
if (closed.get() || !exists) {
return Optional.empty();
}
return Optional.of(path);
}

@Override
public void copyTo(@Nonnull Path destination) throws IOException {
Objects.requireNonNull(destination);
if (closed.get()) {
throw new IOException("already closed");
}
if (!exists) {
throw new IOException("cannot find the file");
}
Files.copy(path, destination);
}

@Override
public void close() {
closed.set(true);
}
}
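A self-contained sketch (not part of the commit) exercising the class above; the temporary file stands in for one delivered by the server, and the class name is illustrative.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import com.tsurugidb.tsubakuro.sql.impl.LargeObjectCacheImpl;

public final class LargeObjectCacheImplSketch {
    public static void main(String[] args) throws IOException {
        // a temporary file standing in for one written by the server
        Path blobFile = Files.createTempFile("lob", ".data");
        Files.write(blobFile, new byte[] { 0x01, 0x02, 0x03 });

        var cache = new LargeObjectCacheImpl(blobFile);
        // find() exposes the backing file only while the cache is open
        cache.find().ifPresent(p -> System.out.println("BLOB available at " + p));

        // copyTo() duplicates the file to a caller-owned destination that must not exist yet
        Path copy = blobFile.resolveSibling(blobFile.getFileName() + ".copy");
        cache.copyTo(copy);
        System.out.println("copied " + Files.size(copy) + " bytes");

        cache.close();
        // after close(), find() returns Optional.empty() and copyTo() throws IOException
    }
}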
@@ -39,6 +39,7 @@
import com.tsurugidb.tsubakuro.sql.BlobReference;
import com.tsurugidb.tsubakuro.sql.ClobReference;
import com.tsurugidb.tsubakuro.sql.ExecuteResult;
import com.tsurugidb.tsubakuro.sql.LargeObjectCache;
import com.tsurugidb.tsubakuro.util.FutureResponse;

/**
@@ -173,6 +174,27 @@ public FutureResponse<Reader> openReader(ClobReference clobReference) throws IOE
throw new IllegalStateException(clobReference.getClass().getName() + " is unsupported.");
}

@Override
public FutureResponse<LargeObjectCache> getLargeObjectCache(BlobReference blobReference) throws IOException {
if (blobReference instanceof BlobReferenceForSql) {
var blobReferenceForSql = (BlobReferenceForSql) blobReference;
var pb = SqlRequest.GetLargeObjectData.newBuilder()
.setReference(blobReferenceForSql.blobReference());
return service.send(pb.build());
}
throw new IllegalStateException(blobReference.getClass().getName() + " is unsupported.");
}

public FutureResponse<LargeObjectCache> getLargeObjectCache(ClobReference clobReference) throws IOException {
if (clobReference instanceof ClobReferenceForSql) {
var clobReferenceForSql = (ClobReferenceForSql) clobReference;
var pb = SqlRequest.GetLargeObjectData.newBuilder()
.setReference(clobReferenceForSql.clobReference());
return service.send(pb.build());
}
throw new IllegalStateException(clobReference.getClass().getName() + " is unsupported.");
}

@Override
public void close() throws ServerException, IOException, InterruptedException {
// FIXME close underlying resources (e.g. ongoing transactions)
@@ -50,6 +50,7 @@
import com.tsurugidb.tsubakuro.sql.BlobReference;
import com.tsurugidb.tsubakuro.sql.ClobReference;
import com.tsurugidb.tsubakuro.sql.ExecuteResult;
import com.tsurugidb.tsubakuro.sql.LargeObjectCache;
import com.tsurugidb.tsubakuro.sql.PreparedStatement;
import com.tsurugidb.tsubakuro.sql.ResultSet;
import com.tsurugidb.tsubakuro.sql.SearchPath;
@@ -923,13 +924,66 @@ public FutureResponse<Reader> send(
@Nonnull SqlRequest.GetLargeObjectData request, ClobReference reference) throws IOException {
Objects.requireNonNull(request);
LOG.trace("send (GetLargeObjectData): {}", request); //$NON-NLS-1$
return session.send(
SERVICE_ID,
SqlRequestUtils.toSqlRequestDelimitedByteArray(request),
new GetClobProcessor());
}

class GetLargeObjectCacheProcessor implements ResponseProcessor<LargeObjectCache> {
private final AtomicReference<SqlResponse.GetLargeObjectData> detailResponseCache = new AtomicReference<>();

@Override
public LargeObjectCache process(Response response) throws IOException, ServerException, InterruptedException {
return process(response, Timeout.DISABLED);
}

@Override
public LargeObjectCache process(Response response, Timeout timeout) throws IOException, ServerException, InterruptedException {
Objects.requireNonNull(response);

if (session.isClosed()) {
throw new SessionAlreadyClosedException();
}
try (response) {
var payload = response.waitForMainResponse();
if (detailResponseCache.get() == null) {
var sqlResponse = SqlResponse.Response.parseDelimitedFrom(new ByteBufferInputStream(payload));
if (!SqlResponse.Response.ResponseCase.GET_LARGE_OBJECT_DATA.equals(sqlResponse.getResponseCase())) {
// FIXME log error message
throw new IOException("response type is inconsistent with the request type");
}
detailResponseCache.set(sqlResponse.getGetLargeObjectData());
}
var detailResponse = detailResponseCache.get();
LOG.trace("receive (GetLargeObjectData): {}", detailResponse); //$NON-NLS-1$
if (SqlResponse.GetLargeObjectData.ResultCase.ERROR.equals(detailResponse.getResultCase())) {
var errorResponse = detailResponse.getError();
throw SqlServiceException.of(SqlServiceCode.valueOf(errorResponse.getCode()), errorResponse.getDetail());
}
var channelName = detailResponse.getSuccess().getChannelName();
if (response instanceof ChannelResponse) {
return new LargeObjectCacheImpl(((ChannelResponse) response).subResponseFilePath(channelName));
}
return new LargeObjectCacheImpl();
}
}

@Override
public boolean isReturnsServerResource() {
return false;
}
}

@Override
public FutureResponse<LargeObjectCache> send(
@Nonnull SqlRequest.GetLargeObjectData request) throws IOException {
Objects.requireNonNull(request);
LOG.trace("send (GetLargeObjectData): {}", request); //$NON-NLS-1$
return session.send(
SERVICE_ID,
SqlRequestUtils.toSqlRequestDelimitedByteArray(request),
new GetLargeObjectCacheProcessor());
}

class DisposeTransactionProcessor implements MainResponseProcessor<Void> {
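For reference, a small sketch (not from the commit) of the fallback path above: when the response is not a ChannelResponse, the processor returns a LargeObjectCacheImpl without a backing file, so callers get an empty Optional from find() and an IOException from copyTo() rather than a broken path. The class name and destination path are illustrative.

import java.io.IOException;
import java.nio.file.Paths;

import com.tsurugidb.tsubakuro.sql.impl.LargeObjectCacheImpl;

public final class EmptyLargeObjectCacheSketch {
    public static void main(String[] args) {
        var cache = new LargeObjectCacheImpl(); // no backing file
        System.out.println(cache.find().isEmpty()); // true: nothing to expose
        try {
            cache.copyTo(Paths.get("out.bin")); // hypothetical destination
        } catch (IOException e) {
            System.out.println(e.getMessage()); // "cannot find the file"
        }
        cache.close();
    }
}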
@@ -21,11 +21,13 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import org.junit.jupiter.api.io.TempDir;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedList;
@@ -37,7 +39,9 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import com.tsurugidb.framework.proto.FrameworkCommon;
import com.tsurugidb.framework.proto.FrameworkRequest;
import com.tsurugidb.framework.proto.FrameworkResponse;
import com.tsurugidb.sql.proto.SqlCommon;
import com.tsurugidb.sql.proto.SqlRequest;
import com.tsurugidb.sql.proto.SqlResponse;
@@ -51,6 +55,7 @@
import com.tsurugidb.tsubakuro.sql.CounterType;
import com.tsurugidb.tsubakuro.sql.ExecuteResult;
import com.tsurugidb.tsubakuro.sql.Parameters;
import com.tsurugidb.tsubakuro.sql.SqlClient;
import com.tsurugidb.tsubakuro.sql.SqlService;

class SqlServiceStubLobTest {
@@ -61,11 +66,14 @@ class SqlServiceStubLobTest {

private Session session = null;

private SqlClient client = null;

SqlServiceStubLobTest() {
try {
wire = new WireImpl(link);
session = new SessionImpl(wire);
client = SqlClient.attach(session);
} catch (IOException e) {
System.err.println(e);
fail("fail to create WireImpl");
}
@@ -182,4 +190,77 @@ void executeStatementWithLob() throws Exception {
}
assertFalse(link.hasRemaining());
}

@Test
void getLargeObjectCache_find(@TempDir Path tempDir) throws Exception {
String fileName = "lob.data";
String channelName = "lobChannel";
long objectId = 12345;

byte[] data = new byte[] { 0x01, 0x02, 0x03 };
Path file = tempDir.resolve(fileName);
Files.write(file, data);

var header = FrameworkResponse.Header.newBuilder()
.setPayloadType(FrameworkResponse.Header.PayloadType.SERVICE_RESULT)
.setBlobs(FrameworkCommon.RepeatedBlobInfo.newBuilder()
.addBlobs(FrameworkCommon.BlobInfo.newBuilder()
.setChannelName(channelName)
.setPath(file.toString())))
.build();
var payload = SqlResponse.Response.newBuilder()
.setGetLargeObjectData(SqlResponse.GetLargeObjectData.newBuilder()
.setSuccess(SqlResponse.GetLargeObjectData.Success.newBuilder()
.setChannelName(channelName)))
.build();
// for getLargeObjectData
link.next(header, payload);

var largeObjectCache = client.getLargeObjectCache(new BlobReferenceForSql(SqlCommon.LargeObjectProvider.forNumber(2), objectId)).await();
var pathOpt = largeObjectCache.find();
assertTrue(pathOpt.isPresent());
var obtainedData = Files.readAllBytes(pathOpt.get());
assertEquals(data.length, obtainedData.length);
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], obtainedData[i]);
}
assertFalse(link.hasRemaining());
}

@Test
void getLargeObjectCache_copyTo(@TempDir Path tempDir) throws Exception {
String fileName = "lob.data";
String channelName = "lobChannel";
long objectId = 12345;

byte[] data = new byte[] { 0x01, 0x02, 0x03 };
Path file = tempDir.resolve(fileName);
Files.write(file, data);

var header = FrameworkResponse.Header.newBuilder()
.setPayloadType(FrameworkResponse.Header.PayloadType.SERVICE_RESULT)
.setBlobs(FrameworkCommon.RepeatedBlobInfo.newBuilder()
.addBlobs(FrameworkCommon.BlobInfo.newBuilder()
.setChannelName(channelName)
.setPath(file.toString())))
.build();
var payload = SqlResponse.Response.newBuilder()
.setGetLargeObjectData(SqlResponse.GetLargeObjectData.newBuilder()
.setSuccess(SqlResponse.GetLargeObjectData.Success.newBuilder()
.setChannelName(channelName)))
.build();
// for getLargeObjectData
link.next(header, payload);

var largeObjectCache = client.getLargeObjectCache(new BlobReferenceForSql(SqlCommon.LargeObjectProvider.forNumber(2), objectId)).await();
Path copy = tempDir.resolve("lob_copy.data");
largeObjectCache.copyTo(copy);
assertTrue(Files.exists(copy));
var obtainedData = Files.readAllBytes(copy);
assertEquals(data.length, obtainedData.length);
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], obtainedData[i]);
}
assertFalse(link.hasRemaining());
}
}
