MessagePack编解码

MessagePack是一个高效的二进制序列化框架,它像JSON一样支持不同语言间的数据交互,但是它的性能更快,序列化之后的码流也更小。
由于MessagePack在业界得到了非常广泛的应用,将介绍如何利用Netty的CodeC框架新增对MessagePack的支持。

MessagePack介绍

MessagePack的特点如下:

  • 编解码高效,性能高;
  • 序列化之后码流小;
  • 支持跨语言。

衡量序列化框架通用性的一个重要指标就是对多语言的支持,因为数据交换的双方很难保证一定采用相同的语言开发,如果序列化框架和某种语言绑定,他就很难跨语言,如Java的序列化机制。
MessagePack提供了多种语言支持:Java、Python、Ruby、Haskell、C#、OCaml、Lua、Go、C、C++等。

官网地址http://msgpack.org/
Git地址https://github.com/msgpack/msgpack-java

MessagePack Java API 介绍

Maven引用的方式:

1
2
3
4
5
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>${msgpack.version}</version>
</dependency>

API官方示例:

代码清单1 官方示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.msgpack.MessagePack;
import org.msgpack.template.Templates;
public class TestMessagePack {
public static void main(String[] args) {
// Create serialize objects
List<String> src=new ArrayList<String>();
src.add("msgpack");
src.add("kumofs");
src.add("viver");
MessagePack msgpack=new MessagePack();
// Serialize
byte[] raw;
try {
raw = msgpack.write(src);
// Deserialize directly using a template
List<String> dst1 = msgpack.read(raw,Templates.tList(Templates.TString));
System.out.println(dst1.get(0));
System.out.println(dst1.get(1));
System.out.println(dst1.get(2));
} catch (IOException e) {
e.printStackTrace();
}
}
}

MessagePack编码器和解码器开发

MessagePack编码器开发

代码清单2 msgpack编码器 MsgpackEncoder

1
2
3
4
5
6
7
8
9
public class MsgpackEncoder extends MessageToByteEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext arg0, Object arg1, ByteBuf arg2) throws Exception {
MessagePack msgpack=new MessagePack();
byte[] raw=msgpack.write(arg1);
arg2.writeBytes(raw);
}
}

MsgpackEncoder继承MessageToByteEncoder,它负责将Object类型的POJO对象编码为byte数组,然后写入到ByteBuf中。

MessagePack解码器开发

代码清单3 msgpack编码器 MsgPackDecoder

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext arg0, ByteBuf arg1, List<Object> arg2) throws Exception {
final byte[] array;
final int length=arg1.readableBytes();
array=new byte[length];
arg1.getBytes(arg1.readerIndex(), array,0,length);
MessagePack msgpack=new MessagePack();
arg2.add(msgpack.read(array));
}
}

首先从数据报arg1中获取需要解码的byte数组,然后调用MessagePack的read方法将其反序列化为Object对象,将解码后的对象加入到解码列表中arg2中,这样就完成了MessagePack的解码操作。

#### 功能测试
完成编解码器开发之后,我们以Netty原生Echo程序为例,进行测试。对Echo进行简单改造,传输的对象由字符串修改为POJO对象,利用MessagePack对POJO对象进行序列化。

序列化对象

代码清单4 UserInfo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Message
public class UserInfo {
private int age;
private String name;
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}

客户端代码

代码清单5 EchoClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class EchoClient {
private final String host;
private final int port;
private final int sendNumber;
public EchoClient(int port,String host,int sendNumber){
this.host=host;
this.port=port;
this.sendNumber=sendNumber;
}
public void run() throws Exception{
EventLoopGroup group=new NioEventLoopGroup();
try{
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//LengthFieldBasedFrameDecoder用于处理半包消息
//这样后面的MsgpackDecoder接收的永远是整包消息
ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535,0,2,0,2));
ch.pipeline().addLast("msgpack decoder",new MsgPackDecoder());
//在ByteBuf之前增加2个字节的消息长度字段
ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));
ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());
ch.pipeline().addLast(new EchoClientHandler(sendNumber));
}
});
ChannelFuture f= b.connect(host,port).sync();
f.channel().closeFuture().sync();
}finally{
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception{
int port=8080;
new EchoClient(port,"127.0.0.1",1000).run();

代码清单6 EchoClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class EchoClientHandler extends ChannelHandlerAdapter{
private final int sendNumber;
private int counter;
public EchoClientHandler(int sendNumber){
this.sendNumber=sendNumber;
}
@Override
public void channelActive(ChannelHandlerContext ctx){
UserInfo [] infos = UserInfo();
for(UserInfo infoE : infos){
ctx.write(infoE);
}
ctx.flush();
}
private UserInfo[] UserInfo(){
UserInfo [] userInfos=new UserInfo[sendNumber];
UserInfo userInfo=null;
for(int i=0; i < sendNumber; i++){
userInfo=new UserInfo();
userInfo.setAge(i);
userInfo.setName("ABCDEFG --->"+i);
userInfos[i]=userInfo;
}
return userInfos;
}
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
System.out.println("Client receive the msgpack message : " + msg);
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)throws Exception{
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
cause.printStackTrace();
ctx.close();
}
}

服务端代码

代码清单7 EchoServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class EchoServer {
public void bind(int port)throws Exception{
//创建服务端的NIO线程组
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workerGroup=new NioEventLoopGroup();
try {
ServerBootstrap b=new ServerBootstrap();
b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
.childHandler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// TODO Auto-generated method stub
ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535,0,2,0,2));
ch.pipeline().addLast("msgpack decoder",new MsgPackDecoder());
//在ByteBuf之前增加2个字节的消息长度字段
ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));
ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());
ch.pipeline().addLast(new EchoServerHander());
}
});
//绑定端口,同步等待成功
ChannelFuture f=b.bind(port).sync();
//等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
int port=8080;
if(args!=null&&args.length>0){
try {
port=Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// TODO: handle exception
}
}
new EchoServer().bind(port);
}
}

代码清单8 EchoServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Sharable
public class EchoServerHander extends ChannelHandlerAdapter {
int counter=0;
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
System.out.println("Server receive the msgpack message : " + msg);
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

运行结果分析

没有进行粘包/半包处理,结果分析

不进行处理就是去掉服务端与客户端中如下代码:

1
2
ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535,0,2,0,2));
ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));

客户端运行结果如下。



图1 客户端运行结果

服务端运行结果如下。



图2 服务端运行结果

没有进行粘包/半包的处理,我们开发的MessagePack编解码框架还不能正常工作,如下为粘包场景下测试结果。



图3 粘包场景下测试结果
粘包/半包支持下,运行结果分析

在MessagePack编码器之前添加LengthFieldPrepender,它将在ByteBuf之前添加2个字节的消息长度字段,其原理如图1所示。



图4 LengthFieldPrepender原理示意图

在MessagePack解码器之前添加LengthFieldBaseFrameDecoder,用于处理半包消息,这样后面的MsgpackDecoder接收到的永远是整包消息,它的工作原理如图2所示。



图5 LengthFieldBaseFrameDecoder原理示意图

服务端运行结果如下。



图6 服务端运行结果

客户端运行结果如下。



图7 客户端运行结果

Adhere to the original technology to share, your support will encourage me to continue to create!