合并一段修改后的jraft代码 TODO: 状态机的SyncData

This commit is contained in:
huangsimin 2022-07-12 18:08:55 +08:00
parent 87357f7983
commit e8f990d883
6 changed files with 257 additions and 70 deletions

View File

@ -1,50 +1,25 @@
package com.yuandian.dataflow;
import com.yuandian.common.Config;
import com.yuandian.dataflow.statemachine.RaftClosure;
import com.yuandian.dataflow.statemachine.StateMachine;
import lombok.val;
import lombok.var;
import java.io.File;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.io.File;
import java.nio.ByteBuffer;
import com.alipay.remoting.rpc.protocol.RpcResponseProcessor;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.NodeManager;
import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.RaftServiceFactory;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.core.IteratorImpl;
import com.alipay.sofa.jraft.core.NodeImpl;
import com.alipay.sofa.jraft.core.ReplicatorGroupImpl;
import com.alipay.sofa.jraft.entity.NodeId;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.rpc.CliClientService;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.RpcRequestProcessor;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.rpc.impl.BoltRpcClient;
import com.alipay.sofa.jraft.util.Endpoint;
import com.yuandian.dataflow.statemachine.RaftClosure;
import com.yuandian.dataflow.statemachine.StateMachine;
import com.yuandian.dataflow.statemachine.StateServer;
import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor;
import com.alipay.sofa.jraft.option.ReplicatorGroupOptions;
import lombok.var;
@ -60,6 +35,7 @@ public class Server {
@Autowired
public static Node node;
public static RaftClosure done;
private static StateServer stateServer;
public static Node GetNode() {
return node;
@ -73,47 +49,18 @@ public class Server {
public static void main(String[] args) {
/*String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
String[] sprPeers = new String[]{"3440","3441","3442"};
var peeridstr = peers[ Integer.parseInt(args[0] ) ];
var peeridstr = peers[ Integer.parseInt(args[0] )];
var sprPort = sprPeers[Integer.parseInt(args[0] )];
String groupId = "jraft";
Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
stateServer = new StateServer(peeridstr, conf);
PeerId serverId = JRaftUtils.getPeerId(peeridstr);
int port = serverId.getPort();
NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setElectionTimeoutMs(1000);
nodeOptions.setSnapshotLogIndexMargin(3600);
nodeOptions.setInitialConf(conf);
File RaftDataFile = new File(String.format("./raftdata/%d", port) );
System.out.print(RaftDataFile.mkdirs());
nodeOptions.setLogUri( String.format("./raftdata/%d/logs", port) );
nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port));
nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port));
nodeOptions.setFsm(new StateMachine());
RaftGroupService cluster = new RaftGroupService(groupId, serverId, nodeOptions);
node = cluster.start();
done = new RaftClosure();
System.setProperty("server.port", sprPort);*/
System.setProperty("server.port", "3440");
System.setProperty("server.port", sprPort);
var app = SpringApplication.run(Server.class, args);
app.start();
// node.shutdown(done);
}
}

View File

@ -60,11 +60,11 @@ public class StateMachine extends StateMachineAdapter {
// This task is applied by this node, get value from closure to avoid additional
// parsing.
log.error("done:%1$s",iter.getData().toString());
log.error("done:{}",iter.getData().toString());
} else {
// Have to parse FetchAddRequest from this user log.
log.error("null:%1$s",iter.getData().toString());
log.error("null:{}",iter.getData().toString());
}
iter.next();

View File

@ -0,0 +1,97 @@
/**
* description
*
* @author eson
*2022年7月12日-13:36:24
*/
package com.yuandian.dataflow.statemachine;
import java.io.File;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.Endpoint;
import com.yuandian.dataflow.statemachine.RaftClosure;
import com.yuandian.dataflow.statemachine.StateMachine;
import com.yuandian.dataflow.statemachine.rpc.SyncData;
import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
*2022年7月12日-13:36:24
*/
@Slf4j
@var
public class StateServer {
public Node node;
public RaftGroupService cluster;
public StateMachine fsm;
private String groupId = "dataflow";
public StateServer(String addr, Configuration conf) {
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
String[] sprPeers = new String[]{"3440","3441","3442"};
// var peeridstr = peers[Integer.parseInt(serverId)];
// var sprPort = sprPeers[Integer.parseInt(args[0])];
// String groupId = "jraft";
// Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
PeerId serverId = JRaftUtils.getPeerId(addr);
int port = serverId.getPort();
NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setElectionTimeoutMs(1000);
nodeOptions.setSnapshotLogIndexMargin(3600);
nodeOptions.setInitialConf(conf);
File RaftDataFile = new File(String.format("./raftdata/%d", port) );
log.info("{}",RaftDataFile.mkdirs());
nodeOptions.setLogUri( String.format("./raftdata/%d/logs", port) );
nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port));
nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port));
fsm = new StateMachine(); // 状态实例初始化
nodeOptions.setFsm(fsm);
cluster = new RaftGroupService(groupId, serverId, nodeOptions);
cluster.getRpcServer().registerProcessor(new SyncDataProcessor());
node = cluster.start();
}
public static void main(String[] args) throws InterruptedException, RemotingException {
var rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new SyncData(), 5000);
log.info("{}", resp);
// done = new RaftClosure();
// node.shutdown(done);
}
}

