From f54b3257d778b91c1e9851bb75901112ff9ba26c Mon Sep 17 00:00:00 2001 From: huangsimin <474420502@qq.com> Date: Thu, 4 Aug 2022 10:35:10 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=8D=A2apache=E7=9A=84raft?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dataflow/controller/MasterProcessor.java | 24 +++++++---- .../dataflow/controller/PacketsProcessor.java | 43 +++++++------------ .../java/com/yuandian/dataflow/projo/Doc.java | 2 +- .../dataflow/statemachine/StateMachine.java | 14 +----- 4 files changed, 34 insertions(+), 49 deletions(-) diff --git a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java index 98404c0..2479d83 100644 --- a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java @@ -40,7 +40,7 @@ public class MasterProcessor implements MasterExecute { private final int MAX_TASKS = 1000; private final int DEFAULT_ASYNC_TIMEOUT = 5000; - public static PacketsManager packetsManager = new PacketsManager(); + @Override public void loop(MasterContext cxt) { @@ -53,22 +53,22 @@ public class MasterProcessor implements MasterExecute { .newBuilder() .setTableId(10086) .build()); - packetsManager.addPacket(p); + Operate.packetsManager.addPacket(p); // packets.add(p); } // 必须复制. raft有一直使用该list var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers()); - if (packetsManager.size() >= 100000) { - log.error("告警 数据流无法正常消耗: 缓冲packets:{} 直接放弃一部分数据", packetsManager.size()); - packetsManager.discardPackets(50000); + if ( Operate.packetsManager.size() >= 100000) { + log.error("告警 数据流无法正常消耗: 缓冲packets:{} 直接放弃一部分数据", Operate.packetsManager.size()); + Operate.packetsManager.discardPackets(50000); log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers, - packetsManager.size()); + Operate.packetsManager.size()); cxt.sleep(5000); } else { // log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), - // alivePeers, packetsManager.size()); + // alivePeers, Operate.packetsManager.size()); } if (alivePeers == null) { @@ -91,8 +91,14 @@ public class MasterProcessor implements MasterExecute { cxt.sleep(5000);; return; } - - log.info("需要处理的任务数量[{}] :{}", StateFactory.getLeaderId(), allocTasks); + + var all = 0; + for(var i : allocTasks) { + all += i; + } + if(all != 0) { + log.info("需要处理的任务数量[{}] :{}", StateFactory.getLeaderId(), allocTasks); + } for (int i = 0; i < peers.length; i++) { var peer = peers[i]; diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index ed64835..24d9b9c 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -71,36 +71,25 @@ public class PacketsProcessor implements RpcProcessor 里的 getValue为 State的状态 var state = StateFactory.getStateServer().getFsm().getState() ; // 获取返回的状态 - - - - var ws = state.getWorkers().get(StateFactory.getServerId()); - ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量 - ws.setUpdateAt(Instant.now()); // 设置更新时间 - - - Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), - new GenericClosure() { - @Override - public void run(Status status) { - // 处理完数据 更新工作状态的时间 - log.info("update workerstate {} ms", Duration.between(now, Instant.now()).toMillis()); - if (!status.isOk()) { - log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp); - - } - } - }); - - - - - - + var ws = state.getWorkers().get(StateFactory.getServerId()); + ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量 + ws.setUpdateAt(Instant.now()); // 设置更新时间 + Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), + new GenericClosure() { + @Override + public void run(Status status) { + // 处理完数据 更新工作状态的时间 + log.info("update workerstate {} ms", Duration.between(now, Instant.now()).toMillis()); + if (!status.isOk()) { + log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp); + + } + } + }); + } ; - } @Override diff --git a/src/main/java/com/yuandian/dataflow/projo/Doc.java b/src/main/java/com/yuandian/dataflow/projo/Doc.java index f4d46b9..4c26fef 100644 --- a/src/main/java/com/yuandian/dataflow/projo/Doc.java +++ b/src/main/java/com/yuandian/dataflow/projo/Doc.java @@ -51,7 +51,7 @@ public final class Doc { CodecRegistry pojoCodecRegistry = fromRegistries(getDefaultCodecRegistry(),fromProviders(pojoCodecProvider)); MongoDatabase oriDatabase = mgo.getDatabase("ori-database").withCodecRegistry(pojoCodecRegistry); - MongoCollection db = oriDatabase.getCollection("business_alarm_20220803", Doc.class); + MongoCollection db = oriDatabase.getCollection("network_performace_flow_2022072400", Doc.class); log.debug("{}", db.find(new BsonDocument("serverResponseTime", new BsonDocument("$gt", new BsonInt64(45601335571100803L))))); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index c85413d..e251d06 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -68,23 +68,19 @@ public class StateMachine extends StateMachineAdapter { Operate op = null; GenericClosure closure = null; if (iter.done() != null) { - // leader可以直接从 回调closure里提取operate closure = (GenericClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交 op = (Operate)closure.getValue(); - } else { // 非leader 需要从getData反序列化出来后处理 final ByteBuffer data = iter.getData(); try { - op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( data.array(), Operate.class.getName()); } catch (CodecException e) { log.info("{}", e.toString()); } - } if (op == null) { @@ -104,7 +100,6 @@ public class StateMachine extends StateMachineAdapter { } break; case ALLOCATE_PACKETS: - List alivePeers = op.getValue(); PeerId[] peers = new PeerId[alivePeers.size()]; alivePeers.toArray(peers); @@ -130,6 +125,7 @@ public class StateMachine extends StateMachineAdapter { break; } + // log.info("size: {}", Operate.packetsManager.size()); // 统计每个节点发送多少任务 var allocTasks = Utils.allocationTasks(Operate.packetsManager.size(), canTasks); if(closure != null) { @@ -144,21 +140,15 @@ public class StateMachine extends StateMachineAdapter { WorkerState ws = state.getWorkers().get(peer); ws.setUpdateAt(Instant.now()); ws.setTaskQueueSize(ws.getTaskQueueSize() + allocTasks[i]); - log.info("剩余能处理的任务数量[{}] :{}", peer, Operate.MAX_TASKS - ws.getTaskQueueSize()); + // log.info("剩余能处理的任务数量[{}] :{}", peer, Operate.MAX_TASKS - ws.getTaskQueueSize()); } - - - // log.debug("PUT {}", ws.peerId); - // ws.put(ws.peerId, ws); - break; case GET_STATE: closure.setValue(this.state); log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex()); break; case REMOVE: - break; default: break;