diff --git a/.vscode/launch.json b/.vscode/launch.json index 979edf3..6e17393 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -4,16 +4,31 @@ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ + { + "type": "java", + "name": "Launch OperateProcessor", + "request": "launch", + "mainClass": "com.yuandian.dataflow.statemachine.rpc.OperateProcessor", + "projectName": "dataflow" + }, + { + "type": "java", + "name": "Launch StateFactory", + "request": "launch", + "mainClass": "com.yuandian.dataflow.statemachine.StateFactory", + "projectName": "dataflow" + }, { "type": "java", "name": "Launch Server", "request": "launch", "mainClass": "com.yuandian.dataflow.Server", "projectName": "dataflow", - "args": ["2"], + "args": [ + "2" + ], "preLaunchTask": "restart", - "postDebugTask": "stopall", - - } + "postDebugTask": "stopall" + } ] } \ No newline at end of file diff --git a/pom.xml b/pom.xml index 05b5a17..95a1bb4 100644 --- a/pom.xml +++ b/pom.xml @@ -100,11 +100,11 @@ 0.10.2 - + @@ -254,6 +254,7 @@ com.yuandian.dataflow.Server + diff --git a/restart.sh b/restart.sh index 8e0f02d..6f4ed8e 100755 --- a/restart.sh +++ b/restart.sh @@ -2,3 +2,6 @@ sh stop.sh & rm raftdata/ -rf && mvn package && truncate -s 0 screenlog.0 sh start.sh + + +exit 0 \ No newline at end of file diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index 7b3723e..20c4100 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -2,6 +2,9 @@ package com.yuandian.dataflow; +import java.net.URL; + +import org.reflections.Reflections; import org.slf4j.MarkerFactory; import com.alipay.sofa.jraft.JRaftUtils; @@ -28,6 +31,20 @@ public class Server { public static void main(String[] args) throws Exception { + + + final var resourceUrl = ClassLoader.getSystemResources(""); + + while(resourceUrl.hasMoreElements()) { + var e = resourceUrl.nextElement(); + + log.info("{} {}", e.getFile(), e.getProtocol()); + + } + + System.exit(0); + + String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; var peeridx = Integer.parseInt(args[0]); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java index dfbc799..661565a 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java @@ -9,11 +9,16 @@ package com.yuandian.dataflow.statemachine; import java.io.File; import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; +import org.reflections.ReflectionUtils; import org.reflections.Reflections; import com.alipay.remoting.NamedThreadFactory; @@ -109,7 +114,8 @@ public class StateFactory { ss.applyOperate(op, closure); } - public static void rpcClientInvokeAsync(final Endpoint endpoint,final Object request,final InvokeCallback callback,final long timeoutMs) + public static void rpcClientInvokeAsync(final Endpoint endpoint, final Object request, + final InvokeCallback callback, final long timeoutMs) throws InterruptedException, RemotingException { ss.getRpcClient().invokeAsync(endpoint, request, callback, timeoutMs); } @@ -170,16 +176,35 @@ public class StateFactory { cluster = new RaftGroupService(groupId, serverId, nodeOptions); - Set> scans = new Reflections("com.yuandian.dataflow").getTypesAnnotatedWith(ProcessorRaft.class); - scans.forEach((pRaftClass) -> { + var now = Instant.now(); + HashMap> scansMap = new HashMap<>(); + + Set packageNames = new HashSet<>(); + for (var p : Package.getPackages()) { + packageNames.add(p.getName().split("\\.")[0] ); + } + + for (var name: packageNames) { + + Set> scans = new Reflections(name).getTypesAnnotatedWith(ProcessorRaft.class); + scans.forEach((pRaftClass) -> { + scansMap.put(pRaftClass.getName(), pRaftClass); + }); + + } + + log.info("scan annotates cost time: {}", Duration.between(now, Instant.now())) ; + + scansMap.forEach((name, pRaftClass) -> { try { cluster.getRpcServer() .registerProcessor((RpcProcessor) pRaftClass.getDeclaredConstructor().newInstance()); } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException | SecurityException e) { - log.info("{}", e.toString()); + log.info("{} {}", name, e.toString()); } }); + node = cluster.start(); rpcClient = new BoltRaftRpcFactory().createRpcClient(); @@ -259,6 +284,9 @@ public class StateFactory { } public static void main(String[] args) throws InterruptedException, RemotingException { + + ReflectionUtils.get(ReflectionUtils.SuperClass.of(State.class)); + var rpcClient = new BoltRaftRpcFactory().createRpcClient(); rpcClient.init(new CliOptions()); var resp = rpcClient.invokeSync(new Endpoint("localhost", 4441), new OperateRequest(), 5000); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java index 705be3b..b78a2ac 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java @@ -73,13 +73,10 @@ public class Operate implements Serializable { var leaderId = StateFactory.getLeaderId(); try { - ss.getRpcClient().invokeAsync(leaderId.getEndpoint(), - request, new InvokeCallback() { - + StateFactory.rpcClientInvokeAsync(leaderId.getEndpoint(), request, new InvokeCallback() { @Override public void complete(Object result, Throwable err) { log.debug("Object result {}", result); - var resp = (RaftResponse) result; closure.setResponse(resp); closure.success(resp.getValue()); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java index e76e2d2..93ed2e1 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java @@ -17,6 +17,8 @@ import com.yuandian.dataflow.statemachine.closure.GenericClosure; import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; +import javassist.ClassPath; + import lombok.Getter; import lombok.Setter; import lombok.ToString; @@ -81,5 +83,13 @@ public class OperateProcessor implements RpcProcessor void insertMsgToMongoDB(T obj) {