diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index df20041..450d47f 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -97,7 +97,6 @@ public class PacketsProcessor implements RpcProcessor closure) { + log.debug("applyWorkerState"); if (!ss.isLeader()) { ss.handlerNotLeaderError(closure); return; @@ -213,24 +214,21 @@ public class StateServerFactory { StateServerFactory.getStateServer().useFsmStateNotLock((fsmState)->{ var wmap = fsmState.getWorkers(); var wstate = wmap.get(state.getPeerId()); - if(wstate == null) { wmap.put(state.getPeerId(), state); - try { - final Task task = new Task(); - closure.setValue(fsmState); - task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(fsmState))); - task.setDone(closure); - StateServerFactory.getStateServer().getNode().apply(task); // 提交数据 - } catch (CodecException e) { - String errorMsg = "Fail to encode TaskState"; - log.debug("{}:{}",errorMsg, e); - closure.failure(errorMsg, PeerId.emptyPeer()); - closure.run(new Status(RaftError.EINTERNAL, errorMsg)); - } - } else { - closure.success(fsmState); - closure.run(Status.OK()); + } + + try { + final Task task = new Task(); + closure.setValue(fsmState); + task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(fsmState))); + task.setDone(closure); + StateServerFactory.getStateServer().getNode().apply(task); // 提交数据 + } catch (CodecException e) { + String errorMsg = "Fail to encode TaskState"; + log.debug("{}:{}",errorMsg, e); + closure.failure(errorMsg, PeerId.emptyPeer()); + closure.run(new Status(RaftError.EINTERNAL, errorMsg)); } }); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncWorkerStateProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncWorkerStateProcessor.java index 252f473..a7212c6 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncWorkerStateProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncWorkerStateProcessor.java @@ -56,7 +56,7 @@ public class SyncWorkerStateProcessor implements RpcProcessor closure = new SyncClosure< State>() { @Override public void run(Status status) { diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java index 30b8808..8c8ab68 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java @@ -7,11 +7,13 @@ package com.yuandian.dataflow.statemachine.state; import java.time.Instant; +import java.util.ArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.handler.codec.http.HttpContentEncoder.Result; import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.google.protobuf.Any; @@ -38,6 +40,18 @@ import lombok.extern.slf4j.Slf4j; @ToString public class StateFactory { + @Getter + @Setter + @ToString + public static class PeerIdCap { + private PeerId peer; + private long cap; + public PeerIdCap(PeerId pid, long cap) { + this.peer = pid; + this.cap = cap; + } + } + public static Thread masterExecute = new Thread(new Runnable() { @Override public void run() { @@ -50,46 +64,24 @@ public class StateFactory { var ss = StateServerFactory.getStateServer(); // var state = ss.getFsm().getState(); + + ArrayList pclist = new ArrayList(); + ss.useFsmStateNotLock((state) -> { alivePeers.forEach((peer) -> { WorkerState ws = state.getWorkers().get(peer); - if (ws != null) { var cap = 100 - ws.getTaskQueueSize(); log.debug("cap :{} peer: {}", cap, peer); if (cap <= 0) { return ; } - - var request = new PacketsRequest(); - for (int i = 0; i < cap; i++) { - var p = Any.pack( - BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder() - .setTableId(10086) - .build()); - request.getPackets().add(p); - } - - - try { - log.debug("rpc {}", peer); - ss.getRpcClient().invokeAsync(peer.getEndpoint(), - request, new InvokeCallback() { - @Override - public void complete(Object result, Throwable err) { - if (err != null) { - err.printStackTrace(); - } else { - log.info("{} peer result", result); - } - } - - }, 5000); - } catch (InterruptedException | RemotingException e) { - e.printStackTrace(); - } ws.setUpdateAt(Instant.now()); ws.setTaskQueueSize(100); + var pc = new PeerIdCap(peer, cap); + pc.setCap(cap); + ; + pclist.add(pc); } }); @@ -97,6 +89,48 @@ public class StateFactory { @Override public void run(Status status) { log.info("任务队列更新成功 {}", this.getValue().getWorkers()); + + pclist.forEach((peer) -> { + + + + + + + if (peer.getCap() <= 0) { + return ; + } + + var request = new PacketsRequest(); + for (int i = 0; i < peer.getCap(); i++) { + var p = Any.pack( + BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder() + .setTableId(10086) + .build()); + request.getPackets().add(p); + } + + + try { + log.debug("rpc {}", peer); + ss.getRpcClient().invokeAsync(peer.peer.getEndpoint() , + request, new InvokeCallback() { + @Override + public void complete(Object result, Throwable err) { + if (err != null) { + err.printStackTrace(); + } else { + log.info("{} peer result", result); + } + } + + }, 5000); + } catch (InterruptedException | RemotingException e) { + e.printStackTrace(); + } + + + }); } }); @@ -113,7 +147,7 @@ public class StateFactory { } - Thread.sleep(2000); + Thread.sleep(5000); } } catch (InterruptedException e) { e.printStackTrace();