GoogleProtobuf编解码

Google的Protobuf在业界非常流行,很多商业项目选择Protobuf作为编解码框架,其优点如下。

  • 在谷歌内部长期使用,产品成熟度高:
  • 跨语言,支持多种语言,包括C十十、java和Python.
  • 编码后的消息更小,更加有利于存储和传输:
  • 编解码的性能非常高:
  • 支持不同协议版本的前向兼容:
  • 支辫定义可选和必选字段。

Protobuf的入门

Protobuf是一个灵活、高效、结构化的数据序列化框架,相比于XML等传统的序列化工具,它更小,更快,更简单。Protobuf支持数据结构化一次可以到处使用,甚至跨语言使用,通过代码生成工具可以自动生成不同语言版本的源代码,甚至可以在使用不同版本的数据结构j进程间进行数据传递,实现数据结构的前向兼容。

下面我们通过一个简单的例程来学习如何使用Protobuf对POJO对象进行编解码,然后,我们以这个例程为基础,学习如何在Netty中对POJO对象迸行Protobuf编解码,并在两个进程之间进行通信和数据交换。

Protobuf开发环境搭建

首先下载Protobuf的Windows版本,得到protoc.exe.
下面我们以商品订购例程为例,定义SubscribeReq.proto和SubscribeResp.proto与protoc.exe放在同目录下(方便下面操作).
代码清单1 SubscribeReq.proto

1
2
3
4
5
6
7
8
9
10
package com.eric.netty.codec.protobuf;
option java_package = "com.eric.netty.codec.protobuf";
option java_outer_classname = "SubscribeReqProto";
message SubscribeReq{
required int32 subReqID = 1;
required string userName = 2;
required string productName = 3;
repeated string address = 4;
}

代码清单2 SubscribeReq.proto

1
2
3
4
5
6
7
8
package com.eric.netty.codec.protobuf;
option java_package = "com.eric.netty.codec.protobuf";
option java_outer_classname = "SubscribeRespProto";
message SubscribeResp{
required int32 subReqID = 1;
required string respCode = 2;
required string desc = 3;
}

通过protoc.exe命令protoc --java_out=SubscribeReq.protoprotoc --java_out=SubscribeResp.proto行生成Java代码。如果出现Missing input file则改用protoc ./SubscribeReq.proto --java_out=./protoc ./SubscribeResp.proto --java_out=./如图1所示,再把生成的SubscribeReqProto.java和SubscribeRespProto.java拷贝到项目中。



图1 通过protoc.exe工具生成源代码

生成的源代码编译出错,是由于缺少protobuf-java的jar包,其maven的依赖如下,将jar包引入到类库中就能正常使用,Protobuf开发环境也已经构建完成。

代码清单3 protobuf-java的maven的依赖

1
2
3
4
5
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.4.0</version>
</dependency>

Protobuf编解码开发

Protobuf的类库使用比较简单,下面我们就通过对SubscribeReqProto进行编解码来介绍Protobuf的使用。
代码清单4 Protobuf入门 TestSubscribeReqProto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class TestSubscribeReqProto {
private static byte[] encode(SubscribeReqProto.SubscribeReq req) {
/*
编码时通过调用SubscribeReqProto.SubscrobeReq实例的toByteArray即可将SubscribeReq编码为byte数组,使用非常方便。
*/
return req.toByteArray();
}
private static SubscribeReqProto.SubscribeReq decode(byte[] body)
throws InvalidProtocolBufferException {
/*
编码时通过调用SubscribeReqProto.SubscrobeReq实例的parseFrom将二进制byte数组解码为原始的对象。
*/
return SubscribeReqProto.SubscribeReq.parseFrom(body);
}
/*
通过SubscribeReqProto.SubscribeReq的静态方法newBuilder创建
SubscribeReqProto.SubscribeReq的Builder实例
*/
private static SubscribeReqProto.SubscribeReq createSubscribeReq() {
/*
通过Builder构造器对SubscribeReq的属性进行设置
*/
SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq
.newBuilder();
builder.setSubReqID(1);
builder.setUserName("Lilinfeng");
builder.setProductName("Netty Book");
/*
对于集合类型通过addAllXXX()方法将集合对象设置 到对应的属性中。
*/
List<String> address = new ArrayList<>();
address.add("NanJing YuHuaTai");
address.add("BeiJing LiuLiChang");
address.add("ShenZhen HongShuLin");
builder.addAllAddress(address);
return builder.build();
}
/**
* @param args
* @throws InvalidProtocolBufferException
*/
public static void main(String[] args)
throws InvalidProtocolBufferException {
SubscribeReqProto.SubscribeReq req = createSubscribeReq();
System.out.println("Before encode : " + req.toString());
SubscribeReqProto.SubscribeReq req2 = decode(encode(req));
System.out.println("After decode : " + req.toString());
System.out.println("Assert equal : --> " + req2.equals(req));
}
}

