diff --git a/README.md b/README.md
index 2e4ee8a..fdfab86 100644
--- a/README.md
+++ b/README.md
@@ -1,20 +1,19 @@
![](https://s33.postimg.cc/getb2kc33/LOGO_YARMI_Hzt_500px.png)
-yarmi is yet anotehr RMI based on JSON. it's simple yet powerful when developing server & client based distributed application within a network of small scale
+yarmi is yet-another remote method invocation framework for simple distributed service architecture which provides service discovery mechanism out of the box.
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/5c9f40d574c64e629af11f284c447bea)](https://www.codacy.com/app/innocentevil0914/yarmi?utm_source=github.com&utm_medium=referral&utm_content=fritzprix/yarmi&utm_campaign=Badge_Grade)
### Features
-1. Support large blob as method parameter or response
+1. Simple APIs
+> discover and request service with just a few API calls
+2. Support large blob as method parameter or response
> yarmi supports blob exchange between client and server by default with BlobSession which exposes familiar read / write APIs
-2. Provide service discovery out-of-the-box
-> yarmi contains simple service discovery feature and also support another type of service discovery (e.g. DNS-SD) as module
-3. Support various transport
-> yarmi also provides abstraction over transport layer so it can over any kinds of transport like tcp / ip or bluetooth rfcomm.
-4. Zero-cost migration to (from) RESTful application
-> Provides conceptual similarity to popular RESTful application framework (like service / controller mapping).
-> and that means not only the migration from / to RESTful implementation is easy
-> but also implementing proxy for any RESTful service in heterogeneous network scenario (like typical IoT application) is also well supported
+3. Provide service discovery out-of-the-box
+> service discovery is provided out of the box which is not dependent on any other lookup service.
+4. Extensible design
+> yarmi core itself is agnostic to network / messaging / discovery / negotiation implementation.
+
### How-To
@@ -42,7 +41,7 @@ yarmi is yet anotehr RMI based on JSON. it's simple yet powerful when developing
com.doodream
yarmi-core
- 0.0.4
+ 0.0.5
```
@@ -148,8 +147,8 @@ public static class SimpleClient {
SimpleServiceDiscovery discovery = new SimpleServiceDiscovery();
discovery.startDiscovery(TestService.class, new ServiceDiscoveryListener() {
@Override
- public void onDiscovered(RMIServiceProxy proxy) {
- discoveredService.add(proxy);
+ public void onDiscovered(RMIServiceInfo info) {
+ discoveredService.add(RMIServiceInfo.toServiceProxy(info));
}
@Override
diff --git a/pom.xml b/pom.xml
index 351b44e..5babd44 100644
--- a/pom.xml
+++ b/pom.xml
@@ -153,19 +153,19 @@
1.0
- org.mongodb
- bson
- 3.2.2
+ com.fasterxml.jackson.core
+ jackson-core
+ 2.9.6
- ch.qos.logback
- logback-classic
- 1.3.0-alpha4
+ de.undercouch
+ bson4jackson
+ 2.9.2
- ch.qos.logback
- logback-access
- 1.3.0-alpha4
+ org.slf4j
+ slf4j-api
+ 1.7.25
com.google.code.gson
diff --git a/src/main/java/com/doodream/rmovjs/annotation/RMIException.java b/src/main/java/com/doodream/rmovjs/annotation/RMIException.java
new file mode 100644
index 0000000..a9f6bd4
--- /dev/null
+++ b/src/main/java/com/doodream/rmovjs/annotation/RMIException.java
@@ -0,0 +1,16 @@
+package com.doodream.rmovjs.annotation;
+
+import com.doodream.rmovjs.model.Response;
+
+public class RMIException extends RuntimeException {
+ private int code;
+
+ public RMIException(Response response) {
+ super((String) response.getBody());
+ code = response.getCode();
+ }
+
+ public int code() {
+ return code;
+ }
+}
diff --git a/src/main/java/com/doodream/rmovjs/annotation/server/Service.java b/src/main/java/com/doodream/rmovjs/annotation/server/Service.java
index 0987de5..200c2ee 100644
--- a/src/main/java/com/doodream/rmovjs/annotation/server/Service.java
+++ b/src/main/java/com/doodream/rmovjs/annotation/server/Service.java
@@ -6,12 +6,13 @@
import com.doodream.rmovjs.net.SimpleNegotiator;
import com.doodream.rmovjs.net.tcp.TcpServiceAdapter;
import com.doodream.rmovjs.serde.Converter;
-import com.doodream.rmovjs.serde.json.JsonConverter;
+import com.doodream.rmovjs.serde.bson.BsonConverter;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
/**
+ * Service Annotation
* Created by innocentevil on 18. 5. 4.
*/
@@ -34,8 +35,8 @@
Class extends ServiceAdapter> adapter() default TcpServiceAdapter.class;
/**
- * parameters for constrcutor of network adapter class, will be passed as argument whenever the adapter class
- * needs to be instanciated.
+ * parameters for constructor of network adapter class, will be passed as argument whenever the adapter class
+ * needs to be instantiated.
* the order of parameters will be kept in the process of ser-der.
* @return parameters to adapter constructor
*/
@@ -43,5 +44,5 @@
Class extends RMINegotiator> negotiator() default SimpleNegotiator.class;
- Class extends Converter> converter() default JsonConverter.class;
+ Class extends Converter> converter() default BsonConverter.class;
}
diff --git a/src/main/java/com/doodream/rmovjs/client/HaRMIClient.java b/src/main/java/com/doodream/rmovjs/client/HaRMIClient.java
index 390a2fa..075aa2e 100644
--- a/src/main/java/com/doodream/rmovjs/client/HaRMIClient.java
+++ b/src/main/java/com/doodream/rmovjs/client/HaRMIClient.java
@@ -1,78 +1,243 @@
package com.doodream.rmovjs.client;
+import com.doodream.rmovjs.annotation.RMIException;
+import com.doodream.rmovjs.annotation.server.Controller;
import com.doodream.rmovjs.annotation.server.Service;
-import com.doodream.rmovjs.model.Endpoint;
+import com.doodream.rmovjs.method.RMIMethod;
+import com.doodream.rmovjs.model.RMIError;
import com.doodream.rmovjs.model.RMIServiceInfo;
import com.doodream.rmovjs.net.RMIServiceProxy;
import com.doodream.rmovjs.sdp.ServiceDiscovery;
import com.doodream.rmovjs.sdp.ServiceDiscoveryListener;
-import com.doodream.rmovjs.serde.Converter;
-import com.doodream.rmovjs.util.LruCache;
+import com.google.common.base.Preconditions;
import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.schedulers.Schedulers;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* High-Available RMI Client
*/
-public class HaRMIClient implements InvocationHandler, Consumer {
-
+public class HaRMIClient implements InvocationHandler {
+ private interface Selectable {
+ RMIClient selectNext(List proxies, RMIClient lastSelected);
+ }
/**
- * policy
+ * types of policy used to select service every request
*/
- public enum RequestRoutePolicy {
- RoundRobin,
-s }
+ public enum RequestRoutePolicy implements Selectable {
+ RoundRobin {
+ @Override
+ public RMIClient selectNext(List proxies, RMIClient lastSelected) {
+ if(lastSelected == null) {
+ return proxies.get(0);
+ }
+ if(proxies.contains(lastSelected)) {
+ int idx = proxies.indexOf(lastSelected) + 1;
+ if(idx >= proxies.size()) {
+ idx = 0;
+ }
+ return proxies.get(idx);
+ }
+ return proxies.get(0);
+ }
+ },
+ FastestFirst {
+ @Override
+ public RMIClient selectNext(List proxies, RMIClient lastSelected) {
+ return Observable.fromIterable(proxies)
+ .sorted(RMIClient::compareTo)
+ .blockingFirst();
+ }
+ },
+ Random {
+ @Override
+ public RMIClient selectNext(List proxies, RMIClient lastSelected) {
+ return null;
+ }
+ }
+ }
- private static final Logger Log = LoggerFactory.getLogger(HaRMIClient.class);
+ public interface AvailabilityChangeListener {
+ void onAvailabilityChanged(int availableServices);
+ }
- private String controllerPath;
- private Map methodMap;
- private RMIServiceProxy serviceProxy;
- private RMIServiceInfo serviceInfo;
- private Converter converter;
+ private static final long DEFAULT_QOS_UPDATE_PERIOD = 2000L;
+ private static final long AVAILABILITY_WAIT_TIMEOUT = 5000L;
+ private static final int MAX_TRIAL_COUNT = 3;
+ private static final Logger Log = LoggerFactory.getLogger(HaRMIClient.class);
- private ServiceDiscovery discovery;
+ private AvailabilityChangeListener availabilityChangeListener;
+ private RequestRoutePolicy routePolicy;
private CompositeDisposable compositeDisposable;
- private LruCache proxies;
+ private RMIClient lastProxy;
+ private long qosFactor;
+ private ExecutorService listenerInvoker;
+ private HashSet discoveredProxySet;
+ private final ArrayList clients;
+ private Class controller;
+ private Class svc;
+ private long qosUpdateTime;
+ private TimeUnit qosUpdateTimeUnit;
- public static T create(ServiceDiscovery discovery, Class svc, Class ctrl) throws IllegalAccessException, IOException, InstantiationException {
+ /**
+ * close call proxy and release its resources
+ * @param callProxy call proxy returned by {@link #create(ServiceDiscovery, long, Class, Class, RequestRoutePolicy, AvailabilityChangeListener)}
+ * @param force if false, caller wait until the on-going requests are finished. if true, close the network connection immediately without waiting
+ */
+ public static void destroy(Object callProxy, boolean force) {
+ final HaRMIClient client = (HaRMIClient) Proxy.getInvocationHandler(callProxy);
+ if(client == null) {
+ return;
+ }
+ client.close(force);
+ }
+
+ /**
+ * create call proxy for multiple services
+ * it tries to keep connection to services as many as possible
+ * @param discovery @{@link ServiceDiscovery} used to discover service
+ * @param qos latency in millisecond which service should meet to keep connection to this client
+ * @param svc service definition class annotated by {@link Service}
+ * @param ctrl controller interface
+ * @param policy policy used to select service
+ * @param listener listener to monitor the change of service availability
+ * @param controller type which proxy is created from
+ * @return call proxy
+ */
+ public static T create(ServiceDiscovery discovery, long qos, Class svc, Class ctrl, RequestRoutePolicy policy, AvailabilityChangeListener listener) {
Service service = (Service) svc.getAnnotation(Service.class);
- if(service == null) {
- return null;
- }
+ Preconditions.checkNotNull(service);
+
+
+ Controller controller = Observable.fromArray(svc.getDeclaredFields())
+ .filter(field -> field.getType().equals(ctrl))
+ .map(field -> field.getAnnotation(Controller.class))
+ .blockingFirst(null);
+
+
+ List validMethods = Observable.fromArray(ctrl.getMethods())
+ .filter(RMIMethod::isValidMethod).toList().blockingGet();
+
+ final RMIServiceInfo serviceInfo = RMIServiceInfo.from(svc);
+ Preconditions.checkNotNull(serviceInfo, "Invalid Service Class %s", svc);
+
+ Preconditions.checkArgument(validMethods.size() > 0);
+ Preconditions.checkNotNull(controller, "no matched controller");
+ Preconditions.checkArgument(ctrl.isInterface());
+
+ HaRMIClient haRMIClient = new HaRMIClient<>(svc, ctrl, qos, DEFAULT_QOS_UPDATE_PERIOD, TimeUnit.MILLISECONDS, policy);
+ haRMIClient.availabilityChangeListener = listener;
+
- HaRMIClient haRMIClient = new HaRMIClient();
CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add(startDiscovery(discovery, svc)
.subscribeOn(Schedulers.newThread())
- .subscribe(haRMIClient::accept));
+ .subscribe(haRMIClient::registerProxy, haRMIClient::onError));
+ haRMIClient.setDisposable(compositeDisposable);
- return null;
+ return (T) Proxy.newProxyInstance(ctrl.getClassLoader(),new Class[]{ ctrl }, haRMIClient);
}
+ /**
+ * check the availability of the service
+ * @param callProxy call proxy returned by {@link #create(ServiceDiscovery, long, Class, Class, RequestRoutePolicy, AvailabilityChangeListener)}
+ * @param blockUntilAvailable if true, caller will block until at least a service available, otherwise return immediately
+ * @return true, if there is at least an available service, otherwise false
+ */
+ public static boolean isAvailable(Object callProxy, boolean blockUntilAvailable) {
+ HaRMIClient haRMIClient = (HaRMIClient) Proxy.getInvocationHandler(callProxy);
+ if(haRMIClient == null) {
+ return false;
+ }
+ int availability;
+ synchronized (haRMIClient.clients) {
+ while (!((availability = haRMIClient.getUpdatedAvailability()) > 0) && blockUntilAvailable) {
+ try {
+ haRMIClient.clients.wait(AVAILABILITY_WAIT_TIMEOUT);
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+ }
+ return availability > 0;
+ }
+
+ private synchronized int getUpdatedAvailability() {
+ return clients.size();
+ }
+
+ /**
+ * private constructor
+ * @param svc service definition annotated with {@link Service}
+ * @param ctrl class of controller interface
+ * @param qos minimum Quality of Service (latency) used to determine the service is good or bad, if the service will be disconnected if it doesn't satisfy the QoS
+ * @param qosUpdatePeriod time value how frequently the QoS be measured.
+ * @param timeUnit time unit for qosUpdatePeriod
+ * @param policy policy used to select service for each request
+ */
+ private HaRMIClient(Class svc, Class ctrl, long qos, long qosUpdatePeriod, TimeUnit timeUnit, RequestRoutePolicy policy) {
+ this.svc = svc;
+ this.controller = ctrl;
+ this.routePolicy = policy;
+ this.qosFactor = qos;
+ listenerInvoker = Executors.newSingleThreadExecutor();
+ clients = new ArrayList<>();
+ discoveredProxySet = new HashSet<>();
+ qosUpdateTime = qosUpdatePeriod;
+ qosUpdateTimeUnit = timeUnit;
+ }
+
+ private void onError(Throwable throwable) {
+ Log.error(throwable.getLocalizedMessage());
+ close(true);
+ }
+
+
+ private void setDisposable(CompositeDisposable compositeDisposable) {
+ this.compositeDisposable = compositeDisposable;
+ }
+
+ private synchronized void registerProxy(RMIServiceProxy serviceProxy) {
+ if (serviceProxy == null) {
+ return;
+ }
+ if (discoveredProxySet.add(serviceProxy.who())) {
+ clients.add(RMIClient.createClient(serviceProxy, svc, controller, qosFactor, qosUpdateTime, qosUpdateTimeUnit));
+ }
+
+ listenerInvoker.submit(() -> {
+ synchronized (clients) {
+ availabilityChangeListener.onAvailabilityChanged(clients.size());
+ clients.notifyAll();
+ }
+ });
+ }
+
+
private static Observable startDiscovery(ServiceDiscovery discovery, Class svc) {
- return Observable.create(emitter -> discovery.startDiscovery(svc, new ServiceDiscoveryListener() {
+ return Observable.create(emitter -> discovery.startDiscovery(svc, false, new ServiceDiscoveryListener() {
+
@Override
- public void onDiscovered(RMIServiceProxy proxy) {
- emitter.onNext(proxy);
+ public void onDiscovered(RMIServiceInfo info) {
+ emitter.onNext(RMIServiceInfo.toServiceProxy(info));
}
@Override
@@ -81,23 +246,86 @@ public void onDiscoveryStarted() {
}
@Override
- public void onDiscoveryFinished() throws IllegalAccessException {
+ public void onDiscoveryFinished() {
emitter.onComplete();
}
}));
}
- private HaRMIClient() {
+
+ private void close(boolean force) {
+ compositeDisposable.dispose();
+ listenerInvoker.shutdown();
+ clients.forEach(client -> {
+ try {
+ client.close(force);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ });
+ }
+
+
+ private RMIClient selectNext() throws TimeoutException {
+ RMIClient selected;
+ int trial = MAX_TRIAL_COUNT;
+ try {
+ do {
+ selected = routePolicy.selectNext(clients, lastProxy);
+ if (selected != null) {
+ return selected;
+ }
+ synchronized (clients) {
+ clients.wait();
+ }
+ } while (trial-- > 0);
+ } catch (InterruptedException ignore) { }
+ throw new TimeoutException(String.format("fail to select next client /w %s", routePolicy));
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- return null;
+ lastProxy = selectNext();
+ Preconditions.checkNotNull(lastProxy, "invalid call proxy : null call proxy");
+ try {
+ return lastProxy.invoke(lastProxy, method, args);
+ } catch (RMIException e) {
+ if(RMIError.isServiceBad(e.code())) {
+ purgeBadProxy(lastProxy);
+ }
+ // rethrow it
+ throw e;
+ }
}
@Override
- public void accept(RMIServiceProxy serviceProxy) {
+ protected void finalize() throws Throwable {
+ close(true);
+ super.finalize();
+ }
+ /**
+ * purge bad proxy
+ * @param client client which doesn't meet QoS requirement
+ */
+ private synchronized void purgeBadProxy(RMIClient client) {
+ Log.debug("Purge client {}", client);
+ clients.remove(client);
+ if(!discoveredProxySet.remove(client.who())) {
+ Log.warn("client ({}) is not in the discovered set", client);
+ }
+ try {
+ client.close(true);
+ } catch (IOException e) {
+ Log.warn(e.getMessage());
+ }
+ listenerInvoker.submit(() -> {
+ synchronized (client) {
+ availabilityChangeListener.onAvailabilityChanged(clients.size());
+ client.notifyAll();
+ }
+ });
}
+
}
diff --git a/src/main/java/com/doodream/rmovjs/client/RMIClient.java b/src/main/java/com/doodream/rmovjs/client/RMIClient.java
index 2504ca1..9b177dd 100644
--- a/src/main/java/com/doodream/rmovjs/client/RMIClient.java
+++ b/src/main/java/com/doodream/rmovjs/client/RMIClient.java
@@ -1,157 +1,272 @@
package com.doodream.rmovjs.client;
+import com.doodream.rmovjs.annotation.RMIException;
import com.doodream.rmovjs.annotation.server.Controller;
import com.doodream.rmovjs.annotation.server.Service;
import com.doodream.rmovjs.method.RMIMethod;
import com.doodream.rmovjs.model.Endpoint;
+import com.doodream.rmovjs.model.RMIError;
import com.doodream.rmovjs.model.RMIServiceInfo;
import com.doodream.rmovjs.model.Response;
import com.doodream.rmovjs.net.RMIServiceProxy;
import com.google.common.base.Preconditions;
import io.reactivex.Observable;
import io.reactivex.Single;
+import io.reactivex.schedulers.Schedulers;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* {@link RMIClient} build method invocation proxy from {@link RMIServiceProxy} which is discovered from SDP
*
*/
-public class RMIClient implements InvocationHandler {
+public class RMIClient implements InvocationHandler, Comparable {
private static final Logger Log = LoggerFactory.getLogger(RMIClient.class);
private Map methodMap;
private RMIServiceProxy serviceProxy;
+ private AtomicInteger ongoingRequestCount;
+ private long timeout;
+ private Long measuredPing;
+ private volatile boolean markToClose;
- private RMIClient(RMIServiceProxy serviceProxy) {
+ private RMIClient(RMIServiceProxy serviceProxy, long timeout,long pingUpdatePeriod, TimeUnit timeUnit) {
this.serviceProxy = serviceProxy;
+ markToClose = false;
+ measuredPing = Long.MAX_VALUE;
+ this.timeout = timeout;
+ ongoingRequestCount = new AtomicInteger(0);
+ if(pingUpdatePeriod > 0L) {
+ serviceProxy.startPeriodicQosUpdate(timeout, pingUpdatePeriod, timeUnit);
+ }
+ }
+
+ /**
+ * return QoS value measured updated last
+ * @param proxy proxy object
+ * @return QoS value (defined latency in millisecond from request to response)
+ */
+ public static long getMeasuredQoS(Object proxy) {
+ return forEachClient(proxy)
+ .blockingFirst().getMeasuredPing();
}
+ /**
+ * check whether there are on-going requests for given RMI call proxy
+ * @param proxy RMI proxy which is create by {@link #create(RMIServiceProxy, Class, Class)} or {@link #create(RMIServiceProxy, Class, Class, long, long, TimeUnit)}
+ * @return true if there is no on-going request, otherwise false
+ */
+ public static boolean isClosable(Object proxy) {
+ return forEachClient(proxy)
+ .blockingFirst().isClosable();
+ }
+
+
private void setMethodEndpointMap(Map map) {
this.methodMap = map;
}
+ private boolean isClosable() {
+ return ongoingRequestCount.get() == 0;
+ }
- public static void destroy(Object proxy) throws IOException {
- Class proxyClass = proxy.getClass();
- if(Proxy.isProxyClass(proxyClass)) {
- RMIClient client = (RMIClient) Proxy.getInvocationHandler(proxy);
- client.close();
- } else {
- Service service = proxy.getClass().getAnnotation(Service.class);
- if(service == null) {
- throw new RuntimeException("Invalid Proxy");
- }
- Observable.fromArray(proxyClass.getDeclaredFields())
- .filter(field -> field.getAnnotation(Controller.class) != null)
- .map(field -> field.get(proxy))
- .blockingSubscribe(RMIClient::destroy);
- }
+ /**
+ * get {@link RMIClient} for given RMI call proxy
+ * @param proxy
+ * @return
+ */
+ static RMIClient access(Object proxy) {
+ return forEachClient(proxy)
+ .blockingFirst();
}
+ /**
+ * destroy RMI call proxy and release resources
+ * @param proxy RMI call proxy returned by {@link #create(RMIServiceProxy, Class, Class, long, long, TimeUnit)} or {@link #create(RMIServiceProxy, Class, Class)}
+ * @param force if true, close regardless its on-going request, otherwise, wait until the all the on-going requests is complete
+ */
+ public static void destroy(Object proxy, boolean force) {
+ forEachClient(proxy)
+ .subscribeOn(Schedulers.io())
+ .blockingSubscribe(client -> client.close(force));
+ }
- public static T createService(RMIServiceProxy serviceProxy, Class svc) throws IllegalAccessException, InstantiationException {
- Object svcProxy = svc.newInstance();
+ private static Observable forEachClient(Object proxy) {
+ return Observable.create(emitter -> {
+ Class proxyClass = proxy.getClass();
+ if(Proxy.isProxyClass(proxyClass)) {
+ RMIClient client = (RMIClient) Proxy.getInvocationHandler(proxy);
+ emitter.onNext(client);
+ } else {
+ Service service = proxy.getClass().getAnnotation(Service.class);
+ if(service == null) {
+ throw new IllegalArgumentException("Invalid Proxy");
+ }
+ Observable.fromArray(proxyClass.getDeclaredFields())
+ .filter(field -> field.getAnnotation(Controller.class) != null)
+ .map(field -> field.get(proxy))
+ .map(Proxy::getInvocationHandler)
+ .cast(RMIClient.class)
+ .blockingSubscribe(emitter::onNext);
+ }
+ emitter.onComplete();
+ });
+ }
- Observable.fromArray(svc.getDeclaredFields())
- .filter(field -> field.getAnnotation(Controller.class) != null)
- .blockingSubscribe(field -> {
- Object controller = create(serviceProxy, svc, field.getType());
- field.set(svcProxy, controller);
- });
- return (T) svcProxy;
+ /**
+ * close method invocation proxy created by {@link #create(RMIServiceProxy, Class, Class)} or {@link #createService(RMIServiceProxy, Class)} method
+ * @param proxy returned proxy instance from {@link #create(RMIServiceProxy, Class, Class)} or {@link #createService(RMIServiceProxy, Class)}
+ */
+ public static void destroy(Object proxy) {
+ destroy(proxy, false);
}
/**
*
* @param serviceProxy
* @param svc
- * @param ctrl
* @param
* @return
+ * @throws IllegalAccessError
+ * @throws InstantiationException
+ * @throws IllegalAccessException
*/
- @Nullable
- public static T create(RMIServiceProxy serviceProxy, Class svc, Class ctrl) {
- Service service = (Service) svc.getAnnotation(Service.class);
+ public static T createService(RMIServiceProxy serviceProxy, Class svc) throws IllegalAccessError, InstantiationException, IllegalAccessException {
+ return createService(serviceProxy, svc, 0L, 0L, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * create service proxy instance which contains controller interface as its member fields.
+ * @param serviceProxy active service proxy which is obtained from the service discovery
+ * @param svc Service definition class
+ * @param Type of service class
+ * @return proxy instance for service, getter or direct field referencing can be used to access controller
+ * @throws IllegalAccessException there is no public constructor for service class
+ * @throws InstantiationException if this {@code Class} represents an abstract class,
+ * * an interface, an array class, a primitive type, or void;
+ * * or if the class has no nullary constructor;
+ * * or if the instantiation fails for some other reason.
+ */
+ public static T createService(RMIServiceProxy serviceProxy, Class svc, long timeout, long pingInterval, TimeUnit timeUnit) throws IllegalAccessException, InstantiationException {
+ Object svcProxy = svc.newInstance();
+ Observable.fromArray(svc.getDeclaredFields())
+ .filter(field -> field.getAnnotation(Controller.class) != null)
+ .blockingSubscribe(field -> {
+ Object controller = create(serviceProxy, svc, field.getType(), pingInterval, timeout, timeUnit);
+ field.setAccessible(true);
+ field.set(svcProxy, controller);
+ });
+
+ return (T) svcProxy;
+ }
+
+ static RMIClient createClient(RMIServiceProxy serviceProxy, Class> svc, Class ctrl, long pingTimeout, long pingInterval, TimeUnit timeUnit) {
+ Service service = svc.getAnnotation(Service.class);
+ Preconditions.checkNotNull(service);
if(!serviceProxy.provide(ctrl)) {
+ Log.warn("service is not supported");
return null;
}
+ Controller controller = Observable.fromArray(svc.getDeclaredFields())
+ .filter(field -> field.getType().equals(ctrl))
+ .map(field -> field.getAnnotation(Controller.class))
+ .blockingFirst(null);
- try {
- Preconditions.checkNotNull(service);
- Controller controller = Observable.fromArray(svc.getDeclaredFields())
- .filter(field -> field.getType().equals(ctrl))
- .map(field -> field.getAnnotation(Controller.class))
- .blockingFirst(null);
-
- Preconditions.checkNotNull(controller, "no matched controller");
- Preconditions.checkArgument(ctrl.isInterface());
-
+ final RMIServiceInfo serviceInfo = RMIServiceInfo.from(svc);
+ List validMethods = Observable.fromArray(ctrl.getMethods())
+ .filter(RMIMethod::isValidMethod).toList().blockingGet();
- final RMIServiceInfo serviceInfo = RMIServiceInfo.from(svc);
- Preconditions.checkNotNull(serviceInfo, "Invalid Service Class %s", svc);
+ Preconditions.checkNotNull(controller, "no matched controller");
+ Preconditions.checkArgument(ctrl.isInterface());
+ Preconditions.checkNotNull(serviceInfo, "Invalid Service Class %s", svc);
+ Preconditions.checkArgument(validMethods.size() > 0);
+ try {
if (!serviceProxy.isOpen()) {
+ // RMIServiceProxy is opened only once
serviceProxy.open();
}
- RMIClient rmiClient = new RMIClient(serviceProxy);
-
+ RMIClient rmiClient = new RMIClient(serviceProxy, pingTimeout, pingInterval, timeUnit);
- Observable methodObservable = Observable.fromArray(ctrl.getMethods())
- .filter(RMIMethod::isValidMethod);
-
- Observable endpointObservable = methodObservable
+ Observable endpointObservable = Observable.fromIterable(validMethods)
.map(method -> Endpoint.create(controller, method));
- Single> hashMapSingle = methodObservable
+ Single> hashMapSingle = Observable.fromIterable(validMethods)
.zipWith(endpointObservable, RMIClient::zipIntoMethodMap)
.collectInto(new HashMap<>(), RMIClient::collectMethodMap);
rmiClient.setMethodEndpointMap(hashMapSingle.blockingGet());
- // TODO: 18. 7. 31 consider give all the available controller interface to the call proxy
+ // 18. 7. 31 consider give all the available controller interface to the call proxy
// main concern is...
// what happen if there are two methods declared in different interfaces which is identical in parameter & return type, etc.
- // TODO: 18. 7. 31 method collision is not properly handled at the moment, simple poc is performed to test
+ // method collision is not properly handled at the moment, simple poc is performed to test
// https://gist.github.com/fritzprix/ca0ecc08fc3125cde529dd11185be0b9
- Object proxy = Proxy.newProxyInstance(ctrl.getClassLoader(), new Class[]{ctrl }, rmiClient);
-
- Log.debug("service proxy is created");
- return (T) proxy;
+ return rmiClient;
} catch (Exception e) {
- Log.error("{}", e);
+ Log.error("", e);
+ return null;
+ }
+ }
+
+ public static T create(RMIServiceProxy serviceProxy, Class> svc, Class ctrl, long pingTimeout, long pingInterval, TimeUnit timeUnit) {
+ RMIClient rmiClient = createClient(serviceProxy, svc, ctrl, pingTimeout, pingInterval, timeUnit);
+ if(rmiClient == null) {
return null;
}
+ return (T) Proxy.newProxyInstance(ctrl.getClassLoader(), new Class[] {ctrl}, rmiClient);
+ }
+
+ /**
+ * create call proxy instance corresponding to given controller class
+ * @param serviceProxy active service proxy which is obtained from the service discovery
+ * @param svc Service definition class
+ * @param ctrl controller definition as interface
+ * @param type of controller class
+ * @return call proxy instance for controller
+ */
+ @Nullable
+ public static T create(RMIServiceProxy serviceProxy, Class> svc, Class ctrl) {
+ return create(serviceProxy, svc, ctrl, 0L, 0L, TimeUnit.MILLISECONDS);
}
@Override
protected void finalize() throws Throwable {
super.finalize();
- serviceProxy.close();
+ try {
+ close(true);
+ } catch (IOException ignore) {
+
+ } finally {
+ super.finalize();
+ }
}
/**
- *
- * @param into
- * @param methodEndpointMap
+ * collect maps between {@link Method} and @{@link Endpoint} into single map
+ * @param into map collecting fragmented (or partial) map of {@link Method} and {@link Endpoint}
+ * @param methodEndpointMap fragmented (or partial) map of {@link Method} and {@link Endpoint}
*/
- private static void collectMethodMap(Map into, Map methodEndpointMap) {
+ static void collectMethodMap(Map into, Map methodEndpointMap) {
into.putAll(methodEndpointMap);
}
@@ -161,28 +276,60 @@ private static void collectMethodMap(Map into, Map zipIntoMethodMap(Method method, Endpoint endpoint) {
+ static Map zipIntoMethodMap(Method method, Endpoint endpoint) {
Map methodEndpointMap = new HashMap<>();
methodEndpointMap.put(method, endpoint);
return methodEndpointMap;
}
- private void close() throws IOException {
+ /**
+ * close service proxy used by this call proxy
+ * @param force if false, wait until on-going request complete, otherwise, close immediately
+ * @throws IOException proxy is already closed,
+ */
+ void close(boolean force) throws IOException {
+ markToClose = true;
+ if(!force) {
+ try {
+ while (!isClosable()) {
+ // wait until proxy is closable
+ Thread.sleep(10L);
+ }
+ } catch (InterruptedException ignored) { }
+ }
+
serviceProxy.close();
}
-
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ if(markToClose) {
+ // prevent new request from being made
+ throw new RMIException(RMIError.CLOSED.getResponse());
+ }
Endpoint endpoint = methodMap.get(method);
if(endpoint == null) {
return null;
}
- try {
- return serviceProxy.request(endpoint, args);
- } catch (IOException e) {
- serviceProxy.close();
- return Response.error(-1, e.getLocalizedMessage());
+ ongoingRequestCount.getAndIncrement();
+ Response response = serviceProxy.request(endpoint, timeout, args);
+ ongoingRequestCount.decrementAndGet();
+ if(response.isSuccessful()) {
+ return response;
}
+ throw new RMIException(response);
+ }
+
+ @Override
+ public int compareTo(@NotNull RMIClient o) {
+ return Math.toIntExact(getMeasuredPing() - o.getMeasuredPing());
+ }
+
+ private long getMeasuredPing() {
+ return measuredPing;
+ }
+
+ String who() {
+ return serviceProxy.who();
}
}
diff --git a/src/main/java/com/doodream/rmovjs/method/RMIMethod.java b/src/main/java/com/doodream/rmovjs/method/RMIMethod.java
index 7bb0f33..19ecffd 100644
--- a/src/main/java/com/doodream/rmovjs/method/RMIMethod.java
+++ b/src/main/java/com/doodream/rmovjs/method/RMIMethod.java
@@ -57,6 +57,7 @@ public String extractPath(java.lang.reflect.Method method) {
// but still inheritance of annotation is not supported (as I know)
// TODO : fix if Java provides annotation inheritance feature
+
switch (this) {
case GET:
Get get = method.getAnnotation(Get.class);
diff --git a/src/main/java/com/doodream/rmovjs/model/ControllerInfo.java b/src/main/java/com/doodream/rmovjs/model/ControllerInfo.java
index 3344bc4..74cb265 100644
--- a/src/main/java/com/doodream/rmovjs/model/ControllerInfo.java
+++ b/src/main/java/com/doodream/rmovjs/model/ControllerInfo.java
@@ -16,8 +16,8 @@ public class ControllerInfo {
public static ControllerInfo build(RMIController controller) {
return ControllerInfo.builder()
- .stubCls(controller.getItfcCls())
.version(controller.getController().version())
+ .stubCls(controller.getStub())
.build();
}
}
diff --git a/src/main/java/com/doodream/rmovjs/model/Endpoint.java b/src/main/java/com/doodream/rmovjs/model/Endpoint.java
index 49ee263..fa6ecd8 100644
--- a/src/main/java/com/doodream/rmovjs/model/Endpoint.java
+++ b/src/main/java/com/doodream/rmovjs/model/Endpoint.java
@@ -11,9 +11,12 @@
import com.doodream.rmovjs.parameter.Param;
import com.doodream.rmovjs.util.Types;
import com.google.common.base.Preconditions;
-import com.google.gson.reflect.TypeToken;
import io.reactivex.Observable;
import io.reactivex.Single;
+import io.reactivex.functions.BiFunction;
+import io.reactivex.functions.Consumer;
+import io.reactivex.functions.Function;
+import io.reactivex.functions.Predicate;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -23,10 +26,10 @@
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Locale;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -57,33 +60,83 @@ public static Endpoint create(Controller controller, Method method) {
Annotation methodAnnotation = Observable
.fromArray(method.getAnnotations())
- .filter(Endpoint::verifyMethod)
+ .filter(new Predicate() {
+ @Override
+ public boolean test(Annotation annotation) throws Exception {
+ return Endpoint.verifyMethod(annotation);
+ }
+ })
.blockingFirst();
final String parentPath = controller.path();
RMIMethod rmiMethod = RMIMethod.fromAnnotation(methodAnnotation);
- Observable typeObservable = Observable.fromArray(method.getParameterTypes());
+ Observable typeObservable = Observable.fromArray(method.getGenericParameterTypes());
Observable annotationsObservable = Observable.fromArray(method.getParameterAnnotations());
final String paramUnique = typeObservable
.defaultIfEmpty(Void.class)
- .map(Type::getTypeName)
- .map("_"::concat)
- .reduce(String::concat)
+ .map(new Function() {
+ @Override
+ public String apply(Type type) throws Exception {
+ return type.getTypeName();
+ }
+ })
+ .map(new Function() {
+ @Override
+ public String apply(String s) throws Exception {
+ return "_".concat(s);
+ }
+ })
+ .reduce(new BiFunction() {
+ @Override
+ public String apply(String s, String s2) throws Exception {
+ return s.concat(s2);
+ }
+ })
.blockingGet();
Single respBlobObservable = Observable.fromArray(method.getGenericReturnType().getTypeName())
- .map(TYPE_PATTERN::matcher)
- .filter(Matcher::matches)
- .map(matcher -> matcher.group(1))
- .filter(s -> s.contains(BlobSession.class.getName()))
+ .map(new Function() {
+ @Override
+ public Matcher apply(String s) throws Exception {
+ return TYPE_PATTERN.matcher(s);
+ }
+ })
+ .filter(new Predicate() {
+ @Override
+ public boolean test(Matcher matcher) throws Exception {
+ return matcher.matches();
+ }
+ })
+ .map(new Function() {
+ @Override
+ public String apply(Matcher matcher) throws Exception {
+ return matcher.group(1);
+ }
+ })
+ .filter(new Predicate() {
+ @Override
+ public boolean test(String s) throws Exception {
+ return s.contains(BlobSession.class.getName());
+ }
+ })
.count();
final Long blobCount = typeObservable
- .filter(aClass -> aClass.equals(BlobSession.class))
- .count().zipWith(respBlobObservable, Math::addExact).blockingGet();
+ .filter(new Predicate() {
+ @Override
+ public boolean test(Type type) throws Exception {
+ return type.equals(BlobSession.class);
+ }
+ })
+ .count().zipWith(respBlobObservable, new BiFunction() {
+ @Override
+ public Long apply(Long aLong, Long aLong2) throws Exception {
+ return Math.addExact(aLong, aLong2);
+ }
+ }).blockingGet();
if(blobCount > 1) {
throw new IllegalArgumentException(String.format("too many BlobSession in method @ %s", method.getName()));
@@ -92,11 +145,21 @@ public static Endpoint create(Controller controller, Method method) {
final String path = String.format(Locale.ENGLISH, "%s%s", parentPath, rmiMethod.extractPath(method)).replaceAll(DUPLICATE_PATH_SEPARATOR, "/");
- final int[] order = {0};
+ final AtomicInteger order = new AtomicInteger(0);
List params = typeObservable
- .zipWith(annotationsObservable, Param::create)
- .doOnNext(param -> param.setOrder(order[0]++))
+ .zipWith(annotationsObservable, new BiFunction() {
+ @Override
+ public Param apply(Type type, Annotation[] annotations) throws Exception {
+ return Param.create(type, annotations);
+ }
+ })
+ .doOnNext(new Consumer() {
+ @Override
+ public void accept(Param param) throws Exception {
+ param.setOrder(order.getAndIncrement());
+ }
+ })
.toList().blockingGet();
final String methodLookupKey = String.format("%x%x%x", rmiMethod.name().hashCode(), controller.path().hashCode(), paramUnique.hashCode()).toUpperCase();
diff --git a/src/main/java/com/doodream/rmovjs/model/RMIError.java b/src/main/java/com/doodream/rmovjs/model/RMIError.java
index d88324a..ce6d63c 100644
--- a/src/main/java/com/doodream/rmovjs/model/RMIError.java
+++ b/src/main/java/com/doodream/rmovjs/model/RMIError.java
@@ -9,16 +9,28 @@ public enum RMIError {
FORBIDDEN(403, Response.error(403, "ServiceInfo is not matched")),
BAD_REQUEST(400, Response.error(400, "Bad Request")),
UNHANDLED(400, Response.error(400, "Request Not Handled")),
- INTERNAL_SERVER_ERROR(500, Response.error(500,"Internal Server Error"));
+ INTERNAL_SERVER_ERROR(500, Response.error(500,"Internal Server Error")),
+ TIMEOUT(500, Response.error(500,"Timeout")),
+ BAD_RESPONSE(501, Response.error(501,"Bad Response")),
+ CLOSED(510, Response.error(510, "RMI channel close"));
private final int code;
- private final Response response;
- RMIError(int code, Response response) {
+ private final Response response;
+ RMIError(int code, Response response) {
this.code = code;
this.response = response;
}
+ public static boolean isServiceBad(int code) {
+ return code >= 500;
+ }
+
public Response getResponse() {
return response;
}
+
+ public int code() {
+ return code;
+ }
+
}
diff --git a/src/main/java/com/doodream/rmovjs/model/RMIServiceInfo.java b/src/main/java/com/doodream/rmovjs/model/RMIServiceInfo.java
index 4bf9354..d6e2263 100644
--- a/src/main/java/com/doodream/rmovjs/model/RMIServiceInfo.java
+++ b/src/main/java/com/doodream/rmovjs/model/RMIServiceInfo.java
@@ -2,17 +2,25 @@
import com.doodream.rmovjs.Properties;
import com.doodream.rmovjs.annotation.server.Service;
+import com.doodream.rmovjs.net.RMIServiceProxy;
+import com.doodream.rmovjs.net.ServiceAdapter;
+import com.doodream.rmovjs.net.ServiceProxyFactory;
import com.doodream.rmovjs.server.RMIController;
import com.google.gson.annotations.SerializedName;
import io.reactivex.Observable;
-import lombok.Builder;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
+import io.reactivex.Single;
+import io.reactivex.functions.Consumer;
+import io.reactivex.functions.Function;
+import io.reactivex.functions.Predicate;
+import lombok.*;
+import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
@Builder
+@NoArgsConstructor
+@AllArgsConstructor
@EqualsAndHashCode(exclude = {"proxyFactoryHint"})
@Data
public class RMIServiceInfo {
@@ -39,7 +47,7 @@ public class RMIServiceInfo {
private List controllerInfos;
/**
- * remoteHint is used to guess conntion information (like address or bluetooth device name etc.,)
+ * remoteHint is used to guess connection information (like address or bluetooth device name etc.,)
*
*/
@SerializedName("hint")
@@ -48,7 +56,7 @@ public class RMIServiceInfo {
public static RMIServiceInfo from(Class> svc) {
Service service = svc.getAnnotation(Service.class);
- RMIServiceInfoBuilder builder = RMIServiceInfo.builder();
+ final RMIServiceInfoBuilder builder = RMIServiceInfo.builder();
builder.version(Properties.getVersionString())
.adapter(service.adapter())
@@ -58,21 +66,83 @@ public static RMIServiceInfo from(Class> svc) {
.name(service.name());
Observable.fromArray(svc.getDeclaredFields())
- .filter(RMIController::isValidController)
- .map(RMIController::create)
- .map(ControllerInfo::build)
+ .filter(new Predicate() {
+ @Override
+ public boolean test(Field field) throws Exception {
+ return RMIController.isValidController(field);
+ }
+ })
+ .map(new Function() {
+ @Override
+ public RMIController apply(Field field) throws Exception {
+ return RMIController.create(field);
+ }
+ })
+ .map(new Function() {
+ @Override
+ public ControllerInfo apply(RMIController rmiController) throws Exception {
+ return ControllerInfo.build(rmiController);
+ }
+ })
.toList()
- .doOnSuccess(builder::controllerInfos)
+ .doOnSuccess(new Consumer>() {
+ @Override
+ public void accept(List controllerInfos) throws Exception {
+ builder.controllerInfos(controllerInfos);
+ }
+ })
.subscribe();
return builder.build();
}
- public static boolean isComplete(RMIServiceInfo info) {
+ public static boolean isValid(RMIServiceInfo info) {
return (info.getProxyFactoryHint() != null) &&
(info.getControllerInfos() != null);
}
+ public static RMIServiceProxy toServiceProxy(RMIServiceInfo info) {
+ return Single.just(info)
+ .map(new Function>() {
+ @Override
+ public Class> apply(RMIServiceInfo rmiServiceInfo) throws Exception {
+ return rmiServiceInfo.getAdapter();
+ }
+ })
+ .map(new Function, Object>() {
+ @Override
+ public Object apply(Class> cls) throws Exception {
+ return cls.newInstance();
+ }
+ })
+ .cast(ServiceAdapter.class)
+ .map(new Function() {
+ @Override
+ public ServiceProxyFactory apply(ServiceAdapter serviceAdapter) throws Exception {
+ return serviceAdapter.getProxyFactory(info);
+ }
+ })
+ .map(new Function() {
+ @Override
+ public RMIServiceProxy apply(ServiceProxyFactory serviceProxyFactory) throws Exception {
+ return serviceProxyFactory.build();
+ }
+ })
+ .onErrorReturn(new Function() {
+ @Override
+ public RMIServiceProxy apply(Throwable throwable) throws Exception {
+ return RMIServiceProxy.NULL_PROXY;
+ }
+ })
+ .filter(new Predicate() {
+ @Override
+ public boolean test(RMIServiceProxy proxy) throws Exception {
+ return !RMIServiceProxy.NULL_PROXY.equals(proxy);
+ }
+ })
+ .blockingGet(RMIServiceProxy.NULL_PROXY);
+ }
+
public void copyFrom(RMIServiceInfo info) {
setProxyFactoryHint(info.getProxyFactoryHint());
setParams(info.getParams());
diff --git a/src/main/java/com/doodream/rmovjs/model/Request.java b/src/main/java/com/doodream/rmovjs/model/Request.java
index 1233860..03cac34 100644
--- a/src/main/java/com/doodream/rmovjs/model/Request.java
+++ b/src/main/java/com/doodream/rmovjs/model/Request.java
@@ -2,14 +2,13 @@
import com.doodream.rmovjs.net.ClientSocketAdapter;
-import com.doodream.rmovjs.net.session.BlobSession;
-import com.doodream.rmovjs.net.session.SessionControlMessage;
-import com.doodream.rmovjs.net.session.SessionControlMessageWriter;
+import com.doodream.rmovjs.net.session.*;
import com.doodream.rmovjs.parameter.Param;
-import com.doodream.rmovjs.serde.Converter;
import com.doodream.rmovjs.serde.Writer;
import com.google.gson.annotations.SerializedName;
import io.reactivex.Observable;
+import io.reactivex.functions.BiConsumer;
+import io.reactivex.functions.BiFunction;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -17,10 +16,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
/**
* Request contains information for client method invocation consisted with below
@@ -61,11 +60,14 @@ public static boolean isValid(Request request) {
(request.getParams() != null);
}
- public static SessionControlMessageWriter buildSessionMessageWriter(Writer writer) {
- return (controlMessage) -> {
- writer.write(Request.builder()
- .scm(controlMessage)
- .build());
+ public static SessionControlMessageWriter buildSessionMessageWriter(final Writer writer) {
+ return new SessionControlMessageWriter() {
+ @Override
+ public void write(SessionControlMessage controlMessage) throws IOException {
+ writer.write(Request.builder()
+ .scm(controlMessage)
+ .build());
+ }
};
}
@@ -80,29 +82,40 @@ public static Request fromEndpoint(Endpoint endpoint, Object ...args) {
.endpoint(endpoint.getUnique())
.build();
} else {
- Optional optionalSession = BlobSession.findOne(args);
- Request.RequestBuilder builder = Request.builder()
+ BlobSession session = BlobSession.findOne(args);
+ final Request.RequestBuilder builder = Request.builder()
.params(convertParams(endpoint, args))
.endpoint(endpoint.getUnique());
- optionalSession.ifPresent(builder::session);
+
+ if(!session.equals(BlobSession.NULL)) {
+ builder.session(session);
+ }
+
return builder.build();
}
}
- private static List convertParams(Endpoint endpoint, Object[] objects) {
+ private static List convertParams(final Endpoint endpoint, Object[] objects) {
if(objects == null) {
return Collections.EMPTY_LIST;
}
- return Observable.fromIterable(endpoint.getParams()).zipWith(Observable.fromArray(objects), (param, o) -> {
- param.apply(o);
- if(param.isInstanceOf(BlobSession.class)) {
- Log.debug("Endpoint {} has session param : {}", endpoint, param);
- if(o != null) {
- endpoint.session = (BlobSession) o;
+ return Observable.fromIterable(endpoint.getParams()).zipWith(Observable.fromArray(objects), new BiFunction() {
+ @Override
+ public Param apply(Param param, Object o) throws Exception {
+ param.apply(o);
+ if(param.isInstanceOf(BlobSession.class)) {
+ if(o != null) {
+ endpoint.session = (BlobSession) o;
+ }
}
+ return param;
+ }
+ }).collectInto(new ArrayList(), new BiConsumer, Param>() {
+ @Override
+ public void accept(ArrayList params, Param param) throws Exception {
+ params.add(param);
}
- return param;
- }).collectInto(new ArrayList(), List::add).blockingGet();
+ }).blockingGet();
}
}
diff --git a/src/main/java/com/doodream/rmovjs/model/Response.java b/src/main/java/com/doodream/rmovjs/model/Response.java
index fd564ad..08cc04c 100644
--- a/src/main/java/com/doodream/rmovjs/model/Response.java
+++ b/src/main/java/com/doodream/rmovjs/model/Response.java
@@ -8,6 +8,7 @@
import com.doodream.rmovjs.serde.Converter;
import com.doodream.rmovjs.serde.Writer;
import com.doodream.rmovjs.serde.json.JsonConverter;
+import com.doodream.rmovjs.util.Types;
import com.google.common.base.Preconditions;
import com.google.gson.annotations.SerializedName;
import lombok.AllArgsConstructor;
@@ -17,7 +18,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
+import java.util.Collection;
/**
* Created by innocentevil on 18. 5. 4.
@@ -34,7 +38,6 @@ public class Response {
private T body;
private boolean isSuccessful;
private boolean hasSessionSwitch;
- private ResponseBody errorBody;
private int code;
private int nonce;
@SerializedName("scm")
@@ -44,19 +47,11 @@ public static Response success(T body) {
return Response.builder()
.body(body)
.code(SUCCESS)
+ .hasSessionSwitch(body instanceof BlobSession)
.isSuccessful(true)
.build();
}
- public static Response success(BlobSession session) {
- return Response.builder()
- .body(session)
- .code(SUCCESS)
- .isSuccessful(true)
- .hasSessionSwitch(true)
- .build();
- }
-
public T getBody() {
return body;
}
@@ -66,10 +61,10 @@ public void setBody(T body) {
}
public static Response error(int code, String msg) {
- return Response.builder()
- .code(code)
+ return Response.builder()
.isSuccessful(false)
- .errorBody(new ResponseBody(msg))
+ .code(code)
+ .body(msg)
.build();
}
@@ -102,19 +97,18 @@ public static Response from(RMIError error) {
}
public static void validate(Response res) {
- if(res.code == Response.SUCCESS) {
- Preconditions.checkNotNull(res.getBody(), "Successful response must have non-null body");
- } else {
- Preconditions.checkNotNull(res.getErrorBody(), "Error response must have non-null error body");
+ Preconditions.checkNotNull(res.getBody(), "Successful response must have non-null body");
+ if(!res.isSuccessful) {
+ Preconditions.checkArgument(res.getBody() instanceof String, "Error response must have non-null error body");
}
}
- public static SessionControlMessageWriter buildSessionMessageWriter(Writer writer) {
- return (controlMessage) -> {
- writer.write(Response.builder()
- .scm(controlMessage)
- .build());
-
+ public static SessionControlMessageWriter buildSessionMessageWriter(final Writer writer) {
+ return new SessionControlMessageWriter() {
+ @Override
+ public void write(SessionControlMessage controlMessage) throws IOException {
+ writer.write(Response.builder().scm(controlMessage).build());
+ }
};
}
@@ -134,7 +128,21 @@ public boolean hasScm() {
* @param converter converter implementation
* @param type {@link Type} for body content
*/
- public void resolve(Converter converter, Type type) {
- setBody(converter.resolve(getBody(), type));
+ public void resolve(Converter converter, Type type) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+ if(type instanceof ParameterizedType) {
+ Class rawCls = Class.forName(((ParameterizedType) type).getRawType().getTypeName());
+ if(Types.isCastable(body, rawCls)) {
+ // ex > Bson4Jackson parsed as collections like ArrayList
+ // however, if there is recursive type parameters like ArrayList>
+ return;
+ }
+ } else {
+ Class rawCls = Class.forName(type.getTypeName());
+ if(Types.isCastable(body, rawCls)) {
+ return;
+ }
+ }
+ setBody((T) converter.resolve(getBody(), type));
}
+
}
diff --git a/src/main/java/com/doodream/rmovjs/net/BaseServiceAdapter.java b/src/main/java/com/doodream/rmovjs/net/BaseServiceAdapter.java
index bb2e104..18ff32f 100644
--- a/src/main/java/com/doodream/rmovjs/net/BaseServiceAdapter.java
+++ b/src/main/java/com/doodream/rmovjs/net/BaseServiceAdapter.java
@@ -7,15 +7,23 @@
import com.doodream.rmovjs.serde.Converter;
import com.google.common.base.Preconditions;
import io.reactivex.Observable;
+import io.reactivex.ObservableEmitter;
+import io.reactivex.ObservableOnSubscribe;
+import io.reactivex.ObservableSource;
import io.reactivex.disposables.CompositeDisposable;
-import io.reactivex.functions.Function;
+import io.reactivex.functions.*;
+import io.reactivex.observables.GroupedObservable;
import io.reactivex.schedulers.Schedulers;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Optional;
+import java.net.InetAddress;
+import java.net.InterfaceAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
public abstract class BaseServiceAdapter implements ServiceAdapter {
@@ -24,58 +32,124 @@ public abstract class BaseServiceAdapter implements ServiceAdapter {
private volatile boolean listen = false;
@Override
- public String listen(RMIServiceInfo serviceInfo, Converter converter, @NonNull Function handleRequest) throws IllegalAccessException, InstantiationException, IOException {
+ public String listen(final RMIServiceInfo serviceInfo, final Converter converter, final InetAddress network, @NonNull final Function handleRequest) throws IllegalAccessException, InstantiationException, IOException {
if(listen) {
throw new IllegalStateException("service already listening");
}
final RMINegotiator negotiator = (RMINegotiator) serviceInfo.getNegotiator().newInstance();
Preconditions.checkNotNull(negotiator, "fail to resolve %s", serviceInfo.getNegotiator());
Preconditions.checkNotNull(converter, "fail to resolve %s", serviceInfo.getConverter());
- onStart();
+ Preconditions.checkNotNull(network, "no network interface given");
+
+ onStart(network);
listen = true;
compositeDisposable.add(Observable.just(converter)
- .map(c -> acceptClient())
- .doOnNext(socket -> Log.debug("{} connected", socket.getRemoteName()))
- .repeatUntil(() -> !listen)
- .map(client -> negotiator.handshake(client, serviceInfo, converter, false))
- .map(socket -> new ClientSocketAdapter(socket, converter))
+ .map(new Function() {
+ @Override
+ public RMISocket apply(Converter converter) throws Exception {
+ return acceptClient();
+ }
+ })
+ .doOnNext(new Consumer() {
+ @Override
+ public void accept(RMISocket socket) throws Exception {
+ Log.debug("{} connected", socket.getRemoteName());
+ }
+ })
+ .repeatUntil(new BooleanSupplier() {
+ @Override
+ public boolean getAsBoolean() throws Exception {
+ return !listen;
+ }
+ })
+ .map(new Function() {
+ @Override
+ public RMISocket apply(RMISocket rmiSocket) throws Exception {
+ return negotiator.handshake(rmiSocket, serviceInfo, converter, false);
+ }
+ })
+ .map(new Function() {
+ @Override
+ public ClientSocketAdapter apply(RMISocket rmiSocket) throws Exception {
+ return new ClientSocketAdapter(rmiSocket, converter);
+ }
+ })
.subscribeOn(Schedulers.newThread())
- .subscribe(adapter-> onHandshakeSuccess(adapter, handleRequest),this::onError));
+ .subscribe(new Consumer() {
+ @Override
+ public void accept(ClientSocketAdapter clientSocketAdapter) throws Exception {
+ onHandshakeSuccess(clientSocketAdapter, handleRequest);
+ }
+ }, new Consumer() {
+ @Override
+ public void accept(Throwable throwable) throws Exception {
+ onError(throwable);
+ }
+ }));
return getProxyConnectionHint(serviceInfo);
}
- private void onHandshakeSuccess(ClientSocketAdapter adapter, Function handleRequest) {
-
+ private void onHandshakeSuccess(final ClientSocketAdapter adapter, final Function handleRequest) {
compositeDisposable.add(adapter.listen()
- .groupBy(Request::isValid)
- .flatMap(booleanRequestGroupedObservable -> Observable.>create(emitter -> {
- if(booleanRequestGroupedObservable.getKey()) {
- emitter.setDisposable(booleanRequestGroupedObservable.subscribe(request -> emitter.onNext(Optional.of(request))));
- } else {
- // bad request handle added
- emitter.setDisposable(booleanRequestGroupedObservable.subscribe(request -> {
- adapter.write(Response.from(RMIError.BAD_REQUEST));
- emitter.onNext(Optional.empty());
- }));
+ .groupBy(new Function() {
+ @Override
+ public Boolean apply(Request request) throws Exception {
+ return Request.isValid(request);
}
- }))
- .filter(Optional::isPresent)
- .map(Optional::get)
- .doOnNext(request -> request.setClient(adapter))
- .doOnNext(request -> Log.trace("Request <= {}", request))
+ })
+ .flatMap(new Function, ObservableSource>() {
+ @Override
+ public ObservableSource apply(final GroupedObservable booleanRequestGroupedObservable) throws Exception {
+ return Observable.create(new ObservableOnSubscribe() {
+ @Override
+ public void subscribe(final ObservableEmitter emitter) throws Exception {
+ if(booleanRequestGroupedObservable.getKey()) {
+ emitter.setDisposable(booleanRequestGroupedObservable.subscribe(new Consumer() {
+ @Override
+ public void accept(Request request) throws Exception {
+ emitter.onNext(request);
+ }
+ }));
+ } else {
+ // bad request handle added
+ emitter.setDisposable(booleanRequestGroupedObservable.subscribe(new Consumer() {
+ @Override
+ public void accept(Request request) throws Exception {
+ adapter.write(Response.from(RMIError.BAD_REQUEST));
+ }
+ }));
+ }
+ }
+ });
+ }
+ })
.observeOn(Schedulers.io())
- .subscribe(request -> {
- final Response response = handleRequest.apply(request);
- Log.trace("Response => {}", response);
- adapter.write(response);
- },this::onError));
+ .subscribe(new Consumer() {
+ @Override
+ public void accept(Request request) throws Exception {
+ if(Log.isTraceEnabled()) {
+ Log.trace("Request <= {}", request);
+ }
+ request.setClient(adapter);
+ final Response response = handleRequest.apply(request);
+ if(Log.isTraceEnabled()) {
+ Log.trace("Response => {}", response);
+ }
+ adapter.write(response);
+ }
+ }, new Consumer() {
+ @Override
+ public void accept(Throwable throwable) throws Exception {
+ onError(throwable);
+ }
+ }));
}
private void onError(Throwable throwable) {
- Log.error("Error : {}", throwable);
+ Log.error("Error : ", throwable);
close();
}
@@ -86,7 +160,7 @@ public void close() {
try {
onClose();
} catch (IOException e) {
- Log.warn("{}", e);
+ Log.warn("", e);
}
}
compositeDisposable.dispose();
@@ -94,7 +168,7 @@ public void close() {
}
- protected abstract void onStart() throws IOException;
+ protected abstract void onStart(InetAddress bindAddress) throws IOException;
protected abstract boolean isClosed();
protected abstract String getProxyConnectionHint(RMIServiceInfo serviceInfo);
protected abstract RMISocket acceptClient() throws IOException;
diff --git a/src/main/java/com/doodream/rmovjs/net/BaseServiceProxy.java b/src/main/java/com/doodream/rmovjs/net/BaseServiceProxy.java
index 8027ec4..ce395f5 100644
--- a/src/main/java/com/doodream/rmovjs/net/BaseServiceProxy.java
+++ b/src/main/java/com/doodream/rmovjs/net/BaseServiceProxy.java
@@ -12,19 +12,24 @@
import com.doodream.rmovjs.server.BasicService;
import com.doodream.rmovjs.server.svc.HealthCheckController;
import io.reactivex.Observable;
+import io.reactivex.ObservableEmitter;
+import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Scheduler;
import io.reactivex.disposables.CompositeDisposable;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.functions.BiFunction;
+import io.reactivex.functions.Consumer;
+import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Method;
-import java.nio.charset.StandardCharsets;
import java.util.Base64;
-import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -51,12 +56,15 @@ public class BaseServiceProxy implements RMIServiceProxy {
Log.debug("healthCheck {}", HEALTH_CHECK_ENDPOINT);
}
}
- private AtomicInteger semaphore;
+ private AtomicInteger openSemaphore;
+ private AtomicInteger pingSemaphore;
+ private long measuredQos;
private volatile boolean isOpened;
private final ConcurrentHashMap sessionRegistry;
private ConcurrentHashMap requestWaitQueue;
private CompositeDisposable compositeDisposable;
- private int requestNonce;
+ private Disposable pingDisposable;
+ private AtomicInteger requestId;
private RMIServiceInfo serviceInfo;
private Converter converter;
private RMISocket socket;
@@ -64,7 +72,6 @@ public class BaseServiceProxy implements RMIServiceProxy {
private Writer writer;
private Scheduler mListener = Schedulers.from(Executors.newWorkStealingPool(10));
-
public static BaseServiceProxy create(RMIServiceInfo info, RMISocket socket) {
return new BaseServiceProxy(info, socket);
}
@@ -73,10 +80,13 @@ private BaseServiceProxy(RMIServiceInfo info, RMISocket socket) {
sessionRegistry = new ConcurrentHashMap<>();
requestWaitQueue = new ConcurrentHashMap<>();
compositeDisposable = new CompositeDisposable();
+ pingDisposable = null;
+ // set Qos as bad as possible
+ measuredQos = Long.MAX_VALUE;
- semaphore = new AtomicInteger();
- semaphore.getAndSet(0);
- requestNonce = 0;
+ openSemaphore = new AtomicInteger(0);
+ pingSemaphore = new AtomicInteger(0);
+ requestId = new AtomicInteger(0);
serviceInfo = info;
isOpened = false;
this.socket = socket;
@@ -84,43 +94,63 @@ private BaseServiceProxy(RMIServiceInfo info, RMISocket socket) {
@Override
public synchronized void open() throws IOException, IllegalAccessException, InstantiationException {
- if(semaphore.getAndAdd(1) != 0) {
+ if(!markAsUse(openSemaphore)) {
+ Log.debug("already opened");
return;
}
- Log.debug("Initialized");
+
RMINegotiator negotiator = (RMINegotiator) serviceInfo.getNegotiator().newInstance();
converter = (Converter) serviceInfo.getConverter().newInstance();
+
socket.open();
reader = converter.reader(socket.getInputStream());
writer = converter.writer(socket.getOutputStream());
socket = negotiator.handshake(socket, serviceInfo, converter, true);
+
Log.trace("open proxy for {} : success", serviceInfo.getName());
isOpened = true;
- compositeDisposable.add(Observable.create(emitter -> {
- while (isOpened) {
- Response response = reader.read(Response.class);
- if(response == null) {
+ compositeDisposable.add(Observable.create(new ObservableOnSubscribe() {
+ @Override
+ public void subscribe(ObservableEmitter emitter) throws Exception {
+ try {
+ while (isOpened) {
+ Response response = reader.read(Response.class);
+ if (response == null) {
+ return;
+ }
+ if (response.hasScm()) {
+ handleSessionControlMessage(response);
+ continue;
+ }
+ emitter.onNext(response);
+ }
+ } catch (IOException ignore) {
+ isOpened = false;
+ } finally {
+ emitter.onComplete();
+ }
+ }
+ }).subscribeOn(mListener).subscribe(new Consumer() {
+ @Override
+ public void accept(Response response) throws Exception {
+ Request request = requestWaitQueue.remove(response.getNonce());
+ if (request == null) {
+ Log.warn("no mapped request exists : {}", response);
return;
}
- if(response.hasScm()) {
- handleSessionControlMessage(response);
- continue;
+ synchronized (request) {
+ Log.debug("request({}) is response({})", request, response);
+ request.setResponse(response);
+ request.notifyAll(); // wakeup waiting thread
}
- emitter.onNext(response);
}
- emitter.onComplete();
- }).subscribeOn(mListener).subscribe(response -> {
- Request request = requestWaitQueue.remove(response.getNonce());
- if(request == null) {
- Log.warn("no mapped request exists : {}", response);
- return;
+ }, new Consumer() {
+ @Override
+ public void accept(Throwable throwable) throws Exception {
+ onError(throwable);
}
- synchronized (request) {
- request.setResponse(response);
- request.notifyAll(); // wakeup waiting thread
- }
- }, this::onError));
+ }));
}
@Override
@@ -129,66 +159,107 @@ public boolean isOpen() {
}
@Override
- public Response request(Endpoint endpoint, Object ...args) {
+ public Response request(Endpoint endpoint, long timeoutInMillisec, Object ...args) {
return Observable.just(Request.fromEndpoint(endpoint, args))
- .doOnNext(request -> request.setNonce(++requestNonce))
- .doOnNext(request -> {
- final BlobSession session = request.getSession();
- if(session != null) {
- registerSession(session);
+ .doOnNext(new Consumer() {
+ @Override
+ public void accept(Request request) throws Exception {
+ request.setNonce(requestId.incrementAndGet());
+ final BlobSession session = request.getSession();
+ if(session != null) {
+ registerSession(session);
+ }
+ if(Log.isTraceEnabled()) {
+ Log.trace("Request => {}", request);
+ }
}
})
- .doOnNext(request -> Log.trace("Request => {}", request))
- .map(request -> {
- requestWaitQueue.put(request.getNonce(), request);
- synchronized (request) {
- writer.write(request);
- // caller block here, until the response is ready
- request.wait();
+ .map(new Function() {
+ @Override
+ public Response apply(Request request) throws Exception {
+ requestWaitQueue.put(request.getNonce(), request);
+ try {
+ synchronized (request) {
+ writer.write(request);
+ if (timeoutInMillisec > 0) {
+ request.wait(timeoutInMillisec);
+ } else {
+ request.wait();
+ }
+ }
+ } catch (InterruptedException e) {
+ return RMIError.CLOSED.getResponse();
+ }
+ if (request.getResponse() == null) {
+ return RMIError.TIMEOUT.getResponse();
+ }
+ return request.getResponse();
+ }
+ })
+ .doOnNext(new Consumer() {
+ @Override
+ public void accept(Response response) throws Exception {
+ if (response.isSuccessful()) {
+ response.resolve(converter, endpoint.getUnwrappedRetType());
+ }
}
- return Optional.of(request.getResponse());
})
- .filter(Optional::isPresent)
- .map(Optional::get)
- .defaultIfEmpty(RMIError.UNHANDLED.getResponse())
- .doOnError(this::onError)
- .doOnNext(response -> response.resolve(converter, endpoint.getUnwrappedRetType()))
- .doOnNext(response -> {
- Log.trace("Response <= {}", response);
- if(response.isHasSessionSwitch() &&
- response.isSuccessful()) {
- final BlobSession session = (BlobSession) response.getBody();
- if(session != null) {
- session.init();
- if (sessionRegistry.put(session.getKey(), session) != null) {
- Log.warn("session conflict for {}", session.getKey());
- return;
- }
- session.start(reader, writer, Request::buildSessionMessageWriter, () -> unregisterSession(session));
+ .map(new Function() {
+ @Override
+ public Response apply(Response response) throws Exception {
+ if (response.isHasSessionSwitch()) {
+ return handleBlobResponse(response);
+ } else {
+ return response;
}
}
})
+ .defaultIfEmpty(RMIError.UNHANDLED.getResponse())
+ .doOnError(new Consumer() {
+ @Override
+ public void accept(Throwable throwable) throws Exception {
+ onError(throwable);
+ }
+ })
.blockingSingle();
}
- private void handleSessionControlMessage(Response response) throws IOException {
+ private Response handleBlobResponse(Response response) {
+ if(response.getBody() != null) {
+ final BlobSession session = (BlobSession) response.getBody();
+ session.init();
+ if(sessionRegistry.put(session.getKey(), session) != null) {
+ Log.warn("session conflict for {}", session.getKey());
+ } else {
+ session.start(reader, writer, converter, Request::buildSessionMessageWriter, () -> unregisterSession(session));
+ }
+ return response;
+ }
+ return RMIError.BAD_RESPONSE.getResponse();
+ }
+
+ private void handleSessionControlMessage(Response response) throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException {
SessionControlMessage scm = response.getScm();
BlobSession session;
session = sessionRegistry.get(scm.getKey());
if (session == null) {
- Log.warn("Session not available for {}", scm);
+ Log.warn("session not available for {} @ {}", scm.getCommand(), scm.getKey());
return;
}
session.handle(scm);
}
+ /**
+ * register session
+ * @param session
+ */
private void registerSession(BlobSession session) {
if (sessionRegistry.put(session.getKey(), session) != null) {
Log.warn("session : {} collision in registry", session.getKey());
}
Log.trace("session registered {}", session);
- session.start(reader, writer, Request::buildSessionMessageWriter, () -> unregisterSession(session));
+ session.start(reader, writer, converter, Request::buildSessionMessageWriter, () -> unregisterSession(session));
}
private void unregisterSession(BlobSession session ) {
@@ -200,43 +271,117 @@ private void unregisterSession(BlobSession session ) {
}
private void onError(Throwable throwable) {
- Log.error("{}", throwable);
+ Log.error("proxy closed {}({})", socket.getRemoteName() ,serviceInfo.getName(), throwable);
try {
- close();
+ actualClose();
} catch (IOException ignored) { }
}
public void close() throws IOException {
- if(semaphore.getAndDecrement() != 0) {
+ if(!markAsUnuse(openSemaphore)) {
return;
}
- if(!socket.isClosed()) {
- socket.close();
- }
+ actualClose();
+ }
+
+ /**
+ * close socket for this service proxy and stop listening to response from the remote service
+ * @throws IOException try to close socket when it is already closed by peer or has not been opened at all
+ */
+ private synchronized void actualClose() throws IOException {
if(!compositeDisposable.isDisposed()) {
compositeDisposable.dispose();
}
compositeDisposable.clear();
- // wake blocked thread from wait queue
- requestWaitQueue.values().forEach(request -> {
+ if(!pingDisposable.isDisposed()) {
+ pingDisposable.dispose();
+ }
+ if(!socket.isClosed()) {
+ socket.close();
+ }
+ for (Request request : requestWaitQueue.values()) {
synchronized (request) {
+ // put error response on the request
+ request.setResponse(RMIError.CLOSED.getResponse());
+ // wake blocked threads from wait queue
request.notifyAll();
}
- });
+ }
isOpened = false;
Log.debug("proxy for {} closed", serviceInfo.getName());
}
+
@Override
- public Optional ping() {
- final Response response = request(HEALTH_CHECK_ENDPOINT);
- if(response.isSuccessful() && (response.getCode() == Response.SUCCESS)) {
- Log.debug("body {} /w cls {}", response.getBody(), response.getBody().getClass());
- return Optional.of(response.getBody());
+ public void startPeriodicQosUpdate(long timeout, long interval, TimeUnit timeUnit) {
+ if(!markAsUse(pingSemaphore)) {
+ return;
}
- return Optional.empty();
+ pingDisposable = Observable.interval(interval, timeUnit)
+ .subscribeOn(Schedulers.io())
+ .subscribe(new Consumer() {
+ @Override
+ public void accept(Long aLong) throws Exception {
+ getQosUpdate(timeout);
+ }
+ });
+ }
+
+ @Override
+ public void stopPeriodicQosUpdate() {
+ if(!markAsUnuse(pingSemaphore)) {
+ return;
+ }
+ if(!pingDisposable.isDisposed()) {
+ pingDisposable.dispose();
+ }
+ }
+
+
+ /**
+ * check whether the resource has been used previously or not
+ * @param semaphore {@link AtomicInteger} to be used as semaphore for resource
+ * @return true, if the resource has been unused previously, otherwise false
+ */
+ private boolean markAsUse(AtomicInteger semaphore) {
+ return !(semaphore.getAndIncrement() > 0);
+ }
+
+ /**
+ * check whether the resource is still used or not
+ * @param semaphore {@link AtomicInteger} to be used as semaphore for resource
+ * @return true, resource becomes unused, otherwise false
+ */
+ private boolean markAsUnuse(AtomicInteger semaphore) {
+ return !(semaphore.updateAndGet(v -> {
+ if(v > 0) {
+ return --v;
+ }
+ return v;
+ }) > 0);
}
+ @Override
+ public Long getQosUpdate(long timeout) {
+ if(!isOpen()) {
+ return Long.MAX_VALUE;
+ }
+ long sTime = System.currentTimeMillis();
+ final Response response = request(HEALTH_CHECK_ENDPOINT, timeout);
+ if (response.isSuccessful() && (response.getCode() == Response.SUCCESS)) {
+ measuredQos = System.currentTimeMillis() - sTime;
+ } else {
+ measuredQos = Long.MAX_VALUE;
+ }
+ return measuredQos;
+ }
+
+ @Override
+ public Long getQosMeasured() {
+ return measuredQos;
+ }
+
+
@Override
public String who() {
return Base64.getEncoder().encodeToString(socket.getRemoteName().concat(serviceInfo.toString()).getBytes());
@@ -245,9 +390,24 @@ public String who() {
@Override
public boolean provide(Class controller) {
return Observable.fromIterable(serviceInfo.getControllerInfos())
- .map(ControllerInfo::getStubCls)
- .map(controller::equals)
- .reduce((isThere1, isThere2) -> isThere1 || isThere2)
+ .map(new Function>() {
+ @Override
+ public Class> apply(ControllerInfo controllerInfo) throws Exception {
+ return controllerInfo.getStubCls();
+ }
+ })
+ .map(new Function, Boolean>() {
+ @Override
+ public Boolean apply(Class> stubCls) throws Exception {
+ return controller.equals(stubCls);
+ }
+ })
+ .reduce(new BiFunction() {
+ @Override
+ public Boolean apply(Boolean match1, Boolean match2) throws Exception {
+ return match1 || match2;
+ }
+ })
.blockingGet(false);
}
}
diff --git a/src/main/java/com/doodream/rmovjs/net/ClientSocketAdapter.java b/src/main/java/com/doodream/rmovjs/net/ClientSocketAdapter.java
index ed313e6..ea94844 100644
--- a/src/main/java/com/doodream/rmovjs/net/ClientSocketAdapter.java
+++ b/src/main/java/com/doodream/rmovjs/net/ClientSocketAdapter.java
@@ -25,6 +25,7 @@ public class ClientSocketAdapter {
private RMISocket client;
private Reader reader;
private Writer writer;
+ private Converter converter;
private final ConcurrentHashMap sessionRegistry;
ClientSocketAdapter(RMISocket socket, Converter converter) throws IOException {
@@ -32,14 +33,15 @@ public class ClientSocketAdapter {
sessionRegistry = new ConcurrentHashMap<>();
reader = converter.reader(socket.getInputStream());
writer = converter.writer(socket.getOutputStream());
+ this.converter = converter;
}
- public void write(Response response) throws IOException {
+ public void write(Response response) throws Exception {
if(response.isHasSessionSwitch()) {
BlobSession session = (BlobSession) response.getBody();
sessionRegistry.put(session.getKey(), session);
- session.start(reader, writer, Response::buildSessionMessageWriter, () -> unregisterSession(session));
+ session.start(reader, writer, converter, Response::buildSessionMessageWriter, () -> unregisterSession(session));
}
writer.write(response);
}
@@ -68,14 +70,14 @@ Observable listen() {
return;
}
Log.debug("session registered {}", session);
- session.start(reader, writer, Response::buildSessionMessageWriter, () -> unregisterSession(session));
+ session.start(reader, writer, converter, Response::buildSessionMessageWriter, () -> unregisterSession(session));
// forward request to transfer session object to application
}
emitter.onNext(request);
}
emitter.onComplete();
} catch (IOException e) {
- emitter.onError(e);
+ client.close();
}
});
return requestObservable.subscribeOn(Schedulers.io());
@@ -89,7 +91,7 @@ private void unregisterSession(BlobSession session ) {
Log.trace("remove session : {}", session.getKey());
}
- private void handleSessionControlMessage(Request request) throws SessionControlException, IllegalStateException, IOException {
+ private void handleSessionControlMessage(Request request) throws SessionControlException, IllegalStateException, IOException, IllegalAccessException, InstantiationException, ClassNotFoundException {
final SessionControlMessage scm = request.getScm();
BlobSession session;
session = sessionRegistry.get(scm.getKey());
@@ -104,4 +106,8 @@ String who() {
return client.getRemoteName();
}
+ public void close() throws IOException {
+ client.close();
+ Log.debug("client closed");
+ }
}
diff --git a/src/main/java/com/doodream/rmovjs/net/RMIServiceProxy.java b/src/main/java/com/doodream/rmovjs/net/RMIServiceProxy.java
index ef2cfa5..2708be3 100644
--- a/src/main/java/com/doodream/rmovjs/net/RMIServiceProxy.java
+++ b/src/main/java/com/doodream/rmovjs/net/RMIServiceProxy.java
@@ -3,17 +3,15 @@
import com.doodream.rmovjs.model.Endpoint;
import com.doodream.rmovjs.model.RMIError;
import com.doodream.rmovjs.model.Response;
-import jdk.nashorn.internal.runtime.options.Option;
import java.io.IOException;
-import java.util.Optional;
-
-import static java.util.Optional.empty;
+import java.util.concurrent.TimeUnit;
public interface RMIServiceProxy {
RMIServiceProxy NULL_PROXY = new RMIServiceProxy() {
@Override
public void open() {
+ // NO OP
}
@Override
@@ -22,17 +20,33 @@ public boolean isOpen() {
}
@Override
- public Response request(Endpoint endpoint, Object ...args) {
+ public Response request(Endpoint endpoint, long timeoutInMilliSec, Object ...args) {
return Response.from(RMIError.NOT_FOUND);
}
@Override
public void close() {
+ // NO OP
+ }
+
+ @Override
+ public void startPeriodicQosUpdate(long timeout, long interval, TimeUnit timeUnit) {
+ // NO OP
+ }
+
+ @Override
+ public void stopPeriodicQosUpdate() {
+ // NO OP
+ }
+
+ @Override
+ public Long getQosUpdate(long timeout) {
+ return Long.MAX_VALUE;
}
@Override
- public Optional ping() {
- return Optional.empty();
+ public Long getQosMeasured() {
+ return Long.MAX_VALUE;
}
@Override
@@ -55,9 +69,12 @@ public boolean provide(Class controller) {
*/
void open() throws IOException, IllegalAccessException, InstantiationException;
boolean isOpen();
- Response request(Endpoint endpoint, Object ...args) throws IOException;
+ Response request(Endpoint endpoint, long timeoutMilliSec, Object ...args) throws IOException;
void close() throws IOException;
- Optional ping();
+ void startPeriodicQosUpdate(long timeout, long interval, TimeUnit timeUnit);
+ void stopPeriodicQosUpdate();
+ Long getQosUpdate(long timeout);
+ Long getQosMeasured();
String who();
boolean provide(Class controller);
}
diff --git a/src/main/java/com/doodream/rmovjs/net/ServiceAdapter.java b/src/main/java/com/doodream/rmovjs/net/ServiceAdapter.java
index 94f4612..abc76d2 100644
--- a/src/main/java/com/doodream/rmovjs/net/ServiceAdapter.java
+++ b/src/main/java/com/doodream/rmovjs/net/ServiceAdapter.java
@@ -8,6 +8,7 @@
import io.reactivex.functions.Function;
import java.io.IOException;
+import java.net.InetAddress;
/**
* {@link ServiceAdapter} provides abstraction layer for network dependency including listed below
@@ -29,13 +30,14 @@ public interface ServiceAdapter {
* client로 부터의 네트워크 연결을 대기하며 client로 부터의 {@link Request}를 처리하기 위한 handler를 등록한다.
* @param serviceInfo 서비스 정의 instance {@link RMIServiceInfo}
* @param converter {@link Request} 및 {@link Response} instance에 대한 deserialization / serialization module
+ * @param network network interface which the service adapter listen to
* @param requestHandler {@link Request}의 수신 및 {@link Response}의 응답을 처리하기 위한 handler로 {@link com.doodream.rmovjs.server.RMIService}
* @return proxyFactoryHint as string
* @throws IOException server 측 네트워크 endpoint 생성의 실패 혹은 I/O 오류
* @throws IllegalAccessError the error thrown when {@link ServiceAdapter} fails to resolve dependency object (e.g. negotiator,
* @throws InstantiationException if dependent class represents an abstract class,an interface, an array class, a primitive type, or void;or if the class has no nullary constructor;
*/
- String listen(RMIServiceInfo serviceInfo, Converter converter, Function requestHandler) throws IOException, IllegalAccessException, InstantiationException;
+ String listen(RMIServiceInfo serviceInfo, Converter converter, InetAddress network, Function requestHandler) throws IOException, IllegalAccessException, InstantiationException;
/**
* return {@link ServiceProxyFactory} which is capable of building {@link RMIServiceProxy} able to connect to current service adapter
diff --git a/src/main/java/com/doodream/rmovjs/net/SimpleNegotiator.java b/src/main/java/com/doodream/rmovjs/net/SimpleNegotiator.java
index 44b9357..a56a2e0 100644
--- a/src/main/java/com/doodream/rmovjs/net/SimpleNegotiator.java
+++ b/src/main/java/com/doodream/rmovjs/net/SimpleNegotiator.java
@@ -8,6 +8,9 @@
import com.doodream.rmovjs.serde.Writer;
import com.google.common.base.Preconditions;
import io.reactivex.Observable;
+import io.reactivex.functions.Consumer;
+import io.reactivex.functions.Function;
+import io.reactivex.functions.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -18,7 +21,7 @@ public class SimpleNegotiator implements RMINegotiator {
@Override
public RMISocket handshake(RMISocket socket, RMIServiceInfo service, Converter converter, boolean isClient) throws HandshakeFailException {
- Log.info("Handshake start @ {}", isClient? "CLIENT" : "SERVER");
+ Log.info("Handshake start as {} @ {}", isClient? "CLIENT" : "SERVER", socket.getRemoteName());
try {
Reader reader = converter.reader(socket.getInputStream());
Writer writer = converter.writer(socket.getOutputStream());
@@ -33,43 +36,88 @@ public RMISocket handshake(RMISocket socket, RMIServiceInfo service, Converter c
return socket;
}
- private void handshakeFromClient(RMIServiceInfo service, Reader reader, Writer writer) throws HandshakeFailException {
+ private void handshakeFromClient(final RMIServiceInfo service, Reader reader, Writer writer) throws HandshakeFailException {
try {
writer.write(service);
+ Log.debug("write {}", service);
Response response = reader.read(Response.class);
if ((response != null) &&
response.isSuccessful()) {
- Log.info("Handshake Success {} (Ver. {})", service.getName(), service.getVersion());
+ Log.debug("Handshake Success {} (Ver. {})", service.getName(), service.getVersion());
return;
}
Preconditions.checkNotNull(response, "Response is null");
- Log.error("Handshake Fail ({}) {}",response.getCode(), response.getErrorBody());
+ Log.error("Handshake Fail ({}) {}",response.getCode(), response.getBody());
} catch (IOException ignore) { }
throw new HandshakeFailException();
}
- private void handshakeFromServer(RMIServiceInfo service, Reader reader, Writer writer) throws HandshakeFailException {
+ private void handshakeFromServer(final RMIServiceInfo service, Reader reader, final Writer writer) throws HandshakeFailException {
try {
Observable handshakeRequestSingle = Observable.just(reader.read(RMIServiceInfo.class));
Observable serviceInfoMatchedObservable = handshakeRequestSingle
- .filter(info -> info.hashCode() == service.hashCode())
- .map(info -> Response.success("OK"));
+ .filter(new Predicate() {
+ @Override
+ public boolean test(RMIServiceInfo info) throws Exception {
+ return info.hashCode() == service.hashCode();
+ }
+ })
+ .map(new Function() {
+ @Override
+ public Response apply(RMIServiceInfo rmiServiceInfo) throws Exception {
+ return Response.success("OK");
+ }
+ });
Observable serviceInfoMismatchObservable = handshakeRequestSingle
- .filter(info -> info.hashCode() != service.hashCode())
- .map(info -> Response.from(RMIError.BAD_REQUEST));
+ .filter(new Predicate() {
+ @Override
+ public boolean test(RMIServiceInfo info) throws Exception {
+ return info.hashCode() != service.hashCode();
+ }
+ })
+ .map(new Function() {
+ @Override
+ public Response apply(RMIServiceInfo rmiServiceInfo) throws Exception {
+ Log.debug("{} != {}", rmiServiceInfo, service);
+ return Response.from(RMIError.BAD_REQUEST);
+ }
+ });
boolean success = serviceInfoMatchedObservable.mergeWith(serviceInfoMismatchObservable)
- .doOnNext(response -> Log.info("Handshake Response : ({}) {}", response.getCode(), response.getBody()))
- .doOnNext(writer::write)
- .map(Response::isSuccessful)
- .filter(Boolean::booleanValue)
+ .doOnNext(new Consumer() {
+ @Override
+ public void accept(Response response) throws Exception {
+ Log.trace("Handshake Response : ({}) {}", response.getCode(), response.getBody());
+ }
+ })
+ .doOnNext(new Consumer() {
+ @Override
+ public void accept(Response response) throws Exception {
+ Log.debug("write response {}", response);
+ writer.write(response);
+ }
+ })
+ .map(new Function() {
+ @Override
+ public Boolean apply(Response response) throws Exception {
+ return response.isSuccessful();
+ }
+ })
+ .filter(new Predicate() {
+ @Override
+ public boolean test(Boolean aBoolean) throws Exception {
+ return aBoolean;
+ }
+ })
.blockingSingle(false);
if (success) {
return;
}
- } catch (IOException ignore) { }
+ } catch (IOException e) {
+ Log.error("", e);
+ }
throw new HandshakeFailException();
}
diff --git a/src/main/java/com/doodream/rmovjs/net/session/BlobSession.java b/src/main/java/com/doodream/rmovjs/net/session/BlobSession.java
index 5269916..55f0dd9 100644
--- a/src/main/java/com/doodream/rmovjs/net/session/BlobSession.java
+++ b/src/main/java/com/doodream/rmovjs/net/session/BlobSession.java
@@ -1,17 +1,18 @@
package com.doodream.rmovjs.net.session;
+import com.doodream.rmovjs.serde.Converter;
import com.doodream.rmovjs.serde.Reader;
import com.doodream.rmovjs.serde.Writer;
import com.google.gson.annotations.SerializedName;
import io.reactivex.Observable;
+import io.reactivex.functions.Function;
+import io.reactivex.functions.Predicate;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Optional;
import java.util.Random;
-import java.util.function.Consumer;
@Data
public class BlobSession implements SessionHandler {
@@ -29,6 +30,7 @@ public class BlobSession implements SessionHandler {
// read error start with -2000
public static final int SIZE_NOT_MATCHED = -2001;
public static final int INVALID_EOS_CHAR = -2002;
+ public static final BlobSession NULL = new BlobSession(null);
private static int GLOBAL_KEY = 0;
private static String DEFAULT_TYPE = "application/octet-stream";
@@ -69,18 +71,31 @@ public BlobSession() {
* @param args arguments
* @return
*/
- public static Optional findOne(Object[] args) {
- return Observable.fromArray(args).filter(o -> o instanceof BlobSession).cast(BlobSession.class).map(Optional::ofNullable).blockingFirst(Optional.empty());
+ public static BlobSession findOne(Object[] args) {
+ return Observable.fromArray(args)
+ .filter(new Predicate