diff --git a/.vscode/launch.json b/.vscode/launch.json index b5e0972..1b6b422 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -22,7 +22,7 @@ "type": "java", "name": "Raft-0", "request": "launch", - "mainClass": "com.yuandian.dataflow.statemachine.StateServer", + "mainClass": "com.yuandian.dataflow.Server", "projectName": "dataflow", "console": "integratedTerminal", "args": [ @@ -40,7 +40,7 @@ "type": "java", "name": "Raft-1", "request": "launch", - "mainClass": "com.yuandian.dataflow.statemachine.StateServer", + "mainClass": "com.yuandian.dataflow.Server", "projectName": "dataflow", "console": "integratedTerminal", "args": [ @@ -58,7 +58,7 @@ "type": "java", "name": "Raft-2", "request": "launch", - "mainClass": "com.yuandian.dataflow.statemachine.StateServer", + "mainClass": "com.yuandian.dataflow.Server", "projectName": "dataflow", "console": "integratedTerminal", "args": [ diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index 80e6d91..9a7e3f1 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -9,6 +9,7 @@ import org.slf4j.MarkerFactory; import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.conf.Configuration; +import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.utils.Utils; import io.netty.util.internal.StringUtil; @@ -35,18 +36,13 @@ public class Server { - - - - String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; - - var peeridx = Integer.parseInt(args[0]); - var peeridstr = peers[ peeridx ]; - - - // var peeridstr = peers[2]; - // var sprPort = sprPeers[2]; - log.info("{} {}", peeridstr, sprPort); + if (args.length < 1) { + System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.StateServer {serverIndex}"); + System.err.println("{serverIndex} could be 1, 2 or 3"); + System.exit(1); + } + + StateServerFactory.startServer(args[0]); // conf = JRaftUtils.getConfiguration(String.join(",", peers)); // StateFactory.startStateServer(peeridstr, conf); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index fe1b535..9681c6a 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -38,6 +38,7 @@ import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.JavaUtils; +import com.yuandian.dataflow.statemachine.master.MasterFactory; import com.yuandian.dataflow.statemachine.messages.MessageUtils; import com.yuandian.dataflow.statemachine.messages.Operate; import com.yuandian.dataflow.statemachine.messages.Query; @@ -46,7 +47,6 @@ import com.yuandian.dataflow.statemachine.messages.Operate.OperateType; import com.yuandian.dataflow.statemachine.messages.RaftReply.Status; import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.WorkerState; -import com.yuandian.dataflow.statemachine_old.MasterFactory; import lombok.extern.slf4j.Slf4j; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java index 769cf73..b9058e3 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java @@ -33,8 +33,8 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.util.NetUtils; import com.yuandian.dataflow.statemachine.grpc.ProcessorServer; +import com.yuandian.dataflow.statemachine.master.MasterFactory; import com.yuandian.dataflow.statemachine.state.Peer; -import com.yuandian.dataflow.statemachine_old.MasterFactory; import io.netty.util.internal.StringUtil; import lombok.Getter; @@ -141,32 +141,6 @@ public final class StateServer implements Closeable { } public static void main(String[] args) throws IOException, InterruptedException { - if (args.length < 1) { - System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.StateServer {serverIndex}"); - System.err.println("{serverIndex} could be 1, 2 or 3"); - System.exit(1); - } - - StateServerFactory.startServer(args[0]); - - // var peers = new ArrayList(); - // String[] addresses = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; - - // for (int i = 0; i < addresses.length; i++) { - // // var port = addresses[i].split(":")[1]; - // peers.add(RaftPeer.newBuilder().setId("yd-" + args[0]).setAddress(addresses[i]).build()); - // } - - // //find current peer object based on application parameter - // final RaftPeer currentPeer = peers.get(Integer.parseInt(args[0])); - - // //start a counter server - // final File storageDir = new File("./raftdata/" + currentPeer.getId()); - // final StateServer stateServer = new StateServer(currentPeer, peers, storageDir); - - - - // stateServer.start(); - // stateServer.close(); + } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/grpc/ProcessorServer.java b/src/main/java/com/yuandian/dataflow/statemachine/grpc/ProcessorServer.java index 426cd89..cdb24d4 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/grpc/ProcessorServer.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/grpc/ProcessorServer.java @@ -16,7 +16,7 @@ import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerImplBase; import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor; import com.yuandian.dataflow.statemachine.annotations.MasterRegister; import com.yuandian.dataflow.statemachine.master.MasterExecute; -import com.yuandian.dataflow.statemachine_old.MasterFactory; +import com.yuandian.dataflow.statemachine.master.MasterFactory; import com.yuandian.dataflow.utils.Utils; import io.grpc.BindableService; @@ -33,13 +33,7 @@ import lombok.extern.slf4j.Slf4j; @Setter public class ProcessorServer { - public static void main(String[] args) throws Exception { - // ServerBuilder builder = ServerBuilder.forPort(0); - // var server = builder.build().start(); - // log.info("{}", server.getPort()); - var server = new ProcessorServer(); - server.grpcServer.awaitTermination(); - } + private Server grpcServer; @@ -49,6 +43,7 @@ public class ProcessorServer { // 扫描注解RaftProccessor 注册 var now = Instant.now(); + HashMap> scansMap = new HashMap<>(); var traces = Thread.currentThread().getStackTrace(); var clsName = traces[traces.length - 1].getClassName(); @@ -94,7 +89,7 @@ public class ProcessorServer { } catch (InterruptedException e) { log.error("{}", e.toString()); } - System.err.println("*** server shut down"); + log.info("*** server shut down"); } }); }