From 42e66efe673c06ebc949282db3f1efa86ef13039 Mon Sep 17 00:00:00 2001 From: huangsimin <474420502@qq.com> Date: Tue, 2 Aug 2022 18:19:26 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E6=95=B0=E5=88=86=E9=85=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/launch.json | 10 +- .../dataflow/controller/MasterProcessor.java | 105 ++++++++++++------ .../dataflow/controller/PacketsProcessor.java | 2 +- .../statemachine/master/MasterContext.java | 12 +- .../statemachine/operate/Operate.java | 7 +- .../statemachine/state/WorkerState.java | 2 +- .../com/yuandian/dataflow/utils/Utils.java | 47 +++++++- 7 files changed, 146 insertions(+), 39 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index c9a246d..80b7356 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -4,6 +4,13 @@ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ + { + "type": "java", + "name": "Launch Utils", + "request": "launch", + "mainClass": "com.yuandian.dataflow.utils.Utils", + "projectName": "dataflow" + }, { "type": "java", "name": "Raft-0", @@ -57,8 +64,7 @@ "group": "", "order": 3 } - }, - + } ], "compounds": [ diff --git a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java index b6e2c70..fb01aab 100644 --- a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java @@ -1,12 +1,16 @@ package com.yuandian.dataflow.controller; import java.time.Instant; +import java.util.ArrayList; import java.util.List; +import java.util.Random; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.google.protobuf.Any; @@ -21,6 +25,7 @@ import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.WorkerState; +import com.yuandian.dataflow.utils.Utils; import lombok.extern.slf4j.Slf4j; @@ -28,69 +33,107 @@ import lombok.extern.slf4j.Slf4j; @MasterRegister public class MasterProcessor implements MasterExecute { + /** + * 最大任务数限制 + */ private final int MAX_TASKS = 100; + private final int DEFAULT_ASYNC_TIMEOUT = 5000; + + public static Random rand = new Random(); @Override public void loop(MasterContext cxt) { - // try { - // Thread.sleep(1000); - // } catch (InterruptedException e) { - // // TODO Auto-generated catch block - // e.printStackTrace(); - // } + ArrayList packets = new ArrayList<>(); + // 模拟发送包的数据到该节点上 + + for (int i = 0; i < ThreadLocalRandom.current().nextLong(1000, 5000); i++) { + var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow + .newBuilder() + .setTableId(10086) + .build()); + packets.add(p); + } + // 必须复制. raft有一直使用该list var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers()); log.debug("master({}) execute {}", StateFactory.getServerId(), alivePeers); if (alivePeers == null) { + cxt.sleep(100); // 休眠100毫秒. return; } + + + + + + + var peerSize = alivePeers.size(); + // 等待全部反馈后才能进入下次循环 CountDownLatch latch = new CountDownLatch(alivePeers.size()); // 读一致性 StateFactory.readIndexState(new GenericClosure() { @Override public void run(Status status) { - var state = this.getValue(); // log.debug("masterExecute start {} {}", status, alivePeers); - - alivePeers.forEach((peer) -> { + var state = this.getValue(); + if (state == null) { + log.error("readIndexState获取的状态为 {}", state); + return; + } + PeerId[] peers = new PeerId[alivePeers.size()]; + alivePeers.toArray(peers); - if (state == null) { - log.error("readIndexState获取的状态为 {}", state); + var canTasks = new int[alivePeers.size()]; + + for(int i = 0; i < peers.length; i++) { + var peer = peers[i]; + WorkerState ws = state.getWorkers().get(peer); + if (ws == null) { + log.error("WorkerState获取的状态为 {}", ws); return; } + var can = MAX_TASKS - ws.getTaskQueueSize(); + canTasks[i] = can; + + } + + + var allocTasks = Utils.allocationTasks(packets.size(), canTasks); + + + for(int i = 0; i < peers.length; i++) { + var peer = peers[i]; + + WorkerState ws = state.getWorkers().get(peer); if (ws == null) { log.error("WorkerState获取的状态为 {}", ws); return; } - var canTasks = MAX_TASKS - ws.getTaskQueueSize(); - if(canTasks > 0) { - log.info("剩余能处理的任务数量[{}] :{}", peer, canTasks); - } - - if (canTasks <= 0) { - return; - } + // TODO: 处理完善的任务数量 + allocTasks; + log.info("剩余能处理的任务数量[{}] :{}", peer, canTasks); ws.setUpdateAt(Instant.now()); ws.setTaskQueueSize(MAX_TASKS); // 模拟发送包的数据到该节点上 - var request = new PacketsRequest(); - for (int i = 0; i < canTasks; i++) { - var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow - .newBuilder() - .setTableId(10086) - .build()); - request.getPackets().add(p); - } + // var request = new PacketsRequest(); + // for (int i = 0; i < canTasks; i++) { + // var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow + // .newBuilder() + // .setTableId(10086) + // .build()); + // request.getPackets().add(p); + // } // 先提交 节点的 剩余能处理的任务数量. 然后再处理 - + var request = new PacketsRequest(); // 数据包切片 + request.setPackets(packets); Operate.CallOperate( new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure() { @@ -109,14 +152,14 @@ public class MasterProcessor implements MasterExecute { } // log.debug("PacketsRequest: {}", result); } - }, 5000); + }, DEFAULT_ASYNC_TIMEOUT); } catch (InterruptedException | RemotingException e) { log.info("error send packets {}", e.toString()); } } }); - }); + } } @@ -124,7 +167,7 @@ public class MasterProcessor implements MasterExecute { }); try { - latch.await(5000, TimeUnit.MILLISECONDS); + latch.await(DEFAULT_ASYNC_TIMEOUT, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("{}", e.toString()); } diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index a00d2bc..e6eab59 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -45,7 +45,7 @@ public class PacketsProcessor implements RpcProcessor packets = new ArrayList<>(); } - public static Random rand = new Random(); + @Override public void handleRequest(RpcContext rpcCtx, PacketsRequest request) { diff --git a/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java index 5acc959..181d38b 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java @@ -6,10 +6,9 @@ import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; - +@Slf4j public class MasterContext { private AtomicBoolean isExit = new AtomicBoolean(false); - private Object share; public Boolean getIsExit() { @@ -27,4 +26,13 @@ public class MasterContext { public void setShare(Object share) { this.share = share; } + + public void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + log.error("{}",e); + this.setIsExit(true); // 设置推出. 在下个循环退出 + } + } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java index d5b3e42..b518f48 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java @@ -23,6 +23,8 @@ import lombok.extern.slf4j.Slf4j; @Data public class Operate implements Serializable { + private static int DEFAULT_ASYNC_TIMEOUT = 5000; + public static enum OperateType { /** * 同步WorkerState状态. @@ -85,7 +87,7 @@ public class Operate implements Serializable { closure.run(Status.OK()); } - }, 5000); + }, DEFAULT_ASYNC_TIMEOUT); } catch (InterruptedException | RemotingException e) { closure.failure(e.getMessage(), null); closure.run(new Status(100000, "invokeAsync fail")); @@ -93,4 +95,7 @@ public class Operate implements Serializable { } } + + + } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java b/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java index ef95ebd..d837c49 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java @@ -35,7 +35,7 @@ public class WorkerState implements Serializable { /** * 任务队列的数量 */ - public long taskQueueSize; + public int taskQueueSize; /** * 更新时间 */ diff --git a/src/main/java/com/yuandian/dataflow/utils/Utils.java b/src/main/java/com/yuandian/dataflow/utils/Utils.java index 690a638..a6e81a2 100644 --- a/src/main/java/com/yuandian/dataflow/utils/Utils.java +++ b/src/main/java/com/yuandian/dataflow/utils/Utils.java @@ -1,5 +1,10 @@ package com.yuandian.dataflow.utils; +import java.util.ArrayList; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class Utils { @@ -37,5 +42,45 @@ public class Utils { return pos; } - + + + + /** + * 负载分配算法 + * @param total 总的任务数 + * @param canTasks 每个节点可以处理的任务数 + * @return 返回每个节点应该分配的任务数 + */ + public static int[] allocationTasks(int total, int[] canTasks) { + int[] atasks = new int[canTasks.length]; + + int totalCans = 0; + for(var i = 0;i < canTasks.length;i++) { + var can = canTasks[i]; + totalCans += can; + } + + if(total - totalCans > 0) { + for(var i = 0;i < canTasks.length;i++) { + atasks[i] = canTasks[i]; + } + return atasks; + } + + // 剩下来的平均值 + int remainAvg = (totalCans - total) / canTasks.length; + + + for(var i = 0;i < canTasks.length;i++) { + var can = canTasks[i]; + if(remainAvg <= can) { + atasks[i] = can - remainAvg; + } + } + + + return atasks; + } + + }