Skip to content

Commit

Permalink
5.4 2.0 Netty版本全新上线
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzzzzzzyt committed May 4, 2022
1 parent acf5706 commit 39c5bd8
Show file tree
Hide file tree
Showing 20 changed files with 256 additions and 42 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,12 @@
<version>1.1.1</version>
</dependency>

<!--引入netty的依赖-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.66.Final</version>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package service.netty_bootstrap;


import consumer.bootstrap.netty.NettyConsumerBootStrap20;

public class NettyClientBootStrap {
public static void start(String address, int port) throws InterruptedException {
NettyConsumerBootStrap20.main(new String[]{address, String.valueOf(port)});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package service.netty_bootstrap;

import provider.bootstrap.netty.NettyProviderBootStrap20;

public class NettyServerBootStrap {
public static void start(String address,int port) throws InterruptedException {
NettyProviderBootStrap20.main(new String[]{address, String.valueOf(port)});
}
}
10 changes: 10 additions & 0 deletions zyt-rpc-call/src/main/java/service/netty_call/NettyClientCall.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package service.netty_call;

import service.netty_bootstrap.NettyClientBootStrap;

//客户端启动类
public class NettyClientCall {
public static void main(String[] args) throws InterruptedException {
NettyClientBootStrap.start("127.0.0.1",6668);
}
}
10 changes: 10 additions & 0 deletions zyt-rpc-call/src/main/java/service/netty_call/NettyServerCall.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package service.netty_call;

import service.netty_bootstrap.NettyServerBootStrap;

//启动类 给定对应的端口 进行启动并监听
public class NettyServerCall {
public static void main(String[] args) throws InterruptedException {
NettyServerBootStrap.start("127.0.0.1",6668);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package service.bootstrap;
package service.nio_bootstrap;

import annotation.RpcClientBootStrap;

Expand All @@ -12,11 +12,11 @@
//之后启动直接在这边启动根据 在注解中配置对应的版本号 将相应的操作封装到之后的操作中即可 这样很方便 就是每次咱加一个启动器还得改下switch
//比如说这里的version 1.2 就是v1.2版本的启动器
@RpcClientBootStrap(version = "1.5")
public class ClientBootStrap {
public class NIOClientBootStrap {
public static Customer start() throws IOException, RpcException {
//获取当前的注解上的版本然后去调用相应的远端方法 反射的方法
//当前客户端启动器class对象
Class<ClientBootStrap> currentClientBootStrapClass = ClientBootStrap.class;
Class<NIOClientBootStrap> currentClientBootStrapClass = NIOClientBootStrap.class;
RpcClientBootStrap annotation = currentClientBootStrapClass.getAnnotation(RpcClientBootStrap.class);
String currentVersion = annotation.version();
//根据注解获得的版本进行判断是哪个版本 然后进行启动
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
package service.bootstrap;
package service.nio_bootstrap;

import annotation.RpcMethodCluster;
import annotation.RpcServerBootStrap;
import init.ZK;
import org.apache.zookeeper.KeeperException;
import provider.bootstrap.nio.*;
import service.call.ServerCall;
import service.nio_call.NIOServerCall;

import java.io.IOException;


//之后启动直接在这边启动根据 在注解中配置对应的版本号 将相应的操作封装到之后的操作中即可
//比如说这里的version 1.2 就是v1.2版本的启动器
@RpcServerBootStrap(version = "1.5")
public class ServerBootStrap {
public class NIOServerBootStrap {



public static void start() throws IOException, InterruptedException, KeeperException, NoSuchMethodException {

//先对ZK进行初始化
ZK.init();
Class<ServerBootStrap> serverBootStrapClass = ServerBootStrap.class;
Class<NIOServerBootStrap> serverBootStrapClass = NIOServerBootStrap.class;
RpcServerBootStrap annotation = serverBootStrapClass.getAnnotation(RpcServerBootStrap.class);
//当前服务端启动器 class对象
String currentServerBootStrapVersion = annotation.version();

//获取对应的方法和个数 然后进行启动
//1.获取对应方法 在获取对应的注解 注解中的属性
RpcMethodCluster nowAnnotation = ServerCall.class.getAnnotation(RpcMethodCluster.class);
RpcMethodCluster nowAnnotation = NIOServerCall.class.getAnnotation(RpcMethodCluster.class);
String[] methods = nowAnnotation.method();
int[] startNums = nowAnnotation.startNum();
//如果不存在那就返回
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package service.call;
package service.nio_call;


import exception.RpcException;
import method.Customer;
import service.bootstrap.ClientBootStrap;
import service.nio_bootstrap.NIOClientBootStrap;

import java.io.IOException;

//通用启动类 将启动的逻辑藏在ClientBootStrap中
public class ClientCall {
public class NIOClientCall {
public static void main(String[] args) throws IOException, RpcException {
Customer customer = ClientBootStrap.start();
Customer customer = NIOClientBootStrap.start();
//实现调用
System.out.println(customer.Hello("success"));
System.out.println(customer.Bye("fail"));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package service.call;
package service.nio_call;



import annotation.RpcMethodCluster;
import org.apache.zookeeper.KeeperException;
import service.bootstrap.ServerBootStrap;
import service.nio_bootstrap.NIOServerBootStrap;

import java.io.IOException;

//通用启动类 将启动的逻辑藏在ServerBootStrap中
//注解 看你想启动多少个服务和对应的方法
@RpcMethodCluster(method = {"Hello","Bye"},startNum = {2,3})
public class ServerCall {
public class NIOServerCall {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchMethodException {
ServerBootStrap.start();
NIOServerBootStrap.start();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package consumer.bootstrap.netty;

import consumer.netty.NettyClient20;

/*
以netty为网络编程框架的消费者端启动类
*/
//进行启动 提供类的方式即可
public class NettyConsumerBootStrap20 {
public static void main(String[] args) throws InterruptedException {
NettyClient20.start(args[0], Integer.parseInt(args[1]));
}
}

This file was deleted.

51 changes: 51 additions & 0 deletions zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient20.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package consumer.netty;

import consumer.netty_client_handler.NettyClientHandler01;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;


//实际客户端启动类
public class NettyClient20 {
public static void start(String hostName, int port) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup workGroup = new NioEventLoopGroup();

try {
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new NettyClientHandler01());
}
});

ChannelFuture channelFuture = bootstrap.connect(hostName, port).sync();

channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("连接"+hostName+":"+port+"成功");
}
else
{
System.out.println("连接"+hostName+":"+port+"失败");
}
}
});

//监听关闭事件,本来是异步的,现在转换为同步事件
channelFuture.channel().closeFuture().sync();
} finally
{
//优雅的关闭 group
workGroup.shutdownGracefully();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package consumer.netty_client_handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyClientHandler01 extends ChannelInboundHandlerAdapter {

//通道就绪就会发的信息
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务端", CharsetUtil.UTF_8));
}

//这个是收到信息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
System.out.println("收到来自"+ctx.channel().remoteAddress()+"的消息"+buf.toString(CharsetUtil.UTF_8));
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package provider.bootstrap.netty;

import provider.netty.NettyServer20;

/*
以netty为网络编程框架的服务提供端启动类
*/
public class NettyProviderBootStrap20 {
public static void main(String[] args) throws InterruptedException {
//传入要绑定的ip和端口
NettyServer20.start(args[0], Integer.parseInt(args[1]));
}
}

This file was deleted.

59 changes: 59 additions & 0 deletions zyt-rpc-provider/src/main/java/provider/netty/NettyServer20.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package provider.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import provider.netty_server_handler.NettyServerHandler01;


//简单实现 主要还是进行了一段回想
public class NettyServer20 {
public static void start(String hostName,int port) throws InterruptedException {
//该方法完成NettyServer的初始化 好好想想看 该怎么完成这个
ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class) //自身所实现的通道
.option(ChannelOption.SO_BACKLOG,128) //设置线程队列得到的连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true) //设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//每个用户连接上都会进行初始化
System.out.println("客户socketChannel hashcode="+socketChannel.hashCode());
ChannelPipeline pipeline = socketChannel.pipeline();//每个通道都对应一个管道 将处理器往管道里放
pipeline.addLast(new NettyServerHandler01());
}
});

System.out.println("服务器 is ready");

//连接 同步
ChannelFuture cf = serverBootstrap.bind(hostName, port).sync();

cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("监听端口"+port+"成功");
}
else
{
System.out.println("监听端口"+port+"失败");
}
}
});

//对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
//优雅的关闭两个集群
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package provider.netty_server_handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

//实现简单的服务注册和回写
public class NettyServerHandler01 extends ChannelInboundHandlerAdapter {

//读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//将信息进行读取 直接这样就可以了
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:"+ buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:"+ctx.channel().remoteAddress());
}

//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//读取完毕进行回显 写回并刷新
ctx.writeAndFlush(Unpooled.copiedBuffer("success", CharsetUtil.UTF_8));
}

//捕获异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//异常处理 首先先将通道的上下文关闭 每个ctx对应的就是handler本身
ctx.close();
cause.printStackTrace();
}
}
Binary file modified 手写RPC框架.pdf
Binary file not shown.

0 comments on commit 39c5bd8

Please sign in to comment.