TODO: 完成注解

This commit is contained in:
huangsimin 2022-07-29 02:14:47 +08:00
parent 819325e744
commit 9c1e5a62ec
10 changed files with 116 additions and 15 deletions

23
.vscode/launch.json vendored
View File

@ -4,16 +4,31 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0", "version": "0.2.0",
"configurations": [ "configurations": [
{
"type": "java",
"name": "Launch OperateProcessor",
"request": "launch",
"mainClass": "com.yuandian.dataflow.statemachine.rpc.OperateProcessor",
"projectName": "dataflow"
},
{
"type": "java",
"name": "Launch StateFactory",
"request": "launch",
"mainClass": "com.yuandian.dataflow.statemachine.StateFactory",
"projectName": "dataflow"
},
{ {
"type": "java", "type": "java",
"name": "Launch Server", "name": "Launch Server",
"request": "launch", "request": "launch",
"mainClass": "com.yuandian.dataflow.Server", "mainClass": "com.yuandian.dataflow.Server",
"projectName": "dataflow", "projectName": "dataflow",
"args": ["2"], "args": [
"2"
],
"preLaunchTask": "restart", "preLaunchTask": "restart",
"postDebugTask": "stopall", "postDebugTask": "stopall"
}
}
] ]
} }

View File

@ -100,11 +100,11 @@
<version>0.10.2</version> <version>0.10.2</version>
</dependency> </dependency>
<!-- <dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId> <artifactId>spring-boot-starter-web</artifactId>
<version>${spring.boot.version}</version> <version>${spring.boot.version}</version>
</dependency> --> </dependency>
<!-- protobuf 依赖 --> <!-- protobuf 依赖 -->
@ -254,6 +254,7 @@
<mainClass>com.yuandian.dataflow.Server</mainClass> <mainClass>com.yuandian.dataflow.Server</mainClass>
</manifest> </manifest>
</archive> </archive>
</configuration> </configuration>
</plugin> </plugin>
<plugin> <plugin>

View File

@ -2,3 +2,6 @@
sh stop.sh & rm raftdata/ -rf && mvn package && truncate -s 0 screenlog.0 sh stop.sh & rm raftdata/ -rf && mvn package && truncate -s 0 screenlog.0
sh start.sh sh start.sh
exit 0

View File

