Skip to content

Commit

Permalink
Merge pull request #125 from pusher/feature/reconnect
Browse files Browse the repository at this point in the history
Add Reconnecting logic when you lose connectivity.
  • Loading branch information
zmarkan authored Nov 10, 2016
2 parents 537bd51 + f32956f commit abc7d5d
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
* Represents connection states e.g. connected and disconnected.
*/
public enum ConnectionState {
CONNECTING, CONNECTED, DISCONNECTING, DISCONNECTED, ALL
CONNECTING, CONNECTED, DISCONNECTING, DISCONNECTED, RECONNECTING, ALL
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import javax.net.ssl.SSLException;

import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -31,7 +30,9 @@ public class WebSocketConnection implements InternalConnection, WebSocketListene
private static final Gson GSON = new Gson();

private static final String INTERNAL_EVENT_PREFIX = "pusher:";
static final String PING_EVENT_SERIALIZED = "{\"event\": \"pusher:ping\"}";
private static final String PING_EVENT_SERIALIZED = "{\"event\": \"pusher:ping\"}";
private static final int MAX_RECONNECTION_ATTEMPTS = 6; //Taken from the Swift lib
private static final int MAX_RECONNECT_GAP_IN_SECONDS = 30;

private final Factory factory;
private final ActivityTimer activityTimer;
Expand All @@ -42,6 +43,8 @@ public class WebSocketConnection implements InternalConnection, WebSocketListene
private volatile ConnectionState state = ConnectionState.DISCONNECTED;
private WebSocketClientWrapper underlyingConnection;
private String socketId;
private int reconnectAttempts = 0;


public WebSocketConnection(
final String url,
Expand All @@ -68,20 +71,24 @@ public void connect() {
@Override
public void run() {
if (state == ConnectionState.DISCONNECTED) {
try {
underlyingConnection = factory
.newWebSocketClientWrapper(webSocketUri, proxy, WebSocketConnection.this);
updateState(ConnectionState.CONNECTING);
underlyingConnection.connect();
}
catch (final SSLException e) {
sendErrorToAllListeners("Error connecting over SSL", null, e);
}
tryConnecting();
}
}
});
}

private void tryConnecting(){
try {
underlyingConnection = factory
.newWebSocketClientWrapper(webSocketUri, proxy, WebSocketConnection.this);
updateState(ConnectionState.CONNECTING);
underlyingConnection.connect();
}
catch (final SSLException e) {
sendErrorToAllListeners("Error connecting over SSL", null, e);
}
}

