Netty框架介绍
转自:http://www.toutiao.com/i6398482015196283394/
一 初步了解Netty
Netty是由JBoss公司推出的一个高性能事件驱动型异步非阻塞的IONIO)框架。用于建立TCP等底层的连接,基于Netty可以建立高性能的Http服务器。支持HTTP、WebSocket、Protobuf、Binary TCP和UDP。
Netty提供了NIO和BIOOIO阻塞IO)两种模式处理逻辑,其中NIO主要通过一个BOSS线程处理等待链接的接入,若干个Worker线程从worder线程池取出一个赋给channel,因为channel持有真正的java网络对象)接过BOSS线程递交过来的Channel进行数据读写并且触发相应的事件传递给pipeline进行数据处理,而BIOOIO)方式服务器端虽然还是通过一个BOSS线程来处理等待链接的接入,但是客户端还是由主线程直接connect,另外写数据C/S两端都是直接主线程写,而数据读取则是通过一个Worker线程BLOCK方式读取一直等待,直到读到数据,除非channel关闭)。
总体结构图:摘自Netty官网)
二 Netty组件
为了更好的理解和进一步深入Netty,我们先总体认识一下Netty中的组件以及在整个框架中如何协调工作的。Netty应用中必不可少的组件:
Bootstrap or ServerBootstrap
EventLoop
EventLoopGroup
ChannelPipeline
Channel
Future or ChannelFuture
ChannelInitializer
ChannelHandler
Bootstrap:一个Netty应用通常由一个Bootstrap开始,它主要作用是配置整个Netty程序,串联起各个组件,ServerBootstrap用于server端,Bootstrap用于client端。
Handler:为了支持各种协议和处理数据的方式,便诞生了Handler组件。Handler主要用来处理各种事件,这里的事件很广泛,比如可以是连接,数据接收,异常,数据转换等。
ChannelInboundHandler:一个最常用的Handler。这个Handler的作用就是处理接收到数据时的事件,也就是说,我们的业务逻辑一般都写在这个Handler里的,ChannelInboundHandler就是用来处理核心业务逻辑的。
ChannelInitializer:当一个链接建立时,我们需要知道怎么来接收或发送数据,当然,我们有各种各样的Handler实现来处理它,那么ChannelInitializer便是用来配置这些Handler,它会提供一个ChannelPipeline并把Handler加入到ChannelPipeline。
ChannelPipeline:一个Netty应用基于ChannelPipeline机制,这种机制需要依赖于EventLoop和EventLoopGroup,因为它们三个都和事件或者事件处理有关。
EventLoop: EventLoop目的是为Channel处理IO操作,一个Channel对应一个EventLop,而一个EventLoop对应一个线程,也就是说,仅有一个线程在负责一个Channel的IO操作。EventLoopGroup会包含多个EventLoop。
Channel: Channel代表了一个Socket链接,或者其它和IO操作相关的组件,它和EventLoop一起用来参与IO处理。
Future:在Netty中所有的IO操作都是异步的,因此,你不能立刻得知消息是否被正确处理,但是我们可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和ChannelFutures,它们可以注册一个监听,当操作执行成功或失败时监听会自动触发。总之,所有的操作都会返回一个ChannelFuture。
三 Netty处理连接请求和业务逻辑
当一个连接到达,Netty会注册一个channel,然后EventLoopGroup会分配一个EventLoop绑定到这个channel,在这个channel的整个生命周期过程中,都会由绑定的这个EventLoop来为它服务。
四 Netty使用案例
以下例子是使用Netty4.x编写的案例,不详细描述了,直接上代码:
server端:
package com.cn.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.Charset;
public class MyNettyServer {
public static void mainString[] args) {
new MyNettyServer).serverStart);
}
//用于分配处理业务线程的线程组个数
private static final int PARENTGROUPSIZE = Runtime.getRuntime).availableProcessors);
private static final int CHILDGROUPSIZE = PARENTGROUPSIZE*2;
private static final String HOST = “127.0.0.1”;
private static final int PORT = 9999;
NioEventLoopGroup parentGroup;
NioEventLoopGroup childGroup;
/**
*服务端启动方法
*/
public void serverStart){
try {
final ServerBootstrap server = new ServerBootstrap);
parentGroup = new NioEventLoopGroupPARENTGROUPSIZE);
childGroup = new NioEventLoopGroupCHILDGROUPSIZE);
server.groupparentGroup, childGroup);//设置线程组
server.channelNioServerSocketChannel.class);
server.optionChannelOption.SO_KEEPALIVE,true);//保持连接
server.optionChannelOption.SO_BACKLOG,1024);
server.optionChannelOption.TCP_NODELAY,true);
server.childHandlernew ChannelInitializer<SocketChannel>) {
@Override
protected void initChannelSocketChannel channel) throws Exception {
channel.pipeline).addLast”frameDecoder”,new LengthFieldBasedFrameDecoderInteger.MAX_VALUE, 0, 4));
channel.pipeline).addLast”frameEncoder”,new LengthFieldPrepender4));
channel.pipeline).addLast”decoder”,new StringDecoderCharset.forName”GBK”)));
channel.pipeline).addLast”encoder”,new StringEncoderCharset.forName”GBK”)));
channel.pipeline).addLastnew MyServerHandler));
}
});
ChannelFuture future = server.bindHOST,PORT).sync);
iffuture.isSuccess)){
System.out.println”……Netty Server Started……”);
}else{
System.out.println”……Netty Server Failed……”);
}
future.channel).closeFuture).sync);
} catch Exception e) {
e.printStackTrace);
}finally{
this.parentGroup.shutdownGracefully);
this.childGroup.shutdownGracefully);
}
}
}
package com.cn.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class MyServerHandler extends SimpleChannelInboundHandler<String>{
/*
*收到消息时调用
*/
@Override
protected void channelReadChannelHandlerContext context, String msg)throws Exception {
System.out.println”server received msg:”+msg);
context.writeAndFlushnew String”客户端你好“));
}
@Override
public void exceptionCaughtChannelHandlerContext context, Throwable cause) throws Exception {
cause.printStackTrace);
context.flush);
context.close);
}
}
Client端:
package com.cn.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.Charset;
public class MyNettyClient {
public static void mainString[] args) {
Channel channel = new MyNettyClient).clientStart);
channel.writeAndFlushnew String”服务器你好“));
}
private static final String HOST = “127.0.0.1”;
private static final int PORT = 9999;
public Channel clientStart){
Channel channel = null;
Bootstrap client = new Bootstrap);
EventLoopGroup group = new NioEventLoopGroup);
try {
client.groupgroup);
client.channelNioSocketChannel.class);
client.optionChannelOption.SO_KEEPALIVE,true);
client.handlernew ChannelInitializer<SocketChannel>) {
@Override
protected void initChannelSocketChannel channel)throws Exception {
channel.pipeline).addLast”frameDecoder”,new LengthFieldBasedFrameDecoderInteger.MAX_VALUE, 0, 4));
channel.pipeline).addLast”frameEncoder”,new LengthFieldPrepender4));
//编码格式
channel.pipeline).addLast”encoder”,new StringEncoderCharset.forName”GBK”)));
//解码格式
channel.pipeline).addLast”decoder”,new StringDecoderCharset.forName”GBK”)));
channel.pipeline).addLastnew MyClientHandler));
}
});
ChannelFuture future = client.connectHOST, PORT).sync);
iffuture.isSuccess)){
System.out.println”……netty client started……”);
}
channel = future.channel);
} catch InterruptedException e) {
e.printStackTrace);
}
return channel;
}
}
packagecom.cn.netty;
importio.netty.channel.ChannelHandlerContext;
importio.netty.channel.SimpleChannelInboundHandler;
public class MyClientHandler extends SimpleChannelInboundHandler<String>{
/*
*收到消息时调用
*/
@Override
protected void channelReadChannelHandlerContext context, String msg)throwsException {
System.out.println“收到服务器发来的消息:”+msg);
}
/*
*建立连接时调用
*/
@Override
public void channelActiveChannelHandlerContext context)throwsException {
super.channelActivecontext);
}
}