From 9bc23d5c5877da06c6bb1c5a86052f8e3e7dfc0a Mon Sep 17 00:00:00 2001 From: t-horikawa Date: Thu, 14 Dec 2023 18:43:28 +0900 Subject: [PATCH 1/5] implement ClientInformation handling --- .../common/connection/ClientInformation.java | 70 +++++++++++++++++++ .../channel/common/connection/Connector.java | 15 +++- .../channel/common/connection/wire/Wire.java | 11 --- .../common/connection/wire/impl/WireImpl.java | 13 +++- .../common/connection/TestingConnector.java | 2 +- .../ipc/connection/IpcConnectorImpl.java | 6 +- .../tateyama/proto/endpoint/request.proto | 50 +++++++++++++ .../tateyama/proto/endpoint/response.proto | 30 ++++++++ .../tsubakuro/common/SessionBuilder.java | 31 +++++++- .../tsubakuro/common/SessionBuilderTest.java | 5 +- .../connection/StreamConnectorImpl.java | 5 +- 11 files changed, 215 insertions(+), 23 deletions(-) create mode 100644 modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java create mode 100644 modules/proto/src/main/protos/tateyama/proto/endpoint/request.proto create mode 100644 modules/proto/src/main/protos/tateyama/proto/endpoint/response.proto diff --git a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java new file mode 100644 index 000000000..a8f7ca64d --- /dev/null +++ b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java @@ -0,0 +1,70 @@ +package com.tsurugidb.tsubakuro.channel.common.connection; + +import java.text.MessageFormat; +import java.util.Optional; + +/** + * A credential by user name and password. + */ +public class ClientInformation { + + private String label; + + private String applicationName = "tsubakuro"; + + private final String connectionInformation; + + /** + * Creates a new instance. + */ + public ClientInformation() { + this.connectionInformation = Long.toString(ProcessHandle.current().pid()); + } + + /** + * Returns the label. + * @return the label. + */ + public Optional getLabel() { + return Optional.of(label); + } + + /** + * Returns the application name. + * @return the application name. + */ + public Optional getApplicationName() { + return Optional.of(applicationName); + } + + /** + * Returns the connection information. + * @return the connection information. + */ + public String getConnectionInformation() { + return connectionInformation; + } + + /** + * Set label. + * @param labelString the label + */ + public void label(String labelString) { + this.label = labelString; + } + + /** + * Set application name. + * @param applicationNameString the application name + */ + public void applicationName(String applicationNameString) { + this.applicationName = applicationNameString; + } + + @Override + public String toString() { + return MessageFormat.format( + "ClientInformation(label={0}, applicationName={1}, connectionInformation={2}))", + label, applicationName, connectionInformation); + } +} diff --git a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/Connector.java b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/Connector.java index 296403e0f..529c683a4 100644 --- a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/Connector.java +++ b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/Connector.java @@ -41,7 +41,7 @@ static Connector create(@Nonnull URI endpoint) { * @throws IOException connection error */ default FutureResponse connect() throws IOException { - return connect(NullCredential.INSTANCE); + return connect(NullCredential.INSTANCE, new ClientInformation()); } /** @@ -50,5 +50,16 @@ default FutureResponse connect() throws IOException { * @return future session wire * @throws IOException connection error */ - FutureResponse connect(@Nonnull Credential credential) throws IOException; + default FutureResponse connect(@Nonnull Credential credential) throws IOException { + return connect(credential, new ClientInformation()); + } + + /** + * Establishes a connection to the Tsurugi server. + * @param credential the connection credential + * @param clientInformation the client information + * @return future session wire + * @throws IOException connection error + */ + FutureResponse connect(@Nonnull Credential credential, @Nonnull ClientInformation clientInformation) throws IOException; } diff --git a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/Wire.java b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/Wire.java index 104219acc..488d5874d 100644 --- a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/Wire.java +++ b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/Wire.java @@ -21,17 +21,6 @@ */ @ThreadSafe public interface Wire extends ServerResource { - - /** - * The major service message version for FrameworkRequest.Header. - */ - int SERVICE_MESSAGE_VERSION_MAJOR = 0; - - /** - * The minor service message version for FrameworkRequest.Header. - */ - int SERVICE_MESSAGE_VERSION_MINOR = 0; - /** * send a message to the destination server. *

diff --git a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java index 32de75673..c0bdb6bde 100644 --- a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java +++ b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java @@ -23,6 +23,15 @@ * WireImpl type. */ public class WireImpl implements Wire { + /** + * The major service message version for FrameworkRequest.Header. + */ + private static final int SERVICE_MESSAGE_VERSION_MAJOR = 0; + + /** + * The minor service message version for FrameworkRequest.Header. + */ + private static final int SERVICE_MESSAGE_VERSION_MINOR = 0; static final Logger LOG = LoggerFactory.getLogger(WireImpl.class); @@ -57,8 +66,8 @@ public FutureResponse send(int serviceId, @Nonnull byte[] pa throw new IOException("already closed"); } var header = FrameworkRequest.Header.newBuilder() - .setServiceMessageVersionMajor(Wire.SERVICE_MESSAGE_VERSION_MAJOR) - .setServiceMessageVersionMinor(Wire.SERVICE_MESSAGE_VERSION_MINOR) + .setServiceMessageVersionMajor(SERVICE_MESSAGE_VERSION_MAJOR) + .setServiceMessageVersionMinor(SERVICE_MESSAGE_VERSION_MINOR) .setServiceId(serviceId) .setSessionId(sessionID) .build(); diff --git a/modules/common/src/test/java/com/tsurugidb/tsubakuro/channel/common/connection/TestingConnector.java b/modules/common/src/test/java/com/tsurugidb/tsubakuro/channel/common/connection/TestingConnector.java index a00442bbd..b65ee13d2 100644 --- a/modules/common/src/test/java/com/tsurugidb/tsubakuro/channel/common/connection/TestingConnector.java +++ b/modules/common/src/test/java/com/tsurugidb/tsubakuro/channel/common/connection/TestingConnector.java @@ -15,7 +15,7 @@ class TestingConnector implements Connector { } @Override - public FutureResponse connect(Credential credential) throws IOException { + public FutureResponse connect(Credential credential, ClientInformation clientInformation) throws IOException { throw new UnsupportedOperationException(); } } diff --git a/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/IpcConnectorImpl.java b/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/IpcConnectorImpl.java index a06eeb930..7619f84a6 100644 --- a/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/IpcConnectorImpl.java +++ b/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/IpcConnectorImpl.java @@ -3,11 +3,15 @@ import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; + +import javax.annotation.Nonnull; + import java.lang.ref.Cleaner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.tsurugidb.tsubakuro.channel.common.connection.ClientInformation; import com.tsurugidb.tsubakuro.channel.common.connection.Connector; import com.tsurugidb.tsubakuro.channel.common.connection.Credential; import com.tsurugidb.tsubakuro.channel.common.connection.wire.Wire; @@ -49,7 +53,7 @@ public IpcConnectorImpl(String name) { } @Override - public FutureResponse connect(Credential credential) throws IOException { + public FutureResponse connect(@Nonnull Credential credential, @Nonnull ClientInformation clientInformation) throws IOException { LOG.trace("will connect to {}", name); //$NON-NLS-1$ if (handle == 0) { diff --git a/modules/proto/src/main/protos/tateyama/proto/endpoint/request.proto b/modules/proto/src/main/protos/tateyama/proto/endpoint/request.proto new file mode 100644 index 000000000..c99506ec8 --- /dev/null +++ b/modules/proto/src/main/protos/tateyama/proto/endpoint/request.proto @@ -0,0 +1,50 @@ +syntax = "proto3"; + +package tateyama.proto.endpoint.request; + +option java_multiple_files = false; +option java_package = "com.tsurugidb.endpoint.proto"; +option java_outer_classname = "EndpointRequest"; + +// the request message to endpoint pseudo service. +message Request { + // service message version (major) + uint64 service_message_version_major = 1; + + // service message version (minor) + uint64 service_message_version_minor = 2; + + // reserved for system use + reserved 3 to 10; + + // the request command. + oneof command { + // handshake operation. + Handshake handshake = 11; + } + reserved 12 to 99; +} + +// handshake operation. +message Handshake { + // the client information. + ClientInformation client_infomation = 1; + + // the auth info. + AuthInfo auth_info = 2; +} + +// client information +message ClientInformation { + // the connection label. + string connection_label = 1; + + // the application name. + string application_name = 2; + + // the connection information + string connection_information = 3; +} + +// auto info. +message AuthInfo {} diff --git a/modules/proto/src/main/protos/tateyama/proto/endpoint/response.proto b/modules/proto/src/main/protos/tateyama/proto/endpoint/response.proto new file mode 100644 index 000000000..079438cbd --- /dev/null +++ b/modules/proto/src/main/protos/tateyama/proto/endpoint/response.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; + +package tateyama.proto.endpoint.response; + +option java_multiple_files = false; +option java_package = "com.tsurugidb.endpoint.proto"; +option java_outer_classname = "EndpointResponse"; + +// empty message +message Void {} + +// unknown error was occurred. +message UnknownError { + // the error message. + string message = 1; +} + +// handshake operation. +message Handshake { + reserved 1 to 10; + + // the response body. + oneof result { + // request is successfully completed. + Void success = 11; + + // unknown error was occurred. + UnknownError unknown_error = 13; + } +} diff --git a/modules/session/src/main/java/com/tsurugidb/tsubakuro/common/SessionBuilder.java b/modules/session/src/main/java/com/tsurugidb/tsubakuro/common/SessionBuilder.java index 9bbaff7bf..6e73c9ec3 100644 --- a/modules/session/src/main/java/com/tsurugidb/tsubakuro/common/SessionBuilder.java +++ b/modules/session/src/main/java/com/tsurugidb/tsubakuro/common/SessionBuilder.java @@ -9,6 +9,7 @@ import javax.annotation.Nonnull; +import com.tsurugidb.tsubakuro.channel.common.connection.ClientInformation; import com.tsurugidb.tsubakuro.channel.common.connection.Connector; import com.tsurugidb.tsubakuro.channel.common.connection.Credential; import com.tsurugidb.tsubakuro.channel.common.connection.NullCredential; @@ -30,6 +31,8 @@ public final class SessionBuilder { private Credential connectionCredential = NullCredential.INSTANCE; + private ClientInformation clientInformation = new ClientInformation(); + private final SessionInfo sessionInfo; private SessionBuilder(Connector connector) { @@ -81,6 +84,28 @@ public SessionBuilder withCredential(@Nonnull Credential credential) { return this; } + /** + * Sets label information to connect. + * @param label the label information + * @return this + */ + public SessionBuilder withLabel(@Nonnull String label) { + Objects.requireNonNull(label); + this.clientInformation.label(label); + return this; + } + + /** + * Sets applicationName information to connect. + * @param applicationName the applicationName information + * @return this + */ + public SessionBuilder withApplicationName(@Nonnull String applicationName) { + Objects.requireNonNull(applicationName); + this.clientInformation.applicationName(applicationName); + return this; + } + /** * Establishes a connection to the Tsurugi server. * This operation will block until the connection was established, @@ -92,7 +117,7 @@ public SessionBuilder withCredential(@Nonnull Credential credential) { * @see #create(long, TimeUnit) */ public Session create() throws IOException, ServerException, InterruptedException { - try (var fWire = connector.connect(connectionCredential)) { + try (var fWire = connector.connect(connectionCredential, clientInformation)) { return create0(fWire.get()); } } @@ -110,7 +135,7 @@ public Session create() throws IOException, ServerException, InterruptedExceptio public Session create(long timeout, @Nonnull TimeUnit unit) throws IOException, ServerException, InterruptedException, TimeoutException { Objects.requireNonNull(unit); - try (var fWire = connector.connect(connectionCredential)) { + try (var fWire = connector.connect(connectionCredential, clientInformation)) { var session = create0(fWire.get(timeout, unit)); return session; } @@ -123,7 +148,7 @@ public Session create(long timeout, @Nonnull TimeUnit unit) * @throws IOException if I/O error was occurred during connection */ public FutureResponse createAsync() throws IOException { - var fWire = connector.connect(connectionCredential); + var fWire = connector.connect(connectionCredential, clientInformation); return new AbstractFutureResponse() { @Override diff --git a/modules/session/src/test/java/com/tsurugidb/tsubakuro/common/SessionBuilderTest.java b/modules/session/src/test/java/com/tsurugidb/tsubakuro/common/SessionBuilderTest.java index 80710bfc3..bdfe5c5c6 100644 --- a/modules/session/src/test/java/com/tsurugidb/tsubakuro/common/SessionBuilderTest.java +++ b/modules/session/src/test/java/com/tsurugidb/tsubakuro/common/SessionBuilderTest.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.Test; +import com.tsurugidb.tsubakuro.channel.common.connection.ClientInformation; import com.tsurugidb.tsubakuro.channel.common.connection.Connector; import com.tsurugidb.tsubakuro.channel.common.connection.Credential; import com.tsurugidb.tsubakuro.channel.common.connection.RememberMeCredential; @@ -23,7 +24,7 @@ void create() throws Exception { var creds = new RememberMeCredential("testing"); var builder = SessionBuilder.connect(new Connector() { @Override - public FutureResponse connect(Credential credential) throws IOException { + public FutureResponse connect(Credential credential, ClientInformation clientInformation) throws IOException { assertSame(credential, creds); return FutureResponse.wrap(Owner.of(wire)); } @@ -41,7 +42,7 @@ void createAsync() throws Exception { var creds = new RememberMeCredential("testing"); var builder = SessionBuilder.connect(new Connector() { @Override - public FutureResponse connect(Credential credential) throws IOException { + public FutureResponse connect(Credential credential, ClientInformation clientInformation) throws IOException { assertSame(credential, creds); return FutureResponse.wrap(Owner.of(wire)); } diff --git a/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/StreamConnectorImpl.java b/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/StreamConnectorImpl.java index ebf3fe80d..b0e98d6cc 100644 --- a/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/StreamConnectorImpl.java +++ b/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/StreamConnectorImpl.java @@ -2,9 +2,12 @@ import java.io.IOException; +import javax.annotation.Nonnull; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.tsurugidb.tsubakuro.channel.common.connection.ClientInformation; import com.tsurugidb.tsubakuro.channel.common.connection.Connector; import com.tsurugidb.tsubakuro.channel.common.connection.Credential; import com.tsurugidb.tsubakuro.channel.common.connection.wire.Wire; @@ -28,7 +31,7 @@ public StreamConnectorImpl(String hostname, int port) { } @Override - public FutureResponse connect(Credential credential) throws IOException { + public FutureResponse connect(@Nonnull Credential credential, @Nonnull ClientInformation clientInformation) throws IOException { LOG.trace("will connect to {}:{}", hostname, port); //$NON-NLS-1$ return new FutureStreamWireImpl(new StreamLink(hostname, port)); } From ff8d3130f7c37d6b8d0a2247a59d7ed67c3199d9 Mon Sep 17 00:00:00 2001 From: t-horikawa Date: Fri, 15 Dec 2023 15:40:11 +0900 Subject: [PATCH 2/5] implement handshake protocol --- .../common/connection/ClientInformation.java | 94 +++++++++++---- .../channel/common/connection/Connector.java | 10 ++ .../common/connection/wire/impl/WireImpl.java | 108 +++++++++++++++++- .../ipc/connection/FutureIpcWireImpl.java | 33 +++++- .../ipc/connection/IpcConnectorImpl.java | 9 +- .../channel/ipc/sql/ServerWireImpl.java | 94 +++++++++++++-- .../channel/ipc/sql/SessionWireTest.java | 8 +- .../src/test/native/include/server_wires.h | 30 +++-- .../src/test/native/src/server_wireJNI.cpp | 30 ++--- .../tateyama/proto/endpoint/request.proto | 24 ++-- .../tateyama/proto/endpoint/response.proto | 23 +++- .../tsubakuro/common/SessionBuilder.java | 13 ++- .../tsubakuro/common/SessionBuilderTest.java | 46 ++++++++ .../tsubakuro/channel/stream/StreamLink.java | 48 ++------ .../connection/FutureStreamWireImpl.java | 39 ++++--- .../connection/StreamConnectorImpl.java | 4 +- .../channel/stream/ServerStreamLink.java | 16 --- .../channel/stream/sql/ServerWireImpl.java | 12 +- 18 files changed, 481 insertions(+), 160 deletions(-) diff --git a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java index a8f7ca64d..0be751510 100644 --- a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java +++ b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java @@ -1,56 +1,86 @@ package com.tsurugidb.tsubakuro.channel.common.connection; import java.text.MessageFormat; -import java.util.Optional; /** * A credential by user name and password. */ public class ClientInformation { - private String label; + private String connectionLabel; - private String applicationName = "tsubakuro"; + private String applicationName; - private final String connectionInformation; + private String userName; + + private String connectionInformation; // used by IPC only + + private long maxResultSets; // used by Stream /** * Creates a new instance. */ public ClientInformation() { - this.connectionInformation = Long.toString(ProcessHandle.current().pid()); } /** - * Returns the label. - * @return the label. + * Get the connection label. + * @return the connection label. */ - public Optional getLabel() { - return Optional.of(label); + public String connectionLabel() { + if (connectionLabel != null) { + return connectionLabel; + } + return ""; } /** - * Returns the application name. + * Get the application name. * @return the application name. */ - public Optional getApplicationName() { - return Optional.of(applicationName); + public String applicationName() { + if (applicationName != null) { + return applicationName; + } + return ""; + } + + /** + * Get the user name. + * @return the user name. + */ + public String userName() { + if (userName != null) { + return userName; + } + return ""; } - /** - * Returns the connection information. + /** + * Get the connection information. * @return the connection information. */ - public String getConnectionInformation() { - return connectionInformation; + public String connectionInformation() { + if (connectionInformation != null) { + return connectionInformation; + } + return ""; + } + + /** + * Get the maximumConcurrentResultSets. + * @return the maximumConcurrentResultSets. + */ + public long maximumConcurrentResultSets() { + return maxResultSets; } /** * Set label. * @param labelString the label */ - public void label(String labelString) { - this.label = labelString; + public void connectionlabel(String labelString) { + this.connectionLabel = labelString; } /** @@ -61,10 +91,34 @@ public void applicationName(String applicationNameString) { this.applicationName = applicationNameString; } + /** + * Set user name. + * @param userNameString the application name + */ + public void userName(String userNameString) { + this.userName = userNameString; + } + + /** + * Set connectionInformation name. + * @param connectionInformationString the connectionInformation + */ + public void connectionInformation(String connectionInformationString) { + this.connectionInformation = connectionInformationString; + } + + /** + * Set maximumConcurrentResultSets. + * @param maximumConcurrentResultSets the number of maximumConcurrentResultSets + */ + public void maximumConcurrentResultSets(long maximumConcurrentResultSets) { + this.maxResultSets = maximumConcurrentResultSets; + } + @Override public String toString() { return MessageFormat.format( - "ClientInformation(label={0}, applicationName={1}, connectionInformation={2}))", - label, applicationName, connectionInformation); + "ClientInformation(connectionLabel={0}, applicationName={1}, userName={2}, connectionInformation={3}, maximumConcurrentResultSets={4})", + connectionLabel, applicationName, userName, connectionInformation, maxResultSets); } } diff --git a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/Connector.java b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/Connector.java index 529c683a4..32730b859 100644 --- a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/Connector.java +++ b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/Connector.java @@ -54,6 +54,16 @@ default FutureResponse connect(@Nonnull Credential credential) throws IOEx return connect(credential, new ClientInformation()); } + /** + * Establishes a connection to the Tsurugi server. + * @param clientInformation the client information + * @return future session wire + * @throws IOException connection error + */ + default FutureResponse connect(@Nonnull ClientInformation clientInformation) throws IOException { + return connect(NullCredential.INSTANCE, clientInformation); + } + /** * Establishes a connection to the Tsurugi server. * @param credential the connection credential diff --git a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java index c0bdb6bde..516543ddc 100644 --- a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java +++ b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java @@ -2,7 +2,9 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.net.ConnectException; import java.nio.ByteBuffer; +import java.text.MessageFormat; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; @@ -11,10 +13,19 @@ import org.slf4j.LoggerFactory; import com.tsurugidb.framework.proto.FrameworkRequest; +import com.tsurugidb.endpoint.proto.EndpointRequest; +import com.tsurugidb.endpoint.proto.EndpointResponse; +import com.tsurugidb.tsubakuro.channel.common.connection.ClientInformation; +import com.tsurugidb.tsubakuro.channel.common.connection.Credential; +import com.tsurugidb.tsubakuro.channel.common.connection.ForegroundFutureResponse; import com.tsurugidb.tsubakuro.channel.common.connection.sql.ResultSetWire; +import com.tsurugidb.tsubakuro.channel.common.connection.wire.MainResponseProcessor; import com.tsurugidb.tsubakuro.channel.common.connection.wire.Response; import com.tsurugidb.tsubakuro.channel.common.connection.wire.Wire; +import com.tsurugidb.tsubakuro.exception.CoreServiceCode; +import com.tsurugidb.tsubakuro.exception.CoreServiceException; import com.tsurugidb.tsubakuro.exception.ServerException; +import com.tsurugidb.tsubakuro.util.ByteBufferInputStream; import com.tsurugidb.tsubakuro.util.FutureResponse; import com.tsurugidb.tsubakuro.util.Owner; import com.tsurugidb.tsubakuro.util.Timeout; @@ -33,12 +44,27 @@ public class WireImpl implements Wire { */ private static final int SERVICE_MESSAGE_VERSION_MINOR = 0; + private static final int SERVICE_ID_ENDPOINT_BROKER = 1; + private static final long SESSION_ID_IS_NOT_ASSIGNED = Long.MAX_VALUE; + static final Logger LOG = LoggerFactory.getLogger(WireImpl.class); private final Link link; - private final long sessionID; private final ResponseBox responseBox; private final AtomicBoolean closed = new AtomicBoolean(); + private long sessionID; + + /** + * Class constructor, called from IpcConnectorImpl that is a connector to the SQL server. + * @param link the stream object by which this WireImpl is connected to the SQL server + * @throws IOException error occurred in openNative() + */ + public WireImpl(@Nonnull Link link) throws IOException { + this.link = link; + this.responseBox = link.getResponseBox(); + this.sessionID = SESSION_ID_IS_NOT_ASSIGNED; + LOG.trace("begin Session via ipc, id = {}", sessionID); + } /** * Class constructor, called from IpcConnectorImpl that is a connector to the SQL server. @@ -83,7 +109,7 @@ public FutureResponse send(int serviceId, @Nonnull byte[] pa * @throws IOException error occurred in responseBox.register() */ @Override - public FutureResponse send(int serviceId, @Nonnull ByteBuffer payload) throws IOException { + public FutureResponse send(int serviceId, @Nonnull ByteBuffer payload) throws IOException { return send(serviceId, payload.array()); } @@ -128,6 +154,72 @@ public void close() throws IOException { } + private static EndpointRequest.Request.Builder newRequest() { + return EndpointRequest.Request.newBuilder() + .setServiceMessageVersionMajor(SERVICE_MESSAGE_VERSION_MAJOR) + .setServiceMessageVersionMinor(SERVICE_MESSAGE_VERSION_MINOR); + } + + static class HandshakeProcessor implements MainResponseProcessor { + @Override + public Long process(ByteBuffer payload) throws IOException, ServerException, InterruptedException { + var message = EndpointResponse.Handshake.parseDelimitedFrom(new ByteBufferInputStream(payload)); + LOG.trace("receive: {}", message); //$NON-NLS-1$ + switch (message.getResultCase()) { + case SUCCESS: + return message.getSuccess().getSessionId(); + + case ERROR: + var errMessage = message.getError(); + switch (errMessage.getCode()) { + case RESOURCE_LIMIT_REACHED: + throw new ConnectException("the server has declined the connection request"); // preserve compatibiity + case AUTHENTICATION_ERROR: + throw newUnknown(message.getError()); // FIXME + default: + break; + } + default: + break; + } + throw new AssertionError(); // may not occur + } + } + + public FutureResponse handshake(@Nonnull Credential credential, @Nonnull ClientInformation clientInformation) throws IOException { + var handshakeMessageBuilder = EndpointRequest.Handshake.newBuilder(); +// if (credential != NullCredential.INSTANCE) { +// handshakeMessageBuilder.setAuthInfo(credential); // FIXME +// } + handshakeMessageBuilder.setClientInformation(EndpointRequest.ClientInformation.newBuilder() + .setConnectionLabel(clientInformation.connectionLabel()) + .setApplicationName(clientInformation.applicationName()) + .setUserName(clientInformation.userName()) + .setConnectionInformation(clientInformation.connectionInformation()) + .setMaximumConcurrentResultSets(clientInformation.maximumConcurrentResultSets())); + FutureResponse future = send( + SERVICE_ID_ENDPOINT_BROKER, + toDelimitedByteArray(newRequest() + .setHandshake(handshakeMessageBuilder) + .build()) + ); + return new ForegroundFutureResponse<>(future, new HandshakeProcessor().asResponseProcessor()); + } + + public void setSessionID(long id) throws IOException { + if (sessionID == SESSION_ID_IS_NOT_ASSIGNED) { + this.sessionID = id; + return; + } + throw new IOException("handshake error (session ID is already assigned)"); + } + + public void checkSessionID(long id) throws IOException { + if (sessionID != id) { + throw new IOException(MessageFormat.format("handshake error (inconsistent session ID), {0} not equal {1}", sessionID, id)); + } + } + byte[] toDelimitedByteArray(FrameworkRequest.Header request) throws IOException { try (var buffer = new ByteArrayOutputStream()) { request.writeDelimitedTo(buffer); @@ -135,6 +227,18 @@ byte[] toDelimitedByteArray(FrameworkRequest.Header request) throws IOException } } + byte[] toDelimitedByteArray(EndpointRequest.Request request) throws IOException { + try (var buffer = new ByteArrayOutputStream()) { + request.writeDelimitedTo(buffer); + return buffer.toByteArray(); + } + } + + static CoreServiceException newUnknown(@Nonnull EndpointResponse.Error message) { + assert message != null; + return new CoreServiceException(CoreServiceCode.UNKNOWN, message.getMessage()); + } + // for diagnostic public long sessionID() { return sessionID; diff --git a/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/FutureIpcWireImpl.java b/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/FutureIpcWireImpl.java index 523dfcf2c..a2c981e32 100644 --- a/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/FutureIpcWireImpl.java +++ b/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/FutureIpcWireImpl.java @@ -5,7 +5,12 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nonnull; + +import com.tsurugidb.tsubakuro.channel.common.connection.ClientInformation; +import com.tsurugidb.tsubakuro.channel.common.connection.Credential; import com.tsurugidb.tsubakuro.channel.common.connection.wire.Wire; +import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.WireImpl; import com.tsurugidb.tsubakuro.exception.ServerException; import com.tsurugidb.tsubakuro.util.FutureResponse; @@ -14,27 +19,45 @@ */ public class FutureIpcWireImpl implements FutureResponse { + private final Credential credential; + private final ClientInformation clientInformation; private IpcConnectorImpl connector; private long id; private final AtomicBoolean gotton = new AtomicBoolean(); - FutureIpcWireImpl(IpcConnectorImpl connector, long id) { + FutureIpcWireImpl(IpcConnectorImpl connector, long id, @Nonnull Credential credential, @Nonnull ClientInformation clientInformation) { this.connector = connector; this.id = id; + this.credential = credential; + this.clientInformation = clientInformation; } @Override - public Wire get() throws IOException { + public Wire get() throws IOException, ServerException, InterruptedException { if (!gotton.getAndSet(true)) { - return connector.getSessionWire(id); + var wire = connector.getSessionWire(id); + if (wire instanceof WireImpl) { + var wireImpl = (WireImpl) wire; + var futureSessionID = wireImpl.handshake(credential, clientInformation); + wireImpl.checkSessionID(futureSessionID.get()); + return wireImpl; + } + throw new IOException("FutureIpcWireImpl programing error"); // never occure } throw new IOException("FutureIpcWireImpl is already closed"); } @Override - public Wire get(long timeout, TimeUnit unit) throws TimeoutException, IOException { + public Wire get(long timeout, TimeUnit unit) throws TimeoutException, IOException, ServerException, InterruptedException { if (!gotton.getAndSet(true)) { - return connector.getSessionWire(id, timeout, unit); + var wire = connector.getSessionWire(id, timeout, unit); + if (wire instanceof WireImpl) { + var wireImpl = (WireImpl) wire; + var futureSessionID = wireImpl.handshake(credential, clientInformation); + wireImpl.checkSessionID(futureSessionID.get(timeout, unit)); + return wireImpl; + } + throw new IOException("FutureIpcWireImpl programing error"); // never occure } throw new IOException("FutureIpcWireImpl is already closed"); } diff --git a/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/IpcConnectorImpl.java b/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/IpcConnectorImpl.java index 7619f84a6..e37bb695e 100644 --- a/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/IpcConnectorImpl.java +++ b/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/IpcConnectorImpl.java @@ -1,13 +1,13 @@ package com.tsurugidb.tsubakuro.channel.ipc.connection; import java.io.IOException; +import java.lang.ref.Cleaner; +import java.net.ConnectException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; -import java.lang.ref.Cleaner; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,9 +61,10 @@ public FutureResponse connect(@Nonnull Credential credential, @Nonnull Cli } try { long id = requestNative(handle); - return new FutureIpcWireImpl(this, id); + clientInformation.connectionInformation(Long.toString(ProcessHandle.current().pid())); + return new FutureIpcWireImpl(this, id, credential, clientInformation); } catch (IOException e) { - throw new IOException("the server has declined the connection request"); + throw new ConnectException("the server has declined the connection request"); } } diff --git a/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/ServerWireImpl.java b/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/ServerWireImpl.java index 2fc2390d9..33374a5d5 100644 --- a/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/ServerWireImpl.java +++ b/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/ServerWireImpl.java @@ -3,11 +3,16 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.OutputStream; +import java.net.ServerSocket; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; import java.io.Closeable; import java.io.IOException; import com.tsurugidb.sql.proto.SqlRequest; import com.tsurugidb.sql.proto.SqlResponse; +import com.tsurugidb.endpoint.proto.EndpointRequest; +import com.tsurugidb.endpoint.proto.EndpointResponse; import com.tsurugidb.framework.proto.FrameworkRequest; import com.tsurugidb.framework.proto.FrameworkResponse; @@ -15,10 +20,14 @@ * ServerWireImpl type. */ public class ServerWireImpl implements Closeable { - private long wireHandle = 0; // for c++ - private String dbName; + private final long wireHandle; // for c++ + private final String dbName; + private final Semaphore available = new Semaphore(0, true); private long sessionID; - private boolean takeSendAction; + private final boolean takeSendAction; + private final ReceiveWorker receiver; + private SqlRequest.Request sqlRequest; + private final AtomicBoolean closed = new AtomicBoolean(); private static native long createNative(String name); private static native byte[] getNative(long handle); @@ -44,14 +53,47 @@ private static byte[] dump(WriteAction action) throws IOException, InterruptedEx } } + private class ReceiveWorker extends Thread { + ReceiveWorker() throws IOException { + } + @Override + public void run() { + try { + handshake(); + while (true) { + try { + var byteArrayInputStream = new ByteArrayInputStream(getNative(wireHandle)); + if (byteArrayInputStream.available() == 0) { + close(); + return; + } + var header = FrameworkRequest.Header.parseDelimitedFrom(byteArrayInputStream); + sessionID = header.getSessionId(); + sqlRequest = SqlRequest.Request.parseDelimitedFrom(byteArrayInputStream); + available.release(); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + System.err.println(e); + e.printStackTrace(); + throw new IOException("error: ServerWireImpl.get()"); + } + } + } catch (IOException e) { + e.printStackTrace(); + System.err.println(e); + } + } + } + public ServerWireImpl(String dbName, long sessionID, boolean takeSendAction) throws IOException { this.dbName = dbName; this.sessionID = sessionID; this.takeSendAction = takeSendAction; - wireHandle = createNative(dbName + "-" + String.valueOf(sessionID)); + this.wireHandle = createNative(dbName + "-" + String.valueOf(sessionID)); if (wireHandle == 0) { throw new IOException("error: ServerWireImpl.ServerWireImpl()"); } + this.receiver = new ReceiveWorker(); + receiver.start(); } public ServerWireImpl(String dbName, long sessionID) throws IOException { @@ -59,9 +101,8 @@ public ServerWireImpl(String dbName, long sessionID) throws IOException { } public void close() throws IOException { - if (wireHandle != 0) { + if (!closed.getAndSet(true)) { closeNative(wireHandle); - wireHandle = 0; } } @@ -70,19 +111,48 @@ public long getSessionID() { } /** - * Get SqlRequest.Request from a client via the native wire. - @returns SqlRequest.Request + * implement handshake protocol + * step 1) receives the FrameworkRequest.Header and EndpointRequest.Request + * step 2) sends a FrameworkResponse.Header and EndpointResponse.Handshake with success */ - public SqlRequest.Request get() throws IOException { + public void handshake() throws IOException { try { var byteArrayInputStream = new ByteArrayInputStream(getNative(wireHandle)); var header = FrameworkRequest.Header.parseDelimitedFrom(byteArrayInputStream); - sessionID = header.getSessionId(); - return SqlRequest.Request.parseDelimitedFrom(byteArrayInputStream); + var request = EndpointRequest.Request.parseDelimitedFrom(byteArrayInputStream); + try { + var response = EndpointResponse.Handshake.newBuilder() + .setSuccess(EndpointResponse.Handshake.Success.newBuilder().setSessionId(1)) // who assign this sessionId? + .build(); + byte[] resposeByteArray = dump(out -> { + FrameworkResponse.Header.newBuilder().build().writeDelimitedTo(out); + response.writeDelimitedTo(out); + }); + if (wireHandle != 0) { + putNative(wireHandle, resposeByteArray); + } else { + throw new IOException("error: sessionWireHandle is 0"); + } + } catch (IOException | InterruptedException e) { + throw new IOException(e); + } } catch (com.google.protobuf.InvalidProtocolBufferException e) { System.err.println(e); e.printStackTrace(); - throw new IOException("error: ServerWireImpl.get()"); + throw new IOException("error: ServerWireImpl.getHandshakeRequest()"); + } + } + + /** + * Get SqlRequest.Request from a client via the native wire. + @returns SqlRequest.Request + */ + public SqlRequest.Request get() throws IOException { + try { + available.acquire(); + return sqlRequest; + } catch(InterruptedException e) { + throw new IOException(e); } } diff --git a/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/SessionWireTest.java b/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/SessionWireTest.java index 94dd1ccc8..e1a1ca7a2 100644 --- a/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/SessionWireTest.java +++ b/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/SessionWireTest.java @@ -12,6 +12,8 @@ import org.junit.jupiter.api.Test; +import com.tsurugidb.tsubakuro.channel.common.connection.ClientInformation; +import com.tsurugidb.tsubakuro.channel.common.connection.NullCredential; import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.WireImpl; import com.tsurugidb.tsubakuro.channel.ipc.IpcLink; import com.tsurugidb.tsubakuro.exception.ServerException; @@ -31,6 +33,7 @@ void requestBegin() throws Exception { try { server = new ServerWireImpl(dbName, sessionID); client = new WireImpl(new IpcLink(dbName + "-" + String.valueOf(sessionID)), sessionID); + client.handshake(NullCredential.INSTANCE, new ClientInformation()); } catch (Exception e) { fail("cought Exception"); } @@ -50,7 +53,8 @@ void inconsistentResponse() { try { server = new ServerWireImpl(dbName, sessionID); client = new WireImpl(new IpcLink(dbName + "-" + String.valueOf(sessionID)), sessionID); - + client.handshake(NullCredential.INSTANCE, new ClientInformation()); + // REQUEST test begin // client side send Request var futureResponse = client.send(SERVICE_ID_SQL, DelimitedConverter.toByteArray(ProtosForTest.BeginRequestChecker.builder().build())); @@ -78,6 +82,7 @@ void inconsistentResponse() { void timeout() throws Exception { server = new ServerWireImpl(dbName, sessionID); client = new WireImpl(new IpcLink(dbName + "-" + String.valueOf(sessionID)), sessionID); + client.handshake(NullCredential.INSTANCE, new ClientInformation()); // REQUEST test begin // client side send Request @@ -107,6 +112,7 @@ void timeout() throws Exception { void serverCrashDetectionTest() throws Exception { server = new ServerWireImpl(dbName, sessionID, false); client = new WireImpl(new IpcLink(dbName + "-" + String.valueOf(sessionID)), sessionID); + client.handshake(NullCredential.INSTANCE, new ClientInformation()); // REQUEST test begin // client side send Request diff --git a/modules/ipc/src/test/native/include/server_wires.h b/modules/ipc/src/test/native/include/server_wires.h index 84bedb966..12c140102 100644 --- a/modules/ipc/src/test/native/include/server_wires.h +++ b/modules/ipc/src/test/native/include/server_wires.h @@ -58,27 +58,38 @@ class server_wire_container class wire_container { public: wire_container() = default; - wire_container(unidirectional_message_wire* wire, char* bip_buffer) : wire_(wire), bip_buffer_(bip_buffer) {}; - message_header peep(bool wait = false) { - return wire_->peep(bip_buffer_, wait); + wire_container(unidirectional_message_wire* wire, char* bip_buffer, server_wire_container* envelope) + : wire_(wire), bip_buffer_(bip_buffer), envelope_(envelope) { + } + message_header peep() { + auto rv = wire_->peep(bip_buffer_, true); + envelope_->slot(rv.get_idx()); + return rv; } std::string_view payload() { return wire_->payload(bip_buffer_); } + void dispose() { + return wire_->dispose(); + } private: unidirectional_message_wire* wire_{}; char* bip_buffer_{}; + server_wire_container* envelope_; }; class response_wire_container { public: response_wire_container() = default; - response_wire_container(unidirectional_response_wire* wire, char* bip_buffer) : wire_(wire), bip_buffer_(bip_buffer) {}; + response_wire_container(unidirectional_response_wire* wire, char* bip_buffer, server_wire_container* envelope) + : wire_(wire), bip_buffer_(bip_buffer), envelope_(envelope) {}; void write(const char* from, response_header header) { - wire_->write(bip_buffer_, from, header); + response_header rh(envelope_->slot(), header.get_length(), header.get_type()); + wire_->write(bip_buffer_, from, rh); } private: unidirectional_response_wire* wire_{}; char* bip_buffer_{}; + server_wire_container* envelope_; }; using resultset_wire = shm_resultset_wire; @@ -93,8 +104,8 @@ class server_wire_container auto res_wire = managed_shared_memory_->construct(response_wire_name)(managed_shared_memory_.get(), response_buffer_size); status_provider_ = managed_shared_memory_->construct(status_provider_name)(managed_shared_memory_.get(), "dummy_as_it_is_test"); - request_wire_ = wire_container(req_wire, req_wire->get_bip_address(managed_shared_memory_.get())); - response_wire_ = response_wire_container(res_wire, res_wire->get_bip_address(managed_shared_memory_.get())); + request_wire_ = wire_container(req_wire, req_wire->get_bip_address(managed_shared_memory_.get()), this); + response_wire_ = response_wire_container(res_wire, res_wire->get_bip_address(managed_shared_memory_.get()), this); } catch(const boost::interprocess::interprocess_exception& ex) { std::abort(); // FIXME @@ -125,7 +136,9 @@ class server_wire_container } return resultset_wires_.get(); } - + + void slot(response_header::index_type slot) { slot_ = slot; } + response_header::index_type slot() { return slot_; } private: std::string name_; std::unique_ptr managed_shared_memory_{}; @@ -133,6 +146,7 @@ class server_wire_container response_wire_container response_wire_; status_provider* status_provider_{}; std::unique_ptr resultset_wires_{}; + response_header::index_type slot_{}; }; class connection_container diff --git a/modules/ipc/src/test/native/src/server_wireJNI.cpp b/modules/ipc/src/test/native/src/server_wireJNI.cpp index fa2cc9eb4..02adb9e77 100644 --- a/modules/ipc/src/test/native/src/server_wireJNI.cpp +++ b/modules/ipc/src/test/native/src/server_wireJNI.cpp @@ -9,8 +9,8 @@ using namespace tateyama::common::wire; JNIEXPORT jlong JNICALL Java_com_tsurugidb_tsubakuro_channel_ipc_sql_ServerWireImpl_createNative (JNIEnv *env, [[maybe_unused]] jclass thisObj, jstring name) { - const char* name_ = env->GetStringUTFChars(name, NULL); - if (name_ == NULL) return 0; + const char* name_ = env->GetStringUTFChars(name, nullptr); + if (name_ == nullptr) return 0; jsize len_ = env->GetStringUTFLength(name); server_wire_container* container = new server_wire_container(std::string_view(name_, len_)); @@ -25,21 +25,23 @@ JNIEXPORT jbyteArray JNICALL Java_com_tsurugidb_tsubakuro_channel_ipc_sql_Server auto& wire = container->get_request_wire(); message_header h = wire.peep(); - if (h.get_idx() != 0) { - std::abort(); // out of the scope of this test program - } std::size_t length = h.get_length(); jbyteArray dstj = env->NewByteArray(length); - if (dstj == NULL) { - return NULL; + if (dstj == nullptr) { + wire.dispose(); + return nullptr; } - jbyte* dst = env->GetByteArrayElements(dstj, NULL); - if (dst == NULL) { - return NULL; + jbyte* dst = env->GetByteArrayElements(dstj, nullptr); + if (dst == nullptr) { + wire.dispose(); + return nullptr; } - memcpy(dst, wire.payload().data(), length); + if (length > 0) { + memcpy(dst, wire.payload().data(), length); + } env->ReleaseByteArrayElements(dstj, dst, 0); + wire.dispose(); return dstj; } @@ -57,7 +59,7 @@ JNIEXPORT void JNICALL Java_com_tsurugidb_tsubakuro_channel_ipc_sql_ServerWireIm std::abort(); // This is OK, because server_wire is used for test purpose only } - // use slot 0 only in test + // 0 will be rewrited response_header header(0, capacity, RESPONSE_PAYLOAD); container->write(src, header); env->ReleaseByteArrayElements(srcj, src, 0); @@ -81,8 +83,8 @@ JNIEXPORT jlong JNICALL Java_com_tsurugidb_tsubakuro_channel_ipc_sql_ServerWireI { server_wire_container* container = reinterpret_cast(static_cast(handle)); - const char* name_ = env->GetStringUTFChars(name, NULL); - if (name_ == NULL) return 0; + const char* name_ = env->GetStringUTFChars(name, nullptr); + if (name_ == nullptr) return 0; jsize len_ = env->GetStringUTFLength(name); server_wire_container::resultset_wires_container* rs_container = container->create_resultset_wires(std::string_view(name_, len_)); diff --git a/modules/proto/src/main/protos/tateyama/proto/endpoint/request.proto b/modules/proto/src/main/protos/tateyama/proto/endpoint/request.proto index c99506ec8..4ccfb201c 100644 --- a/modules/proto/src/main/protos/tateyama/proto/endpoint/request.proto +++ b/modules/proto/src/main/protos/tateyama/proto/endpoint/request.proto @@ -6,6 +6,8 @@ option java_multiple_files = false; option java_package = "com.tsurugidb.endpoint.proto"; option java_outer_classname = "EndpointRequest"; +import "tateyama/proto/auth/request.proto"; + // the request message to endpoint pseudo service. message Request { // service message version (major) @@ -27,11 +29,11 @@ message Request { // handshake operation. message Handshake { - // the client information. - ClientInformation client_infomation = 1; - // the auth info. - AuthInfo auth_info = 2; + auth.request.AuthInfo auth_info = 1; + + // the client information. + ClientInformation client_information = 2; } // client information @@ -42,9 +44,15 @@ message ClientInformation { // the application name. string application_name = 2; + // the user name. + string user_name = 4; + // the connection information - string connection_information = 3; -} + string connection_information = 5; -// auto info. -message AuthInfo {} + // reserved for future use + reserved 6 to 10; + + // the maximum concurrent result sets + uint64 maximum_concurrent_result_sets = 11; +} diff --git a/modules/proto/src/main/protos/tateyama/proto/endpoint/response.proto b/modules/proto/src/main/protos/tateyama/proto/endpoint/response.proto index 079438cbd..00992b37b 100644 --- a/modules/proto/src/main/protos/tateyama/proto/endpoint/response.proto +++ b/modules/proto/src/main/protos/tateyama/proto/endpoint/response.proto @@ -6,25 +6,36 @@ option java_multiple_files = false; option java_package = "com.tsurugidb.endpoint.proto"; option java_outer_classname = "EndpointResponse"; -// empty message -message Void {} +import "tateyama/proto/diagnostics.proto"; // unknown error was occurred. -message UnknownError { +message Error { // the error message. string message = 1; + + // error code + diagnostics.Code code = 2; + + // supplemental text for debug purpose + string supplemental_text = 4; } // handshake operation. message Handshake { reserved 1 to 10; - + // the response body. oneof result { // request is successfully completed. - Void success = 11; + Success success = 11; // unknown error was occurred. - UnknownError unknown_error = 13; + Error error = 12; + } + + // request is successfully completed. + message Success { + // the session id. + uint64 session_id = 11; } } diff --git a/modules/session/src/main/java/com/tsurugidb/tsubakuro/common/SessionBuilder.java b/modules/session/src/main/java/com/tsurugidb/tsubakuro/common/SessionBuilder.java index 6e73c9ec3..b2323c7f1 100644 --- a/modules/session/src/main/java/com/tsurugidb/tsubakuro/common/SessionBuilder.java +++ b/modules/session/src/main/java/com/tsurugidb/tsubakuro/common/SessionBuilder.java @@ -91,7 +91,7 @@ public SessionBuilder withCredential(@Nonnull Credential credential) { */ public SessionBuilder withLabel(@Nonnull String label) { Objects.requireNonNull(label); - this.clientInformation.label(label); + this.clientInformation.connectionlabel(label); return this; } @@ -106,6 +106,17 @@ public SessionBuilder withApplicationName(@Nonnull String applicationName) { return this; } + /** + * Sets userName information to connect. + * @param userName the userName information + * @return this + */ + public SessionBuilder withUserName(@Nonnull String userName) { + Objects.requireNonNull(userName); + this.clientInformation.userName(userName); + return this; + } + /** * Establishes a connection to the Tsurugi server. * This operation will block until the connection was established, diff --git a/modules/session/src/test/java/com/tsurugidb/tsubakuro/common/SessionBuilderTest.java b/modules/session/src/test/java/com/tsurugidb/tsubakuro/common/SessionBuilderTest.java index bdfe5c5c6..88bd9257f 100644 --- a/modules/session/src/test/java/com/tsurugidb/tsubakuro/common/SessionBuilderTest.java +++ b/modules/session/src/test/java/com/tsurugidb/tsubakuro/common/SessionBuilderTest.java @@ -56,4 +56,50 @@ public FutureResponse connect(Credential credential, ClientInformation cli } } + @Test + void createWithClientInformation() throws Exception { + try (var wire = new MockWire()) { + String label = "label for the test"; + String applicationName = "applicationName for the test"; + String userName = "userName for the test"; + var builder = SessionBuilder.connect(new Connector() { + @Override + public FutureResponse connect(Credential credential, ClientInformation clientInformation) throws IOException { + assertSame(clientInformation.connectionLabel(), label); + assertSame(clientInformation.applicationName(), applicationName); + assertSame(clientInformation.userName(), userName); + return FutureResponse.wrap(Owner.of(wire)); + } + }) + .withLabel(label).withApplicationName(applicationName).withUserName(userName); + try (var session = builder.create()) { + // ok. + } + } + } + + @Test + void createAsyncWithClientInformation() throws Exception { + try (var wire = new MockWire()) { + String label = "label for the test"; + String applicationName = "applicationName for the test"; + String userName = "userName for the test"; + var builder = SessionBuilder.connect(new Connector() { + @Override + public FutureResponse connect(Credential credential, ClientInformation clientInformation) throws IOException { + assertSame(clientInformation.connectionLabel(), label); + assertSame(clientInformation.applicationName(), applicationName); + assertSame(clientInformation.userName(), userName); + return FutureResponse.wrap(Owner.of(wire)); + } + }) + .withLabel(label).withApplicationName(applicationName).withUserName(userName); + try ( + var fSession = builder.createAsync(); + var session = fSession.get(10, TimeUnit.SECONDS)) { + // ok. + } + } + } + } diff --git a/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/StreamLink.java b/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/StreamLink.java index 5c36f5728..bc5d6e06a 100644 --- a/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/StreamLink.java +++ b/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/StreamLink.java @@ -10,9 +10,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,7 +18,6 @@ import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.ChannelResponse; import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.Link; import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.LinkMessage; -import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.ResponseBox; import com.tsurugidb.tsubakuro.channel.stream.sql.ResultSetBox; import com.tsurugidb.tsubakuro.channel.stream.sql.ResultSetWireImpl; import com.tsurugidb.tsubakuro.exception.ResponseTimeoutException; @@ -32,26 +28,25 @@ public final class StreamLink extends Link { private BufferedOutputStream outStream; private DataInputStream inStream; private ResultSetBox resultSetBox = new ResultSetBox(); - private final Lock lock = new ReentrantLock(); - private final AtomicReference helloResponse = new AtomicReference<>(); private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean socketError = new AtomicBoolean(); private final AtomicBoolean socketClosed = new AtomicBoolean(); - private static final byte REQUEST_SESSION_HELLO = 1; + // 1 is nolonger used private static final byte REQUEST_SESSION_PAYLOAD = 2; private static final byte REQUEST_RESULT_SET_BYE_OK = 3; private static final byte REQUEST_SESSION_BYE = 4; public static final byte RESPONSE_SESSION_PAYLOAD = 1; public static final byte RESPONSE_RESULT_SET_PAYLOAD = 2; - public static final byte RESPONSE_SESSION_HELLO_OK = 3; - public static final byte RESPONSE_SESSION_HELLO_NG = 4; + // 3, 4 are nolonger used public static final byte RESPONSE_RESULT_SET_HELLO = 5; public static final byte RESPONSE_RESULT_SET_BYE = 6; public static final byte RESPONSE_SESSION_BODYHEAD = 7; public static final byte RESPONSE_SESSION_BYE_OK = 8; + private static final int TERMINATION_REQUEST = 0xffff; + static final Logger LOG = LoggerFactory.getLogger(StreamLink.class); public StreamLink(String hostname, int port) throws IOException { @@ -59,24 +54,6 @@ public StreamLink(String hostname, int port) throws IOException { this.socket.setTcpNoDelay(true); this.outStream = new BufferedOutputStream(socket.getOutputStream()); this.inStream = new DataInputStream(socket.getInputStream()); - this.helloResponse.set(null); - send(REQUEST_SESSION_HELLO, ResponseBox.responseBoxSize()); - } - - public LinkMessage helloResponse(long timeout, TimeUnit unit) throws IOException, TimeoutException { - lock.lock(); - try { - while (helloResponse.get() == null) { - try { - doPull(timeout, unit, true); - } catch (IOException e) { - throw new IOException("Server crashed"); - } - } - return helloResponse.get(); - } finally { - lock.unlock(); - } } @Override @@ -157,17 +134,6 @@ private boolean doPull(long timeout, TimeUnit unit, boolean throwException) thro resultSetBox.pushBye(slot); return true; - case RESPONSE_SESSION_HELLO_OK: - case RESPONSE_SESSION_HELLO_NG: - LOG.trace("receive SESSION_HELLO_{}", ((info == RESPONSE_SESSION_HELLO_OK) ? "OK" : "NG")); - lock.lock(); - try { - helloResponse.set(message); - } finally { - lock.unlock(); - } - return true; - case RESPONSE_SESSION_BYE_OK: LOG.trace("receive RESPONSE_SESSION_BYE_OK"); closeBoxes(true); @@ -195,7 +161,7 @@ public ResultSetBox getResultSetBox() { return resultSetBox; } - private void send(byte i, int s) throws IOException { // SESSION_HELLO, RESULT_SET_BYE_OK + private void send(byte i, int s) throws IOException { // REQUEST_SESSION_BYE, RESULT_SET_BYE_OK byte[] header = new byte[7]; header[0] = i; // info @@ -213,7 +179,7 @@ private void send(byte i, int s) throws IOException { // SESSION_HELLO, RESULT_ outStream.write(header, 0, header.length); outStream.flush(); } - LOG.trace("send {}, slot = {}", ((i == REQUEST_SESSION_HELLO) ? "SESSION_HELLO" : "RESULT_SET_BYE_OK"), s); //$NON-NLS-1$ + LOG.trace("send RESULT_SET_BYE_OK, slot = {}", s); //$NON-NLS-1$ } @Override @@ -313,7 +279,7 @@ public boolean isAlive() { public void close() throws IOException, ServerException { if (!closed.getAndSet(true) && !socketError.get()) { try (var c1 = socket; var c2 = inStream; var c3 = outStream) { - send(REQUEST_SESSION_BYE, 0); + send(REQUEST_SESSION_BYE, TERMINATION_REQUEST); while (!socketClosed.get()) { doPull(timeout, timeUnit); } diff --git a/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/FutureStreamWireImpl.java b/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/FutureStreamWireImpl.java index 0a3df439d..de61671f9 100644 --- a/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/FutureStreamWireImpl.java +++ b/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/FutureStreamWireImpl.java @@ -5,9 +5,13 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nonnull; + +import com.tsurugidb.tsubakuro.channel.common.connection.ClientInformation; +import com.tsurugidb.tsubakuro.channel.common.connection.Credential; import com.tsurugidb.tsubakuro.channel.common.connection.wire.Wire; -import com.tsurugidb.tsubakuro.channel.stream.StreamLink; import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.WireImpl; +import com.tsurugidb.tsubakuro.channel.stream.StreamLink; import com.tsurugidb.tsubakuro.exception.ServerException; import com.tsurugidb.tsubakuro.util.FutureResponse; @@ -16,32 +20,37 @@ */ public class FutureStreamWireImpl implements FutureResponse { + private final Credential credential; + private final ClientInformation clientInformation; StreamLink streamLink; private final AtomicBoolean gotton = new AtomicBoolean(); - FutureStreamWireImpl(StreamLink streamLink) { + FutureStreamWireImpl(StreamLink streamLink, @Nonnull Credential credential, @Nonnull ClientInformation clientInformation) { this.streamLink = streamLink; + this.credential = credential; + this.clientInformation = clientInformation; } @Override - public Wire get() throws IOException { - return get(0, null); // No timeout + public Wire get() throws IOException, ServerException, InterruptedException { + if (!gotton.getAndSet(true)) { + var wireImpl = new WireImpl(streamLink); + var futureSessionID = wireImpl.handshake(credential, clientInformation); + wireImpl.setSessionID(futureSessionID.get()); + return wireImpl; + } + throw new IOException("FutureStreamWireImpl already closed."); } @Override - public Wire get(long timeout, TimeUnit unit) throws IOException { + public Wire get(long timeout, TimeUnit unit) throws IOException, ServerException, InterruptedException, TimeoutException { if (!gotton.getAndSet(true)) { - try { - var message = streamLink.helloResponse(timeout, unit); - if (message.getInfo() == StreamLink.RESPONSE_SESSION_HELLO_OK) { - return new WireImpl(streamLink, Long.parseLong(message.getString())); - } - throw new IOException("the server has declined the connection request"); - } catch (TimeoutException e) { - throw new IOException(e); - } + var wireImpl = new WireImpl(streamLink); + var futureSessionID = wireImpl.handshake(credential, clientInformation); + wireImpl.setSessionID(futureSessionID.get(timeout, unit)); + return wireImpl; } - throw new IOException("programming error: FutureStreamWire is already closed"); + throw new IOException("FutureStreamWireImpl already closed."); } @Override diff --git a/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/StreamConnectorImpl.java b/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/StreamConnectorImpl.java index b0e98d6cc..19b8c57ff 100644 --- a/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/StreamConnectorImpl.java +++ b/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/StreamConnectorImpl.java @@ -11,6 +11,7 @@ import com.tsurugidb.tsubakuro.channel.common.connection.Connector; import com.tsurugidb.tsubakuro.channel.common.connection.Credential; import com.tsurugidb.tsubakuro.channel.common.connection.wire.Wire; +import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.ResponseBox; import com.tsurugidb.tsubakuro.channel.stream.StreamLink; import com.tsurugidb.tsubakuro.util.FutureResponse; @@ -33,6 +34,7 @@ public StreamConnectorImpl(String hostname, int port) { @Override public FutureResponse connect(@Nonnull Credential credential, @Nonnull ClientInformation clientInformation) throws IOException { LOG.trace("will connect to {}:{}", hostname, port); //$NON-NLS-1$ - return new FutureStreamWireImpl(new StreamLink(hostname, port)); + clientInformation.maximumConcurrentResultSets(ResponseBox.responseBoxSize()); + return new FutureStreamWireImpl(new StreamLink(hostname, port), credential, clientInformation); } } diff --git a/modules/stream/src/test/java/com/tsurugidb/tsubakuro/channel/stream/ServerStreamLink.java b/modules/stream/src/test/java/com/tsurugidb/tsubakuro/channel/stream/ServerStreamLink.java index b5ed6a928..9c99d76ec 100644 --- a/modules/stream/src/test/java/com/tsurugidb/tsubakuro/channel/stream/ServerStreamLink.java +++ b/modules/stream/src/test/java/com/tsurugidb/tsubakuro/channel/stream/ServerStreamLink.java @@ -24,22 +24,6 @@ public ServerStreamLink(Socket socket) throws IOException { this.sendOk = false; } - public void sendResponseHelo() throws IOException { - byte[] header = new byte[7]; - - header[0] = StreamLink.RESPONSE_SESSION_HELLO_OK; // info - header[1] = 0; - header[2] = 0; - header[3] = 0; - header[4] = 0; - header[5] = 0; - header[6] = 0; - - synchronized (this) { - outStream.write(header, 0, header.length); - } - } - public void sendResponse(int s, byte[] payload) throws IOException { byte[] header = new byte[7]; int length = payload.length; diff --git a/modules/stream/src/test/java/com/tsurugidb/tsubakuro/channel/stream/sql/ServerWireImpl.java b/modules/stream/src/test/java/com/tsurugidb/tsubakuro/channel/stream/sql/ServerWireImpl.java index 319a76232..12c5496b3 100644 --- a/modules/stream/src/test/java/com/tsurugidb/tsubakuro/channel/stream/sql/ServerWireImpl.java +++ b/modules/stream/src/test/java/com/tsurugidb/tsubakuro/channel/stream/sql/ServerWireImpl.java @@ -32,6 +32,7 @@ public class ServerWireImpl implements Closeable { private final ArrayDeque sendQueue; private SendWorker sender; private final long sessionID; + private int slot; private static class Message { byte[] bytes; @@ -59,10 +60,8 @@ public void run() { LOG.info("accept client: {}", port); while (serverStreamLink.receive()) { LOG.debug("received: ", serverStreamLink.getInfo()); + slot = serverStreamLink.getSlot(); switch (serverStreamLink.getInfo()) { - case 1: // StreamLink.REQUEST_SESSION_HELLO - serverStreamLink.sendResponseHelo(); - break; case 2: // StreamLink.REQUEST_SESSION_PAYLOAD receiveQueue.add(new Message(serverStreamLink.getBytes())); break; @@ -92,12 +91,12 @@ private class SendWorker extends Thread { @Override public void run() { try { - serverStreamLink.sendRecordHello(0, name); + serverStreamLink.sendRecordHello(slot, name); while (true) { if (serverStreamLink.isSnedOk()) { while (!sendQueue.isEmpty()) { var entry = sendQueue.poll().getBytes(); - serverStreamLink.sendRecord(0, 0, entry); + serverStreamLink.sendRecord(slot, 0, entry); if (entry.length == 0) { return; } @@ -128,6 +127,7 @@ public ServerWireImpl(int port, long sessionID) throws IOException { this.receiveQueue = new ArrayDeque(); this.sendQueue = new ArrayDeque(); this.receiver = new ReceiveWorker(port); + this.slot = 0; receiver.start(); } @@ -175,7 +175,7 @@ public void put(SqlResponse.Response response) throws IOException { header.writeDelimitedTo(buffer); response.writeDelimitedTo(buffer); var bytes = buffer.toByteArray(); - serverStreamLink.sendResponse(0, bytes); + serverStreamLink.sendResponse(slot, bytes); } } From 94f886ff3785ccd4b9c49de669bd36b26e17af9f Mon Sep 17 00:00:00 2001 From: t-horikawa Date: Mon, 15 Jan 2024 17:15:35 +0900 Subject: [PATCH 3/5] apply comments --- .../common/connection/ClientInformation.java | 122 +++++------------- .../common/connection/wire/impl/WireImpl.java | 29 +++-- .../ipc/connection/FutureIpcWireImpl.java | 11 +- .../ipc/connection/IpcConnectorImpl.java | 1 - .../channel/ipc/sql/ResultSetWireTest.java | 9 -- .../channel/ipc/sql/ServerWireImpl.java | 40 +++--- .../channel/ipc/sql/SessionWireTest.java | 8 +- .../tateyama/proto/endpoint/request.proto | 43 ++++-- .../tsubakuro/common/SessionBuilder.java | 38 +++--- .../tsubakuro/common/SessionBuilderTest.java | 12 +- .../connection/FutureStreamWireImpl.java | 12 +- .../connection/StreamConnectorImpl.java | 2 - 12 files changed, 153 insertions(+), 174 deletions(-) diff --git a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java index 0be751510..b088c974e 100644 --- a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java +++ b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java @@ -2,123 +2,71 @@ import java.text.MessageFormat; +import javax.annotation.Nullable; + /** * A credential by user name and password. */ -public class ClientInformation { - - private String connectionLabel; - - private String applicationName; +public final class ClientInformation { - private String userName; + private final String connectionLabel; - private String connectionInformation; // used by IPC only + private final String applicationName; - private long maxResultSets; // used by Stream + private final String userName; /** - * Creates a new instance. + * Creates a new instance without information. */ public ClientInformation() { + this.connectionLabel = null; + this.applicationName = null; + this.userName = null; } /** - * Get the connection label. - * @return the connection label. - */ - public String connectionLabel() { - if (connectionLabel != null) { - return connectionLabel; - } - return ""; - } - - /** - * Get the application name. - * @return the application name. - */ - public String applicationName() { - if (applicationName != null) { - return applicationName; - } - return ""; - } - - /** - * Get the user name. - * @return the user name. - */ - public String userName() { - if (userName != null) { - return userName; - } - return ""; - } - - /** - * Get the connection information. - * @return the connection information. - */ - public String connectionInformation() { - if (connectionInformation != null) { - return connectionInformation; - } - return ""; - } - - /** - * Get the maximumConcurrentResultSets. - * @return the maximumConcurrentResultSets. - */ - public long maximumConcurrentResultSets() { - return maxResultSets; - } - - /** - * Set label. - * @param labelString the label - */ - public void connectionlabel(String labelString) { - this.connectionLabel = labelString; - } - - /** - * Set application name. - * @param applicationNameString the application name + * Creates a new instance. + * @param connectionLabel the label + * @param applicationName the application name + * @param userName the application name */ - public void applicationName(String applicationNameString) { - this.applicationName = applicationNameString; + public ClientInformation(@Nullable String connectionLabel, @Nullable String applicationName, @Nullable String userName) { + this.connectionLabel = connectionLabel; + this.applicationName = applicationName; + this.userName = userName; } /** - * Set user name. - * @param userNameString the application name + * Get the connection label. + * @return the connection label, null if connection label has not been set. */ - public void userName(String userNameString) { - this.userName = userNameString; + public String getConnectionLabel() { + return connectionLabel; } /** - * Set connectionInformation name. - * @param connectionInformationString the connectionInformation + * Get the application name. + * @return the application name, null if application name has not been set. */ - public void connectionInformation(String connectionInformationString) { - this.connectionInformation = connectionInformationString; + public String getApplicationName() { + return applicationName; } /** - * Set maximumConcurrentResultSets. - * @param maximumConcurrentResultSets the number of maximumConcurrentResultSets + * Get the user name. + * @return the user name, null if user name has not been set. */ - public void maximumConcurrentResultSets(long maximumConcurrentResultSets) { - this.maxResultSets = maximumConcurrentResultSets; + public String getUserName() { + return userName; } @Override public String toString() { return MessageFormat.format( - "ClientInformation(connectionLabel={0}, applicationName={1}, userName={2}, connectionInformation={3}, maximumConcurrentResultSets={4})", - connectionLabel, applicationName, userName, connectionInformation, maxResultSets); + "ClientInformation(connectionLabel={0}, applicationName={1}, userName={2})", + checkNull(connectionLabel), checkNull(applicationName), checkNull(userName)); + } + private String checkNull(String string) { + return (string != null) ? string : ""; } } diff --git a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java index 516543ddc..1f442074e 100644 --- a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java +++ b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java @@ -8,6 +8,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -186,17 +187,25 @@ public Long process(ByteBuffer payload) throws IOException, ServerException, Int } } - public FutureResponse handshake(@Nonnull Credential credential, @Nonnull ClientInformation clientInformation) throws IOException { + public FutureResponse handshake(@Nonnull Credential credential, @Nonnull ClientInformation clientInformation, @Nullable EndpointRequest.WireInformation wireInformation) throws IOException { var handshakeMessageBuilder = EndpointRequest.Handshake.newBuilder(); -// if (credential != NullCredential.INSTANCE) { -// handshakeMessageBuilder.setAuthInfo(credential); // FIXME -// } - handshakeMessageBuilder.setClientInformation(EndpointRequest.ClientInformation.newBuilder() - .setConnectionLabel(clientInformation.connectionLabel()) - .setApplicationName(clientInformation.applicationName()) - .setUserName(clientInformation.userName()) - .setConnectionInformation(clientInformation.connectionInformation()) - .setMaximumConcurrentResultSets(clientInformation.maximumConcurrentResultSets())); + if (wireInformation != null) { + handshakeMessageBuilder.setWireInformation(wireInformation); + } + + var clientInformationBuilder = EndpointRequest.ClientInformation.newBuilder(); + if (clientInformation.getConnectionLabel() != null) { + clientInformationBuilder.setConnectionLabel(clientInformation.getConnectionLabel()); + } + if (clientInformation.getApplicationName() != null) { + clientInformationBuilder.setApplicationName(clientInformation.getApplicationName()); + } + if (clientInformation.getUserName() != null) { + clientInformationBuilder.setCredential( + EndpointRequest.Credential.newBuilder().setUserName(clientInformation.getUserName())); + } + handshakeMessageBuilder.setClientInformation(clientInformationBuilder); + FutureResponse future = send( SERVICE_ID_ENDPOINT_BROKER, toDelimitedByteArray(newRequest() diff --git a/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/FutureIpcWireImpl.java b/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/FutureIpcWireImpl.java index a2c981e32..b4b9b1387 100644 --- a/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/FutureIpcWireImpl.java +++ b/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/FutureIpcWireImpl.java @@ -7,6 +7,7 @@ import javax.annotation.Nonnull; +import com.tsurugidb.endpoint.proto.EndpointRequest; import com.tsurugidb.tsubakuro.channel.common.connection.ClientInformation; import com.tsurugidb.tsubakuro.channel.common.connection.Credential; import com.tsurugidb.tsubakuro.channel.common.connection.wire.Wire; @@ -32,13 +33,19 @@ public class FutureIpcWireImpl implements FutureResponse { this.clientInformation = clientInformation; } + private EndpointRequest.WireInformation wireInformation() { + return EndpointRequest.WireInformation.newBuilder().setIpcInformation( + EndpointRequest.WireInformation.IpcInformation.newBuilder().setConnectionInformation(Long.toString(ProcessHandle.current().pid())) + ).build(); + } + @Override public Wire get() throws IOException, ServerException, InterruptedException { if (!gotton.getAndSet(true)) { var wire = connector.getSessionWire(id); if (wire instanceof WireImpl) { var wireImpl = (WireImpl) wire; - var futureSessionID = wireImpl.handshake(credential, clientInformation); + var futureSessionID = wireImpl.handshake(credential, clientInformation, wireInformation()); wireImpl.checkSessionID(futureSessionID.get()); return wireImpl; } @@ -53,7 +60,7 @@ public Wire get(long timeout, TimeUnit unit) throws TimeoutException, IOExceptio var wire = connector.getSessionWire(id, timeout, unit); if (wire instanceof WireImpl) { var wireImpl = (WireImpl) wire; - var futureSessionID = wireImpl.handshake(credential, clientInformation); + var futureSessionID = wireImpl.handshake(credential, clientInformation, wireInformation()); wireImpl.checkSessionID(futureSessionID.get(timeout, unit)); return wireImpl; } diff --git a/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/IpcConnectorImpl.java b/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/IpcConnectorImpl.java index e37bb695e..df7cdc6b1 100644 --- a/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/IpcConnectorImpl.java +++ b/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/IpcConnectorImpl.java @@ -61,7 +61,6 @@ public FutureResponse connect(@Nonnull Credential credential, @Nonnull Cli } try { long id = requestNative(handle); - clientInformation.connectionInformation(Long.toString(ProcessHandle.current().pid())); return new FutureIpcWireImpl(this, id, credential, clientInformation); } catch (IOException e) { throw new ConnectException("the server has declined the connection request"); diff --git a/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/ResultSetWireTest.java b/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/ResultSetWireTest.java index 870568cfc..8d0232973 100644 --- a/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/ResultSetWireTest.java +++ b/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/ResultSetWireTest.java @@ -2,24 +2,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.WireImpl; import com.tsurugidb.tsubakuro.channel.ipc.IpcLink; -import com.tsurugidb.tsubakuro.exception.ServerException; -import com.tsurugidb.tsubakuro.protos.ProtosForTest; -import com.tsurugidb.tsubakuro.util.ByteBufferInputStream; -import com.tsurugidb.sql.proto.SqlResponse; class ResultSetWireTest { private final String NAME = "resultset"; diff --git a/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/ServerWireImpl.java b/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/ServerWireImpl.java index 33374a5d5..87245e912 100644 --- a/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/ServerWireImpl.java +++ b/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/ServerWireImpl.java @@ -1,9 +1,10 @@ package com.tsurugidb.tsubakuro.channel.ipc.sql; +import static org.junit.jupiter.api.Assertions.fail; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.OutputStream; -import java.net.ServerSocket; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.io.Closeable; @@ -74,12 +75,13 @@ public void run() { } catch (com.google.protobuf.InvalidProtocolBufferException e) { System.err.println(e); e.printStackTrace(); - throw new IOException("error: ServerWireImpl.get()"); + fail("error: ServerWireImpl.get()"); } } } catch (IOException e) { e.printStackTrace(); System.err.println(e); + fail(e); } } } @@ -90,7 +92,7 @@ public ServerWireImpl(String dbName, long sessionID, boolean takeSendAction) thr this.takeSendAction = takeSendAction; this.wireHandle = createNative(dbName + "-" + String.valueOf(sessionID)); if (wireHandle == 0) { - throw new IOException("error: ServerWireImpl.ServerWireImpl()"); + fail("error: ServerWireImpl.ServerWireImpl()"); } this.receiver = new ReceiveWorker(); receiver.start(); @@ -128,18 +130,14 @@ public void handshake() throws IOException { FrameworkResponse.Header.newBuilder().build().writeDelimitedTo(out); response.writeDelimitedTo(out); }); - if (wireHandle != 0) { - putNative(wireHandle, resposeByteArray); - } else { - throw new IOException("error: sessionWireHandle is 0"); - } + putNative(wireHandle, resposeByteArray); } catch (IOException | InterruptedException e) { throw new IOException(e); } } catch (com.google.protobuf.InvalidProtocolBufferException e) { System.err.println(e); e.printStackTrace(); - throw new IOException("error: ServerWireImpl.getHandshakeRequest()"); + fail("error: ServerWireImpl.getHandshakeRequest()"); } } @@ -150,10 +148,10 @@ public void handshake() throws IOException { public SqlRequest.Request get() throws IOException { try { available.acquire(); - return sqlRequest; } catch(InterruptedException e) { - throw new IOException(e); + fail(e); } + return sqlRequest; } /** @@ -169,22 +167,18 @@ public void put(SqlResponse.Response response) throws IOException { FrameworkResponse.Header.newBuilder().build().writeDelimitedTo(out); response.writeDelimitedTo(out); }); - if (wireHandle != 0) { - putNative(wireHandle, resposeByteArray); - } else { - throw new IOException("error: sessionWireHandle is 0"); - } + putNative(wireHandle, resposeByteArray); } catch (IOException | InterruptedException e) { - throw new IOException(e); + fail(e); } } public long createRSL(String name) throws IOException { - if (wireHandle != 0) { - return createRSLNative(wireHandle, name); - } else { - throw new IOException("error: ServerWireImpl.createRSL()"); + var handle = createRSLNative(wireHandle, name); + if (handle == 0) { + fail("error: createRSLNative() returns 0"); } + return handle; } public void putRecordsRSL(long handle, byte[] ba) throws IOException { @@ -194,7 +188,7 @@ public void putRecordsRSL(long handle, byte[] ba) throws IOException { if (handle != 0) { putRecordsRSLNative(handle, ba); } else { - throw new IOException("error: resultSetWireHandle is 0"); + fail("error: resultSetWireHandle given is 0"); } } @@ -205,7 +199,7 @@ public void eorRSL(long handle) throws IOException { if (handle != 0) { eorRSLNative(handle); } else { - throw new IOException("error: resultSetWireHandle is 0"); + fail("error: resultSetWireHandle given is 0"); } } } diff --git a/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/SessionWireTest.java b/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/SessionWireTest.java index e1a1ca7a2..b4a665c80 100644 --- a/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/SessionWireTest.java +++ b/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/SessionWireTest.java @@ -33,7 +33,7 @@ void requestBegin() throws Exception { try { server = new ServerWireImpl(dbName, sessionID); client = new WireImpl(new IpcLink(dbName + "-" + String.valueOf(sessionID)), sessionID); - client.handshake(NullCredential.INSTANCE, new ClientInformation()); + client.handshake(NullCredential.INSTANCE, new ClientInformation(), null); } catch (Exception e) { fail("cought Exception"); } @@ -53,7 +53,7 @@ void inconsistentResponse() { try { server = new ServerWireImpl(dbName, sessionID); client = new WireImpl(new IpcLink(dbName + "-" + String.valueOf(sessionID)), sessionID); - client.handshake(NullCredential.INSTANCE, new ClientInformation()); + client.handshake(NullCredential.INSTANCE, new ClientInformation(), null); // REQUEST test begin // client side send Request @@ -82,7 +82,7 @@ void inconsistentResponse() { void timeout() throws Exception { server = new ServerWireImpl(dbName, sessionID); client = new WireImpl(new IpcLink(dbName + "-" + String.valueOf(sessionID)), sessionID); - client.handshake(NullCredential.INSTANCE, new ClientInformation()); + client.handshake(NullCredential.INSTANCE, new ClientInformation(), null); // REQUEST test begin // client side send Request @@ -112,7 +112,7 @@ void timeout() throws Exception { void serverCrashDetectionTest() throws Exception { server = new ServerWireImpl(dbName, sessionID, false); client = new WireImpl(new IpcLink(dbName + "-" + String.valueOf(sessionID)), sessionID); - client.handshake(NullCredential.INSTANCE, new ClientInformation()); + client.handshake(NullCredential.INSTANCE, new ClientInformation(), null); // REQUEST test begin // client side send Request diff --git a/modules/proto/src/main/protos/tateyama/proto/endpoint/request.proto b/modules/proto/src/main/protos/tateyama/proto/endpoint/request.proto index 4ccfb201c..709d318bb 100644 --- a/modules/proto/src/main/protos/tateyama/proto/endpoint/request.proto +++ b/modules/proto/src/main/protos/tateyama/proto/endpoint/request.proto @@ -6,8 +6,6 @@ option java_multiple_files = false; option java_package = "com.tsurugidb.endpoint.proto"; option java_outer_classname = "EndpointRequest"; -import "tateyama/proto/auth/request.proto"; - // the request message to endpoint pseudo service. message Request { // service message version (major) @@ -29,11 +27,17 @@ message Request { // handshake operation. message Handshake { - // the auth info. - auth.request.AuthInfo auth_info = 1; + // the credential. + Credential credential = 1; // the client information. ClientInformation client_information = 2; + + // reserved for system use + reserved 3 to 10; + + // the wire information. + WireInformation wire_information = 11; } // client information @@ -44,15 +48,32 @@ message ClientInformation { // the application name. string application_name = 2; + // the credential + Credential credential = 3; +} + +// the credential +message Credential { // the user name. - string user_name = 4; + string user_name = 1; +} - // the connection information - string connection_information = 5; +// wire information +message WireInformation { + oneof wire_information { + IpcInformation ipc_information = 1; + StreamInformation stream_information = 2; + } - // reserved for future use - reserved 6 to 10; + // ipc information + message IpcInformation { + // the connection information + string connection_information = 1; + } - // the maximum concurrent result sets - uint64 maximum_concurrent_result_sets = 11; + // stream information + message StreamInformation { + // the maximum concurrent result sets + uint64 maximum_concurrent_result_sets = 1; + } } diff --git a/modules/session/src/main/java/com/tsurugidb/tsubakuro/common/SessionBuilder.java b/modules/session/src/main/java/com/tsurugidb/tsubakuro/common/SessionBuilder.java index b2323c7f1..0c9fd1684 100644 --- a/modules/session/src/main/java/com/tsurugidb/tsubakuro/common/SessionBuilder.java +++ b/modules/session/src/main/java/com/tsurugidb/tsubakuro/common/SessionBuilder.java @@ -31,10 +31,14 @@ public final class SessionBuilder { private Credential connectionCredential = NullCredential.INSTANCE; - private ClientInformation clientInformation = new ClientInformation(); - private final SessionInfo sessionInfo; + private String connectionLabel; + + private String applicationName; + + private String userName; + private SessionBuilder(Connector connector) { assert connector != null; this.connector = connector; @@ -86,34 +90,34 @@ public SessionBuilder withCredential(@Nonnull Credential credential) { /** * Sets label information to connect. - * @param label the label information + * @param connectionLabelString the label information * @return this */ - public SessionBuilder withLabel(@Nonnull String label) { - Objects.requireNonNull(label); - this.clientInformation.connectionlabel(label); + public SessionBuilder withLabel(@Nonnull String connectionLabelString) { + Objects.requireNonNull(connectionLabelString); + connectionLabel = connectionLabelString; return this; } /** * Sets applicationName information to connect. - * @param applicationName the applicationName information + * @param applicationNameString the applicationName information * @return this */ - public SessionBuilder withApplicationName(@Nonnull String applicationName) { - Objects.requireNonNull(applicationName); - this.clientInformation.applicationName(applicationName); + public SessionBuilder withApplicationName(@Nonnull String applicationNameString) { + Objects.requireNonNull(applicationNameString); + applicationName = applicationNameString; return this; } /** * Sets userName information to connect. - * @param userName the userName information + * @param userNameString the userName information * @return this */ - public SessionBuilder withUserName(@Nonnull String userName) { - Objects.requireNonNull(userName); - this.clientInformation.userName(userName); + public SessionBuilder withUserName(@Nonnull String userNameString) { + Objects.requireNonNull(userNameString); + userName = userNameString; return this; } @@ -128,7 +132,7 @@ public SessionBuilder withUserName(@Nonnull String userName) { * @see #create(long, TimeUnit) */ public Session create() throws IOException, ServerException, InterruptedException { - try (var fWire = connector.connect(connectionCredential, clientInformation)) { + try (var fWire = connector.connect(connectionCredential, new ClientInformation(connectionLabel, applicationName, userName))) { return create0(fWire.get()); } } @@ -146,7 +150,7 @@ public Session create() throws IOException, ServerException, InterruptedExceptio public Session create(long timeout, @Nonnull TimeUnit unit) throws IOException, ServerException, InterruptedException, TimeoutException { Objects.requireNonNull(unit); - try (var fWire = connector.connect(connectionCredential, clientInformation)) { + try (var fWire = connector.connect(connectionCredential, new ClientInformation(connectionLabel, applicationName, userName))) { var session = create0(fWire.get(timeout, unit)); return session; } @@ -159,7 +163,7 @@ public Session create(long timeout, @Nonnull TimeUnit unit) * @throws IOException if I/O error was occurred during connection */ public FutureResponse createAsync() throws IOException { - var fWire = connector.connect(connectionCredential, clientInformation); + var fWire = connector.connect(connectionCredential, new ClientInformation(connectionLabel, applicationName, userName)); return new AbstractFutureResponse() { @Override diff --git a/modules/session/src/test/java/com/tsurugidb/tsubakuro/common/SessionBuilderTest.java b/modules/session/src/test/java/com/tsurugidb/tsubakuro/common/SessionBuilderTest.java index 88bd9257f..266bc7f28 100644 --- a/modules/session/src/test/java/com/tsurugidb/tsubakuro/common/SessionBuilderTest.java +++ b/modules/session/src/test/java/com/tsurugidb/tsubakuro/common/SessionBuilderTest.java @@ -65,9 +65,9 @@ void createWithClientInformation() throws Exception { var builder = SessionBuilder.connect(new Connector() { @Override public FutureResponse connect(Credential credential, ClientInformation clientInformation) throws IOException { - assertSame(clientInformation.connectionLabel(), label); - assertSame(clientInformation.applicationName(), applicationName); - assertSame(clientInformation.userName(), userName); + assertSame(clientInformation.getConnectionLabel(), label); + assertSame(clientInformation.getApplicationName(), applicationName); + assertSame(clientInformation.getUserName(), userName); return FutureResponse.wrap(Owner.of(wire)); } }) @@ -87,9 +87,9 @@ void createAsyncWithClientInformation() throws Exception { var builder = SessionBuilder.connect(new Connector() { @Override public FutureResponse connect(Credential credential, ClientInformation clientInformation) throws IOException { - assertSame(clientInformation.connectionLabel(), label); - assertSame(clientInformation.applicationName(), applicationName); - assertSame(clientInformation.userName(), userName); + assertSame(clientInformation.getConnectionLabel(), label); + assertSame(clientInformation.getApplicationName(), applicationName); + assertSame(clientInformation.getUserName(), userName); return FutureResponse.wrap(Owner.of(wire)); } }) diff --git a/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/FutureStreamWireImpl.java b/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/FutureStreamWireImpl.java index de61671f9..5aa18f56d 100644 --- a/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/FutureStreamWireImpl.java +++ b/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/FutureStreamWireImpl.java @@ -7,9 +7,11 @@ import javax.annotation.Nonnull; +import com.tsurugidb.endpoint.proto.EndpointRequest; import com.tsurugidb.tsubakuro.channel.common.connection.ClientInformation; import com.tsurugidb.tsubakuro.channel.common.connection.Credential; import com.tsurugidb.tsubakuro.channel.common.connection.wire.Wire; +import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.ResponseBox; import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.WireImpl; import com.tsurugidb.tsubakuro.channel.stream.StreamLink; import com.tsurugidb.tsubakuro.exception.ServerException; @@ -31,11 +33,17 @@ public class FutureStreamWireImpl implements FutureResponse { this.clientInformation = clientInformation; } + private EndpointRequest.WireInformation wireInformation() { + return EndpointRequest.WireInformation.newBuilder().setStreamInformation( + EndpointRequest.WireInformation.StreamInformation.newBuilder().setMaximumConcurrentResultSets(ResponseBox.responseBoxSize()) + ).build(); + } + @Override public Wire get() throws IOException, ServerException, InterruptedException { if (!gotton.getAndSet(true)) { var wireImpl = new WireImpl(streamLink); - var futureSessionID = wireImpl.handshake(credential, clientInformation); + var futureSessionID = wireImpl.handshake(credential, clientInformation, wireInformation()); wireImpl.setSessionID(futureSessionID.get()); return wireImpl; } @@ -46,7 +54,7 @@ public Wire get() throws IOException, ServerException, InterruptedException { public Wire get(long timeout, TimeUnit unit) throws IOException, ServerException, InterruptedException, TimeoutException { if (!gotton.getAndSet(true)) { var wireImpl = new WireImpl(streamLink); - var futureSessionID = wireImpl.handshake(credential, clientInformation); + var futureSessionID = wireImpl.handshake(credential, clientInformation, wireInformation()); wireImpl.setSessionID(futureSessionID.get(timeout, unit)); return wireImpl; } diff --git a/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/StreamConnectorImpl.java b/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/StreamConnectorImpl.java index 19b8c57ff..1588fb7e0 100644 --- a/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/StreamConnectorImpl.java +++ b/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/StreamConnectorImpl.java @@ -11,7 +11,6 @@ import com.tsurugidb.tsubakuro.channel.common.connection.Connector; import com.tsurugidb.tsubakuro.channel.common.connection.Credential; import com.tsurugidb.tsubakuro.channel.common.connection.wire.Wire; -import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.ResponseBox; import com.tsurugidb.tsubakuro.channel.stream.StreamLink; import com.tsurugidb.tsubakuro.util.FutureResponse; @@ -34,7 +33,6 @@ public StreamConnectorImpl(String hostname, int port) { @Override public FutureResponse connect(@Nonnull Credential credential, @Nonnull ClientInformation clientInformation) throws IOException { LOG.trace("will connect to {}:{}", hostname, port); //$NON-NLS-1$ - clientInformation.maximumConcurrentResultSets(ResponseBox.responseBoxSize()); return new FutureStreamWireImpl(new StreamLink(hostname, port), credential, clientInformation); } } From d7da4995c8fe9b8052ac61b941668dc741fe3531 Mon Sep 17 00:00:00 2001 From: t-horikawa Date: Tue, 16 Jan 2024 13:07:18 +0900 Subject: [PATCH 4/5] apply comments (2) --- .../common/connection/ClientInformation.java | 30 ++++++++++--------- .../channel/common/connection/Connector.java | 25 ++-------------- .../common/connection/wire/impl/WireImpl.java | 7 +---- .../common/connection/TestingConnector.java | 2 +- .../ipc/connection/FutureIpcWireImpl.java | 9 ++---- .../ipc/connection/IpcConnectorImpl.java | 5 ++-- .../channel/ipc/sql/ServerWireImpl.java | 4 ++- .../channel/ipc/sql/SessionWireTest.java | 8 ++--- .../tateyama/proto/endpoint/request.proto | 12 +++----- .../tsubakuro/common/SessionBuilder.java | 17 ++--------- .../tsubakuro/common/SessionBuilderTest.java | 24 +++++++-------- .../connection/FutureStreamWireImpl.java | 9 ++---- .../connection/StreamConnectorImpl.java | 5 ++-- 13 files changed, 56 insertions(+), 101 deletions(-) diff --git a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java index b088c974e..fb505860b 100644 --- a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java +++ b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java @@ -2,10 +2,11 @@ import java.text.MessageFormat; +import javax.annotation.Nonnull; import javax.annotation.Nullable; /** - * A credential by user name and password. + * A client information intened to be used in handshake. */ public final class ClientInformation { @@ -13,7 +14,7 @@ public final class ClientInformation { private final String applicationName; - private final String userName; + private final Credential credential; /** * Creates a new instance without information. @@ -21,19 +22,19 @@ public final class ClientInformation { public ClientInformation() { this.connectionLabel = null; this.applicationName = null; - this.userName = null; + this.credential = NullCredential.INSTANCE; } /** * Creates a new instance. - * @param connectionLabel the label - * @param applicationName the application name - * @param userName the application name + * @param connectionLabel the label. + * @param applicationName the application name. + * @param credential the connection credential. */ - public ClientInformation(@Nullable String connectionLabel, @Nullable String applicationName, @Nullable String userName) { + public ClientInformation(@Nullable String connectionLabel, @Nullable String applicationName, @Nonnull Credential credential) { this.connectionLabel = connectionLabel; this.applicationName = applicationName; - this.userName = userName; + this.credential = credential; } /** @@ -53,20 +54,21 @@ public String getApplicationName() { } /** - * Get the user name. - * @return the user name, null if user name has not been set. + * Get the credential. + * @return the connection credential. */ - public String getUserName() { - return userName; + public Credential getCredential() { + return credential; } @Override public String toString() { return MessageFormat.format( - "ClientInformation(connectionLabel={0}, applicationName={1}, userName={2})", - checkNull(connectionLabel), checkNull(applicationName), checkNull(userName)); + "ClientInformation(connectionLabel={0}, applicationName={1}, credential={2})", + checkNull(connectionLabel), checkNull(applicationName), credential.toString()); } private String checkNull(String string) { return (string != null) ? string : ""; } + } diff --git a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/Connector.java b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/Connector.java index 32730b859..173105072 100644 --- a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/Connector.java +++ b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/Connector.java @@ -41,35 +41,14 @@ static Connector create(@Nonnull URI endpoint) { * @throws IOException connection error */ default FutureResponse connect() throws IOException { - return connect(NullCredential.INSTANCE, new ClientInformation()); + return connect(new ClientInformation()); } /** * Establishes a connection to the Tsurugi server. - * @param credential the connection credential - * @return future session wire - * @throws IOException connection error - */ - default FutureResponse connect(@Nonnull Credential credential) throws IOException { - return connect(credential, new ClientInformation()); - } - - /** - * Establishes a connection to the Tsurugi server. - * @param clientInformation the client information - * @return future session wire - * @throws IOException connection error - */ - default FutureResponse connect(@Nonnull ClientInformation clientInformation) throws IOException { - return connect(NullCredential.INSTANCE, clientInformation); - } - - /** - * Establishes a connection to the Tsurugi server. - * @param credential the connection credential * @param clientInformation the client information * @return future session wire * @throws IOException connection error */ - FutureResponse connect(@Nonnull Credential credential, @Nonnull ClientInformation clientInformation) throws IOException; + FutureResponse connect(@Nonnull ClientInformation clientInformation) throws IOException; } diff --git a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java index 1f442074e..d1ffd00aa 100644 --- a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java +++ b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java @@ -17,7 +17,6 @@ import com.tsurugidb.endpoint.proto.EndpointRequest; import com.tsurugidb.endpoint.proto.EndpointResponse; import com.tsurugidb.tsubakuro.channel.common.connection.ClientInformation; -import com.tsurugidb.tsubakuro.channel.common.connection.Credential; import com.tsurugidb.tsubakuro.channel.common.connection.ForegroundFutureResponse; import com.tsurugidb.tsubakuro.channel.common.connection.sql.ResultSetWire; import com.tsurugidb.tsubakuro.channel.common.connection.wire.MainResponseProcessor; @@ -187,7 +186,7 @@ public Long process(ByteBuffer payload) throws IOException, ServerException, Int } } - public FutureResponse handshake(@Nonnull Credential credential, @Nonnull ClientInformation clientInformation, @Nullable EndpointRequest.WireInformation wireInformation) throws IOException { + public FutureResponse handshake(@Nonnull ClientInformation clientInformation, @Nullable EndpointRequest.WireInformation wireInformation) throws IOException { var handshakeMessageBuilder = EndpointRequest.Handshake.newBuilder(); if (wireInformation != null) { handshakeMessageBuilder.setWireInformation(wireInformation); @@ -200,10 +199,6 @@ public FutureResponse handshake(@Nonnull Credential credential, @Nonnull C if (clientInformation.getApplicationName() != null) { clientInformationBuilder.setApplicationName(clientInformation.getApplicationName()); } - if (clientInformation.getUserName() != null) { - clientInformationBuilder.setCredential( - EndpointRequest.Credential.newBuilder().setUserName(clientInformation.getUserName())); - } handshakeMessageBuilder.setClientInformation(clientInformationBuilder); FutureResponse future = send( diff --git a/modules/common/src/test/java/com/tsurugidb/tsubakuro/channel/common/connection/TestingConnector.java b/modules/common/src/test/java/com/tsurugidb/tsubakuro/channel/common/connection/TestingConnector.java index b65ee13d2..67d5a145a 100644 --- a/modules/common/src/test/java/com/tsurugidb/tsubakuro/channel/common/connection/TestingConnector.java +++ b/modules/common/src/test/java/com/tsurugidb/tsubakuro/channel/common/connection/TestingConnector.java @@ -15,7 +15,7 @@ class TestingConnector implements Connector { } @Override - public FutureResponse connect(Credential credential, ClientInformation clientInformation) throws IOException { + public FutureResponse connect(ClientInformation clientInformation) throws IOException { throw new UnsupportedOperationException(); } } diff --git a/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/FutureIpcWireImpl.java b/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/FutureIpcWireImpl.java index b4b9b1387..5ac19aef2 100644 --- a/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/FutureIpcWireImpl.java +++ b/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/FutureIpcWireImpl.java @@ -9,7 +9,6 @@ import com.tsurugidb.endpoint.proto.EndpointRequest; import com.tsurugidb.tsubakuro.channel.common.connection.ClientInformation; -import com.tsurugidb.tsubakuro.channel.common.connection.Credential; import com.tsurugidb.tsubakuro.channel.common.connection.wire.Wire; import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.WireImpl; import com.tsurugidb.tsubakuro.exception.ServerException; @@ -20,16 +19,14 @@ */ public class FutureIpcWireImpl implements FutureResponse { - private final Credential credential; private final ClientInformation clientInformation; private IpcConnectorImpl connector; private long id; private final AtomicBoolean gotton = new AtomicBoolean(); - FutureIpcWireImpl(IpcConnectorImpl connector, long id, @Nonnull Credential credential, @Nonnull ClientInformation clientInformation) { + FutureIpcWireImpl(IpcConnectorImpl connector, long id, @Nonnull ClientInformation clientInformation) { this.connector = connector; this.id = id; - this.credential = credential; this.clientInformation = clientInformation; } @@ -45,7 +42,7 @@ public Wire get() throws IOException, ServerException, InterruptedException { var wire = connector.getSessionWire(id); if (wire instanceof WireImpl) { var wireImpl = (WireImpl) wire; - var futureSessionID = wireImpl.handshake(credential, clientInformation, wireInformation()); + var futureSessionID = wireImpl.handshake(clientInformation, wireInformation()); wireImpl.checkSessionID(futureSessionID.get()); return wireImpl; } @@ -60,7 +57,7 @@ public Wire get(long timeout, TimeUnit unit) throws TimeoutException, IOExceptio var wire = connector.getSessionWire(id, timeout, unit); if (wire instanceof WireImpl) { var wireImpl = (WireImpl) wire; - var futureSessionID = wireImpl.handshake(credential, clientInformation, wireInformation()); + var futureSessionID = wireImpl.handshake(clientInformation, wireInformation()); wireImpl.checkSessionID(futureSessionID.get(timeout, unit)); return wireImpl; } diff --git a/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/IpcConnectorImpl.java b/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/IpcConnectorImpl.java index df7cdc6b1..958fa74d8 100644 --- a/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/IpcConnectorImpl.java +++ b/modules/ipc/src/main/java/com/tsurugidb/tsubakuro/channel/ipc/connection/IpcConnectorImpl.java @@ -13,7 +13,6 @@ import com.tsurugidb.tsubakuro.channel.common.connection.ClientInformation; import com.tsurugidb.tsubakuro.channel.common.connection.Connector; -import com.tsurugidb.tsubakuro.channel.common.connection.Credential; import com.tsurugidb.tsubakuro.channel.common.connection.wire.Wire; import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.WireImpl; import com.tsurugidb.tsubakuro.channel.ipc.NativeLibrary; @@ -53,7 +52,7 @@ public IpcConnectorImpl(String name) { } @Override - public FutureResponse connect(@Nonnull Credential credential, @Nonnull ClientInformation clientInformation) throws IOException { + public FutureResponse connect(@Nonnull ClientInformation clientInformation) throws IOException { LOG.trace("will connect to {}", name); //$NON-NLS-1$ if (handle == 0) { @@ -61,7 +60,7 @@ public FutureResponse connect(@Nonnull Credential credential, @Nonnull Cli } try { long id = requestNative(handle); - return new FutureIpcWireImpl(this, id, credential, clientInformation); + return new FutureIpcWireImpl(this, id, clientInformation); } catch (IOException e) { throw new ConnectException("the server has declined the connection request"); } diff --git a/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/ServerWireImpl.java b/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/ServerWireImpl.java index 87245e912..841ef16f7 100644 --- a/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/ServerWireImpl.java +++ b/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/ServerWireImpl.java @@ -132,7 +132,9 @@ public void handshake() throws IOException { }); putNative(wireHandle, resposeByteArray); } catch (IOException | InterruptedException e) { - throw new IOException(e); + System.err.println(e); + e.printStackTrace(); + fail(e); } } catch (com.google.protobuf.InvalidProtocolBufferException e) { System.err.println(e); diff --git a/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/SessionWireTest.java b/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/SessionWireTest.java index b4a665c80..758f08ea2 100644 --- a/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/SessionWireTest.java +++ b/modules/ipc/src/test/java/com/tsurugidb/tsubakuro/channel/ipc/sql/SessionWireTest.java @@ -33,7 +33,7 @@ void requestBegin() throws Exception { try { server = new ServerWireImpl(dbName, sessionID); client = new WireImpl(new IpcLink(dbName + "-" + String.valueOf(sessionID)), sessionID); - client.handshake(NullCredential.INSTANCE, new ClientInformation(), null); + client.handshake(new ClientInformation(), null); } catch (Exception e) { fail("cought Exception"); } @@ -53,7 +53,7 @@ void inconsistentResponse() { try { server = new ServerWireImpl(dbName, sessionID); client = new WireImpl(new IpcLink(dbName + "-" + String.valueOf(sessionID)), sessionID); - client.handshake(NullCredential.INSTANCE, new ClientInformation(), null); + client.handshake(new ClientInformation(), null); // REQUEST test begin // client side send Request @@ -82,7 +82,7 @@ void inconsistentResponse() { void timeout() throws Exception { server = new ServerWireImpl(dbName, sessionID); client = new WireImpl(new IpcLink(dbName + "-" + String.valueOf(sessionID)), sessionID); - client.handshake(NullCredential.INSTANCE, new ClientInformation(), null); + client.handshake(new ClientInformation(), null); // REQUEST test begin // client side send Request @@ -112,7 +112,7 @@ void timeout() throws Exception { void serverCrashDetectionTest() throws Exception { server = new ServerWireImpl(dbName, sessionID, false); client = new WireImpl(new IpcLink(dbName + "-" + String.valueOf(sessionID)), sessionID); - client.handshake(NullCredential.INSTANCE, new ClientInformation(), null); + client.handshake(new ClientInformation(), null); // REQUEST test begin // client side send Request diff --git a/modules/proto/src/main/protos/tateyama/proto/endpoint/request.proto b/modules/proto/src/main/protos/tateyama/proto/endpoint/request.proto index 709d318bb..06661ff34 100644 --- a/modules/proto/src/main/protos/tateyama/proto/endpoint/request.proto +++ b/modules/proto/src/main/protos/tateyama/proto/endpoint/request.proto @@ -27,14 +27,11 @@ message Request { // handshake operation. message Handshake { - // the credential. - Credential credential = 1; - // the client information. - ClientInformation client_information = 2; + ClientInformation client_information = 1; // reserved for system use - reserved 3 to 10; + reserved 2 to 10; // the wire information. WireInformation wire_information = 11; @@ -48,14 +45,13 @@ message ClientInformation { // the application name. string application_name = 2; - // the credential + // the credential. Credential credential = 3; } // the credential message Credential { - // the user name. - string user_name = 1; + // FIXME (T.B.D.) } // wire information diff --git a/modules/session/src/main/java/com/tsurugidb/tsubakuro/common/SessionBuilder.java b/modules/session/src/main/java/com/tsurugidb/tsubakuro/common/SessionBuilder.java index 0c9fd1684..2910fbb87 100644 --- a/modules/session/src/main/java/com/tsurugidb/tsubakuro/common/SessionBuilder.java +++ b/modules/session/src/main/java/com/tsurugidb/tsubakuro/common/SessionBuilder.java @@ -110,17 +110,6 @@ public SessionBuilder withApplicationName(@Nonnull String applicationNameString) return this; } - /** - * Sets userName information to connect. - * @param userNameString the userName information - * @return this - */ - public SessionBuilder withUserName(@Nonnull String userNameString) { - Objects.requireNonNull(userNameString); - userName = userNameString; - return this; - } - /** * Establishes a connection to the Tsurugi server. * This operation will block until the connection was established, @@ -132,7 +121,7 @@ public SessionBuilder withUserName(@Nonnull String userNameString) { * @see #create(long, TimeUnit) */ public Session create() throws IOException, ServerException, InterruptedException { - try (var fWire = connector.connect(connectionCredential, new ClientInformation(connectionLabel, applicationName, userName))) { + try (var fWire = connector.connect(new ClientInformation(connectionLabel, applicationName, connectionCredential))) { return create0(fWire.get()); } } @@ -150,7 +139,7 @@ public Session create() throws IOException, ServerException, InterruptedExceptio public Session create(long timeout, @Nonnull TimeUnit unit) throws IOException, ServerException, InterruptedException, TimeoutException { Objects.requireNonNull(unit); - try (var fWire = connector.connect(connectionCredential, new ClientInformation(connectionLabel, applicationName, userName))) { + try (var fWire = connector.connect(new ClientInformation(connectionLabel, applicationName, connectionCredential))) { var session = create0(fWire.get(timeout, unit)); return session; } @@ -163,7 +152,7 @@ public Session create(long timeout, @Nonnull TimeUnit unit) * @throws IOException if I/O error was occurred during connection */ public FutureResponse createAsync() throws IOException { - var fWire = connector.connect(connectionCredential, new ClientInformation(connectionLabel, applicationName, userName)); + var fWire = connector.connect(new ClientInformation(connectionLabel, applicationName, connectionCredential)); return new AbstractFutureResponse() { @Override diff --git a/modules/session/src/test/java/com/tsurugidb/tsubakuro/common/SessionBuilderTest.java b/modules/session/src/test/java/com/tsurugidb/tsubakuro/common/SessionBuilderTest.java index 266bc7f28..a010716a4 100644 --- a/modules/session/src/test/java/com/tsurugidb/tsubakuro/common/SessionBuilderTest.java +++ b/modules/session/src/test/java/com/tsurugidb/tsubakuro/common/SessionBuilderTest.java @@ -24,8 +24,8 @@ void create() throws Exception { var creds = new RememberMeCredential("testing"); var builder = SessionBuilder.connect(new Connector() { @Override - public FutureResponse connect(Credential credential, ClientInformation clientInformation) throws IOException { - assertSame(credential, creds); + public FutureResponse connect(ClientInformation clientInformation) throws IOException { + assertSame(clientInformation.getCredential(), creds); return FutureResponse.wrap(Owner.of(wire)); } }) @@ -42,8 +42,8 @@ void createAsync() throws Exception { var creds = new RememberMeCredential("testing"); var builder = SessionBuilder.connect(new Connector() { @Override - public FutureResponse connect(Credential credential, ClientInformation clientInformation) throws IOException { - assertSame(credential, creds); + public FutureResponse connect(ClientInformation clientInformation) throws IOException { + assertSame(clientInformation.getCredential(), creds); return FutureResponse.wrap(Owner.of(wire)); } }) @@ -61,17 +61,17 @@ void createWithClientInformation() throws Exception { try (var wire = new MockWire()) { String label = "label for the test"; String applicationName = "applicationName for the test"; - String userName = "userName for the test"; + var creds = new RememberMeCredential("testing"); var builder = SessionBuilder.connect(new Connector() { @Override - public FutureResponse connect(Credential credential, ClientInformation clientInformation) throws IOException { + public FutureResponse connect(ClientInformation clientInformation) throws IOException { assertSame(clientInformation.getConnectionLabel(), label); assertSame(clientInformation.getApplicationName(), applicationName); - assertSame(clientInformation.getUserName(), userName); + assertSame(clientInformation.getCredential(), creds); return FutureResponse.wrap(Owner.of(wire)); } }) - .withLabel(label).withApplicationName(applicationName).withUserName(userName); + .withLabel(label).withApplicationName(applicationName).withCredential(creds); try (var session = builder.create()) { // ok. } @@ -83,17 +83,17 @@ void createAsyncWithClientInformation() throws Exception { try (var wire = new MockWire()) { String label = "label for the test"; String applicationName = "applicationName for the test"; - String userName = "userName for the test"; + var creds = new RememberMeCredential("testing"); var builder = SessionBuilder.connect(new Connector() { @Override - public FutureResponse connect(Credential credential, ClientInformation clientInformation) throws IOException { + public FutureResponse connect(ClientInformation clientInformation) throws IOException { assertSame(clientInformation.getConnectionLabel(), label); assertSame(clientInformation.getApplicationName(), applicationName); - assertSame(clientInformation.getUserName(), userName); + assertSame(clientInformation.getCredential(), creds); return FutureResponse.wrap(Owner.of(wire)); } }) - .withLabel(label).withApplicationName(applicationName).withUserName(userName); + .withLabel(label).withApplicationName(applicationName).withCredential(creds); try ( var fSession = builder.createAsync(); var session = fSession.get(10, TimeUnit.SECONDS)) { diff --git a/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/FutureStreamWireImpl.java b/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/FutureStreamWireImpl.java index 5aa18f56d..419b783a2 100644 --- a/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/FutureStreamWireImpl.java +++ b/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/FutureStreamWireImpl.java @@ -9,7 +9,6 @@ import com.tsurugidb.endpoint.proto.EndpointRequest; import com.tsurugidb.tsubakuro.channel.common.connection.ClientInformation; -import com.tsurugidb.tsubakuro.channel.common.connection.Credential; import com.tsurugidb.tsubakuro.channel.common.connection.wire.Wire; import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.ResponseBox; import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.WireImpl; @@ -22,14 +21,12 @@ */ public class FutureStreamWireImpl implements FutureResponse { - private final Credential credential; private final ClientInformation clientInformation; StreamLink streamLink; private final AtomicBoolean gotton = new AtomicBoolean(); - FutureStreamWireImpl(StreamLink streamLink, @Nonnull Credential credential, @Nonnull ClientInformation clientInformation) { + FutureStreamWireImpl(StreamLink streamLink, @Nonnull ClientInformation clientInformation) { this.streamLink = streamLink; - this.credential = credential; this.clientInformation = clientInformation; } @@ -43,7 +40,7 @@ private EndpointRequest.WireInformation wireInformation() { public Wire get() throws IOException, ServerException, InterruptedException { if (!gotton.getAndSet(true)) { var wireImpl = new WireImpl(streamLink); - var futureSessionID = wireImpl.handshake(credential, clientInformation, wireInformation()); + var futureSessionID = wireImpl.handshake(clientInformation, wireInformation()); wireImpl.setSessionID(futureSessionID.get()); return wireImpl; } @@ -54,7 +51,7 @@ public Wire get() throws IOException, ServerException, InterruptedException { public Wire get(long timeout, TimeUnit unit) throws IOException, ServerException, InterruptedException, TimeoutException { if (!gotton.getAndSet(true)) { var wireImpl = new WireImpl(streamLink); - var futureSessionID = wireImpl.handshake(credential, clientInformation, wireInformation()); + var futureSessionID = wireImpl.handshake(clientInformation, wireInformation()); wireImpl.setSessionID(futureSessionID.get(timeout, unit)); return wireImpl; } diff --git a/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/StreamConnectorImpl.java b/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/StreamConnectorImpl.java index 1588fb7e0..4b4a4f88d 100644 --- a/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/StreamConnectorImpl.java +++ b/modules/stream/src/main/java/com/tsurugidb/tsubakuro/channel/stream/connection/StreamConnectorImpl.java @@ -9,7 +9,6 @@ import com.tsurugidb.tsubakuro.channel.common.connection.ClientInformation; import com.tsurugidb.tsubakuro.channel.common.connection.Connector; -import com.tsurugidb.tsubakuro.channel.common.connection.Credential; import com.tsurugidb.tsubakuro.channel.common.connection.wire.Wire; import com.tsurugidb.tsubakuro.channel.stream.StreamLink; import com.tsurugidb.tsubakuro.util.FutureResponse; @@ -31,8 +30,8 @@ public StreamConnectorImpl(String hostname, int port) { } @Override - public FutureResponse connect(@Nonnull Credential credential, @Nonnull ClientInformation clientInformation) throws IOException { + public FutureResponse connect(@Nonnull ClientInformation clientInformation) throws IOException { LOG.trace("will connect to {}:{}", hostname, port); //$NON-NLS-1$ - return new FutureStreamWireImpl(new StreamLink(hostname, port), credential, clientInformation); + return new FutureStreamWireImpl(new StreamLink(hostname, port), clientInformation); } } From 8b095aa411f030d638ab8564132979cdc575677a Mon Sep 17 00:00:00 2001 From: t-horikawa Date: Thu, 18 Jan 2024 18:49:58 +0900 Subject: [PATCH 5/5] apply comments (3) --- .../channel/common/connection/ClientInformation.java | 2 ++ .../channel/common/connection/wire/impl/WireImpl.java | 6 +++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java index fb505860b..c3bdc29b6 100644 --- a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java +++ b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/ClientInformation.java @@ -1,6 +1,7 @@ package com.tsurugidb.tsubakuro.channel.common.connection; import java.text.MessageFormat; +import java.util.Objects; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -32,6 +33,7 @@ public ClientInformation() { * @param credential the connection credential. */ public ClientInformation(@Nullable String connectionLabel, @Nullable String applicationName, @Nonnull Credential credential) { + Objects.requireNonNull(credential); this.connectionLabel = connectionLabel; this.applicationName = applicationName; this.credential = credential; diff --git a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java index d1ffd00aa..ccde8e622 100644 --- a/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java +++ b/modules/common/src/main/java/com/tsurugidb/tsubakuro/channel/common/connection/wire/impl/WireImpl.java @@ -175,7 +175,7 @@ public Long process(ByteBuffer payload) throws IOException, ServerException, Int case RESOURCE_LIMIT_REACHED: throw new ConnectException("the server has declined the connection request"); // preserve compatibiity case AUTHENTICATION_ERROR: - throw newUnknown(message.getError()); // FIXME + throw newUnknown(); // FIXME default: break; } @@ -243,6 +243,10 @@ static CoreServiceException newUnknown(@Nonnull EndpointResponse.Error message) return new CoreServiceException(CoreServiceCode.UNKNOWN, message.getMessage()); } + static CoreServiceException newUnknown() { + return new CoreServiceException(CoreServiceCode.UNKNOWN); + } + // for diagnostic public long sessionID() { return sessionID;