初版例子

This commit is contained in:
huangsimin 2022-07-28 14:49:56 +08:00
parent 7795362d5a
commit 9a9b28799d
15 changed files with 457 additions and 403 deletions

View File

@ -2,11 +2,13 @@ package com.yuandian.dataflow;
import org.slf4j.MarkerFactory;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.conf.Configuration;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.StateFactory;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
@ -25,22 +27,19 @@ public class Server {
public static void main(String[] args) throws Exception {
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
String[] sprPeers = new String[]{"3440","3441","3442"};
var peeridx = Integer.parseInt(args[0]);
var peeridstr = peers[ peeridx ];
var sprPort = sprPeers[ peeridx ];
// var peeridstr = peers[2];
// var sprPort = sprPeers[2];
log.info("{} {}", peeridstr, sprPort);
conf = JRaftUtils.getConfiguration(String.join(",", peers));
StateServerFactory.startStateServer(peeridstr, conf);
StateFactory.startStateServer(peeridstr, conf);

View File

@ -10,6 +10,9 @@ import java.io.Serializable;
import java.time.Instant;
import java.util.ArrayList;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RemotingException;
@ -17,7 +20,7 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.google.protobuf.Any;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
@ -29,7 +32,7 @@ import com.yuandian.dataflow.statemachine.state.State;
import lombok.Getter;
import lombok.Setter;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
@ -56,28 +59,28 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
resp.setSuccess(true);
rpcCtx.sendResponse(resp);
var ss = StateServerFactory.getStateServer();
log.debug("{} handler request.packets.size(): {}", StateServerFactory.getServerId(), request.packets.size());
var ss = StateFactory.getStateServer();
log.debug("{} handler request.packets.size(): {}", StateFactory.getServerId(), request.packets.size());
// 读状态 Closure<State> 里的 getValue<State> State的状态
ss.readIndexState(new GenericClosure<State>() {
@Override
public void run(Status status) {
log.debug("status {}", status);
if (status.isOk()) {
var state = this.getValue();
var ws = state.getWorkers().get(StateServerFactory.getServerId());
var state = this.getValue(); // 获取返回的状态
var ws = state.getWorkers().get(StateFactory.getServerId());
ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size());
ws.setUpdateAt(Instant.now());
ws.setUpdateAt(Instant.now()); // 设置更新时间
log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(),
request.packets.size(), state.getWorkers().size());
Operate.CallOperate(new Operate(OperateType.PUT, ws), new GenericClosure<Operate>() {
// log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(),
// request.packets.size(), state.getWorkers().size());
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure<Operate>() {
@Override
public void run(Status status) {
if (status.isOk()) {
log.info("{}", resp);
log.info("[{}] {}", StateFactory.getServerId(), resp);
}
}
});

View File

@ -7,13 +7,17 @@ import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* 例子 强制转换leader
*/
@Slf4j
@ProcessorRaft
public class TransferLeaderProcessor implements RpcProcessor<TransferLeaderProcessor.LeaderRequest> {
@ -26,7 +30,7 @@ public class TransferLeaderProcessor implements RpcProcessor<TransferLeaderProce
@Override
public void handleRequest(RpcContext rpcCtx, LeaderRequest request) {
Status status = StateServerFactory.getCluster().getRaftNode().transferLeadershipTo(request.peer);
Status status = StateFactory.getCluster().getRaftNode().transferLeadershipTo(request.peer);
rpcCtx.sendResponse(status);
log.debug("[TransferLeader] {} change leader to {}", status, request.peer);
}

View File

@ -25,7 +25,7 @@ import com.yuandian.dataflow.proto.CollectPacketsServerGrpc.CollectPacketsServer
import com.yuandian.dataflow.proto.msgtype.*;
import io.grpc.ManagedChannelBuilder;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
// import org.springframework.http.*;
// import org.springframework.web.client.RestTemplate;

View File

@ -7,6 +7,7 @@
package com.yuandian.dataflow.statemachine;
import java.time.Instant;
import java.util.List;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RemotingException;
@ -23,7 +24,7 @@ import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
@ -45,65 +46,80 @@ public class MasterFactory {
public void run() {
try {
while (true) {
log.debug("master execute {}", StateServerFactory.getServerId());
var alivePeers = StateServerFactory.getRaftNode().listAlivePeers();
log.debug("master execute {}", StateServerFactory.getRaftNode().listAlivePeers());
var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers());
log.debug("master({}) execute {}", StateFactory.getServerId(),
StateFactory.getRaftNode().listAlivePeers());
if (alivePeers != null) {
var ss = StateServerFactory.getStateServer();
// 读一致性
ss.readIndexState( new GenericClosure<State>() {
// 读一致性
StateFactory.readIndexState(new GenericClosure<State>() {
@Override
public void run(Status status) {
var state = this.getValue();
// log.debug("masterExecute start {} {}", status, alivePeers);
alivePeers.forEach((peer) -> {
WorkerState ws = state.getWorkers().get(peer);
if (ws != null) {
//
var canTasks = MAX_TASKS - ws.getTaskQueueSize();
log.debug("cap :{} peer: {}", canTasks, peer);
if (canTasks <= 0) {
return;
}
ws.setUpdateAt(Instant.now());
ws.setTaskQueueSize(MAX_TASKS);
var request = new PacketsRequest();
for (int i = 0; i < canTasks; i++) {
var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
.newBuilder()
.setTableId(10086)
.build());
request.getPackets().add(p);
}
Operate.CallOperate(new Operate(OperateType.PUT, ws), new GenericClosure<Operate>() {
@Override
public void run(Status status) {
log.info("PacketsRequest run {}", status);
try {
ss.getRpcClient().invokeAsync(peer.getEndpoint(), request,
new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
if(err != null) {
log.debug("{}", err);
}
log.debug("PacketsRequest: {}", result);
}
}, 5000);
} catch (InterruptedException | RemotingException e) {
log.info("error send packets {}", e.toString());
}
}
});
if (state == null) {
log.error("readIndexState获取的状态为 {}", state);
return;
}
WorkerState ws = state.getWorkers().get(peer);
if (ws == null) {
log.error("WorkerState获取的状态为 {}", ws);
return;
}
var canTasks = MAX_TASKS - ws.getTaskQueueSize();
log.info("剩余能处理的任务数量[{}] :{}", peer, canTasks);
if (canTasks <= 0) {
return;
}
ws.setUpdateAt(Instant.now());
ws.setTaskQueueSize(MAX_TASKS);
// 模拟发送包的数据到该节点上
var request = new PacketsRequest();
for (int i = 0; i < canTasks; i++) {
var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
.newBuilder()
.setTableId(10086)
.build());
request.getPackets().add(p);
}
// 先提交 节点的 剩余能处理的任务数量. 然后再处理
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws),
new GenericClosure<Operate>() {
@Override
public void run(Status status) {
log.info("PacketsRequest run {}", status);
try {
StateFactory.rpcClientInvokeAsync(peer.getEndpoint(), request,
new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
if (err != null) {
// TODO: 如果错误, 需要让节点恢复任务处理的状态
log.debug("{}", err);
}
log.debug("PacketsRequest: {}", result);
}
}, 5000);
} catch (InterruptedException | RemotingException e) {
log.info("error send packets {}", e.toString());
}
}
});
});
}
} );
});
}
Thread.sleep(5000);

View File

@ -0,0 +1,272 @@
/**
* description
*
* @author eson
*2022年7月12日-13:36:24
*/
package com.yuandian.dataflow.statemachine;
import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reflections.Reflections;
import com.alipay.remoting.NamedThreadFactory;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.InvokeContext;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
* 2022年7月12日-13:36:24
*/
@Slf4j
public class StateFactory {
private static StateServer ss;
public static void startStateServer(String peerstr, Configuration conf) throws Exception {
if (ss != null) {
throw new Exception("重复初始化 InitStateServer");
}
ss = new StateFactory.StateServer(peerstr, conf);
}
public static boolean isLeader() {
return ss.node.isLeader();
}
public static PeerId getLeaderId() {
return ss.node.getLeaderId();
}
public static PeerId getServerId() {
return ss.cluster.getServerId();
}
public static Node getNode() {
return ss.node;
}
public static Node getRaftNode() {
return ss.cluster.getRaftNode();
}
public static RpcClient getRpcClient() {
return ss.getRpcClient();
}
public static RaftGroupService getCluster() {
return ss.getCluster();
}
// 获取状态服务的对象
public static StateServer getStateServer() {
return ss;
}
public static void readIndexState(GenericClosure<State> closure) {
ss.readIndexState(closure);
}
public static void applyOperate(Operate op, GenericClosure<Operate> closure) {
ss.applyOperate(op, closure);
}
public static void rpcClientInvokeAsync(final Endpoint endpoint,final Object request,final InvokeCallback callback,final long timeoutMs)
throws InterruptedException, RemotingException {
ss.getRpcClient().invokeAsync(endpoint, request, callback, timeoutMs);
}
public static Object rpcClientInvokeSync(final Endpoint endpoint, final Object request, final long timeoutMs)
throws InterruptedException, RemotingException {
return ss.getRpcClient().invokeSync(endpoint, request, timeoutMs);
}
public static Object rpcClientInvokeSync(final Endpoint endpoint, final Object request, final InvokeContext ctx,
final long timeoutMs) throws InterruptedException, RemotingException {
return ss.getRpcClient().invokeSync(endpoint, request, ctx, timeoutMs);
}
@Getter
@Setter
public static class StateServer {
RpcClient rpcClient;
private Node node;
private RaftGroupService cluster;
private StateMachine fsm;
private String groupId = "dataflow";
private Executor readIndexExecutor = createReadIndexExecutor();
public StateServer(String addr, Configuration conf) {
// String[] peers = new
// String[]{"localhost:4440","localhost:4441","localhost:4442"};
// String[] sprPeers = new String[]{"3440","3441","3442"};
// var peeridstr = peers[Integer.parseInt(serverId)];
// var sprPort = sprPeers[Integer.parseInt(args[0])];
// String groupId = "jraft";
// conf =
// JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
PeerId serverId = JRaftUtils.getPeerId(addr);
int port = serverId.getPort();
NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setElectionTimeoutMs(1000);
nodeOptions.setSnapshotLogIndexMargin(3600);
nodeOptions.setInitialConf(conf);
File RaftDataFile = new File(String.format("./raftdata/%d", port));
log.info("mkdirs: {}", RaftDataFile.mkdirs());
nodeOptions.setLogUri(String.format("./raftdata/%d/logs", port));
nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port));
nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port));
fsm = new StateMachine(); // 状态实例初始化
nodeOptions.setFsm(fsm);
cluster = new RaftGroupService(groupId, serverId, nodeOptions);
Set<Class<?>> scans = new Reflections("com.yuandian.dataflow").getTypesAnnotatedWith(ProcessorRaft.class);
scans.forEach((pRaftClass) -> {
try {
cluster.getRpcServer()
.registerProcessor((RpcProcessor<?>) pRaftClass.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e) {
log.info("{}", e.toString());
}
});
node = cluster.start();
rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
}
public boolean isLeader() {
return this.fsm.isLeader();
}
public void readIndexState(GenericClosure<State> closure) {
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
log.debug("readIndexState({}) {}", getServerId(), status);
if (status.isOk()) {
// 回调失败
closure.success(ss.fsm.getState());
closure.setValue(ss.fsm.getState());
}
closure.run(status);
}
});
}
public void applyOperate(Operate op, GenericClosure<Operate> closure) {
// 所有的提交都必须再leader进行
if (!ss.isLeader()) {
ss.handlerNotLeaderError(closure);
return;
}
try {
closure.setValue(op);
final Task task = new Task();
task.setData(
ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op)));
task.setDone(closure); // 确认所有数据 一致, 不需要加锁
StateFactory.getStateServer().getNode().apply(task);
} catch (CodecException e) {
String errorMsg = "Fail to encode TaskState";
log.debug(errorMsg, e);
closure.failure(errorMsg, PeerId.emptyPeer());
closure.run(new Status(RaftError.EINTERNAL, errorMsg));
}
}
public <T> RaftResponse<T> redirect() {
final RaftResponse<T> response = new RaftResponse<T>();
response.setSuccess(false);
if (this.node != null) {
final PeerId leader = this.node.getLeaderId();
if (leader != null) {
response.setRedirect(leader);
}
}
return response;
}
public <T> void handlerNotLeaderError(final GenericClosure<T> closure) {
closure.failure("Not leader.", redirect().getRedirect());
closure.run(new Status(RaftError.EPERM, "Not leader"));
}
private Executor createReadIndexExecutor() {
return ThreadPoolUtil.newBuilder() //
.poolName("ReadIndexPool") //
.enableMetric(true) //
.coreThreads(4) //
.maximumThreads(4) //
.keepAliveSeconds(60L) //
.workQueue(new SynchronousQueue<>()) //
.threadFactory(new NamedThreadFactory("ReadIndexService", true)) //
.rejectedHandler(new ThreadPoolExecutor.AbortPolicy()) //
.build();
}
}
public static void main(String[] args) throws InterruptedException, RemotingException {
var rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
var resp = rpcClient.invokeSync(new Endpoint("localhost", 4441), new OperateRequest(), 5000);
log.info("{}", resp);
}
}

