From 02b4722799b21643ed033962e740b4fe920d0208 Mon Sep 17 00:00:00 2001 From: eson <474420502@qq.com> Date: Thu, 2 Jun 2022 00:20:38 +0800 Subject: [PATCH] =?UTF-8?q?TOOD:=E8=B0=83=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/yuandian/dataflow/Server.java | 20 +++++----- .../yuandian/dataflow/controller/TaskLog.java | 40 ++++++++++++++++--- .../dataflow/statemachine/RaftClosure.java | 11 +++-- .../dataflow/statemachine/StateMachine.java | 24 +++++------ start.sh | 0 stop.sh | 3 ++ 6 files changed, 68 insertions(+), 30 deletions(-) mode change 100644 => 100755 start.sh create mode 100755 stop.sh diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index bfd4727..3b6a30b 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -4,6 +4,7 @@ package com.yuandian.dataflow; import com.yuandian.dataflow.statemachine.RaftClosure; import com.yuandian.dataflow.statemachine.StateMachine; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringBootConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -51,6 +52,14 @@ import com.alipay.sofa.jraft.option.ReplicatorGroupOptions; @SpringBootApplication public class Server { + @Autowired + public static Node node; + + public static Node GetNode() { + return node; + } + + public static void main(String[] args) { String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; @@ -82,20 +91,13 @@ public class Server { nodeOptions.setFsm(new StateMachine()); RaftGroupService cluster = new RaftGroupService(groupId, serverId, nodeOptions); - Node node = cluster.start(); - - Closure done = new RaftClosure(); - Task task = new Task(); - task.setData(ByteBuffer.wrap("hello".getBytes())); - task.setDone(done); - node.apply(task); - + node = cluster.start(); System.setProperty("server.port", sprPort); SpringApplication.run(Server.class, args); - + Closure done = new RaftClosure(); node.shutdown(done); } } diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java index 7390aa4..f1f1b74 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java +++ b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java @@ -1,25 +1,53 @@ package com.yuandian.dataflow.controller; -import com.google.gson.JsonObject; +import java.nio.ByteBuffer; +import com.alipay.sofa.jraft.Closure; +import com.alipay.sofa.jraft.entity.Task; +import com.google.gson.JsonObject; +import com.yuandian.dataflow.Server; +import com.yuandian.dataflow.statemachine.RaftClosure; + +import org.apache.commons.lang.ObjectUtils.Null; +import org.apache.commons.logging.Log; +import org.apache.ratis.thirdparty.org.checkerframework.common.reflection.qual.GetMethod; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @Controller public class TaskLog { - @PostMapping(path = "/test", produces = MediaType.APPLICATION_JSON_VALUE) - public ResponseEntity greeting(@RequestBody JsonObject input) { - - String message = (String) input.get("message").getAsString(); - System.out.println(message); + private static final Logger LOG = LoggerFactory.getLogger(TaskLog.class); + + @GetMapping(path = "/test") + public ResponseEntity greeting() { + + Task task = new Task(); + Closure done = new RaftClosure(); + task.setData(ByteBuffer.wrap("hello".getBytes())); + task.setDone( done); + LOG.info( done.toString()); + LOG.info(Server.GetNode().toString()); JsonObject response = new JsonObject(); + + if (Server.GetNode() == null) { + return new ResponseEntity(response, HttpStatus.OK); + } + Server.GetNode() + .apply(task); + + + response.addProperty("status", "success"); + response.addProperty("apply", "hello"); return new ResponseEntity(response, HttpStatus.OK); } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/RaftClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/RaftClosure.java index c2a38d7..2746294 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/RaftClosure.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/RaftClosure.java @@ -3,14 +3,19 @@ package com.yuandian.dataflow.statemachine; import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Status; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class RaftClosure implements Closure { + private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class); + @Override public void run(Status status) { - System.out.println("Task completed with status"+status.getCode()); - System.out.println("Task completed with "+status.getErrorMsg()); - System.out.println("Task completed with "+status.getRaftError()); + LOG.info("Task completed with status"+status.getCode()); + LOG.info("Task completed with "+status.getErrorMsg()); + LOG.info("Task completed with "+status.getRaftError()); } // @Override diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 22fffc2..a83c756 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -1,7 +1,5 @@ package com.yuandian.dataflow.statemachine; - - import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -25,20 +23,20 @@ import com.alipay.sofa.jraft.util.Utils; * * @author boyan (boyan@alibaba-inc.com) * - * 2018-Apr-09 4:52:31 PM + * 2018-Apr-09 4:52:31 PM */ public class StateMachine extends StateMachineAdapter { - private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class); + private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class); /** * Counter value */ - private final AtomicLong value = new AtomicLong(0); + private final AtomicLong value = new AtomicLong(0); /** * Leader term */ - private final AtomicLong leaderTerm = new AtomicLong(-1); + private final AtomicLong leaderTerm = new AtomicLong(-1); public boolean isLeader() { return this.leaderTerm.get() > 0; @@ -54,15 +52,17 @@ public class StateMachine extends StateMachineAdapter { @Override public void onApply(final Iterator iter) { while (iter.hasNext()) { - + if (iter.done() != null) { - // This task is applied by this node, get value from closure to avoid additional parsing. - + // This task is applied by this node, get value from closure to avoid additional + // parsing. + + LOG.debug("done:%s",iter.getData().toString()); } else { // Have to parse FetchAddRequest from this user log. - + LOG.debug("null:%s",iter.getData().toString()); } - + iter.next(); } } @@ -79,7 +79,7 @@ public class StateMachine extends StateMachineAdapter { @Override public boolean onSnapshotLoad(final SnapshotReader reader) { - + return true; } diff --git a/start.sh b/start.sh old mode 100644 new mode 100755 diff --git a/stop.sh b/stop.sh new file mode 100755 index 0000000..e34e95a --- /dev/null +++ b/stop.sh @@ -0,0 +1,3 @@ +screen -S raft-0 -X quit +screen -S raft-1 -X quit +screen -S raft-2 -X quit