From e8f990d883adcaec21cfe27c02bac68eb03de8a6 Mon Sep 17 00:00:00 2001 From: huangsimin Date: Tue, 12 Jul 2022 18:08:55 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=88=E5=B9=B6=E4=B8=80=E6=AE=B5=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E5=90=8E=E7=9A=84jraft=E4=BB=A3=E7=A0=81=20TODO:=20?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E6=9C=BA=E7=9A=84SyncData?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/yuandian/dataflow/Server.java | 77 ++----------- .../dataflow/statemachine/StateMachine.java | 4 +- .../dataflow/statemachine/StateServer.java | 97 ++++++++++++++++ .../dataflow/statemachine/rpc/SyncData.java | 104 ++++++++++++++++++ .../statemachine/rpc/SyncDataProcessor.java | 39 +++++++ start.sh | 6 +- 6 files changed, 257 insertions(+), 70 deletions(-) create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/StateServer.java create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncData.java create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index 2c3acfb..9a0d35a 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -1,50 +1,25 @@ package com.yuandian.dataflow; -import com.yuandian.common.Config; -import com.yuandian.dataflow.statemachine.RaftClosure; -import com.yuandian.dataflow.statemachine.StateMachine; - -import lombok.val; -import lombok.var; +import java.io.File; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringBootConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; -import java.io.File; -import java.nio.ByteBuffer; - -import com.alipay.remoting.rpc.protocol.RpcResponseProcessor; -import com.alipay.sofa.jraft.Closure; -import com.alipay.sofa.jraft.Iterator; import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.Node; -import com.alipay.sofa.jraft.NodeManager; import com.alipay.sofa.jraft.RaftGroupService; -import com.alipay.sofa.jraft.RaftServiceFactory; import com.alipay.sofa.jraft.conf.Configuration; -import com.alipay.sofa.jraft.core.IteratorImpl; -import com.alipay.sofa.jraft.core.NodeImpl; -import com.alipay.sofa.jraft.core.ReplicatorGroupImpl; -import com.alipay.sofa.jraft.entity.NodeId; import com.alipay.sofa.jraft.entity.PeerId; -import com.alipay.sofa.jraft.entity.Task; import com.alipay.sofa.jraft.option.NodeOptions; -import com.alipay.sofa.jraft.option.RaftOptions; -import com.alipay.sofa.jraft.rpc.CliClientService; -import com.alipay.sofa.jraft.rpc.RpcProcessor; -import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; -import com.alipay.sofa.jraft.rpc.RpcClient; -import com.alipay.sofa.jraft.rpc.RpcRequestProcessor; -import com.alipay.sofa.jraft.rpc.RpcServer; -import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer; -import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; -import com.alipay.sofa.jraft.rpc.impl.BoltRpcClient; -import com.alipay.sofa.jraft.util.Endpoint; +import com.yuandian.dataflow.statemachine.RaftClosure; +import com.yuandian.dataflow.statemachine.StateMachine; +import com.yuandian.dataflow.statemachine.StateServer; +import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor; -import com.alipay.sofa.jraft.option.ReplicatorGroupOptions; +import lombok.var; @@ -60,6 +35,7 @@ public class Server { @Autowired public static Node node; public static RaftClosure done; + private static StateServer stateServer; public static Node GetNode() { return node; @@ -73,47 +49,18 @@ public class Server { public static void main(String[] args) { - - - - /*String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; + String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; String[] sprPeers = new String[]{"3440","3441","3442"}; - - - var peeridstr = peers[ Integer.parseInt(args[0] ) ]; + var peeridstr = peers[ Integer.parseInt(args[0] )]; var sprPort = sprPeers[Integer.parseInt(args[0] )]; - String groupId = "jraft"; Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442"); + stateServer = new StateServer(peeridstr, conf); - PeerId serverId = JRaftUtils.getPeerId(peeridstr); - int port = serverId.getPort(); - - NodeOptions nodeOptions = new NodeOptions(); - nodeOptions.setElectionTimeoutMs(1000); - nodeOptions.setSnapshotLogIndexMargin(3600); - nodeOptions.setInitialConf(conf); - - File RaftDataFile = new File(String.format("./raftdata/%d", port) ); - System.out.print(RaftDataFile.mkdirs()); - - nodeOptions.setLogUri( String.format("./raftdata/%d/logs", port) ); - nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port)); - nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port)); - nodeOptions.setFsm(new StateMachine()); - - RaftGroupService cluster = new RaftGroupService(groupId, serverId, nodeOptions); - node = cluster.start(); - done = new RaftClosure(); - - System.setProperty("server.port", sprPort);*/ - - System.setProperty("server.port", "3440"); + + System.setProperty("server.port", sprPort); var app = SpringApplication.run(Server.class, args); app.start(); - - - // node.shutdown(done); } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 3bfcc1b..c93f337 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -60,11 +60,11 @@ public class StateMachine extends StateMachineAdapter { // This task is applied by this node, get value from closure to avoid additional // parsing. - log.error("done:%1$s",iter.getData().toString()); + log.error("done:{}",iter.getData().toString()); } else { // Have to parse FetchAddRequest from this user log. - log.error("null:%1$s",iter.getData().toString()); + log.error("null:{}",iter.getData().toString()); } iter.next(); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java new file mode 100644 index 0000000..01e970c --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java @@ -0,0 +1,97 @@ +/** + * description + * + * @author eson + *2022年7月12日-13:36:24 + */ +package com.yuandian.dataflow.statemachine; + +import java.io.File; + +import com.alibaba.nacos.common.remote.client.RpcClient; +import com.alipay.sofa.jraft.JRaftUtils; +import com.alipay.sofa.jraft.Node; +import com.alipay.sofa.jraft.RaftGroupService; +import com.alipay.sofa.jraft.conf.Configuration; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.error.RemotingException; +import com.alipay.sofa.jraft.option.CliOptions; +import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.rpc.InvokeCallback; +import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; +import com.alipay.sofa.jraft.util.Endpoint; +import com.yuandian.dataflow.statemachine.RaftClosure; +import com.yuandian.dataflow.statemachine.StateMachine; +import com.yuandian.dataflow.statemachine.rpc.SyncData; +import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor; + +import lombok.var; +import lombok.extern.slf4j.Slf4j; + +/** + * description + * + * @author eson + *2022年7月12日-13:36:24 + */ +@Slf4j +@var +public class StateServer { + + public Node node; + public RaftGroupService cluster; + public StateMachine fsm; + + private String groupId = "dataflow"; + + public StateServer(String addr, Configuration conf) { + String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; + String[] sprPeers = new String[]{"3440","3441","3442"}; + + // var peeridstr = peers[Integer.parseInt(serverId)]; + // var sprPort = sprPeers[Integer.parseInt(args[0])]; + + // String groupId = "jraft"; + + // Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442"); + + PeerId serverId = JRaftUtils.getPeerId(addr); + int port = serverId.getPort(); + + NodeOptions nodeOptions = new NodeOptions(); + nodeOptions.setElectionTimeoutMs(1000); + nodeOptions.setSnapshotLogIndexMargin(3600); + nodeOptions.setInitialConf(conf); + + File RaftDataFile = new File(String.format("./raftdata/%d", port) ); + log.info("{}",RaftDataFile.mkdirs()); + + nodeOptions.setLogUri( String.format("./raftdata/%d/logs", port) ); + nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port)); + nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port)); + fsm = new StateMachine(); // 状态实例初始化 + nodeOptions.setFsm(fsm); + + cluster = new RaftGroupService(groupId, serverId, nodeOptions); + cluster.getRpcServer().registerProcessor(new SyncDataProcessor()); + + + + + node = cluster.start(); + } + + + public static void main(String[] args) throws InterruptedException, RemotingException { + var rpcClient = new BoltRaftRpcFactory().createRpcClient(); + + + rpcClient.init(new CliOptions()); + + var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new SyncData(), 5000); + log.info("{}", resp); + + // done = new RaftClosure(); + // node.shutdown(done); + } +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncData.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncData.java new file mode 100644 index 0000000..928702a --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncData.java @@ -0,0 +1,104 @@ +/** + * description + * + * @author eson + *2022年7月11日-16:01:07 + */ +package com.yuandian.dataflow.statemachine.rpc; + +import java.io.Serializable; + +import com.alipay.sofa.jraft.Closure; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +/** + * description + * + * @author eson + *2022年7月11日-16:01:07 + */ +@Slf4j +@Getter +@Setter +@ToString +public class SyncData implements Serializable { + + private static final long serialVersionUID = 1L; + + private long queueSize = 0; + + + // @Getter + // @Setter + // public class IncrementAndGetRequest implements Serializable { + // private long delta; + // } + + // public class GetValueRequest implements Serializable { + // private static final long serialVersionUID = 9218253805003988802L; + + // public GetValueRequest() { + // super(); + // } + // } + + + // @Getter + // @Setter + // public class ValueResponse implements Serializable { + + // private static final long serialVersionUID = -4220017686727146773L; + + // private long value; + // private boolean success; + // /** + // * redirect peer id + // */ + // private String redirect; + + // private String errorMsg; + + // } + + // public class IncrementAndAddClosure implements Closure { + // // private CounterServer counterServer; + // private IncrementAndGetRequest request; + // private ValueResponse response; + // private Closure done; // 网络应答callback + + // public IncrementAndAddClosure(CounterServer counterServer, IncrementAndGetRequest request, ValueResponse response, + // Closure done) { + // super(); + // this.counterServer = counterServer; + // this.request = request; + // this.response = response; + // this.done = done; + // } + + // @Override + // public void run(Status status) { + // // 返回应答给客户端 + // if (this.done != null) { + // done.run(status); + // } + // } + + // public IncrementAndGetRequest getRequest() { + // return this.request; + // } + + // public void setRequest(IncrementAndGetRequest request) { + // this.request = request; + // } + + // public ValueResponse getResponse() { + // return this.response; + // } + + // } + +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java new file mode 100644 index 0000000..a682e27 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java @@ -0,0 +1,39 @@ +/** + * description + * + * @author eson + *2022年7月12日-11:10:54 + */ +package com.yuandian.dataflow.statemachine.rpc; + +import com.alipay.sofa.jraft.rpc.RpcContext; +import com.alipay.sofa.jraft.rpc.RpcProcessor; + +import lombok.extern.slf4j.Slf4j; + +/** + * description + * + * @author eson + *2022年7月12日-11:10:54 + */ +@Slf4j +public class SyncDataProcessor implements RpcProcessor { + + @Override + public void handleRequest(RpcContext rpcCtx, SyncData request) { + log.info("{}", rpcCtx); + log.info("{}", request); + rpcCtx.sendResponse(null); // + } + + @Override + public String interest() { + return SyncData.class.getName(); + } + + public static void main(String[] args) { + + } + +} diff --git a/start.sh b/start.sh index a19f6b2..5997f9d 100755 --- a/start.sh +++ b/start.sh @@ -4,6 +4,6 @@ screen -S raft-2 -X quit sleep 2 -screen -dmS raft-0 -L java -jar target/dataflow-1.0-SNAPSHOT.jar 0 -screen -dmS raft-1 -L java -jar target/dataflow-1.0-SNAPSHOT.jar 1 -screen -dmS raft-2 -L java -jar target/dataflow-1.0-SNAPSHOT.jar 2 +screen -dmS raft-0 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 0 +screen -dmS raft-1 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 1 +screen -dmS raft-2 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 2