完成负载均衡算法, 和 处理任务时间统计等

This commit is contained in:
huangsimin 2022-08-03 11:55:22 +08:00
parent 42e66efe67
commit 69ac1d6317
6 changed files with 159 additions and 43 deletions

View File

@ -25,6 +25,7 @@ import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.yuandian.dataflow.utils.PacketsManager;
import com.yuandian.dataflow.utils.Utils;
import lombok.extern.slf4j.Slf4j;
@ -36,40 +37,43 @@ public class MasterProcessor implements MasterExecute {
/**
* 最大任务数限制
*/
private final int MAX_TASKS = 100;
private final int MAX_TASKS = 1000;
private final int DEFAULT_ASYNC_TIMEOUT = 5000;
public static Random rand = new Random();
public static PacketsManager packetsManager = new PacketsManager();
@Override
public void loop(MasterContext cxt) {
ArrayList<Any> packets = new ArrayList<>();
// ArrayList<Any> packets = new ArrayList<>();
// 模拟发送包的数据到该节点上
for (int i = 0; i < ThreadLocalRandom.current().nextLong(1000, 5000); i++) {
for (int i = 0; i < ThreadLocalRandom.current().nextLong(50, 100); i++) {
var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
.newBuilder()
.setTableId(10086)
.build());
packets.add(p);
packetsManager.addPacket(p);
// packets.add(p);
}
// 必须复制. raft有一直使用该list
var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers());
log.debug("master({}) execute {}", StateFactory.getServerId(), alivePeers);
if(packetsManager.size() >= 100000) {
log.error("告警 数据流无法正常消耗: 缓冲packets:{} 直接放弃一部分数据", packetsManager.size());
packetsManager.discardPackets(50000);
log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers, packetsManager.size());
cxt.sleep(5000);
} else {
log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers, packetsManager.size());
}
if (alivePeers == null) {
cxt.sleep(100); // 休眠100毫秒.
return;
}
var peerSize = alivePeers.size();
// 等待全部反馈后才能进入下次循环
CountDownLatch latch = new CountDownLatch(alivePeers.size());
// 读一致性
@ -86,8 +90,8 @@ public class MasterProcessor implements MasterExecute {
PeerId[] peers = new PeerId[alivePeers.size()];
alivePeers.toArray(peers);
// 统计 每个节点还有多少任务容量
var canTasks = new int[alivePeers.size()];
for(int i = 0; i < peers.length; i++) {
var peer = peers[i];
WorkerState ws = state.getWorkers().get(peer);
@ -95,42 +99,34 @@ public class MasterProcessor implements MasterExecute {
log.error("WorkerState获取的状态为 {}", ws);
return;
}
var can = MAX_TASKS - ws.getTaskQueueSize();
canTasks[i] = can;
}
var allocTasks = Utils.allocationTasks(packets.size(), canTasks);
// 统计每个节点发送多少任务
var allocTasks = Utils.allocationTasks(packetsManager.size(), canTasks);
for(int i = 0; i < peers.length; i++) {
var peer = peers[i];
WorkerState ws = state.getWorkers().get(peer);
if (ws == null) {
log.error("WorkerState获取的状态为 {}", ws);
return;
latch.countDown(); // 所有没调用提前返回都需要 countDown
continue;
}
if(allocTasks[i] <= 0) {
latch.countDown(); // 所有没调用提前返回都需要 countDown
continue;
}
// TODO: 处理完善的任务数量
allocTasks;
log.info("剩余能处理的任务数量[{}] :{}", peer, canTasks);
ws.setUpdateAt(Instant.now());
ws.setTaskQueueSize(MAX_TASKS);
// 模拟发送包的数据到该节点上
// var request = new PacketsRequest();
// for (int i = 0; i < canTasks; i++) {
// var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
// .newBuilder()
// .setTableId(10086)
// .build());
// request.getPackets().add(p);
// }
ws.setTaskQueueSize(ws.getTaskQueueSize() + allocTasks[i]);
log.info("剩余能处理的任务数量[{}] :{}", peer, MAX_TASKS - ws.getTaskQueueSize());
// ws.setTaskQueueSize(MAX_TASKS);
var packets = packetsManager.popPackets(allocTasks[i]);
// 先提交 节点的 剩余能处理的任务数量. 然后再处理
var request = new PacketsRequest(); // 数据包切片
request.setPackets(packets);
@ -146,6 +142,7 @@ public class MasterProcessor implements MasterExecute {
@Override
public void complete(Object result, Throwable err) {
latch.countDown();
log.debug("countDown {}", latch.getCount());
if (err != null) {
// TODO: 如果错误, 需要让节点恢复任务处理的状态
log.debug("{}", err);

View File

@ -7,6 +7,7 @@
package com.yuandian.dataflow.controller;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Random;
@ -49,17 +50,20 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
@Override
public void handleRequest(RpcContext rpcCtx, PacketsRequest request) {
// StateServerFactory.getStateServer().updateFsmStateAsync(s, onCompleted);
log.debug("{} handler request.packets.size(): {}", StateFactory.getServerId(), request.packets.size());
var now = Instant.now();
var resp = new RaftResponse();
resp.setSuccess(true);
rpcCtx.sendResponse(resp); // 因为jraft是异步. 收到数据直接返回
try {
log.debug("{} handler request.packets.size(): {}", StateFactory.getServerId(), request.packets.size());
log.info("response {} ms", Duration.between(now, Instant.now()).toMillis()); // 返回response的时间
// TODO: request.packets 入库,回填, 告警 等操作
Thread.sleep(ThreadLocalRandom.current().nextLong(100, 2000));
Thread.sleep(ThreadLocalRandom.current().nextLong(100, 500));
} catch (InterruptedException e) {
log.info(e.toString());
@ -76,13 +80,18 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
new GenericClosure() {
@Override
public void run(Status status) {
// 处理完数据 更新工作状态的时间
log.info("update workerstate {} ms", Duration.between(now, Instant.now()).toMillis());
if (!status.isOk()) {
log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp);
}
}
});
} ;
}
@Override

View File

@ -6,6 +6,7 @@
*/
package com.yuandian.dataflow.statemachine;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@ -58,7 +59,9 @@ public class MasterFactory {
MasterContext cxt = new MasterContext();
while (!cxt.getIsExit()) {
Instant now = Instant.now();
masterExecuteCls.loop(cxt);
log.info("master loop execute time: {} ms",Duration.between(now, Instant.now()).toMillis());
}
}

View File

@ -237,7 +237,7 @@ public class StateFactory {
public void readIndexState(GenericClosure closure) {
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure(5000) {
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
// log.debug("readIndexState({}) {}", getServerId(), status);

View File

@ -75,7 +75,6 @@ public class Operate implements Serializable {
// 非leader 转发请求 统一有leader处理
var request = new OperateProcessor.OperateRequest();
request.setOperate(op);
try {
StateFactory.rpcClientInvokeAsync(StateFactory.getLeaderId().getEndpoint(), request, new InvokeCallback() {
@Override

View File

@ -0,0 +1,108 @@
package com.yuandian.dataflow.utils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.util.internal.shaded.org.jctools.queues.MessagePassingQueue.Consumer;
import com.google.protobuf.Any;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class PacketsManager {
private ReentrantLock lock = new ReentrantLock();
private LinkedBlockingQueue<Any> temp = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Any> packets = new LinkedBlockingQueue<>();
/**
* 添加数据到队列里, 做了双队列缓冲
* @param _packets
*/
public void addPacket(Any packet) {
try {
lock.lock();
this.temp.add(packet);
} finally{
lock.unlock();
}
}
/**
* 添加数据到队列里, 做了双队列缓冲
* @param _packets
*/
public void addPackets(Collection <? extends Any> _packets) {
try {
lock.lock();
this.temp.addAll(_packets);
} finally{
lock.unlock();
}
}
/**
* 返回 总共packets的数据
* @return
*/
public int size() {
return this.temp.size() + this.packets.size();
}
private void lockTemp( Consumer< LinkedBlockingQueue<Any>> dofunc) {
try {
lock.lock();
this.packets.addAll(this.temp); // 合并缓冲的数据到使用中.
this.temp.clear();
dofunc.accept(this.packets);
} catch (Exception e) {
log.error("{}",e);
} finally {
lock.unlock();
}
}
/**
* 弹出需要数量num的数据
* @param num 需要数量
* @return 反回弹出少于等于num的数据
*/
public ArrayList<Any> popPackets(int num) {
var result = new ArrayList<Any>();
this.lockTemp((_packets)->{
for(var i = 0 ; i < num ; i++) {
var p = _packets.poll();
if(p == null){
return;
}
result.add(p);
}
});
return result;
}
public void discardPackets(int remainSize) {
this.lockTemp((_packets)->{
var delsize = _packets.size() - remainSize ;
var step = _packets.size() / delsize ;
var iter = _packets.iterator();
for(var i = 0 ; i < packets.size(); i++) {
iter.next();
if(i % step == 0) {
iter.remove();
}
}
});
}
}