From aa7749f924ed402bd670095cea0fe31658658027 Mon Sep 17 00:00:00 2001 From: huangsimin Date: Wed, 13 Jul 2022 13:33:00 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90=E7=8A=B6=E6=80=81=E6=9C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/yuandian/dataflow/Server.java | 6 +- .../yuandian/dataflow/controller/TaskLog.java | 56 ++++++++------- .../dataflow/statemachine/RaftClosure.java | 27 ------- .../dataflow/statemachine/StateServer.java | 2 +- .../statemachine/SyncDataClosure.java | 45 ++++++++++++ .../dataflow/statemachine/rpc/SMResponse.java | 36 ++++++++++ .../dataflow/statemachine/rpc/SyncData.java | 72 +------------------ .../statemachine/rpc/SyncDataProcessor.java | 13 +++- .../dataflow/statemachine/rpc/TaskState.java | 42 +++++++++++ 9 files changed, 169 insertions(+), 130 deletions(-) delete mode 100644 src/main/java/com/yuandian/dataflow/statemachine/RaftClosure.java create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/rpc/SMResponse.java create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/rpc/TaskState.java diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index 9a0d35a..2962f3c 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -14,7 +14,7 @@ import com.alipay.sofa.jraft.RaftGroupService; import com.alipay.sofa.jraft.conf.Configuration; import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.option.NodeOptions; -import com.yuandian.dataflow.statemachine.RaftClosure; +import com.yuandian.dataflow.statemachine.SyncDataClosure; import com.yuandian.dataflow.statemachine.StateMachine; import com.yuandian.dataflow.statemachine.StateServer; import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor; @@ -34,14 +34,14 @@ public class Server { @Autowired public static Node node; - public static RaftClosure done; + public static SyncDataClosure done; private static StateServer stateServer; public static Node GetNode() { return node; } - public static RaftClosure GetDone() { + public static SyncDataClosure GetDone() { return done; } diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java index ed07bc8..e6cf27a 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java +++ b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java @@ -2,14 +2,15 @@ package com.yuandian.dataflow.controller; import java.nio.ByteBuffer; +import com.alibaba.nacos.api.naming.pojo.Cluster; +import com.alibaba.nacos.common.remote.client.RpcClientFactory; import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.entity.Task; - -import com.google.protobuf.util.JsonFormat; +import com.alipay.sofa.jraft.rpc.RpcClient; +import com.alipay.sofa.jraft.rpc.impl.BoltRpcClient; import com.yuandian.dataflow.Server; -import com.yuandian.dataflow.grpc.MongodbTest; import com.yuandian.dataflow.projo.Response; -import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass; +import com.yuandian.dataflow.statemachine.SyncDataClosure; import lombok.extern.slf4j.Slf4j; @@ -25,40 +26,41 @@ import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestParam; + import com.alipay.sofa.jraft.Node; @Slf4j @Controller public class TaskLog { - // private static final Logger log = LoggerFactory.getLogger(TaskLog.class); private static Node node = Server.GetNode(); - @PostMapping(path = "/test") - public ResponseEntity Processing(@RequestBody String json) { + @GetMapping(path = "/test") + public ResponseEntity Processing() { - /*Task task = new Task(); + + log.info("node.isLeader {} {} node.getNodeId() {}", node.getNodeState(), node.getLeaderId(), node.getNodeId()); + if(node.isLeader()) { + // Task task = new Task(); - log.error(node.toString()); + // // 处理状态机 + // RaftClosure done = new RaftClosure(); + // task.setData(ByteBuffer.wrap("hello".getBytes())); + // task.setDone(done); + + // log.error("{} {} {}",node, node.toString(), task); + // node.apply(task); + + // log.error("{}", "RaftClosure"); - RaftClosure done = new RaftClosure(); - task.setData(ByteBuffer.wrap("hello".getBytes())); - task.setDone(done); - Server.GetNode().apply(task);*/ + + + } + - try { - // 1、类型转换 - BacktrackingFlowOuterClass.BacktrackingFlow.Builder builder = BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder(); - JsonFormat.parser().merge(json, builder); - BacktrackingFlowOuterClass.BacktrackingFlow backtrackingFlow = builder.build(); - - // 2、业务处理 - - // 3、数据保存到 mongoDB - MongodbTest.insertMsgToMongoDB(backtrackingFlow); - } catch (Exception e) { - e.printStackTrace(); - } + + Response response = new Response(); response.Code = HttpStatus.OK; @@ -67,7 +69,7 @@ public class TaskLog { } @GetMapping(path = "/test2") - public ResponseEntity MongodbTest(@RequestBody int status) { + public ResponseEntity MongodbTest(@RequestParam int status) { Response response = new Response(); return new ResponseEntity(response, HttpStatus.OK); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/RaftClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/RaftClosure.java deleted file mode 100644 index 779a1a2..0000000 --- a/src/main/java/com/yuandian/dataflow/statemachine/RaftClosure.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.yuandian.dataflow.statemachine; - -import com.alipay.sofa.jraft.Closure; -import com.alipay.sofa.jraft.Status; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RaftClosure implements Closure { - - private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class); - - @Override - public void run(Status status) { - - LOG.info("Task completed with status"+status.getCode()); - LOG.info("Task completed with "+status.getErrorMsg()); - LOG.info("Task completed with "+status.getRaftError()); - - } - -// @Override -// public void onCommitted() { -// System.out.println("Task onCommitted"); -// } - -} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java index 01e970c..ea617f4 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java @@ -20,7 +20,7 @@ import com.alipay.sofa.jraft.option.NodeOptions; import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; import com.alipay.sofa.jraft.util.Endpoint; -import com.yuandian.dataflow.statemachine.RaftClosure; +import com.yuandian.dataflow.statemachine.SyncDataClosure; import com.yuandian.dataflow.statemachine.StateMachine; import com.yuandian.dataflow.statemachine.rpc.SyncData; import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java new file mode 100644 index 0000000..d714d2d --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java @@ -0,0 +1,45 @@ +package com.yuandian.dataflow.statemachine; + +import com.alipay.sofa.jraft.Closure; +import com.alipay.sofa.jraft.Status; +import com.yuandian.dataflow.statemachine.rpc.SMResponse; +import com.yuandian.dataflow.statemachine.rpc.TaskState; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Slf4j +@Getter +@Setter +@ToString +public abstract class SyncDataClosure implements Closure { + + // 状态机的统一响应 + private SMResponse response; + // 代表任务状态 + private TaskState state; + + + protected void failure(final String errorMsg, final String redirect) { + final SMResponse response = new SMResponse(); + response.setSuccess(false); + response.setMsg(errorMsg); + response.setRedirect(redirect); + setResponse(response); + } + + protected void success(final TaskState value) { + final SMResponse response = new SMResponse(); + response.setState(value); + response.setSuccess(true); + setResponse(response); + } + + + +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SMResponse.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SMResponse.java new file mode 100644 index 0000000..62c9182 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SMResponse.java @@ -0,0 +1,36 @@ +/** + * description + * + * @author eson + *2022年7月13日-09:07:22 + */ +package com.yuandian.dataflow.statemachine.rpc; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +/** + * description + * + * @author eson + *2022年7月13日-09:07:22 + */ +@Slf4j +@Getter +@Setter +@ToString +public class SMResponse { + + private TaskState state; + + + private boolean success; + /** + * redirect peer id + */ + private String redirect; + + private String msg; +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncData.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncData.java index 928702a..2a3a4a4 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncData.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncData.java @@ -29,76 +29,6 @@ public class SyncData implements Serializable { private static final long serialVersionUID = 1L; - private long queueSize = 0; - - - // @Getter - // @Setter - // public class IncrementAndGetRequest implements Serializable { - // private long delta; - // } - - // public class GetValueRequest implements Serializable { - // private static final long serialVersionUID = 9218253805003988802L; - - // public GetValueRequest() { - // super(); - // } - // } - - - // @Getter - // @Setter - // public class ValueResponse implements Serializable { - - // private static final long serialVersionUID = -4220017686727146773L; - - // private long value; - // private boolean success; - // /** - // * redirect peer id - // */ - // private String redirect; - - // private String errorMsg; - - // } - - // public class IncrementAndAddClosure implements Closure { - // // private CounterServer counterServer; - // private IncrementAndGetRequest request; - // private ValueResponse response; - // private Closure done; // 网络应答callback - - // public IncrementAndAddClosure(CounterServer counterServer, IncrementAndGetRequest request, ValueResponse response, - // Closure done) { - // super(); - // this.counterServer = counterServer; - // this.request = request; - // this.response = response; - // this.done = done; - // } - - // @Override - // public void run(Status status) { - // // 返回应答给客户端 - // if (this.done != null) { - // done.run(status); - // } - // } - - // public IncrementAndGetRequest getRequest() { - // return this.request; - // } - - // public void setRequest(IncrementAndGetRequest request) { - // this.request = request; - // } - - // public ValueResponse getResponse() { - // return this.response; - // } - - // } + private TaskState state; } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java index a682e27..6f7f149 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java @@ -6,8 +6,10 @@ */ package com.yuandian.dataflow.statemachine.rpc; +import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; +import com.yuandian.dataflow.statemachine.SyncDataClosure; import lombok.extern.slf4j.Slf4j; @@ -24,7 +26,16 @@ public class SyncDataProcessor implements RpcProcessor { public void handleRequest(RpcContext rpcCtx, SyncData request) { log.info("{}", rpcCtx); log.info("{}", request); - rpcCtx.sendResponse(null); // + + + final SyncDataClosure closure = new SyncDataClosure() { + @Override + public void run(Status status) { + rpcCtx.sendResponse(getResponse()); + } + }; + + } @Override diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/TaskState.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/TaskState.java new file mode 100644 index 0000000..f830599 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/TaskState.java @@ -0,0 +1,42 @@ +/** + * description + * + * @author eson + *2022年7月13日-09:11:26 + */ +package com.yuandian.dataflow.statemachine.rpc; + +import java.io.Serializable; + +import com.alipay.sofa.jraft.entity.PeerId; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +import java.lang.management.ManagementFactory; +import java.lang.management.OperatingSystemMXBean; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; + +/** + * 代表任务状态 暂时全局使用这个结构 + * + * @author eson + *2022年7月13日-09:11:26 + */ +@Slf4j +@Getter +@Setter +public class TaskState implements Serializable { + private static final long serialVersionUID = -1L; + + // 节点的对应peerID + private PeerId peerId; + private long taskQueueSize; + + + public static void main(String[] args) { + + } +}