View File

@ -28,7 +28,7 @@ import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
@ -72,13 +72,13 @@ public class StateMachine extends StateMachineAdapter {
Operate op = null;
GenericClosure<Operate> closure = null;
if (iter.done() != null) {
// This task is applied by this node, get value from closure to avoid additional
// parsing.
closure = (GenericClosure<Operate>) iter.done(); // 只支持单一个State. 全状态机只支持一种提交
// leader可以直接从 回调closure里提取operate
closure = (GenericClosure<Operate>)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
op = closure.getValue();
} else {
// Have to parse FetchAddRequest from this user log.
// 非leader 需要从getData反序列化出来后处理
final ByteBuffer data = iter.getData();
try {
@ -91,29 +91,34 @@ public class StateMachine extends StateMachineAdapter {
}
if (op != null) {
switch (op.getType()) {
case PUT:
WorkerState ws = op.getValue();
state.getWorkers().put(ws.peerId, ws);
if (closure != null) {
closure.success(op);
closure.run(Status.OK());
}
break;
case REMOVE:
if (closure != null) {
closure.success(op);
closure.run(Status.OK());
}
break;
default:
break;
if (op == null) {
log.error("op 为 {}. 存在错误, 可能版本不一致", op);
continue;
}
}
} else {
switch (op.getType()) {
case PUT_WORKERSTATE:
WorkerState ws = op.getValue();
log.debug("PUT {}", ws.peerId);
state.getWorkers().put(ws.peerId, ws);
if (closure != null) {
closure.success(op);
closure.run(Status.OK());
}
break;
case REMOVE:
if (closure != null) {
closure.success(op);
closure.run(Status.OK());
}
break;
default:
break;
}
iter.next();
}
@ -136,7 +141,7 @@ public class StateMachine extends StateMachineAdapter {
@Override
public void onLeaderStart(final long term) {
log.debug("onLeaderStart {}", StateServerFactory.getServerId());
log.debug("onLeaderStart {}", StateFactory.getServerId());
this.leaderTerm.set(term);
// 判断是否Master线程还在跑, 如果存在则中断
@ -144,26 +149,23 @@ public class StateMachine extends StateMachineAdapter {
MasterFactory.getMasterExecute().interrupt();
}
var ss = StateServerFactory.getStateServer();
ss.readIndexState(new GenericClosure<State>() {
StateFactory.readIndexState(new GenericClosure<State>() {
@Override
public void run(Status status) {
var ws = state.getWorkers().get(StateServerFactory.getServerId());
var ws = state.getWorkers().get(StateFactory.getServerId());
if (ws == null) {
ws = new WorkerState(StateServerFactory.getServerId());
ws = new WorkerState(StateFactory.getServerId());
}
Operate op = new Operate(OperateType.PUT, ws);
ss.applyOperate(op, new GenericClosure<Operate>() {
StateFactory.applyOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure<Operate>() {
@Override
public void run(Status status) {
log.debug("master update workerstate: {}", status);
}
});
}
});
// 当成为master时候 必须启动
@ -174,7 +176,7 @@ public class StateMachine extends StateMachineAdapter {
@Override
public void onLeaderStop(final Status status) {
log.debug("onLeaderStop {}", StateServerFactory.getCluster().getServerId());
log.debug("onLeaderStop {}", StateFactory.getCluster().getServerId());
this.leaderTerm.set(-1);
// 判断是否Master线程还在跑, 如果存在则中断
if (MasterFactory.getMasterExecute().isAlive()) {
@ -192,7 +194,7 @@ public class StateMachine extends StateMachineAdapter {
@Override
public void onStartFollowing(LeaderChangeContext ctx) {
log.debug("[onStartFollowing] {} {}", ctx, StateServerFactory.getCluster().getServerId());
log.debug("[onStartFollowing] {} {}", ctx, StateFactory.getCluster().getServerId());
try {
// 判断是否Master线程还在跑, 如果存在则中断
@ -201,14 +203,13 @@ public class StateMachine extends StateMachineAdapter {
}
var ws = new WorkerState(StateServerFactory.getServerId());
log.debug("my: {} leader id {}", StateServerFactory.getServerId(), StateServerFactory.getLeaderId());
var ws = new WorkerState(StateFactory.getServerId());
log.debug("my: {} leader id {}", StateFactory.getServerId(), StateFactory.getLeaderId());
var op = new Operate(OperateType.PUT, ws);
Operate.CallOperate(op, new GenericClosure<Operate>() {
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure<Operate>() {
@Override
public void run(Status status) {
log.info("{} {}", status, this.getResponse());
log.info("onStartFollowing CallOperate {} {}", status, this.getResponse());
}
});
@ -227,14 +228,14 @@ public class StateMachine extends StateMachineAdapter {
@Override
public void onStopFollowing(LeaderChangeContext ctx) {
log.debug("{} {}", ctx, StateServerFactory.getCluster().getServerId());
log.debug("{} {}", ctx, StateFactory.getCluster().getServerId());
var ss = StateServerFactory.getStateServer();
var ws = new WorkerState(StateServerFactory.getServerId());
var ss = StateFactory.getStateServer();
var ws = new WorkerState(StateFactory.getServerId());
log.debug("my: {} leader id {}", StateServerFactory.getServerId(), StateServerFactory.getLeaderId());
log.debug("my: {} leader id {}", StateFactory.getServerId(), StateFactory.getLeaderId());
var op = new Operate(OperateType.PUT, ws);
var op = new Operate(OperateType.PUT_WORKERSTATE, ws);
Operate.CallOperate(op, new GenericClosure() {
@Override
public void run(Status status) {

View File

@ -1,259 +0,0 @@
/**
* description
*
* @author eson
*2022年7月12日-13:36:24
*/
package com.yuandian.dataflow.statemachine;
import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reflections.Reflections;
import com.alipay.remoting.NamedThreadFactory;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.Getter;
import lombok.Setter;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
*2022年7月12日-13:36:24
*/
@Slf4j
@var
public class StateServerFactory {
private static StateServer ss;
public static void startStateServer(String peerstr, Configuration conf) throws Exception {
if(ss != null) {
throw new Exception("重复初始化 InitStateServer");
}
ss = new StateServerFactory.StateServer(peerstr, conf);
}
public static boolean isLeader() {
return ss.node.isLeader() ;
}
public static PeerId getLeaderId() {
return ss.node.getLeaderId() ;
}
public static PeerId getServerId() {
return ss.cluster.getServerId();
}
public static Node getNode() {
return ss.node ;
}
public static Node getRaftNode() {
return ss.cluster.getRaftNode() ;
}
public static RpcClient getRpcClient() {
return ss.getRpcClient();
}
public static RaftGroupService getCluster() {
return ss.getCluster();
}
// 获取状态服务的对象
public static StateServer getStateServer() {
return ss;
}
@Getter
@Setter
public static class StateServer {
RpcClient rpcClient;
private Node node;
private RaftGroupService cluster;
private StateMachine fsm;
private String groupId = "dataflow";
private Executor readIndexExecutor = createReadIndexExecutor();
public StateServer(String addr, Configuration conf) {
// String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
// String[] sprPeers = new String[]{"3440","3441","3442"};
// var peeridstr = peers[Integer.parseInt(serverId)];
// var sprPort = sprPeers[Integer.parseInt(args[0])];
// String groupId = "jraft";
// conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
PeerId serverId = JRaftUtils.getPeerId(addr);
int port = serverId.getPort();
NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setElectionTimeoutMs(1000);
nodeOptions.setSnapshotLogIndexMargin(3600);
nodeOptions.setInitialConf(conf);
File RaftDataFile = new File(String.format("./raftdata/%d", port) );
log.info("mkdirs: {}",RaftDataFile.mkdirs());
nodeOptions.setLogUri( String.format("./raftdata/%d/logs", port) );
nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port));
nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port));
fsm = new StateMachine(); // 状态实例初始化
nodeOptions.setFsm(fsm);
cluster = new RaftGroupService(groupId, serverId, nodeOptions);
Set<Class<?>> scans = new Reflections("com.yuandian.dataflow").getTypesAnnotatedWith(ProcessorRaft.class);
scans.forEach((pRaftClass)->{
try {
cluster.getRpcServer().registerProcessor((RpcProcessor<?>) pRaftClass.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException | SecurityException e) {
log.info("{}", e.toString());
}
});
node = cluster.start();
rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
}
public boolean isLeader() {
return this.fsm.isLeader();
}
public void useFsmState(Consumer<State> dofunc) {
var state = ss.fsm.getState();
synchronized(state) {
dofunc.accept(state);
}
}
public void readIndexState(GenericClosure<State> closure) {
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
if(status.isOk()) {
// 回调失败
closure.success(ss.fsm.getState());
}
closure.run(status);
}
} );
}
public void applyOperate(Operate op, GenericClosure<Operate> closure) {
// 所有的提交都必须再leader进行
if (!ss.isLeader()) {
ss.handlerNotLeaderError(closure);
return;
}
try {
closure.setValue(op);
final Task task = new Task();
task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op)));
task.setDone(closure); // 确认所有数据 一致, 不需要加锁
StateServerFactory.getStateServer().getNode().apply(task);
} catch (CodecException e) {
String errorMsg = "Fail to encode TaskState";
log.debug(errorMsg, e);
closure.failure(errorMsg, PeerId.emptyPeer());
closure.run(new Status(RaftError.EINTERNAL, errorMsg));
}
}
public <T> RaftResponse<T> redirect() {
final RaftResponse<T> response = new RaftResponse<T>();
response.setSuccess(false);
if (this.node != null) {
final PeerId leader = this.node.getLeaderId();
if (leader != null) {
response.setRedirect(leader);
}
}
return response;
}
public <T> void handlerNotLeaderError(final GenericClosure<T> closure) {
closure.failure("Not leader.", redirect().getRedirect());
closure.run(new Status(RaftError.EPERM, "Not leader"));
}
private Executor createReadIndexExecutor() {
return ThreadPoolUtil.newBuilder() //
.poolName("ReadIndexPool") //
.enableMetric(true) //
.coreThreads(4) //
.maximumThreads(4) //
.keepAliveSeconds(60L) //
.workQueue(new SynchronousQueue<>()) //
.threadFactory(new NamedThreadFactory("ReadIndexService", true)) //
.rejectedHandler(new ThreadPoolExecutor.AbortPolicy()) //
.build();
}
}
public static void main(String[] args) throws InterruptedException, RemotingException {
var rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new OperateRequest(), 5000);
log.info("{}", resp);
}
}

View File

@ -3,6 +3,7 @@ package com.yuandian.dataflow.statemachine.closure;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.state.State;
@ -21,15 +22,21 @@ import org.slf4j.LoggerFactory;
@ToString
public abstract class GenericClosure<T> implements Closure {
// 状态机的统一响应
private RaftResponse<T> response;
// 代表任务状态
private T value;
public GenericClosure() {
}
/**
* 错误的时候返回错误信息. 自动装配response
* @param errorMsg
* @param redirect
*/
public void failure(final String errorMsg, final PeerId redirect) {
final RaftResponse<T> response = new RaftResponse<T>();
response.setSuccess(false);
@ -38,6 +45,10 @@ public abstract class GenericClosure<T> implements Closure {
setResponse(response);
}
/**
* 成功时调用该方法. 自动装配response
* @param value
*/
public void success(final T value) {
final RaftResponse<T> response = new RaftResponse<T>();
response.setValue(value);

View File

@ -1,5 +0,0 @@
package com.yuandian.dataflow.statemachine.closure;
public class StateClosure {
}

View File

@ -5,14 +5,14 @@ import java.io.Serializable;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.Data;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
@ -22,11 +22,17 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
@Data
@var
public class Operate implements Serializable {
public static enum OperateType {
PUT, REMOVE;
/**
* 同步WorkerState状态.
*/
PUT_WORKERSTATE,
/**
* 暂无想法
*/
REMOVE;
}
private OperateType type;
@ -52,10 +58,10 @@ public class Operate implements Serializable {
* @param closure 回调函数. Operate为返回值
*/
public static void CallOperate(Operate op, GenericClosure<Operate> closure) {
var ss = StateServerFactory.getStateServer();
log.debug("CallOperate Value {}", op.<WorkerState>getValue());
var ss = StateFactory.getStateServer();
// 如果是leader 就直接提交
if (StateServerFactory.isLeader()) {
if (StateFactory.isLeader()) {
ss.applyOperate(op, closure);
return;
}
@ -64,7 +70,7 @@ public class Operate implements Serializable {
var request = new OperateProcessor.OperateRequest();
request.setOperate(op);
var leaderId = StateServerFactory.getLeaderId();
var leaderId = StateFactory.getLeaderId();
try {
ss.getRpcClient().invokeAsync(leaderId.getEndpoint(),
request, new InvokeCallback() {

View File

@ -16,8 +16,8 @@ import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.yuandian.dataflow.statemachine.StateFactory.StateServer;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
@ -29,7 +29,7 @@ import org.apache.commons.lang.StringUtils;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
@ -82,7 +82,7 @@ public class OperateProcessor implements RpcProcessor<OperateProcessor.OperateRe
}
};
StateServerFactory.getStateServer().applyOperate(request.getOperate(), closure);
StateFactory.getStateServer().applyOperate(request.getOperate(), closure);
}
@Override

View File

@ -16,7 +16,7 @@ import lombok.Setter;
import lombok.ToString;
/**
* description
* WorkerState 每个节点进行任务操作的状态, 用于同步状态机
*
* @author eson
*2022年7月15日-10:04:00
@ -41,7 +41,13 @@ public class WorkerState implements Serializable {
*/
public Instant updateAt;
public WorkerState(PeerId pid) {
this.peerId = pid;
/**
* 初始化 并构造 updateAt时间
* @param peer 传入当前服务的peer
*/
public WorkerState(PeerId peer) {
this.peerId = peer;
this.updateAt = Instant.now();
}
}

View File

@ -3,7 +3,7 @@
class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
%d{yyyyMMdd HH:mm:ss.SSS} %level %thread\(%file:%line\): %msg%n
%d{yyyy-MM-dd HH:mm:ss.SSS} %red(%level) %cyan(%thread\(%file:%line\)): %msg%n
</pattern>
</encoder>
<!-- <filter class="ch.qos.logback.classic.filter.LevelFilter">

View File

@ -32,7 +32,7 @@ import io.netty.handler.codec.dns.DatagramDnsQuery;
import lombok.Cleanup;
import lombok.Getter;
import lombok.Setter;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
@ -40,7 +40,7 @@ import lombok.extern.slf4j.Slf4j;
*/
@DisplayName("AppTest")
@Slf4j
@var
public class AppTest {