diff --git a/pom.xml b/pom.xml index ed8b04c..f4f801d 100644 --- a/pom.xml +++ b/pom.xml @@ -87,5 +87,12 @@ 1.1.1 + + + io.netty + netty-all + 4.1.66.Final + + \ No newline at end of file diff --git a/zyt-rpc-call/src/main/java/service/netty_bootstrap/NettyClientBootStrap.java b/zyt-rpc-call/src/main/java/service/netty_bootstrap/NettyClientBootStrap.java new file mode 100644 index 0000000..ebe4777 --- /dev/null +++ b/zyt-rpc-call/src/main/java/service/netty_bootstrap/NettyClientBootStrap.java @@ -0,0 +1,10 @@ +package service.netty_bootstrap; + + +import consumer.bootstrap.netty.NettyConsumerBootStrap20; + +public class NettyClientBootStrap { + public static void start(String address, int port) throws InterruptedException { + NettyConsumerBootStrap20.main(new String[]{address, String.valueOf(port)}); + } +} diff --git a/zyt-rpc-call/src/main/java/service/netty_bootstrap/NettyServerBootStrap.java b/zyt-rpc-call/src/main/java/service/netty_bootstrap/NettyServerBootStrap.java new file mode 100644 index 0000000..23f296e --- /dev/null +++ b/zyt-rpc-call/src/main/java/service/netty_bootstrap/NettyServerBootStrap.java @@ -0,0 +1,9 @@ +package service.netty_bootstrap; + +import provider.bootstrap.netty.NettyProviderBootStrap20; + +public class NettyServerBootStrap { + public static void start(String address,int port) throws InterruptedException { + NettyProviderBootStrap20.main(new String[]{address, String.valueOf(port)}); + } +} diff --git a/zyt-rpc-call/src/main/java/service/netty_call/NettyClientCall.java b/zyt-rpc-call/src/main/java/service/netty_call/NettyClientCall.java new file mode 100644 index 0000000..9684ce0 --- /dev/null +++ b/zyt-rpc-call/src/main/java/service/netty_call/NettyClientCall.java @@ -0,0 +1,10 @@ +package service.netty_call; + +import service.netty_bootstrap.NettyClientBootStrap; + +//客户端启动类 +public class NettyClientCall { + public static void main(String[] args) throws InterruptedException { + NettyClientBootStrap.start("127.0.0.1",6668); + } +} diff --git a/zyt-rpc-call/src/main/java/service/netty_call/NettyServerCall.java b/zyt-rpc-call/src/main/java/service/netty_call/NettyServerCall.java new file mode 100644 index 0000000..8029c29 --- /dev/null +++ b/zyt-rpc-call/src/main/java/service/netty_call/NettyServerCall.java @@ -0,0 +1,10 @@ +package service.netty_call; + +import service.netty_bootstrap.NettyServerBootStrap; + +//启动类 给定对应的端口 进行启动并监听 +public class NettyServerCall { + public static void main(String[] args) throws InterruptedException { + NettyServerBootStrap.start("127.0.0.1",6668); + } +} diff --git a/zyt-rpc-call/src/main/java/service/bootstrap/ClientBootStrap.java b/zyt-rpc-call/src/main/java/service/nio_bootstrap/NIOClientBootStrap.java similarity index 91% rename from zyt-rpc-call/src/main/java/service/bootstrap/ClientBootStrap.java rename to zyt-rpc-call/src/main/java/service/nio_bootstrap/NIOClientBootStrap.java index 23ac385..1ee3151 100644 --- a/zyt-rpc-call/src/main/java/service/bootstrap/ClientBootStrap.java +++ b/zyt-rpc-call/src/main/java/service/nio_bootstrap/NIOClientBootStrap.java @@ -1,4 +1,4 @@ -package service.bootstrap; +package service.nio_bootstrap; import annotation.RpcClientBootStrap; @@ -12,11 +12,11 @@ //之后启动直接在这边启动根据 在注解中配置对应的版本号 将相应的操作封装到之后的操作中即可 这样很方便 就是每次咱加一个启动器还得改下switch //比如说这里的version 1.2 就是v1.2版本的启动器 @RpcClientBootStrap(version = "1.5") -public class ClientBootStrap { +public class NIOClientBootStrap { public static Customer start() throws IOException, RpcException { //获取当前的注解上的版本然后去调用相应的远端方法 反射的方法 //当前客户端启动器class对象 - Class currentClientBootStrapClass = ClientBootStrap.class; + Class currentClientBootStrapClass = NIOClientBootStrap.class; RpcClientBootStrap annotation = currentClientBootStrapClass.getAnnotation(RpcClientBootStrap.class); String currentVersion = annotation.version(); //根据注解获得的版本进行判断是哪个版本 然后进行启动 diff --git a/zyt-rpc-call/src/main/java/service/bootstrap/ServerBootStrap.java b/zyt-rpc-call/src/main/java/service/nio_bootstrap/NIOServerBootStrap.java similarity index 90% rename from zyt-rpc-call/src/main/java/service/bootstrap/ServerBootStrap.java rename to zyt-rpc-call/src/main/java/service/nio_bootstrap/NIOServerBootStrap.java index c972db7..ce04efa 100644 --- a/zyt-rpc-call/src/main/java/service/bootstrap/ServerBootStrap.java +++ b/zyt-rpc-call/src/main/java/service/nio_bootstrap/NIOServerBootStrap.java @@ -1,11 +1,11 @@ -package service.bootstrap; +package service.nio_bootstrap; import annotation.RpcMethodCluster; import annotation.RpcServerBootStrap; import init.ZK; import org.apache.zookeeper.KeeperException; import provider.bootstrap.nio.*; -import service.call.ServerCall; +import service.nio_call.NIOServerCall; import java.io.IOException; @@ -13,7 +13,7 @@ //之后启动直接在这边启动根据 在注解中配置对应的版本号 将相应的操作封装到之后的操作中即可 //比如说这里的version 1.2 就是v1.2版本的启动器 @RpcServerBootStrap(version = "1.5") -public class ServerBootStrap { +public class NIOServerBootStrap { @@ -21,14 +21,14 @@ public static void start() throws IOException, InterruptedException, KeeperExcep //先对ZK进行初始化 ZK.init(); - Class serverBootStrapClass = ServerBootStrap.class; + Class serverBootStrapClass = NIOServerBootStrap.class; RpcServerBootStrap annotation = serverBootStrapClass.getAnnotation(RpcServerBootStrap.class); //当前服务端启动器 class对象 String currentServerBootStrapVersion = annotation.version(); //获取对应的方法和个数 然后进行启动 //1.获取对应方法 在获取对应的注解 注解中的属性 - RpcMethodCluster nowAnnotation = ServerCall.class.getAnnotation(RpcMethodCluster.class); + RpcMethodCluster nowAnnotation = NIOServerCall.class.getAnnotation(RpcMethodCluster.class); String[] methods = nowAnnotation.method(); int[] startNums = nowAnnotation.startNum(); //如果不存在那就返回 diff --git a/zyt-rpc-call/src/main/java/service/call/ClientCall.java b/zyt-rpc-call/src/main/java/service/nio_call/NIOClientCall.java similarity index 72% rename from zyt-rpc-call/src/main/java/service/call/ClientCall.java rename to zyt-rpc-call/src/main/java/service/nio_call/NIOClientCall.java index e4f8282..a2337f9 100644 --- a/zyt-rpc-call/src/main/java/service/call/ClientCall.java +++ b/zyt-rpc-call/src/main/java/service/nio_call/NIOClientCall.java @@ -1,16 +1,16 @@ -package service.call; +package service.nio_call; import exception.RpcException; import method.Customer; -import service.bootstrap.ClientBootStrap; +import service.nio_bootstrap.NIOClientBootStrap; import java.io.IOException; //通用启动类 将启动的逻辑藏在ClientBootStrap中 -public class ClientCall { +public class NIOClientCall { public static void main(String[] args) throws IOException, RpcException { - Customer customer = ClientBootStrap.start(); + Customer customer = NIOClientBootStrap.start(); //实现调用 System.out.println(customer.Hello("success")); System.out.println(customer.Bye("fail")); diff --git a/zyt-rpc-call/src/main/java/service/call/ServerCall.java b/zyt-rpc-call/src/main/java/service/nio_call/NIOServerCall.java similarity index 75% rename from zyt-rpc-call/src/main/java/service/call/ServerCall.java rename to zyt-rpc-call/src/main/java/service/nio_call/NIOServerCall.java index dc0745a..2f6856d 100644 --- a/zyt-rpc-call/src/main/java/service/call/ServerCall.java +++ b/zyt-rpc-call/src/main/java/service/nio_call/NIOServerCall.java @@ -1,18 +1,18 @@ -package service.call; +package service.nio_call; import annotation.RpcMethodCluster; import org.apache.zookeeper.KeeperException; -import service.bootstrap.ServerBootStrap; +import service.nio_bootstrap.NIOServerBootStrap; import java.io.IOException; //通用启动类 将启动的逻辑藏在ServerBootStrap中 //注解 看你想启动多少个服务和对应的方法 @RpcMethodCluster(method = {"Hello","Bye"},startNum = {2,3}) -public class ServerCall { +public class NIOServerCall { public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchMethodException { - ServerBootStrap.start(); + NIOServerBootStrap.start(); } } diff --git a/zyt-rpc-consumer/src/main/java/consumer/bootstrap/netty/NettyConsumerBootStrap.java b/zyt-rpc-consumer/src/main/java/consumer/bootstrap/netty/NettyConsumerBootStrap.java deleted file mode 100644 index 6b053a1..0000000 --- a/zyt-rpc-consumer/src/main/java/consumer/bootstrap/netty/NettyConsumerBootStrap.java +++ /dev/null @@ -1,8 +0,0 @@ -package consumer.bootstrap.netty; - -/* - 以netty为网络编程框架的消费者端启动类 - */ -public class NettyConsumerBootStrap { - -} diff --git a/zyt-rpc-consumer/src/main/java/consumer/bootstrap/netty/NettyConsumerBootStrap20.java b/zyt-rpc-consumer/src/main/java/consumer/bootstrap/netty/NettyConsumerBootStrap20.java new file mode 100644 index 0000000..a10db4c --- /dev/null +++ b/zyt-rpc-consumer/src/main/java/consumer/bootstrap/netty/NettyConsumerBootStrap20.java @@ -0,0 +1,13 @@ +package consumer.bootstrap.netty; + +import consumer.netty.NettyClient20; + +/* + 以netty为网络编程框架的消费者端启动类 + */ +//进行启动 提供类的方式即可 +public class NettyConsumerBootStrap20 { + public static void main(String[] args) throws InterruptedException { + NettyClient20.start(args[0], Integer.parseInt(args[1])); + } +} diff --git a/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient.java b/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient.java deleted file mode 100644 index 28eb004..0000000 --- a/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient.java +++ /dev/null @@ -1,4 +0,0 @@ -package consumer.netty; - -public class NettyClient { -} diff --git a/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient20.java b/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient20.java new file mode 100644 index 0000000..c4343da --- /dev/null +++ b/zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient20.java @@ -0,0 +1,51 @@ +package consumer.netty; + +import consumer.netty_client_handler.NettyClientHandler01; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; + + +//实际客户端启动类 +public class NettyClient20 { + public static void start(String hostName, int port) throws InterruptedException { + Bootstrap bootstrap = new Bootstrap(); + EventLoopGroup workGroup = new NioEventLoopGroup(); + + try { + bootstrap.group(workGroup) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + ChannelPipeline pipeline = socketChannel.pipeline(); + pipeline.addLast(new NettyClientHandler01()); + } + }); + + ChannelFuture channelFuture = bootstrap.connect(hostName, port).sync(); + + channelFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + if (channelFuture.isSuccess()) { + System.out.println("连接"+hostName+":"+port+"成功"); + } + else + { + System.out.println("连接"+hostName+":"+port+"失败"); + } + } + }); + + //监听关闭事件,本来是异步的,现在转换为同步事件 + channelFuture.channel().closeFuture().sync(); + } finally + { + //优雅的关闭 group + workGroup.shutdownGracefully(); + } + } +} diff --git a/zyt-rpc-consumer/src/main/java/consumer/netty_client_handler/NettyClientHandler01.java b/zyt-rpc-consumer/src/main/java/consumer/netty_client_handler/NettyClientHandler01.java new file mode 100644 index 0000000..148106b --- /dev/null +++ b/zyt-rpc-consumer/src/main/java/consumer/netty_client_handler/NettyClientHandler01.java @@ -0,0 +1,23 @@ +package consumer.netty_client_handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.CharsetUtil; + +public class NettyClientHandler01 extends ChannelInboundHandlerAdapter { + + //通道就绪就会发的信息 + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务端", CharsetUtil.UTF_8)); + } + + //这个是收到信息 + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ByteBuf buf = (ByteBuf)msg; + System.out.println("收到来自"+ctx.channel().remoteAddress()+"的消息"+buf.toString(CharsetUtil.UTF_8)); + } +} diff --git a/zyt-rpc-provider/src/main/java/provider/bootstrap/netty/NettyProviderBootStrap.java b/zyt-rpc-provider/src/main/java/provider/bootstrap/netty/NettyProviderBootStrap.java deleted file mode 100644 index 82b3432..0000000 --- a/zyt-rpc-provider/src/main/java/provider/bootstrap/netty/NettyProviderBootStrap.java +++ /dev/null @@ -1,10 +0,0 @@ -package provider.bootstrap.netty; - -/* - 以netty为网络编程框架的服务提供端启动类 - */ -public class NettyProviderBootStrap { - public static void main(String[] args) { - - } -} diff --git a/zyt-rpc-provider/src/main/java/provider/bootstrap/netty/NettyProviderBootStrap20.java b/zyt-rpc-provider/src/main/java/provider/bootstrap/netty/NettyProviderBootStrap20.java new file mode 100644 index 0000000..7ad3c99 --- /dev/null +++ b/zyt-rpc-provider/src/main/java/provider/bootstrap/netty/NettyProviderBootStrap20.java @@ -0,0 +1,13 @@ +package provider.bootstrap.netty; + +import provider.netty.NettyServer20; + +/* + 以netty为网络编程框架的服务提供端启动类 + */ +public class NettyProviderBootStrap20 { + public static void main(String[] args) throws InterruptedException { + //传入要绑定的ip和端口 + NettyServer20.start(args[0], Integer.parseInt(args[1])); + } +} diff --git a/zyt-rpc-provider/src/main/java/provider/netty/NettyServer.java b/zyt-rpc-provider/src/main/java/provider/netty/NettyServer.java deleted file mode 100644 index 55c7f90..0000000 --- a/zyt-rpc-provider/src/main/java/provider/netty/NettyServer.java +++ /dev/null @@ -1,4 +0,0 @@ -package provider.netty; - -public class NettyServer { -} diff --git a/zyt-rpc-provider/src/main/java/provider/netty/NettyServer20.java b/zyt-rpc-provider/src/main/java/provider/netty/NettyServer20.java new file mode 100644 index 0000000..77705a5 --- /dev/null +++ b/zyt-rpc-provider/src/main/java/provider/netty/NettyServer20.java @@ -0,0 +1,59 @@ +package provider.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import provider.netty_server_handler.NettyServerHandler01; + + +//简单实现 主要还是进行了一段回想 +public class NettyServer20 { + public static void start(String hostName,int port) throws InterruptedException { + //该方法完成NettyServer的初始化 好好想想看 该怎么完成这个 + ServerBootstrap serverBootstrap = new ServerBootstrap(); + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workGroup = new NioEventLoopGroup(); + try { + serverBootstrap.group(bossGroup,workGroup) + .channel(NioServerSocketChannel.class) //自身所实现的通道 + .option(ChannelOption.SO_BACKLOG,128) //设置线程队列得到的连接个数 + .childOption(ChannelOption.SO_KEEPALIVE,true) //设置保持活动连接状态 + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + //每个用户连接上都会进行初始化 + System.out.println("客户socketChannel hashcode="+socketChannel.hashCode()); + ChannelPipeline pipeline = socketChannel.pipeline();//每个通道都对应一个管道 将处理器往管道里放 + pipeline.addLast(new NettyServerHandler01()); + } + }); + + System.out.println("服务器 is ready"); + + //连接 同步 + ChannelFuture cf = serverBootstrap.bind(hostName, port).sync(); + + cf.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + System.out.println("监听端口"+port+"成功"); + } + else + { + System.out.println("监听端口"+port+"失败"); + } + } + }); + + //对关闭通道进行监听 + cf.channel().closeFuture().sync(); + } finally { + //优雅的关闭两个集群 + bossGroup.shutdownGracefully(); + workGroup.shutdownGracefully(); + } + } +} diff --git a/zyt-rpc-provider/src/main/java/provider/netty_server_handler/NettyServerHandler01.java b/zyt-rpc-provider/src/main/java/provider/netty_server_handler/NettyServerHandler01.java new file mode 100644 index 0000000..5922a85 --- /dev/null +++ b/zyt-rpc-provider/src/main/java/provider/netty_server_handler/NettyServerHandler01.java @@ -0,0 +1,35 @@ +package provider.netty_server_handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.CharsetUtil; + +//实现简单的服务注册和回写 +public class NettyServerHandler01 extends ChannelInboundHandlerAdapter { + + //读取数据 + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + //将信息进行读取 直接这样就可以了 + ByteBuf buf = (ByteBuf) msg; + System.out.println("客户端发送消息是:"+ buf.toString(CharsetUtil.UTF_8)); + System.out.println("客户端地址:"+ctx.channel().remoteAddress()); + } + + //数据读取完毕 + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + //读取完毕进行回显 写回并刷新 + ctx.writeAndFlush(Unpooled.copiedBuffer("success", CharsetUtil.UTF_8)); + } + + //捕获异常 + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + //异常处理 首先先将通道的上下文关闭 每个ctx对应的就是handler本身 + ctx.close(); + cause.printStackTrace(); + } +} diff --git "a/\346\211\213\345\206\231RPC\346\241\206\346\236\266.pdf" "b/\346\211\213\345\206\231RPC\346\241\206\346\236\266.pdf" index 73518f2..bdf78a0 100644 Binary files "a/\346\211\213\345\206\231RPC\346\241\206\346\236\266.pdf" and "b/\346\211\213\345\206\231RPC\346\241\206\346\236\266.pdf" differ