diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index 20c4100..7f34e85 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -10,8 +10,10 @@ import org.slf4j.MarkerFactory; import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.conf.Configuration; import com.yuandian.dataflow.statemachine.StateFactory; +import com.yuandian.dataflow.utils.Utils; - +import io.netty.util.internal.StringUtil; +import javassist.ClassClassPath; import lombok.extern.slf4j.Slf4j; @@ -30,20 +32,11 @@ public class Server { public static void main(String[] args) throws Exception { + - - - final var resourceUrl = ClassLoader.getSystemResources(""); - - while(resourceUrl.hasMoreElements()) { - var e = resourceUrl.nextElement(); - - log.info("{} {}", e.getFile(), e.getProtocol()); - - } - - System.exit(0); - + + + String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index 5425866..71b0969 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -59,7 +59,7 @@ public class PacketsProcessor implements RpcProcessor masterMainLoop; + public static Thread masterExecute = new Thread(new Runnable() { @Override public void run() { try { - while (true) { + while (!cxt.isExit.get()) { + + masterMainLoop.accept(cxt); + var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers()); log.debug("master({}) execute {}", StateFactory.getServerId(), StateFactory.getRaftNode().listAlivePeers()); if (alivePeers != null) { - // 读一致性 StateFactory.readIndexState(new GenericClosure() { @@ -91,9 +106,9 @@ public class MasterFactory { request.getPackets().add(p); } - // 先提交 节点的 剩余能处理的任务数量. 然后再处理 - Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), + Operate.CallOperate( + new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure() { @Override public void run(Status status) { @@ -115,7 +130,6 @@ public class MasterFactory { } } }); - }); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java index 661565a..b57e15f 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java @@ -11,12 +11,14 @@ import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; +import java.util.function.Consumer; import org.reflections.ReflectionUtils; import org.reflections.Reflections; @@ -50,6 +52,7 @@ import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; import com.yuandian.dataflow.statemachine.state.State; +import com.yuandian.dataflow.utils.Utils; import lombok.Getter; import lombok.Setter; @@ -156,6 +159,8 @@ public class StateFactory { // conf = // JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442"); + + PeerId serverId = JRaftUtils.getPeerId(addr); int port = serverId.getPort(); @@ -176,25 +181,18 @@ public class StateFactory { cluster = new RaftGroupService(groupId, serverId, nodeOptions); + // 扫描注解RaftProccessor 注册 var now = Instant.now(); HashMap> scansMap = new HashMap<>(); - - Set packageNames = new HashSet<>(); - for (var p : Package.getPackages()) { - packageNames.add(p.getName().split("\\.")[0] ); - } - - for (var name: packageNames) { - - Set> scans = new Reflections(name).getTypesAnnotatedWith(ProcessorRaft.class); - scans.forEach((pRaftClass) -> { - scansMap.put(pRaftClass.getName(), pRaftClass); - }); - - } - - log.info("scan annotates cost time: {}", Duration.between(now, Instant.now())) ; - + var traces = Thread.currentThread().getStackTrace(); + var clsName = traces[traces.length - 1].getClassName(); + var packName = clsName.substring(0, Utils.indexOfAsPossible(clsName, ".", 3)); + log.info("获取 {} -> {} 下包的所有注解",clsName, packName ); + Set> scans = new Reflections(packName).getTypesAnnotatedWith(ProcessorRaft.class); + scans.forEach((pRaftClass) -> { + scansMap.put(pRaftClass.getName(), pRaftClass); + }); + log.info("扫描注解的时间(scan annotations cost time): {} ms", Duration.between(now, Instant.now()).toMillis()) ; scansMap.forEach((name, pRaftClass) -> { try { cluster.getRpcServer() @@ -204,11 +202,12 @@ public class StateFactory { log.info("{} {}", name, e.toString()); } }); - + + // 启动集群 node = cluster.start(); - rpcClient = new BoltRaftRpcFactory().createRpcClient(); - rpcClient.init(new CliOptions()); + rpcClient = new BoltRaftRpcFactory().createRpcClient(); // 创建rpc客户端. + rpcClient.init(new CliOptions()); // 初始化 } public boolean isLeader() { diff --git a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java index b78a2ac..51311ef 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java @@ -60,10 +60,9 @@ public class Operate implements Serializable { @java.lang.SuppressWarnings("unchecked") public static void CallOperate(Operate op, GenericClosure closure) { log.debug("CallOperate Value {}", op.getValue()); - var ss = StateFactory.getStateServer(); // 如果是leader 就直接提交 if (StateFactory.isLeader()) { - ss.applyOperate(op, closure); + StateFactory.applyOperate(op, closure); return; } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/annotations/Master.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/annotations/Master.java index b89356f..200a627 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/annotations/Master.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/annotations/Master.java @@ -17,10 +17,8 @@ import java.lang.annotation.Target; * @author eson *2022年7月21日-14:27:49 */ -@Retention(RetentionPolicy.SOURCE) +@Retention(RetentionPolicy.CLASS) @Target(ElementType.TYPE) public @interface Master { public int order() default 0 ; - - } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/annotations/ProcessorRaft.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/annotations/ProcessorRaft.java index 05017f8..a28677b 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/annotations/ProcessorRaft.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/annotations/ProcessorRaft.java @@ -17,7 +17,7 @@ import java.lang.annotation.Target; * @author eson *2022年7月21日-14:27:49 */ -@Retention(RetentionPolicy.SOURCE) +@Retention(RetentionPolicy.CLASS) @Target(ElementType.TYPE) public @interface ProcessorRaft { } diff --git a/src/main/java/com/yuandian/dataflow/utils/Utils.java b/src/main/java/com/yuandian/dataflow/utils/Utils.java index 1ba5e8f..e3c1a3b 100644 --- a/src/main/java/com/yuandian/dataflow/utils/Utils.java +++ b/src/main/java/com/yuandian/dataflow/utils/Utils.java @@ -1,6 +1,34 @@ package com.yuandian.dataflow.utils; public class Utils { + + + public static int indexOf(String str, String substr, int n) { + int pos = str.indexOf(substr); + while (--n > 0 && pos != -1) { + pos = str.indexOf(substr, pos + 1); + } + return pos; + } + + + /** + * 尽可能的拿到 n个substr索引点的字符串. 如果超出就拿最大值. + * @param str 原始字符串 + * @param substr 需要定位的字符串 + * @param n + * @return + */ + public static int indexOfAsPossible(String str, String substr, int n) { + int pos = str.indexOf(substr); + while (--n > 0 && pos != -1) { + pos = str.indexOf(substr, pos + 1); + } + if(pos == -1) { + return str.length(); + } + return pos; + } public static void main(String[] args) { diff --git a/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java index 5e64dce..e3b0674 100644 --- a/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java +++ b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java @@ -10,6 +10,9 @@ import lombok.extern.slf4j.Slf4j; public class StateMachineTest { @Test void testOnApply() throws InterruptedException, RemotingException { + + + // var rpcClient = new BoltRaftRpcFactory().createRpcClient(); // rpcClient.init(new CliOptions());