diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index 450d47f..84d6536 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -54,20 +54,17 @@ public class PacketsProcessor implements RpcProcessor{ var work = state.getWorkers().get( ss.getCluster().getServerId()); - - + + work.setTaskQueueSize( work.getTaskQueueSize() - request.packets.size()); work.setUpdateAt(Instant.now()); - + log.debug("workerState taskQueueSize: {} psize: {}", work.getTaskQueueSize(), request.packets.size()); if(!ss.isLeader()) { var requestUpdateState = new SyncWorkerStateProcessor.RequestWorkerState(); requestUpdateState.setWorkerState(work); @@ -80,7 +77,7 @@ public class PacketsProcessor implements RpcProcessor() { @Override public void run(Status status) { diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index 6f888d1..be327a4 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -177,8 +177,22 @@ public class StateServerFactory { } public void useFsmStateNotLock(Consumer dofunc) { + var state = ss.fsm.getState(); - dofunc.accept(state); + synchronized(state) { + dofunc.accept(state); + } + + // getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { + + // @Override + // public void run(Status status, long index, byte[] reqCtx) { + // var state = ss.fsm.getState(); + // dofunc.accept(state); + // } + + // } ); + } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java index 8c8ab68..c95931f 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java @@ -90,19 +90,14 @@ public class StateFactory { public void run(Status status) { log.info("任务队列更新成功 {}", this.getValue().getWorkers()); - pclist.forEach((peer) -> { - - - - - - - if (peer.getCap() <= 0) { + pclist.forEach((peercap) -> { + + if (peercap.getCap() <= 0) { return ; } var request = new PacketsRequest(); - for (int i = 0; i < peer.getCap(); i++) { + for (int i = 0; i < peercap.getCap(); i++) { var p = Any.pack( BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder() .setTableId(10086) @@ -112,8 +107,8 @@ public class StateFactory { try { - log.debug("rpc {}", peer); - ss.getRpcClient().invokeAsync(peer.peer.getEndpoint() , + log.debug("rpc {}", peercap); + ss.getRpcClient().invokeAsync(peercap.peer.getEndpoint() , request, new InvokeCallback() { @Override public void complete(Object result, Throwable err) {