From dce906a7eb7073ac6a8514b3848822e7a4215c4b Mon Sep 17 00:00:00 2001 From: huangsimin Date: Fri, 15 Jul 2022 01:00:22 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90=E7=8A=B6=E6=80=81=E6=9C=BA?= =?UTF-8?q?=E7=9A=84=E8=AF=BB=E5=86=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/yuandian/dataflow/Server.java | 4 -- .../yuandian/dataflow/controller/TaskLog.java | 61 ++++++++----------- .../statemachine/StateServerFactory.java | 17 ++++-- .../statemachine/SyncDataClosure.java | 8 ++- 4 files changed, 46 insertions(+), 44 deletions(-) diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index 6e210fa..b59fd1e 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -33,11 +33,7 @@ import lombok.extern.slf4j.Slf4j; @SpringBootApplication @SpringBootConfiguration public class Server { - - - - public static void main(String[] args) throws Exception { diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java index 8ca4f6d..78c9dcb 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java +++ b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java @@ -1,14 +1,12 @@ package com.yuandian.dataflow.controller; -import java.nio.ByteBuffer; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestParam; -import com.alibaba.nacos.api.naming.pojo.Cluster; -import com.alibaba.nacos.common.remote.client.RpcClientFactory; -import com.alipay.sofa.jraft.Closure; -import com.alipay.sofa.jraft.entity.Task; -import com.alipay.sofa.jraft.rpc.RpcClient; -import com.alipay.sofa.jraft.rpc.impl.BoltRpcClient; -import com.yuandian.dataflow.Server; +import com.alipay.sofa.jraft.Status; import com.yuandian.dataflow.projo.Response; import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.statemachine.SyncDataClosure; @@ -17,23 +15,6 @@ import com.yuandian.dataflow.statemachine.rpc.TaskState; import lombok.var; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.ObjectUtils.Null; -import org.apache.commons.logging.Log; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; -import org.springframework.http.ResponseEntity; -import org.springframework.stereotype.Controller; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestParam; - -import com.alipay.sofa.jraft.Node; -import com.alipay.sofa.jraft.Status; - @Slf4j @Controller public class TaskLog { @@ -41,23 +22,35 @@ public class TaskLog { @GetMapping(path = "/test") - public ResponseEntity Processing() { + public ResponseEntity Processing() throws InterruptedException { // var state = StateServerFactory.getStateServer().getFsm().getTaskState(); - var c = new SyncDataClosure() { + + + SyncDataClosure closure = new SyncDataClosure() { @Override public void run(Status status) { - log.info(getTaskState().toString()); + synchronized(lockObject) { + lockObject.notify(); + } } - }; - StateServerFactory.getStateServer().readIndexState(true, c ); - - + }; + var state = new TaskState(); + state.setTaskQueueSize(1); + closure.setTaskState(state); + StateServerFactory.getStateServer().readIndexState(true, closure); + + synchronized(closure.lockObject) { + closure.lockObject.wait(); + } + + + - Response response = new Response(); + final Response response = new Response(); response.Code = HttpStatus.OK; - response.Message = "OK"; + response.Message = closure.getTaskState().toString(); return new ResponseEntity(response, HttpStatus.OK); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index f58b2d8..25bd435 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -140,6 +140,8 @@ public class StateServerFactory { public void readIndexState(final boolean readOnlySafe, final SyncDataClosure closure) { if(!readOnlySafe){ + + closure.setTaskState(getTaskState()); closure.success(getTaskState()); closure.run(Status.OK()); return; @@ -149,15 +151,22 @@ public class StateServerFactory { @Override public void run(Status status, long index, byte[] reqCtx) { if(status.isOk()){ - closure.success(getTaskState()); - closure.run(Status.OK()); + + if(closure.getTaskState() != null){ + applyState(closure.getTaskState(), closure); + } else { + closure.setTaskState(getTaskState()); + closure.success(getTaskState()); + closure.run(Status.OK()); + } + return; } readIndexExecutor.execute(() -> { if(isLeader()){ - log.debug("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status); - applyState( getTaskState(), closure); + log.info("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status); + applyState(getTaskState(), closure); }else { handlerNotLeaderError(closure); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java index f7e1ccf..7a83439 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java @@ -25,7 +25,13 @@ public abstract class SyncDataClosure implements Closure { // 代表任务状态 private TaskState taskState; + public Object lockObject = new Object(); + public SyncDataClosure() { + + } + + public void failure(final String errorMsg, final PeerId redirect) { final SMResponse response = new SMResponse(); response.setSuccess(false); @@ -41,6 +47,4 @@ public abstract class SyncDataClosure implements Closure { setResponse(response); } - - }