mina框架CumulativeProtocolDecoder.doDecode方法浅析

mina框架CumulativeProtocolDecoder.doDecode方法浅析

服务器正常启动,确认设备连接正常。 设备为客户端,每3秒向服务器发送心跳

模拟网络短暂不通,多个包同时到达,粘包情况: 拔掉网线30秒,插入网线。

代码如下

package mina;

import java.io.IOException; import java.net.InetSocketAddress; import java.util.Arrays;

import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.AttributeKey; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.mina.filter.codec.ProtocolEncoderAdapter; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class Test { public static void main(String[] args) throws IOException { final byte[] key = { (byte) 0xb9, (byte) 0xda, 0x5e, 0x15, 0x46, 0x57, (byte) 0xa7, (byte) 0x8d, (byte) 0x9d, (byte) 0x84, (byte) 0x90, (byte) 0xd8, (byte) 0xab, 0x00, (byte) 0x8c, (byte) 0xbc, (byte) 0xd3, 0x0a, (byte) 0xf7, (byte) 0xe4, 0x58, 0x05, (byte) 0xb8, (byte) 0xb3, 0x45, 0x06, (byte) 0xd0, 0x2c, 0x1e, (byte) 0x8f, (byte) 0xca, 0x3f };

    NioSocketAcceptor acceptor = new NioSocketAcceptor();
    acceptor.getFilterChain().addFirst("codec", new ProtocolCodecFilter(new ProtocolEncoderAdapter() {
        [@Override](https://my.oschina.net/u/1162528)
        public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
            System.out.println("编码:" + message);
        }
    }, new CumulativeProtocolDecoder() {
        [@Override](https://my.oschina.net/u/1162528)
        protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
            System.out.println();
            System.out.println("长度:" + in.limit());
            System.out.println();
            IoBuffer ioBuffer = getContext(session).getIoBuffer();
            //IoBuffer ioBuffer = IoBuffer.allocate(100).setAutoExpand(true);

            while (in.hasRemaining()) {
                byte b = in.get();
                ioBuffer.put(b);
                System.out.print("【"+b+"】");

                if (b != 0x03 && b != 0x04) {
                    continue;
                }

                byte[] array = ioBuffer.array();

                if (array[0] != 0x02) {
                    System.out.println();
                    System.out.println("=======================================");
                    System.out.println("未知包" + ByteUtil.toHexStr(array));
                    System.out.println("=======================================");
                    throw new RuntimeException("================未知包================");
                }

                array = Arrays.copyOfRange(array, 1, ioBuffer.position() - 2);
                int temp1 = array.length / 32;
                int temp2 = array.length % 32;
                for (int i = 0; i < temp1; i++) {
                    for (int j = 0; j < 32; j++) {
                        array[i * 32 + j] ^= key[j];
                    }
                }
                for (int j = 0; j < temp2; j++) {
                    array[temp1 * 32 + j] ^= key[j];
                }

                System.out.println("");
                System.out.println("=======================================");
                System.out.println("解析到包:" + new String(array));
                System.out.println("=======================================");

                out.write(new String(array));
                ioBuffer.clear();
                System.out.println("返回true");
                return true;
            }

            System.out.println("返回false");
            return false;
        }

        private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");

        public Context getContext(IoSession session) {
            Context ctx = (Context) session.getAttribute(CONTEXT);
            if (ctx == null) {
                ctx = new Context();
                session.setAttribute(CONTEXT, ctx);
            }
            return ctx;
        }

        class Context {
            private IoBuffer ioBuffer = IoBuffer.allocate(100).setAutoExpand(true);

            public IoBuffer getIoBuffer() {
                return ioBuffer;
            }
        }
    }));
    acceptor.setHandler(new IoHandlerAdapter() {

        [@Override](https://my.oschina.net/u/1162528)
        public void messageReceived(IoSession session, Object message) throws Exception {
            super.messageReceived(session, message);
        }
    });

    acceptor.bind(new InetSocketAddress(3333));
}

} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 打印如下

长度:53

【2】【-35】【-65】【40】【124】【37】【50】【-99】【-18】【-8】【-80】【-95】【-71】【-101】【53】【-24】【-100】【-70】【110】【-51】【-44】【120】【102】【-43】【-41】【127】【50】【-32】【20】【62】【-7】【-85】【83】【-52】【-65】【100】【119】【35】【54】【-45】【-13】【-29】【-92】【-4】【-67】【-59】【103】【-8】【-44】【-23】【61】【0】【3】

解析到包:device:ce41a05d id:0 cmd:408 value:beat~~ length:7

返回true

长度:256

【2】【-35】【-65】【40】【124】【37】【50】【-99】【-18】【-8】【-80】【-95】【-71】【-101】【53】【-24】【-100】【-70】【110】【-51】【-44】【120】【102】【-43】【-41】【127】【50】【-32】【20】【62】【-7】【-85】【83】【-52】【-65】【100】【119】【35】【54】【-45】【-13】【-29】【-92】【-4】【-67】【-59】【103】【-8】【-44】【-23】【61】【0】【3】

解析到包:device:ce41a05d id:0 cmd:408 value:beat~~ length:7

返回true

长度:256

【2】【-35】【-65】【40】【124】【37】【50】【-99】【-18】【-8】【-80】【-95】【-71】【-101】【53】【-24】【-100】【-70】【110】【-51】【-44】【120】【102】【-43】【-41】【127】【50】【-32】【20】【62】【-7】【-85】【83】【-52】【-65】【100】【119】【35】【54】【-45】【-13】【-29】【-92】【-4】【-67】【-59】【103】【-8】【-44】【-23】【61】【0】【3】

解析到包:device:ce41a05d id:0 cmd:408 value:beat~~ length:7

返回true

长度:256

【2】【-35】【-65】【40】【124】【37】【50】【-99】【-18】【-8】【-80】【-95】【-71】【-101】【53】【-24】【-100】【-70】【110】【-51】【-44】【120】【102】【-43】【-41】【127】【50】【-32】【20】【62】【-7】【-85】【83】【-52】【-65】【100】【119】【35】【54】【-45】【-13】【-29】【-92】【-4】【-67】【-59】【103】【-8】【-44】【-23】【61】【0】【3】

解析到包:device:ce41a05d id:0 cmd:408 value:beat~~ length:7

返回true

长度:256

【2】【-35】【-65】【40】【124】【37】【50】【-99】【-18】【-8】【-80】【-95】【-71】【-101】【53】【-24】【-100】【-70】【110】【-51】【-44】【120】【102】【-43】【-41】【127】【50】【-32】【20】【62】【-7】【-85】【83】【-52】【-65】【100】【119】【35】【54】【-45】【-13】【-29】【-92】【-4】【-67】【-59】【103】【-8】【-44】【-23】【61】【0】【3】

解析到包:device:ce41a05d id:0 cmd:408 value:beat~~ length:7

返回true

长度:256

【2】【-35】【-65】【40】【124】【37】【50】【-99】【-18】【-8】【-80】【-95】【-71】【-101】【53】【-24】【-100】【-70】【110】【-51】【-44】【120】【102】【-43】【-41】【127】【50】【-32】【20】【62】【-7】【-85】【83】【-52】【-65】【100】【119】【35】【54】【-45】【-13】【-29】【-92】【-4】返回false

长度:433

【-67】【-59】【103】【-8】【-44】【-23】【61】【0】【3】

未知包BD,C5,67,F8,D4,E9,3D,00,03,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00,00

长度:53

【2】【-35】【-65】【40】【124】【37】【50】【-99】【-18】【-8】【-80】【-95】【-71】【-101】【53】【-24】【-100】【-70】【110】【-51】【-44】【120】【102】【-43】【-41】【127】【50】【-32】【20】【62】【-7】【-85】【83】【-52】【-65】【100】【119】【35】【54】【-45】【-13】【-29】【-92】【-4】【-67】【-59】【103】【-8】【-44】【-23】【61】【0】【3】

解析到包:device:ce41a05d id:0 cmd:408 value:beat~~ length:7

返回true 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 解释 插入网线后,不打印日志。(模拟的,有时是马上就打印) 大约20秒后,瞬间收到5、60行日志。

一个心跳包已0x02开头、已0x03结尾,长度为53。

日志第2行:长度为53,表示这个包正常

日志第10行:长度为256,说明IoBuffer in一次从缓存读取256个字节,是4个心跳包加半个心跳包的长度。 代码: 41行,每次取一个字节 42行,保存字节到ioBuffer 45行,如果不是取到一个完整的包,继续保存字节到ioBuffer 60行,解码数据 79行,返回true 上级代码

protected CumulativeProtocolDecoder() { } public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { if (!session.getTransportMetadata().hasFragmentation()) { while (in.hasRemaining()) { /** 当前读取到in的53的位置,还有剩余字节(共256),并且doDecode返回的true,则继续执行doDecode。CumulativeProtocolDecoder.decode(..)的上层为同步,所以这个没循环完,就算缓存还有新数据,也需要等待decode方法执行完才可以。上层代码: synchronized (session) { decoder.decode(session, in, decoderOut); } */ if (!doDecode(session, in, out)) { break; } } return; }

    boolean usingSessionBuffer = true;
    IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 日志第18行: 从in的256的53位置继续读取53个字节,继续返回true

日志第26行: 从in的256的106位置继续读取53个字节,继续返回true

日志第34行: 从in的256的159位置继续读取53个字节,继续返回true

日志第34行: 从in的256的212位置继续读取53个字节,继续返回true

日志第42行: 从in的256的212位置继续读取44个字节,返回false CumulativeProtocolDecoder() { } public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { if (!session.getTransportMetadata().hasFragmentation()) { while (in.hasRemaining()) { /** 返回false,表示本方法执行完毕。如果有新包则在执行。 */ if (!doDecode(session, in, out)) { break; } } return; }

日志第42行: 本次长度433(上次心跳包的9个字节+新的8个心跳包的长度) test.java.38行又重新创建的IoBuffer,通过0x03截取的时候,取到的是9个字节的长度。所以需要把IoBuffer放入session中,就可以连接上次取到的包加本次的9个字节,组合成一个完成的包。 ———————————————— 版权声明:本文为CSDN博主「笨人_」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/qq1247/article/details/78125289