TODO: 测试grpc c++互相调整.

This commit is contained in:
eson 2022-06-09 18:10:52 +08:00
parent bde4b7985d
commit 55c3491aeb
26 changed files with 6691 additions and 2622 deletions

25
pom.xml
View File

@ -84,7 +84,14 @@
<!-- 添加grpc相关依赖包 -->
<!-- <dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
@ -103,7 +110,7 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>${grpc.version}</version>
</dependency> -->
</dependency>
<!-- proto自动生成java文件所需的编译插件 -->
<dependency>
@ -133,16 +140,17 @@
<version>1.6.2</version>
</plugin>
<!-- <plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<pluginId>grpc-java</pluginId>
<protocArtifact>com.google.protobuf:protoc:3.13.0:exe:${os.detected.classifier}</protocArtifact>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.46.0:exe:${os.detected.classifier}</pluginArtifact>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<outputDirectory>${basedir}/src/main/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
<executions>
<execution>
@ -150,11 +158,12 @@
<goal>compile</goal>
<goal>compile-custom</goal>
<goal>test-compile</goal>
<goal>test-compile-custom</goal>
<goal>test-compile-custom</goal>
</goals>
</execution>
</executions>
</plugin> -->
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>

View File

@ -1,9 +0,0 @@
#! /bin/bash
# OUTPUT_FILE="./src/main/java/"
# for PROTOFILE in `find ./src -name "*.proto"`
# do
# protoc $PROTOFILE --plugin=/home/eson/workspace/dataflow/target/protoc-plugins --java_out=$OUTPUT_FILE --grpc-java_out=$OUTPUT_FILE
# done
ln -sf ./target/gener

View File

@ -0,0 +1,25 @@
/**
* description
*
* @author eson
*2022年6月09日-16:29:17
*/
package com.yuandian.dataflow.grpc;
import com.yuandian.dataflow.proto.Base.Request;
import com.yuandian.dataflow.proto.Base.Response;
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc.CollectPacketsServerImplBase;
import io.grpc.stub.StreamObserver;
/**
* description
*
* @author eson
*2022年6月09日-16:29:17
*/
public class CollectPackets extends CollectPacketsServerImplBase {
}

View File

@ -13,9 +13,11 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import com.yuandian.dataflow.proto.decode.PacketBase;
import com.yuandian.dataflow.proto.decode.PacketHeader;
// import com.yuandian.dataflow.proto.decode.PacketBase;
// import com.yuandian.dataflow.proto.decode.PacketHeader;
// import com.yuandian.dataflow.proto.decode.utils;
import io.netty.handler.codec.compression.ZlibDecoder;
import lombok.Cleanup;
import lombok.Getter;
import lombok.Setter;
@ -40,25 +42,95 @@ public class Header {
var in = new DataInputStream(sock.getInputStream());
var out = new DataOutputStream(sock.getOutputStream());
// 发送验证字符串
out.write("public".getBytes());
log.error("{}", PacketHeader.PacketCode(in));
var pheader = new PacketHeader(in);
// out.write("public".getBytes());
// log.error("{}", PacketHeader.PacketCode(in));
// var pheader = new PacketHeader(in);
log.error("{}", pheader);
log.error("{}",PacketBase.createPacketBase(pheader));
// log.error("{}", pheader);
// var pbase = PacketBase.createPacketBase(pheader);
// log.error("{}",pbase);
//60010流需要解压
byte[] unzipbodydata = null;
if (pheader.getTableID() == 20) {
pheader.parseNextHeader_60010(in);
// //60010流需要解压
// byte[] unzipbodydata = null;
// if (pheader.getTableID() == 20) {
// pheader.parseNextHeader_60010(in);
// byte[] zipbodydata = in.readNBytes(pheader.getMsgLen());
// unzipbodydata = utils.Inflate(zipbodydata);
// }
byte[] zipbodydata = new byte[packetHeader.getMsg_len()];
readTillLength(zipbodydata, packetHeader.getMsg_len());
unzipbodydata = ZlibUtil.Inflate(zipbodydata);
// for (int i = 0; i < pheader.getRecCount(); i++) {
// PacketBase dataBean = null;
}
// if (pheader.getTableID() >= 22 && pheader.getTableID() <= 24) {
// // 读取具体数据头信息获取前四个字段值第四个字段为整条数据的长度 字段长度分别为 4 1 4 4
// // bodyhead = new byte[13];
// // readTillLength(bodyhead, 13);
// var p1 = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
// var p2 = ByteBuffer.wrap(in.readNBytes(1)).order(ByteOrder.LITTLE_ENDIAN).getInt();
// var p3 = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
// var length = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
// if (length <= 13) {
// log.error("[{}:{}][数据总条数:{}][当前解析第{}条][length<13][探针发包有问题]");
// throw new Exception("数据解析异常");
// }
// // 解析数据
// dataBean = pbase.Parse(pheader, ByteBuffer.wrap(in.readNBytes(length - 13)));
// } else if (pheader.getTableID() == 25) {
// var nowtype = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
// var p2 = ByteBuffer.wrap(in.readNBytes(1)).order(ByteOrder.LITTLE_ENDIAN).getInt();
// var p3 = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
// var length = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
// pheader.setNowType(nowtype);
// if (length <= 13) {
// log.error("[{}:{}][数据总条数:{}][当前解析第{}条][length<13][探针发包有问题]");
// throw new Exception("数据解析异常");
// }
// // 读取具体数据体信息
// byte[] bodydata = new byte[length - 13];
// readTillLength(bodydata, length - 13);
// // 解析数据
// dataBean = instance.Parse(pheader, bodydata, 0);
// } else if (pheader.getTableID() == 28 || pheader.getTableID() == 29) { //28或29为Apm流统计
// if (pheader.getTableID() == 28) {
// length = ApmBaseDataFlow.SIZE;
// } else {
// length = BasicTrafficFlow.SIZE;
// }
// byte[] bodydata = new byte[length];
// readTillLength(bodydata, length);
// dataBean = instance.Parse(pheader, bodydata, 0);
// } else if (pheader.getTableID() == 17 || pheader.getTableID() == 18) { //18 为网络性能流
// if (pheader.getTableID() == 17) {
// length = AppFlow.SIZE;
// } else if (pheader.getTableID() == 18) {
// length = QoeFlow.SIZE;
// }
// byte[] bodydata = new byte[length];
// readTillLength(bodydata, length);
// dataBean = instance.Parse(pheader, bodydata, 0);
// } else if (pheader.getTableID() == 20) {
// int offset = i * SstFlow.SIZE;
// dataBean = instance.Parse(pheader, unzipbodydata, offset);
// } else {
// logger.info("不需要的数据类型:" + pheader.getTableID());
// break;
// }
// if (dataBean != null) {
// tempBaseDatas.add(dataBean);
// }
// }
}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,269 @@
package com.yuandian.dataflow.proto;
import static io.grpc.MethodDescriptor.generateFullMethodName;
/**
*/
@javax.annotation.Generated(
value = "by gRPC proto compiler (version 1.46.0)",
comments = "Source: Base.proto")
@io.grpc.stub.annotations.GrpcGenerated
public final class CollectPacketsServerGrpc {
private CollectPacketsServerGrpc() {}
public static final String SERVICE_NAME = "com.yuandian.dataflow.proto.CollectPacketsServer";
// Static method descriptors that strictly reflect the proto.
private static volatile io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Base.Request,
com.yuandian.dataflow.proto.Base.Response> getGetPacketsMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "GetPackets",
requestType = com.yuandian.dataflow.proto.Base.Request.class,
responseType = com.yuandian.dataflow.proto.Base.Response.class,
methodType = io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
public static io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Base.Request,
com.yuandian.dataflow.proto.Base.Response> getGetPacketsMethod() {
io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Base.Request, com.yuandian.dataflow.proto.Base.Response> getGetPacketsMethod;
if ((getGetPacketsMethod = CollectPacketsServerGrpc.getGetPacketsMethod) == null) {
synchronized (CollectPacketsServerGrpc.class) {
if ((getGetPacketsMethod = CollectPacketsServerGrpc.getGetPacketsMethod) == null) {
CollectPacketsServerGrpc.getGetPacketsMethod = getGetPacketsMethod =
io.grpc.MethodDescriptor.<com.yuandian.dataflow.proto.Base.Request, com.yuandian.dataflow.proto.Base.Response>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "GetPackets"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
com.yuandian.dataflow.proto.Base.Request.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
com.yuandian.dataflow.proto.Base.Response.getDefaultInstance()))
.setSchemaDescriptor(new CollectPacketsServerMethodDescriptorSupplier("GetPackets"))
.build();
}
}
}
return getGetPacketsMethod;
}
/**
* Creates a new async stub that supports all call types for the service
*/
public static CollectPacketsServerStub newStub(io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<CollectPacketsServerStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<CollectPacketsServerStub>() {
@java.lang.Override
public CollectPacketsServerStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new CollectPacketsServerStub(channel, callOptions);
}
};
return CollectPacketsServerStub.newStub(factory, channel);
}
/**
* Creates a new blocking-style stub that supports unary and streaming output calls on the service
*/
public static CollectPacketsServerBlockingStub newBlockingStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<CollectPacketsServerBlockingStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<CollectPacketsServerBlockingStub>() {
@java.lang.Override
public CollectPacketsServerBlockingStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new CollectPacketsServerBlockingStub(channel, callOptions);
}
};
return CollectPacketsServerBlockingStub.newStub(factory, channel);
}
/**
* Creates a new ListenableFuture-style stub that supports unary calls on the service
*/
public static CollectPacketsServerFutureStub newFutureStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<CollectPacketsServerFutureStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<CollectPacketsServerFutureStub>() {
@java.lang.Override
public CollectPacketsServerFutureStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new CollectPacketsServerFutureStub(channel, callOptions);
}
};
return CollectPacketsServerFutureStub.newStub(factory, channel);
}
/**
*/
public static abstract class CollectPacketsServerImplBase implements io.grpc.BindableService {
/**
*/
public void getPackets(com.yuandian.dataflow.proto.Base.Request request,
io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Base.Response> responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getGetPacketsMethod(), responseObserver);
}
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
getGetPacketsMethod(),
io.grpc.stub.ServerCalls.asyncServerStreamingCall(
new MethodHandlers<
com.yuandian.dataflow.proto.Base.Request,
com.yuandian.dataflow.proto.Base.Response>(
this, METHODID_GET_PACKETS)))
.build();
}
}
/**
*/
public static final class CollectPacketsServerStub extends io.grpc.stub.AbstractAsyncStub<CollectPacketsServerStub> {
private CollectPacketsServerStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected CollectPacketsServerStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new CollectPacketsServerStub(channel, callOptions);
}
/**
*/
public void getPackets(com.yuandian.dataflow.proto.Base.Request request,
io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Base.Response> responseObserver) {
io.grpc.stub.ClientCalls.asyncServerStreamingCall(
getChannel().newCall(getGetPacketsMethod(), getCallOptions()), request, responseObserver);
}
}
/**
*/
public static final class CollectPacketsServerBlockingStub extends io.grpc.stub.AbstractBlockingStub<CollectPacketsServerBlockingStub> {
private CollectPacketsServerBlockingStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected CollectPacketsServerBlockingStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new CollectPacketsServerBlockingStub(channel, callOptions);
}
/**
*/
public java.util.Iterator<com.yuandian.dataflow.proto.Base.Response> getPackets(
com.yuandian.dataflow.proto.Base.Request request) {
return io.grpc.stub.ClientCalls.blockingServerStreamingCall(
getChannel(), getGetPacketsMethod(), getCallOptions(), request);
}
}
/**
*/
public static final class CollectPacketsServerFutureStub extends io.grpc.stub.AbstractFutureStub<CollectPacketsServerFutureStub> {
private CollectPacketsServerFutureStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected CollectPacketsServerFutureStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new CollectPacketsServerFutureStub(channel, callOptions);
}
}
private static final int METHODID_GET_PACKETS = 0;
private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
private final CollectPacketsServerImplBase serviceImpl;
private final int methodId;
MethodHandlers(CollectPacketsServerImplBase serviceImpl, int methodId) {
this.serviceImpl = serviceImpl;
this.methodId = methodId;
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_GET_PACKETS:
serviceImpl.getPackets((com.yuandian.dataflow.proto.Base.Request) request,
(io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Base.Response>) responseObserver);
break;
default:
throw new AssertionError();
}
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public io.grpc.stub.StreamObserver<Req> invoke(
io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
default:
throw new AssertionError();
}
}
}
private static abstract class CollectPacketsServerBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier {
CollectPacketsServerBaseDescriptorSupplier() {}
@java.lang.Override
public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() {
return com.yuandian.dataflow.proto.Base.getDescriptor();
}
@java.lang.Override
public com.google.protobuf.Descriptors.ServiceDescriptor getServiceDescriptor() {
return getFileDescriptor().findServiceByName("CollectPacketsServer");
}
}
private static final class CollectPacketsServerFileDescriptorSupplier
extends CollectPacketsServerBaseDescriptorSupplier {
CollectPacketsServerFileDescriptorSupplier() {}
}
private static final class CollectPacketsServerMethodDescriptorSupplier
extends CollectPacketsServerBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
private final String methodName;
CollectPacketsServerMethodDescriptorSupplier(String methodName) {
this.methodName = methodName;
}
@java.lang.Override
public com.google.protobuf.Descriptors.MethodDescriptor getMethodDescriptor() {
return getServiceDescriptor().findMethodByName(methodName);
}
}
private static volatile io.grpc.ServiceDescriptor serviceDescriptor;
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
io.grpc.ServiceDescriptor result = serviceDescriptor;
if (result == null) {
synchronized (CollectPacketsServerGrpc.class) {
result = serviceDescriptor;
if (result == null) {
serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
.setSchemaDescriptor(new CollectPacketsServerFileDescriptorSupplier())
.addMethod(getGetPacketsMethod())
.build();
}
}
}
return result;
}
}

