Skip to content
This repository has been archived by the owner on Aug 2, 2024. It is now read-only.

Commit

Permalink
Fix channel pooling problems #6
Browse files Browse the repository at this point in the history
liuzhengyang committed Jan 17, 2017
1 parent e794ed8 commit 00985d0
Showing 6 changed files with 105 additions and 64 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ Running in some business online.
# RoadMap
* 服务心跳检测
* 连接池
* 断线重连 重写
* 服务注册发布功能
* 服务管理、监控
* 服务调用日志链路跟踪
4 changes: 1 addition & 3 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@
<groupId>com.github.liuzhengyang</groupId>
<artifactId>simple-rpc</artifactId>
<packaging>pom</packaging>
<version>1.1-SNAPSHOT</version>
<version>1.2-SNAPSHOT</version>
<modules>
<module>simple-core</module>
<module>simple-example</module>
@@ -98,8 +98,6 @@
<!--</distributionManagement>-->




<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
Original file line number Diff line number Diff line change
@@ -29,6 +29,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

@@ -52,9 +54,10 @@ public class RpcClientWithLB {
private int requestTimeoutMillis = 10 * 1000;

// 存放字符串Channel对应的map
public static ConcurrentMap<String, ChannelWrapper> channelMap = new ConcurrentHashMap<String, ChannelWrapper>();
public static CopyOnWriteArrayList<ChannelWrapper> channelWrappers = new CopyOnWriteArrayList<ChannelWrapper>();

private static class ChannelWrapper {
private String connStr;
private String host;
private int ip;
private Channel channel;
@@ -63,9 +66,18 @@ private static class ChannelWrapper {
public ChannelWrapper(String host, int port) {
this.host = host;
this.ip = port;
this.connStr = host + ":" + ip;
channelObjectPool = new GenericObjectPool<Channel>(new ConnectionObjectFactory(host, port));
}

public String getConnStr() {
return connStr;
}

public void setConnStr(String connStr) {
this.connStr = connStr;
}

public Channel getChannel() {
return channel;
}
@@ -101,6 +113,18 @@ public ObjectPool<Channel> getChannelObjectPool() {
public void setChannelObjectPool(ObjectPool<Channel> channelObjectPool) {
this.channelObjectPool = channelObjectPool;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("ChannelWrapper{");
sb.append("connStr='").append(connStr).append('\'');
sb.append(", host='").append(host).append('\'');
sb.append(", ip=").append(ip);
sb.append(", channel=").append(channel);
sb.append(", channelObjectPool=").append(channelObjectPool);
sb.append('}');
return sb.toString();
}
}

public RpcClientWithLB(String serviceName) {
@@ -118,7 +142,6 @@ public void setZkConn(String zkConn) {

public void init() {


// TODO 这段代码需要仔细检查重构整理
// 注册中心不可用时,保存本地缓存
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(getZkConn(), new ExponentialBackoffRetry(1000, 3));
@@ -135,25 +158,32 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th
LOGGER.info("Listen Event {}", event);
List<String> newServiceData = children.forPath(serviceZKPath);
LOGGER.info("Server {} list change {}", serviceName, newServiceData);

// 关闭删除本地缓存中多出的channel
for (Map.Entry<String, ChannelWrapper> entry : channelMap.entrySet()) {
String key = entry.getKey();
ChannelWrapper value = entry.getValue();
if (!newServiceData.contains(key)) {
value.close();
LOGGER.info("Remove channel {}", key);
channelMap.remove(key, value);
for (ChannelWrapper cw : channelWrappers) {
String connStr = cw.getConnStr();
if (!newServiceData.contains(connStr)) {
cw.close();
LOGGER.info("Remove channel {}", connStr);
channelWrappers.remove(cw);
}
}

// 增加本地缓存中不存在的连接
for (String connStr : newServiceData) {
if (!channelMap.containsKey(connStr)) {
LOGGER.info("Add new Channel {}", connStr);
boolean containThis = false;
for (ChannelWrapper cw : channelWrappers) {
if (connStr != null && connStr.equals(cw.getConnStr())) {
containThis = true;
}
}
if (!containThis) {
addNewChannel(connStr);
}
}
}
});

List<String> strings = children.forPath(serviceZKPath);
if (CollectionUtils.isEmpty(strings)) {
throw new RuntimeException("No Service available for " + serviceName);
@@ -173,22 +203,6 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th
}
}

// private Channel reconnect(Channel channel) {
// Channel result = null;
// while(result == null) {
// InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
// String hostAddress = socketAddress.getAddress().getHostAddress();
// int port = socketAddress.getPort();
// result = addNewChannel(hostAddress + ":" + port);
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// return result;
// }

private void addNewChannel(String connStr) {
try {
List<String> strings = Splitter.on(":").splitToList(connStr);
@@ -198,7 +212,8 @@ private void addNewChannel(String connStr) {
String host = strings.get(0);
int port = Integer.parseInt(strings.get(1));
ChannelWrapper channelWrapper = new ChannelWrapper(host, port);
channelMap.putIfAbsent(connStr, channelWrapper);
channelWrappers.add(channelWrapper);
LOGGER.info("Add New Channel {}, {}", connStr, channelWrapper);
} catch (Exception e) {
e.printStackTrace();
}
@@ -227,22 +242,19 @@ public Response sendMessage(Class<?> clazz, Method method, Object[] args) {
}
if (channel == null) {
Response response = new Response();
RuntimeException runtimeException = new RuntimeException("Channel is not active now");
RuntimeException runtimeException = new RuntimeException("Channel is not available now");
response.setThrowable(runtimeException);
return response;
}

// if (!channel.isActive()) {
// channel = reconnect(channel);
// }
channel.writeAndFlush(request);
BlockingQueue<Response> blockingQueue = new ArrayBlockingQueue<Response>(1);
responseMap.put(request.getRequestId(), blockingQueue);

try {
channel.writeAndFlush(request);
BlockingQueue<Response> blockingQueue = new ArrayBlockingQueue<Response>(1);
responseMap.put(request.getRequestId(), blockingQueue);
return blockingQueue.poll(requestTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
return null;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
try {
channelWrapper.getChannelObjectPool().returnObject(channel);
@@ -256,30 +268,32 @@ public Response sendMessage(Class<?> clazz, Method method, Object[] args) {

private ChannelWrapper selectChannel() {
Random random = new Random();
int size = channelMap.size();
int size = channelWrappers.size();
if (size < 1) {
return null;
}
int i = random.nextInt(size);
List<ChannelWrapper> channels = new ArrayList<ChannelWrapper>(channelMap.values());
return channels.get(i);
return channelWrappers.get(i);
}

public <T> T newProxy(final Class<T> serviceInterface) {
// Fix JDK proxy limitations and add other proxy implementation like cg-lib, spring proxy factory etc.
Object o = Proxy.newProxyInstance(RpcClientWithLB.class.getClassLoader(), new Class[]{serviceInterface}, new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
return sendMessage(serviceInterface, method, args).getResponse();
try {
return sendMessage(serviceInterface, method, args).getResponse();
} catch (Exception e) {
return null;
}
}
});
return (T) o;
}

public void destroy() {
try {
for (Map.Entry<String, ChannelWrapper> entry : channelMap.entrySet()) {
ChannelWrapper value = entry.getValue();
value.close();
for (ChannelWrapper cw : channelWrappers) {
cw.close();
}
} finally {
eventLoopGroup.shutdownGracefully();
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@ public class RpcServerWithLB {
private Object serviceImpl;
private String serviceName;
private String zkConn;
private String serviceRegisterPath;

private EventLoopGroup bossGroup = new NioEventLoopGroup();
private EventLoopGroup workerGroup = new NioEventLoopGroup();
@@ -102,8 +103,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
// Register service provider
private void registerService() {
String zkConn = getZkConn();
String localIp = InetUtil.getLocalIp();
String ipPortStr = localIp + ":" + port;
ip = InetUtil.getLocalIp();
String ipPortStr = ip + ":" + port;
curatorFramework = CuratorFrameworkFactory.newClient(zkConn, new ExponentialBackoffRetry(1000, 3));
curatorFramework.start();
String serviceBasePath = ZK_BASE_PATH + "/services/" + serviceName;
@@ -139,9 +140,18 @@ private void registerService() {
}
}
}
private void unRegister() {
try {
curatorFramework.delete().forPath(ZK_BASE_PATH + "/services/" + serviceName + "/" + ip + ":" + port);
} catch (Exception e) {
e.printStackTrace();
}
}

public void stop() {
unRegister();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();

}
}
Original file line number Diff line number Diff line change
@@ -15,13 +15,10 @@
import io.netty.handler.logging.LoggingHandler;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.github.liuzhengyang.simplerpc.core.RpcClientWithLB.channelMap;

/**
* Description:
*
@@ -41,15 +38,13 @@ public ConnectionObjectFactory(String ip, int port) {
this.port = port;
}

public Channel create() throws Exception {
private Channel connectNewChannel() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class)
.group(new NioEventLoopGroup(1))
.handler(new ChannelInitializer<Channel>() {
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO))
// .addLast(new IdleStateHandler(30, 30, 30))
// .addLast(new HeartBeatHandler())
.addLast(new ProtocolDecoder(10 * 1024 * 1024))
.addLast(new ProtocolEncoder())
.addLast(new RpcClientHandler())
@@ -65,14 +60,14 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
}
});
Channel channel = f.channel();
String connStr = ip + ":" + port;
// channelMap.put(connStr, channel);
final Channel channel = f.channel();
channel.closeFuture().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
Thread.sleep(1000);
LOGGER.info("Try to reconnect {} {}", ip, port);
// addNewChannel(serverIp + ":" + port);

LOGGER.info("Channel Close {} {}", ip, port);
// channel.connect(new InetSocketAddress(ip, port)).sync();
// ConnectionObjectFactory.this.destroyObject(ConnectionObjectFactory.this.wrap(channel));
// Thread.sleep(1000);
}
});
return channel;
@@ -82,6 +77,25 @@ public void operationComplete(ChannelFuture future) throws Exception {
return null;
}

public Channel create() throws Exception {
return connectNewChannel();
}

@Override
public boolean validateObject(PooledObject<Channel> p) {
Channel object = p.getObject();
return object.isActive();
}

@Override
public void destroyObject(PooledObject<Channel> p) throws Exception {
p.getObject().close().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
LOGGER.info("Close Finish");
}
});
}

public PooledObject<Channel> wrap(Channel obj) {
return new DefaultPooledObject<Channel>(obj);
}
Original file line number Diff line number Diff line change
@@ -36,9 +36,13 @@ public void init() throws Exception {
rpcClientWithLB.setZkConn("127.0.0.1:2181");
rpcClientWithLB.init();
IHello iHello = rpcClientWithLB.newProxy(IHello.class);
for (int i = 0; i < 100; i++) {
Thread.sleep(10 * 5);
for (int i = 0; i < 20; i++) {
Thread.sleep(100 * 5);
iHello.say("hello world");
if (i % 3 == 0) {
destroy();
before();
}
}
}

0 comments on commit 00985d0

Please sign in to comment.