From 886a5ffe1aa1774ccd0adfa2ae1aabad0ceaada2 Mon Sep 17 00:00:00 2001 From: huangsimin Date: Thu, 14 Jul 2022 18:19:45 +0800 Subject: [PATCH] =?UTF-8?q?=E7=8A=B6=E6=80=81=E5=90=8C=E6=AD=A5=20TODO:=20?= =?UTF-8?q?readIndex?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/yuandian/dataflow/Server.java | 46 ++-- .../yuandian/dataflow/controller/TaskLog.java | 35 ++-- .../proto/msgtype/UsrFlowOuterClass.java | 10 +- .../dataflow/statemachine/StateMachine.java | 42 +++- .../dataflow/statemachine/StateServer.java | 97 --------- .../statemachine/StateServerFactory.java | 198 ++++++++++++++++++ .../statemachine/SyncDataClosure.java | 7 +- .../dataflow/statemachine/rpc/SMResponse.java | 10 +- .../dataflow/statemachine/rpc/SyncData.java | 4 +- .../statemachine/rpc/SyncDataProcessor.java | 24 ++- .../dataflow/statemachine/rpc/TaskState.java | 6 +- src/main/resources/logback.xml | 2 +- .../statemachine/StateMachineTest.java | 74 +++++++ 13 files changed, 392 insertions(+), 163 deletions(-) delete mode 100644 src/main/java/com/yuandian/dataflow/statemachine/StateServer.java create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java create mode 100644 src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index 2962f3c..6e210fa 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -16,10 +16,11 @@ import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.option.NodeOptions; import com.yuandian.dataflow.statemachine.SyncDataClosure; import com.yuandian.dataflow.statemachine.StateMachine; -import com.yuandian.dataflow.statemachine.StateServer; +import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor; import lombok.var; +import lombok.extern.slf4j.Slf4j; @@ -28,26 +29,17 @@ import lombok.var; * Hello world! * */ +@Slf4j @SpringBootApplication @SpringBootConfiguration public class Server { - @Autowired - public static Node node; - public static SyncDataClosure done; - private static StateServer stateServer; - - public static Node GetNode() { - return node; - } - - public static SyncDataClosure GetDone() { - return done; - } + - public static void main(String[] args) { + public static void main(String[] args) throws Exception { + String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; String[] sprPeers = new String[]{"3440","3441","3442"}; @@ -56,11 +48,35 @@ public class Server { Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442"); - stateServer = new StateServer(peeridstr, conf); + // StateServerFactory.my = new StateServerFactory(peeridstr, conf); + StateServerFactory.InitStateServer(peeridstr, conf); + // Thread printer = new Thread( new Runnable(){ + + // @Override + // public void run() { + // // TODO Auto-generated method stub + // while(true) { + // var state = StateServerFactory.getStateServer().getFsm().getTaskState(); + // log.info("{}", state); + // try { + // Thread.sleep(1000); + // } catch (InterruptedException e) { + // // TODO Auto-generated catch block + // e.printStackTrace(); + // } + // } + // } + + // } ); + + // printer.start(); + System.setProperty("server.port", sprPort); var app = SpringApplication.run(Server.class, args); app.start(); + + } } diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java index e6cf27a..8ca4f6d 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java +++ b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java @@ -10,8 +10,11 @@ import com.alipay.sofa.jraft.rpc.RpcClient; import com.alipay.sofa.jraft.rpc.impl.BoltRpcClient; import com.yuandian.dataflow.Server; import com.yuandian.dataflow.projo.Response; +import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.statemachine.SyncDataClosure; +import com.yuandian.dataflow.statemachine.rpc.TaskState; +import lombok.var; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.ObjectUtils.Null; @@ -29,42 +32,32 @@ import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestParam; import com.alipay.sofa.jraft.Node; +import com.alipay.sofa.jraft.Status; @Slf4j @Controller public class TaskLog { - private static Node node = Server.GetNode(); + @GetMapping(path = "/test") public ResponseEntity Processing() { - - log.info("node.isLeader {} {} node.getNodeId() {}", node.getNodeState(), node.getLeaderId(), node.getNodeId()); - if(node.isLeader()) { - // Task task = new Task(); - - // // 处理状态机 - // RaftClosure done = new RaftClosure(); - // task.setData(ByteBuffer.wrap("hello".getBytes())); - // task.setDone(done); - - // log.error("{} {} {}",node, node.toString(), task); - // node.apply(task); - - // log.error("{}", "RaftClosure"); - - - - } - + // var state = StateServerFactory.getStateServer().getFsm().getTaskState(); + var c = new SyncDataClosure() { + @Override + public void run(Status status) { + log.info(getTaskState().toString()); + } + }; + StateServerFactory.getStateServer().readIndexState(true, c ); Response response = new Response(); response.Code = HttpStatus.OK; - response.Message = HttpStatus.OK.toString(); + response.Message = "OK"; return new ResponseEntity(response, HttpStatus.OK); } diff --git a/src/main/java/com/yuandian/dataflow/proto/msgtype/UsrFlowOuterClass.java b/src/main/java/com/yuandian/dataflow/proto/msgtype/UsrFlowOuterClass.java index a829cdc..6dbf67e 100644 --- a/src/main/java/com/yuandian/dataflow/proto/msgtype/UsrFlowOuterClass.java +++ b/src/main/java/com/yuandian/dataflow/proto/msgtype/UsrFlowOuterClass.java @@ -20,7 +20,7 @@ public final class UsrFlowOuterClass { /** *
-     *编号19
+     *编号19
      * 
* * int32 table_id = 1; @@ -501,7 +501,7 @@ public final class UsrFlowOuterClass { private int tableId_; /** *
-     *编号19
+     *编号19
      * 
* * int32 table_id = 1; @@ -1793,7 +1793,7 @@ public final class UsrFlowOuterClass { private int tableId_ ; /** *
-       *编号19
+       *编号19
        * 
* * int32 table_id = 1; @@ -1805,7 +1805,7 @@ public final class UsrFlowOuterClass { } /** *
-       *编号19
+       *编号19
        * 
* * int32 table_id = 1; @@ -1820,7 +1820,7 @@ public final class UsrFlowOuterClass { } /** *
-       *编号19
+       *编号19
        * 
* * int32 table_id = 1; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index c93f337..454c8ba 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -3,6 +3,7 @@ package com.yuandian.dataflow.statemachine; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -12,12 +13,22 @@ import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Iterator; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.core.StateMachineAdapter; +import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.error.RaftException; +import com.alipay.sofa.jraft.error.RemotingException; +import com.alipay.sofa.jraft.option.CliOptions; +import com.alipay.sofa.jraft.rpc.InvokeCallback; +import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; +import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.Utils; +import com.yuandian.dataflow.statemachine.rpc.SMResponse; +import com.yuandian.dataflow.statemachine.rpc.SyncData; +import com.yuandian.dataflow.statemachine.rpc.TaskState; +import lombok.var; import lombok.extern.slf4j.Slf4j; /** @@ -28,6 +39,7 @@ import lombok.extern.slf4j.Slf4j; * 2018-Apr-09 4:52:31 PM */ @Slf4j + public class StateMachine extends StateMachineAdapter { // private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class); @@ -35,7 +47,7 @@ public class StateMachine extends StateMachineAdapter { /** * Counter value */ - private final AtomicLong value = new AtomicLong(0); + private TaskState taskState = new TaskState(); /** * Leader term */ @@ -48,8 +60,8 @@ public class StateMachine extends StateMachineAdapter { /** * Returns current value. */ - public long getValue() { - return this.value.get(); + public TaskState getTaskState() { + return taskState; } @Override @@ -59,12 +71,22 @@ public class StateMachine extends StateMachineAdapter { if (iter.done() != null) { // This task is applied by this node, get value from closure to avoid additional // parsing. - - log.error("done:{}",iter.getData().toString()); - + var closure = (SyncDataClosure)iter.done(); + taskState = closure.getTaskState(); + log.info("SyncDataClosure(done) taskState:{} leaderTerm:{}",taskState, this.leaderTerm); + closure.success(taskState); + closure.run(Status.OK()); } else { - // Have to parse FetchAddRequest from this user log. - log.error("null:{}",iter.getData().toString()); + // Have to parse FetchAddRequest from this user log. + final ByteBuffer data = iter.getData(); + try { + taskState = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( + data.array(), TaskState.class.getName()); + log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", taskState, this.leaderTerm); + } catch (CodecException e) { + e.printStackTrace(); + } + } iter.next(); @@ -100,4 +122,8 @@ public class StateMachine extends StateMachineAdapter { super.onLeaderStop(status); } + public static void main(String[] args) throws InterruptedException, RemotingException { + + } + } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java deleted file mode 100644 index ea617f4..0000000 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * description - * - * @author eson - *2022年7月12日-13:36:24 - */ -package com.yuandian.dataflow.statemachine; - -import java.io.File; - -import com.alibaba.nacos.common.remote.client.RpcClient; -import com.alipay.sofa.jraft.JRaftUtils; -import com.alipay.sofa.jraft.Node; -import com.alipay.sofa.jraft.RaftGroupService; -import com.alipay.sofa.jraft.conf.Configuration; -import com.alipay.sofa.jraft.entity.PeerId; -import com.alipay.sofa.jraft.error.RemotingException; -import com.alipay.sofa.jraft.option.CliOptions; -import com.alipay.sofa.jraft.option.NodeOptions; -import com.alipay.sofa.jraft.rpc.InvokeCallback; -import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; -import com.alipay.sofa.jraft.util.Endpoint; -import com.yuandian.dataflow.statemachine.SyncDataClosure; -import com.yuandian.dataflow.statemachine.StateMachine; -import com.yuandian.dataflow.statemachine.rpc.SyncData; -import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor; - -import lombok.var; -import lombok.extern.slf4j.Slf4j; - -/** - * description - * - * @author eson - *2022年7月12日-13:36:24 - */ -@Slf4j -@var -public class StateServer { - - public Node node; - public RaftGroupService cluster; - public StateMachine fsm; - - private String groupId = "dataflow"; - - public StateServer(String addr, Configuration conf) { - String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; - String[] sprPeers = new String[]{"3440","3441","3442"}; - - // var peeridstr = peers[Integer.parseInt(serverId)]; - // var sprPort = sprPeers[Integer.parseInt(args[0])]; - - // String groupId = "jraft"; - - // Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442"); - - PeerId serverId = JRaftUtils.getPeerId(addr); - int port = serverId.getPort(); - - NodeOptions nodeOptions = new NodeOptions(); - nodeOptions.setElectionTimeoutMs(1000); - nodeOptions.setSnapshotLogIndexMargin(3600); - nodeOptions.setInitialConf(conf); - - File RaftDataFile = new File(String.format("./raftdata/%d", port) ); - log.info("{}",RaftDataFile.mkdirs()); - - nodeOptions.setLogUri( String.format("./raftdata/%d/logs", port) ); - nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port)); - nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port)); - fsm = new StateMachine(); // 状态实例初始化 - nodeOptions.setFsm(fsm); - - cluster = new RaftGroupService(groupId, serverId, nodeOptions); - cluster.getRpcServer().registerProcessor(new SyncDataProcessor()); - - - - - node = cluster.start(); - } - - - public static void main(String[] args) throws InterruptedException, RemotingException { - var rpcClient = new BoltRaftRpcFactory().createRpcClient(); - - - rpcClient.init(new CliOptions()); - - var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new SyncData(), 5000); - log.info("{}", resp); - - // done = new RaftClosure(); - // node.shutdown(done); - } -} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java new file mode 100644 index 0000000..f58b2d8 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -0,0 +1,198 @@ +/** + * description + * + * @author eson + *2022年7月12日-13:36:24 + */ +package com.yuandian.dataflow.statemachine; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.concurrent.Executor; + +import com.alipay.remoting.exception.CodecException; +import com.alipay.remoting.serialization.SerializerManager; +import com.alipay.sofa.jraft.JRaftUtils; +import com.alipay.sofa.jraft.Node; +import com.alipay.sofa.jraft.RaftGroupService; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.closure.ReadIndexClosure; +import com.alipay.sofa.jraft.conf.Configuration; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.entity.Task; +import com.alipay.sofa.jraft.error.RaftError; +import com.alipay.sofa.jraft.error.RemotingException; +import com.alipay.sofa.jraft.option.CliOptions; +import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; +import com.alipay.sofa.jraft.util.BytesUtil; +import com.alipay.sofa.jraft.util.Endpoint; +import com.yuandian.dataflow.statemachine.rpc.SMResponse; +import com.yuandian.dataflow.statemachine.rpc.SyncData; +import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor; +import com.yuandian.dataflow.statemachine.rpc.TaskState; + +import lombok.Getter; +import lombok.Setter; +import lombok.var; +import lombok.extern.slf4j.Slf4j; + +/** + * description + * + * @author eson + *2022年7月12日-13:36:24 + */ +@Slf4j +@var +public class StateServerFactory { + + private static StateServer ss; + // 必须初始化 + public static void InitStateServer(String peerstr, Configuration conf) throws Exception { + if(ss != null) { + throw new Exception("重复初始化 InitStateServer"); + } + ss = new StateServerFactory.StateServer(peerstr, conf); + } + + // 获取状态服务的对象 + public static StateServer getStateServer() { + return ss; + } + + @Getter + @Setter + public static class StateServer { + + private Node node; + private RaftGroupService cluster; + private StateMachine fsm; + + private String groupId = "dataflow"; + private Executor readIndexExecutor; + + public StateServer(String addr, Configuration conf) { + String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; + String[] sprPeers = new String[]{"3440","3441","3442"}; + + // var peeridstr = peers[Integer.parseInt(serverId)]; + // var sprPort = sprPeers[Integer.parseInt(args[0])]; + + // String groupId = "jraft"; + + // Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442"); + + PeerId serverId = JRaftUtils.getPeerId(addr); + int port = serverId.getPort(); + + NodeOptions nodeOptions = new NodeOptions(); + nodeOptions.setElectionTimeoutMs(1000); + nodeOptions.setSnapshotLogIndexMargin(3600); + nodeOptions.setInitialConf(conf); + + File RaftDataFile = new File(String.format("./raftdata/%d", port) ); + log.info("{}",RaftDataFile.mkdirs()); + + nodeOptions.setLogUri( String.format("./raftdata/%d/logs", port) ); + nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port)); + nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port)); + fsm = new StateMachine(); // 状态实例初始化 + nodeOptions.setFsm(fsm); + + cluster = new RaftGroupService(groupId, serverId, nodeOptions); + cluster.getRpcServer().registerProcessor(new SyncDataProcessor()); + + + + + node = cluster.start(); + } + + public boolean isLeader() { + return this.fsm.isLeader(); + } + + public TaskState getTaskState() { + return this.fsm.getTaskState(); + } + + public void applyState(TaskState state, SyncDataClosure closure) { + if (!ss.isLeader()) { + ss.handlerNotLeaderError(closure); + return; + } + + try { + closure.setTaskState(state); + final Task task = new Task(); + task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(state))); + task.setDone(closure); + StateServerFactory.getStateServer().getNode().apply(task); + } catch (CodecException e) { + String errorMsg = "Fail to encode TaskState"; + log.error(errorMsg, e); + closure.failure(errorMsg, PeerId.emptyPeer()); + closure.run(new Status(RaftError.EINTERNAL, errorMsg)); + } + } + + public void readIndexState(final boolean readOnlySafe, final SyncDataClosure closure) { + + if(!readOnlySafe){ + closure.success(getTaskState()); + closure.run(Status.OK()); + return; + } + + getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { + @Override + public void run(Status status, long index, byte[] reqCtx) { + if(status.isOk()){ + closure.success(getTaskState()); + closure.run(Status.OK()); + return; + } + + readIndexExecutor.execute(() -> { + if(isLeader()){ + log.debug("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status); + applyState( getTaskState(), closure); + }else { + handlerNotLeaderError(closure); + } + }); + } + }); + } + + public SMResponse redirect() { + + final SMResponse response = new SMResponse(); + response.setSuccess(false); + if (this.node != null) { + final PeerId leader = this.node.getLeaderId(); + if (leader != null) { + response.setRedirect(leader); + } + } + return response; + + } + + public void handlerNotLeaderError(final SyncDataClosure closure) { + closure.failure("Not leader.", redirect().getRedirect()); + closure.run(new Status(RaftError.EPERM, "Not leader")); + } +} + + + public static void main(String[] args) throws InterruptedException, RemotingException { + var rpcClient = new BoltRaftRpcFactory().createRpcClient(); + + rpcClient.init(new CliOptions()); + + var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new SyncData(), 5000); + log.info("{}", resp); + } +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java index d714d2d..f7e1ccf 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java @@ -2,6 +2,7 @@ package com.yuandian.dataflow.statemachine; import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.entity.PeerId; import com.yuandian.dataflow.statemachine.rpc.SMResponse; import com.yuandian.dataflow.statemachine.rpc.TaskState; @@ -22,10 +23,10 @@ public abstract class SyncDataClosure implements Closure { // 状态机的统一响应 private SMResponse response; // 代表任务状态 - private TaskState state; + private TaskState taskState; - protected void failure(final String errorMsg, final String redirect) { + public void failure(final String errorMsg, final PeerId redirect) { final SMResponse response = new SMResponse(); response.setSuccess(false); response.setMsg(errorMsg); @@ -33,7 +34,7 @@ public abstract class SyncDataClosure implements Closure { setResponse(response); } - protected void success(final TaskState value) { + public void success(final TaskState value) { final SMResponse response = new SMResponse(); response.setState(value); response.setSuccess(true); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SMResponse.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SMResponse.java index 62c9182..8a260f2 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SMResponse.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SMResponse.java @@ -6,6 +6,10 @@ */ package com.yuandian.dataflow.statemachine.rpc; +import java.io.Serializable; + +import com.alipay.sofa.jraft.entity.PeerId; + import lombok.Getter; import lombok.Setter; import lombok.ToString; @@ -21,7 +25,9 @@ import lombok.extern.slf4j.Slf4j; @Getter @Setter @ToString -public class SMResponse { +public class SMResponse implements Serializable { + + private static final long serialVersionUID = 1L; private TaskState state; @@ -30,7 +36,7 @@ public class SMResponse { /** * redirect peer id */ - private String redirect; + private PeerId redirect; private String msg; } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncData.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncData.java index 2a3a4a4..e2349e3 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncData.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncData.java @@ -29,6 +29,8 @@ public class SyncData implements Serializable { private static final long serialVersionUID = 1L; - private TaskState state; + private TaskState taskState; + + } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java index 6f7f149..2181cb1 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java @@ -6,11 +6,23 @@ */ package com.yuandian.dataflow.statemachine.rpc; +import java.nio.ByteBuffer; + +import com.alipay.remoting.exception.CodecException; +import com.alipay.remoting.serialization.SerializerManager; import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.entity.Task; +import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; +import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.statemachine.SyncDataClosure; +import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer; +import com.alipay.sofa.jraft.entity.PeerId; +import org.apache.commons.lang.StringUtils; + +import lombok.var; import lombok.extern.slf4j.Slf4j; /** @@ -24,18 +36,18 @@ public class SyncDataProcessor implements RpcProcessor { @Override public void handleRequest(RpcContext rpcCtx, SyncData request) { - log.info("{}", rpcCtx); - log.info("{}", request); + + log.info("request: {}", request); - final SyncDataClosure closure = new SyncDataClosure() { @Override public void run(Status status) { rpcCtx.sendResponse(getResponse()); + log.info("{}", status); } }; - + StateServerFactory.getStateServer().applyState(request.getTaskState(), closure); } @Override @@ -43,8 +55,6 @@ public class SyncDataProcessor implements RpcProcessor { return SyncData.class.getName(); } - public static void main(String[] args) { - - } + } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/TaskState.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/TaskState.java index f830599..af05361 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/TaskState.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/TaskState.java @@ -12,6 +12,7 @@ import com.alipay.sofa.jraft.entity.PeerId; import lombok.Getter; import lombok.Setter; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import java.lang.management.ManagementFactory; @@ -28,6 +29,7 @@ import java.lang.reflect.Modifier; @Slf4j @Getter @Setter +@ToString public class TaskState implements Serializable { private static final long serialVersionUID = -1L; @@ -36,7 +38,5 @@ public class TaskState implements Serializable { private long taskQueueSize; - public static void main(String[] args) { - - } + } diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 1a5cfba..269a553 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -3,7 +3,7 @@ class="ch.qos.logback.core.ConsoleAppender"> - %d{yyyyMMdd HH:mm:ss.SSS} %-5level%thread\(%file:%line\): %msg%n + %d{yyyyMMdd HH:mm:ss.SSS} %level %thread\(%file:%line\): %msg%n diff --git a/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java new file mode 100644 index 0000000..b8c37b9 --- /dev/null +++ b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java @@ -0,0 +1,74 @@ +package com.yuandian.dataflow.statemachine; + +import java.util.concurrent.Executor; + +import org.junit.jupiter.api.Test; + +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.error.RemotingException; +import com.alipay.sofa.jraft.option.CliOptions; +import com.alipay.sofa.jraft.rpc.InvokeCallback; +import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; +import com.alipay.sofa.jraft.util.Endpoint; +import com.yuandian.dataflow.statemachine.rpc.SMResponse; +import com.yuandian.dataflow.statemachine.rpc.SyncData; +import com.yuandian.dataflow.statemachine.rpc.TaskState; + +import lombok.var; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class StateMachineTest { + @Test + void testOnApply() throws InterruptedException, RemotingException { + var rpcClient = new BoltRaftRpcFactory().createRpcClient(); + rpcClient.init(new CliOptions()); + + + var fstate = new TaskState(); + var fdata = new SyncData(); + fdata.setTaskState(fstate); + + var leader = new Endpoint("localhost",4441); + SMResponse resp = (SMResponse)rpcClient.invokeSync(leader, fdata + , 5000); + log.info("{}", resp); + if( resp != null && !resp.isSuccess() ) { + leader = resp.getRedirect().getEndpoint(); + resp = (SMResponse)rpcClient.invokeSync(resp.getRedirect().getEndpoint(), fdata + , 5000); + log.info("{}", resp); + } + + + int i = 0 ; + while(true) { + + var state = new TaskState(); + var data = new SyncData(); + data.setTaskState(state); + + state.setPeerId( PeerId.parsePeer("localhost:2222") ); + + state.setTaskQueueSize(i); + + var pi = i ; + i++; + if (i >= 1000) { + break; + } + rpcClient.invokeAsync(leader, data, new InvokeCallback() { + @Override + public void complete(Object result, Throwable err) { + // SMResponse resp = (SMResponse)result; + log.info("{} {} {}", result, err, pi); + } + + @Override + public Executor executor() { + return null; + } + } , 5000); + } + } +}