View File

@ -1,289 +0,0 @@
package com.yuandian.dataflow.proto.decode;
import java.io.BufferedReader;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author User
*
*/
public class ApmBaseDataFlow extends PacketBase {
private static Logger logger = LoggerFactory.getLogger(ApmBaseDataFlow.class);
public static int SIZE = 156;//88;//
/**
* 抓包口
*/
public long probeIf;
//四元组
/**
* 请求端口
*/
public int requestPort;
/**
* 响应端口
*/
public int responsePort;
/**
* 请求IP
*/
public long requestIp;
/**
* 响应Ip
*/
public long responseIp;
/**
* 源mac
*/
public long srcMac;
/**
* 目的mac
*/
public long dstMac;
/**
* 链路编号
*/
public long vlanId;
public long tvSec;
public long tvUsec;
/**
* 开始时间
*/
public long startTm;
/**
* 总字节数
*/
public long totalBytes;
/**
* 总包数
*/
public long totalPackets;
/**
* 总丢包数
*/
public long totalDropPackets;
/**
* 重传延时
*/
public long retranTimeDelay;
/**
* 客户端rtt
*/
public long clientRtt;
/**
* 服务端Rtt
*/
public long serverRtt;
/**
* 用户响应时间
*/
public long userResponseTime;
/**
* 服务响应时间
*/
public long serverResponseTime;
/**
* tcp回话连接失败数
*/
public long conFail;
/**
* 会话重置数
*/
// public long reset;
/**
* 服务端总字节数
*/
public long bytesIn;
/**
* 客户端总字节数
*/
public long bytesOut;
/**
* 探针推送时间
*/
public long timeFlag;
/**
* 结束时间
*/
public long endTm;
/**
* 结束时间微秒
*/
public long endTmUsec;
/**
* 总响应数
*/
public long responNum;
/**
* 客户端零窗口数
*/
public long csWindow;
/**
* 服务端零窗口数
*/
public long scWindow;
/**
* 客户端重置数
*/
public long csReset;
/**
* 服务端重置数
*/
public long scReset;
/**
* 客户端重传数
*/
public long csRetran;
/**
* 服务端重传数
*/
public long scRetran;
/**
* 会话建立时间
*/
public long connSetupTm;
/**
* 新建会话数
*/
public long newSession;
public long csAlert;
public long scAlert;
public String protocal;
@Override
public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception {
ApmBaseDataFlow oneData = new ApmBaseDataFlow();
oneData.setPacketHeader(header);
oneData.probeIf = data.getLong(4);
oneData.requestPort = data.getInt( 2);
oneData.responsePort = data.getInt( 2);
oneData.requestIp = data.getLong(4);
oneData.responseIp = data.getLong(4);
oneData.srcMac = data.getLong(6);
oneData.dstMac = data.getLong(6);
oneData.vlanId = data.getLong(4);
oneData.tvSec = data.getLong(4);
oneData.tvUsec = data.getLong(4);
// oneData.startTm = data.getLong(8);
oneData.totalBytes = data.getLong(4);
oneData.totalPackets = data.getLong(4);
oneData.totalDropPackets = data.getLong(4);
oneData.retranTimeDelay = data.getLong(4);
oneData.clientRtt = data.getLong(4);
//服务端Rtt
oneData.serverRtt = data.getLong(4);
oneData.userResponseTime = data.getLong(4);
oneData.serverResponseTime = data.getLong(4);
oneData.conFail = data.getLong(4);
// oneData.reset = data.getLong(4);
oneData.bytesIn = data.getLong(4);
oneData.bytesOut = data.getLong(4);
oneData.timeFlag = data.getLong(4);
oneData.endTm = data.getLong(4);
oneData.endTmUsec = data.getLong(4);
oneData.responNum = data.getLong(4);
oneData.csWindow = data.getLong(4);
oneData.scWindow = data.getLong(4);
oneData.csReset = data.getLong(4);
oneData.scReset = data.getLong(4);
oneData.csRetran = data.getLong(4);
oneData.scRetran = data.getLong(4);
oneData.connSetupTm = data.getLong(4);
oneData.newSession = data.getLong(4);
oneData.csAlert = data.getLong(4);
oneData.scAlert = data.getLong(4);
oneData.protocal = data.slice().toString() ;
return oneData;
}
@Override
public String[] getData() {
return null;
}
@Override
public int getInterfaceNumber() {
return 0;
}
@Override
public int getUnitPacketLength() {
return 0;
}
}

View File

@ -1,100 +0,0 @@
package com.yuandian.dataflow.proto.decode;
import java.nio.ByteBuffer;
public class AppFlow extends PacketBase {
public static int SIZE = 104;
public long srcIp;
public int srcPort;
public long dstIp;
public int dstPort;
public long startTvSec;
public long startTvUsec;
public long lastTvSec;
public long lastTvUsec;
public long endTvSec;
public long endTvUsec;
public int inputPackets;
public int outputPackets;
public int inputBytes;
public int outputBytes;
public String protocaol;
public int appId;
public int appGroupId;
public int probeIf;
public int appStyle;
public long timeFlag;
public int vlanId;
public int mplsLable;
public int tos;
@Override
public PacketBase Parse(PacketHeader header,ByteBuffer data)
throws Exception {
AppFlow nlf = new AppFlow();
nlf.srcIp = data.getInt(4);
nlf.srcPort = data.getInt(2);
nlf.dstIp = data.getInt(4);
nlf.dstPort = data.getInt(2);
nlf.startTvSec = data.getLong(4);
nlf.startTvUsec = data.getLong(4);
nlf.lastTvSec = data.getLong(4);
nlf.lastTvUsec = data.getLong(4);
nlf.endTvSec = data.getLong(4);
nlf.endTvUsec = data.getLong(4);
nlf.inputPackets = data.getInt(4);
nlf.outputPackets = data.getInt(4);
nlf.inputBytes = data.getInt(4);
nlf.outputBytes = data.getInt(4);
nlf.protocaol = data.alignedSlice(20).toString();
nlf.appId = data.getInt(4);
nlf.appGroupId = data.getInt(4);
nlf.probeIf = data.getInt(4);
nlf.appStyle = data.getInt(4);
nlf.timeFlag = data.getLong(4);
nlf.vlanId = data.getInt(4);
nlf.mplsLable = data.getInt(4);
nlf.tos = data.getInt(4);
return nlf;
}
@Override
public int getUnitPacketLength() {
return 0;
}
@Override
public String[] getData() {
return null;
}
@Override
public int getInterfaceNumber() {
return 0;
}
}

