最新状态机 可行代码

This commit is contained in:
huangsimin 2022-08-10 17:58:01 +08:00
parent c3b067cca5
commit 38ef58dd90
5 changed files with 18 additions and 53 deletions

6
.vscode/launch.json vendored
View File

@ -22,7 +22,7 @@
"type": "java", "type": "java",
"name": "Raft-0", "name": "Raft-0",
"request": "launch", "request": "launch",
"mainClass": "com.yuandian.dataflow.statemachine.StateServer", "mainClass": "com.yuandian.dataflow.Server",
"projectName": "dataflow", "projectName": "dataflow",
"console": "integratedTerminal", "console": "integratedTerminal",
"args": [ "args": [
@ -40,7 +40,7 @@
"type": "java", "type": "java",
"name": "Raft-1", "name": "Raft-1",
"request": "launch", "request": "launch",
"mainClass": "com.yuandian.dataflow.statemachine.StateServer", "mainClass": "com.yuandian.dataflow.Server",
"projectName": "dataflow", "projectName": "dataflow",
"console": "integratedTerminal", "console": "integratedTerminal",
"args": [ "args": [
@ -58,7 +58,7 @@
"type": "java", "type": "java",
"name": "Raft-2", "name": "Raft-2",
"request": "launch", "request": "launch",
"mainClass": "com.yuandian.dataflow.statemachine.StateServer", "mainClass": "com.yuandian.dataflow.Server",
"projectName": "dataflow", "projectName": "dataflow",
"console": "integratedTerminal", "console": "integratedTerminal",
"args": [ "args": [

View File

@ -9,6 +9,7 @@ import org.slf4j.MarkerFactory;
import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.conf.Configuration; import com.alipay.sofa.jraft.conf.Configuration;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.utils.Utils; import com.yuandian.dataflow.utils.Utils;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
@ -35,18 +36,13 @@ public class Server {
if (args.length < 1) {
System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.StateServer {serverIndex}");
System.err.println("{serverIndex} could be 1, 2 or 3");
System.exit(1);
}
StateServerFactory.startServer(args[0]);
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
var peeridx = Integer.parseInt(args[0]);
var peeridstr = peers[ peeridx ];
// var peeridstr = peers[2];
// var sprPort = sprPeers[2];
log.info("{} {}", peeridstr, sprPort);
// conf = JRaftUtils.getConfiguration(String.join(",", peers)); // conf = JRaftUtils.getConfiguration(String.join(",", peers));
// StateFactory.startStateServer(peeridstr, conf); // StateFactory.startStateServer(peeridstr, conf);

View File

@ -38,6 +38,7 @@ import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.JavaUtils;
import com.yuandian.dataflow.statemachine.master.MasterFactory;
import com.yuandian.dataflow.statemachine.messages.MessageUtils; import com.yuandian.dataflow.statemachine.messages.MessageUtils;
import com.yuandian.dataflow.statemachine.messages.Operate; import com.yuandian.dataflow.statemachine.messages.Operate;
import com.yuandian.dataflow.statemachine.messages.Query; import com.yuandian.dataflow.statemachine.messages.Query;
@ -46,7 +47,6 @@ import com.yuandian.dataflow.statemachine.messages.Operate.OperateType;
import com.yuandian.dataflow.statemachine.messages.RaftReply.Status; import com.yuandian.dataflow.statemachine.messages.RaftReply.Status;
import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState; import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.yuandian.dataflow.statemachine_old.MasterFactory;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;

View File

@ -33,8 +33,8 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.NetUtils; import org.apache.ratis.util.NetUtils;
import com.yuandian.dataflow.statemachine.grpc.ProcessorServer; import com.yuandian.dataflow.statemachine.grpc.ProcessorServer;
import com.yuandian.dataflow.statemachine.master.MasterFactory;
import com.yuandian.dataflow.statemachine.state.Peer; import com.yuandian.dataflow.statemachine.state.Peer;
import com.yuandian.dataflow.statemachine_old.MasterFactory;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import lombok.Getter; import lombok.Getter;
@ -141,32 +141,6 @@ public final class StateServer implements Closeable {
} }
public static void main(String[] args) throws IOException, InterruptedException { public static void main(String[] args) throws IOException, InterruptedException {
if (args.length < 1) {
System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.StateServer {serverIndex}");
System.err.println("{serverIndex} could be 1, 2 or 3");
System.exit(1);
}
StateServerFactory.startServer(args[0]);
// var peers = new ArrayList<RaftPeer>();
// String[] addresses = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
// for (int i = 0; i < addresses.length; i++) {
// // var port = addresses[i].split(":")[1];
// peers.add(RaftPeer.newBuilder().setId("yd-" + args[0]).setAddress(addresses[i]).build());
// }
// //find current peer object based on application parameter
// final RaftPeer currentPeer = peers.get(Integer.parseInt(args[0]));
// //start a counter server
// final File storageDir = new File("./raftdata/" + currentPeer.getId());
// final StateServer stateServer = new StateServer(currentPeer, peers, storageDir);
// stateServer.start();
// stateServer.close();
} }
} }

View File

@ -16,7 +16,7 @@ import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerImplBase;
import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor; import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
import com.yuandian.dataflow.statemachine.annotations.MasterRegister; import com.yuandian.dataflow.statemachine.annotations.MasterRegister;
import com.yuandian.dataflow.statemachine.master.MasterExecute; import com.yuandian.dataflow.statemachine.master.MasterExecute;
import com.yuandian.dataflow.statemachine_old.MasterFactory; import com.yuandian.dataflow.statemachine.master.MasterFactory;
import com.yuandian.dataflow.utils.Utils; import com.yuandian.dataflow.utils.Utils;
import io.grpc.BindableService; import io.grpc.BindableService;
@ -33,13 +33,7 @@ import lombok.extern.slf4j.Slf4j;
@Setter @Setter
public class ProcessorServer { public class ProcessorServer {
public static void main(String[] args) throws Exception {
// ServerBuilder builder = ServerBuilder.forPort(0);
// var server = builder.build().start();
// log.info("{}", server.getPort());
var server = new ProcessorServer();
server.grpcServer.awaitTermination();
}
private Server grpcServer; private Server grpcServer;
@ -49,6 +43,7 @@ public class ProcessorServer {
// 扫描注解RaftProccessor 注册 // 扫描注解RaftProccessor 注册
var now = Instant.now(); var now = Instant.now();
HashMap<String, Class<?>> scansMap = new HashMap<>(); HashMap<String, Class<?>> scansMap = new HashMap<>();
var traces = Thread.currentThread().getStackTrace(); var traces = Thread.currentThread().getStackTrace();
var clsName = traces[traces.length - 1].getClassName(); var clsName = traces[traces.length - 1].getClassName();
@ -94,7 +89,7 @@ public class ProcessorServer {
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("{}", e.toString()); log.error("{}", e.toString());
} }
System.err.println("*** server shut down"); log.info("*** server shut down");
} }
}); });
} }