JBossMarshalling编解码

JBoss Marshalling是一个Java对象序列化包,对JDK默认的序列化框架进行了优化,但又保存跟java.io.Serializable接口兼容,同时增加了一些可调的参数和附加特性,这些参数和特性可通过工厂类进行配置。

Marshallig开发环境准备

从地址:http://jbossmarshalling.jboss.org/downloads ,选择jboss-marshalling-serial-1.3.0.CR9.jar和jboss-marshalling-1.3.0.CR9.jar下载,并把它们加入到项目中。

Netty的Marshalling服务端开发

定义POJO对象,JBoss的Marshalling完全兼容JDK序列化,通过JBoss提供的序列化API,通过订购的实例看看如何使用Netty的Marshalling编解码类对消息进行序列化和反序列化。

代码清单1 Marshalling版本图书订购代码SubReqServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class SubReqServer {
public void bind(int port) throws Exception {
// 配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
/*
MarshallingCodeCFactory工厂类创建了MarshallingDecoder解码器,
并将其加入到ChannelPipeline中
*/
ch.pipeline().addLast(
MarshallingCodeCFactory
.buildMarshallingDecoder());
/*
MarshallingCodeCFactory工厂类创建了MarshallingEncoder编码器,
并将其加入到ChannelPipeline中
*/
ch.pipeline().addLast(
MarshallingCodeCFactory
.buildMarshallingEncoder());
ch.pipeline().addLast(new SubReqServerHandler());
}
});
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new SubReqServer().bind(port);
}
}

代码清单2 Marshalling版本图书订购代码SubReqServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Sharable
public class SubReqServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
SubscribeReq req = (SubscribeReq) msg;
if ("Lilinfeng".equalsIgnoreCase(req.getUserName())) {
System.out.println("Service accept client subscrib req : ["
+ req.toString() + "]");
ctx.writeAndFlush(resp(req.getSubReqID()));
}
}
private SubscribeResp resp(int subReqID) {
SubscribeResp resp = new SubscribeResp();
resp.setSubReqID(subReqID);
resp.setRespCode(0);
resp.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
return resp;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();// 发生异常,关闭链路
}
}

工厂类MarshallingCodeCFactory

代码清单3 Marshalling版本图书订购代码MarshallingCodeCFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public final class MarshallingCodeCFactory {
/**
* 创建Jboss Marshalling解码器MarshallingDecoder
*
* @return
*/
public static MarshallingDecoder buildMarshallingDecoder() {
/*
通过Marshalling工具类的getProvidedMarshallerFactory静态方法获取
MarshallerFactory实例,参数"serial"表示创建的是Java序列化工厂对象,
它由jboss-marshalling-serial-1.3.0.CR9.jar提供。
*/
final MarshallerFactory marshallerFactory = Marshalling
.getProvidedMarshallerFactory("serial");
/*
创建了MarshallingConfiguration对象,它的版本号设置为5,然后根据MarshallerFactory
和MarshallingConfiguration创建UnmarshallerProvider实例,
最后通过构造函数创建Netty的MarshallingDeCoder对象,它有两个参数,
分别是UnmarshallerProvider和单个消息序列化后的最大长度。
*/
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(
marshallerFactory, configuration);
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
return decoder;
}
/**
* 创建Jboss Marshalling编码器MarshallingEncoder
*
* @return
*/
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling
.getProvidedMarshallerFactory("serial");
/*
同样是构造MarshallerFactory和MarshallingConfiguration
*/
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
/*
创建MarshallerProvider对象,它用于创建Netty提供的MarshallingEncoder实例,
MarshallingEncoder用于将实现序列化接口的POJO对象序列化为二进制数组。
*/
MarshallerProvider provider = new DefaultMarshallerProvider(
marshallerFactory, configuration);
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}

定义POJO对象

代码清单4 Marshalling版本图书订购代码SubscribeReq

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class SubscribeReq implements Serializable {
/**
* 默认的序列号ID
*/
private static final long serialVersionUID = 1L;
private int subReqID;
private String userName;
private String productName;
private String phoneNumber;
private String address;
public final int getSubReqID() {
return subReqID;
}
public final void setSubReqID(int subReqID) {
this.subReqID = subReqID;
}
public final String getUserName() {
return userName;
}
public final void setUserName(String userName) {
this.userName = userName;
}
public final String getProductName() {
return productName;
}
public final void setProductName(String productName) {
this.productName = productName;
}
public final String getPhoneNumber() {
return phoneNumber;
}
public final void setPhoneNumber(String phoneNumber) {
this.phoneNumber = phoneNumber;
}
public final String getAddress() {
return address;
}
public final void setAddress(String address) {
this.address = address;
}
@Override
public String toString() {
return "SubscribeReq [subReqID=" + subReqID + ", userName=" + userName
+ ", productName=" + productName + ", phoneNumber="
+ phoneNumber + ", address=" + address + "]";
}
}

代码清单5 Marshalling版本图书订购代码SubscribeResp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class SubscribeResp implements Serializable {
/**
* 默认序列ID
*/
private static final long serialVersionUID = 1L;
private int subReqID;
private int respCode;
private String desc;
public final int getSubReqID() {
return subReqID;
}
public final void setSubReqID(int subReqID) {
this.subReqID = subReqID;
}
public final int getRespCode() {
return respCode;
}
public final void setRespCode(int respCode) {
this.respCode = respCode;
}
public final String getDesc() {
return desc;
}
public final void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "SubscribeResp [subReqID=" + subReqID + ", respCode=" + respCode
+ ", desc=" + desc + "]";
}
}

Netty的Marshalling客户端开发

代码清单6 Marshalling版本图书订购代码SubReqClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class SubReqClient {
public void connect(int port, String host) throws Exception {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
/*
分别创建Marshlling编码器和解码器,并将其添加到ChannelPipeline中。
*/
ch.pipeline().addLast(
MarshallingCodeCFactory
.buildMarshallingDecoder());
ch.pipeline().addLast(
MarshallingCodeCFactory
.buildMarshallingEncoder());
ch.pipeline().addLast(new SubReqClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
// 当代客户端链路关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new SubReqClient().connect(port, "127.0.0.1");
}
}

代码清单7 Marshalling版本图书订购代码SubReqClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class SubReqClientHandler extends ChannelHandlerAdapter {
/**
* Creates a client-side handler.
*/
public SubReqClientHandler() {
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
for (int i = 0; i < 10; i++) {
ctx.write(subReq(i));
}
ctx.flush();
}
private SubscribeReq subReq(int i) {
SubscribeReq req = new SubscribeReq();
req.setAddress("NanJing YuHuaTai");
req.setPhoneNumber("138xxxxxxxxx");
req.setProductName("Netty Book For Marshalling");
req.setSubReqID(i);
req.setUserName("Lilinfeng");
return req;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("Receive server response : [" + msg + "]");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

运行结果

服务端运行结果如下。



图1 服务端运行结果

客户端运行结果如下。



图2 客户端运行结果

Adhere to the original technology to share, your support will encourage me to continue to create!