View File

@ -1,95 +0,0 @@
package com.yuandian.dataflow.proto.decode;
import java.nio.ByteBuffer;
/**
* @author User
*
*/
public class BacktrackingFlow extends PacketBase{
public static int SIZE = 108;
//tuple StatisTuple10 56 10元组信息
private long macSrc; //源MAC 8
private long macDst; //目的MAC 8
private long ipSrc; //源IP 8
private long ipDst; //目的IP 8
private long portSrc; //源端口如果没有-1
private long portDst; //目标端口如果没有-1
private int l3Proto; //第三层协议ID如果没有-1
private int l4Proto; //第四层协议ID如果没有-1
private int tos; //Tos如果没有-1
private int vlanId; //vlan ID如果没有-1
private long bytes; // 8 字节总数
private long packets; // 8 数据包总数
private long tcpSp; // 8 tcp同步包数
private long tcpScpn;// 8 tcp同步确认包数
private long tcpSrp; // 8 tcp同步重置包数
private long appId; // 4 appID
private long appGroupId;// 4 app组ID
private long mplsLabel;// 4
@Override
public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception {
BacktrackingFlow backFlow = new BacktrackingFlow();
backFlow.macSrc = data.getLong(8); //源MAC 8
backFlow.macDst = data.getLong(8); //目的MAC 8
backFlow.ipSrc = data.getLong(8); //源IP 8
backFlow.ipDst = data.getLong(8); //目的IP 8
backFlow.portSrc = data.getInt(4); //源端口如果没有-1
backFlow.portDst = data.getInt(4); //目标端口如果没有-1
backFlow.l3Proto = data.getInt(4); //第三层协议ID如果没有-1
backFlow.l4Proto = data.getInt(4); //第四层协议ID如果没有-1
backFlow.tos = data.getInt(4); //Tos如果没有-1
backFlow.vlanId = data.getInt(4); //vlan ID如果没有-1
backFlow.bytes = data.getLong(8); // 8 字节总数
backFlow.packets = data.getLong(8); // 8 数据包总数
backFlow.tcpSp = data.getLong(8); // 8 tcp同步包数
backFlow.tcpScpn = data.getLong(8);// 8 tcp同步确认包数
backFlow.tcpSrp = data.getLong(8); // 8 tcp同步重置包数
backFlow.appId = data.getInt(4); // 4 appID
backFlow.appGroupId = data.getInt(4);// 4 app组ID
backFlow.mplsLabel = data.getInt(4);// 4
return backFlow;
}
@Override
public int getUnitPacketLength() {
// TODO Auto-generated method stub
return 0;
}
@Override
public String[] getData() {
// TODO Auto-generated method stub
return null;
}
@Override
public int getInterfaceNumber() {
// TODO Auto-generated method stub
return 0;
}
}

View File

@ -1,73 +0,0 @@
package com.yuandian.dataflow.proto.decode;
import java.nio.ByteBuffer;
public class BasicTrafficFlow extends PacketBase {
public static int SIZE = 56;
public long capPort;
public int requestPort;
public int responsePort;
public long requestIp;
public long responseIp;
public long startTime;
public long totalBytes;
public long totalPackets;
public long spackets64;
public long spackets128;
public long spackets256;
public long spackets512;
public long spackets1024;
public long spackets;
public long sendTime;
@Override
public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception {
BasicTrafficFlow btf = new BasicTrafficFlow();
btf.setPacketHeader(header);
btf.capPort = data.getLong(4);
data.position(data.position() + 12);
btf.startTime = data.getLong(4);
btf.totalBytes = data.getLong(4);
btf.totalPackets = data.getLong(4);
btf.spackets64 = data.getLong(4);
btf.spackets128 = data.getLong(4);
btf.spackets256 = data.getLong(4);
btf.spackets512 = data.getLong(4);
btf.spackets1024 = data.getLong(4);
btf.spackets = data.getLong(4);
btf.sendTime = data.getLong(4);
return btf;
}
@Override
public String[] getData() {
// TODO Auto-generated method stub
return null;
}
@Override
public int getInterfaceNumber() {
// TODO Auto-generated method stub
return 0;
}
@Override
public int getUnitPacketLength() {
// TODO Auto-generated method stub
return 0;
}
}

View File

@ -1,29 +0,0 @@
package com.yuandian.dataflow.proto.decode;
public class BusinessBodyData {
public String relvanceDataId;
public long request_ip;
public int request_port;
public long response_ip;
public int response_port;
public long start_tv_sec;//开始时间秒
public long start_tv_usec;//开始时间毫秒
public long end_tv_sec;//结束时间秒
public long end_tv_usec;//结束时间微妙
public String resource_code;
public String noParameterRecognition;
public String originalRecognition;
public String requestCookie;
public String requestBodyContext;
public String responseBodyContext;
public int filterId;
public String business_detail_mesg;
}

View File

@ -1,231 +0,0 @@
package com.yuandian.dataflow.proto.decode;
import java.nio.ByteBuffer;
// import com.ud.module.common.SystemConfigManager;
public class BussFlowDb extends PacketBase {
private int UNIT_DATA_LENGTH = 3362;
// redis资源归并,处理服务资源发现是需要设置识别串(正则表达式)
public String redisRegex;
public String id; //id
public int msg_len; //消息长度
public int msg_type; //消息类型
public long src_mac;
public long dst_mac;
public int protocol; //协议名
public String session_serial_number;//会话序列号
public String buss_type;//业务服务资源编码C_01
public long request_ip;//Web客户端IP
public int request_port;//Web客户端端口
public long response_ip;//Web服务器IP
public int response_port;//Web服务器端口
public long start_tv_sec;//Web开始时间秒
public long start_tv_usec;//开始时间毫秒
public long end_tv_sec;//结束时间秒
public long end_tv_usec;//结束时间微妙
public String disc_resource_ident; //weburl mid:apiBody dbsql
public String name; //web:操作系统 middinterfaceName db:db_name
//web midd
public String session_id;//sessionid
public int request_msg_length; //请求报文长度
public String request_msg_detail; //请求报文详情
public int response_msg_length; //响应报文长度
public String response_msg_detail; //响应报文详情
//web段
public String reter_url;
public String x_requested_with;
public long req_method; //请求方式
public String content_type; //请求类型
public String accept; //jieshou
public int req_cookie_leng;//请求cookie报文长度
public String req_cookie_detail;//请求cookie报文详情
public long t_intodb_time;
public int load_or_step; //0: 页面 1加载项 2非web段数据
public String business_detail_mesg;
public String bussiness_key_mesg; //关键字 格式key=val|key=val....
public int isUncomplete; //组包是否完全 0组包完整 1不完整
public int deal_state=1;
public int server_res_code;
public long server_response_time;
public long client_translate_time;
public String browser;
public int server_start_tv_sec;
public long server_start_tv_usec;
public int server_end_tv_sec;
public long server_end_tv_usec;
public String probe_ip;
public long probe_if;
public long server_translate_time;
public long time_flag;
public String base_code;
public String ori_sql;
public String reserved;
public long bytes_in;
public long bytes_out;
public int package_in;
public int package_out;
public String dataId;
public int filterId;
@Override
public int getUnitPacketLength() {
// TODO Auto-generated method stub
return UNIT_DATA_LENGTH;
}
@SuppressWarnings("unchecked")
public BussFlowDb Parse(PacketHeader header,ByteBuffer data)
throws Exception {
BussFlowDb buss = new BussFlowDb();
// buss.m_Header = header;
buss.msg_type = header.getTableID();
//
// buss.msg_version = data.getInt(1);
// offset += 1;
// buss.msg_seq = data.getInt(4);
//
// buss.msg_len = header.getMsg_len();
//
// UNIT_DATA_LENGTH=msg_len;
buss.src_mac = data.getLong(8);
buss.dst_mac = data.getLong(8);
boolean macFilterFlag = false;
if(macFilterFlag && buss.src_mac!=0 && buss.dst_mac==0){
return null;
}
data.position(data.position() + 28);
buss.protocol= data.getInt(4);
buss.bytes_out = data.getLong(4);
buss.bytes_in = data.getLong(4);
buss.package_out = data.getInt(4);
buss.package_in = data.getInt(4);
buss.session_serial_number = data.alignedSlice(24).toString();
buss.probe_if= data.getInt();
data.position(data.position() + 24);
buss.name = data.alignedSlice(64).toString();
buss.request_ip=data.getLong();
buss.request_port= data.getInt();
buss.response_ip=data.getLong();
buss.response_port= data.getInt();
// buss.deal_state= data.getInt();
buss.server_translate_time=data.getLong();
buss.client_translate_time=data.getLong();
buss.server_response_time=data.getLong();
buss.server_start_tv_sec= data.getInt();
buss.server_start_tv_usec=data.getLong();
//结束时间
buss.server_end_tv_sec= data.getInt();
buss.server_end_tv_usec=data.getLong();
buss.start_tv_sec=data.getLong();
buss.start_tv_usec=data.getLong();
//结束时间
buss.end_tv_sec=data.getLong();
buss.end_tv_usec=data.getLong();
buss.isUncomplete = data.getInt(4);
buss.time_flag= data.getInt(4);
int sqlLen = data.getInt(4);
int detailMsgLen = data.getInt(4);
buss.request_msg_length = data.getInt(4);
buss.response_msg_length = data.getInt(4);
int reservedLen = data.getInt(4);
buss.ori_sql = data.alignedSlice(sqlLen).toString();
buss.business_detail_mesg= data.alignedSlice( detailMsgLen).toString();
;
buss.request_msg_detail = utils.ByteBufferUTF8String(data, buss.request_msg_length);
buss.response_msg_detail = utils.ByteBufferUTF8String(data, buss.response_msg_length);
buss.reserved = data.alignedSlice( reservedLen).toString();
return buss;
}
public String getBussType() {
return buss_type;
}
@Override
public String[] getData() {
String data[] = new String[45];
return data;
}
@Override
public int getInterfaceNumber() {
return 0;
}
// @Override
// public PacketBase Parse(PacketHeader header, ByteBuffer data) throws Exception {
// // TODO Auto-generated method stub
// return null;
// }
/**
* 如果已经存在这个session_id即key则说明是重复数据不再进行后续操作
* @param buss
* @return 存在返回true 不存在返回false
*/
/*private boolean reduceByKey(BussFlowDb buss) {
String coding = CodingGeneration.md5GenCoding2(buss.ori_sql);
String key=buss.session_serial_number+"|"+buss.request_ip+"|"+buss.request_port+"|"+buss.response_ip+"|"+buss.response_port+"|"+coding+"|"+buss.business_detail_mesg;
//分布式同步锁如果存在该key或者锁已经被获取则返回false设置
if(redisTemplate.opsForValue().setIfAbsent(key, System.currentTimeMillis()+"")){
redisTemplate.expire(key,SystemConfigManager.keyExistTime,TimeUnit.SECONDS);
return false;
}
return true;
}*/
}

