From cbf03088410ff9a94a5511b644c1e5fe05c6895b Mon Sep 17 00:00:00 2001 From: huangsimin <474420502@qq.com> Date: Mon, 1 Aug 2022 18:09:31 +0800 Subject: [PATCH] =?UTF-8?q?TODO:=20=E8=A7=A3=E5=86=B3=E8=AF=BB=E4=B8=80?= =?UTF-8?q?=E8=87=B4=E6=80=A7=E6=AD=BB=E9=94=81=E7=9A=84=E9=97=AE=E9=A2=98?= =?UTF-8?q?.=20=E5=BC=82=E6=AD=A5=E5=BE=88=E5=AE=B9=E6=98=93=E5=AF=BC?= =?UTF-8?q?=E8=87=B4=E6=AD=BB=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/launch.json | 1 - .vscode/tasks.json | 15 +--- readme.md | 49 +++++++++++++ .../dataflow/controller/MasterProcessor.java | 39 +++++++--- .../dataflow/controller/PacketsProcessor.java | 46 +++++------- .../dataflow/statemachine/MasterFactory.java | 2 +- .../dataflow/statemachine/StateFactory.java | 71 +++++++++++-------- .../dataflow/statemachine/StateMachine.java | 62 ++++++++-------- .../statemachine/closure/GenericClosure.java | 17 +++-- .../statemachine/master/MasterContext.java | 21 +++++- .../statemachine/operate/Operate.java | 11 +-- .../statemachine/rpc/OperateProcessor.java | 13 +--- .../statemachine/rpc/RaftResponse.java | 4 +- .../com/yuandian/dataflow/utils/Utils.java | 15 ++-- start.sh | 6 +- 15 files changed, 219 insertions(+), 153 deletions(-) create mode 100644 readme.md diff --git a/.vscode/launch.json b/.vscode/launch.json index 097b04c..c9a246d 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -14,7 +14,6 @@ "args": [ "0" ], - "preLaunchTask": "rename", "presentation": { "reveal": "always", "plane": "new", diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 97231cc..4299177 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -28,21 +28,10 @@ "panel": "new", "showReuseMessage": true, "clear": false, - "close": false + "close": true }, - }, - { - "label": "rename", - "type": "shell", - "command": "ehco ${workbench.action.terminal.rename}", - "args": [ - "123" - ] - - }, - - + } ], diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..ea0a8ba --- /dev/null +++ b/readme.md @@ -0,0 +1,49 @@ +# 使用 + +## 状态机的使用 + +* 实现一个Master循环 +```java +@MasterRegister +public class MasterProcessor implements MasterExecute { + + /** + * 主循环入口 + */ + @Override + public void loop(MasterContext cxt) { + //TODO: + } +} +``` + +* 实现多个Worker RPC接口 +```java +@WorkerRegister +public class PacketsProcessor implements RpcProcessor { + + /** + * 请求参数类 + */ + @Setter + @Getter + public static class PacketsRequest implements Serializable { + private ArrayList packets = new ArrayList<>(); // 传参 + private int Code; // 传参 + } + + + @Override + public void handleRequest(RpcContext rpcCtx, PacketsRequest request) { + // TODO: 处理请求 + } + + /** + * 返回请求的类名 + */ + @Override + public String interest() { + return PacketsRequest.class.getName(); + } +} +``` diff --git a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java index f865a58..91a3d38 100644 --- a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java @@ -2,6 +2,9 @@ package com.yuandian.dataflow.controller; import java.time.Instant; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.error.RemotingException; @@ -29,27 +32,29 @@ public class MasterProcessor implements MasterExecute { @Override public void loop(MasterContext cxt) { - try { - Thread.sleep(3000); - } catch (InterruptedException e1) { - e1.printStackTrace(); - return; - } + + // try { + // Thread.sleep(1000); + // } catch (InterruptedException e) { + // // TODO Auto-generated catch block + // e.printStackTrace(); + // } var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers()); - log.debug("master({}) execute {}", StateFactory.getServerId(), - StateFactory.getRaftNode().listAlivePeers()); + log.debug("master({}) execute {}", StateFactory.getServerId(), alivePeers); if (alivePeers == null) { return; } + CountDownLatch latch = new CountDownLatch(alivePeers.size()); // 读一致性 - StateFactory.readIndexState(new GenericClosure() { + StateFactory.readIndexState(new GenericClosure() { @Override public void run(Status status) { - var state = this.getValue(); + var state = this.getValue(); // log.debug("masterExecute start {} {}", status, alivePeers); + alivePeers.forEach((peer) -> { if (state == null) { @@ -82,9 +87,10 @@ public class MasterProcessor implements MasterExecute { } // 先提交 节点的 剩余能处理的任务数量. 然后再处理 + Operate.CallOperate( new Operate(OperateType.PUT_WORKERSTATE, ws), - new GenericClosure() { + new GenericClosure() { @Override public void run(Status status) { log.info("PacketsRequest run {}", status); @@ -93,6 +99,8 @@ public class MasterProcessor implements MasterExecute { new InvokeCallback() { @Override public void complete(Object result, Throwable err) { + latch.countDown(); + log.debug("countDown {}", latch.getCount()); if (err != null) { // TODO: 如果错误, 需要让节点恢复任务处理的状态 log.debug("{}", err); @@ -105,10 +113,19 @@ public class MasterProcessor implements MasterExecute { } } }); + }); + + } }); + + try { + latch.await(5000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.error("{}", e.toString()); + } } } diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index 9ac1163..a00d2bc 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -50,7 +50,7 @@ public class PacketsProcessor implements RpcProcessor(); + var resp = new RaftResponse(); resp.setSuccess(true); rpcCtx.sendResponse(resp); // 因为jraft是异步. 收到数据直接返回 @@ -59,44 +59,30 @@ public class PacketsProcessor implements RpcProcessor 里的 getValue为 State的状态 - StateFactory.readIndexState(new GenericClosure() { + var state = StateFactory.getStateServer().getFsm().getState() ; // 获取返回的状态 + var ws = state.getWorkers().get(StateFactory.getServerId()); + ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量 + ws.setUpdateAt(Instant.now()); // 设置更新时间 - @Override - public void run(Status status) { - - if (!status.isOk()) { - log.error("失败 readIndexState {}", status); - } - - // readIndexState 失败后也需要直接 更新自己状态 - - var state = this.getValue(); // 获取返回的状态 - var ws = state.getWorkers().get(StateFactory.getServerId()); - ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量 - ws.setUpdateAt(Instant.now()); // 设置更新时间 - Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), - new GenericClosure() { - @Override - public void run(Status status) { - if (!status.isOk()) { - log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp); - } - } - }); - - } - }); - } - ; + Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), + new GenericClosure() { + @Override + public void run(Status status) { + if (!status.isOk()) { + log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp); + } + } + }); + } ; } @Override diff --git a/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java index 8e4263a..4eb6ba3 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java @@ -57,7 +57,7 @@ public class MasterFactory { public void run() { MasterContext cxt = new MasterContext(); - while (!cxt.getIsExit().get()) { + while (!cxt.getIsExit()) { masterExecuteCls.loop(cxt); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java index fc6a750..15d8c86 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java @@ -18,6 +18,8 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import org.reflections.ReflectionUtils; @@ -33,6 +35,7 @@ import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.closure.ReadIndexClosure; import com.alipay.sofa.jraft.conf.Configuration; import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.entity.ReadIndexStatus; import com.alipay.sofa.jraft.entity.Task; import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.error.RemotingException; @@ -51,6 +54,7 @@ import com.yuandian.dataflow.statemachine.annotations.WorkerRegister; import com.yuandian.dataflow.statemachine.closure.GenericClosure; import com.yuandian.dataflow.statemachine.master.MasterExecute; import com.yuandian.dataflow.statemachine.operate.Operate; +import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.state.State; @@ -70,6 +74,7 @@ import lombok.extern.slf4j.Slf4j; public class StateFactory { private static StateServer ss; + private static ReentrantLock lockReadIndex = new ReentrantLock(); public static void startStateServer(String peerstr, Configuration conf) throws Exception { if (ss != null) { @@ -111,11 +116,11 @@ public class StateFactory { return ss; } - public static void readIndexState(GenericClosure closure) { + public static void readIndexState(GenericClosure closure) { ss.readIndexState(closure); } - public static void applyOperate(Operate op, GenericClosure closure) { + public static void applyOperate(Operate op, GenericClosure closure) { ss.applyOperate(op, closure); } @@ -161,8 +166,6 @@ public class StateFactory { // conf = // JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442"); - - PeerId serverId = JRaftUtils.getPeerId(addr); int port = serverId.getPort(); @@ -189,16 +192,15 @@ public class StateFactory { var traces = Thread.currentThread().getStackTrace(); var clsName = traces[traces.length - 1].getClassName(); var packName = clsName.substring(0, Utils.indexOfAsPossible(clsName, ".", 3)); - log.info("获取 {} -> {} 下包的所有注解",clsName, packName ); + log.info("获取 {} -> {} 下包的所有注解", clsName, packName); var refl = new Reflections(packName); Set> scans = refl.getTypesAnnotatedWith(WorkerRegister.class); - scans.forEach((pRaftClass) -> { scansMap.put(pRaftClass.getName(), pRaftClass); }); - log.info("扫描注解的时间(scan annotations cost time): {} ms", Duration.between(now, Instant.now()).toMillis()) ; + log.info("扫描注解的时间(scan annotations cost time): {} ms", Duration.between(now, Instant.now()).toMillis()); scansMap.forEach((name, pRaftClass) -> { try { cluster.getRpcServer() @@ -209,22 +211,23 @@ public class StateFactory { } }); - refl.getTypesAnnotatedWith(MasterRegister.class).forEach((pClass)->{ + refl.getTypesAnnotatedWith(MasterRegister.class).forEach((pClass) -> { try { - MasterExecute execute = (MasterExecute)pClass.getDeclaredConstructor().newInstance(); + MasterExecute execute = (MasterExecute) pClass.getDeclaredConstructor().newInstance(); MasterFactory.registerMasterLoop(execute); - + } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException | SecurityException e) { - log.info("{}", e.toString()); + log.info("{}", e.toString()); } - });; - + }); + ; + // 启动集群 node = cluster.start(); - rpcClient = new BoltRaftRpcFactory().createRpcClient(); // 创建rpc客户端. + rpcClient = new BoltRaftRpcFactory().createRpcClient(); // 创建rpc客户端. rpcClient.init(new CliOptions()); // 初始化 } @@ -232,27 +235,35 @@ public class StateFactory { return this.fsm.isLeader(); } - public void readIndexState(GenericClosure closure) { - getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { + public void readIndexState(GenericClosure closure) { + + getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure(2000) { @Override public void run(Status status, long index, byte[] reqCtx) { log.debug("readIndexState({}) {}", getServerId(), status); if (status.isOk()) { - closure.success(ss.fsm.getState()); closure.setValue(ss.fsm.getState()); - } else { - - // 提交同步 - } - + closure.run(status); + return ; + } // 回调失败 - closure.run(status); + // 提交同步 + + log.info("status not ok"); + readIndexExecutor.execute(()->{ + if(isLeader()) { + Operate.CallOperate(new Operate(OperateType.GET_STATE, null), closure); + } else { + handlerNotLeaderError(closure); + } + }); } }); + } - public void applyOperate(Operate op, GenericClosure closure) { + public void applyOperate(Operate op, GenericClosure closure) { // 所有的提交都必须再leader进行 if (!ss.isLeader()) { ss.handlerNotLeaderError(closure); @@ -265,17 +276,17 @@ public class StateFactory { task.setData( ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op))); task.setDone(closure); // 确认所有数据 一致, 不需要加锁 - StateFactory.getStateServer().getNode().apply(task); + StateFactory.getNode().apply(task); } catch (CodecException e) { - String errorMsg = "Fail to encode TaskState"; - log.debug(errorMsg, e); + String errorMsg = "Fail to encode WorkerState"; + log.debug("{} {}", errorMsg, e); closure.failure(errorMsg, PeerId.emptyPeer()); closure.run(new Status(RaftError.EINTERNAL, errorMsg)); } } - public RaftResponse redirect() { - final RaftResponse response = new RaftResponse(); + public RaftResponse redirect() { + final RaftResponse response = new RaftResponse (); response.setSuccess(false); if (this.node != null) { final PeerId leader = this.node.getLeaderId(); @@ -286,7 +297,7 @@ public class StateFactory { return response; } - public void handlerNotLeaderError(final GenericClosure closure) { + public void handlerNotLeaderError(final GenericClosure closure) { closure.failure("Not leader.", redirect().getRedirect()); closure.run(new Status(RaftError.EPERM, "Not leader")); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 9220fa9..0a551af 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -61,12 +61,12 @@ public class StateMachine extends StateMachineAdapter { while (iter.hasNext()) { Operate op = null; - GenericClosure closure = null; + GenericClosure closure = null; if (iter.done() != null) { // leader可以直接从 回调closure里提取operate - closure = (GenericClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交 - op = closure.getValue(); + closure = (GenericClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交 + op = (Operate)closure.getValue(); } else { // 非leader 需要从getData反序列化出来后处理 @@ -93,21 +93,24 @@ public class StateMachine extends StateMachineAdapter { WorkerState ws = op.getValue(); log.debug("PUT {}", ws.peerId); state.getWorkers().put(ws.peerId, ws); - if (closure != null) { - closure.success(op); - closure.run(Status.OK()); - } + + break; + case GET_STATE: + closure.setValue(this.state); + log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex()); break; case REMOVE: - if (closure != null) { - closure.success(op); - closure.run(Status.OK()); - } + break; default: break; } + + if (closure != null) { + closure.success(op); + closure.run(Status.OK()); + } iter.next(); @@ -131,7 +134,7 @@ public class StateMachine extends StateMachineAdapter { @Override public void onLeaderStart(final long term) { - log.debug("onLeaderStart {}", StateFactory.getServerId()); + log.debug("onLeaderStart[{}]", StateFactory.getServerId()); this.leaderTerm.set(term); // 判断是否Master线程还在跑, 如果存在则中断 @@ -139,17 +142,17 @@ public class StateMachine extends StateMachineAdapter { MasterFactory.getMasterExecute().interrupt(); } - - StateFactory.readIndexState(new GenericClosure() { + StateFactory.readIndexState(new GenericClosure() { @Override public void run(Status status) { - - var ws = state.getWorkers().get(StateFactory.getServerId()); + + var ws = this.getValue().getWorkers().get(StateFactory.getServerId()); if (ws == null) { ws = new WorkerState(StateFactory.getServerId()); } - StateFactory.applyOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure() { + // 更新当前WorkerState + StateFactory.applyOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure () { @Override public void run(Status status) { log.debug("master update workerstate: {}", status); @@ -160,31 +163,29 @@ public class StateMachine extends StateMachineAdapter { // 当成为master时候 必须启动 MasterFactory.getMasterExecute().start(); - super.onLeaderStart(term); } @Override public void onLeaderStop(final Status status) { - log.debug("onLeaderStop {}", StateFactory.getCluster().getServerId()); + log.debug("onLeaderStop[{}]", StateFactory.getServerId()); this.leaderTerm.set(-1); // 判断是否Master线程还在跑, 如果存在则中断 if (MasterFactory.getMasterExecute().isAlive()) { MasterFactory.getMasterExecute().interrupt(); } - super.onLeaderStop(status); } @Override public void onShutdown() { - log.debug("onShutdown"); + log.info("onShutdown[{}]",StateFactory.getServerId()); super.onShutdown(); } @Override public void onStartFollowing(LeaderChangeContext ctx) { - log.debug("[onStartFollowing] {} {}", ctx, StateFactory.getCluster().getServerId()); + log.debug("onStartFollowing[{}]] {}", StateFactory.getServerId(),ctx); try { // 判断是否Master线程还在跑, 如果存在则中断 @@ -192,14 +193,14 @@ public class StateMachine extends StateMachineAdapter { MasterFactory.getMasterExecute().interrupt(); } - - var ws = new WorkerState(StateFactory.getServerId()); - log.debug("my: {} leader id {}", StateFactory.getServerId(), StateFactory.getLeaderId()); + // 在startFollowing不能使用 readIndexState - Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure() { + + // 更新当前WorkerState + Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateFactory.getServerId())), new GenericClosure() { @Override public void run(Status status) { - log.info("onStartFollowing CallOperate {} {}", status, this.getResponse()); + log.debug("onStartFollowing update workerstate: {}", status); } }); @@ -219,14 +220,11 @@ public class StateMachine extends StateMachineAdapter { @Override public void onStopFollowing(LeaderChangeContext ctx) { - log.debug("{} {}", ctx, StateFactory.getCluster().getServerId()); + log.debug("onStopFollowing[{}] {}", StateFactory.getServerId(), ctx); var ws = new WorkerState(StateFactory.getServerId()); - - log.debug("my: {} leader id {}", StateFactory.getServerId(), StateFactory.getLeaderId()); - var op = new Operate(OperateType.PUT_WORKERSTATE, ws); - Operate.CallOperate(op, new GenericClosure() { + Operate.CallOperate(op, new GenericClosure() { @Override public void run(Status status) { log.info("{} {}", status, this.getResponse()); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java index 0f8a775..4d870e5 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java @@ -13,13 +13,17 @@ import lombok.extern.slf4j.Slf4j; @Getter @Setter @ToString -public abstract class GenericClosure implements Closure { +public abstract class GenericClosure implements Closure { // 状态机的统一响应 - private RaftResponse response; + private RaftResponse response; // 代表任务状态 - private T value; + private Object value; + + public T getValue() { + return (T)this.value; + } public GenericClosure() { @@ -31,9 +35,10 @@ public abstract class GenericClosure implements Closure { * @param redirect */ public void failure(final String errorMsg, final PeerId redirect) { - final RaftResponse response = new RaftResponse(); + final RaftResponse response = new RaftResponse(); response.setSuccess(false); response.setMsg(errorMsg); + log.error("{}", errorMsg); response.setRedirect(redirect); setResponse(response); } @@ -42,8 +47,8 @@ public abstract class GenericClosure implements Closure { * 成功时调用该方法. 自动装配response * @param value */ - public void success(final T value) { - final RaftResponse response = new RaftResponse(); + public void success(final Object value) { + final RaftResponse response = new RaftResponse (); response.setValue(value); response.setSuccess(true); setResponse(response); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java index f510a6e..5acc959 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java @@ -6,10 +6,25 @@ import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -@Slf4j -@Getter -@Setter + public class MasterContext { private AtomicBoolean isExit = new AtomicBoolean(false); + private Object share; + + public Boolean getIsExit() { + return isExit.get(); + } + + public void setIsExit(Boolean isExit) { + this.isExit.set(isExit); + } + + public Object getShare() { + return share; + } + + public void setShare(Object share) { + this.share = share; + } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java index 51311ef..44064dd 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java @@ -28,6 +28,10 @@ public class Operate implements Serializable { * 同步WorkerState状态. */ PUT_WORKERSTATE, + /** + * 获取State状态 + */ + GET_STATE, /** * 暂无想法 */ @@ -58,7 +62,7 @@ public class Operate implements Serializable { * @param closure 回调函数. Operate为返回值 */ @java.lang.SuppressWarnings("unchecked") - public static void CallOperate(Operate op, GenericClosure closure) { + public static void CallOperate(Operate op, GenericClosure closure) { log.debug("CallOperate Value {}", op.getValue()); // 如果是leader 就直接提交 if (StateFactory.isLeader()) { @@ -70,13 +74,12 @@ public class Operate implements Serializable { var request = new OperateProcessor.OperateRequest(); request.setOperate(op); - var leaderId = StateFactory.getLeaderId(); try { - StateFactory.rpcClientInvokeAsync(leaderId.getEndpoint(), request, new InvokeCallback() { + StateFactory.rpcClientInvokeAsync(StateFactory.getLeaderId().getEndpoint(), request, new InvokeCallback() { @Override public void complete(Object result, Throwable err) { log.debug("Object result {}", result); - var resp = (RaftResponse) result; + var resp = (RaftResponse) result; closure.setResponse(resp); closure.success(resp.getValue()); closure.run(Status.OK()); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java index f440658..4c6daf0 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java @@ -56,7 +56,7 @@ public class OperateProcessor implements RpcProcessor closure = new GenericClosure() { + final GenericClosure closure = new GenericClosure () { @Override public void run(Status status) { @@ -81,15 +81,4 @@ public class OperateProcessor implements RpcProcessor implements Serializable { +public class RaftResponse implements Serializable { private static final long serialVersionUID = 1L; - private T value; + private Object value; private boolean success; /** diff --git a/src/main/java/com/yuandian/dataflow/utils/Utils.java b/src/main/java/com/yuandian/dataflow/utils/Utils.java index e3c1a3b..690a638 100644 --- a/src/main/java/com/yuandian/dataflow/utils/Utils.java +++ b/src/main/java/com/yuandian/dataflow/utils/Utils.java @@ -3,6 +3,13 @@ package com.yuandian.dataflow.utils; public class Utils { + /** + * 找到第n个substr索引点的字符串索引 + * @param str 原始字符串 + * @param substr 需要定位的字符串 + * @param n 第几个 + * @return + */ public static int indexOf(String str, String substr, int n) { int pos = str.indexOf(substr); while (--n > 0 && pos != -1) { @@ -13,10 +20,10 @@ public class Utils { /** - * 尽可能的拿到 n个substr索引点的字符串. 如果超出就拿最大值. + * 尽可能的找到n个substr索引点的字符串索引. 如果超出就拿最大值. * @param str 原始字符串 * @param substr 需要定位的字符串 - * @param n + * @param n 第几个 * @return */ public static int indexOfAsPossible(String str, String substr, int n) { @@ -30,7 +37,5 @@ public class Utils { return pos; } - public static void main(String[] args) { - - } + } diff --git a/start.sh b/start.sh index 0ffea46..c9f1425 100755 --- a/start.sh +++ b/start.sh @@ -1,17 +1,17 @@ #! /bin/bash screen -S raft-0 -X quit screen -S raft-1 -X quit -# screen -S raft-2 -X quit +screen -S raft-2 -X quit sleep 1s VERSION=1.0.0-SNAPSHOT screen -dmS raft-0 -L java -jar target/dataflow-${VERSION}.jar 0 screen -dmS raft-1 -L java -jar target/dataflow-${VERSION}.jar 1 -# screen -dmS raft-2 -L java -jar target/dataflow-${VERSION}.jar 2 +screen -dmS raft-2 -L java -jar target/dataflow-${VERSION}.jar 2 sleep 0.5s screen -S raft-0 -X logfile flush 0 screen -S raft-1 -X logfile flush 0 -# screen -S raft-2 -X logfile flush 0 \ No newline at end of file +screen -S raft-2 -X logfile flush 0 \ No newline at end of file