Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wip/w 499 #398

Merged
merged 5 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.tsurugidb.tsubakuro.channel.common.connection;

import java.text.MessageFormat;

import javax.annotation.Nullable;

/**
* A credential by user name and password.
*/
public final class ClientInformation {

private final String connectionLabel;

private final String applicationName;

private final String userName;

/**
* Creates a new instance without information.
*/
public ClientInformation() {
this.connectionLabel = null;
this.applicationName = null;
this.userName = null;
}

/**
* Creates a new instance.
* @param connectionLabel the label
* @param applicationName the application name
* @param userName the application name
*/
public ClientInformation(@Nullable String connectionLabel, @Nullable String applicationName, @Nullable String userName) {
this.connectionLabel = connectionLabel;
this.applicationName = applicationName;
this.userName = userName;
}

/**
* Get the connection label.
* @return the connection label, null if connection label has not been set.
*/
public String getConnectionLabel() {
return connectionLabel;
}

/**
* Get the application name.
* @return the application name, null if application name has not been set.
*/
public String getApplicationName() {
return applicationName;
}

/**
* Get the user name.
* @return the user name, null if user name has not been set.
*/
public String getUserName() {
return userName;
}

@Override
public String toString() {
return MessageFormat.format(
"ClientInformation(connectionLabel={0}, applicationName={1}, userName={2})",
checkNull(connectionLabel), checkNull(applicationName), checkNull(userName));
}
private String checkNull(String string) {
return (string != null) ? string : "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static Connector create(@Nonnull URI endpoint) {
* @throws IOException connection error
*/
default FutureResponse<Wire> connect() throws IOException {
return connect(NullCredential.INSTANCE);
return connect(NullCredential.INSTANCE, new ClientInformation());
}

/**
Expand All @@ -50,5 +50,26 @@ default FutureResponse<Wire> connect() throws IOException {
* @return future session wire
* @throws IOException connection error
*/
FutureResponse<Wire> connect(@Nonnull Credential credential) throws IOException;
default FutureResponse<Wire> connect(@Nonnull Credential credential) throws IOException {
return connect(credential, new ClientInformation());
}

/**
* Establishes a connection to the Tsurugi server.
* @param clientInformation the client information
* @return future session wire
* @throws IOException connection error
*/
default FutureResponse<Wire> connect(@Nonnull ClientInformation clientInformation) throws IOException {
return connect(NullCredential.INSTANCE, clientInformation);
}

/**
* Establishes a connection to the Tsurugi server.
* @param credential the connection credential
* @param clientInformation the client information
* @return future session wire
* @throws IOException connection error
*/
FutureResponse<Wire> connect(@Nonnull Credential credential, @Nonnull ClientInformation clientInformation) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,6 @@
*/
@ThreadSafe
public interface Wire extends ServerResource {

/**
* The major service message version for FrameworkRequest.Header.
*/
int SERVICE_MESSAGE_VERSION_MAJOR = 0;

/**
* The minor service message version for FrameworkRequest.Header.
*/
int SERVICE_MESSAGE_VERSION_MINOR = 0;

/**
* send a message to the destination server.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,31 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.ConnectException;
import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.tsurugidb.framework.proto.FrameworkRequest;
import com.tsurugidb.endpoint.proto.EndpointRequest;
import com.tsurugidb.endpoint.proto.EndpointResponse;
import com.tsurugidb.tsubakuro.channel.common.connection.ClientInformation;
import com.tsurugidb.tsubakuro.channel.common.connection.Credential;
import com.tsurugidb.tsubakuro.channel.common.connection.ForegroundFutureResponse;
import com.tsurugidb.tsubakuro.channel.common.connection.sql.ResultSetWire;
import com.tsurugidb.tsubakuro.channel.common.connection.wire.MainResponseProcessor;
import com.tsurugidb.tsubakuro.channel.common.connection.wire.Response;
import com.tsurugidb.tsubakuro.channel.common.connection.wire.Wire;
import com.tsurugidb.tsubakuro.exception.CoreServiceCode;
import com.tsurugidb.tsubakuro.exception.CoreServiceException;
import com.tsurugidb.tsubakuro.exception.ServerException;
import com.tsurugidb.tsubakuro.util.ByteBufferInputStream;
import com.tsurugidb.tsubakuro.util.FutureResponse;
import com.tsurugidb.tsubakuro.util.Owner;
import com.tsurugidb.tsubakuro.util.Timeout;
Expand All @@ -23,13 +35,37 @@
* WireImpl type.
*/
public class WireImpl implements Wire {
/**
* The major service message version for FrameworkRequest.Header.
*/
private static final int SERVICE_MESSAGE_VERSION_MAJOR = 0;

/**
* The minor service message version for FrameworkRequest.Header.
*/
private static final int SERVICE_MESSAGE_VERSION_MINOR = 0;

private static final int SERVICE_ID_ENDPOINT_BROKER = 1;
private static final long SESSION_ID_IS_NOT_ASSIGNED = Long.MAX_VALUE;

static final Logger LOG = LoggerFactory.getLogger(WireImpl.class);

private final Link link;
private final long sessionID;
private final ResponseBox responseBox;
private final AtomicBoolean closed = new AtomicBoolean();
private long sessionID;

/**
* Class constructor, called from IpcConnectorImpl that is a connector to the SQL server.
* @param link the stream object by which this WireImpl is connected to the SQL server
* @throws IOException error occurred in openNative()
*/
public WireImpl(@Nonnull Link link) throws IOException {
this.link = link;
this.responseBox = link.getResponseBox();
this.sessionID = SESSION_ID_IS_NOT_ASSIGNED;
LOG.trace("begin Session via ipc, id = {}", sessionID);
}

/**
* Class constructor, called from IpcConnectorImpl that is a connector to the SQL server.
Expand Down Expand Up @@ -57,8 +93,8 @@ public FutureResponse<? extends Response> send(int serviceId, @Nonnull byte[] pa
throw new IOException("already closed");
}
var header = FrameworkRequest.Header.newBuilder()
.setServiceMessageVersionMajor(Wire.SERVICE_MESSAGE_VERSION_MAJOR)
.setServiceMessageVersionMinor(Wire.SERVICE_MESSAGE_VERSION_MINOR)
.setServiceMessageVersionMajor(SERVICE_MESSAGE_VERSION_MAJOR)
.setServiceMessageVersionMinor(SERVICE_MESSAGE_VERSION_MINOR)
.setServiceId(serviceId)
.setSessionId(sessionID)
.build();
Expand All @@ -74,7 +110,7 @@ public FutureResponse<? extends Response> send(int serviceId, @Nonnull byte[] pa
* @throws IOException error occurred in responseBox.register()
*/
@Override
public FutureResponse<? extends Response> send(int serviceId, @Nonnull ByteBuffer payload) throws IOException {
public FutureResponse<? extends Response> send(int serviceId, @Nonnull ByteBuffer payload) throws IOException {
return send(serviceId, payload.array());
}

Expand Down Expand Up @@ -119,13 +155,99 @@ public void close() throws IOException {

}

private static EndpointRequest.Request.Builder newRequest() {
return EndpointRequest.Request.newBuilder()
.setServiceMessageVersionMajor(SERVICE_MESSAGE_VERSION_MAJOR)
.setServiceMessageVersionMinor(SERVICE_MESSAGE_VERSION_MINOR);
}

static class HandshakeProcessor implements MainResponseProcessor<Long> {
@Override
public Long process(ByteBuffer payload) throws IOException, ServerException, InterruptedException {
var message = EndpointResponse.Handshake.parseDelimitedFrom(new ByteBufferInputStream(payload));
LOG.trace("receive: {}", message); //$NON-NLS-1$
switch (message.getResultCase()) {
case SUCCESS:
return message.getSuccess().getSessionId();

case ERROR:
var errMessage = message.getError();
switch (errMessage.getCode()) {
case RESOURCE_LIMIT_REACHED:
throw new ConnectException("the server has declined the connection request"); // preserve compatibiity
case AUTHENTICATION_ERROR:
throw newUnknown(message.getError()); // FIXME
default:
break;
}
default:
break;
}
throw new AssertionError(); // may not occur
}
}

public FutureResponse<Long> handshake(@Nonnull Credential credential, @Nonnull ClientInformation clientInformation, @Nullable EndpointRequest.WireInformation wireInformation) throws IOException {
var handshakeMessageBuilder = EndpointRequest.Handshake.newBuilder();
if (wireInformation != null) {
handshakeMessageBuilder.setWireInformation(wireInformation);
}

var clientInformationBuilder = EndpointRequest.ClientInformation.newBuilder();
if (clientInformation.getConnectionLabel() != null) {
clientInformationBuilder.setConnectionLabel(clientInformation.getConnectionLabel());
}
if (clientInformation.getApplicationName() != null) {
clientInformationBuilder.setApplicationName(clientInformation.getApplicationName());
}
if (clientInformation.getUserName() != null) {
clientInformationBuilder.setCredential(
EndpointRequest.Credential.newBuilder().setUserName(clientInformation.getUserName()));
}
handshakeMessageBuilder.setClientInformation(clientInformationBuilder);

FutureResponse<? extends Response> future = send(
SERVICE_ID_ENDPOINT_BROKER,
toDelimitedByteArray(newRequest()
.setHandshake(handshakeMessageBuilder)
.build())
);
return new ForegroundFutureResponse<>(future, new HandshakeProcessor().asResponseProcessor());
}

public void setSessionID(long id) throws IOException {
if (sessionID == SESSION_ID_IS_NOT_ASSIGNED) {
this.sessionID = id;
return;
}
throw new IOException("handshake error (session ID is already assigned)");
}

public void checkSessionID(long id) throws IOException {
if (sessionID != id) {
throw new IOException(MessageFormat.format("handshake error (inconsistent session ID), {0} not equal {1}", sessionID, id));
}
}

byte[] toDelimitedByteArray(FrameworkRequest.Header request) throws IOException {
try (var buffer = new ByteArrayOutputStream()) {
request.writeDelimitedTo(buffer);
return buffer.toByteArray();
}
}

byte[] toDelimitedByteArray(EndpointRequest.Request request) throws IOException {
try (var buffer = new ByteArrayOutputStream()) {
request.writeDelimitedTo(buffer);
return buffer.toByteArray();
}
}

static CoreServiceException newUnknown(@Nonnull EndpointResponse.Error message) {
assert message != null;
return new CoreServiceException(CoreServiceCode.UNKNOWN, message.getMessage());
}

// for diagnostic
public long sessionID() {
return sessionID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class TestingConnector implements Connector {
}

@Override
public FutureResponse<Wire> connect(Credential credential) throws IOException {
public FutureResponse<Wire> connect(Credential credential, ClientInformation clientInformation) throws IOException {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Nonnull;

import com.tsurugidb.endpoint.proto.EndpointRequest;
import com.tsurugidb.tsubakuro.channel.common.connection.ClientInformation;
import com.tsurugidb.tsubakuro.channel.common.connection.Credential;
import com.tsurugidb.tsubakuro.channel.common.connection.wire.Wire;
import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.WireImpl;
import com.tsurugidb.tsubakuro.exception.ServerException;
import com.tsurugidb.tsubakuro.util.FutureResponse;

Expand All @@ -14,27 +20,51 @@
*/
public class FutureIpcWireImpl implements FutureResponse<Wire> {

private final Credential credential;
private final ClientInformation clientInformation;
private IpcConnectorImpl connector;
private long id;
private final AtomicBoolean gotton = new AtomicBoolean();

FutureIpcWireImpl(IpcConnectorImpl connector, long id) {
FutureIpcWireImpl(IpcConnectorImpl connector, long id, @Nonnull Credential credential, @Nonnull ClientInformation clientInformation) {
this.connector = connector;
this.id = id;
this.credential = credential;
this.clientInformation = clientInformation;
}

private EndpointRequest.WireInformation wireInformation() {
return EndpointRequest.WireInformation.newBuilder().setIpcInformation(
EndpointRequest.WireInformation.IpcInformation.newBuilder().setConnectionInformation(Long.toString(ProcessHandle.current().pid()))
).build();
}

@Override
public Wire get() throws IOException {
public Wire get() throws IOException, ServerException, InterruptedException {
if (!gotton.getAndSet(true)) {
return connector.getSessionWire(id);
var wire = connector.getSessionWire(id);
if (wire instanceof WireImpl) {
var wireImpl = (WireImpl) wire;
var futureSessionID = wireImpl.handshake(credential, clientInformation, wireInformation());
wireImpl.checkSessionID(futureSessionID.get());
return wireImpl;
}
throw new IOException("FutureIpcWireImpl programing error"); // never occure
}
throw new IOException("FutureIpcWireImpl is already closed");
}

@Override
public Wire get(long timeout, TimeUnit unit) throws TimeoutException, IOException {
public Wire get(long timeout, TimeUnit unit) throws TimeoutException, IOException, ServerException, InterruptedException {
if (!gotton.getAndSet(true)) {
return connector.getSessionWire(id, timeout, unit);
var wire = connector.getSessionWire(id, timeout, unit);
if (wire instanceof WireImpl) {
var wireImpl = (WireImpl) wire;
var futureSessionID = wireImpl.handshake(credential, clientInformation, wireInformation());
wireImpl.checkSessionID(futureSessionID.get(timeout, unit));
return wireImpl;
}
throw new IOException("FutureIpcWireImpl programing error"); // never occure
}
throw new IOException("FutureIpcWireImpl is already closed");
}
Expand Down
Loading