View File

@ -1,137 +0,0 @@
package com.yuandian.dataflow.proto.decode;
import java.nio.ByteBuffer;
import java.util.Date;
public class BussFlowExternal extends PacketBase {
private final int UNIT_DATA_LENGTH =871;
@Override
public int getUnitPacketLength() {
return UNIT_DATA_LENGTH;
}
public BussFlowExternal Parse(PacketHeader header,ByteBuffer data)
throws Exception {
if (data.position() + UNIT_DATA_LENGTH > data.array().length)
throw new Exception("data length error!");
BussFlowExternal buss = new BussFlowExternal();
// buss.m_Header = header;
buss.probe_if= data.getLong(4);
buss.channel= data.alignedSlice(24).toString();
buss.interfaceid= data.alignedSlice(100).toString();
buss.systemName= data.alignedSlice(32).toString();
buss.net_type= data.alignedSlice(6).toString();
buss.net_segment = data.alignedSlice( 5).toString();
buss.session_id= data.alignedSlice(80).toString();
buss.phoneid= data.alignedSlice(12).toString();
buss.request_ip= data.getLong(4);
buss.request_port= data.getLong(4);
buss.response_ip= data.getLong(4);
buss.response_port= data.getLong(4);
//开始时间
buss.start_tv_sec= data.getLong(4);
buss.start_tv_usec= data.getLong(4);
//结束时间
buss.end_tv_sec= data.getLong(4);
buss.end_tv_usec= data.getLong(4);
buss.deal_state= data.getInt(4);
buss.server_translate_time= data.getLong(8);
buss.server_response_time= data.getLong(8);
buss.begin_url= data.alignedSlice(100).toString();
buss.operating_sytem= data.alignedSlice(20).toString();
buss.server_res_code= data.getInt(2);
buss.browser= data.alignedSlice(30).toString();
buss.business_detail_mesg= data.alignedSlice(200).toString();
buss.business_involve_msg = data.alignedSlice(200).toString();
isUncomplete = data.getInt( 4);
buss.time_flag= data.getLong( 4);
return buss;
}
public String id;
public Long probe_if;//接口号
public String channel;//营业厅渠道前台或者分析服务器给出,渠道标识
public String systemName; //外部系统名称
public String interfaceid;// 业务接口编码
public String session_id;//sessionid
public String phoneid;//受理手机号码
public String net_type;
public String net_segment;//网段标识(客户-web)
public Long request_ip;//Web客户端IP
public Long request_port;//Web客户端端口
public Long response_ip;//Web服务器IP
public Long response_port;//Web服务器端口
public Long start_tv_sec;//Web开始时间秒
public Long start_tv_usec;//开始时间微秒
public Long end_tv_sec;//结束时间秒
public Long end_tv_usec;//结束时间微妙
public Integer deal_state;//Web操作成功/失败标识1成功0失败
public Long server_translate_time;//Web服务器传输耗时
public Long server_response_time;//Web服务器响应时间
public String begin_url;//url
public String operating_sytem;//操作系统
public Integer server_res_code; //Web系统返回码
public String browser;//浏览器
public String business_detail_mesg;//要获取的指标
public Date insert_time;//插入时间
public String business_involve_msg; //要关联的指标
public Integer isUncomplete;
public Long time_flag;
@Override
public String[] getData() {
String data[] = new String[37];
return data;
}
@Override
public int getInterfaceNumber() {
return Integer.parseInt(probe_if.toString());
}
}

View File

@ -1,250 +0,0 @@
package com.yuandian.dataflow.proto.decode;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
public class BussFlowMidd extends PacketBase {
private int UNIT_DATA_LENGTH =3569;
public String id; //id
public long src_mac;
public long dst_mac;
public int msg_len; //消息长度
public int msg_type; //消息类型
public int protocol; //协议名
public String session_serial_number;//会话序列号
public String buss_type;//业务服务资源编码C_01
public String net_segment;//网段标识(客户-web)
public long request_ip;//Web客户端IP
public int request_port;//Web客户端端口
public long response_ip;//Web服务器IP
public int response_port;//Web服务器端口
public long start_tv_sec;//Web开始时间秒
public long start_tv_usec;//开始时间毫秒
public long end_tv_sec;//结束时间秒
public long end_tv_usec;//结束时间微妙
public String disc_resource_ident; //weburl mid:apiBody dbsql
public String name; //web:操作系统 middinterfaceName db:db_name
//web midd
public String session_id;//sessionid
public int request_msg_length; //请求报文长度
public String request_msg_detail; //请求报文详情
public int response_msg_length; //响应报文长度
public String response_msg_detail; //响应报文详情
//web段
public String reter_url;
public String x_requested_with;
public long req_method; //请求方式
public String content_type; //请求类型
public String accept; //jieshou
public int req_cookie_leng;//请求cookie报文长度
public String req_cookie_detail;//请求cookie报文详情
public long t_intodb_time;
public int load_or_step; //0: 页面 1加载项 2非web段数据
public String business_detail_mesg;
public String bussiness_key_mesg; //关键字 格式key=val|key=val....
public int isUncomplete; //组包是否完全 0组包完整 1不完整
public int deal_state=1;
public int server_res_code;
public long server_response_time;
public long client_translate_time;
public String browser;
public int server_start_tv_sec;
public long server_start_tv_usec;
public int server_end_tv_sec;
public long server_end_tv_usec;
public String probe_ip;
public int probe_if;
public long server_translate_time;
public long time_flag;
public String channel;
public String base_code;
public String ori_api;
public String remain_data;
public long bytes_in;
public long bytes_out;
public int package_in;
public int package_out;
public String dataId;
public int filterId;
@Override
public int getUnitPacketLength() {
// TODO Auto-generated method stub
return UNIT_DATA_LENGTH;
}
public BussFlowMidd Parse(PacketHeader header,ByteBuffer data)
throws Exception {
BussFlowMidd buss = new BussFlowMidd();
// buss.m_Header = header;
buss.msg_type = header.getTableID();
buss.src_mac = data.getLong(8);
buss.dst_mac = data.getLong(8);
var macFilterFlag = false ;
if(macFilterFlag && buss.src_mac!=0 && buss.dst_mac==0){
return null;
}
data.position( data.position() + 28);
buss.protocol= data.getInt( 4);
buss.bytes_out = data.getLong(4);
buss.bytes_in = data.getLong(4);
buss.package_out = data.getInt( 4);
buss.package_in = data.getInt( 4);
buss.session_serial_number = data.alignedSlice( 24).toString();
data.position( data.position() + 4);
// buss.req_method = data.getLong(4);
//
// buss.content_type = data.alignedSlice( 40).toString();
// offset += 40;
// buss.accept = data.alignedSlice( 40).toString();
// offset += 40;
buss.probe_if= data.getInt(4);
// buss.buss_type =data.alignedSlice(10).toString();
// offset += 10;
buss.channel=data.alignedSlice(24).toString();
// buss.session_id=data.alignedSlice(36).toString();
// offset += 36;
// buss.net_segment=data.alignedSlice(6).toString();
// offset += 6;
buss.request_ip=data.getLong();
buss.request_port= data.getInt(4);
buss.response_ip=data.getLong();
buss.response_port= data.getInt(4);
//开始时间
buss.start_tv_sec=data.getLong();
buss.start_tv_usec=data.getLong();
//结束时间
buss.end_tv_sec=data.getLong();
buss.end_tv_usec=data.getLong();
// buss.deal_state= data.getInt(4);
buss.server_res_code = data.getInt( 2);
buss.server_translate_time=data.getLong();
buss.server_start_tv_sec= data.getInt(4);
buss.server_start_tv_usec=data.getLong();
//结束时间
buss.server_end_tv_sec= data.getInt(4);
buss.server_end_tv_usec=data.getLong();
buss.server_response_time=data.getLong();
buss.client_translate_time=data.getLong();
buss.isUncomplete = data.getInt( 4);
buss.time_flag= data.getInt( 4);
int detailMsgLen = data.getInt( 4);
int keyMsgLen = data.getInt( 4);
int apiLen = data.getInt( 4);
buss.request_msg_length = data.getInt( 4);
buss.response_msg_length = data.getInt( 4);
int remainLen = data.getInt( 4);
buss.business_detail_mesg=data.alignedSlice(detailMsgLen).toString();
buss.bussiness_key_mesg = data.alignedSlice( keyMsgLen).toString();
buss.ori_api=data.alignedSlice( apiLen).toString();
buss.request_msg_detail = utils.ByteBufferUTF8String(data, buss.request_msg_length);
buss.response_msg_detail = utils.ByteBufferUTF8String(data, buss.response_msg_length);
data.position(data.position()+remainLen);
/*if(SystemConfigManager.projectName.contains("GDYDBPM")){
boolean exist=false;
if(SystemConfigManager.reduceFlag){
redisTemplate = (RedisTemplate<String, String>) ApplicationContextUtil.getBean("redisTemplate");
exist = reduceByKey(buss);
}
//不存在就加入缓存
if(!exist)return buss;
//存在就返回空
return null;
}*/
return buss;
}
public String getBussType() {
return buss_type;
}
@Override
public String[] getData() {
String data[] = new String[35];//贵阳
return data;
}
@Override
public int getInterfaceNumber() {
return 0;
}
/**
* 如果已经存在这个session_id即key则说明是重复数据不再进行后续操作
* @param buss
* @return
*/
/*private boolean reduceByKey(BussFlowMidd buss) {
String coding = CodingGeneration.md5GenCoding2(buss.ori_api);
String key=buss.session_serial_number+"|"+buss.request_ip+"|"+buss.request_port+"|"+buss.response_ip+"|"+buss.response_port+"|"+coding+"|"+buss.business_detail_mesg;
//分布式同步锁如果存在该key或者锁已经被获取则返回false设置
if(redisTemplate.opsForValue().setIfAbsent(key, System.currentTimeMillis()+"")){
redisTemplate.expire(key,SystemConfigManager.keyExistTime,TimeUnit.SECONDS);
return false;
}
return true;
}*/
}

