Skip to content

Commit

Permalink
Merge pull request #142 from pusher/bug/concurrentMoficiationExceptio…
Browse files Browse the repository at this point in the history
…n_when_iterating_channels

Synchronise access to channelNameToChannelMap
  • Loading branch information
zmarkan authored Mar 29, 2017
2 parents a51b9dc + 6485a0c commit b0a83eb
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 10 deletions.
15 changes: 6 additions & 9 deletions src/main/java/com/pusher/client/channel/impl/ChannelManager.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.pusher.client.channel.impl;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.google.gson.Gson;
import com.pusher.client.AuthorizationFailureException;
Expand All @@ -20,7 +20,8 @@
public class ChannelManager implements ConnectionEventListener {

private static final Gson GSON = new Gson();
private final Map<String, InternalChannel> channelNameToChannelMap = new HashMap<String, InternalChannel>();
private final Map<String, InternalChannel> channelNameToChannelMap = new ConcurrentHashMap<String, InternalChannel>();

private final Factory factory;
private InternalConnection connection;

Expand Down Expand Up @@ -70,8 +71,7 @@ public void setConnection(final InternalConnection connection) {
connection.bind(ConnectionState.CONNECTED, this);
}

public void subscribeTo(final InternalChannel channel, final ChannelEventListener listener,
final String... eventNames) {
public void subscribeTo(final InternalChannel channel, final ChannelEventListener listener, final String... eventNames) {

validateArgumentsAndBindEvents(channel, listener, eventNames);
channelNameToChannelMap.put(channel.getName(), channel);
Expand All @@ -88,7 +88,6 @@ public void unsubscribeFrom(final String channelName) {
if (channel == null) {
return;
}

if (connection.getState() == ConnectionState.CONNECTED) {
sendUnsubscribeMessage(channel);
}
Expand Down Expand Up @@ -116,8 +115,7 @@ public void onMessage(final String event, final String wholeMessage) {
public void onConnectionStateChange(final ConnectionStateChange change) {

if (change.getCurrentState() == ConnectionState.CONNECTED) {

for (final InternalChannel channel : channelNameToChannelMap.values()) {
for(final InternalChannel channel : channelNameToChannelMap.values()){
sendOrQueueSubscribeMessage(channel);
}
}
Expand Down Expand Up @@ -181,8 +179,7 @@ public void run() {
}
}

private void validateArgumentsAndBindEvents(final InternalChannel channel, final ChannelEventListener listener,
final String... eventNames) {
private void validateArgumentsAndBindEvents(final InternalChannel channel, final ChannelEventListener listener, final String... eventNames) {

if (channel == null) {
throw new IllegalArgumentException("Cannot subscribe to a null channel");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.pusher.client.connection.ConnectionStateChange;
import com.pusher.client.connection.impl.InternalConnection;
import com.pusher.client.util.Factory;
import java.util.concurrent.Executors;

@RunWith(MockitoJUnitRunner.class)
public class ChannelManagerTest {
Expand Down Expand Up @@ -325,7 +326,7 @@ public void testGetChannelFromString(){

@Test
public void testGetNonExistentChannelFromString(){
Channel channel = channelManager.getChannel("woot");
Channel channel = channelManager.getChannel("woot");
assertNull(channel);
}

Expand Down Expand Up @@ -369,4 +370,25 @@ public void testGetNonExistentPresenceChannel(){
PresenceChannel channel = channelManager.getPresenceChannel("presence-yolo");
assertNull(channel);
}

@Test
public void testConcurrentModificationExceptionDoesNotHappenWhenConnectionIsEstablished() {
for(int i = 0; i<1000; i++) {
channelManager.subscribeTo(new ChannelImpl("channel" + i, factory), null);
}

Runnable removeChannels = new Runnable() {
@Override
public void run() {
System.out.println("Start unsubscribe");
for(int i=900; i<1000; i++){
channelManager.unsubscribeFrom("channel"+i);
}
System.out.println("end unsubscribe");
}
};
Executors.newSingleThreadExecutor().submit(removeChannels);

channelManager.onConnectionStateChange(new ConnectionStateChange(ConnectionState.CONNECTING, ConnectionState.CONNECTED));
}
}

0 comments on commit b0a83eb

Please sign in to comment.