diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java deleted file mode 100644 index 626217a..0000000 --- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java +++ /dev/null @@ -1,73 +0,0 @@ -package com.yuandian.dataflow.controller; - -// import org.springframework.http.HttpStatus; -// import org.springframework.http.MediaType; -// import org.springframework.http.ResponseEntity; -// import org.springframework.stereotype.Controller; -// import org.springframework.web.bind.annotation.GetMapping; -// import org.springframework.web.bind.annotation.PostMapping; -// import org.springframework.web.bind.annotation.RequestBody; -// import org.springframework.web.bind.annotation.RequestParam; - -import com.alipay.sofa.jraft.Status; -import com.alipay.sofa.jraft.entity.PeerId; -import com.alipay.sofa.jraft.error.RemotingException; -import com.yuandian.dataflow.projo.Response; -import com.yuandian.dataflow.statemachine.StateServerFactory; -import com.yuandian.dataflow.statemachine.SyncClosure; -import com.yuandian.dataflow.statemachine.state.State; -import com.yuandian.dataflow.statemachine.state.WorkerState; - -import lombok.var; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -// @Controller -public class TaskLog { - - // @PostMapping(path = "/test", produces={MediaType.APPLICATION_JSON_VALUE}) - // public ResponseEntity Processing(@RequestBody String Count) throws InterruptedException, RemotingException { - // // var ws = new WorkerState(new PeerId()); - // // StateServerFactory.getStateServer().updateFsmWorkerState(ws); - // log.debug("{}", Count); - // Response response = new Response(); - // synchronized(response) { - // StateServerFactory.getStateServer().useFsmStateAsync((fsmState)->{ - // log.debug("http: {}",StateServerFactory.getServerId() ); - // response.Message = fsmState.toString(); - // response.notify(); - // return null; - // }); - // } - - // response.Code = HttpStatus.OK; - // return new ResponseEntity(response, HttpStatus.OK); - // } - - // @GetMapping(path = "/test2") - // public ResponseEntity MongodbTest() { - // Response response = new Response(); - // StateServerFactory.getStateServer().useFsmStateAsync((fsmState)->{ - // log.debug("{} {}", fsmState.toString()); - // // response.Message = fsmState.toString(); - // return null; - // }); - - // return new ResponseEntity(response, HttpStatus.OK); - // } - - - // @GetMapping(path = "/test3") - // public ResponseEntity RemoveLeader() { - // Response response = new Response(); - // try { - // StateServerFactory.getNode().shutdown(); - // } catch (Exception e) { - // e.printStackTrace(); - // response.Message = e.getMessage(); - // return new ResponseEntity(response, HttpStatus.INTERNAL_SERVER_ERROR); - // } - - // return new ResponseEntity(response, HttpStatus.OK); - // } -} diff --git a/src/main/java/com/yuandian/dataflow/master/Master.java b/src/main/java/com/yuandian/dataflow/master/Master.java deleted file mode 100644 index 62b0d6f..0000000 --- a/src/main/java/com/yuandian/dataflow/master/Master.java +++ /dev/null @@ -1,137 +0,0 @@ -package com.yuandian.dataflow.master; - -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketException; -import java.nio.Buffer; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.ArrayList; - -// import com.yuandian.dataflow.proto.decode.PacketBase; -// import com.yuandian.dataflow.proto.decode.PacketHeader; -// import com.yuandian.dataflow.proto.decode.utils; - -import io.netty.handler.codec.compression.ZlibDecoder; -import lombok.Cleanup; -import lombok.Getter; -import lombok.Setter; -import lombok.var; -import lombok.extern.slf4j.Slf4j; - -/** - * Header - */ -@Slf4j - -public class Master { - - public static void main(String[] args) throws Exception { - - var addr = new InetSocketAddress("192.168.1.248", 60001); - @Cleanup - var sock = new Socket(); - sock.setReceiveBufferSize(10 * 1024 * 1024);// socket接收缓存 - sock.setSoTimeout(1000 * 30); - // 设置超时 - sock.connect(addr, 10 * 1000); - var in = new DataInputStream(sock.getInputStream()); - var out = new DataOutputStream(sock.getOutputStream()); - // 发送验证字符串 - // out.write("public".getBytes()); - // log.debug("{}", PacketHeader.PacketCode(in)); - // var pheader = new PacketHeader(in); - - // log.debug("{}", pheader); - // var pbase = PacketBase.createPacketBase(pheader); - // log.debug("{}",pbase); - - - // //60010流需要解压 - // byte[] unzipbodydata = null; - // if (pheader.getTableID() == 20) { - // pheader.parseNextHeader_60010(in); - // byte[] zipbodydata = in.readNBytes(pheader.getMsgLen()); - // unzipbodydata = utils.Inflate(zipbodydata); - // } - - // for (int i = 0; i < pheader.getRecCount(); i++) { - // PacketBase dataBean = null; - - // if (pheader.getTableID() >= 22 && pheader.getTableID() <= 24) { - // // 读取具体数据头信息,获取前四个字段值,第四个字段为整条数据的长度 字段长度分别为 4 1 4 4 - // // bodyhead = new byte[13]; - // // readTillLength(bodyhead, 13); - - // var p1 = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt(); - // var p2 = ByteBuffer.wrap(in.readNBytes(1)).order(ByteOrder.LITTLE_ENDIAN).getInt(); - // var p3 = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt(); - // var length = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt(); - - // if (length <= 13) { - // log.debug("[{}:{}][数据总条数:{}][当前解析第{}条][length<13][探针发包有问题]"); - // throw new Exception("数据解析异常"); - // } - - // // 解析数据 - // dataBean = pbase.Parse(pheader, ByteBuffer.wrap(in.readNBytes(length - 13))); - // } else if (pheader.getTableID() == 25) { - - // var nowtype = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt(); - // var p2 = ByteBuffer.wrap(in.readNBytes(1)).order(ByteOrder.LITTLE_ENDIAN).getInt(); - // var p3 = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt(); - // var length = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt(); - // pheader.setNowType(nowtype); - - - // if (length <= 13) { - // log.debug("[{}:{}][数据总条数:{}][当前解析第{}条][length<13][探针发包有问题]"); - // throw new Exception("数据解析异常"); - // } - - // // 读取具体数据体信息 - // byte[] bodydata = new byte[length - 13]; - // readTillLength(bodydata, length - 13); - - // // 解析数据 - // dataBean = instance.Parse(pheader, bodydata, 0); - // } else if (pheader.getTableID() == 28 || pheader.getTableID() == 29) { //28或29为Apm流统计 - - // if (pheader.getTableID() == 28) { - // length = ApmBaseDataFlow.SIZE; - // } else { - // length = BasicTrafficFlow.SIZE; - // } - - // byte[] bodydata = new byte[length]; - // readTillLength(bodydata, length); - // dataBean = instance.Parse(pheader, bodydata, 0); - - // } else if (pheader.getTableID() == 17 || pheader.getTableID() == 18) { //18 为网络性能流 - // if (pheader.getTableID() == 17) { - // length = AppFlow.SIZE; - // } else if (pheader.getTableID() == 18) { - // length = QoeFlow.SIZE; - // } - // byte[] bodydata = new byte[length]; - // readTillLength(bodydata, length); - // dataBean = instance.Parse(pheader, bodydata, 0); - // } else if (pheader.getTableID() == 20) { - // int offset = i * SstFlow.SIZE; - // dataBean = instance.Parse(pheader, unzipbodydata, offset); - // } else { - // logger.info("不需要的数据类型:" + pheader.getTableID()); - // break; - // } - // if (dataBean != null) { - // tempBaseDatas.add(dataBean); - // } - // } - - } -}