View File

@ -1,111 +0,0 @@
package com.yuandian.dataflow.proto.decode;
import java.nio.ByteBuffer;
public class BussFlowOrl extends PacketBase {
public int msg_type;
public int msg_version;
public int msg_seq;
public int msg_len;
public long request_mac;
public long response_mac;
public long request_ip;
public int request_port;
public long response_ip;
public int response_port;
public int probeif;
public int protocol;
public long start_tv_sec;//Web开始时间秒
public long start_tv_usec;//开始时间毫秒
public long end_tv_sec;//结束时间秒
public long end_tv_usec;//结束时间微妙
public int req_len;
public int res_len;
public int busi_msg_len;
public int key_msg_len;
public int detail_msg_len;
public int remain_len;
public String business_code;
public String sessionid;
public String req_data;
public String res_data;
public String busi_msg;
public String busi_key_msg;
public String busi_detail_msg;
public String remain_data;
@Override
public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception {
BussFlowOrl orl = new BussFlowOrl();
orl.msg_type = header.getNowType();
orl.request_mac =data.getLong(8);
orl.response_mac =data.getLong(8);
orl.request_ip =data.getLong(4);
orl.request_port = data.getInt(4);
orl.response_ip =data.getLong(4);
orl.response_port = data.getInt(4);
orl.probeif = data.getInt(4);
orl.protocol = data.getInt(4);
orl.start_tv_sec =data.getLong(4);
orl.start_tv_usec =data.getLong(4);
orl.end_tv_sec =data.getLong(4);
orl.end_tv_usec =data.getLong(4);
int req_len = data.getInt(4);
int res_len = data.getInt(4);
int busi_msg_len = data.getInt(4);
int key_msg_len = data.getInt(4);
int detail_msg_len = data.getInt(4);
int remain_len = data.getInt(4);
orl.business_code = data.alignedSlice( 32).toString();
orl.sessionid = data.alignedSlice( 80).toString();
orl.req_data = data.alignedSlice( req_len).toString();
orl.res_data = data.alignedSlice( res_len).toString();
orl.busi_msg = data.alignedSlice( busi_msg_len).toString();
orl.busi_key_msg = data.alignedSlice( key_msg_len).toString();
orl.busi_detail_msg = data.alignedSlice( detail_msg_len).toString();
orl.remain_data = data.alignedSlice( remain_len).toString();
return orl;
}
@Override
public int getUnitPacketLength() {
return 0;
}
@Override
public String[] getData() {
return new String[0];
}
@Override
public int getInterfaceNumber() {
return 0;
}
}

View File

@ -1,353 +0,0 @@
package com.yuandian.dataflow.proto.decode;
import java.nio.ByteBuffer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.yuandian.dataflow.proto.decode.PacketBase;
import com.yuandian.dataflow.proto.decode.PacketHeader;
public class BussFlowWeb extends PacketBase {
private Integer UNIT_DATA_LENGTH =6104;
private static int i = 1;
@Override
public int getUnitPacketLength() {
return UNIT_DATA_LENGTH;
}
@SuppressWarnings("unchecked")
public BussFlowWeb Parse(PacketHeader header,ByteBuffer data)
throws Exception {
i++;
BussFlowWeb buss = new BussFlowWeb();
// buss.m_Header = header;
buss.msg_type = header.getTableID();
// offset += 4;
// buss.msg_version = data.getInt(1);
// offset += 1;
// buss.msg_seq = data.getInt(4);
// offset += 4;
// buss.msg_len = data.getInt(4);
// offset += 4;
buss.src_mac = data.getLong(8);
buss.dst_mac = data.getLong(8);
data.position(data.position()+20);
buss.protocol=data.getInt(4);
buss.bytes_out = data.getLong(4);
buss.bytes_in = data.getLong(4);
buss.package_out = data.getInt(4);
buss.package_in = data.getInt(4);
buss.session_serial_number = data.alignedSlice(24).toString(); //5268109200
// buss.recog_status = data.getInt(4);
data.position(data.position()+4);
buss.req_method = data.getLong(4); //1
buss.content_type = data.alignedSlice(40).toString();
buss.accept = data.alignedSlice(40).toString(); //image/png, image/svg+xml, image/*;q=0.8
buss.probe_if=data.getInt();
buss.channel=data.alignedSlice(4).toString();
buss.session_id=data.alignedSlice(0).toString(); //7Ip6bYet8n0wfxeJMJtAm7v2FW76ZTImJVyNjbYoRbUdUf71QaxQ!-202316728
buss.request_ip=data.getLong(); //3232235850
buss.request_port=data.getInt(); //3309
buss.response_ip=data.getLong(); //1971882754
buss.response_port=data.getInt(); //80
//开始时间
buss.start_tv_sec=data.getLong(); //1492473697
buss.start_tv_usec=data.getLong(); //459461
//结束时间
buss.end_tv_sec=data.getLong(); //1492473697
buss.end_tv_usec=data.getLong(); //459490
buss.server_translate_time=data.getLong();
buss.server_start_tv_sec=data.getInt();
buss.server_start_tv_usec=data.getLong();
//结束时间
buss.server_end_tv_sec=data.getInt();
buss.server_end_tv_usec=data.getLong();
buss.server_response_time=data.getLong();
buss.client_translate_time=data.getLong();
data.position(data.position()+24);
buss.x_requested_with = data.alignedSlice(20).toString();
buss.operating_system=data.alignedSlice(0).toString(); //Windows NT 6.1
buss.server_res_code=data.getInt();
buss.browser=data.alignedSlice(0).toString();
buss.isUncomplete = data.getInt(4);
buss.time_flag=data.getInt(4);
int detailMsgLen = data.getInt(4);
int keyMsgLen = data.getInt(4);
int reqMsgLeng= data.getInt(4);
buss.request_msg_length =reqMsgLeng;
int resMsgLength=data.getInt(4);
buss.response_msg_length = resMsgLength;
int cookieLength=data.getInt(4);
buss.req_cookie_leng=cookieLength;
int url_len=data.getInt(4);
int refer_len=data.getInt(4);
int remainLen = data.getInt(4);
// if (SystemConfigManager.systemCode.equals("JXYDBPM")) {
// buss.business_detail_mesg=JXYDBPMDetailMesgUtils.decodeValue(data.alignedSlice(etailMsgLen).toString());
// } else {
// buss.business_detail_mesg=data.alignedSlice(etailMsgLen).toString();
// }
buss.business_detail_mesg=data.alignedSlice(detailMsgLen).toString();
buss.bussiness_key_mesg = data.alignedSlice(keyMsgLen).toString();
buss.request_msg_detail = utils.ByteBufferUTF8String(data, reqMsgLeng);
buss.response_msg_detail = utils.ByteBufferUTF8String(data, resMsgLength);
buss.req_cookie_detail = data.alignedSlice(cookieLength).toString();
buss.ori_url=data.alignedSlice(url_len).toString(); // /images/index/220130.gif
if(buss.ori_url!=null && !"".equals(buss.ori_url.trim())){
String result = buss.ori_url.replaceAll("\\?", "").replaceAll("\\/", "").replaceAll("\\*", "").replaceAll("\\=", "").replaceAll("\\&", "").replaceAll("\\+", "");
if(isMessyCode(result)){//如果中文乱码
// String withoutValue = URLUtil.getInsertURL(buss.ori_url, Constants.SPLIT_NULL).trim();//去参数值
// String result2 = withoutValue.replaceAll("\\?", "").replaceAll("\\/", "").replaceAll("\\*", "").replaceAll("\\=", "").replaceAll("\\&", "").replaceAll("\\+", "");
// if(isMessyCode(result2)){//判断是否还乱码
// buss.ori_url="";
// }else{
// buss.ori_url=withoutValue;
// }
};
}
buss.reter_url = data.alignedSlice(refer_len).toString(); // http://www.10086.cn/gd/index_200_200.html
buss.remain_data = data.alignedSlice(remainLen).toString();
/*if(SystemConfigManager.projectName.contains("GDYDBPM")){
boolean exist=false;
if(SystemConfigManager.reduceFlag){
redisTemplate = (RedisTemplate<String, String>) ApplicationContextUtil.getBean("redisTemplate");
exist = reduceByKey(buss);
}
//不存在就加入缓存
if(!exist)return buss;
//存在就返回空
return null;
}
*/
return buss;
}
/**
* 如果已经存在这个key则说明是重复数据不再进行后续操作
* @param buss
* @return
*/
/*private boolean reduceByKey(BussFlowWeb buss) {
String coding = CodingGeneration.md5GenCoding2(buss.ori_url);
String key=buss.session_serial_number+"|"+buss.request_ip+"|"+buss.request_port+"|"+buss.response_ip+"|"+buss.response_port+"|"+coding+"|"+buss.business_detail_mesg;
//分布式同步锁如果存在该key或者锁已经被获取则返回false设置
boolean flag=redisTemplate.opsForValue().setIfAbsent(key, System.currentTimeMillis()+"");
if(flag){
redisTemplate.expire(key,SystemConfigManager.keyExistTime,TimeUnit.SECONDS);
//原来不存在这条数据
return false;
}
return true;
}*/
/**
* 判断字符串是否是乱码
*
* @param strName 字符串
* @return 是否是乱码
*/
public static boolean isMessyCode(String strName) {
Pattern p = Pattern.compile("\\s*|\t*|\r*|\n*");
Matcher m = p.matcher(strName);
String after = m.replaceAll("").replace("-", "").replace("|", "");
String temp = after.replaceAll("\\p{P}", "");
char[] ch = temp.trim().toCharArray();
float chLength = 0 ;
float count = 0;
for (int i = 0; i < ch.length; i++) {
char c = ch[i];
if (!Character.isLetterOrDigit(c)) {
if (!isChinese(c)) {
count = count + 1;
}
chLength++;
}
}
float result = count / chLength ;
if (result > 0.4) {
return true;
} else {
return false;
}
}
private static boolean isChinese(char c) {
Character.UnicodeBlock ub = Character.UnicodeBlock.of(c);
if (ub == Character.UnicodeBlock.CJK_UNIFIED_IDEOGRAPHS
|| ub == Character.UnicodeBlock.CJK_COMPATIBILITY_IDEOGRAPHS
|| ub == Character.UnicodeBlock.CJK_UNIFIED_IDEOGRAPHS_EXTENSION_A
|| ub == Character.UnicodeBlock.GENERAL_PUNCTUATION
|| ub == Character.UnicodeBlock.CJK_SYMBOLS_AND_PUNCTUATION
|| ub == Character.UnicodeBlock.HALFWIDTH_AND_FULLWIDTH_FORMS) {
return true;
}
return false;
}
public String id; //id
public int msg_len; //消息长度
public int msg_type; //消息类型
public long src_mac;
public long dst_mac;
public int protocol; //协议名
public String session_serial_number;//会话序列号
public String buss_type;//业务服务资源编码C_01
// public String net_segment;//网段标识(客户-web)
public long request_ip;//Web客户端IP
public int request_port;//Web客户端端口
public long response_ip;//Web服务器IP
public int response_port;//Web服务器端口
public long start_tv_sec;//Web开始时间秒
public long start_tv_usec;//开始时间毫秒
public long end_tv_sec;//结束时间秒
public long end_tv_usec;//结束时间微妙
public String disc_resource_ident; //weburl mid:apiBody dbsql
public String operating_system; //web:操作系统 middinterfaceName db:db_name
//web midd
public String session_id;//sessionid
public int request_msg_length; //请求报文长度
public String request_msg_detail; //请求报文详情
public int response_msg_length; //响应报文长度
public String response_msg_detail; //响应报文详情
//web段
public String reter_url;
public String x_requested_with;
public long req_method; //请求方式
public String content_type; //请求类型
public String accept; //jieshou
public int req_cookie_leng;//请求cookie报文长度
public String req_cookie_detail;//请求cookie报文详情
public long t_intodb_time;
public int load_or_step; //0: 页面 1加载项 2非web段数据
public String business_detail_mesg;
public String bussiness_key_mesg; //关键字 格式key=val|key=val....
public int isUncomplete; //组包是否完全 0组包完整 1不完整
public int deal_state=1;
public int server_res_code;
public long server_response_time;
public long client_translate_time;
public String browser;
public int server_start_tv_sec;
public long server_start_tv_usec;
public int server_end_tv_sec;
public long server_end_tv_usec;
public String probe_ip;
public int probe_if;
public long server_translate_time;
public long time_flag;
public String channel;
public String base_code;
public String ori_url;
public String remain_data;
public long bytes_in;
public long bytes_out;
public int package_in;
public int package_out;
public String dataId;
public int filterId;
public String whiteCharacter;
// tokenId
public String tokenId;
// 判断是否是首端资源( 2 )
public Integer segmentId;
@Override
public String[] getData() {
String data[] = new String[38];
return data;
}
public String getBussType() {
return buss_type;
}
@Override
public int getInterfaceNumber() {
return 0;
}
}