View File

@ -0,0 +1,104 @@
/**
* description
*
* @author eson
*2022年7月11日-16:01:07
*/
package com.yuandian.dataflow.statemachine.rpc;
import java.io.Serializable;
import com.alipay.sofa.jraft.Closure;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
*2022年7月11日-16:01:07
*/
@Slf4j
@Getter
@Setter
@ToString
public class SyncData implements Serializable {
private static final long serialVersionUID = 1L;
private long queueSize = 0;
// @Getter
// @Setter
// public class IncrementAndGetRequest implements Serializable {
// private long delta;
// }
// public class GetValueRequest implements Serializable {
// private static final long serialVersionUID = 9218253805003988802L;
// public GetValueRequest() {
// super();
// }
// }
// @Getter
// @Setter
// public class ValueResponse implements Serializable {
// private static final long serialVersionUID = -4220017686727146773L;
// private long value;
// private boolean success;
// /**
// * redirect peer id
// */
// private String redirect;
// private String errorMsg;
// }
// public class IncrementAndAddClosure implements Closure {
// // private CounterServer counterServer;
// private IncrementAndGetRequest request;
// private ValueResponse response;
// private Closure done; // 网络应答callback
// public IncrementAndAddClosure(CounterServer counterServer, IncrementAndGetRequest request, ValueResponse response,
// Closure done) {
// super();
// this.counterServer = counterServer;
// this.request = request;
// this.response = response;
// this.done = done;
// }
// @Override
// public void run(Status status) {
// // 返回应答给客户端
// if (this.done != null) {
// done.run(status);
// }
// }
// public IncrementAndGetRequest getRequest() {
// return this.request;
// }
// public void setRequest(IncrementAndGetRequest request) {
// this.request = request;
// }
// public ValueResponse getResponse() {
// return this.response;
// }
// }
}

View File

@ -0,0 +1,39 @@
/**
* description
*
* @author eson
*2022年7月12日-11:10:54
*/
package com.yuandian.dataflow.statemachine.rpc;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
*2022年7月12日-11:10:54
*/
@Slf4j
public class SyncDataProcessor implements RpcProcessor<SyncData> {
@Override
public void handleRequest(RpcContext rpcCtx, SyncData request) {
log.info("{}", rpcCtx);
log.info("{}", request);
rpcCtx.sendResponse(null); //
}
@Override
public String interest() {
return SyncData.class.getName();
}
public static void main(String[] args) {
}
}

View File

@ -4,6 +4,6 @@ screen -S raft-2 -X quit
sleep 2
screen -dmS raft-0 -L java -jar target/dataflow-1.0-SNAPSHOT.jar 0
screen -dmS raft-1 -L java -jar target/dataflow-1.0-SNAPSHOT.jar 1
screen -dmS raft-2 -L java -jar target/dataflow-1.0-SNAPSHOT.jar 2
screen -dmS raft-0 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 0
screen -dmS raft-1 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 1
screen -dmS raft-2 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 2