mina框架CumulativeProtocolDecoder.doDecode方法浅析
服务器正常启动,确认设备连接正常。 设备为客户端,每3秒向服务器发送心跳
模拟网络短暂不通,多个包同时到达,粘包情况: 拔掉网线30秒,插入网线。
代码如下
package mina;
import java.io.IOException; import java.net.InetSocketAddress; import java.util.Arrays;
import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.AttributeKey; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.mina.filter.codec.ProtocolEncoderAdapter; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
public class Test { public static void main(String[] args) throws IOException { final byte[] key = { (byte) 0xb9, (byte) 0xda, 0x5e, 0x15, 0x46, 0x57, (byte) 0xa7, (byte) 0x8d, (byte) 0x9d, (byte) 0x84, (byte) 0x90, (byte) 0xd8, (byte) 0xab, 0x00, (byte) 0x8c, (byte) 0xbc, (byte) 0xd3, 0x0a, (byte) 0xf7, (byte) 0xe4, 0x58, 0x05, (byte) 0xb8, (byte) 0xb3, 0x45, 0x06, (byte) 0xd0, 0x2c, 0x1e, (byte) 0x8f, (byte) 0xca, 0x3f };
NioSocketAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addFirst("codec", new ProtocolCodecFilter(new ProtocolEncoderAdapter() {
[@Override](https://my.oschina.net/u/1162528)
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
System.out.println("编码:" + message);
}
}, new CumulativeProtocolDecoder() {
[@Override](https://my.oschina.net/u/1162528)
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
System.out.println();
System.out.println("长度:" + in.limit());
System.out.println();
IoBuffer ioBuffer = getContext(session).getIoBuffer();
//IoBuffer ioBuffer = IoBuffer.allocate(100).setAutoExpand(true);
while (in.hasRemaining()) {
byte b = in.get();
ioBuffer.put(b);
System.out.print("【"+b+"】");
if (b != 0x03 && b != 0x04) {
continue;
}
byte[] array = ioBuffer.array();
if (array[0] != 0x02) {
System.out.println();
System.out.println("=======================================");
System.out.println("未知包" + ByteUtil.toHexStr(array));
System.out.println("=======================================");
throw new RuntimeException("================未知包================");
}
array = Arrays.copyOfRange(array, 1, ioBuffer.position() - 2);
int temp1 = array.length / 32;
int temp2 = array.length % 32;
for (int i = 0; i < temp1; i++) {
for (int j = 0; j < 32; j++) {
array[i * 32 + j] ^= key[j];
}
}
for (int j = 0; j < temp2; j++) {
array[temp1 * 32 + j] ^= key[j];
}
System.out.println("");
System.out.println("=======================================");
System.out.println("解析到包:" + new String(array));
System.out.println("=======================================");
out.write(new String(array));
ioBuffer.clear();
System.out.println("返回true");
return true;
}
System.out.println("返回false");
return false;
}
private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");
public Context getContext(IoSession session) {
Context ctx = (Context) session.getAttribute(CONTEXT);
if (ctx == null) {
ctx = new Context();
session.setAttribute(CONTEXT, ctx);
}
return ctx;
}
class Context {
private IoBuffer ioBuffer = IoBuffer.allocate(100).setAutoExpand(true);
public IoBuffer getIoBuffer() {
return ioBuffer;
}
}
}));
acceptor.setHandler(new IoHandlerAdapter() {
[@Override](https://my.oschina.net/u/1162528)
public void messageReceived(IoSession session, Object message) throws Exception {
super.messageReceived(session, message);
}
});
acceptor.bind(new InetSocketAddress(3333));
}
} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 打印如下
长度:53
【2】【-35】【-65】【40】【124】【37】【50】【-99】【-18】【-8】【-80】【-95】【-71】【-101】【53】【-24】【-100】【-70】【110】【-51】【-44】【120】【102】【-43】【-41】【127】【50】【-32】【20】【62】【-7】【-85】【83】【-52】【-65】【100】【119】【35】【54】【-45】【-13】【-29】【-92】【-4】【-67】【-59】【103】【-8】【-44】【-23】【61】【0】【3】
解析到包:device:ce41a05d id:0 cmd:408 value:beat~~ length:7
返回true
长度:256
【2】【-35】【-65】【40】【124】【37】【50】【-99】【-18】【-8】【-80】【-95】【-71】【-101】【53】【-24】【-100】【-70】【110】【-51】【-44】【120】【102】【-43】【-41】【127】【50】【-32】【20】【62】【-7】【-85】【83】【-52】【-65】【100】【119】【35】【54】【-45】【-13】【-29】【-92】【-4】【-67】【-59】【103】【-8】【-44】【-23】【61】【0】【3】
解析到包:device:ce41a05d id:0 cmd:408 value:beat~~ length:7
返回true
长度:256
【2】【-35】【-65】【40】【124】【37】【50】【-99】【-18】【-8】【-80】【-95】【-71】【-101】【53】【-24】【-100】【-70】【110】【-51】【-44】【120】【102】【-43】【-41】【127】【50】【-32】【20】【62】【-7】【-85】【83】【-52】【-65】【100】【119】【35】【54】【-45】【-13】【-29】【-92】【-4】【-67】【-59】【103】【-8】【-44】【-23】【61】【0】【3】
解析到包:device:ce41a05d id:0 cmd:408 value:beat~~ length:7
返回true
长度:256
【2】【-35】【-65】【40】【124】【37】【50】【-99】【-18】【-8】【-80】【-95】【-71】【-101】【53】【-24】【-100】【-70】【110】【-51】【-44】【120】【102】【-43】【-41】【127】【50】【-32】【20】【62】【-7】【-85】【83】【-52】【-65】【100】【119】【35】【54】【-45】【-13】【-29】【-92】【-4】【-67】【-59】【103】【-8】【-44】【-23】【61】【0】【3】
解析到包:device:ce41a05d id:0 cmd:408 value:beat~~ length:7
返回true
长度:256
【2】【-35】【-65】【40】【124】【37】【50】【-99】【-18】【-8】【-80】【-95】【-71】【-101】【53】【-24】【-100】【-70】【110】【-51】【-44】【120】【102】【-43】【-41】【127】【50】【-32】【20】【62】【-7】【-85】【83】【-52】【-65】【100】【119】【35】【54】【-45】【-13】【-29】【-92】【-4】【-67】【-59】【103】【-8】【-44】【-23】【61】【0】【3】
解析到包:device:ce41a05d id:0 cmd:408 value:beat~~ length:7
返回true
长度:256
【2】【-35】【-65】【40】【124】【37】【50】【-99】【-18】【-8】【-80】【-95】【-71】【-101】【53】【-24】【-100】【-70】【110】【-51】【-44】【120】【102】【-43】【-41】【127】【50】【-32】【20】【62】【-7】【-85】【83】【-52】【-65】【100】【119】【35】【54】【-45】【-13】【-29】【-92】【-4】返回false
长度:433
【-67】【-59】【103】【-8】【-44】【-23】【61】【0】【3】
未知包BD,C5,67,F8,D4,E9,3D,00,03,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00
长度:53
【2】【-35】【-65】【40】【124】【37】【50】【-99】【-18】【-8】【-80】【-95】【-71】【-101】【53】【-24】【-100】【-70】【110】【-51】【-44】【120】【102】【-43】【-41】【127】【50】【-32】【20】【62】【-7】【-85】【83】【-52】【-65】【100】【119】【35】【54】【-45】【-13】【-29】【-92】【-4】【-67】【-59】【103】【-8】【-44】【-23】【61】【0】【3】
解析到包:device:ce41a05d id:0 cmd:408 value:beat~~ length:7
返回true 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 解释 插入网线后,不打印日志。(模拟的,有时是马上就打印) 大约20秒后,瞬间收到5、60行日志。
一个心跳包已0x02开头、已0x03结尾,长度为53。
日志第2行:长度为53,表示这个包正常
日志第10行:长度为256,说明IoBuffer in一次从缓存读取256个字节,是4个心跳包加半个心跳包的长度。 代码: 41行,每次取一个字节 42行,保存字节到ioBuffer 45行,如果不是取到一个完整的包,继续保存字节到ioBuffer 60行,解码数据 79行,返回true 上级代码
protected CumulativeProtocolDecoder() { } public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { if (!session.getTransportMetadata().hasFragmentation()) { while (in.hasRemaining()) { /** 当前读取到in的53的位置,还有剩余字节(共256),并且doDecode返回的true,则继续执行doDecode。CumulativeProtocolDecoder.decode(..)的上层为同步,所以这个没循环完,就算缓存还有新数据,也需要等待decode方法执行完才可以。上层代码: synchronized (session) { decoder.decode(session, in, decoderOut); } */ if (!doDecode(session, in, out)) { break; } } return; }
boolean usingSessionBuffer = true;
IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 日志第18行: 从in的256的53位置继续读取53个字节,继续返回true
日志第26行: 从in的256的106位置继续读取53个字节,继续返回true
日志第34行: 从in的256的159位置继续读取53个字节,继续返回true
日志第34行: 从in的256的212位置继续读取53个字节,继续返回true
日志第42行: 从in的256的212位置继续读取44个字节,返回false CumulativeProtocolDecoder() { } public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { if (!session.getTransportMetadata().hasFragmentation()) { while (in.hasRemaining()) { /** 返回false,表示本方法执行完毕。如果有新包则在执行。 */ if (!doDecode(session, in, out)) { break; } } return; }
日志第42行: 本次长度433(上次心跳包的9个字节+新的8个心跳包的长度) test.java.38行又重新创建的IoBuffer,通过0x03截取的时候,取到的是9个字节的长度。所以需要把IoBuffer放入session中,就可以连接上次取到的包加本次的9个字节,组合成一个完成的包。 ———————————————— 版权声明:本文为CSDN博主「笨人_」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/qq1247/article/details/78125289