Netty应用入门

一个简单的NIO服务端程序,如果我们直接使用JDK的NIO类库进行开发,竟然需要经过烦琐的十多步操作才能完成最基本的信息读取和发送,这也是我们要选择Netty等NIO框架的原因了。

Netty服务端开发

Netty时间服务器服务端
代码清单1 Netty时间服务器服务端 TimeServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class TimeServer {
public void bind(int port) throws Exception {
// 配置服务端的NIO线程组,实际就是他们就是Reactor线程组
//一个用于服务端接受客户端的连接,另外一个用于进行SocketChannel的网络读写
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//ServerBootstrap用于启动NIO服务端的辅助启动类,降低服务端的开发复杂度
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler()); //ChildChannelHandler用于处理I/O事件
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 调用相应方法进行阻塞,等待服务端监听端口关闭之后main函数才退出
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
arg0.pipeline().addLast(new TimeServerHandler());
}
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new TimeServer().bind(port);
}
}

代码清单2 Netty时间服务器服务端 TimeServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//继承自ChannelHandlerAdapter用于对网络的读写操作
public class TimeServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
//readableBytes获取缓冲区可读的字节数,根据可读的字节数创建byte数组
byte[] req = new byte[buf.readableBytes()];
//readBytes方法将缓冲区中的字节数组复制到新建的byte数组中
buf.readBytes(req);
//通过new String 构造函数获取请求消息
String body = new String(req, "UTF-8");
System.out.println("The time server receive order : " + body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()).toString() : "BAD ORDER";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
//从性能考虑,为了防止频繁地唤醒Selector进行消息发送,Netty的write方法并不直接将消息写入SocketChannel中,调用write方法只是把待发送的消息放到发送缓冲区数组中,再通过调用flush方法,将发送缓冲区的消息全部写入到SocketChannel中。
ctx.write(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//flush方法的作用是将消息发送队列中的消息写入到SocketChannel中发送给对方
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}

Netty客户端开发

代码清单3 Netty时间服务器客户端 TimeClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class TimeClient {
public void connect(int port, String host) throws Exception {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
//客户端辅助启动类Bootstrap,随后进行配置
Bootstrap b = new Bootstrap();
//客户端设置为NioSocketChannel,随后添加Handler
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
// 当代客户端链路关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new TimeClient().connect(port, "127.0.0.1");
}
}

代码清单4 Netty时间服务器客户端 TimeClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class TimeClientHandler extends ChannelHandlerAdapter {
private static final Logger logger = Logger
.getLogger(TimeClientHandler.class.getName());
private final ByteBuf firstMessage;
/**
* Creates a client-side handler.
*/
public TimeClientHandler() {
byte[] req = "QUERY TIME ORDER".getBytes();
firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
}
//当客户端和服务端TCP链路建立成功之后,Netty的NIO线程会调用channelActive方法,
//发送查询的时间的指令给服务端,调用ChannelHandlerContext的writeAndFlush方法将请求消息发送给服务端。
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
}
//当服务端返回应答消息时,channelRead方法被调用
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("Now is : " + body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 当发生异常时,打印异常日志,释放客户端资源
logger.warning("Unexpected exception from downstream : "
+ cause.getMessage());
ctx.close();
}
}

运行结果

服务端运行结果如图1。



图1 TimeServer运行结果

客户端运行结果如图2。



图2 TimeClient运行结果

Adhere to the original technology to share, your support will encourage me to continue to create!