diff --git a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java
index fb01aab..642c803 100644
--- a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java
+++ b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java
@@ -25,6 +25,7 @@ import com.yuandian.dataflow.statemachine.operate.Operate;
 import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
 import com.yuandian.dataflow.statemachine.state.State;
 import com.yuandian.dataflow.statemachine.state.WorkerState;
+import com.yuandian.dataflow.utils.PacketsManager;
 import com.yuandian.dataflow.utils.Utils;
 
 import lombok.extern.slf4j.Slf4j;
@@ -36,40 +37,43 @@ public class MasterProcessor implements MasterExecute {
     /**
      * Maximum number of tasks per worker
     */
-    private final int MAX_TASKS = 100;
+    private final int MAX_TASKS = 1000;
 
     private final int DEFAULT_ASYNC_TIMEOUT = 5000;
 
-    public static Random rand = new Random();
+    public static PacketsManager packetsManager = new PacketsManager();
 
     @Override
     public void loop(MasterContext cxt) {
-        ArrayList<Any> packets = new ArrayList<>();
+        // ArrayList<Any> packets = new ArrayList<>();
 
         // Simulate packet data arriving at this node
-        for (int i = 0; i < ThreadLocalRandom.current().nextLong(1000, 5000); i++) {
+        for (int i = 0; i < ThreadLocalRandom.current().nextLong(50, 100); i++) {
             var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
                     .newBuilder()
                     .setTableId(10086)
                     .build());
-            packets.add(p);
+            packetsManager.addPacket(p);
+            // packets.add(p);
         }
 
         // Must copy: raft keeps using this list internally
         var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers());
-        log.debug("master({}) execute {}", StateFactory.getServerId(), alivePeers);
+        if (packetsManager.size() >= 100000) {
+            log.error("Alert: data flow is not being consumed in time; buffered packets: {}, discarding part of the data", packetsManager.size());
+            packetsManager.discardPackets(50000);
+            log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers, packetsManager.size());
+            cxt.sleep(5000);
+        } else {
+            log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers, packetsManager.size());
+        }
+
+
         if (alivePeers == null) {
             cxt.sleep(100); // sleep for 100 ms
             return;
         }
-
-
-
-
-
-
-
-        var peerSize = alivePeers.size();
+        // Wait for every response before starting the next loop iteration
         CountDownLatch latch = new CountDownLatch(alivePeers.size());
 
         // Read consistency
@@ -86,8 +90,8 @@ public class MasterProcessor implements MasterExecute {
         PeerId[] peers = new PeerId[alivePeers.size()];
         alivePeers.toArray(peers);
 
+        // Work out how much task capacity each node still has
         var canTasks = new int[alivePeers.size()];
-
         for (int i = 0; i < peers.length; i++) {
             var peer = peers[i];
             WorkerState ws = state.getWorkers().get(peer);
@@ -95,42 +99,34 @@ public class MasterProcessor implements MasterExecute {
                 log.error("Fetched WorkerState is {}", ws);
                 return;
             }
-
             var can = MAX_TASKS - ws.getTaskQueueSize();
             canTasks[i] = can;
-
         }
-
-        var allocTasks = Utils.allocationTasks(packets.size(), canTasks);
-
-
+        // Work out how many tasks to send to each node
+        var allocTasks = Utils.allocationTasks(packetsManager.size(), canTasks);
 
         for (int i = 0; i < peers.length; i++) {
             var peer = peers[i];
-
             WorkerState ws = state.getWorkers().get(peer);
             if (ws == null) {
                 log.error("Fetched WorkerState is {}", ws);
-                return;
+                latch.countDown(); // every branch that skips the call must still countDown
+                continue;
             }
-            // TODO: handle the refined task count - allocTasks;
-            log.info("Remaining task capacity [{}]: {}", peer, canTasks);
+            if (allocTasks[i] <= 0) {
+                latch.countDown(); // every branch that skips the call must still countDown
+                continue;
+            }
+
             ws.setUpdateAt(Instant.now());
-            ws.setTaskQueueSize(MAX_TASKS);
-
-            // Simulate packet data being sent to this node
-            // var request = new PacketsRequest();
-            // for (int i = 0; i < canTasks; i++) {
-            //     var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
-            //             .newBuilder()
-            //             .setTableId(10086)
-            //             .build());
-            //     request.getPackets().add(p);
-            // }
+            ws.setTaskQueueSize(ws.getTaskQueueSize() + allocTasks[i]);
+            log.info("Remaining task capacity [{}]: {}", peer, MAX_TASKS - ws.getTaskQueueSize());
+            // ws.setTaskQueueSize(MAX_TASKS);
+            var packets = packetsManager.popPackets(allocTasks[i]);
 
             // Commit the node's remaining task capacity first, then process
             var request = new PacketsRequest(); // packet slice
             request.setPackets(packets);
@@ -146,6 +142,7 @@ public class MasterProcessor implements MasterExecute {
                 @Override
                 public void complete(Object result, Throwable err) {
                     latch.countDown();
+                    log.debug("countDown {}", latch.getCount());
                     if (err != null) {
                         // TODO: on error, the node's task-processing state needs to be restored
                         log.debug("{}", err);
diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
index e6eab59..db3041e 100644
--- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
+++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
@@ -7,6 +7,7 @@
 import java.io.Serializable;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Random;
@@ -49,17 +50,20 @@ public class PacketsProcessor implements RpcProcessor<PacketsRequest> {
diff --git a/src/main/java/com/yuandian/dataflow/utils/PacketsManager.java b/src/main/java/com/yuandian/dataflow/utils/PacketsManager.java
new file mode 100644
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/utils/PacketsManager.java
+    private LinkedBlockingQueue<Any> temp = new LinkedBlockingQueue<>();
+    private LinkedBlockingQueue<Any> packets = new LinkedBlockingQueue<>();
+
+
+    /**
+     * Add a packet to the queue (double-buffered via the temp queue)
+     * @param packet
+     */
+    public void addPacket(Any packet) {
+        try {
+            lock.lock();
+            this.temp.add(packet);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    /**
+     * Add packets to the queue (double-buffered via the temp queue)
+     * @param _packets
+     */
+    public void addPackets(Collection<Any> _packets) {
+        try {
+            lock.lock();
+            this.temp.addAll(_packets);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Return the total number of buffered packets
+     * @return
+     */
+    public int size() {
+        return this.temp.size() + this.packets.size();
+    }
+
+    private void lockTemp(Consumer<LinkedBlockingQueue<Any>> dofunc) {
+        try {
+            lock.lock();
+            this.packets.addAll(this.temp); // merge the buffered data into the active queue
+            this.temp.clear();
+            dofunc.accept(this.packets);
+        } catch (Exception e) {
+            log.error("lockTemp error", e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Pop up to num packets
+     * @param num requested number of packets
+     * @return at most num packets
+     */
+    public ArrayList<Any> popPackets(int num) {
+        var result = new ArrayList<Any>();
+        this.lockTemp((_packets) -> {
+            for (var i = 0; i < num; i++) {
+                var p = _packets.poll();
+                if (p == null) {
+                    return;
+                }
+                result.add(p);
+            }
+        });
+        return result;
+    }
+
+    /**
+     * Discard buffered packets until only remainSize packets remain
+     * @param remainSize number of packets to keep
+     */
+    public void discardPackets(int remainSize) {
+        this.lockTemp((_packets) -> {
+            var delsize = _packets.size() - remainSize;
+            if (delsize <= 0) {
+                return;
+            }
+            // Drop delsize packets, spread across the queue where possible
+            var step = Math.max(_packets.size() / delsize, 1);
+            var iter = _packets.iterator();
+            var removed = 0;
+            var i = 0;
+            while (iter.hasNext() && removed < delsize) {
+                iter.next();
+                if (i % step == 0) {
+                    iter.remove();
+                    removed++;
+                }
+                i++;
+            }
+        });
+    }
+}
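
Usage note (not part of the patch): the sketch below illustrates how the new PacketsManager above is intended to be driven. Producers buffer packets through addPacket(), consumers drain them with popPackets(), and discardPackets() applies the same back-pressure rule used in MasterProcessor.loop() when the buffer grows too large. The demo class name, the Int64Value payload and the 200/50 thresholds are placeholders chosen to keep the example self-contained; only the PacketsManager API comes from the patch.

import com.google.protobuf.Any;
import com.google.protobuf.Int64Value;
import com.yuandian.dataflow.utils.PacketsManager;

public class PacketsManagerDemo {
    public static void main(String[] args) {
        var manager = new PacketsManager();

        // Producer side: packets go into the temp queue under the lock.
        for (int i = 0; i < 250; i++) {
            manager.addPacket(Any.pack(Int64Value.of(10086)));
        }

        // Consumer side: popPackets() first merges temp into the active queue,
        // then returns at most the requested number of packets.
        var batch = manager.popPackets(100);
        System.out.println("popped=" + batch.size() + " buffered=" + manager.size());

        // Back-pressure: once the buffer crosses a threshold, evenly discard
        // packets until only remainSize are left (MasterProcessor uses
        // 100000 / 50000 for these two numbers).
        if (manager.size() >= 200) {
            manager.discardPackets(50);
        }
        System.out.println("after discard buffered=" + manager.size());
    }
}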
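
The patch calls Utils.allocationTasks(packetsManager.size(), canTasks) to decide how many packets each worker receives, but that method's body is not part of this diff. The following is a hypothetical, capacity-capped round-robin allocation that is merely consistent with the call site above; the real Utils.allocationTasks may distribute tasks differently.

public final class AllocationSketch {

    // Hypothetical stand-in for Utils.allocationTasks(int total, int[] canTasks):
    // spread `total` pending packets over the workers without exceeding any
    // worker's remaining capacity. Only the signature is taken from the diff.
    public static int[] allocationTasks(int total, int[] canTasks) {
        var alloc = new int[canTasks.length];
        var remaining = total;
        var progress = true;
        // Hand out one task per worker per round until packets run out
        // or every worker is at capacity.
        while (remaining > 0 && progress) {
            progress = false;
            for (int i = 0; i < canTasks.length && remaining > 0; i++) {
                if (alloc[i] < canTasks[i]) {
                    alloc[i]++;
                    remaining--;
                    progress = true;
                }
            }
        }
        return alloc;
    }

    public static void main(String[] args) {
        // Example: 230 buffered packets, three workers with spare capacity 100, 50 and 0.
        // Prints [100, 50, 0]; the 80 packets that do not fit stay buffered
        // in PacketsManager for the next loop iteration.
        System.out.println(java.util.Arrays.toString(allocationTasks(230, new int[] {100, 50, 0})));
    }
}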