View File

@ -1,381 +0,0 @@
package com.yuandian.dataflow.proto.decode;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Date;
import io.netty.buffer.ByteBuf;
/**
* 移植zjm流量信息接收类
* Leehr
*
*/
public class DataBaseModel extends PacketBase implements Serializable{
private static final long serialVersionUID = 1L;
private static final String split=";";
private long mac_src; //源MAC
private long mac_dst; //目标MAC
private long ip_src; //源IP
private long ip_dst; //目标IP
private long port_src; //源端口如果没有-1
private long port_dst; //目标端口如果没有-1
private long l3_proto; //第三层协议ID如果没有-1
private long l4_proto; //第四层协议ID如果没有-1
private long tos; //Tos一个字节如果没有-1
private long vlan_id; //vlan ID如果没有-1
private long bytes; //字节总数
private long packets; //数据包总数
private long packets_syn; //TCP同步包数
private long packets_syn_ack; //TCP同步确认包数
private long packets_syn_rst; //TCP同步重置包数
private long timestamp; //时间戳,
private long appid;//应用ID
private long app_group_id;
private long mpls_label;
private long pkts_syn_rx; //tcp同步包接收
private long pkts_syn_ack_rx;//tcp同步确认包接收
private long pkts_syn_rst_rx; //tcp同步重置包接收
private long pkts_fin;//tcp终止包接收
private long pkts_rst;//tcp重置包接收
private long bytes_rx;//字节收
private long packets_rx;//数据包收
private long probe_time_sec;
private Date probe_time;
private Date createTime;
public String getKey(){
StringBuffer strb=new StringBuffer();
strb.append(timestamp).append(split);
strb.append(mac_src).append(split);
strb.append(mac_dst).append(split);
strb.append(ip_src).append(split);
strb.append(ip_dst).append(split);
strb.append(port_src).append(split);
strb.append(port_dst).append(split);
strb.append(l3_proto).append(split);
strb.append(l4_proto).append(split);
strb.append(tos).append(split);
strb.append(vlan_id).append(split);
strb.append(appid);
return strb.toString();
}
public DataBaseModel Parse(ByteBuffer data)throws Exception {
if (data.position() + 108 > data.array().length)
throw new Exception("data length error!");
DataBaseModel db = new DataBaseModel();
db.mac_src = data.getLong( 8);
db.mac_dst = data.getLong( 8);
db.ip_src = data.getLong( 8);
db.ip_dst = data.getLong( 8);
db.port_src = data.getInt(4);
db.port_dst = data.getInt(4);
db.l3_proto = data.getInt(4);
db.l4_proto = data.getInt(4);
db.tos = data.getInt(4);
db.vlan_id = data.getInt(4);
db.bytes = data.getLong( 8);
db.packets = data.getLong( 8);
db.packets_syn = data.getLong( 8);
db.packets_syn_ack = data.getLong( 8);
db.packets_syn_rst = data.getLong( 8);
/**
* 2015-12-10txy 新增3个解析
*/
db.appid=data.getLong( 4);
db.app_group_id=data.getLong( 4);
db.mpls_label=data.getLong( 4);
return db;
}
public String toString(){
StringBuffer strb = new StringBuffer();
strb.append(timestamp).append(",");
strb.append(mac_src).append(",");
strb.append(mac_dst).append(",");
strb.append(ip_src).append(",");
strb.append(ip_dst).append(",");
strb.append(port_src).append(",");
strb.append(port_dst).append(",");
strb.append(l3_proto).append(",");
strb.append(l4_proto).append(",");
strb.append(tos).append(",");
strb.append(vlan_id).append(",");
strb.append(appid).append(",");
strb.append(packets_syn).append(",");
strb.append(pkts_syn_rx).append(",");
strb.append(packets_syn_ack).append(",");
strb.append(pkts_syn_ack_rx).append(",");
strb.append(packets_syn_rst).append(",");
strb.append(pkts_syn_rst_rx).append(",");
strb.append(pkts_fin).append(",");
strb.append(pkts_rst).append(",");
strb.append(bytes).append(",");
strb.append(packets).append(",");
strb.append(bytes_rx).append(",");
strb.append(packets_rx).append(",");
strb.append(app_group_id).append(",");
strb.append(mpls_label).append("\n");
return strb.toString();
}
public long getProbeTimeSec() {
return probe_time_sec;
}
public void setProbe_time_sec(long probe_time_sec) {
this.probe_time_sec = probe_time_sec;
}
public Date getProbe_time() {
return probe_time;
}
public void setProbe_time(Date probe_time) {
this.probe_time = probe_time;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public long getMac_src() {
return mac_src;
}
public void setMac_src(long macSrc) {
mac_src = macSrc;
}
public long getMac_dst() {
return mac_dst;
}
public void setMac_dst(long macDst) {
mac_dst = macDst;
}
public long getIp_src() {
return ip_src;
}
public void setIp_src(long ipSrc) {
ip_src = ipSrc;
}
public long getIp_dst() {
return ip_dst;
}
public void setIp_dst(long ipDst) {
ip_dst = ipDst;
}
public long getPort_src() {
return port_src;
}
public void setPort_src(long portSrc) {
port_src = portSrc;
}
public long getPort_dst() {
return port_dst;
}
public void setPort_dst(long portDst) {
port_dst = portDst;
}
public long getL3_proto() {
return l3_proto;
}
public void setL3_proto(long l3Proto) {
l3_proto = l3Proto;
}
public long getL4_proto() {
return l4_proto;
}
public void setL4_proto(long l4Proto) {
l4_proto = l4Proto;
}
public long getTos() {
return tos;
}
public void setTos(long tos) {
this.tos = tos;
}
public long getVlan_id() {
return vlan_id;
}
public void setVlan_id(long vlanId) {
vlan_id = vlanId;
}
public long getAppid() {
return appid;
}
public void setAppid(long appid) {
this.appid = appid;
}
public long getPkts_syn_rx() {
return pkts_syn_rx;
}
public void setPkts_syn_rx(long pktsSynRx) {
pkts_syn_rx = pktsSynRx;
}
public long getPkts_syn_ack_rx() {
return pkts_syn_ack_rx;
}
public void setPkts_syn_ack_rx(long pktsSynAckRx) {
pkts_syn_ack_rx = pktsSynAckRx;
}
public long getPkts_syn_rst_rx() {
return pkts_syn_rst_rx;
}
public void setPkts_syn_rst_rx(long pktsSynRstRx) {
pkts_syn_rst_rx = pktsSynRstRx;
}
public long getPkts_fin() {
return pkts_fin;
}
public void setPkts_fin(long pktsFin) {
pkts_fin = pktsFin;
}
public long getPkts_rst() {
return pkts_rst;
}
public void setPkts_rst(long pktsRst) {
pkts_rst = pktsRst;
}
public long getBytes() {
return bytes;
}
public void setBytes(long bytes) {
this.bytes = bytes;
}
public long getPackets() {
return packets;
}
public void setPackets(long packets) {
this.packets = packets;
}
public long getBytes_rx() {
return bytes_rx;
}
public void setBytes_rx(long bytesRx) {
bytes_rx = bytesRx;
}
public long getPackets_rx() {
return packets_rx;
}
public void setPackets_rx(long packetsRx) {
packets_rx = packetsRx;
}
public long getPackets_syn() {
return packets_syn;
}
public void setPackets_syn(long packetsSyn) {
packets_syn = packetsSyn;
}
public long getPackets_syn_ack() {
return packets_syn_ack;
}
public void setPackets_syn_ack(long packetsSynAck) {
packets_syn_ack = packetsSynAck;
}
public long getPackets_syn_rst() {
return packets_syn_rst;
}
public void setPackets_syn_rst(long packetsSynRst) {
packets_syn_rst = packetsSynRst;
}
public long getApp_group_id() {
return app_group_id;
}
public void setApp_group_id(long app_group_id) {
this.app_group_id = app_group_id;
}
public long getMpls_label() {
return mpls_label;
}
public void setMpls_label(long mpls_label) {
this.mpls_label = mpls_label;
}
public static String getSplit() {
return split;
}
@Override
public PacketBase Parse(PacketHeader header,ByteBuffer data)
throws Exception {
return null;
}
@Override
public int getUnitPacketLength() {
return 0;
}
@Override
public String[] getData() {
return null;
}
@Override
public int getInterfaceNumber() {
return 0;
}
}

View File

@ -1,112 +0,0 @@
package com.yuandian.dataflow.proto.decode;
import java.nio.ByteBuffer;
public class DataFlow extends PacketBase {
private final int UNIT_DATA_LENGTH = 157;//贵阳
public String id;
public Integer msg_type; //消息类型
public Integer msg_version; //消息版本
public Integer msg_seq; //序列号
public Integer msg_len; //消息长度
public Integer probe_if; //接口号
public Long timestamp; //时间戳
public Long mac_src; //源物理地址
public Long mac_dst; //目的物理地址
public Integer vlan_id; //vlan_id
public Long l3_proto; //l3层协议
public Long l4_proto; //l4层协议
public Integer tos; //tos
public Integer retran_count; //重传次数
public Integer reset_count; //重置次数
public Integer zerowin_count; //零窗口次数
public Integer protocol; //协议名
public Long seq;
public Long ack;
public Integer recog_status; //识别类型标识
public Long bytes; //总字节
public Long packets; //总包数
public Integer start_tv_sec;//Web开始时间秒
public Long start_tv_usec;//开始时间毫秒
public Integer end_tv_sec;//结束时间秒
public Long end_tv_usec;//结束时间微妙
public Integer server_start_tv_sec;//服务器响应开始时间秒
public Long server_start_tv_usec;//服务器响应开始时间毫秒
public Integer server_end_tv_sec;//服务器响应结束时间秒
public Long server_end_tv_usec;//服务器响应结束时间微妙
public Long server_response_time;//Web服务器响应时间
public Long client_translate_time;//Web客户端传输耗时
public Long server_translate_time;//Web服务器传输耗时
public Long bytes_in;
public Long bytes_out;
public Long packets_in;
public Long packets_out;
public Long ip_src; //源IP
public Long ip_dst; //目的IP
public Long port_src; //源端口
public Long port_dst; //目的端口
public Long probeIP; //探针IP
public Long intodb_time;
public Long count = 1L;
@Override
public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception {
if (data.position() + UNIT_DATA_LENGTH > data.array().length)
throw new Exception("data length error!");
DataFlow dflow = new DataFlow();
// dflow.m_Header = header;
dflow.msg_type = header.getTableID();
dflow.ip_src = data.getLong(4);
dflow.ip_dst = data.getLong(4);
data.position(data.position()+4);
dflow.probe_if = data.getInt(4);
data.position(data.position() + 32 + 12 + 12 + 8 );
dflow.bytes_in = data.getLong(4);
dflow.bytes_out = data.getLong(4);
dflow.packets_in = data.getLong(4);
dflow.packets_out = data.getLong(4);
dflow.bytes = data.getLong(4);
dflow.packets = data.getLong(4);
data.position(data.position() + 12 + 12 + 16 + 16 );
return dflow;
}
@Override
public int getUnitPacketLength() {
return UNIT_DATA_LENGTH;
}
@Override
public String[] getData() {
String data[] = new String[45];
return data;
}
@Override
public int getInterfaceNumber() {
return Integer.parseInt(probe_if.toString());
}
}

View File

@ -1,80 +0,0 @@
package com.yuandian.dataflow.proto.decode;
import java.nio.ByteBuffer;
import lombok.ToString;
@ToString
public abstract class PacketBase {
protected PacketHeader m_Header = null;
public PacketHeader getPacketHead(){
return m_Header;
}
public void setPacketHeader(PacketHeader header)
{
this.m_Header = header;
}
//抽象方法每个数据操作类实现不一样
public abstract PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception;
public abstract int getUnitPacketLength();
public abstract String[] getData();
public abstract int getInterfaceNumber();
public static PacketBase createPacketBase(PacketHeader packetHeader) {
// 获取报文头
// 响应表ID
int table_id = packetHeader.getTableID();
switch (table_id) {
case 17:
AppFlow appFlow = new AppFlow();
appFlow.setPacketHeader(packetHeader);
return appFlow;
case 18:
QoeFlow qoeFlow = new QoeFlow();
qoeFlow.setPacketHeader(packetHeader);
return qoeFlow;
case 20:
SstFlow sstFlow = new SstFlow();
sstFlow.setPacketHeader(packetHeader);
return sstFlow;
case 22:
BussFlowWeb tBusiness = new BussFlowWeb();
tBusiness.setPacketHeader(packetHeader);
return tBusiness;
case 23:
BussFlowMidd secondBusiness = new BussFlowMidd();
secondBusiness.setPacketHeader(packetHeader);
return secondBusiness;
case 24:
BussFlowDb thirdBusiness = new BussFlowDb();
thirdBusiness.setPacketHeader(packetHeader);
return thirdBusiness;
case 25:
BussFlowOrl tExternalBusiness = new BussFlowOrl();
tExternalBusiness.setPacketHeader(packetHeader);
return tExternalBusiness;
case 27:
DataFlow dataFlow = new DataFlow();
dataFlow.setPacketHeader(packetHeader);
return dataFlow;
case 28:
ApmBaseDataFlow apmBaseDataFlow = new ApmBaseDataFlow();
apmBaseDataFlow.setPacketHeader(packetHeader);
return apmBaseDataFlow;
case 29:
BasicTrafficFlow BasicTrafficFlow = new BasicTrafficFlow();
BasicTrafficFlow.setPacketHeader(packetHeader);
return BasicTrafficFlow;
default:
return null;
}
}
}

View File

@ -1,55 +0,0 @@
/**
* description
*
* @author eson
*2022年6月07日-11:09:21
*/
package com.yuandian.dataflow.proto.decode;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
* description
*
* @author eson
*2022年6月07日-11:09:21
*/
@Getter
@Setter
@ToString
public class PacketHeader {
private int tableID = -1; //数据类型
private int recCount = 0; //数据总条数
private int msgLen; //数据报文总长度60010端口为压缩后长度
private long timestamp; //60010端口发送数据时间
private int probeIf; //60010端口抓包口
private int umcomprLen;//60010端口数据报文压缩前长度
private int nowType; //记录25类型数据数据当前数据类型 22 23 24
public PacketHeader(DataInputStream data) throws Exception {
this.tableID = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();//22
this.recCount = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();//4000
this.msgLen = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
}
public void parseNextHeader_60010(DataInputStream data) throws Exception {
this.timestamp = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getLong();
this.probeIf =ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
this.umcomprLen = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
}
public static int PacketCode(DataInputStream data) throws IOException {
var buf = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN);
return buf.getInt();
}
}

