diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index b3556a2..7b3723e 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -2,11 +2,13 @@ package com.yuandian.dataflow; +import org.slf4j.MarkerFactory; + import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.conf.Configuration; -import com.yuandian.dataflow.statemachine.StateServerFactory; +import com.yuandian.dataflow.statemachine.StateFactory; -import lombok.var; + import lombok.extern.slf4j.Slf4j; @@ -25,22 +27,19 @@ public class Server { public static void main(String[] args) throws Exception { - - String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; - String[] sprPeers = new String[]{"3440","3441","3442"}; - + var peeridx = Integer.parseInt(args[0]); var peeridstr = peers[ peeridx ]; - var sprPort = sprPeers[ peeridx ]; + // var peeridstr = peers[2]; // var sprPort = sprPeers[2]; log.info("{} {}", peeridstr, sprPort); conf = JRaftUtils.getConfiguration(String.join(",", peers)); - StateServerFactory.startStateServer(peeridstr, conf); + StateFactory.startStateServer(peeridstr, conf); diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index 6c1b3fb..69c6117 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -10,6 +10,9 @@ import java.io.Serializable; import java.time.Instant; import java.util.ArrayList; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; + import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.error.RemotingException; @@ -17,7 +20,7 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; import com.google.protobuf.Any; -import com.yuandian.dataflow.statemachine.StateServerFactory; +import com.yuandian.dataflow.statemachine.StateFactory; import com.yuandian.dataflow.statemachine.closure.GenericClosure; import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; @@ -29,7 +32,7 @@ import com.yuandian.dataflow.statemachine.state.State; import lombok.Getter; import lombok.Setter; -import lombok.var; + import lombok.extern.slf4j.Slf4j; /** @@ -56,28 +59,28 @@ public class PacketsProcessor implements RpcProcessor 里的 getValue为 State的状态 ss.readIndexState(new GenericClosure() { @Override public void run(Status status) { - log.debug("status {}", status); + if (status.isOk()) { - var state = this.getValue(); - var ws = state.getWorkers().get(StateServerFactory.getServerId()); - + var state = this.getValue(); // 获取返回的状态 + var ws = state.getWorkers().get(StateFactory.getServerId()); ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); - ws.setUpdateAt(Instant.now()); + ws.setUpdateAt(Instant.now()); // 设置更新时间 - log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(), - request.packets.size(), state.getWorkers().size()); - Operate.CallOperate(new Operate(OperateType.PUT, ws), new GenericClosure() { + // log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(), + // request.packets.size(), state.getWorkers().size()); + Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure() { @Override public void run(Status status) { if (status.isOk()) { - log.info("{}", resp); + log.info("[{}] {}", StateFactory.getServerId(), resp); } } }); diff --git a/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java b/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java index 8372ff6..103133b 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java @@ -7,13 +7,17 @@ import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; -import com.yuandian.dataflow.statemachine.StateServerFactory; +import com.yuandian.dataflow.statemachine.StateFactory; import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; + +/** + * 例子 强制转换leader + */ @Slf4j @ProcessorRaft public class TransferLeaderProcessor implements RpcProcessor { @@ -26,7 +30,7 @@ public class TransferLeaderProcessor implements RpcProcessor() { + + // 读一致性 + StateFactory.readIndexState(new GenericClosure() { @Override public void run(Status status) { var state = this.getValue(); - + // log.debug("masterExecute start {} {}", status, alivePeers); alivePeers.forEach((peer) -> { - WorkerState ws = state.getWorkers().get(peer); - if (ws != null) { - // - var canTasks = MAX_TASKS - ws.getTaskQueueSize(); - log.debug("cap :{} peer: {}", canTasks, peer); - if (canTasks <= 0) { - return; - } - ws.setUpdateAt(Instant.now()); - ws.setTaskQueueSize(MAX_TASKS); - - var request = new PacketsRequest(); - for (int i = 0; i < canTasks; i++) { - var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow - .newBuilder() - .setTableId(10086) - .build()); - request.getPackets().add(p); - } - - Operate.CallOperate(new Operate(OperateType.PUT, ws), new GenericClosure() { - @Override - public void run(Status status) { - log.info("PacketsRequest run {}", status); - try { - ss.getRpcClient().invokeAsync(peer.getEndpoint(), request, - new InvokeCallback() { - @Override - public void complete(Object result, Throwable err) { - if(err != null) { - log.debug("{}", err); - } - log.debug("PacketsRequest: {}", result); - } - }, 5000); - } catch (InterruptedException | RemotingException e) { - log.info("error send packets {}", e.toString()); - } - } - }); + + if (state == null) { + log.error("readIndexState获取的状态为 {}", state); + return; } + + WorkerState ws = state.getWorkers().get(peer); + if (ws == null) { + log.error("WorkerState获取的状态为 {}", ws); + return; + } + + var canTasks = MAX_TASKS - ws.getTaskQueueSize(); + log.info("剩余能处理的任务数量[{}] :{}", peer, canTasks); + if (canTasks <= 0) { + return; + } + ws.setUpdateAt(Instant.now()); + ws.setTaskQueueSize(MAX_TASKS); + + // 模拟发送包的数据到该节点上 + var request = new PacketsRequest(); + for (int i = 0; i < canTasks; i++) { + var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow + .newBuilder() + .setTableId(10086) + .build()); + request.getPackets().add(p); + } + + + // 先提交 节点的 剩余能处理的任务数量. 然后再处理 + Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), + new GenericClosure() { + @Override + public void run(Status status) { + log.info("PacketsRequest run {}", status); + try { + StateFactory.rpcClientInvokeAsync(peer.getEndpoint(), request, + new InvokeCallback() { + @Override + public void complete(Object result, Throwable err) { + if (err != null) { + // TODO: 如果错误, 需要让节点恢复任务处理的状态 + log.debug("{}", err); + } + log.debug("PacketsRequest: {}", result); + } + }, 5000); + } catch (InterruptedException | RemotingException e) { + log.info("error send packets {}", e.toString()); + } + } + }); + }); } - - } ); + + }); } Thread.sleep(5000); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java new file mode 100644 index 0000000..9399243 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java @@ -0,0 +1,272 @@ +/** + * description + * + * @author eson + *2022年7月12日-13:36:24 + */ +package com.yuandian.dataflow.statemachine; + +import java.io.File; +import java.lang.reflect.InvocationTargetException; +import java.nio.ByteBuffer; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.reflections.Reflections; + +import com.alipay.remoting.NamedThreadFactory; +import com.alipay.remoting.exception.CodecException; +import com.alipay.remoting.serialization.SerializerManager; +import com.alipay.sofa.jraft.JRaftUtils; +import com.alipay.sofa.jraft.Node; +import com.alipay.sofa.jraft.RaftGroupService; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.closure.ReadIndexClosure; +import com.alipay.sofa.jraft.conf.Configuration; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.entity.Task; +import com.alipay.sofa.jraft.error.RaftError; +import com.alipay.sofa.jraft.error.RemotingException; +import com.alipay.sofa.jraft.option.CliOptions; +import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.rpc.InvokeCallback; +import com.alipay.sofa.jraft.rpc.InvokeContext; +import com.alipay.sofa.jraft.rpc.RpcClient; +import com.alipay.sofa.jraft.rpc.RpcProcessor; +import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; +import com.alipay.sofa.jraft.util.BytesUtil; +import com.alipay.sofa.jraft.util.Endpoint; +import com.alipay.sofa.jraft.util.ThreadPoolUtil; +import com.yuandian.dataflow.statemachine.closure.GenericClosure; +import com.yuandian.dataflow.statemachine.operate.Operate; +import com.yuandian.dataflow.statemachine.rpc.RaftResponse; + +import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest; +import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; +import com.yuandian.dataflow.statemachine.state.State; +import com.yuandian.dataflow.statemachine.state.WorkerState; + +import lombok.Getter; +import lombok.Setter; + +import lombok.extern.slf4j.Slf4j; + +/** + * description + * + * @author eson + * 2022年7月12日-13:36:24 + */ +@Slf4j +public class StateFactory { + + private static StateServer ss; + + public static void startStateServer(String peerstr, Configuration conf) throws Exception { + if (ss != null) { + throw new Exception("重复初始化 InitStateServer"); + } + ss = new StateFactory.StateServer(peerstr, conf); + } + + public static boolean isLeader() { + return ss.node.isLeader(); + } + + public static PeerId getLeaderId() { + return ss.node.getLeaderId(); + } + + public static PeerId getServerId() { + return ss.cluster.getServerId(); + } + + public static Node getNode() { + return ss.node; + } + + public static Node getRaftNode() { + return ss.cluster.getRaftNode(); + } + + public static RpcClient getRpcClient() { + return ss.getRpcClient(); + } + + public static RaftGroupService getCluster() { + return ss.getCluster(); + } + + // 获取状态服务的对象 + public static StateServer getStateServer() { + return ss; + } + + public static void readIndexState(GenericClosure closure) { + ss.readIndexState(closure); + } + + public static void applyOperate(Operate op, GenericClosure closure) { + ss.applyOperate(op, closure); + } + + public static void rpcClientInvokeAsync(final Endpoint endpoint,final Object request,final InvokeCallback callback,final long timeoutMs) + throws InterruptedException, RemotingException { + ss.getRpcClient().invokeAsync(endpoint, request, callback, timeoutMs); + } + + public static Object rpcClientInvokeSync(final Endpoint endpoint, final Object request, final long timeoutMs) + throws InterruptedException, RemotingException { + return ss.getRpcClient().invokeSync(endpoint, request, timeoutMs); + } + + public static Object rpcClientInvokeSync(final Endpoint endpoint, final Object request, final InvokeContext ctx, + final long timeoutMs) throws InterruptedException, RemotingException { + return ss.getRpcClient().invokeSync(endpoint, request, ctx, timeoutMs); + } + + @Getter + @Setter + public static class StateServer { + + RpcClient rpcClient; + + private Node node; + private RaftGroupService cluster; + private StateMachine fsm; + + private String groupId = "dataflow"; + private Executor readIndexExecutor = createReadIndexExecutor(); + + public StateServer(String addr, Configuration conf) { + // String[] peers = new + // String[]{"localhost:4440","localhost:4441","localhost:4442"}; + // String[] sprPeers = new String[]{"3440","3441","3442"}; + + // var peeridstr = peers[Integer.parseInt(serverId)]; + // var sprPort = sprPeers[Integer.parseInt(args[0])]; + + // String groupId = "jraft"; + + // conf = + // JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442"); + + PeerId serverId = JRaftUtils.getPeerId(addr); + int port = serverId.getPort(); + + NodeOptions nodeOptions = new NodeOptions(); + + nodeOptions.setElectionTimeoutMs(1000); + nodeOptions.setSnapshotLogIndexMargin(3600); + nodeOptions.setInitialConf(conf); + + File RaftDataFile = new File(String.format("./raftdata/%d", port)); + log.info("mkdirs: {}", RaftDataFile.mkdirs()); + + nodeOptions.setLogUri(String.format("./raftdata/%d/logs", port)); + nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port)); + nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port)); + fsm = new StateMachine(); // 状态实例初始化 + nodeOptions.setFsm(fsm); + + cluster = new RaftGroupService(groupId, serverId, nodeOptions); + + Set> scans = new Reflections("com.yuandian.dataflow").getTypesAnnotatedWith(ProcessorRaft.class); + scans.forEach((pRaftClass) -> { + try { + cluster.getRpcServer() + .registerProcessor((RpcProcessor) pRaftClass.getDeclaredConstructor().newInstance()); + } catch (InstantiationException | IllegalAccessException | IllegalArgumentException + | InvocationTargetException | NoSuchMethodException | SecurityException e) { + log.info("{}", e.toString()); + } + }); + node = cluster.start(); + + rpcClient = new BoltRaftRpcFactory().createRpcClient(); + rpcClient.init(new CliOptions()); + } + + public boolean isLeader() { + return this.fsm.isLeader(); + } + + public void readIndexState(GenericClosure closure) { + getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { + @Override + public void run(Status status, long index, byte[] reqCtx) { + log.debug("readIndexState({}) {}", getServerId(), status); + if (status.isOk()) { + // 回调失败 + closure.success(ss.fsm.getState()); + closure.setValue(ss.fsm.getState()); + } + closure.run(status); + } + }); + } + + public void applyOperate(Operate op, GenericClosure closure) { + // 所有的提交都必须再leader进行 + if (!ss.isLeader()) { + ss.handlerNotLeaderError(closure); + return; + } + + try { + closure.setValue(op); + final Task task = new Task(); + task.setData( + ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op))); + task.setDone(closure); // 确认所有数据 一致, 不需要加锁 + StateFactory.getStateServer().getNode().apply(task); + } catch (CodecException e) { + String errorMsg = "Fail to encode TaskState"; + log.debug(errorMsg, e); + closure.failure(errorMsg, PeerId.emptyPeer()); + closure.run(new Status(RaftError.EINTERNAL, errorMsg)); + } + } + + public RaftResponse redirect() { + final RaftResponse response = new RaftResponse(); + response.setSuccess(false); + if (this.node != null) { + final PeerId leader = this.node.getLeaderId(); + if (leader != null) { + response.setRedirect(leader); + } + } + return response; + } + + public void handlerNotLeaderError(final GenericClosure closure) { + closure.failure("Not leader.", redirect().getRedirect()); + closure.run(new Status(RaftError.EPERM, "Not leader")); + } + + private Executor createReadIndexExecutor() { + return ThreadPoolUtil.newBuilder() // + .poolName("ReadIndexPool") // + .enableMetric(true) // + .coreThreads(4) // + .maximumThreads(4) // + .keepAliveSeconds(60L) // + .workQueue(new SynchronousQueue<>()) // + .threadFactory(new NamedThreadFactory("ReadIndexService", true)) // + .rejectedHandler(new ThreadPoolExecutor.AbortPolicy()) // + .build(); + } + } + + public static void main(String[] args) throws InterruptedException, RemotingException { + var rpcClient = new BoltRaftRpcFactory().createRpcClient(); + rpcClient.init(new CliOptions()); + var resp = rpcClient.invokeSync(new Endpoint("localhost", 4441), new OperateRequest(), 5000); + log.info("{}", resp); + } +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 0270fba..2409578 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -28,7 +28,7 @@ import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest; import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.WorkerState; -import lombok.var; + import lombok.extern.slf4j.Slf4j; /** @@ -72,13 +72,13 @@ public class StateMachine extends StateMachineAdapter { Operate op = null; GenericClosure closure = null; if (iter.done() != null) { - // This task is applied by this node, get value from closure to avoid additional - // parsing. - closure = (GenericClosure) iter.done(); // 只支持单一个State. 全状态机只支持一种提交 + + // leader可以直接从 回调closure里提取operate + closure = (GenericClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交 op = closure.getValue(); } else { - // Have to parse FetchAddRequest from this user log. + // 非leader 需要从getData反序列化出来后处理 final ByteBuffer data = iter.getData(); try { @@ -91,29 +91,34 @@ public class StateMachine extends StateMachineAdapter { } - if (op != null) { - switch (op.getType()) { - case PUT: - WorkerState ws = op.getValue(); - state.getWorkers().put(ws.peerId, ws); - if (closure != null) { - closure.success(op); - closure.run(Status.OK()); - } - break; - case REMOVE: - if (closure != null) { - closure.success(op); - closure.run(Status.OK()); - } - break; - default: - break; + if (op == null) { + log.error("op 为 {}. 存在错误, 可能版本不一致", op); + continue; + } - } - } else { + switch (op.getType()) { + + case PUT_WORKERSTATE: + + WorkerState ws = op.getValue(); + log.debug("PUT {}", ws.peerId); + state.getWorkers().put(ws.peerId, ws); + if (closure != null) { + closure.success(op); + closure.run(Status.OK()); + } + break; + case REMOVE: + if (closure != null) { + closure.success(op); + closure.run(Status.OK()); + } + break; + default: + break; } + iter.next(); } @@ -136,7 +141,7 @@ public class StateMachine extends StateMachineAdapter { @Override public void onLeaderStart(final long term) { - log.debug("onLeaderStart {}", StateServerFactory.getServerId()); + log.debug("onLeaderStart {}", StateFactory.getServerId()); this.leaderTerm.set(term); // 判断是否Master线程还在跑, 如果存在则中断 @@ -144,26 +149,23 @@ public class StateMachine extends StateMachineAdapter { MasterFactory.getMasterExecute().interrupt(); } - var ss = StateServerFactory.getStateServer(); - ss.readIndexState(new GenericClosure() { - + + StateFactory.readIndexState(new GenericClosure() { @Override public void run(Status status) { - var ws = state.getWorkers().get(StateServerFactory.getServerId()); + var ws = state.getWorkers().get(StateFactory.getServerId()); if (ws == null) { - ws = new WorkerState(StateServerFactory.getServerId()); + ws = new WorkerState(StateFactory.getServerId()); } - Operate op = new Operate(OperateType.PUT, ws); - ss.applyOperate(op, new GenericClosure() { + StateFactory.applyOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure() { @Override public void run(Status status) { log.debug("master update workerstate: {}", status); } }); } - }); // 当成为master时候 必须启动 @@ -174,7 +176,7 @@ public class StateMachine extends StateMachineAdapter { @Override public void onLeaderStop(final Status status) { - log.debug("onLeaderStop {}", StateServerFactory.getCluster().getServerId()); + log.debug("onLeaderStop {}", StateFactory.getCluster().getServerId()); this.leaderTerm.set(-1); // 判断是否Master线程还在跑, 如果存在则中断 if (MasterFactory.getMasterExecute().isAlive()) { @@ -192,7 +194,7 @@ public class StateMachine extends StateMachineAdapter { @Override public void onStartFollowing(LeaderChangeContext ctx) { - log.debug("[onStartFollowing] {} {}", ctx, StateServerFactory.getCluster().getServerId()); + log.debug("[onStartFollowing] {} {}", ctx, StateFactory.getCluster().getServerId()); try { // 判断是否Master线程还在跑, 如果存在则中断 @@ -201,14 +203,13 @@ public class StateMachine extends StateMachineAdapter { } - var ws = new WorkerState(StateServerFactory.getServerId()); - log.debug("my: {} leader id {}", StateServerFactory.getServerId(), StateServerFactory.getLeaderId()); + var ws = new WorkerState(StateFactory.getServerId()); + log.debug("my: {} leader id {}", StateFactory.getServerId(), StateFactory.getLeaderId()); - var op = new Operate(OperateType.PUT, ws); - Operate.CallOperate(op, new GenericClosure() { + Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure() { @Override public void run(Status status) { - log.info("{} {}", status, this.getResponse()); + log.info("onStartFollowing CallOperate {} {}", status, this.getResponse()); } }); @@ -227,14 +228,14 @@ public class StateMachine extends StateMachineAdapter { @Override public void onStopFollowing(LeaderChangeContext ctx) { - log.debug("{} {}", ctx, StateServerFactory.getCluster().getServerId()); + log.debug("{} {}", ctx, StateFactory.getCluster().getServerId()); - var ss = StateServerFactory.getStateServer(); - var ws = new WorkerState(StateServerFactory.getServerId()); + var ss = StateFactory.getStateServer(); + var ws = new WorkerState(StateFactory.getServerId()); - log.debug("my: {} leader id {}", StateServerFactory.getServerId(), StateServerFactory.getLeaderId()); + log.debug("my: {} leader id {}", StateFactory.getServerId(), StateFactory.getLeaderId()); - var op = new Operate(OperateType.PUT, ws); + var op = new Operate(OperateType.PUT_WORKERSTATE, ws); Operate.CallOperate(op, new GenericClosure() { @Override public void run(Status status) { diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java deleted file mode 100644 index caccbe2..0000000 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ /dev/null @@ -1,259 +0,0 @@ -/** - * description - * - * @author eson - *2022年7月12日-13:36:24 - */ -package com.yuandian.dataflow.statemachine; - -import java.io.File; -import java.lang.reflect.InvocationTargetException; -import java.nio.ByteBuffer; -import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.function.Consumer; -import java.util.function.Function; - -import org.reflections.Reflections; - -import com.alipay.remoting.NamedThreadFactory; -import com.alipay.remoting.exception.CodecException; -import com.alipay.remoting.serialization.SerializerManager; -import com.alipay.sofa.jraft.JRaftUtils; -import com.alipay.sofa.jraft.Node; -import com.alipay.sofa.jraft.RaftGroupService; -import com.alipay.sofa.jraft.Status; -import com.alipay.sofa.jraft.closure.ReadIndexClosure; -import com.alipay.sofa.jraft.conf.Configuration; -import com.alipay.sofa.jraft.entity.PeerId; -import com.alipay.sofa.jraft.entity.Task; -import com.alipay.sofa.jraft.error.RaftError; -import com.alipay.sofa.jraft.error.RemotingException; -import com.alipay.sofa.jraft.option.CliOptions; -import com.alipay.sofa.jraft.option.NodeOptions; -import com.alipay.sofa.jraft.rpc.InvokeCallback; -import com.alipay.sofa.jraft.rpc.RpcClient; -import com.alipay.sofa.jraft.rpc.RpcProcessor; -import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; -import com.alipay.sofa.jraft.util.BytesUtil; -import com.alipay.sofa.jraft.util.Endpoint; -import com.alipay.sofa.jraft.util.ThreadPoolUtil; -import com.yuandian.dataflow.statemachine.closure.GenericClosure; -import com.yuandian.dataflow.statemachine.operate.Operate; -import com.yuandian.dataflow.statemachine.rpc.RaftResponse; - -import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest; -import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; -import com.yuandian.dataflow.statemachine.state.State; -import com.yuandian.dataflow.statemachine.state.WorkerState; - -import lombok.Getter; -import lombok.Setter; -import lombok.var; -import lombok.extern.slf4j.Slf4j; - -/** - * description - * - * @author eson - *2022年7月12日-13:36:24 - */ -@Slf4j -@var -public class StateServerFactory { - - private static StateServer ss; - - public static void startStateServer(String peerstr, Configuration conf) throws Exception { - if(ss != null) { - throw new Exception("重复初始化 InitStateServer"); - } - ss = new StateServerFactory.StateServer(peerstr, conf); - } - - - public static boolean isLeader() { - return ss.node.isLeader() ; - } - - - public static PeerId getLeaderId() { - return ss.node.getLeaderId() ; - } - - public static PeerId getServerId() { - return ss.cluster.getServerId(); - } - - public static Node getNode() { - return ss.node ; - } - - public static Node getRaftNode() { - return ss.cluster.getRaftNode() ; - } - - public static RpcClient getRpcClient() { - return ss.getRpcClient(); - } - - public static RaftGroupService getCluster() { - return ss.getCluster(); - } - - // 获取状态服务的对象 - public static StateServer getStateServer() { - return ss; - } - - @Getter - @Setter - public static class StateServer { - - RpcClient rpcClient; - - private Node node; - private RaftGroupService cluster; - private StateMachine fsm; - - private String groupId = "dataflow"; - private Executor readIndexExecutor = createReadIndexExecutor(); - - public StateServer(String addr, Configuration conf) { - // String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; - // String[] sprPeers = new String[]{"3440","3441","3442"}; - - // var peeridstr = peers[Integer.parseInt(serverId)]; - // var sprPort = sprPeers[Integer.parseInt(args[0])]; - - // String groupId = "jraft"; - - // conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442"); - - PeerId serverId = JRaftUtils.getPeerId(addr); - int port = serverId.getPort(); - - NodeOptions nodeOptions = new NodeOptions(); - - nodeOptions.setElectionTimeoutMs(1000); - nodeOptions.setSnapshotLogIndexMargin(3600); - nodeOptions.setInitialConf(conf); - - File RaftDataFile = new File(String.format("./raftdata/%d", port) ); - log.info("mkdirs: {}",RaftDataFile.mkdirs()); - - nodeOptions.setLogUri( String.format("./raftdata/%d/logs", port) ); - nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port)); - nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port)); - fsm = new StateMachine(); // 状态实例初始化 - nodeOptions.setFsm(fsm); - - cluster = new RaftGroupService(groupId, serverId, nodeOptions); - - Set> scans = new Reflections("com.yuandian.dataflow").getTypesAnnotatedWith(ProcessorRaft.class); - scans.forEach((pRaftClass)->{ - try { - cluster.getRpcServer().registerProcessor((RpcProcessor) pRaftClass.getDeclaredConstructor().newInstance()); - } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException | SecurityException e) { - log.info("{}", e.toString()); - } - }); - node = cluster.start(); - - rpcClient = new BoltRaftRpcFactory().createRpcClient(); - rpcClient.init(new CliOptions()); - } - - - - public boolean isLeader() { - return this.fsm.isLeader(); - } - - - - public void useFsmState(Consumer dofunc) { - var state = ss.fsm.getState(); - synchronized(state) { - dofunc.accept(state); - } - } - - public void readIndexState(GenericClosure closure) { - getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { - @Override - public void run(Status status, long index, byte[] reqCtx) { - if(status.isOk()) { - // 回调失败 - closure.success(ss.fsm.getState()); - } - closure.run(status); - } - } ); - } - - - - public void applyOperate(Operate op, GenericClosure closure) { - // 所有的提交都必须再leader进行 - if (!ss.isLeader()) { - ss.handlerNotLeaderError(closure); - return; - } - - try { - closure.setValue(op); - final Task task = new Task(); - task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op))); - task.setDone(closure); // 确认所有数据 一致, 不需要加锁 - StateServerFactory.getStateServer().getNode().apply(task); - } catch (CodecException e) { - String errorMsg = "Fail to encode TaskState"; - log.debug(errorMsg, e); - closure.failure(errorMsg, PeerId.emptyPeer()); - closure.run(new Status(RaftError.EINTERNAL, errorMsg)); - } - } - - public RaftResponse redirect() { - final RaftResponse response = new RaftResponse(); - response.setSuccess(false); - if (this.node != null) { - final PeerId leader = this.node.getLeaderId(); - if (leader != null) { - response.setRedirect(leader); - } - } - return response; - } - - public void handlerNotLeaderError(final GenericClosure closure) { - closure.failure("Not leader.", redirect().getRedirect()); - closure.run(new Status(RaftError.EPERM, "Not leader")); - } - - private Executor createReadIndexExecutor() { - return ThreadPoolUtil.newBuilder() // - .poolName("ReadIndexPool") // - .enableMetric(true) // - .coreThreads(4) // - .maximumThreads(4) // - .keepAliveSeconds(60L) // - .workQueue(new SynchronousQueue<>()) // - .threadFactory(new NamedThreadFactory("ReadIndexService", true)) // - .rejectedHandler(new ThreadPoolExecutor.AbortPolicy()) // - .build(); - } -} - - - - public static void main(String[] args) throws InterruptedException, RemotingException { - var rpcClient = new BoltRaftRpcFactory().createRpcClient(); - rpcClient.init(new CliOptions()); - var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new OperateRequest(), 5000); - log.info("{}", resp); - } -} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java index 002bed1..db6787a 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java @@ -3,6 +3,7 @@ package com.yuandian.dataflow.statemachine.closure; import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.entity.PeerId; +import com.yuandian.dataflow.statemachine.StateFactory; import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.state.State; @@ -21,15 +22,21 @@ import org.slf4j.LoggerFactory; @ToString public abstract class GenericClosure implements Closure { + // 状态机的统一响应 private RaftResponse response; // 代表任务状态 private T value; public GenericClosure() { - + } + /** + * 错误的时候返回错误信息. 自动装配response + * @param errorMsg + * @param redirect + */ public void failure(final String errorMsg, final PeerId redirect) { final RaftResponse response = new RaftResponse(); response.setSuccess(false); @@ -38,6 +45,10 @@ public abstract class GenericClosure implements Closure { setResponse(response); } + /** + * 成功时调用该方法. 自动装配response + * @param value + */ public void success(final T value) { final RaftResponse response = new RaftResponse(); response.setValue(value); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/closure/StateClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/closure/StateClosure.java deleted file mode 100644 index e49a805..0000000 --- a/src/main/java/com/yuandian/dataflow/statemachine/closure/StateClosure.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.yuandian.dataflow.statemachine.closure; - -public class StateClosure { - -} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java index 3b822af..d385dd2 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java @@ -5,14 +5,14 @@ import java.io.Serializable; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.rpc.InvokeCallback; -import com.yuandian.dataflow.statemachine.StateServerFactory; +import com.yuandian.dataflow.statemachine.StateFactory; import com.yuandian.dataflow.statemachine.closure.GenericClosure; import com.yuandian.dataflow.statemachine.rpc.OperateProcessor; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.state.WorkerState; import lombok.Data; -import lombok.var; + import lombok.extern.slf4j.Slf4j; /** @@ -22,11 +22,17 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j @Data -@var public class Operate implements Serializable { public static enum OperateType { - PUT, REMOVE; + /** + * 同步WorkerState状态. + */ + PUT_WORKERSTATE, + /** + * 暂无想法 + */ + REMOVE; } private OperateType type; @@ -52,10 +58,10 @@ public class Operate implements Serializable { * @param closure 回调函数. Operate为返回值 */ public static void CallOperate(Operate op, GenericClosure closure) { - - var ss = StateServerFactory.getStateServer(); + log.debug("CallOperate Value {}", op.getValue()); + var ss = StateFactory.getStateServer(); // 如果是leader 就直接提交 - if (StateServerFactory.isLeader()) { + if (StateFactory.isLeader()) { ss.applyOperate(op, closure); return; } @@ -64,7 +70,7 @@ public class Operate implements Serializable { var request = new OperateProcessor.OperateRequest(); request.setOperate(op); - var leaderId = StateServerFactory.getLeaderId(); + var leaderId = StateFactory.getLeaderId(); try { ss.getRpcClient().invokeAsync(leaderId.getEndpoint(), request, new InvokeCallback() { diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java index 86aae22..642a111 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java @@ -16,8 +16,8 @@ import com.alipay.sofa.jraft.entity.Task; import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; -import com.yuandian.dataflow.statemachine.StateServerFactory; -import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer; +import com.yuandian.dataflow.statemachine.StateFactory; +import com.yuandian.dataflow.statemachine.StateFactory.StateServer; import com.yuandian.dataflow.statemachine.closure.GenericClosure; import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; @@ -29,7 +29,7 @@ import org.apache.commons.lang.StringUtils; import lombok.Getter; import lombok.Setter; import lombok.ToString; -import lombok.var; + import lombok.extern.slf4j.Slf4j; /** @@ -82,7 +82,7 @@ public class OperateProcessor implements RpcProcessor - %d{yyyyMMdd HH:mm:ss.SSS} %level %thread\(%file:%line\): %msg%n + %d{yyyy-MM-dd HH:mm:ss.SSS} %red(%level) %cyan(%thread\(%file:%line\)): %msg%n