@ -2,6 +2,9 @@ package com.yuandian.dataflow;
import java.net.URL;
import org.reflections.Reflections;
import org.slf4j.MarkerFactory; import org.slf4j.MarkerFactory;
import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.JRaftUtils;
@ -28,6 +31,20 @@ public class Server {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
final var resourceUrl = ClassLoader.getSystemResources("");
while(resourceUrl.hasMoreElements()) {
var e = resourceUrl.nextElement();
log.info("{} {}", e.getFile(), e.getProtocol());
}
System.exit(0);
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
var peeridx = Integer.parseInt(args[0]); var peeridx = Integer.parseInt(args[0]);

View File

@ -9,11 +9,16 @@ package com.yuandian.dataflow.statemachine;
import java.io.File; import java.io.File;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import org.reflections.ReflectionUtils;
import org.reflections.Reflections; import org.reflections.Reflections;
import com.alipay.remoting.NamedThreadFactory; import com.alipay.remoting.NamedThreadFactory;
@ -109,7 +114,8 @@ public class StateFactory {
ss.applyOperate(op, closure); ss.applyOperate(op, closure);
} }
public static void rpcClientInvokeAsync(final Endpoint endpoint,final Object request,final InvokeCallback callback,final long timeoutMs) public static void rpcClientInvokeAsync(final Endpoint endpoint, final Object request,
final InvokeCallback callback, final long timeoutMs)
throws InterruptedException, RemotingException { throws InterruptedException, RemotingException {
ss.getRpcClient().invokeAsync(endpoint, request, callback, timeoutMs); ss.getRpcClient().invokeAsync(endpoint, request, callback, timeoutMs);
} }
@ -170,16 +176,35 @@ public class StateFactory {
cluster = new RaftGroupService(groupId, serverId, nodeOptions); cluster = new RaftGroupService(groupId, serverId, nodeOptions);
Set<Class<?>> scans = new Reflections("com.yuandian.dataflow").getTypesAnnotatedWith(ProcessorRaft.class); var now = Instant.now();
scans.forEach((pRaftClass) -> { HashMap<String, Class<?>> scansMap = new HashMap<>();
Set<String> packageNames = new HashSet<>();
for (var p : Package.getPackages()) {
packageNames.add(p.getName().split("\\.")[0] );
}
for (var name: packageNames) {
Set<Class<?>> scans = new Reflections(name).getTypesAnnotatedWith(ProcessorRaft.class);
scans.forEach((pRaftClass) -> {
scansMap.put(pRaftClass.getName(), pRaftClass);
});
}
log.info("scan annotates cost time: {}", Duration.between(now, Instant.now())) ;
scansMap.forEach((name, pRaftClass) -> {
try { try {
cluster.getRpcServer() cluster.getRpcServer()
.registerProcessor((RpcProcessor<?>) pRaftClass.getDeclaredConstructor().newInstance()); .registerProcessor((RpcProcessor<?>) pRaftClass.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e) { | InvocationTargetException | NoSuchMethodException | SecurityException e) {
log.info("{}", e.toString()); log.info("{} {}", name, e.toString());
} }
}); });
node = cluster.start(); node = cluster.start();
rpcClient = new BoltRaftRpcFactory().createRpcClient(); rpcClient = new BoltRaftRpcFactory().createRpcClient();
@ -259,6 +284,9 @@ public class StateFactory {
} }
public static void main(String[] args) throws InterruptedException, RemotingException { public static void main(String[] args) throws InterruptedException, RemotingException {
ReflectionUtils.get(ReflectionUtils.SuperClass.of(State.class));
var rpcClient = new BoltRaftRpcFactory().createRpcClient(); var rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions()); rpcClient.init(new CliOptions());
var resp = rpcClient.invokeSync(new Endpoint("localhost", 4441), new OperateRequest(), 5000); var resp = rpcClient.invokeSync(new Endpoint("localhost", 4441), new OperateRequest(), 5000);

View File

@ -73,13 +73,10 @@ public class Operate implements Serializable {
var leaderId = StateFactory.getLeaderId(); var leaderId = StateFactory.getLeaderId();
try { try {
ss.getRpcClient().invokeAsync(leaderId.getEndpoint(), StateFactory.rpcClientInvokeAsync(leaderId.getEndpoint(), request, new InvokeCallback() {
request, new InvokeCallback() {
@Override @Override
public void complete(Object result, Throwable err) { public void complete(Object result, Throwable err) {
log.debug("Object result {}", result); log.debug("Object result {}", result);
var resp = (RaftResponse<Operate>) result; var resp = (RaftResponse<Operate>) result;
closure.setResponse(resp); closure.setResponse(resp);
closure.success(resp.getValue()); closure.success(resp.getValue());

View File

@ -17,6 +17,8 @@ import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import javassist.ClassPath;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.ToString; import lombok.ToString;
@ -81,5 +83,13 @@ public class OperateProcessor implements RpcProcessor<OperateProcessor.OperateRe
} }
public static void main(String[] args) {
for(var a : Package.getPackages()) {
log.info("{}", a.getName() );
}
}
} }

View File

@ -0,0 +1,26 @@
/**
* description
*
* @author eson
*2022年7月21日-14:27:49
*/
package com.yuandian.dataflow.statemachine.rpc.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* MasterMain Master主执行线程
*
* @author eson
*2022年7月21日-14:27:49
*/
@Retention(RetentionPolicy.SOURCE)
@Target(ElementType.TYPE)
public @interface Master {
public int order() default 0 ;
}

View File

@ -17,7 +17,7 @@ import java.lang.annotation.Target;
* @author eson * @author eson
*2022年7月21日-14:27:49 *2022年7月21日-14:27:49
*/ */
@Retention(RetentionPolicy.CLASS) @Retention(RetentionPolicy.SOURCE)
@Target(ElementType.TYPE) @Target(ElementType.TYPE)
public @interface ProcessorRaft { public @interface ProcessorRaft {
} }

View File

@ -1,9 +1,13 @@
package com.yuandian.dataflow; package com.yuandian.dataflow;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
@SpringBootApplication
public class MongodbTest { public class MongodbTest {
public static <T> void insertMsgToMongoDB(T obj) { public static <T> void insertMsgToMongoDB(T obj) {