diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index c8a3162..ff00dc9 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -45,6 +45,7 @@ public class PacketsProcessor implements RpcProcessor> scans = new Reflections("com.yuandian.dataflow").getTypesAnnotatedWith(ProcessorRaft.class); scans.forEach((pRaftClass)->{ diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java index 6d9c165..0c21e5c 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java @@ -6,13 +6,16 @@ */ package com.yuandian.dataflow.statemachine.state; - +import java.time.Instant; + import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.handler.codec.http.HttpContentEncoder.Result; +import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.error.RemotingException; import com.google.protobuf.Any; import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass; import com.yuandian.dataflow.statemachine.StateServerFactory; +import com.yuandian.dataflow.statemachine.SyncClosure; import lombok.Getter; import lombok.Setter; @@ -43,47 +46,58 @@ public class StateFactory { if (alivePeers != null) { var ss = StateServerFactory.getStateServer(); StateServerFactory.getStateServer().useFsmStateAsync((state) -> { - synchronized(alivePeers){ - alivePeers.forEach((peer) -> { - var ws = state.getWorkers().get(peer); - if (ws != null) { - var cap = 100 - ws.getTaskQueueSize(); - if (cap > 0) { - log.debug("{}", cap); - var request = new PacketsRequest(); - for(int i = 0; i < cap ; i++ ) { - var p = Any.pack( - BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder() - .setTableId(10086) - .build() - ); + synchronized (alivePeers) { + alivePeers.forEach((peer) -> { + WorkerState ws = state.getWorkers().get(peer); + if (ws != null) { + var cap = 100 - ws.getTaskQueueSize(); + if (cap > 0) { + log.debug("{}", cap); + var request = new PacketsRequest(); + for (int i = 0; i < cap; i++) { + var p = Any.pack( + BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder() + .setTableId(10086) + .build()); request.getPackets().add(p); - } - - try { - var result = ss.getRpcClient().invokeSync(ws.getPeerId().getEndpoint(), request, 5000); - log.info("{}", result); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (RemotingException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + } + + try { + var result = ss.getRpcClient().invokeSync(ws.getPeerId().getEndpoint(), + request, 5000); + log.info("{}", result); + ws.setUpdateAt( Instant.now() ); + ws.setTaskQueueSize(ws.getTaskQueueSize() - cap); + + + + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (RemotingException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } } } - } - alivePeers.notifyAll(); - }); - } - + + ss.applyState(state, new SyncClosure() { + public void run(Status status) { + log.debug("{}", status); + }; + } ); + alivePeers.notifyAll(); + }); + } + return null; }); - synchronized(alivePeers){ + synchronized (alivePeers) { alivePeers.wait(5000); } - + } Thread.sleep(2000); diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 77df777..e67372b 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -20,11 +20,11 @@ - - - + + \ No newline at end of file