From bbf4a47ed34eeca8fd92f9e884a438973beea2d9 Mon Sep 17 00:00:00 2001 From: eson Date: Tue, 7 Jun 2022 18:23:16 +0800 Subject: [PATCH] =?UTF-8?q?=E6=88=90=E5=8A=9F=E8=A7=A3=E6=9E=90=E7=AC=AC?= =?UTF-8?q?=E4=B8=80=E4=B8=AA=E5=8C=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dataflow/.proto/decode/AppFlow.java | 101 ------- .../.proto/decode/BacktrackingFlow.java | 98 ------- .../.proto/decode/BasicTrafficFlow.java | 82 ------ .../dataflow/.proto/decode/BussFlowOrl.java | 111 -------- .../dataflow/.proto/decode/DataFlow.java | 160 ----------- .../dataflow/.proto/decode/PacketBase.java | 27 -- .../dataflow/.proto/decode/PacketHeader.java | 55 ---- .../java/com/yuandian/dataflow/Server.java | 1 - .../com/yuandian/dataflow/master/Header.java | 55 ++-- .../decode/ApmBaseDataFlow.java | 8 +- .../dataflow/proto/decode/AppFlow.java | 100 +++++++ .../proto/decode/BacktrackingFlow.java | 98 +++++++ .../proto/decode/BasicTrafficFlow.java | 73 +++++ .../decode/BusinessBodyData.java | 0 .../{.proto => proto}/decode/BussFlowDb.java | 211 +++++++------- .../decode/BussFlowExternal.java | 115 ++++---- .../decode/BussFlowMidd.java | 211 +++++++------- .../dataflow/proto/decode/BussFlowOrl.java | 111 ++++++++ .../{.proto => proto}/decode/BussFlowWeb.java | 263 +++++++++--------- .../decode/DataBaseModel.java | 78 +++--- .../dataflow/proto/decode/DataFlow.java | 112 ++++++++ .../dataflow/proto/decode/PacketBase.java | 80 ++++++ .../{utils => proto/decode}/PacketHeader.java | 2 +- .../{.proto => proto}/decode/QoeFlow.java | 152 +++++----- .../{.proto => proto}/decode/SstFlow.java | 75 ++--- .../yuandian/dataflow/proto/decode/utils.java | 22 ++ 26 files changed, 1156 insertions(+), 1245 deletions(-) delete mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/AppFlow.java delete mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/BacktrackingFlow.java delete mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/BasicTrafficFlow.java delete mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowOrl.java delete mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/DataFlow.java delete mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/PacketBase.java delete mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/PacketHeader.java rename src/main/java/com/yuandian/dataflow/{.proto => proto}/decode/ApmBaseDataFlow.java (96%) create mode 100644 src/main/java/com/yuandian/dataflow/proto/decode/AppFlow.java create mode 100644 src/main/java/com/yuandian/dataflow/proto/decode/BacktrackingFlow.java create mode 100644 src/main/java/com/yuandian/dataflow/proto/decode/BasicTrafficFlow.java rename src/main/java/com/yuandian/dataflow/{.proto => proto}/decode/BusinessBodyData.java (100%) rename src/main/java/com/yuandian/dataflow/{.proto => proto}/decode/BussFlowDb.java (50%) rename src/main/java/com/yuandian/dataflow/{.proto => proto}/decode/BussFlowExternal.java (51%) rename src/main/java/com/yuandian/dataflow/{.proto => proto}/decode/BussFlowMidd.java (50%) create mode 100644 src/main/java/com/yuandian/dataflow/proto/decode/BussFlowOrl.java rename src/main/java/com/yuandian/dataflow/{.proto => proto}/decode/BussFlowWeb.java (54%) rename src/main/java/com/yuandian/dataflow/{.proto => proto}/decode/DataBaseModel.java (85%) create mode 100644 src/main/java/com/yuandian/dataflow/proto/decode/DataFlow.java create mode 100644 src/main/java/com/yuandian/dataflow/proto/decode/PacketBase.java rename src/main/java/com/yuandian/dataflow/{utils => proto/decode}/PacketHeader.java (97%) rename src/main/java/com/yuandian/dataflow/{.proto => proto}/decode/QoeFlow.java (50%) rename src/main/java/com/yuandian/dataflow/{.proto => proto}/decode/SstFlow.java (59%) create mode 100644 src/main/java/com/yuandian/dataflow/proto/decode/utils.java diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/AppFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/AppFlow.java deleted file mode 100644 index ee96ef9..0000000 --- a/src/main/java/com/yuandian/dataflow/.proto/decode/AppFlow.java +++ /dev/null @@ -1,101 +0,0 @@ -package com.yuandian.dataflow.proto.decode; -import java.nio.ByteBuffer; - - -public class AppFlow extends PacketBase { - public static int SIZE = 104; - public long srcIp; - public int srcPort; - public long dstIp; - public int dstPort; - public long startTvSec; - public long startTvUsec; - public long lastTvSec; - public long lastTvUsec; - public long endTvSec; - public long endTvUsec; - public int inputPackets; - public int outputPackets; - public int inputBytes; - public int outputBytes; - public String protocaol; - public int appId; - public int appGroupId; - public int probeIf; - public int appStyle; - public long timeFlag; - public int vlanId; - public int mplsLable; - public int tos; - - - @Override - public PacketBase Parse(PacketHeader header,ByteBuffer data) - throws Exception { - AppFlow nlf = new AppFlow(); - nlf.srcIp = ByteUtil.getInteger(data, offset, 4); - offset += 4; - nlf.srcPort = ByteUtil.getInteger(data, offset, 2); - offset += 2; - nlf.dstIp = ByteUtil.getInteger(data, offset, 4); - offset += 4; - nlf.dstPort = ByteUtil.getInteger(data, offset, 2); - offset += 2; - nlf.startTvSec = ByteUtil.getLong(data, offset, 4); - offset += 4; - nlf.startTvUsec = ByteUtil.getLong(data, offset, 4); - offset += 4; - nlf.lastTvSec = ByteUtil.getLong(data, offset, 4); - offset += 4; - nlf.lastTvUsec = ByteUtil.getLong(data, offset, 4); - offset += 4; - nlf.endTvSec = ByteUtil.getLong(data, offset, 4); - offset += 4; - nlf.endTvUsec = ByteUtil.getLong(data, offset, 4); - offset += 4; - nlf.inputPackets = ByteUtil.getInteger(data, offset, 4); - offset += 4; - nlf.outputPackets = ByteUtil.getInteger(data, offset, 4); - offset += 4; - nlf.inputBytes = ByteUtil.getInteger(data, offset, 4); - offset += 4; - nlf.outputBytes = ByteUtil.getInteger(data, offset, 4); - offset += 4; - nlf.protocaol = ByteUtil.getString(data, offset, 20); - offset += 20; - nlf.appId = ByteUtil.getInteger(data, offset, 4); - offset += 4; - nlf.appGroupId = ByteUtil.getInteger(data, offset, 4); - offset += 4; - nlf.probeIf = ByteUtil.getInteger(data, offset, 4); - offset += 4; - nlf.appStyle = ByteUtil.getInteger(data, offset, 4); - offset += 4; - nlf.timeFlag = ByteUtil.getLong(data, offset, 4); - offset += 4; - nlf.vlanId = ByteUtil.getInteger(data, offset, 4); - offset += 4; - nlf.mplsLable = ByteUtil.getInteger(data, offset, 4); - offset += 4; - nlf.tos = ByteUtil.getInteger(data, offset, 4); - offset += 4; - - return nlf; - } - - @Override - public int getUnitPacketLength() { - return 0; - } - - @Override - public String[] getData() { - return null; - } - - @Override - public int getInterfaceNumber() { - return 0; - } - -} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BacktrackingFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BacktrackingFlow.java deleted file mode 100644 index fd7757b..0000000 --- a/src/main/java/com/yuandian/dataflow/.proto/decode/BacktrackingFlow.java +++ /dev/null @@ -1,98 +0,0 @@ -package com.yuandian.dataflow.proto.decode; - - - -/** - * @author User - * - */ -public class BacktrackingFlow extends PacketBase{ - public static int SIZE = 108; - //tuple StatisTuple10 56 10元组信息 - private long macSrc; //源MAC 8 - private long macDst; //目的MAC 8 - private long ipSrc; //源IP 8 - private long ipDst; //目的IP 8 - private long portSrc; //源端口,如果没有,为-1 - private long portDst; //目标端口,如果没有,为-1 - private int l3Proto; //第三层协议ID,如果没有,为-1 - private int l4Proto; //第四层协议ID,如果没有,为-1 - private int tos; //Tos,如果没有,为-1 - private int vlanId; //vlan ID,如果没有,为-1 - - private long bytes; // 8 字节总数 - private long packets; // 8 数据包总数 - private long tcpSp; // 8 tcp同步包数 - private long tcpScpn;// 8 tcp同步确认包数 - private long tcpSrp; // 8 tcp同步重置包数 - private long appId; // 4 appID - private long appGroupId;// 4 app组ID - private long mplsLabel;// 4 - - - - - - @Override - public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception { - BacktrackingFlow backFlow = new BacktrackingFlow(); - backFlow.macSrc = ByteUtil.getLong(data, offset, 8); //源MAC 8 - offset+=8; - backFlow.macDst = ByteUtil.getLong(data, offset, 8); //目的MAC 8 - offset+=8; - backFlow.ipSrc = ByteUtil.getLong(data, offset, 8); //源IP 8 - offset+=8; - backFlow.ipDst = ByteUtil.getLong(data, offset, 8); //目的IP 8 - offset+=8; - backFlow.portSrc = ByteUtil.getInteger(data, offset, 4); //源端口,如果没有,为-1 - offset+=4; - backFlow.portDst = ByteUtil.getInteger(data, offset, 4); //目标端口,如果没有,为-1 - offset+=4; - backFlow.l3Proto = ByteUtil.getInteger(data, offset, 4); //第三层协议ID,如果没有,为-1 - offset+=4; - backFlow.l4Proto = ByteUtil.getInteger(data, offset, 4); //第四层协议ID,如果没有,为-1 - offset+=4; - backFlow.tos = ByteUtil.getInteger(data, offset, 4); //Tos,如果没有,为-1 - offset+=4; - backFlow.vlanId = ByteUtil.getInteger(data, offset, 4); //vlan ID,如果没有,为-1 - offset+=4; - - backFlow.bytes = ByteUtil.getLong(data, offset, 8); // 8 字节总数 - offset+=8; - backFlow.packets = ByteUtil.getLong(data, offset, 8); // 8 数据包总数 - offset+=8; - backFlow.tcpSp = ByteUtil.getLong(data, offset, 8); // 8 tcp同步包数 - offset+=8; - backFlow.tcpScpn = ByteUtil.getLong(data, offset, 8);// 8 tcp同步确认包数 - offset+=8; - backFlow.tcpSrp = ByteUtil.getLong(data, offset, 8); // 8 tcp同步重置包数 - offset+=8; - backFlow.appId = ByteUtil.getInteger(data, offset, 4); // 4 appID - offset+=4; - backFlow.appGroupId = ByteUtil.getInteger(data, offset, 4);// 4 app组ID - offset+=4; - backFlow.mplsLabel = ByteUtil.getInteger(data, offset, 4);// 4 - offset+=4; - - return backFlow; - } - - @Override - public int getUnitPacketLength() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public String[] getData() { - // TODO Auto-generated method stub - return null; - } - - @Override - public int getInterfaceNumber() { - // TODO Auto-generated method stub - return 0; - } - -} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BasicTrafficFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BasicTrafficFlow.java deleted file mode 100644 index 4064dc1..0000000 --- a/src/main/java/com/yuandian/dataflow/.proto/decode/BasicTrafficFlow.java +++ /dev/null @@ -1,82 +0,0 @@ -package com.yuandian.dataflow.proto.decode; - - - -public class BasicTrafficFlow extends PacketBase { - public static int SIZE = 56; - public long capPort; - public int requestPort; - public int responsePort; - public long requestIp; - public long responseIp; - public long startTime; - public long totalBytes; - public long totalPackets; - public long spackets64; - public long spackets128; - public long spackets256; - public long spackets512; - public long spackets1024; - public long spackets; - public long sendTime; - - @Override - public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception { - BasicTrafficFlow btf = new BasicTrafficFlow(); - btf.setPacketHeader(header); - - btf.capPort = ByteUtil.getLong(data, offset, 4); - offset += 4; - -// btf.requestPort = ByteUtil.getInteger(data, offset, 2); - offset += 2; -// btf.responsePort = ByteUtil.getInteger(data, offset, 2); - offset += 2; -// btf.requestIp = ByteUtil.getLong(data, offset, 4); - offset += 4; -// btf.responseIp = ByteUtil.getLong(data, offset, 4); - offset += 4; - - btf.startTime = ByteUtil.getLong(data, offset, 4); - offset += 4; - btf.totalBytes = ByteUtil.getLong(data, offset, 4); - offset += 4; - btf.totalPackets = ByteUtil.getLong(data, offset, 4); - offset += 4; - btf.spackets64 = ByteUtil.getLong(data, offset, 4); - offset += 4; - btf.spackets128 = ByteUtil.getLong(data, offset, 4); - offset += 4; - btf.spackets256 = ByteUtil.getLong(data, offset, 4); - offset += 4; - btf.spackets512 = ByteUtil.getLong(data, offset, 4); - offset += 4; - btf.spackets1024 = ByteUtil.getLong(data, offset, 4); - offset += 4; - btf.spackets = ByteUtil.getLong(data, offset, 4); - offset += 4; - btf.sendTime = ByteUtil.getLong(data, offset, 4); - offset += 4; - - return btf; - } - @Override - public String[] getData() { - // TODO Auto-generated method stub - return null; - } - @Override - public int getInterfaceNumber() { - // TODO Auto-generated method stub - return 0; - } - @Override - public int getUnitPacketLength() { - // TODO Auto-generated method stub - return 0; - } - - - - -} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowOrl.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowOrl.java deleted file mode 100644 index b41a7db..0000000 --- a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowOrl.java +++ /dev/null @@ -1,111 +0,0 @@ -package com.yuandian.dataflow.proto.decode; - - - -public class BussFlowOrl extends PacketBase { - - public int msg_type; - public int msg_version; - public int msg_seq; - public int msg_len; - public long request_mac; - public long response_mac; - public long request_ip; - public int request_port; - public long response_ip; - public int response_port; - public int probeif; - public int protocol; - public long start_tv_sec;//Web开始时间秒 - public long start_tv_usec;//开始时间毫秒 - public long end_tv_sec;//结束时间秒 - public long end_tv_usec;//结束时间微妙 - public int req_len; - public int res_len; - public int busi_msg_len; - public int key_msg_len; - public int detail_msg_len; - public int remain_len; - public String business_code; - public String sessionid; - public String req_data; - public String res_data; - public String busi_msg; - public String busi_key_msg; - public String busi_detail_msg; - public String remain_data; - - @Override - public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception { - BussFlowOrl orl = new BussFlowOrl(); - orl.msg_type = header.getNow_type(); - orl.request_mac = ByteUtil.getLong(data, offset, 8); - offset += 8; - orl.response_mac = ByteUtil.getLong(data, offset, 8); - offset += 8; - orl.request_ip = ByteUtil.getLong(data, offset, 4); - offset += 4; - orl.request_port = ByteUtil.getInteger(data, offset, 4); - offset += 4; - orl.response_ip = ByteUtil.getLong(data, offset, 4); - offset += 4; - orl.response_port = ByteUtil.getInteger(data, offset, 4); - offset += 4; - orl.probeif = ByteUtil.getInteger(data, offset, 4); - offset += 4; - orl.protocol = ByteUtil.getInteger(data, offset, 4); - offset += 4; - orl.start_tv_sec = ByteUtil.getLong(data, offset, 4); - offset += 4; - orl.start_tv_usec = ByteUtil.getLong(data, offset, 4); - offset += 4; - orl.end_tv_sec = ByteUtil.getLong(data, offset, 4); - offset += 4; - orl.end_tv_usec = ByteUtil.getLong(data, offset, 4); - offset += 4; - int req_len = ByteUtil.getInteger(data, offset, 4); - offset += 4; - int res_len = ByteUtil.getInteger(data, offset, 4); - offset += 4; - int busi_msg_len = ByteUtil.getInteger(data, offset, 4); - offset += 4; - int key_msg_len = ByteUtil.getInteger(data, offset, 4); - offset += 4; - int detail_msg_len = ByteUtil.getInteger(data, offset, 4); - offset += 4; - int remain_len = ByteUtil.getInteger(data, offset, 4); - offset += 4; - orl.business_code = ByteUtil.getString(data, offset, 32); - offset += 32; - orl.sessionid = ByteUtil.getString(data, offset, 80); - offset += 80; - orl.req_data = ByteUtil.getString(data, offset, req_len); - offset += req_len; - orl.res_data = ByteUtil.getString(data, offset, res_len); - offset += res_len; - orl.busi_msg = ByteUtil.getString(data, offset, busi_msg_len); - offset += busi_msg_len; - orl.busi_key_msg = ByteUtil.getString(data, offset, key_msg_len); - offset += key_msg_len; - orl.busi_detail_msg = ByteUtil.getString(data, offset, detail_msg_len); - offset += detail_msg_len; - orl.remain_data = ByteUtil.getString(data, offset, remain_len); - offset += remain_len; - return orl; - } - - @Override - public int getUnitPacketLength() { - return 0; - } - - @Override - public String[] getData() { - return new String[0]; - } - - @Override - public int getInterfaceNumber() { - return 0; - } -} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/DataFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/DataFlow.java deleted file mode 100644 index 9dffdc2..0000000 --- a/src/main/java/com/yuandian/dataflow/.proto/decode/DataFlow.java +++ /dev/null @@ -1,160 +0,0 @@ -package com.yuandian.dataflow.proto.decode; - - - -public class DataFlow extends PacketBase { - private final int UNIT_DATA_LENGTH = 157;//贵阳 - public String id; - - public Integer msg_type; //消息类型 - public Integer msg_version; //消息版本 - public Integer msg_seq; //序列号 - public Integer msg_len; //消息长度 - public Integer probe_if; //接口号 - public Long timestamp; //时间戳 - public Long mac_src; //源物理地址 - public Long mac_dst; //目的物理地址 - public Integer vlan_id; //vlan_id - public Long l3_proto; //l3层协议 - public Long l4_proto; //l4层协议 - public Integer tos; //tos - public Integer retran_count; //重传次数 - public Integer reset_count; //重置次数 - public Integer zerowin_count; //零窗口次数 - public Integer protocol; //协议名 - public Long seq; - public Long ack; - public Integer recog_status; //识别类型标识 - public Long bytes; //总字节 - public Long packets; //总包数 - public Integer start_tv_sec;//Web开始时间秒 - public Long start_tv_usec;//开始时间毫秒 - public Integer end_tv_sec;//结束时间秒 - public Long end_tv_usec;//结束时间微妙 - public Integer server_start_tv_sec;//服务器响应开始时间秒 - public Long server_start_tv_usec;//服务器响应开始时间毫秒 - public Integer server_end_tv_sec;//服务器响应结束时间秒 - public Long server_end_tv_usec;//服务器响应结束时间微妙 - - public Long server_response_time;//Web服务器响应时间 - public Long client_translate_time;//Web客户端传输耗时 - public Long server_translate_time;//Web服务器传输耗时 - - public Long bytes_in; - public Long bytes_out; - public Long packets_in; - public Long packets_out; - public Long ip_src; //源IP - public Long ip_dst; //目的IP - public Long port_src; //源端口 - public Long port_dst; //目的端口 - public Long probeIP; //探针IP - - public Long intodb_time; - public Long count = 1L; - - - - @Override - public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception { - if (offset + UNIT_DATA_LENGTH > data.length) - throw new Exception("data length error!"); - DataFlow dflow = new DataFlow(); -// dflow.m_Header = header; - - dflow.msg_type = header.getTableID(); -// offset += 4; -// dflow.msg_version = ByteUtil.getInteger(data, offset, 1); -// offset += 1; -// dflow.msg_seq = ByteUtil.getInteger(data, offset, 4); -// offset += 4; -// dflow.msg_len = ByteUtil.getInteger(data, offset, 4); -// offset += 4; - dflow.ip_src = ByteUtil.getLong(data, offset, 4); - offset += 4; - dflow.ip_dst = ByteUtil.getLong(data, offset, 4); - offset += 4; -// dflow.port_src = ByteUtil.getLong(data, offset, 2); - offset += 2; -// dflow.port_dst = ByteUtil.getLong(data, offset, 2); - offset += 2; - dflow.probe_if = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// dflow.timestamp = ByteUtil.getLong(data, offset, 4); - offset += 4; -// dflow.mac_src = ByteUtil.getLong(data, offset, 8); - offset += 8; -// dflow.mac_dst = ByteUtil.getLong(data, offset, 8); - offset += 8; -// dflow.vlan_id = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// dflow.l3_proto = ByteUtil.getLong(data, offset, 4); - offset += 4; -// dflow.l4_proto = ByteUtil.getLong(data, offset, 4); - offset += 4; -// dflow.tos = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// dflow.retran_count = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// dflow.reset_count = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// dflow.zerowin_count = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// dflow.protocol=ByteUtil.getInteger(data, offset, 4); - offset += 4; -// dflow.seq = ByteUtil.getLong(data, offset, 4); - offset += 4; -// dflow.ack = ByteUtil.getLong(data, offset, 4); - offset += 4; -// dflow.recog_status = ByteUtil.getInteger(data, offset, 4); - offset += 4; - dflow.bytes_in = ByteUtil.getLong(data, offset, 4); - offset += 4; - dflow.bytes_out = ByteUtil.getLong(data, offset, 4); - offset += 4; - dflow.packets_in = ByteUtil.getLong(data, offset, 4); - offset += 4; - dflow.packets_out = ByteUtil.getLong(data, offset, 4); - offset += 4; - dflow.bytes = ByteUtil.getLong(data, offset, 4); - offset += 4; - dflow.packets = ByteUtil.getLong(data, offset, 4); - offset += 4; -// dflow.start_tv_sec = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// dflow.start_tv_usec = ByteUtil.getLong(data, offset, 4); - offset += 4; -// dflow.end_tv_sec = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// dflow.end_tv_usec = ByteUtil.getLong(data, offset, 4); - offset += 4; -// dflow.server_start_tv_sec = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// dflow.server_start_tv_usec = ByteUtil.getLong(data, offset, 4); - offset += 4; -// dflow.server_end_tv_sec = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// dflow.server_end_tv_usec = ByteUtil.getLong(data, offset, 4); - offset += 4; -// dflow.server_translate_time = ByteUtil.getLong(data, offset, 8); - offset += 8; -// dflow.client_translate_time = ByteUtil.getLong(data, offset, 8); - offset += 8; -// dflow.server_response_time = ByteUtil.getLong(data, offset, 8); - offset += 8; - return dflow; - } - @Override - public int getUnitPacketLength() { - return UNIT_DATA_LENGTH; - } - @Override - public String[] getData() { - String data[] = new String[45]; - return data; - } - @Override - public int getInterfaceNumber() { - return Integer.parseInt(probe_if.toString()); - } -} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/PacketBase.java b/src/main/java/com/yuandian/dataflow/.proto/decode/PacketBase.java deleted file mode 100644 index 3eb2d91..0000000 --- a/src/main/java/com/yuandian/dataflow/.proto/decode/PacketBase.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.yuandian.dataflow.proto.decode; -import java.nio.ByteBuffer; - -public abstract class PacketBase { - - protected PacketHeader m_Header = null; - - public PacketHeader getPacketHead(){ - return m_Header; - } - - public void setPacketHeader(PacketHeader header) - { - this.m_Header = header; - } - - //抽象方法,每个数据操作类实现不一样 - public abstract PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception; - - public abstract int getUnitPacketLength(); - - public abstract String[] getData(); - - public abstract int getInterfaceNumber(); - - -} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/PacketHeader.java b/src/main/java/com/yuandian/dataflow/.proto/decode/PacketHeader.java deleted file mode 100644 index 56f2a4d..0000000 --- a/src/main/java/com/yuandian/dataflow/.proto/decode/PacketHeader.java +++ /dev/null @@ -1,55 +0,0 @@ -package com.yuandian.dataflow.proto.decode; - - - -public class PacketHeader { - - private int m_TableID = -1; //数据类型 - private int m_RecCount = 0; //数据总条数 - private int msg_len; //数据报文总长度,60010端口为压缩后长度 - private long timestamp; //60010端口发送数据时间 - private int probe_if; //60010端口抓包口 - private int umcompr_len;//60010端口数据报文压缩前长度 - private int now_type; //记录25类型数据数据当前数据类型 22 23 24 - - public int getTableID() { - return m_TableID; - } - public int getMsg_len() { - return msg_len; - } - public int getRecCount() { - return m_RecCount; - } - public long getTimestamp() { - return timestamp ; - } - public int getProbe_id(){ - return probe_if; - } - public int getUmcompr_len(){ - return umcompr_len; - } - - public PacketHeader(byte[] data) throws Exception { - if (data.length != 12 && data.length != 20) throw new Exception("Packet header length error!"); - this.m_TableID = ByteUtil.getInteger(data, 0, 4);//22 - this.m_RecCount = ByteUtil.getInteger(data, 4, 4);//4000 - this.msg_len = ByteUtil.getInteger(data, 8, 4); - } - - public void parseNextHeader_60010(byte[] data) throws Exception { - if (data.length != 12 && data.length != 20) throw new Exception("Packet header length error!"); - this.timestamp = ByteUtil.getLong(data, 0, 4); - this.probe_if = ByteUtil.getInteger(data, 4, 4); - this.umcompr_len = ByteUtil.getInteger(data, 8, 4); - } - - public void setNow_type(int now_type) { - this.now_type = now_type; - } - - public int getNow_type(){ - return now_type; - } -} diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index d88f01a..ccd1bc5 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -76,7 +76,6 @@ public class Server { String groupId = "jraft"; - Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442"); PeerId serverId = JRaftUtils.getPeerId(peeridstr); diff --git a/src/main/java/com/yuandian/dataflow/master/Header.java b/src/main/java/com/yuandian/dataflow/master/Header.java index ec0aa64..2f2b9f6 100644 --- a/src/main/java/com/yuandian/dataflow/master/Header.java +++ b/src/main/java/com/yuandian/dataflow/master/Header.java @@ -13,7 +13,8 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.ArrayList; -import com.yuandian.dataflow.utils.PacketHeader; +import com.yuandian.dataflow.proto.decode.PacketBase; +import com.yuandian.dataflow.proto.decode.PacketHeader; import lombok.Cleanup; import lombok.Getter; @@ -28,45 +29,23 @@ import lombok.extern.slf4j.Slf4j; public class Header { public static void main(String[] args) throws Exception { - ArrayList threads = new ArrayList() ; - - for(int i = 0 ; i < 100 ; i++) { - Thread thread = new Thread(() -> { - try { - var addr = new InetSocketAddress("192.168.1.248", 60001); - @Cleanup - var sock = new Socket(); - sock.setReceiveBufferSize(10 * 1024 * 1024);// socket接收缓存 - sock.setSoTimeout(1000 * 30); - // 设置超时 - sock.connect(addr, 10 * 1000); - var in = new DataInputStream(sock.getInputStream()); - var out = new DataOutputStream(sock.getOutputStream()); - // 发送验证字符串 - out.write("public".getBytes()); - // var buf = ByteBuffer.wrap( in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN); - log.error("{}", PacketHeader.PacketCode(in)); - var pheader = new PacketHeader(in); + var addr = new InetSocketAddress("192.168.1.248", 60001); + @Cleanup + var sock = new Socket(); + sock.setReceiveBufferSize(10 * 1024 * 1024);// socket接收缓存 + sock.setSoTimeout(1000 * 30); + // 设置超时 + sock.connect(addr, 10 * 1000); + var in = new DataInputStream(sock.getInputStream()); + var out = new DataOutputStream(sock.getOutputStream()); + // 发送验证字符串 + out.write("public".getBytes()); + log.error("{}", PacketHeader.PacketCode(in)); + var pheader = new PacketHeader(in); - // buf = ByteBuffer.wrap(in.readNBytes(12)).order(ByteOrder.LITTLE_ENDIAN); - log.error("{}", pheader); - } catch (Exception e) { - e.printStackTrace(); - } - }) ; - threads.add(thread); - thread.start(); - } - - threads.forEach((t)->{ - try { - t.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - }); - + log.error("{}", pheader); + log.error("{}",PacketBase.createPacketBase(pheader)); } } diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/ApmBaseDataFlow.java b/src/main/java/com/yuandian/dataflow/proto/decode/ApmBaseDataFlow.java similarity index 96% rename from src/main/java/com/yuandian/dataflow/.proto/decode/ApmBaseDataFlow.java rename to src/main/java/com/yuandian/dataflow/proto/decode/ApmBaseDataFlow.java index def7133..f779ce2 100644 --- a/src/main/java/com/yuandian/dataflow/.proto/decode/ApmBaseDataFlow.java +++ b/src/main/java/com/yuandian/dataflow/proto/decode/ApmBaseDataFlow.java @@ -1,7 +1,9 @@ package com.yuandian.dataflow.proto.decode; +import java.io.BufferedReader; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -177,7 +179,8 @@ public class ApmBaseDataFlow extends PacketBase { @Override public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception { - ApmBaseDataFlow oneData = new ApmBaseDataFlow(); + ApmBaseDataFlow oneData = new ApmBaseDataFlow(); + oneData.setPacketHeader(header); oneData.probeIf = data.getLong(4); @@ -263,8 +266,7 @@ public class ApmBaseDataFlow extends PacketBase { oneData.scAlert = data.getLong(4); - - oneData.protocal = new String(data.array()); + oneData.protocal = data.slice().toString() ; return oneData; } diff --git a/src/main/java/com/yuandian/dataflow/proto/decode/AppFlow.java b/src/main/java/com/yuandian/dataflow/proto/decode/AppFlow.java new file mode 100644 index 0000000..050302c --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/proto/decode/AppFlow.java @@ -0,0 +1,100 @@ +package com.yuandian.dataflow.proto.decode; +import java.nio.ByteBuffer; + + +public class AppFlow extends PacketBase { + public static int SIZE = 104; + public long srcIp; + public int srcPort; + public long dstIp; + public int dstPort; + public long startTvSec; + public long startTvUsec; + public long lastTvSec; + public long lastTvUsec; + public long endTvSec; + public long endTvUsec; + public int inputPackets; + public int outputPackets; + public int inputBytes; + public int outputBytes; + public String protocaol; + public int appId; + public int appGroupId; + public int probeIf; + public int appStyle; + public long timeFlag; + public int vlanId; + public int mplsLable; + public int tos; + + + @Override + public PacketBase Parse(PacketHeader header,ByteBuffer data) + throws Exception { + AppFlow nlf = new AppFlow(); + nlf.srcIp = data.getInt(4); + + nlf.srcPort = data.getInt(2); + + nlf.dstIp = data.getInt(4); + + nlf.dstPort = data.getInt(2); + + nlf.startTvSec = data.getLong(4); + + nlf.startTvUsec = data.getLong(4); + + nlf.lastTvSec = data.getLong(4); + + nlf.lastTvUsec = data.getLong(4); + + nlf.endTvSec = data.getLong(4); + + nlf.endTvUsec = data.getLong(4); + + nlf.inputPackets = data.getInt(4); + + nlf.outputPackets = data.getInt(4); + + nlf.inputBytes = data.getInt(4); + + nlf.outputBytes = data.getInt(4); + + nlf.protocaol = data.alignedSlice(20).toString(); + + nlf.appId = data.getInt(4); + + nlf.appGroupId = data.getInt(4); + + nlf.probeIf = data.getInt(4); + + nlf.appStyle = data.getInt(4); + + nlf.timeFlag = data.getLong(4); + + nlf.vlanId = data.getInt(4); + + nlf.mplsLable = data.getInt(4); + + nlf.tos = data.getInt(4); + + return nlf; + } + + @Override + public int getUnitPacketLength() { + return 0; + } + + @Override + public String[] getData() { + return null; + } + + @Override + public int getInterfaceNumber() { + return 0; + } + +} diff --git a/src/main/java/com/yuandian/dataflow/proto/decode/BacktrackingFlow.java b/src/main/java/com/yuandian/dataflow/proto/decode/BacktrackingFlow.java new file mode 100644 index 0000000..83aa13d --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/proto/decode/BacktrackingFlow.java @@ -0,0 +1,98 @@ +package com.yuandian.dataflow.proto.decode; + +import java.nio.ByteBuffer; + +/** + * @author User + * + */ +public class BacktrackingFlow extends PacketBase{ + public static int SIZE = 108; + //tuple StatisTuple10 56 10元组信息 + private long macSrc; //源MAC 8 + private long macDst; //目的MAC 8 + private long ipSrc; //源IP 8 + private long ipDst; //目的IP 8 + private long portSrc; //源端口,如果没有,为-1 + private long portDst; //目标端口,如果没有,为-1 + private int l3Proto; //第三层协议ID,如果没有,为-1 + private int l4Proto; //第四层协议ID,如果没有,为-1 + private int tos; //Tos,如果没有,为-1 + private int vlanId; //vlan ID,如果没有,为-1 + + private long bytes; // 8 字节总数 + private long packets; // 8 数据包总数 + private long tcpSp; // 8 tcp同步包数 + private long tcpScpn;// 8 tcp同步确认包数 + private long tcpSrp; // 8 tcp同步重置包数 + private long appId; // 4 appID + private long appGroupId;// 4 app组ID + private long mplsLabel;// 4 + + + + + + @Override + public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception { + BacktrackingFlow backFlow = new BacktrackingFlow(); + backFlow.macSrc = data.getLong(8); //源MAC 8 + + backFlow.macDst = data.getLong(8); //目的MAC 8 + + backFlow.ipSrc = data.getLong(8); //源IP 8 + + backFlow.ipDst = data.getLong(8); //目的IP 8 + + backFlow.portSrc = data.getInt(4); //源端口,如果没有,为-1 + + backFlow.portDst = data.getInt(4); //目标端口,如果没有,为-1 + + backFlow.l3Proto = data.getInt(4); //第三层协议ID,如果没有,为-1 + + backFlow.l4Proto = data.getInt(4); //第四层协议ID,如果没有,为-1 + + backFlow.tos = data.getInt(4); //Tos,如果没有,为-1 + + backFlow.vlanId = data.getInt(4); //vlan ID,如果没有,为-1 + + + backFlow.bytes = data.getLong(8); // 8 字节总数 + + backFlow.packets = data.getLong(8); // 8 数据包总数 + + backFlow.tcpSp = data.getLong(8); // 8 tcp同步包数 + + backFlow.tcpScpn = data.getLong(8);// 8 tcp同步确认包数 + + backFlow.tcpSrp = data.getLong(8); // 8 tcp同步重置包数 + + backFlow.appId = data.getInt(4); // 4 appID + + backFlow.appGroupId = data.getInt(4);// 4 app组ID + + backFlow.mplsLabel = data.getInt(4);// 4 + + + return backFlow; + } + + @Override + public int getUnitPacketLength() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public String[] getData() { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getInterfaceNumber() { + // TODO Auto-generated method stub + return 0; + } + +} diff --git a/src/main/java/com/yuandian/dataflow/proto/decode/BasicTrafficFlow.java b/src/main/java/com/yuandian/dataflow/proto/decode/BasicTrafficFlow.java new file mode 100644 index 0000000..60de0c3 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/proto/decode/BasicTrafficFlow.java @@ -0,0 +1,73 @@ +package com.yuandian.dataflow.proto.decode; + +import java.nio.ByteBuffer; + +public class BasicTrafficFlow extends PacketBase { + public static int SIZE = 56; + public long capPort; + public int requestPort; + public int responsePort; + public long requestIp; + public long responseIp; + public long startTime; + public long totalBytes; + public long totalPackets; + public long spackets64; + public long spackets128; + public long spackets256; + public long spackets512; + public long spackets1024; + public long spackets; + public long sendTime; + + @Override + public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception { + BasicTrafficFlow btf = new BasicTrafficFlow(); + btf.setPacketHeader(header); + + btf.capPort = data.getLong(4); + + data.position(data.position() + 12); + + btf.startTime = data.getLong(4); + + btf.totalBytes = data.getLong(4); + + btf.totalPackets = data.getLong(4); + + btf.spackets64 = data.getLong(4); + + btf.spackets128 = data.getLong(4); + + btf.spackets256 = data.getLong(4); + + btf.spackets512 = data.getLong(4); + + btf.spackets1024 = data.getLong(4); + + btf.spackets = data.getLong(4); + + btf.sendTime = data.getLong(4); + + return btf; + } + @Override + public String[] getData() { + // TODO Auto-generated method stub + return null; + } + @Override + public int getInterfaceNumber() { + // TODO Auto-generated method stub + return 0; + } + @Override + public int getUnitPacketLength() { + // TODO Auto-generated method stub + return 0; + } + + + + +} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BusinessBodyData.java b/src/main/java/com/yuandian/dataflow/proto/decode/BusinessBodyData.java similarity index 100% rename from src/main/java/com/yuandian/dataflow/.proto/decode/BusinessBodyData.java rename to src/main/java/com/yuandian/dataflow/proto/decode/BusinessBodyData.java diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowDb.java b/src/main/java/com/yuandian/dataflow/proto/decode/BussFlowDb.java similarity index 50% rename from src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowDb.java rename to src/main/java/com/yuandian/dataflow/proto/decode/BussFlowDb.java index 21731ec..f21b54c 100644 --- a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowDb.java +++ b/src/main/java/com/yuandian/dataflow/proto/decode/BussFlowDb.java @@ -1,6 +1,8 @@ package com.yuandian.dataflow.proto.decode; -import com.ud.module.common.SystemConfigManager; +import java.nio.ByteBuffer; + +// import com.ud.module.common.SystemConfigManager; @@ -82,129 +84,108 @@ public class BussFlowDb extends PacketBase { // buss.m_Header = header; buss.msg_type = header.getTableID(); -// offset += 4; -// buss.msg_version = ByteUtil.getInteger(data, offset, 1); +// +// buss.msg_version = data.getInt(1); // offset += 1; -// buss.msg_seq = ByteUtil.getInteger(data, offset, 4); -// offset += 4; +// buss.msg_seq = data.getInt(4); +// // buss.msg_len = header.getMsg_len(); -// offset += 4; +// // UNIT_DATA_LENGTH=msg_len; - buss.src_mac = ByteUtil.getLong(data, offset, 8); - offset += 8; - buss.dst_mac = ByteUtil.getLong(data, offset, 8); - if(SystemConfigManager.macFilterFlag && buss.src_mac!=0 && buss.dst_mac==0){ + buss.src_mac = data.getLong(8); + + buss.dst_mac = data.getLong(8); + + boolean macFilterFlag = false; + if(macFilterFlag && buss.src_mac!=0 && buss.dst_mac==0){ return null; } - offset += 8; -// buss.vlan_id = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// buss.tos = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// buss.retran_count = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// buss.reset_count = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// buss.zerowin_count = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.protocol=ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.bytes_out = ByteUtil.getLong(data, offset, 4); - offset += 4; - buss.bytes_in = ByteUtil.getLong(data, offset, 4); - offset += 4; - buss.package_out = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.package_in = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.session_serial_number = ByteUtil.getString(data, offset, 24); - offset += 24; -// buss.recog_status = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.probe_if=ByteUtil.getInteger(data, offset,4); - offset += 4; -// buss.channel=ByteUtil.getString(data, offset, 24); - offset+=24; - buss.name = ByteUtil.getString(data, offset, 64); - offset+=64; + + data.position(data.position() + 28); + + buss.protocol= data.getInt(4); - buss.request_ip=ByteUtil.getLong(data, offset,4); - offset += 4; - buss.request_port=ByteUtil.getInteger(data, offset,4); - offset += 4; - buss.response_ip=ByteUtil.getLong(data, offset,4); - offset += 4; - buss.response_port=ByteUtil.getInteger(data, offset,4); - offset += 4; -// buss.deal_state=ByteUtil.getInteger(data, offset,8); - offset += 8; - buss.server_translate_time=ByteUtil.getLong(data, offset,8); - offset += 8; - buss.client_translate_time=ByteUtil.getLong(data, offset,8); - offset += 8; - buss.server_response_time=ByteUtil.getLong(data, offset,8); - offset += 8; - buss.server_start_tv_sec=ByteUtil.getInteger(data, offset,4); - offset +=4; - buss.server_start_tv_usec=ByteUtil.getLong(data, offset,4); - offset +=4; - //结束时间 - buss.server_end_tv_sec=ByteUtil.getInteger(data, offset,4); - offset +=4; - buss.server_end_tv_usec=ByteUtil.getLong(data, offset,4); - offset +=4; - buss.start_tv_sec=ByteUtil.getLong(data, offset,4); - offset +=4; - buss.start_tv_usec=ByteUtil.getLong(data, offset,4); - offset +=4; - //结束时间 - buss.end_tv_sec=ByteUtil.getLong(data, offset,4); - offset +=4; - buss.end_tv_usec=ByteUtil.getLong(data, offset,4); - offset +=4; - buss.isUncomplete = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.time_flag=ByteUtil.getInteger(data, offset, 4); - offset += 4; + buss.bytes_out = data.getLong(4); - int sqlLen = ByteUtil.getInteger(data, offset, 4); - offset += 4; - int detailMsgLen = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.request_msg_length = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.response_msg_length = ByteUtil.getInteger(data, offset, 4); - offset += 4; - int reservedLen = ByteUtil.getInteger(data, offset, 4); - offset += 4; + buss.bytes_in = data.getLong(4); - buss.ori_sql=ByteUtil.getString(data, offset,sqlLen); - offset += sqlLen; - buss.business_detail_mesg=ByteUtil.getString(data, offset, detailMsgLen); - offset += detailMsgLen; - buss.request_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, buss.request_msg_length); - offset += buss.request_msg_length; - buss.response_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, buss.response_msg_length); - offset += buss.response_msg_length; - buss.reserved = ByteUtil.getString(data, offset, reservedLen); - offset += reservedLen; + buss.package_out = data.getInt(4); - /*if(SystemConfigManager.projectName.contains("GDYDBPM")){ - - boolean exist=false; - if(SystemConfigManager.reduceFlag){ + buss.package_in = data.getInt(4); + + + buss.session_serial_number = data.alignedSlice(24).toString(); + + - redisTemplate = (RedisTemplate) ApplicationContextUtil.getBean("redisTemplate"); - exist = reduceByKey(buss); - } - - //不存在就加入缓存 - if(!exist)return buss; - //存在就返回空 - return null; - }*/ + buss.probe_if= data.getInt(); + + data.position(data.position() + 24); + + buss.name = data.alignedSlice(64).toString(); + + buss.request_ip=data.getLong(); + + buss.request_port= data.getInt(); + + buss.response_ip=data.getLong(); + + buss.response_port= data.getInt(); + +// buss.deal_state= data.getInt(); + + buss.server_translate_time=data.getLong(); + + buss.client_translate_time=data.getLong(); + + buss.server_response_time=data.getLong(); + + buss.server_start_tv_sec= data.getInt(); + + buss.server_start_tv_usec=data.getLong(); + + //结束时间 + buss.server_end_tv_sec= data.getInt(); + + buss.server_end_tv_usec=data.getLong(); + + buss.start_tv_sec=data.getLong(); + + buss.start_tv_usec=data.getLong(); + + //结束时间 + buss.end_tv_sec=data.getLong(); + + buss.end_tv_usec=data.getLong(); + + buss.isUncomplete = data.getInt(4); + + buss.time_flag= data.getInt(4); + + int sqlLen = data.getInt(4); + + int detailMsgLen = data.getInt(4); + + buss.request_msg_length = data.getInt(4); + + buss.response_msg_length = data.getInt(4); + + int reservedLen = data.getInt(4); + + + buss.ori_sql = data.alignedSlice(sqlLen).toString(); + + buss.business_detail_mesg= data.alignedSlice( detailMsgLen).toString(); + + ; + buss.request_msg_detail = utils.ByteBufferUTF8String(data, buss.request_msg_length); + + buss.response_msg_detail = utils.ByteBufferUTF8String(data, buss.response_msg_length); + + buss.reserved = data.alignedSlice( reservedLen).toString(); + return buss; } @@ -223,6 +204,12 @@ public class BussFlowDb extends PacketBase { public int getInterfaceNumber() { return 0; } + + // @Override + // public PacketBase Parse(PacketHeader header, ByteBuffer data) throws Exception { + // // TODO Auto-generated method stub + // return null; + // } /** * 如果已经存在这个session_id(即key),则说明是重复数据,不再进行后续操作 diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowExternal.java b/src/main/java/com/yuandian/dataflow/proto/decode/BussFlowExternal.java similarity index 51% rename from src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowExternal.java rename to src/main/java/com/yuandian/dataflow/proto/decode/BussFlowExternal.java index 1fbd515..499e67a 100644 --- a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowExternal.java +++ b/src/main/java/com/yuandian/dataflow/proto/decode/BussFlowExternal.java @@ -1,5 +1,6 @@ package com.yuandian.dataflow.proto.decode; +import java.nio.ByteBuffer; import java.util.Date; @@ -15,69 +16,73 @@ public class BussFlowExternal extends PacketBase { public BussFlowExternal Parse(PacketHeader header,ByteBuffer data) throws Exception { - if (offset + UNIT_DATA_LENGTH > data.length) + + + if (data.position() + UNIT_DATA_LENGTH > data.array().length) throw new Exception("data length error!"); + + BussFlowExternal buss = new BussFlowExternal(); // buss.m_Header = header; - buss.probe_if=ByteUtil.getLong(data, offset,4); - offset += 4; - buss.channel=ByteUtil.getString(data, offset,24); - offset += 24; - buss.interfaceid=ByteUtil.getString(data, offset,100); - offset += 100; - buss.systemName=ByteUtil.getString(data, offset,32); - offset += 32; - buss.net_type=ByteUtil.getString(data, offset,6); - offset += 6; - buss.net_segment = ByteUtil.getString(data, offset, 5); - offset += 5; - buss.session_id=ByteUtil.getString(data, offset,80); - offset += 80; - buss.phoneid=ByteUtil.getString(data, offset,12); - offset += 12; + buss.probe_if= data.getLong(4); + + buss.channel= data.alignedSlice(24).toString(); + + buss.interfaceid= data.alignedSlice(100).toString(); + + buss.systemName= data.alignedSlice(32).toString(); + + buss.net_type= data.alignedSlice(6).toString(); + + buss.net_segment = data.alignedSlice( 5).toString(); + + buss.session_id= data.alignedSlice(80).toString(); + + buss.phoneid= data.alignedSlice(12).toString(); + - buss.request_ip=ByteUtil.getLong(data, offset,4); - offset += 4; - buss.request_port=ByteUtil.getLong(data, offset,4); - offset += 4; - buss.response_ip=ByteUtil.getLong(data, offset,4); - offset += 4; - buss.response_port=ByteUtil.getLong(data, offset,4); - offset += 4; + buss.request_ip= data.getLong(4); + + buss.request_port= data.getLong(4); + + buss.response_ip= data.getLong(4); + + buss.response_port= data.getLong(4); + //开始时间 - buss.start_tv_sec=ByteUtil.getLong(data, offset,4); - offset +=4; - buss.start_tv_usec=ByteUtil.getLong(data, offset,4); - offset +=4; + buss.start_tv_sec= data.getLong(4); + + buss.start_tv_usec= data.getLong(4); + //结束时间 - buss.end_tv_sec=ByteUtil.getLong(data, offset,4); - offset +=4; - buss.end_tv_usec=ByteUtil.getLong(data, offset,4); - offset +=4; + buss.end_tv_sec= data.getLong(4); + + buss.end_tv_usec= data.getLong(4); + - buss.deal_state=ByteUtil.getInteger(data, offset,4); - offset += 4; - buss.server_translate_time=ByteUtil.getLong(data, offset,8); - offset += 8; - buss.server_response_time=ByteUtil.getLong(data, offset,8); - offset += 8; - buss.begin_url=ByteUtil.getString(data, offset,100); - offset += 100; - buss.operating_sytem=ByteUtil.getString(data, offset,20); - offset += 20; - buss.server_res_code=ByteUtil.getInteger(data, offset,2); - offset += 2; - buss.browser=ByteUtil.getString(data, offset,30); - offset += 30; - buss.business_detail_mesg=ByteUtil.getString(data, offset,200); - offset += 200; - buss.business_involve_msg = ByteUtil.getString(data, offset,200); - offset += 200; - isUncomplete = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.time_flag=ByteUtil.getLong(data, offset, 4); - offset += 4; + buss.deal_state= data.getInt(4); + + buss.server_translate_time= data.getLong(8); + + buss.server_response_time= data.getLong(8); + + buss.begin_url= data.alignedSlice(100).toString(); + + buss.operating_sytem= data.alignedSlice(20).toString(); + + buss.server_res_code= data.getInt(2); + + buss.browser= data.alignedSlice(30).toString(); + + buss.business_detail_mesg= data.alignedSlice(200).toString(); + + buss.business_involve_msg = data.alignedSlice(200).toString(); + + isUncomplete = data.getInt( 4); + + buss.time_flag= data.getLong( 4); + return buss; } diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowMidd.java b/src/main/java/com/yuandian/dataflow/proto/decode/BussFlowMidd.java similarity index 50% rename from src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowMidd.java rename to src/main/java/com/yuandian/dataflow/proto/decode/BussFlowMidd.java index afc31e3..5bbb336 100644 --- a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowMidd.java +++ b/src/main/java/com/yuandian/dataflow/proto/decode/BussFlowMidd.java @@ -1,12 +1,11 @@ package com.yuandian.dataflow.proto.decode; +import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; -import com.ud.module.common.ApplicationContextUtil; -import com.ud.module.common.SystemConfigManager; -import com.ud.module.util.code.CodingGeneration; + public class BussFlowMidd extends PacketBase { private int UNIT_DATA_LENGTH =3569; @@ -85,131 +84,115 @@ public class BussFlowMidd extends PacketBase { // buss.m_Header = header; buss.msg_type = header.getTableID(); -// offset += 4; -// buss.msg_version = ByteUtil.getInteger(data, offset, 1); -// offset += 1; -// buss.msg_seq = ByteUtil.getInteger(data, offset, 4); -// offset += 4; -// buss.msg_len = ByteUtil.getInteger(data, offset, 4); -// offset += 4; - -// UNIT_DATA_LENGTH=msg_len; - buss.src_mac = ByteUtil.getLong(data, offset, 8); - offset += 8; - buss.dst_mac = ByteUtil.getLong(data, offset, 8); - if(SystemConfigManager.macFilterFlag && buss.src_mac!=0 && buss.dst_mac==0){ + + buss.src_mac = data.getLong(8); + + buss.dst_mac = data.getLong(8); + var macFilterFlag = false ; + if(macFilterFlag && buss.src_mac!=0 && buss.dst_mac==0){ return null; } - offset += 8; -// buss.vlan_id = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// buss.tos = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// buss.retran_count = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// buss.reset_count = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// buss.zerowin_count = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.protocol=ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.bytes_out = ByteUtil.getLong(data, offset, 4); - offset += 4; - buss.bytes_in = ByteUtil.getLong(data, offset, 4); - offset += 4; - buss.package_out = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.package_in = ByteUtil.getInteger(data, offset, 4); - offset += 4; + + + + data.position( data.position() + 28); + + buss.protocol= data.getInt( 4); + + buss.bytes_out = data.getLong(4); + + buss.bytes_in = data.getLong(4); + + buss.package_out = data.getInt( 4); + + buss.package_in = data.getInt( 4); + - buss.session_serial_number = ByteUtil.getString(data, offset, 24); - offset += 24; -// buss.recog_status = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// buss.req_method = ByteUtil.getLong(data, offset, 4); -// offset += 4; -// buss.content_type = ByteUtil.getString(data, offset, 40); + buss.session_serial_number = data.alignedSlice( 24).toString(); + + data.position( data.position() + 4); +// buss.req_method = data.getLong(4); +// +// buss.content_type = data.alignedSlice( 40).toString(); // offset += 40; -// buss.accept = ByteUtil.getString(data, offset, 40); +// buss.accept = data.alignedSlice( 40).toString(); // offset += 40; - buss.probe_if=ByteUtil.getInteger(data, offset,4); - offset += 4; -// buss.buss_type =ByteUtil.getString(data, offset,10); + buss.probe_if= data.getInt(4); + +// buss.buss_type =data.alignedSlice(10).toString(); // offset += 10; - buss.channel=ByteUtil.getString(data, offset,24); - offset += 24; -// buss.session_id=ByteUtil.getString(data, offset,36); + buss.channel=data.alignedSlice(24).toString(); + +// buss.session_id=data.alignedSlice(36).toString(); // offset += 36; -// buss.net_segment=ByteUtil.getString(data, offset,6); +// buss.net_segment=data.alignedSlice(6).toString(); // offset += 6; - buss.request_ip=ByteUtil.getLong(data, offset,4); - offset += 4; - buss.request_port=ByteUtil.getInteger(data, offset,4); - offset += 4; - buss.response_ip=ByteUtil.getLong(data, offset,4); - offset += 4; - buss.response_port=ByteUtil.getInteger(data, offset,4); - offset += 4; + buss.request_ip=data.getLong(); + + buss.request_port= data.getInt(4); + + buss.response_ip=data.getLong(); + + buss.response_port= data.getInt(4); + //开始时间 - buss.start_tv_sec=ByteUtil.getLong(data, offset,4); - offset +=4; - buss.start_tv_usec=ByteUtil.getLong(data, offset,4); - offset +=4; + buss.start_tv_sec=data.getLong(); + + buss.start_tv_usec=data.getLong(); + //结束时间 - buss.end_tv_sec=ByteUtil.getLong(data, offset,4); - offset +=4; - buss.end_tv_usec=ByteUtil.getLong(data, offset,4); - offset +=4; -// buss.deal_state=ByteUtil.getInteger(data, offset,4); - offset +=4; - buss.server_res_code = ByteUtil.getInteger(data, offset, 2); - offset +=2; - buss.server_translate_time=ByteUtil.getLong(data, offset,8); - offset += 8; + buss.end_tv_sec=data.getLong(); + + buss.end_tv_usec=data.getLong(); + +// buss.deal_state= data.getInt(4); + + buss.server_res_code = data.getInt( 2); + + buss.server_translate_time=data.getLong(); + - buss.server_start_tv_sec=ByteUtil.getInteger(data, offset,4); - offset +=4; - buss.server_start_tv_usec=ByteUtil.getLong(data, offset,4); - offset +=4; + buss.server_start_tv_sec= data.getInt(4); + + buss.server_start_tv_usec=data.getLong(); + //结束时间 - buss.server_end_tv_sec=ByteUtil.getInteger(data, offset,4); - offset +=4; - buss.server_end_tv_usec=ByteUtil.getLong(data, offset,4); - offset +=4; + buss.server_end_tv_sec= data.getInt(4); + + buss.server_end_tv_usec=data.getLong(); + - buss.server_response_time=ByteUtil.getLong(data, offset,8); - offset += 8; - buss.client_translate_time=ByteUtil.getLong(data, offset,8); - offset += 8; - buss.isUncomplete = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.time_flag=ByteUtil.getInteger(data, offset, 4); - offset +=4; - int detailMsgLen = ByteUtil.getInteger(data, offset, 4); - offset += 4; - int keyMsgLen = ByteUtil.getInteger(data, offset, 4); - offset += 4; - int apiLen = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.request_msg_length = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.response_msg_length = ByteUtil.getInteger(data, offset, 4); - offset += 4; - int remainLen = ByteUtil.getInteger(data, offset, 4); - offset += 4; + buss.server_response_time=data.getLong(); + + buss.client_translate_time=data.getLong(); + + buss.isUncomplete = data.getInt( 4); + + buss.time_flag= data.getInt( 4); + + int detailMsgLen = data.getInt( 4); + + int keyMsgLen = data.getInt( 4); + + int apiLen = data.getInt( 4); + + buss.request_msg_length = data.getInt( 4); + + buss.response_msg_length = data.getInt( 4); + + int remainLen = data.getInt( 4); + - buss.business_detail_mesg=ByteUtil.getString(data, offset,detailMsgLen); - offset += detailMsgLen; - buss.bussiness_key_mesg = ByteUtil.getString(data, offset, keyMsgLen); - offset += keyMsgLen; - buss.ori_api=ByteUtil.getString(data, offset, apiLen); - offset += apiLen; - buss.request_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, buss.request_msg_length); - offset += buss.request_msg_length; - buss.response_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, buss.response_msg_length); - offset += buss.response_msg_length; -// buss.remain_data = ByteUtil.getString(data, offset, remainLen); - offset += remainLen; + buss.business_detail_mesg=data.alignedSlice(detailMsgLen).toString(); + + buss.bussiness_key_mesg = data.alignedSlice( keyMsgLen).toString(); + + buss.ori_api=data.alignedSlice( apiLen).toString(); + + buss.request_msg_detail = utils.ByteBufferUTF8String(data, buss.request_msg_length); + + buss.response_msg_detail = utils.ByteBufferUTF8String(data, buss.response_msg_length); + data.position(data.position()+remainLen); diff --git a/src/main/java/com/yuandian/dataflow/proto/decode/BussFlowOrl.java b/src/main/java/com/yuandian/dataflow/proto/decode/BussFlowOrl.java new file mode 100644 index 0000000..44fe90b --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/proto/decode/BussFlowOrl.java @@ -0,0 +1,111 @@ +package com.yuandian.dataflow.proto.decode; + +import java.nio.ByteBuffer; + +public class BussFlowOrl extends PacketBase { + + public int msg_type; + public int msg_version; + public int msg_seq; + public int msg_len; + public long request_mac; + public long response_mac; + public long request_ip; + public int request_port; + public long response_ip; + public int response_port; + public int probeif; + public int protocol; + public long start_tv_sec;//Web开始时间秒 + public long start_tv_usec;//开始时间毫秒 + public long end_tv_sec;//结束时间秒 + public long end_tv_usec;//结束时间微妙 + public int req_len; + public int res_len; + public int busi_msg_len; + public int key_msg_len; + public int detail_msg_len; + public int remain_len; + public String business_code; + public String sessionid; + public String req_data; + public String res_data; + public String busi_msg; + public String busi_key_msg; + public String busi_detail_msg; + public String remain_data; + + @Override + public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception { + BussFlowOrl orl = new BussFlowOrl(); + orl.msg_type = header.getNowType(); + orl.request_mac =data.getLong(8); + + orl.response_mac =data.getLong(8); + + orl.request_ip =data.getLong(4); + + orl.request_port = data.getInt(4); + + orl.response_ip =data.getLong(4); + + orl.response_port = data.getInt(4); + + orl.probeif = data.getInt(4); + + orl.protocol = data.getInt(4); + + orl.start_tv_sec =data.getLong(4); + + orl.start_tv_usec =data.getLong(4); + + orl.end_tv_sec =data.getLong(4); + + orl.end_tv_usec =data.getLong(4); + + int req_len = data.getInt(4); + + int res_len = data.getInt(4); + + int busi_msg_len = data.getInt(4); + + int key_msg_len = data.getInt(4); + + int detail_msg_len = data.getInt(4); + + int remain_len = data.getInt(4); + + orl.business_code = data.alignedSlice( 32).toString(); + + orl.sessionid = data.alignedSlice( 80).toString(); + + orl.req_data = data.alignedSlice( req_len).toString(); + + orl.res_data = data.alignedSlice( res_len).toString(); + + orl.busi_msg = data.alignedSlice( busi_msg_len).toString(); + + orl.busi_key_msg = data.alignedSlice( key_msg_len).toString(); + + orl.busi_detail_msg = data.alignedSlice( detail_msg_len).toString(); + + orl.remain_data = data.alignedSlice( remain_len).toString(); + + return orl; + } + + @Override + public int getUnitPacketLength() { + return 0; + } + + @Override + public String[] getData() { + return new String[0]; + } + + @Override + public int getInterfaceNumber() { + return 0; + } +} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowWeb.java b/src/main/java/com/yuandian/dataflow/proto/decode/BussFlowWeb.java similarity index 54% rename from src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowWeb.java rename to src/main/java/com/yuandian/dataflow/proto/decode/BussFlowWeb.java index c7b2e99..5e4147c 100644 --- a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowWeb.java +++ b/src/main/java/com/yuandian/dataflow/proto/decode/BussFlowWeb.java @@ -1,16 +1,15 @@ package com.yuandian.dataflow.proto.decode; +import java.nio.ByteBuffer; import java.util.regex.Matcher; import java.util.regex.Pattern; -import com.ud.module.common.Constants; -import com.ud.module.common.SystemConfigManager; + import com.yuandian.dataflow.proto.decode.PacketBase; import com.yuandian.dataflow.proto.decode.PacketHeader; -import com.ud.module.util.flow.JXYDBPMDetailMesgUtils; -import com.ud.module.util.flow.URLUtil; + public class BussFlowWeb extends PacketBase { @@ -31,160 +30,152 @@ public class BussFlowWeb extends PacketBase { buss.msg_type = header.getTableID(); // offset += 4; -// buss.msg_version = ByteUtil.getInteger(data, offset, 1); +// buss.msg_version = data.getInt(1); // offset += 1; -// buss.msg_seq = ByteUtil.getInteger(data, offset, 4); +// buss.msg_seq = data.getInt(4); // offset += 4; -// buss.msg_len = ByteUtil.getInteger(data, offset, 4); +// buss.msg_len = data.getInt(4); // offset += 4; - buss.src_mac = ByteUtil.getLong(data, offset, 8); - offset += 8; - buss.dst_mac = ByteUtil.getLong(data, offset, 8); - offset += 8; -// buss.vlan_id = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// buss.tos = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// buss.retran_count = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// buss.reset_count = ByteUtil.getInteger(data, offset, 4); - offset += 4; -// buss.zerowin_count = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.protocol=ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.bytes_out = ByteUtil.getLong(data, offset, 4); - offset += 4; - buss.bytes_in = ByteUtil.getLong(data, offset, 4); - offset += 4; - buss.package_out = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.package_in = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.session_serial_number = ByteUtil.getString(data, offset, 24); //5268109200 - offset += 24; -// buss.recog_status = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.req_method = ByteUtil.getLong(data, offset, 4); //1 - offset += 4; - buss.content_type = ByteUtil.getString(data, offset, 40); - offset += 40; - buss.accept = ByteUtil.getString(data, offset, 40); //image/png, image/svg+xml, image/*;q=0.8 - offset += 40; - buss.probe_if=ByteUtil.getInteger(data, offset,4); - offset += 4; - buss.channel=ByteUtil.getString(data, offset,24); - offset += 24; - buss.session_id=ByteUtil.getString(data, offset,80); //7Ip6bYet8n0wfxeJMJtAm7v2FW76ZTImJVyNjbYoRbUdUf71QaxQ!-202316728 - offset += 80; - buss.request_ip=ByteUtil.getLong(data, offset,4); //3232235850 - offset += 4; - buss.request_port=ByteUtil.getInteger(data, offset,4); //3309 - offset += 4; - buss.response_ip=ByteUtil.getLong(data, offset,4); //1971882754 - offset += 4; - buss.response_port=ByteUtil.getInteger(data, offset,4); //80 - offset += 4; + buss.src_mac = data.getLong(8); + + buss.dst_mac = data.getLong(8); + + + data.position(data.position()+20); + + buss.protocol=data.getInt(4); + + buss.bytes_out = data.getLong(4); + + buss.bytes_in = data.getLong(4); + + buss.package_out = data.getInt(4); + + buss.package_in = data.getInt(4); + + buss.session_serial_number = data.alignedSlice(24).toString(); //5268109200 + +// buss.recog_status = data.getInt(4); + + data.position(data.position()+4); + buss.req_method = data.getLong(4); //1 + + buss.content_type = data.alignedSlice(40).toString(); + + buss.accept = data.alignedSlice(40).toString(); //image/png, image/svg+xml, image/*;q=0.8 + + buss.probe_if=data.getInt(); + + buss.channel=data.alignedSlice(4).toString(); + + buss.session_id=data.alignedSlice(0).toString(); //7Ip6bYet8n0wfxeJMJtAm7v2FW76ZTImJVyNjbYoRbUdUf71QaxQ!-202316728 + + buss.request_ip=data.getLong(); //3232235850 + + buss.request_port=data.getInt(); //3309 + + buss.response_ip=data.getLong(); //1971882754 + + buss.response_port=data.getInt(); //80 + //开始时间 - buss.start_tv_sec=ByteUtil.getLong(data, offset,4); //1492473697 - offset +=4; - buss.start_tv_usec=ByteUtil.getLong(data, offset,4); //459461 - offset +=4; + buss.start_tv_sec=data.getLong(); //1492473697 + + buss.start_tv_usec=data.getLong(); //459461 + //结束时间 - buss.end_tv_sec=ByteUtil.getLong(data, offset,4); //1492473697 - offset +=4; - buss.end_tv_usec=ByteUtil.getLong(data, offset,4); //459490 - offset +=4; + buss.end_tv_sec=data.getLong(); //1492473697 - buss.server_translate_time=ByteUtil.getLong(data, offset,8); - offset += 8; + buss.end_tv_usec=data.getLong(); //459490 + + + buss.server_translate_time=data.getLong(); + + + buss.server_start_tv_sec=data.getInt(); + + buss.server_start_tv_usec=data.getLong(); - buss.server_start_tv_sec=ByteUtil.getInteger(data, offset,4); - offset +=4; - buss.server_start_tv_usec=ByteUtil.getLong(data, offset,4); - offset +=4; //结束时间 - buss.server_end_tv_sec=ByteUtil.getInteger(data, offset,4); - offset +=4; - buss.server_end_tv_usec=ByteUtil.getLong(data, offset,4); - offset +=4; + buss.server_end_tv_sec=data.getInt(); - buss.server_response_time=ByteUtil.getLong(data, offset,8); - offset += 8; - buss.client_translate_time=ByteUtil.getLong(data, offset,8); - offset += 8; -// buss.locate_server_translate_time=ByteUtil.getLong(data, offset,8); - offset += 8; -// buss.locate_server_response_time=ByteUtil.getLong(data, offset,8); - offset += 8; -// buss.locate_client_translate_time=ByteUtil.getLong(data, offset,8); - offset += 8; - buss.x_requested_with = ByteUtil.getString(data, offset, 20); - offset += 20; - buss.operating_system=ByteUtil.getString(data, offset,20); //Windows NT 6.1 - offset += 20; - buss.server_res_code=ByteUtil.getInteger(data, offset,2); - offset += 2; - buss.browser=ByteUtil.getString(data, offset,30); - offset += 30; - buss.isUncomplete = ByteUtil.getInteger(data, offset, 4); - offset += 4; - buss.time_flag=ByteUtil.getInteger(data, offset, 4); - offset += 4; - int detailMsgLen = ByteUtil.getInteger(data, offset, 4); - offset += 4; - int keyMsgLen = ByteUtil.getInteger(data, offset, 4); - offset += 4; - int reqMsgLeng= ByteUtil.getInteger(data, offset, 4); + buss.server_end_tv_usec=data.getLong(); + + + buss.server_response_time=data.getLong(); + + buss.client_translate_time=data.getLong(); + + + data.position(data.position()+24); + + buss.x_requested_with = data.alignedSlice(20).toString(); + + buss.operating_system=data.alignedSlice(0).toString(); //Windows NT 6.1 + + buss.server_res_code=data.getInt(); + + buss.browser=data.alignedSlice(0).toString(); + + buss.isUncomplete = data.getInt(4); + + buss.time_flag=data.getInt(4); + + int detailMsgLen = data.getInt(4); + + int keyMsgLen = data.getInt(4); + + int reqMsgLeng= data.getInt(4); buss.request_msg_length =reqMsgLeng; - offset += 4; - int resMsgLength=ByteUtil.getInteger(data, offset, 4); + + int resMsgLength=data.getInt(4); buss.response_msg_length = resMsgLength; - offset += 4; - int cookieLength=ByteUtil.getInteger(data, offset, 4); + + int cookieLength=data.getInt(4); buss.req_cookie_leng=cookieLength; - offset +=4; - int url_len=ByteUtil.getInteger(data, offset, 4); - offset +=4; - int refer_len=ByteUtil.getInteger(data, offset, 4); - offset += 4; - int remainLen = ByteUtil.getInteger(data, offset, 4); - offset += 4; + + int url_len=data.getInt(4); + + int refer_len=data.getInt(4); + + int remainLen = data.getInt(4); + // if (SystemConfigManager.systemCode.equals("JXYDBPM")) { -// buss.business_detail_mesg=JXYDBPMDetailMesgUtils.decodeValue(ByteUtil.getString(data, offset,detailMsgLen)); +// buss.business_detail_mesg=JXYDBPMDetailMesgUtils.decodeValue(data.alignedSlice(etailMsgLen).toString()); // } else { -// buss.business_detail_mesg=ByteUtil.getString(data, offset,detailMsgLen); +// buss.business_detail_mesg=data.alignedSlice(etailMsgLen).toString(); // } - buss.business_detail_mesg=ByteUtil.getString(data, offset,detailMsgLen); - offset += detailMsgLen; - buss.bussiness_key_mesg = ByteUtil.getString(data, offset,keyMsgLen); - offset += keyMsgLen; - buss.request_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, reqMsgLeng); - offset += reqMsgLeng; - buss.response_msg_detail = ByteUtil.getStringIgnoreZero(data, offset,resMsgLength); - offset +=resMsgLength; - buss.req_cookie_detail = ByteUtil.getString(data, offset,cookieLength); - offset += cookieLength; - buss.ori_url=ByteUtil.getString(data, offset,url_len); // /images/index/220130.gif + buss.business_detail_mesg=data.alignedSlice(detailMsgLen).toString(); + + buss.bussiness_key_mesg = data.alignedSlice(keyMsgLen).toString(); + + buss.request_msg_detail = utils.ByteBufferUTF8String(data, reqMsgLeng); + + buss.response_msg_detail = utils.ByteBufferUTF8String(data, resMsgLength); + + buss.req_cookie_detail = data.alignedSlice(cookieLength).toString(); + + buss.ori_url=data.alignedSlice(url_len).toString(); // /images/index/220130.gif if(buss.ori_url!=null && !"".equals(buss.ori_url.trim())){ String result = buss.ori_url.replaceAll("\\?", "").replaceAll("\\/", "").replaceAll("\\*", "").replaceAll("\\=", "").replaceAll("\\&", "").replaceAll("\\+", ""); if(isMessyCode(result)){//如果中文乱码 - String withoutValue = URLUtil.getInsertURL(buss.ori_url, Constants.SPLIT_NULL).trim();//去参数值 - String result2 = withoutValue.replaceAll("\\?", "").replaceAll("\\/", "").replaceAll("\\*", "").replaceAll("\\=", "").replaceAll("\\&", "").replaceAll("\\+", ""); - if(isMessyCode(result2)){//判断是否还乱码 - buss.ori_url=""; - }else{ - buss.ori_url=withoutValue; - } + // String withoutValue = URLUtil.getInsertURL(buss.ori_url, Constants.SPLIT_NULL).trim();//去参数值 + + // String result2 = withoutValue.replaceAll("\\?", "").replaceAll("\\/", "").replaceAll("\\*", "").replaceAll("\\=", "").replaceAll("\\&", "").replaceAll("\\+", ""); + // if(isMessyCode(result2)){//判断是否还乱码 + // buss.ori_url=""; + // }else{ + // buss.ori_url=withoutValue; + // } }; } - offset += url_len; - buss.reter_url = ByteUtil.getString(data, offset, refer_len); // http://www.10086.cn/gd/index_200_200.html - offset += refer_len; - buss.remain_data = ByteUtil.getString(data, offset, remainLen); - offset += remainLen; + + buss.reter_url = data.alignedSlice(refer_len).toString(); // http://www.10086.cn/gd/index_200_200.html + + buss.remain_data = data.alignedSlice(remainLen).toString(); + /*if(SystemConfigManager.projectName.contains("GDYDBPM")){ boolean exist=false; if(SystemConfigManager.reduceFlag){ diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/DataBaseModel.java b/src/main/java/com/yuandian/dataflow/proto/decode/DataBaseModel.java similarity index 85% rename from src/main/java/com/yuandian/dataflow/.proto/decode/DataBaseModel.java rename to src/main/java/com/yuandian/dataflow/proto/decode/DataBaseModel.java index 6f062a7..b56677c 100644 --- a/src/main/java/com/yuandian/dataflow/.proto/decode/DataBaseModel.java +++ b/src/main/java/com/yuandian/dataflow/proto/decode/DataBaseModel.java @@ -3,8 +3,11 @@ package com.yuandian.dataflow.proto.decode; import java.io.Serializable; +import java.nio.ByteBuffer; import java.util.Date; +import io.netty.buffer.ByteBuf; + /** @@ -73,53 +76,52 @@ public class DataBaseModel extends PacketBase implements Serializable{ return strb.toString(); } - public DataBaseModel Parse(byte[] data, int offset)throws Exception { + public DataBaseModel Parse(ByteBuffer data)throws Exception { - if (offset + 108 > data.length) + if (data.position() + 108 > data.array().length) throw new Exception("data length error!"); DataBaseModel db = new DataBaseModel(); - db.mac_src = ByteUtil.getLong(data, offset, 8); - offset+=8; - db.mac_dst = ByteUtil.getLong(data, offset, 8); - offset+=8; - db.ip_src = ByteUtil.getLong(data, offset, 8); - offset+=8; - db.ip_dst = ByteUtil.getLong(data, offset, 8); - offset+=8; + db.mac_src = data.getLong( 8); - db.port_src = ByteUtil.getInteger(data, offset, 4); - offset+=4; - db.port_dst = ByteUtil.getInteger(data, offset, 4); - offset+=4; - db.l3_proto = ByteUtil.getInteger(data, offset, 4); - offset+=4; - db.l4_proto = ByteUtil.getInteger(data, offset, 4); - offset+=4; - db.tos = ByteUtil.getInteger(data, offset, 4); - offset+=4; - db.vlan_id = ByteUtil.getInteger(data, offset, 4); - offset+=4; - db.bytes = ByteUtil.getLong(data, offset, 8); - offset+=8; - db.packets = ByteUtil.getLong(data, offset, 8); - offset+=8; - db.packets_syn = ByteUtil.getLong(data, offset, 8); + db.mac_dst = data.getLong( 8); + + db.ip_src = data.getLong( 8); + + db.ip_dst = data.getLong( 8); + + db.port_src = data.getInt(4); + + db.port_dst = data.getInt(4); + + db.l3_proto = data.getInt(4); + + db.l4_proto = data.getInt(4); + + db.tos = data.getInt(4); + + db.vlan_id = data.getInt(4); + + db.bytes = data.getLong( 8); + + db.packets = data.getLong( 8); + + db.packets_syn = data.getLong( 8); - offset+=8; - db.packets_syn_ack = ByteUtil.getLong(data, offset, 8); - offset+=8; - db.packets_syn_rst = ByteUtil.getLong(data, offset, 8); - offset+=8; + + db.packets_syn_ack = data.getLong( 8); + + db.packets_syn_rst = data.getLong( 8); + /** * 2015-12-10txy 新增3个解析 */ - db.appid=ByteUtil.getLong(data, offset, 4); - offset+=4; - db.app_group_id=ByteUtil.getLong(data, offset, 4); - offset+=4; - db.mpls_label=ByteUtil.getLong(data, offset, 4); - offset+=4; + db.appid=data.getLong( 4); + + db.app_group_id=data.getLong( 4); + + db.mpls_label=data.getLong( 4); + return db; } diff --git a/src/main/java/com/yuandian/dataflow/proto/decode/DataFlow.java b/src/main/java/com/yuandian/dataflow/proto/decode/DataFlow.java new file mode 100644 index 0000000..05bd36f --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/proto/decode/DataFlow.java @@ -0,0 +1,112 @@ +package com.yuandian.dataflow.proto.decode; + +import java.nio.ByteBuffer; + +public class DataFlow extends PacketBase { + private final int UNIT_DATA_LENGTH = 157;//贵阳 + public String id; + + public Integer msg_type; //消息类型 + public Integer msg_version; //消息版本 + public Integer msg_seq; //序列号 + public Integer msg_len; //消息长度 + public Integer probe_if; //接口号 + public Long timestamp; //时间戳 + public Long mac_src; //源物理地址 + public Long mac_dst; //目的物理地址 + public Integer vlan_id; //vlan_id + public Long l3_proto; //l3层协议 + public Long l4_proto; //l4层协议 + public Integer tos; //tos + public Integer retran_count; //重传次数 + public Integer reset_count; //重置次数 + public Integer zerowin_count; //零窗口次数 + public Integer protocol; //协议名 + public Long seq; + public Long ack; + public Integer recog_status; //识别类型标识 + public Long bytes; //总字节 + public Long packets; //总包数 + public Integer start_tv_sec;//Web开始时间秒 + public Long start_tv_usec;//开始时间毫秒 + public Integer end_tv_sec;//结束时间秒 + public Long end_tv_usec;//结束时间微妙 + public Integer server_start_tv_sec;//服务器响应开始时间秒 + public Long server_start_tv_usec;//服务器响应开始时间毫秒 + public Integer server_end_tv_sec;//服务器响应结束时间秒 + public Long server_end_tv_usec;//服务器响应结束时间微妙 + + public Long server_response_time;//Web服务器响应时间 + public Long client_translate_time;//Web客户端传输耗时 + public Long server_translate_time;//Web服务器传输耗时 + + public Long bytes_in; + public Long bytes_out; + public Long packets_in; + public Long packets_out; + public Long ip_src; //源IP + public Long ip_dst; //目的IP + public Long port_src; //源端口 + public Long port_dst; //目的端口 + public Long probeIP; //探针IP + + public Long intodb_time; + public Long count = 1L; + + + + @Override + public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception { + if (data.position() + UNIT_DATA_LENGTH > data.array().length) + throw new Exception("data length error!"); + DataFlow dflow = new DataFlow(); +// dflow.m_Header = header; + + dflow.msg_type = header.getTableID(); + + dflow.ip_src = data.getLong(4); + + dflow.ip_dst = data.getLong(4); + + + + data.position(data.position()+4); + + dflow.probe_if = data.getInt(4); + + + + + + + data.position(data.position() + 32 + 12 + 12 + 8 ); + + dflow.bytes_in = data.getLong(4); + + dflow.bytes_out = data.getLong(4); + + dflow.packets_in = data.getLong(4); + + dflow.packets_out = data.getLong(4); + + dflow.bytes = data.getLong(4); + + dflow.packets = data.getLong(4); + + data.position(data.position() + 12 + 12 + 16 + 16 ); + return dflow; + } + @Override + public int getUnitPacketLength() { + return UNIT_DATA_LENGTH; + } + @Override + public String[] getData() { + String data[] = new String[45]; + return data; + } + @Override + public int getInterfaceNumber() { + return Integer.parseInt(probe_if.toString()); + } +} diff --git a/src/main/java/com/yuandian/dataflow/proto/decode/PacketBase.java b/src/main/java/com/yuandian/dataflow/proto/decode/PacketBase.java new file mode 100644 index 0000000..9019563 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/proto/decode/PacketBase.java @@ -0,0 +1,80 @@ +package com.yuandian.dataflow.proto.decode; +import java.nio.ByteBuffer; + +import lombok.ToString; + +@ToString +public abstract class PacketBase { + + protected PacketHeader m_Header = null; + + public PacketHeader getPacketHead(){ + return m_Header; + } + + public void setPacketHeader(PacketHeader header) + { + this.m_Header = header; + } + + //抽象方法,每个数据操作类实现不一样 + public abstract PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception; + + public abstract int getUnitPacketLength(); + + public abstract String[] getData(); + + public abstract int getInterfaceNumber(); + + + public static PacketBase createPacketBase(PacketHeader packetHeader) { + // 获取报文头 + + // 响应表ID + int table_id = packetHeader.getTableID(); + switch (table_id) { + case 17: + AppFlow appFlow = new AppFlow(); + appFlow.setPacketHeader(packetHeader); + return appFlow; + case 18: + QoeFlow qoeFlow = new QoeFlow(); + qoeFlow.setPacketHeader(packetHeader); + return qoeFlow; + case 20: + SstFlow sstFlow = new SstFlow(); + sstFlow.setPacketHeader(packetHeader); + return sstFlow; + case 22: + BussFlowWeb tBusiness = new BussFlowWeb(); + tBusiness.setPacketHeader(packetHeader); + return tBusiness; + case 23: + BussFlowMidd secondBusiness = new BussFlowMidd(); + secondBusiness.setPacketHeader(packetHeader); + return secondBusiness; + case 24: + BussFlowDb thirdBusiness = new BussFlowDb(); + thirdBusiness.setPacketHeader(packetHeader); + return thirdBusiness; + case 25: + BussFlowOrl tExternalBusiness = new BussFlowOrl(); + tExternalBusiness.setPacketHeader(packetHeader); + return tExternalBusiness; + case 27: + DataFlow dataFlow = new DataFlow(); + dataFlow.setPacketHeader(packetHeader); + return dataFlow; + case 28: + ApmBaseDataFlow apmBaseDataFlow = new ApmBaseDataFlow(); + apmBaseDataFlow.setPacketHeader(packetHeader); + return apmBaseDataFlow; + case 29: + BasicTrafficFlow BasicTrafficFlow = new BasicTrafficFlow(); + BasicTrafficFlow.setPacketHeader(packetHeader); + return BasicTrafficFlow; + default: + return null; + } + } +} diff --git a/src/main/java/com/yuandian/dataflow/utils/PacketHeader.java b/src/main/java/com/yuandian/dataflow/proto/decode/PacketHeader.java similarity index 97% rename from src/main/java/com/yuandian/dataflow/utils/PacketHeader.java rename to src/main/java/com/yuandian/dataflow/proto/decode/PacketHeader.java index c658772..010c755 100644 --- a/src/main/java/com/yuandian/dataflow/utils/PacketHeader.java +++ b/src/main/java/com/yuandian/dataflow/proto/decode/PacketHeader.java @@ -4,7 +4,7 @@ * @author eson *2022年6月07日-11:09:21 */ -package com.yuandian.dataflow.utils; +package com.yuandian.dataflow.proto.decode; import java.io.DataInputStream; import java.io.IOException; diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/QoeFlow.java b/src/main/java/com/yuandian/dataflow/proto/decode/QoeFlow.java similarity index 50% rename from src/main/java/com/yuandian/dataflow/.proto/decode/QoeFlow.java rename to src/main/java/com/yuandian/dataflow/proto/decode/QoeFlow.java index d141de5..1efcf1d 100644 --- a/src/main/java/com/yuandian/dataflow/.proto/decode/QoeFlow.java +++ b/src/main/java/com/yuandian/dataflow/proto/decode/QoeFlow.java @@ -59,82 +59,82 @@ public int vlanId; //uint32_t 4 @Override public QoeFlow Parse(PacketHeader header,ByteBuffer data) throws Exception { QoeFlow qoeFlow = new QoeFlow(); - qoeFlow.srcIp=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.dstIp=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.stvSec=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.stvUsec=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.ltvSec=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.ltvUsec=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.dst2ResponNum=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.dst2Fast=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.dst2FastExpected=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.dst2ExpectedDegrated=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.dst2DegratedService=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.dst2ServiceAvailability=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.dst2ResponTimeout=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.dst2ResponSuccess=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.dst2ResponFail=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.dst2ResponPeek=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.dst2ResponAverage=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.csWindow=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.scWindow=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.csReset=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.scReset=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.csRetran=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.scRetran=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.appId=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.appGroupId=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.probeIf=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.appStyle=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.timeFlag=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.connSetupTm=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.dataTransferTm=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.retransDelayTm=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.networkInbound=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.networkOutbound=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.newSession=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.userEvents=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.serverEvents=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.connSetupPeek=ByteUtil.getInteger(data, offset, 4); - offset += 4; - qoeFlow.vlanId=ByteUtil.getInteger(data, offset, 4); - offset += 4; + qoeFlow.srcIp=data.getInt( 4); + + qoeFlow.dstIp=data.getInt( 4); + + qoeFlow.stvSec=data.getInt( 4); + + qoeFlow.stvUsec=data.getInt( 4); + + qoeFlow.ltvSec=data.getInt( 4); + + qoeFlow.ltvUsec=data.getInt( 4); + + qoeFlow.dst2ResponNum=data.getInt( 4); + + qoeFlow.dst2Fast=data.getInt( 4); + + qoeFlow.dst2FastExpected=data.getInt( 4); + + qoeFlow.dst2ExpectedDegrated=data.getInt( 4); + + qoeFlow.dst2DegratedService=data.getInt( 4); + + qoeFlow.dst2ServiceAvailability=data.getInt( 4); + + qoeFlow.dst2ResponTimeout=data.getInt( 4); + + qoeFlow.dst2ResponSuccess=data.getInt( 4); + + qoeFlow.dst2ResponFail=data.getInt( 4); + + qoeFlow.dst2ResponPeek=data.getInt( 4); + + qoeFlow.dst2ResponAverage=data.getInt( 4); + + qoeFlow.csWindow=data.getInt( 4); + + qoeFlow.scWindow=data.getInt( 4); + + qoeFlow.csReset=data.getInt( 4); + + qoeFlow.scReset=data.getInt( 4); + + qoeFlow.csRetran=data.getInt( 4); + + qoeFlow.scRetran=data.getInt( 4); + + qoeFlow.appId=data.getInt( 4); + + qoeFlow.appGroupId=data.getInt( 4); + + qoeFlow.probeIf=data.getInt( 4); + + qoeFlow.appStyle=data.getInt( 4); + + qoeFlow.timeFlag=data.getInt( 4); + + qoeFlow.connSetupTm=data.getInt( 4); + + qoeFlow.dataTransferTm=data.getInt( 4); + + qoeFlow.retransDelayTm=data.getInt( 4); + + qoeFlow.networkInbound=data.getInt( 4); + + qoeFlow.networkOutbound=data.getInt( 4); + + qoeFlow.newSession=data.getInt( 4); + + qoeFlow.userEvents=data.getInt( 4); + + qoeFlow.serverEvents=data.getInt( 4); + + qoeFlow.connSetupPeek=data.getInt( 4); + + qoeFlow.vlanId=data.getInt( 4); + return qoeFlow; } @Override diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/SstFlow.java b/src/main/java/com/yuandian/dataflow/proto/decode/SstFlow.java similarity index 59% rename from src/main/java/com/yuandian/dataflow/.proto/decode/SstFlow.java rename to src/main/java/com/yuandian/dataflow/proto/decode/SstFlow.java index b87bc14..61d23ad 100644 --- a/src/main/java/com/yuandian/dataflow/.proto/decode/SstFlow.java +++ b/src/main/java/com/yuandian/dataflow/proto/decode/SstFlow.java @@ -1,5 +1,6 @@ package com.yuandian.dataflow.proto.decode; +import java.nio.ByteBuffer; import java.util.Date; @@ -51,45 +52,45 @@ public class SstFlow extends PacketBase{ public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception { SstFlow sstFlow = new SstFlow(); - sstFlow.mac_src = ByteUtil.getLong(data, offset, 8); - offset += 8; - sstFlow.mac_dst = ByteUtil.getLong(data, offset, 8); - offset += 8; - sstFlow.ip_src = ByteUtil.getLong(data, offset, 8); - offset += 8; - sstFlow.ip_dst = ByteUtil.getLong(data, offset, 8); - offset += 8; - sstFlow.port_src = ByteUtil.getInteger(data, offset, 4); - offset += 4; - sstFlow.port_dst = ByteUtil.getInteger(data, offset, 4); - offset += 4; - sstFlow.l3_proto = ByteUtil.getInteger(data, offset, 4); - offset += 4; - sstFlow.l4_proto = ByteUtil.getInteger(data, offset, 4); - offset += 4; - sstFlow.tos = ByteUtil.getInteger(data, offset, 4); - offset += 4; - sstFlow.vlan_id = ByteUtil.getInteger(data, offset, 4); - offset += 4; + sstFlow.mac_src = data.getLong(8); + + sstFlow.mac_dst = data.getLong(8); + + sstFlow.ip_src = data.getLong(8); + + sstFlow.ip_dst = data.getLong(8); + + sstFlow.port_src = data.getInt(4); + + sstFlow.port_dst = data.getInt(4); + + sstFlow.l3_proto = data.getInt(4); + + sstFlow.l4_proto = data.getInt(4); + + sstFlow.tos = data.getInt(4); + + sstFlow.vlan_id = data.getInt(4); + - sstFlow.bytes = ByteUtil.getLong(data, offset, 8); - offset += 8; - sstFlow.packets = ByteUtil.getLong(data, offset, 8); - offset += 8; - sstFlow.packets_syn = ByteUtil.getLong(data, offset, 8); - offset += 8; - sstFlow.packets_syn_ack = ByteUtil.getLong(data, offset, 8); - offset += 8; - sstFlow.packets_syn_rst = ByteUtil.getLong(data, offset, 8); - offset += 8; - sstFlow.appid = ByteUtil.getInteger(data, offset, 4); - offset += 4; - sstFlow.app_group_id = ByteUtil.getInteger(data, offset, 4); - offset += 4; - sstFlow.mpls_label = ByteUtil.getInteger(data, offset, 4); - offset += 4; + sstFlow.bytes = data.getLong(8); + + sstFlow.packets = data.getLong(8); + + sstFlow.packets_syn = data.getLong(8); + + sstFlow.packets_syn_ack = data.getLong(8); + + sstFlow.packets_syn_rst = data.getLong(8); + + sstFlow.appid = data.getInt(4); + + sstFlow.app_group_id = data.getInt(4); + + sstFlow.mpls_label = data.getInt(4); + sstFlow.timestamp = header.getTimestamp(); - sstFlow.probe_if = header.getProbe_id(); + sstFlow.probe_if = header.getProbeIf(); return sstFlow; } diff --git a/src/main/java/com/yuandian/dataflow/proto/decode/utils.java b/src/main/java/com/yuandian/dataflow/proto/decode/utils.java new file mode 100644 index 0000000..41a66b4 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/proto/decode/utils.java @@ -0,0 +1,22 @@ +/** + * description + * + * @author eson + *2022年6月07日-16:18:34 + */ +package com.yuandian.dataflow.proto.decode; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +/** + * description + * + * @author eson + *2022年6月07日-16:18:34 + */ +public class utils { + public static String ByteBufferUTF8String(ByteBuffer data, int length) throws UnsupportedEncodingException { + return new String( data.alignedSlice(length).array(), "utf-8"); + } +}