From f2fdc44a440a36e947e39587422d9ce3209d8ac4 Mon Sep 17 00:00:00 2001 From: huangsimin <474420502@qq.com> Date: Wed, 3 Aug 2022 18:08:59 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BF=85=E8=A6=81=E6=97=B6=20=E5=9B=9E?= =?UTF-8?q?=E6=BB=9A.=20=E6=94=B9=E8=BF=9B=E5=90=8E=E6=AD=BB=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dataflow/controller/MasterProcessor.java | 134 ++++++------------ .../dataflow/controller/PacketsProcessor.java | 41 +++--- .../java/com/yuandian/dataflow/projo/Doc.java | 5 +- .../dataflow/statemachine/StateFactory.java | 14 +- .../dataflow/statemachine/StateMachine.java | 87 +++++++++--- .../statemachine/closure/GenericClosure.java | 3 + .../statemachine/operate/Operate.java | 15 +- 7 files changed, 168 insertions(+), 131 deletions(-) diff --git a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java index bcfabf1..98404c0 100644 --- a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java @@ -45,127 +45,85 @@ public class MasterProcessor implements MasterExecute { @Override public void loop(MasterContext cxt) { - // ArrayList packets = new ArrayList<>(); + // ArrayList packets = new ArrayList<>(); // 模拟发送包的数据到该节点上 - + for (int i = 0; i < ThreadLocalRandom.current().nextLong(50, 100); i++) { var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow .newBuilder() .setTableId(10086) .build()); - packetsManager.addPacket(p); + packetsManager.addPacket(p); // packets.add(p); } // 必须复制. raft有一直使用该list var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers()); - if(packetsManager.size() >= 100000) { + + if (packetsManager.size() >= 100000) { log.error("告警 数据流无法正常消耗: 缓冲packets:{} 直接放弃一部分数据", packetsManager.size()); packetsManager.discardPackets(50000); - log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers, packetsManager.size()); + log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers, + packetsManager.size()); cxt.sleep(5000); } else { - // log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers, packetsManager.size()); + // log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), + // alivePeers, packetsManager.size()); } - if (alivePeers == null) { - cxt.sleep(100); // 休眠100毫秒. + cxt.sleep(100); // 休眠100毫秒. return; } - - // 等待全部反馈后才能进入下次循环 - CountDownLatch latch = new CountDownLatch(alivePeers.size()); - // 读一致性 - - var state = StateFactory.getStateServer().getFsm().getState(); - // log.debug("masterExecute start {} {}", status, alivePeers); - // var state = this.getValue(); - if (state == null) { - log.error("readIndexState获取的状态为 {}", state); + PeerId[] peers = new PeerId[alivePeers.size()]; + alivePeers.toArray(peers); + // 等待全部反馈后才能进入下次循环 + + + Operate.CallOperate(new Operate(OperateType.ALLOCATE_PACKETS, alivePeers), new GenericClosure() { + + @Override + public void run(Status status) { + int[] allocTasks = this.getValue(); + + if(allocTasks == null) { + cxt.sleep(5000);; return; } - PeerId[] peers = new PeerId[alivePeers.size()]; - alivePeers.toArray(peers); - - // 统计 每个节点还有多少任务容量 - var canTasks = new int[alivePeers.size()]; - for(int i = 0; i < peers.length; i++) { - var peer = peers[i]; - WorkerState ws = state.getWorkers().get(peer); - if (ws == null) { - log.error("WorkerState获取的状态为 {}", ws); - return; - } - var can = MAX_TASKS - ws.getTaskQueueSize(); - canTasks[i] = can; - } + log.info("需要处理的任务数量[{}] :{}", StateFactory.getLeaderId(), allocTasks); + for (int i = 0; i < peers.length; i++) { - // 统计每个节点发送多少任务 - var allocTasks = Utils.allocationTasks(packetsManager.size(), canTasks); - for(int i = 0; i < peers.length; i++) { var peer = peers[i]; - - WorkerState ws = state.getWorkers().get(peer); - if (ws == null) { - log.error("WorkerState获取的状态为 {}", ws); - latch.countDown(); // 所有没调用提前返回都需要 countDown + if (allocTasks[i] <= 0) { continue; } - - if(allocTasks[i] <= 0) { - latch.countDown(); // 所有没调用提前返回都需要 countDown - continue; - } - - ws.setUpdateAt(Instant.now()); - ws.setTaskQueueSize(ws.getTaskQueueSize() + allocTasks[i]); - log.info("剩余能处理的任务数量[{}] :{}", peer, MAX_TASKS - ws.getTaskQueueSize()); - // ws.setTaskQueueSize(MAX_TASKS); - - var packets = packetsManager.popPackets(allocTasks[i]); + var packets = Operate.packetsManager.popPackets(allocTasks[i]); // 先提交 节点的 剩余能处理的任务数量. 然后再处理 var request = new PacketsRequest(); // 数据包切片 request.setPackets(packets); - Operate.CallOperate( - new Operate(OperateType.PUT_WORKERSTATE, ws), - new GenericClosure() { - @Override - public void run(Status status) { - // log.info("PacketsRequest run {}", status); - try { - StateFactory.rpcClientInvokeAsync(peer.getEndpoint(), request, - new InvokeCallback() { - @Override - public void complete(Object result, Throwable err) { - latch.countDown(); - if (err != null) { - // TODO: 如果错误, 需要让节点恢复任务处理的状态 - log.debug("{}", err); - } - // log.debug("PacketsRequest: {}", result); - } - }, DEFAULT_ASYNC_TIMEOUT); - } catch (InterruptedException | RemotingException e) { - log.info("error send packets {}", e.toString()); + + try { + StateFactory.rpcClientInvokeAsync(peer.getEndpoint(), request, + new InvokeCallback() { + @Override + public void complete(Object result, Throwable err) { + + if (err != null) { + // TODO: 如果错误, 需要让节点恢复任务处理的状态 + log.debug("{}", err); + } + // log.debug("PacketsRequest: {}", result); } - } - }); - + }, DEFAULT_ASYNC_TIMEOUT); + } catch (InterruptedException | RemotingException e) { + log.info("error send packets {}", e.toString()); + } } - - - - - - - try { - latch.await(DEFAULT_ASYNC_TIMEOUT, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - log.error("{}", e.toString()); - } + } + }); + } } diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index aed39b9..ed64835 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -71,26 +71,35 @@ public class PacketsProcessor implements RpcProcessor 里的 getValue为 State的状态 var state = StateFactory.getStateServer().getFsm().getState() ; // 获取返回的状态 - var ws = state.getWorkers().get(StateFactory.getServerId()); - ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量 - ws.setUpdateAt(Instant.now()); // 设置更新时间 + - Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), - new GenericClosure() { - @Override - public void run(Status status) { - // 处理完数据 更新工作状态的时间 - log.info("update workerstate {} ms", Duration.between(now, Instant.now()).toMillis()); - if (!status.isOk()) { - log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp); - - } - } - }); + + var ws = state.getWorkers().get(StateFactory.getServerId()); + ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量 + ws.setUpdateAt(Instant.now()); // 设置更新时间 + + + Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), + new GenericClosure() { + @Override + public void run(Status status) { + // 处理完数据 更新工作状态的时间 + log.info("update workerstate {} ms", Duration.between(now, Instant.now()).toMillis()); + if (!status.isOk()) { + log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp); + + } + } + }); + + + + + + } ; - } diff --git a/src/main/java/com/yuandian/dataflow/projo/Doc.java b/src/main/java/com/yuandian/dataflow/projo/Doc.java index 8680d21..f4d46b9 100644 --- a/src/main/java/com/yuandian/dataflow/projo/Doc.java +++ b/src/main/java/com/yuandian/dataflow/projo/Doc.java @@ -51,8 +51,9 @@ public final class Doc { CodecRegistry pojoCodecRegistry = fromRegistries(getDefaultCodecRegistry(),fromProviders(pojoCodecProvider)); MongoDatabase oriDatabase = mgo.getDatabase("ori-database").withCodecRegistry(pojoCodecRegistry); - MongoCollection db = oriDatabase.getCollection("network_performace_flow_2022072400", Doc.class); - log.debug("{}", db.countDocuments( new BsonDocument("serverResponseTime", new BsonDocument("$gt", new BsonInt64(2083478517) )) )); + MongoCollection db = oriDatabase.getCollection("business_alarm_20220803", Doc.class); + + log.debug("{}", db.find(new BsonDocument("serverResponseTime", new BsonDocument("$gt", new BsonInt64(45601335571100803L))))); } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java index 1e5b65d..1c2a8e6 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java @@ -251,13 +251,13 @@ public class StateFactory { // 提交同步 log.info("status not ok"); - readIndexExecutor.execute(()->{ - if(isLeader()) { - Operate.CallOperate(new Operate(OperateType.GET_STATE, null), closure); - } else { - handlerNotLeaderError(closure); - } - }); + // readIndexExecutor.execute(()->{ + // if(isLeader()) { + // Operate.CallOperate(new Operate(OperateType.GET_STATE, null), closure); + // } else { + // handlerNotLeaderError(closure); + // } + // }); } }); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 5185095..c85413d 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -1,6 +1,8 @@ package com.yuandian.dataflow.statemachine; import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import com.alipay.remoting.exception.CodecException; @@ -11,14 +13,17 @@ import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.conf.Configuration; import com.alipay.sofa.jraft.core.StateMachineAdapter; import com.alipay.sofa.jraft.entity.LeaderChangeContext; +import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.error.RaftException; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; +import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; import com.yuandian.dataflow.statemachine.closure.GenericClosure; import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.WorkerState; +import com.yuandian.dataflow.utils.Utils; import lombok.extern.slf4j.Slf4j; @@ -90,9 +95,62 @@ public class StateMachine extends StateMachineAdapter { switch (op.getType()) { case PUT_WORKERSTATE: - WorkerState ws = op.getValue(); + WorkerState opws = op.getValue(); + log.debug("PUT {}", opws.peerId); + state.getWorkers().put(opws.peerId, opws); + if (closure != null) { + closure.success(op); + closure.run(Status.OK()); + } + break; + case ALLOCATE_PACKETS: + + List alivePeers = op.getValue(); + PeerId[] peers = new PeerId[alivePeers.size()]; + alivePeers.toArray(peers); + + // 统计 每个节点还有多少任务容量 + var isNext = false; + var canTasks = new int[alivePeers.size()]; + for(int i = 0; i < peers.length; i++) { + var peer = peers[i]; + WorkerState ws = state.getWorkers().get(peer); + if (ws == null) { + log.error("WorkerState获取的状态为 {}", ws); + continue; + } + var can = Operate.MAX_TASKS - ws.getTaskQueueSize(); + canTasks[i] = can; + if(!isNext) { + isNext = true; + } + } + + if(!isNext) { + break; + } + + // 统计每个节点发送多少任务 + var allocTasks = Utils.allocationTasks(Operate.packetsManager.size(), canTasks); + if(closure != null) { + closure.setValue(allocTasks); + } + for(int i = 0; i < peers.length; i++) { + var peer = peers[i]; + if(allocTasks[i] <= 0) { + continue; + } + + WorkerState ws = state.getWorkers().get(peer); + ws.setUpdateAt(Instant.now()); + ws.setTaskQueueSize(ws.getTaskQueueSize() + allocTasks[i]); + log.info("剩余能处理的任务数量[{}] :{}", peer, Operate.MAX_TASKS - ws.getTaskQueueSize()); + } + + + // log.debug("PUT {}", ws.peerId); - state.getWorkers().put(ws.peerId, ws); + // ws.put(ws.peerId, ws); break; case GET_STATE: @@ -142,24 +200,21 @@ public class StateMachine extends StateMachineAdapter { MasterFactory.getMasterExecute().interrupt(); } - StateFactory.readIndexState(new GenericClosure() { + + + var ws = this.state.getWorkers().get(StateFactory.getServerId()); + if (ws == null) { + ws = new WorkerState(StateFactory.getServerId()); + } + + // 更新当前WorkerState + StateFactory.applyOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure () { @Override public void run(Status status) { - - var ws = this.getValue().getWorkers().get(StateFactory.getServerId()); - if (ws == null) { - ws = new WorkerState(StateFactory.getServerId()); - } - - // 更新当前WorkerState - StateFactory.applyOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure () { - @Override - public void run(Status status) { - log.debug("master update workerstate: {}", status); - } - }); + log.debug("master update workerstate: {}", status); } }); + // 当成为master时候 必须启动 MasterFactory.getMasterExecute().start(); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java index 4d870e5..f68b264 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java @@ -22,6 +22,9 @@ public abstract class GenericClosure implements Closure { private Object value; public T getValue() { + if(this.value == null) { + return null; + } return (T)this.value; } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java index 3feac22..64e18eb 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java @@ -10,6 +10,7 @@ import com.yuandian.dataflow.statemachine.closure.GenericClosure; import com.yuandian.dataflow.statemachine.rpc.OperateProcessor; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.state.WorkerState; +import com.yuandian.dataflow.utils.PacketsManager; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -24,12 +25,20 @@ import lombok.extern.slf4j.Slf4j; public class Operate implements Serializable { private static int DEFAULT_ASYNC_TIMEOUT = 5000; + public static final int MAX_TASKS = 1000; + public static PacketsManager packetsManager = new PacketsManager(); public static enum OperateType { /** * 同步WorkerState状态. */ PUT_WORKERSTATE, + + /** + * 分配packets + */ + ALLOCATE_PACKETS, + /** * 获取State状态 */ @@ -43,9 +52,11 @@ public class Operate implements Serializable { private OperateType type; private Object value; - public Operate(OperateType t, WorkerState ws) { + + + public Operate(OperateType t, Object v) { this.type = t; - this.value = ws; + this.value = v; } @java.lang.SuppressWarnings("unchecked")