View File

@ -1,156 +0,0 @@
/**
*
*/
package com.yuandian.dataflow.proto.decode;
import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
/**
* @author l
* 网络性能流
*/
public class QoeFlow extends PacketBase {
private String qoe_Flow_Id;
public static int SIZE = 38*4;
// 字段类型 字段 原始类型 字节数 说明
public int srcIp; //uint32_t 4 源ip地址
public int dstIp; //uint32_t 4 目的ip地址
public int stvSec; //uint32_t 4 开始时间秒
public int stvUsec; //uint32_t 4 开始时间微秒
public int ltvSec; //uint32_t 4 最后更新时间秒
public int ltvUsec; //uint32_t 4 最后更新时间微秒
public int dst2ResponNum; //uint32_t 4 响应总量
public int dst2Fast; //uint32_t 4
public int dst2FastExpected; //uint32_t 4
public int dst2ExpectedDegrated; //uint32_t 4
public int dst2DegratedService; //uint32_t 4
public int dst2ServiceAvailability; //uint32_t 4
public int dst2ResponTimeout; //uint32_t 4 响应超时数
public int dst2ResponSuccess; //uint32_t 4 响应成功数
public int dst2ResponFail; //uint32_t 4 响应失败数
public int dst2ResponPeek; //uint32_t 4 峰值响应时间
public int dst2ResponAverage; //uint32_t 4 响应时间均值
public int csWindow; //uint32_t 4
public int scWindow; //uint32_t 4
public int csReset; //uint32_t 4
public int scReset; //uint32_t 4
public int csRetran; //uint32_t 4
public int scRetran; //uint32_t 4
public int appId; //uint32_t 4 Aphid
public int appGroupId; //uint32_t 4 app组id
public int probeIf; //uint32_t 4 探针接口id
public int appStyle; //uint32_t 4
public int timeFlag; //uint32_t 4 发送时间戳
public int connSetupTm; //uint32_t 4 链接建立时间
public int dataTransferTm; //uint32_t 4 数据传输时间
public int retransDelayTm; //uint32_t 4 数据重传时延
public int networkInbound; //uint32_t 4 网络响应时间c->s
public int networkOutbound; //uint32_t 4 网络响应时间s->c
public int newSession; //uint32_t 4 新会话数
public int userEvents; //uint32_t 4 用户事件
public int serverEvents; //uint32_t 4 服务事件
public int connSetupPeek; //uint32_t 4 连接建立时间峰值
public int vlanId; //uint32_t 4
@Override
public QoeFlow Parse(PacketHeader header,ByteBuffer data) throws Exception {
QoeFlow qoeFlow = new QoeFlow();
qoeFlow.srcIp=data.getInt( 4);
qoeFlow.dstIp=data.getInt( 4);
qoeFlow.stvSec=data.getInt( 4);
qoeFlow.stvUsec=data.getInt( 4);
qoeFlow.ltvSec=data.getInt( 4);
qoeFlow.ltvUsec=data.getInt( 4);
qoeFlow.dst2ResponNum=data.getInt( 4);
qoeFlow.dst2Fast=data.getInt( 4);
qoeFlow.dst2FastExpected=data.getInt( 4);
qoeFlow.dst2ExpectedDegrated=data.getInt( 4);
qoeFlow.dst2DegratedService=data.getInt( 4);
qoeFlow.dst2ServiceAvailability=data.getInt( 4);
qoeFlow.dst2ResponTimeout=data.getInt( 4);
qoeFlow.dst2ResponSuccess=data.getInt( 4);
qoeFlow.dst2ResponFail=data.getInt( 4);
qoeFlow.dst2ResponPeek=data.getInt( 4);
qoeFlow.dst2ResponAverage=data.getInt( 4);
qoeFlow.csWindow=data.getInt( 4);
qoeFlow.scWindow=data.getInt( 4);
qoeFlow.csReset=data.getInt( 4);
qoeFlow.scReset=data.getInt( 4);
qoeFlow.csRetran=data.getInt( 4);
qoeFlow.scRetran=data.getInt( 4);
qoeFlow.appId=data.getInt( 4);
qoeFlow.appGroupId=data.getInt( 4);
qoeFlow.probeIf=data.getInt( 4);
qoeFlow.appStyle=data.getInt( 4);
qoeFlow.timeFlag=data.getInt( 4);
qoeFlow.connSetupTm=data.getInt( 4);
qoeFlow.dataTransferTm=data.getInt( 4);
qoeFlow.retransDelayTm=data.getInt( 4);
qoeFlow.networkInbound=data.getInt( 4);
qoeFlow.networkOutbound=data.getInt( 4);
qoeFlow.newSession=data.getInt( 4);
qoeFlow.userEvents=data.getInt( 4);
qoeFlow.serverEvents=data.getInt( 4);
qoeFlow.connSetupPeek=data.getInt( 4);
qoeFlow.vlanId=data.getInt( 4);
return qoeFlow;
}
@Override
public int getUnitPacketLength() {
// TODO Auto-generated method stub
return 0;
}
@Override
public String[] getData() {
// TODO Auto-generated method stub
return null;
}
@Override
public int getInterfaceNumber() {
// TODO Auto-generated method stub
return 0;
}
}

