添加 raft处理的注解

This commit is contained in:
huangsimin 2022-07-21 15:48:59 +08:00
parent 8d79341e23
commit b24ac05920
13 changed files with 212 additions and 120 deletions

11
pom.xml
View File

@ -89,7 +89,12 @@
<version>${mongo.driver.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.reflections/reflections -->
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.10.2</version>
</dependency>
<!-- <dependency>
<groupId>org.springframework.boot</groupId>
@ -206,12 +211,10 @@
<version>1.6.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<archive>
<manifest>

View File

@ -0,0 +1,59 @@
/**
* description
*
* @author eson
*2022年7月21日-13:48:01
*/
package com.yuandian.dataflow.controller;
import java.io.Serializable;
import java.util.ArrayList;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.google.protobuf.Any;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.rpc.ResponseSM;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import lombok.Getter;
import lombok.Setter;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
*2022年7月21日-13:48:01
*/
@Slf4j
@ProcessorRaft
public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRequest> {
@Setter
@Getter
public static class PacketsRequest implements Serializable {
private ArrayList<Any> packets = new ArrayList<>();
}
@Override
public void handleRequest(RpcContext rpcCtx, PacketsRequest request) {
// StateServerFactory.getStateServer().updateFsmStateAsync(s, onCompleted);
log.info("{}",request.packets.size());
var resp = new ResponseSM();
resp.setMsg(rpcCtx.getRemoteAddress());
rpcCtx.sendResponse(resp);
}
@Override
public String interest() {
// TODO Auto-generated method stub
return PacketsRequest.class.getName();
}
}

View File

@ -29,7 +29,7 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
public class Header {
public class Master {
public static void main(String[] args) throws Exception {

View File

@ -1,16 +1,9 @@
package com.yuandian.dataflow.statemachine;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.Closure;
@ -19,20 +12,13 @@ import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.core.StateMachineAdapter;
import com.alipay.sofa.jraft.entity.LeaderChangeContext;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.Utils;
import com.yuandian.dataflow.statemachine.rpc.ResponseSM;
import com.yuandian.dataflow.statemachine.rpc.RequestCondition;
import com.yuandian.dataflow.statemachine.rpc.RequestState;
import com.yuandian.dataflow.statemachine.rpc.SyncConditionProcessor.RequestCondition;
import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor.RequestState;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.StateFactory;
import com.yuandian.dataflow.statemachine.state.WorkerState;
@ -122,10 +108,7 @@ public class StateMachine extends StateMachineAdapter {
// parsing.
var closure = (SyncClosure<State>)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
log.info("SyncDataClosure(done) taskState:{} leaderTerm:{} {}",this.state, this.leaderTerm, closure);
this.state = closure.getValue();
this.state = closure.getValue();
closure.success(state);
closure.run(Status.OK());
} else {

View File

@ -8,12 +8,13 @@ package com.yuandian.dataflow.statemachine;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
import org.reflections.Reflections;
import com.alipay.remoting.NamedThreadFactory;
import com.alipay.remoting.exception.CodecException;
@ -32,15 +33,15 @@ import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.yuandian.dataflow.statemachine.rpc.RequestCondition;
import com.yuandian.dataflow.statemachine.rpc.RequestState;
import com.yuandian.dataflow.statemachine.rpc.ResponseSM;
import com.yuandian.dataflow.statemachine.rpc.SyncConditionProcessor;
import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor;
import com.yuandian.dataflow.statemachine.rpc.SyncConditionProcessor.RequestCondition;
import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor.RequestState;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
@ -139,8 +140,18 @@ public class StateServerFactory {
nodeOptions.setFsm(fsm);
cluster = new RaftGroupService(groupId, serverId, nodeOptions);
cluster.getRpcServer().registerProcessor(new SyncStateProcessor());
cluster.getRpcServer().registerProcessor(new SyncConditionProcessor());
Set<Class<?>> scans = new Reflections("com.yuandian.dataflow").getTypesAnnotatedWith(ProcessorRaft.class);
scans.forEach((pRaftClass)->{
try {
cluster.getRpcServer().registerProcessor((RpcProcessor<?>) pRaftClass.newInstance());
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
});
node = cluster.start();
rpcClient = new BoltRaftRpcFactory().createRpcClient();

View File

@ -1,21 +0,0 @@
package com.yuandian.dataflow.statemachine.rpc;
import java.io.Serializable;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Getter
@Setter
@ToString
public class RequestCondition implements Serializable {
private static final long serialVersionUID = 1L;
private WorkerState workerState;
}

View File

@ -1,35 +0,0 @@
/**
* description
*
* @author eson
*2022年7月11日-16:01:07
*/
package com.yuandian.dataflow.statemachine.rpc;
import java.io.Serializable;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.entity.PeerId;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**
* 同步状态时, 需要用的结构类. 新增的状态可以在 State结构里添加
*
* @author eson
*2022年7月11日-16:01:07
*/
@Slf4j
@Getter
@Setter
@ToString
public class RequestState implements Serializable {
private static final long serialVersionUID = 1L;
private State state;
}

View File

@ -6,6 +6,7 @@
*/
package com.yuandian.dataflow.statemachine.rpc;
import java.io.Serializable;
import java.nio.ByteBuffer;
import com.alipay.remoting.exception.CodecException;
@ -19,12 +20,16 @@ import com.lmax.disruptor.WorkProcessor;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.SyncClosure;
import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.alipay.sofa.jraft.entity.PeerId;
import org.apache.commons.lang.StringUtils;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
@ -35,7 +40,18 @@ import lombok.extern.slf4j.Slf4j;
*2022年7月12日-11:10:54
*/
@Slf4j
public class SyncConditionProcessor implements RpcProcessor<RequestCondition> {
@ProcessorRaft
public class SyncConditionProcessor implements RpcProcessor<SyncConditionProcessor.RequestCondition> {
@Getter
@Setter
@ToString
public static class RequestCondition implements Serializable {
private static final long serialVersionUID = 1L;
private WorkerState workerState;
}
@Override
public void handleRequest(RpcContext rpcCtx, RequestCondition request) {

View File

@ -6,6 +6,7 @@
*/
package com.yuandian.dataflow.statemachine.rpc;
import java.io.Serializable;
import java.nio.ByteBuffer;
import com.alipay.remoting.exception.CodecException;
@ -18,11 +19,15 @@ import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.SyncClosure;
import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
import com.alipay.sofa.jraft.entity.PeerId;
import org.apache.commons.lang.StringUtils;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
@ -33,7 +38,26 @@ import lombok.extern.slf4j.Slf4j;
*2022年7月12日-11:10:54
*/
@Slf4j
public class SyncStateProcessor implements RpcProcessor<RequestState> {
@ProcessorRaft
public class SyncStateProcessor implements RpcProcessor<SyncStateProcessor.RequestState> {
/**
* 同步状态时, 需要用的结构类. 新增的状态可以在 State结构里添加
*
* @author eson
*2022年7月11日-16:01:07
*/
@Getter
@Setter
@ToString
public static class RequestState implements Serializable {
private static final long serialVersionUID = 1L;
private State state;
}
@Override
public void handleRequest(RpcContext rpcCtx, RequestState request) {

View File

@ -0,0 +1,24 @@
/**
* description
*
* @author eson
*2022年7月21日-14:27:49
*/
package com.yuandian.dataflow.statemachine.rpc.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* raft自动装配
*
* @author eson
*2022年7月21日-14:27:49
*/
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
public @interface ProcessorRaft {
}

View File

@ -7,7 +7,11 @@
package com.yuandian.dataflow.statemachine.state;
import com.alibaba.nacos.api.cmdb.pojo.Entity;
import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.handler.codec.http.HttpContentEncoder.Result;
import com.alipay.sofa.jraft.error.RemotingException;
import com.google.protobuf.Any;
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import lombok.Getter;
@ -20,7 +24,7 @@ import lombok.extern.slf4j.Slf4j;
* description
*
* @author eson
*2022年7月20日-10:00:05
* 2022年7月20日-10:00:05
*/
@Slf4j
@Getter
@ -32,30 +36,55 @@ public class StateFactory {
@Override
public void run() {
try {
while(true) {
while (true) {
log.debug("master execute {}", StateServerFactory.getServerId());
var alivePeers = StateServerFactory.getCluster().getRaftNode().listAlivePeers();
log.debug("master execute {}", StateServerFactory.getCluster().getRaftNode().listAlivePeers());
if(alivePeers != null) {
synchronized(alivePeers) {
StateServerFactory.getStateServer().useFsmStateAsync((state)->{
alivePeers.forEach((peer)->{
var ws = state.getWorkers().get(peer);
if(ws != null) {
var cap = 10000 - ws.getTaskQueueSize();
if(cap > 0) {
log.debug("{}", cap);
if (alivePeers != null) {
var ss = StateServerFactory.getStateServer();
StateServerFactory.getStateServer().useFsmStateAsync((state) -> {
synchronized(alivePeers){
alivePeers.forEach((peer) -> {
var ws = state.getWorkers().get(peer);
if (ws != null) {
var cap = 100 - ws.getTaskQueueSize();
if (cap > 0) {
log.debug("{}", cap);
var request = new PacketsRequest();
for(int i = 0; i < cap ; i++ ) {
var p = Any.pack(
BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder()
.setTableId(10086)
.build()
);
request.getPackets().add(p);
}
}
});
alivePeers.notify();
return null;
});
}
}
try {
var result = ss.getRpcClient().invokeSync(ws.getPeerId().getEndpoint(), request, 5000);
log.info("{}", result);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (RemotingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
alivePeers.notifyAll();
});
}
return null;
});
synchronized(alivePeers){
alivePeers.wait(5000);
}
}
Thread.sleep(2000);
}
@ -74,5 +103,4 @@ public class StateFactory {
}
}

View File

@ -20,9 +20,9 @@
</appender>
<root level="info">
<!-- <root level="info">
<appender-ref ref="CONSOLE" />
</root>
</root> -->
<logger name="com.yuandian.dataflow" level="debug">
<appender-ref ref="CONSOLE" />