任务数分配

This commit is contained in:
huangsimin 2022-08-02 18:19:26 +08:00
parent c339e6e3f0
commit 42e66efe67
7 changed files with 146 additions and 39 deletions

10
.vscode/launch.json vendored
View File

@ -4,6 +4,13 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "java",
"name": "Launch Utils",
"request": "launch",
"mainClass": "com.yuandian.dataflow.utils.Utils",
"projectName": "dataflow"
},
{
"type": "java",
"name": "Raft-0",
@ -57,8 +64,7 @@
"group": "",
"order": 3
}
},
}
],
"compounds": [

View File

@ -1,12 +1,16 @@
package com.yuandian.dataflow.controller;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.google.protobuf.Any;
@ -21,6 +25,7 @@ import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.yuandian.dataflow.utils.Utils;
import lombok.extern.slf4j.Slf4j;
@ -28,69 +33,107 @@ import lombok.extern.slf4j.Slf4j;
@MasterRegister
public class MasterProcessor implements MasterExecute {
/**
* 最大任务数限制
*/
private final int MAX_TASKS = 100;
private final int DEFAULT_ASYNC_TIMEOUT = 5000;
public static Random rand = new Random();
@Override
public void loop(MasterContext cxt) {
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
ArrayList<Any> packets = new ArrayList<>();
// 模拟发送包的数据到该节点上
for (int i = 0; i < ThreadLocalRandom.current().nextLong(1000, 5000); i++) {
var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
.newBuilder()
.setTableId(10086)
.build());
packets.add(p);
}
// 必须复制. raft有一直使用该list
var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers());
log.debug("master({}) execute {}", StateFactory.getServerId(), alivePeers);
if (alivePeers == null) {
cxt.sleep(100); // 休眠100毫秒.
return;
}
var peerSize = alivePeers.size();
// 等待全部反馈后才能进入下次循环
CountDownLatch latch = new CountDownLatch(alivePeers.size());
// 读一致性
StateFactory.readIndexState(new GenericClosure() {
@Override
public void run(Status status) {
var state = this.<State>getValue();
// log.debug("masterExecute start {} {}", status, alivePeers);
alivePeers.forEach((peer) -> {
var state = this.<State>getValue();
if (state == null) {
log.error("readIndexState获取的状态为 {}", state);
return;
}
PeerId[] peers = new PeerId[alivePeers.size()];
alivePeers.toArray(peers);
if (state == null) {
log.error("readIndexState获取的状态为 {}", state);
var canTasks = new int[alivePeers.size()];
for(int i = 0; i < peers.length; i++) {
var peer = peers[i];
WorkerState ws = state.getWorkers().get(peer);
if (ws == null) {
log.error("WorkerState获取的状态为 {}", ws);
return;
}
var can = MAX_TASKS - ws.getTaskQueueSize();
canTasks[i] = can;
}
var allocTasks = Utils.allocationTasks(packets.size(), canTasks);
for(int i = 0; i < peers.length; i++) {
var peer = peers[i];
WorkerState ws = state.getWorkers().get(peer);
if (ws == null) {
log.error("WorkerState获取的状态为 {}", ws);
return;
}
var canTasks = MAX_TASKS - ws.getTaskQueueSize();
if(canTasks > 0) {
log.info("剩余能处理的任务数量[{}] :{}", peer, canTasks);
}
if (canTasks <= 0) {
return;
}
// TODO: 处理完善的任务数量
allocTasks;
log.info("剩余能处理的任务数量[{}] :{}", peer, canTasks);
ws.setUpdateAt(Instant.now());
ws.setTaskQueueSize(MAX_TASKS);
// 模拟发送包的数据到该节点上
var request = new PacketsRequest();
for (int i = 0; i < canTasks; i++) {
var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
.newBuilder()
.setTableId(10086)
.build());
request.getPackets().add(p);
}
// var request = new PacketsRequest();
// for (int i = 0; i < canTasks; i++) {
// var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
// .newBuilder()
// .setTableId(10086)
// .build());
// request.getPackets().add(p);
// }
// 先提交 节点的 剩余能处理的任务数量. 然后再处理
var request = new PacketsRequest(); // 数据包切片
request.setPackets(packets);
Operate.CallOperate(
new Operate(OperateType.PUT_WORKERSTATE, ws),
new GenericClosure() {
@ -109,14 +152,14 @@ public class MasterProcessor implements MasterExecute {
}
// log.debug("PacketsRequest: {}", result);
}
}, 5000);
}, DEFAULT_ASYNC_TIMEOUT);
} catch (InterruptedException | RemotingException e) {
log.info("error send packets {}", e.toString());
}
}
});
});
}
}
@ -124,7 +167,7 @@ public class MasterProcessor implements MasterExecute {
});
try {
latch.await(5000, TimeUnit.MILLISECONDS);
latch.await(DEFAULT_ASYNC_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("{}", e.toString());
}

View File

@ -45,7 +45,7 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
private ArrayList<Any> packets = new ArrayList<>();
}
public static Random rand = new Random();
@Override
public void handleRequest(RpcContext rpcCtx, PacketsRequest request) {

View File

@ -6,10 +6,9 @@ import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MasterContext {
private AtomicBoolean isExit = new AtomicBoolean(false);
private Object share;
public Boolean getIsExit() {
@ -27,4 +26,13 @@ public class MasterContext {
public void setShare(Object share) {
this.share = share;
}
public void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
log.error("{}",e);
this.setIsExit(true); // 设置推出. 在下个循环退出
}
}
}

View File

@ -23,6 +23,8 @@ import lombok.extern.slf4j.Slf4j;
@Data
public class Operate implements Serializable {
private static int DEFAULT_ASYNC_TIMEOUT = 5000;
public static enum OperateType {
/**
* 同步WorkerState状态.
@ -85,7 +87,7 @@ public class Operate implements Serializable {
closure.run(Status.OK());
}
}, 5000);
}, DEFAULT_ASYNC_TIMEOUT);
} catch (InterruptedException | RemotingException e) {
closure.failure(e.getMessage(), null);
closure.run(new Status(100000, "invokeAsync fail"));
@ -93,4 +95,7 @@ public class Operate implements Serializable {
}
}
}

View File

@ -35,7 +35,7 @@ public class WorkerState implements Serializable {
/**
* 任务队列的数量
*/
public long taskQueueSize;
public int taskQueueSize;
/**
* 更新时间
*/

View File

@ -1,5 +1,10 @@
package com.yuandian.dataflow.utils;
import java.util.ArrayList;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class Utils {
@ -37,5 +42,45 @@ public class Utils {
return pos;
}
/**
* 负载分配算法
* @param total 总的任务数
* @param canTasks 每个节点可以处理的任务数
* @return 返回每个节点应该分配的任务数
*/
public static int[] allocationTasks(int total, int[] canTasks) {
int[] atasks = new int[canTasks.length];
int totalCans = 0;
for(var i = 0;i < canTasks.length;i++) {
var can = canTasks[i];
totalCans += can;
}
if(total - totalCans > 0) {
for(var i = 0;i < canTasks.length;i++) {
atasks[i] = canTasks[i];
}
return atasks;
}
// 剩下来的平均值
int remainAvg = (totalCans - total) / canTasks.length;
for(var i = 0;i < canTasks.length;i++) {
var can = canTasks[i];
if(remainAvg <= can) {
atasks[i] = can - remainAvg;
}
}
return atasks;
}
}