协议编解码器是在使用Mina 的时候你最需要关注的对象,因为在网络传输的数据都是二进制数据(byte),而你在程序中面向的是JAVA 对象,这就需要你实现在发送数据时将JAVA 对象编码二进制数据,而接收数据时将二进制数据解码为JAVA 对象(这个可不是JAVA 对象的序列化、反序列化那么简单的事情)
Mina 中的协议编解码器通过过滤器ProtocolCodecFilter 构造,这个过滤器的构造方法需要一个ProtocolCodecFactory,ProtocolCodecFactory 中有如下两个方法:
public interface ProtocolCodecFactory {ProtocolEncoder getEncoder(IoSession session) throws Exception;ProtocolDecoder getDecoder(IoSession session) throws Exception;}
ProtocolEncoder是自定义编码器要实现的接口,ProtocolDecoder是自定义解码器要实现的接口。
下面是示例代码:(模拟手机信息的编解码,消息格式;有报头,发送人,接收人,内容长度,内容信息)
MsgObject.java: 消息实体类
1 public class MsgObject { 2 //发送者 3 private String sender; 4 //接收者 5 private String receiver; 6 //信息内容 7 private String content; 8 9 public String getSender() {10 return sender;11 }12 13 public void setSender(String sender) {14 this.sender = sender;15 }16 17 public String getReceiver() {18 return receiver;19 }20 21 public void setReceiver(String receiver) {22 this.receiver = receiver;23 }24 25 public String getContent() {26 return content;27 }28 29 public void setContent(String content) {30 this.content = content;31 }32 33 }
MessageEncoder.java: 消息编码器
1 //消息编码器 2 public class MessageEncoder extends ProtocolEncoderAdapter { 3 private Charset charset; 4 5 public MessageEncoder(Charset charset) 6 { 7 this.charset = charset; 8 } 9 10 @Override11 public void encode(IoSession arg0, Object arg1, ProtocolEncoderOutput arg2)12 throws Exception {13 MsgObject msg = (MsgObject) arg1;14 //生成字符编码器15 CharsetEncoder charsetEncoder = charset.newEncoder();16 //得到要发送对象属性内容,准备进行编码17 String status = "M sip:wap.fetion.com.cn SIP-C/2.0";18 String sender = msg.getSender();19 String receiver = msg.getReceiver();20 String content = msg.getContent();21 //开辟一个缓存空间,设置为自动调整大小22 IoBuffer ioBuffer = IoBuffer.allocate(100);23 ioBuffer.setAutoExpand(true);24 //将要发送的信息放入缓存空间25 //消息头26 ioBuffer.putString(status + "\n", charsetEncoder);27 //消息发送者28 ioBuffer.putString("S: " + sender + "\n", charsetEncoder);29 //消息接收者30 ioBuffer.putString("R: " + receiver + "\n", charsetEncoder);31 //消息内容长度32 ioBuffer.putString("L: " + content.getBytes(charset).length + "\n", charsetEncoder);33 //消息内容34 ioBuffer.putString(content + "\n", charsetEncoder);35 //编码后的信息已放入ioBuffer中,进行写回36 ioBuffer.flip();37 arg2.write(ioBuffer);38 }39 40 }
MessageDecoder.java: 消息解码器
1 //消息解码器 2 public class MessageDecoder extends CumulativeProtocolDecoder { 3 private Charset charset; 4 5 public MessageDecoder(Charset charset) { 6 this.charset = charset; 7 } 8 9 @Override10 protected boolean doDecode(IoSession arg0, IoBuffer arg1,11 ProtocolDecoderOutput arg2) throws Exception {12 CharsetDecoder charDecoder = charset.newDecoder();13 IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);14 // 接收解码后的信息15 String status = "";16 String sender = "";17 String receiver = "";18 String contentLen = "";19 String content = "";20 21 int textLineNumber = 1;22 int columnNumber = 0;23 // 如果缓存区还有消息24 while (arg1.hasRemaining()) {25 byte bt = arg1.get();26 buffer.put(bt);27 //换行28 if (bt == 10 && textLineNumber < 5) {29 columnNumber++;30 if (textLineNumber == 1) {31 buffer.flip();32 status = buffer.getString(columnNumber, charDecoder);33 status = status.substring(0, status.length() - 1);34 columnNumber = 0;35 buffer.clear();36 }37 if (textLineNumber == 2) {38 buffer.flip();39 sender = buffer.getString(columnNumber, charDecoder);40 sender = sender.substring(0, sender.length() - 1);41 columnNumber = 0;42 buffer.clear();43 }44 if (textLineNumber == 3) {45 buffer.flip();46 receiver = buffer.getString(columnNumber, charDecoder);47 receiver = receiver.substring(0, receiver.length() - 1);48 columnNumber = 0;49 buffer.clear();50 }51 if (textLineNumber == 4) {52 buffer.flip();53 contentLen = buffer.getString(columnNumber, charDecoder);54 contentLen = contentLen.substring(0,55 contentLen.length() - 1);56 columnNumber = 0;57 buffer.clear();58 }59 textLineNumber++;60 } else if (textLineNumber == 5) {61 columnNumber++;62 if (columnNumber == Long.parseLong(contentLen.split(": ")[1])) {63 buffer.flip();64 content = buffer.getString(columnNumber, charDecoder);65 textLineNumber++;66 break;67 }68 } else {69 columnNumber++;70 }71 72 }73 MsgObject smsObject = new MsgObject();74 smsObject.setSender(sender.split(": ")[1]);75 smsObject.setReceiver(receiver.split(": ")[1]);76 smsObject.setContent(content);77 arg2.write(smsObject);78 return false;79 }80 }
关于IoBuffer的读操作,需要了解一下原理,可参考文章:
MessageProtocolCodecFactory.java: 生成消息编解码器工厂
1 //编解码器生成工产 2 public class MessageProtocolCodecFactory implements ProtocolCodecFactory { 3 private ProtocolEncoder encoder; 4 private ProtocolDecoder decoder; 5 6 public MessageProtocolCodecFactory() 7 { 8 this(Charset.forName("UTF-8")); 9 }10 11 public MessageProtocolCodecFactory(Charset charset)12 {13 encoder = new MessageEncoder(charset);14 decoder = new MessageDecoder(charset);15 }16 17 @Override18 public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {19 return decoder;20 }21 22 @Override23 public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {24 return encoder;25 }26 27 }
接着就是调用这些编解码器来进行对象的传输了,服务器端和客户端的主程序编写可参考
温馨提示:
上面的消息解码器( MessageDecoder.java)中的解码考虑的情况是消息一次性从服务器发送过来,但有时消息可能不是一次性从服务器发送过来,而是分成了几次分批过来,这时就会重复调用解码器的deCode()方法,这时状态变量textLineNumber和columnNumber就会被重置,所以要把状态变量保存起来。可能你会想到将状态变量保存在解码器的成员变量中,但是Mina不保证每次调用deCode()方法的都是同一个线程,所以状态变量不是线程安全的。所以要将状态变量保存到IoSession中,因为IoSession用了一个同步的HashMap保存对象。 在IoSession中保存状态变量:// 保存数据状态对象的key值 private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");
通过IoSession.setAttribute和IoSession.getAttribute的保存和得到保存数据的对象
如下:1 private MsgContext getContext(IoSession session) {2 MsgContext context = (MsgContext) session.getAttribute(CONTEXT);3 if (null == context) {4 context = new MsgContext();5 session.setAttribute(CONTEXT, context);6 }7 return context;8 }