运行Protobuf例程

运行TestSubscribeReqProto,执行结果如图2所示。



图2 运行TestSubscribeReqProto执行结果

Netty的Protobuf服务端开发

下面为一个Protobuf版本的图书订购程序。

Protobuf服务端开发

代码清单5 Protobuf版本图书订购代码SubReqServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class SubReqServer {
public void bind(int port) throws Exception {
// 配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
/*
向ChannelPipeline添加ProtobufVarint32FrameDecoder,
它主要用于半包处理,随后继续添加ProtobufDecoder解码器,
它的参数是com.google.protobuf.MessageLite,实际上就是
要告诉ProtobufDecoder需要解码的目标类是什么,否则仅仅
从字节数组中是无法判断出要解码的目标类型信息的。
*/
ch.pipeline().addLast(
new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast(
new ProtobufDecoder(
SubscribeReqProto.SubscribeReq
.getDefaultInstance()));
ch.pipeline().addLast(
new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder());
ch.pipeline().addLast(new SubReqServerHandler());
}
});
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new SubReqServer().bind(port);
}
}

代码清单6 Protobuf版本图书订购代码SubReqServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Sharable
public class SubReqServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg;
if ("Lilinfeng".equalsIgnoreCase(req.getUserName())) {
System.out.println("Service accept client subscribe req : ["
+ req.toString() + "]");
ctx.writeAndFlush(resp(req.getSubReqID()));
}
}
private SubscribeRespProto.SubscribeResp resp(int subReqID) {
SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp
.newBuilder();
builder.setSubReqID(subReqID);
builder.setRespCode(String.valueOf(0));
builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
return builder.build();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();// 发生异常,关闭链路
}
}

由于protobufDecoder已经对消息进行了自动解码,因此接收到的订购请求消息可以直接使用。对用户名进行校验,校验通过后构造应答消息返回给客户端,由于使用了ProtobufEncoder,所以不需要对SubscribeRespProto.SubscribeResp进行手工编码。

Protobuf客户端开发

代码清单7 Protobuf版本图书订购代码SubReqClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class SubReqClient {
public void connect(int port, String host) throws Exception {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new ProtobufVarint32FrameDecoder());
/*
需要指出的是客户端需要解码的对象是订购响应,使用
SubscribeRespProto.SubscribeResp的实例作为入参
*/
ch.pipeline().addLast(
new ProtobufDecoder(
SubscribeRespProto.SubscribeResp
.getDefaultInstance()));
ch.pipeline().addLast(
new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder());
ch.pipeline().addLast(new SubReqClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
// 当代客户端链路关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new SubReqClient().connect(port, "127.0.0.1");
}
}

代码清单8 Protobuf版本图书订购代码SubReqHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class SubReqClientHandler extends ChannelHandlerAdapter {
/**
* Creates a client-side handler.
*/
public SubReqClientHandler() {
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
for (int i = 0; i < 10; i++) {
ctx.write(subReq(i));
}
ctx.flush();
}
private SubscribeReqProto.SubscribeReq subReq(int i) {
SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq
.newBuilder();
builder.setSubReqID(i);
builder.setUserName("Lilinfeng");
builder.setProductName("Netty Book For Protobuf");
List<String> address = new ArrayList<>();
address.add("NanJing YuHuaTai");
address.add("BeiJing LiuLiChang");
address.add("ShenZhen HongShuLin");
builder.addAllAddress(address);
return builder.build();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("Receive server response : [" + msg + "]");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

测试Protobuf版本的图书订购程序功能

服务端运行结果如图3所示.



图3 服务端运行结果

服务端运行结果如图4所示.



图4 服务端运行结果

Protobuf的使用注意事项

ProtoBufDecoder仅仅负责解码,它不支持读半包。因此,在ProtobufDecoder前面,一定要有能够处理读半包的解码器,有以下三种方式可以选择。

  • 使用Netty提供的ProtobufVarint32FrameDecoder,可以处理半包消息;
  • 继承Netty的通用半包解码器LengthFieldBasedFrameDecoder;
  • 继承ByteToMessageDecoder类,自己处理半包消息。

如果使用ProtobufDecoder解码器而忽略对半包消息的处理,程序是不能正常工作的。以前面的图书订购为例服务端代码进行修改,注释掉ProtobufVarint32FrameDecoder,代码修改如图5所示。



图5 注释掉ProtobufVarint32FrameDecoder

程序运行,结果如图6所示,运行出错。



图6 注释掉ProtobufVarint32FrameDecoder运行出错

Netty权威指南第二版源代码

Adhere to the original technology to share, your support will encourage me to continue to create!