Skip to content

Commit

Permalink
Merge pull request #175 from pusher/configurable_reconnections
Browse files Browse the repository at this point in the history
Configurable reconnections
  • Loading branch information
zmarkan authored Feb 23, 2018
2 parents 8acdcaf + fe1a020 commit 9b031cf
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 22 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ The pusher-java-client is available in Maven Central.
<dependency>
<groupId>com.pusher</groupId>
<artifactId>pusher-java-client</artifactId>
<version>1.7.0</version>
<version>1.8.0</version>
</dependency>
</dependencies>
```
Expand All @@ -60,7 +60,7 @@ The pusher-java-client is available in Maven Central.

```groovy
dependencies {
compile 'com.pusher:pusher-java-client:1.7.0'
compile 'com.pusher:pusher-java-client:1.8.0'
}
```

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ apply plugin: 'org.ajoberstar.github-pages'
apply plugin: 'signing'

group = "com.pusher"
version = "1.7.1-SNAPSHOT"
version = "1.8.1-SNAPSHOT"
sourceCompatibility = "1.6"
targetCompatibility = "1.6"

Expand Down
5 changes: 3 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
distributionUrl=https\://services.gradle.org/distributions/gradle-4.4.1-bin.zip
#Wed Feb 21 14:42:32 GMT 2018
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStorePath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.4.1-all.zip
46 changes: 44 additions & 2 deletions src/main/java/com/pusher/client/PusherOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public class PusherOptions {
private static final long DEFAULT_ACTIVITY_TIMEOUT = 120000;
private static final long DEFAULT_PONG_TIMEOUT = 30000;

private static final int MAX_RECONNECTION_ATTEMPTS = 6; //Taken from the Swift lib
private static final int MAX_RECONNECT_GAP_IN_SECONDS = 30;

// Note that the primary cluster lives on a different domain
// (others are subdomains of pusher.com). This is not an oversight.
// Legacy reasons.
Expand All @@ -36,6 +39,8 @@ public class PusherOptions {
private long pongTimeout = DEFAULT_PONG_TIMEOUT;
private Authorizer authorizer;
private Proxy proxy = Proxy.NO_PROXY;
private int maxReconnectionAttempts = MAX_RECONNECTION_ATTEMPTS;
private int maxReconnectGapInSeconds = MAX_RECONNECT_GAP_IN_SECONDS;

/**
* Gets whether an encrypted (SSL) connection should be used when connecting
Expand Down Expand Up @@ -182,7 +187,30 @@ public PusherOptions setPongTimeout(final long pongTimeout) {
return this;
}

public long getPongTimeout() {
/**
* Number of reconnect attempts when websocket connection failed
* @param maxReconnectionAttempts
* number of max reconnection attempts, default = {@link #MAX_RECONNECTION_ATTEMPTS} 6
* @return this, for chaining
*/
public PusherOptions setMaxReconnectionAttempts(int maxReconnectionAttempts) {
this.maxReconnectionAttempts = maxReconnectionAttempts;
return this;
}

/**
* The delay in two reconnection extends exponentially (1, 2, 4, .. seconds) This property sets the maximum in between two
* reconnection attempts.
* @param maxReconnectGapInSeconds
* time in seconds of the maximum gab between two reconnection attempts, default = {@link #MAX_RECONNECT_GAP_IN_SECONDS} 30s
* @return this, for chaining
*/
public PusherOptions setMaxReconnectGapInSeconds(int maxReconnectGapInSeconds) {
this.maxReconnectGapInSeconds = maxReconnectGapInSeconds;
return this;
}

public long getPongTimeout() {
return pongTimeout;
}

Expand Down Expand Up @@ -221,7 +249,21 @@ public Proxy getProxy() {
return this.proxy;
}

private static String readVersionFromProperties() {
/**
* @return the maximum reconnection attempts
*/
public int getMaxReconnectionAttempts() {
return maxReconnectionAttempts;
}

/**
* @return the maximum reconnection gap in seconds
*/
public int getMaxReconnectGapInSeconds() {
return maxReconnectGapInSeconds;
}

private static String readVersionFromProperties() {
InputStream inStream = null;
try {
final Properties p = new Properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ public class WebSocketConnection implements InternalConnection, WebSocketListene

private static final String INTERNAL_EVENT_PREFIX = "pusher:";
private static final String PING_EVENT_SERIALIZED = "{\"event\": \"pusher:ping\"}";
private static final int MAX_RECONNECTION_ATTEMPTS = 6; //Taken from the Swift lib
private static final int MAX_RECONNECT_GAP_IN_SECONDS = 30;

private final Factory factory;
private final ActivityTimer activityTimer;
private final Map<ConnectionState, Set<ConnectionEventListener>> eventListeners = new ConcurrentHashMap<ConnectionState, Set<ConnectionEventListener>>();
private final URI webSocketUri;
private final Proxy proxy;
private final int maxReconnectionAttempts;
private final int maxReconnectionGap;

private volatile ConnectionState state = ConnectionState.DISCONNECTED;
private WebSocketClientWrapper underlyingConnection;
Expand All @@ -49,10 +49,14 @@ public WebSocketConnection(
final String url,
final long activityTimeout,
final long pongTimeout,
int maxReconnectionAttempts,
int maxReconnectionGap,
final Proxy proxy,
final Factory factory) throws URISyntaxException {
webSocketUri = new URI(url);
activityTimer = new ActivityTimer(activityTimeout, pongTimeout);
this.maxReconnectionAttempts = maxReconnectionAttempts;
this.maxReconnectionGap = maxReconnectionGap;
this.proxy = proxy;
this.factory = factory;

Expand Down Expand Up @@ -270,7 +274,7 @@ public void onClose(final int code, final String reason, final boolean remote) {
//Reconnection logic
if(state == ConnectionState.CONNECTED || state == ConnectionState.CONNECTING){

if(reconnectAttempts < MAX_RECONNECTION_ATTEMPTS){
if(reconnectAttempts < maxReconnectionAttempts){
tryReconnecting();
}
else{
Expand All @@ -288,7 +292,7 @@ public void onClose(final int code, final String reason, final boolean remote) {
private void tryReconnecting() {
reconnectAttempts++;
updateState(ConnectionState.RECONNECTING);
long reconnectInterval = Math.min(MAX_RECONNECT_GAP_IN_SECONDS, reconnectAttempts * reconnectAttempts);
long reconnectInterval = Math.min(maxReconnectionGap, reconnectAttempts * reconnectAttempts);

factory.getTimers().schedule(new Runnable() {
@Override
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/com/pusher/client/util/Factory.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,14 @@ public class Factory {
public synchronized InternalConnection getConnection(final String apiKey, final PusherOptions options) {
if (connection == null) {
try {
connection = new WebSocketConnection(options.buildUrl(apiKey), options.getActivityTimeout(),
options.getPongTimeout(), options.getProxy(), this);
connection = new WebSocketConnection(
options.buildUrl(apiKey),
options.getActivityTimeout(),
options.getPongTimeout(),
options.getMaxReconnectionAttempts(),
options.getMaxReconnectGapInSeconds(),
options.getProxy(),
this);
}
catch (final URISyntaxException e) {
throw new IllegalArgumentException("Failed to initialise connection", e);
Expand Down
16 changes: 12 additions & 4 deletions src/test/java/com/pusher/client/EndToEndTest.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package com.pusher.client;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.net.Proxy;
import java.net.URI;

import com.pusher.java_websocket.handshake.ServerHandshake;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -26,6 +31,7 @@
import com.pusher.client.connection.websocket.WebSocketListener;
import com.pusher.client.util.DoNothingExecutor;
import com.pusher.client.util.Factory;
import com.pusher.java_websocket.handshake.ServerHandshake;

@RunWith(MockitoJUnitRunner.class)
public class EndToEndTest {
Expand All @@ -38,6 +44,7 @@ public class EndToEndTest {
+ PRIVATE_CHANNEL_NAME + "\",\"auth\":\"" + AUTH_KEY + "\"}}";
private static final long ACTIVITY_TIMEOUT = 120000;
private static final long PONG_TIMEOUT = 120000;

private static final Proxy proxy = Proxy.NO_PROXY;

private @Mock Authorizer mockAuthorizer;
Expand All @@ -53,7 +60,8 @@ public class EndToEndTest {
public void setUp() throws Exception {
pusherOptions = new PusherOptions().setAuthorizer(mockAuthorizer).setEncrypted(false);

connection = new WebSocketConnection(pusherOptions.buildUrl(API_KEY), ACTIVITY_TIMEOUT, PONG_TIMEOUT, proxy, factory);
connection = new WebSocketConnection(pusherOptions.buildUrl(API_KEY), ACTIVITY_TIMEOUT, PONG_TIMEOUT, pusherOptions.getMaxReconnectionAttempts(),
pusherOptions.getMaxReconnectGapInSeconds(), proxy, factory);

doAnswer(new Answer() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class WebSocketConnectionTest {

private static final long ACTIVITY_TIMEOUT = 500;
private static final long PONG_TIMEOUT = 500;
private static final int MAX_RECONNECTIONS = 6;
private static final int MAX_GAP = 30;
private static final String URL = "ws://ws.example.com/";
private static final String EVENT_NAME = "my-event";
private static final String CONN_ESTABLISHED_EVENT = "{\"event\":\"pusher:connection_established\",\"data\":\"{\\\"socket_id\\\":\\\"21112.816204\\\"}\"}";
Expand Down Expand Up @@ -65,22 +67,24 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
}).when(factory).queueOnEventThread(any(Runnable.class));
when(factory.getTimers()).thenReturn(new DoNothingExecutor());

connection = new WebSocketConnection(URL, ACTIVITY_TIMEOUT, PONG_TIMEOUT, PROXY, factory);
connection = new WebSocketConnection(URL, ACTIVITY_TIMEOUT, PONG_TIMEOUT, MAX_RECONNECTIONS, MAX_GAP, PROXY, factory);
connection.bind(ConnectionState.ALL, mockEventListener);
}

@Test
public void testUnbindingWhenNotAlreadyBoundReturnsFalse() throws URISyntaxException {
final ConnectionEventListener listener = mock(ConnectionEventListener.class);
final WebSocketConnection connection = new WebSocketConnection(URL, ACTIVITY_TIMEOUT, PONG_TIMEOUT, PROXY, factory);
final WebSocketConnection connection = new WebSocketConnection(URL, ACTIVITY_TIMEOUT, PONG_TIMEOUT, MAX_RECONNECTIONS, MAX_GAP,
PROXY, factory);
final boolean unbound = connection.unbind(ConnectionState.ALL, listener);
assertEquals(false, unbound);
}

@Test
public void testUnbindingWhenBoundReturnsTrue() throws URISyntaxException {
final ConnectionEventListener listener = mock(ConnectionEventListener.class);
final WebSocketConnection connection = new WebSocketConnection(URL, ACTIVITY_TIMEOUT, PONG_TIMEOUT, PROXY, factory);
final WebSocketConnection connection = new WebSocketConnection(URL, ACTIVITY_TIMEOUT, PONG_TIMEOUT, MAX_RECONNECTIONS, MAX_GAP,
PROXY, factory);

connection.bind(ConnectionState.ALL, listener);

Expand Down Expand Up @@ -118,7 +122,8 @@ public void testConnectDoesNotCallConnectOnUnderlyingConnectionIfAlreadyInConnec

@Test
public void testListenerDoesNotReceiveConnectingEventIfItIsOnlyBoundToTheConnectedEvent() throws URISyntaxException {
connection = new WebSocketConnection(URL, ACTIVITY_TIMEOUT, PONG_TIMEOUT, PROXY, factory);
connection = new WebSocketConnection(URL, ACTIVITY_TIMEOUT, PONG_TIMEOUT, MAX_RECONNECTIONS, MAX_GAP,
PROXY, factory);
connection.bind(ConnectionState.CONNECTED, mockEventListener);
connection.connect();

Expand Down Expand Up @@ -219,7 +224,8 @@ public void testOnCloseCallbackUpdatesStateToDisconnectedWhenPreviousStateIsDisc

@Test
public void testOnCloseCallbackDoesNotCallListenerIfItIsNotBoundToDisconnectedEvent() throws URISyntaxException {
connection = new WebSocketConnection(URL, ACTIVITY_TIMEOUT, PONG_TIMEOUT, PROXY, factory);
connection = new WebSocketConnection(URL, ACTIVITY_TIMEOUT, PONG_TIMEOUT, MAX_RECONNECTIONS, MAX_GAP,
PROXY, factory);
connection.bind(ConnectionState.CONNECTED, mockEventListener);

connection.connect();
Expand Down

0 comments on commit 9b031cf

Please sign in to comment.