@Override
public void disconnect() {
factory.queueOnEventThread(new Runnable() {
Expand Down Expand Up @@ -185,6 +192,7 @@ private void handleConnectionMessage(final String message) {
socketId = (String)dataMap.get("socket_id");

updateState(ConnectionState.CONNECTED);
reconnectAttempts = 0;
}

@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -251,12 +259,45 @@ public void run() {

@Override
public void onClose(final int code, final String reason, final boolean remote) {
if (state == ConnectionState.DISCONNECTED) {
log.error("Received close from underlying socket when already disconnected. " + "Close code ["
if (state == ConnectionState.DISCONNECTED || state == ConnectionState.RECONNECTING) {
log.error("Received close from underlying socket when already disconnected." + "Close code ["
+ code + "], Reason [" + reason + "], Remote [" + remote + "]");
return;
}

//Reconnection logic
if(state == ConnectionState.CONNECTED || state == ConnectionState.CONNECTING){

if(reconnectAttempts < MAX_RECONNECTION_ATTEMPTS){
tryReconnecting();
}
else{
updateState(ConnectionState.DISCONNECTING);
cancelTimeoutsAndTransitonToDisconnected();
}
return;
}

if (state == ConnectionState.DISCONNECTING){
cancelTimeoutsAndTransitonToDisconnected();
}
}

private void tryReconnecting() {
reconnectAttempts++;
updateState(ConnectionState.RECONNECTING);
long reconnectInterval = Math.min(MAX_RECONNECT_GAP_IN_SECONDS, reconnectAttempts * reconnectAttempts);

factory.getTimers().schedule(new Runnable() {
@Override
public void run() {
underlyingConnection.removeWebSocketListener();
tryConnecting();
}
}, reconnectInterval, TimeUnit.SECONDS);
}

private void cancelTimeoutsAndTransitonToDisconnected() {
activityTimer.cancelTimeouts();

factory.queueOnEventThread(new Runnable() {
Expand Down Expand Up @@ -290,7 +331,7 @@ private class ActivityTimer {
private Future<?> pingTimer;
private Future<?> pongTimer;

public ActivityTimer(final long activityTimeout, final long pongTimeout) {
ActivityTimer(final long activityTimeout, final long pongTimeout) {
this.activityTimeout = activityTimeout;
this.pongTimeout = pongTimeout;
}
Expand All @@ -299,7 +340,7 @@ public ActivityTimer(final long activityTimeout, final long pongTimeout) {
* On any activity from the server - Cancel pong timeout - Cancel
* currently ping timeout and re-schedule
*/
public synchronized void activity() {
synchronized void activity() {
if (pongTimer != null) {
pongTimer.cancel(true);
}
Expand All @@ -320,7 +361,7 @@ public void run() {
/**
* Cancel any pending timeouts, for example because we are disconnected.
*/
public synchronized void cancelTimeouts() {
synchronized void cancelTimeouts() {
if (pingTimer != null) {
pingTimer.cancel(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,22 @@ public void testReceiveUserMessagePassesMessageToChannelManager() {
}

@Test
public void testOnCloseCallbackUpdatesStateToDisconnected() {
public void testOnCloseCallbackUpdatesStateToDisconnectedWhenPreviousStateIsDisconnecting() {
connection.connect();
verify(mockEventListener).onConnectionStateChange(
new ConnectionStateChange(ConnectionState.DISCONNECTED, ConnectionState.CONNECTING));

connection.onMessage(CONN_ESTABLISHED_EVENT);
verify(mockEventListener).onConnectionStateChange(
new ConnectionStateChange(ConnectionState.CONNECTING, ConnectionState.CONNECTED));

connection.disconnect();
verify(mockEventListener).onConnectionStateChange(
new ConnectionStateChange(ConnectionState.CONNECTED, ConnectionState.DISCONNECTING));

connection.onClose(1, "reason", true);
verify(mockEventListener).onConnectionStateChange(
new ConnectionStateChange(ConnectionState.CONNECTING, ConnectionState.DISCONNECTED));
new ConnectionStateChange(ConnectionState.DISCONNECTING, ConnectionState.DISCONNECTED));
}

@Test
Expand Down Expand Up @@ -297,6 +305,46 @@ public void testPongTimeoutResultsInDisconnect() throws InterruptedException {
assertEquals(ConnectionState.DISCONNECTED, connection.getState());
}

@Test
public void stateIsReconnectingAfterOnCloseWithoutTheUserDisconnecting() throws InterruptedException, SSLException {
connection.connect();
connection.onMessage(CONN_ESTABLISHED_EVENT);

connection.onClose(500, "reason", true);

assertEquals(ConnectionState.RECONNECTING, connection.getState());
}

@Test
public void stateIsReconnectingAfterTryingToConnectForTheFirstTime() throws InterruptedException, SSLException {
connection.connect();

connection.onClose(500, "reason", true);

assertEquals(ConnectionState.RECONNECTING, connection.getState());
}

// TODO: leaving the following tests commented out just for reference. The lib needs to be rearchitected before we can hope to get any of these in
// @Test
// public void reconnectingLogicActuallyBeingCalled(){
// fail("not implemented");
// }
//
// @Test
// public void retryMaximumNumberOfTimes(){
// fail("not implemented");
// }
//
// @Test
// public void disconnectAfterTooManyRetries(){
// fail("not implemented");
// }
//
// @Test
// public void retryWithTimeout(){
// fail("not implemented");
// }

/* end of tests */

private void connect() {
Expand Down

0 comments on commit abc7d5d

Please sign in to comment.