View File

@ -1,115 +0,0 @@
package com.yuandian.dataflow.proto.decode;
import java.nio.ByteBuffer;
import java.util.Date;
public class SstFlow extends PacketBase{
public static int SIZE = 108;
public long mac_src; //源MAC
public long mac_dst; //目标MAC
public long ip_src; //源IP
public long ip_dst; //目标IP
public int port_src; //源端口如果没有-1
public int port_dst; //目标端口如果没有-1
public int l3_proto; //第三层协议ID如果没有-1
public int l4_proto; //第四层协议ID如果没有-1
public int tos; //Tos一个字节如果没有-1
public int vlan_id; //vlan ID如果没有-1
public long bytes; //字节总数
public long packets; //数据包总数
public long packets_syn; //TCP同步包数
public long packets_syn_ack; //TCP同步确认包数
public long packets_syn_rst; //TCP同步重置包数
public long timestamp; //时间戳,
public long appid;//应用ID
public long app_group_id;
public int mpls_label;
public long pkts_syn_rx; //tcp同步包接收
public long pkts_syn_ack_rx;//tcp同步确认包接收
public long pkts_syn_rst_rx; //tcp同步重置包接收
public long pkts_fin;//tcp终止包接收
public long pkts_rst;//tcp重置包接收
public long bytes_rx;//字节收
public long packets_rx;//数据包收
public long probe_time_sec;
public Date probe_time;
public Date createTime;
public int probe_if;
@Override
public PacketBase Parse(PacketHeader header,ByteBuffer data)
throws Exception {
SstFlow sstFlow = new SstFlow();
sstFlow.mac_src = data.getLong(8);
sstFlow.mac_dst = data.getLong(8);
sstFlow.ip_src = data.getLong(8);
sstFlow.ip_dst = data.getLong(8);
sstFlow.port_src = data.getInt(4);
sstFlow.port_dst = data.getInt(4);
sstFlow.l3_proto = data.getInt(4);
sstFlow.l4_proto = data.getInt(4);
sstFlow.tos = data.getInt(4);
sstFlow.vlan_id = data.getInt(4);
sstFlow.bytes = data.getLong(8);
sstFlow.packets = data.getLong(8);
sstFlow.packets_syn = data.getLong(8);
sstFlow.packets_syn_ack = data.getLong(8);
sstFlow.packets_syn_rst = data.getLong(8);
sstFlow.appid = data.getInt(4);
sstFlow.app_group_id = data.getInt(4);
sstFlow.mpls_label = data.getInt(4);
sstFlow.timestamp = header.getTimestamp();
sstFlow.probe_if = header.getProbeIf();
return sstFlow;
}
@Override
public int getUnitPacketLength() {
// TODO Auto-generated method stub
return 0;
}
@Override
public String[] getData() {
// TODO Auto-generated method stub
return null;
}
@Override
public int getInterfaceNumber() {
// TODO Auto-generated method stub
return 0;
}
}

View File

@ -1,22 +0,0 @@
/**
* description
*
* @author eson
*2022年6月07日-16:18:34
*/
package com.yuandian.dataflow.proto.decode;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
/**
* description
*
* @author eson
*2022年6月07日-16:18:34
*/
public class utils {
public static String ByteBufferUTF8String(ByteBuffer data, int length) throws UnsupportedEncodingException {
return new String( data.alignedSlice(length).array(), "utf-8");
}
}

View File

@ -0,0 +1,168 @@
syntax = "proto3";
package com.yuandian.dataflow.proto;
message ApmBaseDataFlow {
int32 tableID = 1;
int32 probeIf = 2;
//
/**
*
*/
int32 requestPort = 3;
/**
*
*/
int32 responsePort = 4;
/**
* IP
*/
int32 requestIp = 5;
/**
* Ip
*/
int32 responseIp = 6;
/**
* mac
*/
int64 srcMac = 7;
/**
* mac
*/
int64 dstMac = 8;
/**
*
*/
int32 vlanId = 9;
int32 tvSec = 10;
int32 tvUsec = 11;
/**
*
*/
int32 startTm = 12;
/**
*
*/
int32 totalBytes = 13;
/**
*
*/
int32 totalPackets = 14;
/**
*
*/
int32 totalDropPackets = 15;
/**
*
*/
int32 retranTimeDelay = 16;
/**
* rtt
*/
int32 clientRtt = 17;
/**
* Rtt
*/
int32 serverRtt = 18;
/**
*
*/
int32 userResponseTime = 19;
/**
*
*/
int32 serverResponseTime = 20;
/**
* tcp回话连接失败数
*/
int32 conFail = 21;
/**
*
*/
// long reset = 22;
/**
*
*/
int32 bytesIn = 23;
/**
*
*/
int32 bytesOut = 24;
/**
*
*/
int32 timeFlag = 25;
/**
*
*/
int32 endTm = 26;
/**
*
*/
int32 endTmUsec = 27;
/**
*
*/
int32 responNum = 28;
/**
*
*/
int32 csWindow = 29;
/**
*
*/
int32 scWindow = 30;
/**
*
*/
int32 csReset = 31;
/**
*
*/
int32 scReset = 32;
/**
*
*/
int32 csRetran = 33;
/**
*
*/
int32 scRetran = 34;
/**
*
*/
int32 connSetupTm = 35;
/**
*
*/
int32 newSession = 36;
int32 csAlert = 37;
int32 scAlert = 38;
string protocal = 39;
}

25
src/main/proto/Base.proto Normal file
View File

@ -0,0 +1,25 @@
syntax = "proto3";
import "google/protobuf/any.proto";
import "ApmBaseDataFlow.proto";
option java_multiple_files = false; //
option java_package = "com.yuandian.dataflow.proto";
package com.yuandian.dataflow.proto;
message Response {
int32 code = 1;
int32 message = 2;
repeated google.protobuf.Any packets = 4;
}
message Request {
int32 type = 1; // . 1.
repeated google.protobuf.Any param = 2;
string version = 3; // . . 使
}
service CollectPacketsServer {
rpc GetPackets (Request) returns (stream Response);
}