必要时 回滚. 改进后死锁
This commit is contained in:
parent
d184fd014a
commit
f2fdc44a44
|
@ -45,127 +45,85 @@ public class MasterProcessor implements MasterExecute {
|
|||
@Override
|
||||
public void loop(MasterContext cxt) {
|
||||
|
||||
// ArrayList<Any> packets = new ArrayList<>();
|
||||
// ArrayList<Any> packets = new ArrayList<>();
|
||||
// 模拟发送包的数据到该节点上
|
||||
|
||||
|
||||
for (int i = 0; i < ThreadLocalRandom.current().nextLong(50, 100); i++) {
|
||||
var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
|
||||
.newBuilder()
|
||||
.setTableId(10086)
|
||||
.build());
|
||||
packetsManager.addPacket(p);
|
||||
packetsManager.addPacket(p);
|
||||
// packets.add(p);
|
||||
}
|
||||
|
||||
// 必须复制. raft有一直使用该list
|
||||
var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers());
|
||||
if(packetsManager.size() >= 100000) {
|
||||
|
||||
if (packetsManager.size() >= 100000) {
|
||||
log.error("告警 数据流无法正常消耗: 缓冲packets:{} 直接放弃一部分数据", packetsManager.size());
|
||||
packetsManager.discardPackets(50000);
|
||||
log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers, packetsManager.size());
|
||||
log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers,
|
||||
packetsManager.size());
|
||||
cxt.sleep(5000);
|
||||
} else {
|
||||
// log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers, packetsManager.size());
|
||||
// log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(),
|
||||
// alivePeers, packetsManager.size());
|
||||
}
|
||||
|
||||
|
||||
if (alivePeers == null) {
|
||||
cxt.sleep(100); // 休眠100毫秒.
|
||||
cxt.sleep(100); // 休眠100毫秒.
|
||||
return;
|
||||
}
|
||||
|
||||
// 等待全部反馈后才能进入下次循环
|
||||
CountDownLatch latch = new CountDownLatch(alivePeers.size());
|
||||
// 读一致性
|
||||
|
||||
|
||||
var state = StateFactory.getStateServer().getFsm().getState();
|
||||
// log.debug("masterExecute start {} {}", status, alivePeers);
|
||||
// var state = this.<State>getValue();
|
||||
if (state == null) {
|
||||
log.error("readIndexState获取的状态为 {}", state);
|
||||
PeerId[] peers = new PeerId[alivePeers.size()];
|
||||
alivePeers.toArray(peers);
|
||||
// 等待全部反馈后才能进入下次循环
|
||||
|
||||
|
||||
Operate.CallOperate(new Operate(OperateType.ALLOCATE_PACKETS, alivePeers), new GenericClosure() {
|
||||
|
||||
@Override
|
||||
public void run(Status status) {
|
||||
int[] allocTasks = this.getValue();
|
||||
|
||||
if(allocTasks == null) {
|
||||
cxt.sleep(5000);;
|
||||
return;
|
||||
}
|
||||
PeerId[] peers = new PeerId[alivePeers.size()];
|
||||
alivePeers.toArray(peers);
|
||||
|
||||
// 统计 每个节点还有多少任务容量
|
||||
var canTasks = new int[alivePeers.size()];
|
||||
for(int i = 0; i < peers.length; i++) {
|
||||
var peer = peers[i];
|
||||
WorkerState ws = state.getWorkers().get(peer);
|
||||
if (ws == null) {
|
||||
log.error("WorkerState获取的状态为 {}", ws);
|
||||
return;
|
||||
}
|
||||
var can = MAX_TASKS - ws.getTaskQueueSize();
|
||||
canTasks[i] = can;
|
||||
}
|
||||
|
||||
log.info("需要处理的任务数量[{}] :{}", StateFactory.getLeaderId(), allocTasks);
|
||||
for (int i = 0; i < peers.length; i++) {
|
||||
|
||||
// 统计每个节点发送多少任务
|
||||
var allocTasks = Utils.allocationTasks(packetsManager.size(), canTasks);
|
||||
for(int i = 0; i < peers.length; i++) {
|
||||
var peer = peers[i];
|
||||
|
||||
WorkerState ws = state.getWorkers().get(peer);
|
||||
if (ws == null) {
|
||||
log.error("WorkerState获取的状态为 {}", ws);
|
||||
latch.countDown(); // 所有没调用提前返回都需要 countDown
|
||||
if (allocTasks[i] <= 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if(allocTasks[i] <= 0) {
|
||||
latch.countDown(); // 所有没调用提前返回都需要 countDown
|
||||
continue;
|
||||
}
|
||||
|
||||
ws.setUpdateAt(Instant.now());
|
||||
ws.setTaskQueueSize(ws.getTaskQueueSize() + allocTasks[i]);
|
||||
log.info("剩余能处理的任务数量[{}] :{}", peer, MAX_TASKS - ws.getTaskQueueSize());
|
||||
// ws.setTaskQueueSize(MAX_TASKS);
|
||||
|
||||
var packets = packetsManager.popPackets(allocTasks[i]);
|
||||
var packets = Operate.packetsManager.popPackets(allocTasks[i]);
|
||||
// 先提交 节点的 剩余能处理的任务数量. 然后再处理
|
||||
var request = new PacketsRequest(); // 数据包切片
|
||||
request.setPackets(packets);
|
||||
Operate.CallOperate(
|
||||
new Operate(OperateType.PUT_WORKERSTATE, ws),
|
||||
new GenericClosure() {
|
||||
@Override
|
||||
public void run(Status status) {
|
||||
// log.info("PacketsRequest run {}", status);
|
||||
try {
|
||||
StateFactory.rpcClientInvokeAsync(peer.getEndpoint(), request,
|
||||
new InvokeCallback() {
|
||||
@Override
|
||||
public void complete(Object result, Throwable err) {
|
||||
latch.countDown();
|
||||
if (err != null) {
|
||||
// TODO: 如果错误, 需要让节点恢复任务处理的状态
|
||||
log.debug("{}", err);
|
||||
}
|
||||
// log.debug("PacketsRequest: {}", result);
|
||||
}
|
||||
}, DEFAULT_ASYNC_TIMEOUT);
|
||||
} catch (InterruptedException | RemotingException e) {
|
||||
log.info("error send packets {}", e.toString());
|
||||
|
||||
try {
|
||||
StateFactory.rpcClientInvokeAsync(peer.getEndpoint(), request,
|
||||
new InvokeCallback() {
|
||||
@Override
|
||||
public void complete(Object result, Throwable err) {
|
||||
|
||||
if (err != null) {
|
||||
// TODO: 如果错误, 需要让节点恢复任务处理的状态
|
||||
log.debug("{}", err);
|
||||
}
|
||||
// log.debug("PacketsRequest: {}", result);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
}, DEFAULT_ASYNC_TIMEOUT);
|
||||
} catch (InterruptedException | RemotingException e) {
|
||||
log.info("error send packets {}", e.toString());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
try {
|
||||
latch.await(DEFAULT_ASYNC_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
log.error("{}", e.toString());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -71,26 +71,35 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
|
|||
|
||||
// 读状态 Closure<State> 里的 getValue<State>为 State的状态
|
||||
var state = StateFactory.getStateServer().getFsm().getState() ; // 获取返回的状态
|
||||
var ws = state.getWorkers().get(StateFactory.getServerId());
|
||||
ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量
|
||||
ws.setUpdateAt(Instant.now()); // 设置更新时间
|
||||
|
||||
|
||||
|
||||
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws),
|
||||
new GenericClosure() {
|
||||
@Override
|
||||
public void run(Status status) {
|
||||
// 处理完数据 更新工作状态的时间
|
||||
log.info("update workerstate {} ms", Duration.between(now, Instant.now()).toMillis());
|
||||
if (!status.isOk()) {
|
||||
log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp);
|
||||
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
var ws = state.getWorkers().get(StateFactory.getServerId());
|
||||
ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量
|
||||
ws.setUpdateAt(Instant.now()); // 设置更新时间
|
||||
|
||||
|
||||
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws),
|
||||
new GenericClosure() {
|
||||
@Override
|
||||
public void run(Status status) {
|
||||
// 处理完数据 更新工作状态的时间
|
||||
log.info("update workerstate {} ms", Duration.between(now, Instant.now()).toMillis());
|
||||
if (!status.isOk()) {
|
||||
log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp);
|
||||
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
} ;
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -51,8 +51,9 @@ public final class Doc {
|
|||
CodecRegistry pojoCodecRegistry = fromRegistries(getDefaultCodecRegistry(),fromProviders(pojoCodecProvider));
|
||||
|
||||
MongoDatabase oriDatabase = mgo.getDatabase("ori-database").withCodecRegistry(pojoCodecRegistry);
|
||||
MongoCollection<Doc> db = oriDatabase.getCollection("network_performace_flow_2022072400", Doc.class);
|
||||
log.debug("{}", db.countDocuments( new BsonDocument("serverResponseTime", new BsonDocument("$gt", new BsonInt64(2083478517) )) ));
|
||||
MongoCollection<Doc> db = oriDatabase.getCollection("business_alarm_20220803", Doc.class);
|
||||
|
||||
log.debug("{}", db.find(new BsonDocument("serverResponseTime", new BsonDocument("$gt", new BsonInt64(45601335571100803L)))));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -251,13 +251,13 @@ public class StateFactory {
|
|||
// 提交同步
|
||||
|
||||
log.info("status not ok");
|
||||
readIndexExecutor.execute(()->{
|
||||
if(isLeader()) {
|
||||
Operate.CallOperate(new Operate(OperateType.GET_STATE, null), closure);
|
||||
} else {
|
||||
handlerNotLeaderError(closure);
|
||||
}
|
||||
});
|
||||
// readIndexExecutor.execute(()->{
|
||||
// if(isLeader()) {
|
||||
// Operate.CallOperate(new Operate(OperateType.GET_STATE, null), closure);
|
||||
// } else {
|
||||
// handlerNotLeaderError(closure);
|
||||
// }
|
||||
// });
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package com.yuandian.dataflow.statemachine;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import com.alipay.remoting.exception.CodecException;
|
||||
|
@ -11,14 +13,17 @@ import com.alipay.sofa.jraft.Status;
|
|||
import com.alipay.sofa.jraft.conf.Configuration;
|
||||
import com.alipay.sofa.jraft.core.StateMachineAdapter;
|
||||
import com.alipay.sofa.jraft.entity.LeaderChangeContext;
|
||||
import com.alipay.sofa.jraft.entity.PeerId;
|
||||
import com.alipay.sofa.jraft.error.RaftException;
|
||||
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
|
||||
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
|
||||
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
|
||||
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
|
||||
import com.yuandian.dataflow.statemachine.operate.Operate;
|
||||
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
|
||||
import com.yuandian.dataflow.statemachine.state.State;
|
||||
import com.yuandian.dataflow.statemachine.state.WorkerState;
|
||||
import com.yuandian.dataflow.utils.Utils;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
|
@ -90,9 +95,62 @@ public class StateMachine extends StateMachineAdapter {
|
|||
switch (op.getType()) {
|
||||
|
||||
case PUT_WORKERSTATE:
|
||||
WorkerState ws = op.getValue();
|
||||
WorkerState opws = op.getValue();
|
||||
log.debug("PUT {}", opws.peerId);
|
||||
state.getWorkers().put(opws.peerId, opws);
|
||||
if (closure != null) {
|
||||
closure.success(op);
|
||||
closure.run(Status.OK());
|
||||
}
|
||||
break;
|
||||
case ALLOCATE_PACKETS:
|
||||
|
||||
List<PeerId> alivePeers = op.getValue();
|
||||
PeerId[] peers = new PeerId[alivePeers.size()];
|
||||
alivePeers.toArray(peers);
|
||||
|
||||
// 统计 每个节点还有多少任务容量
|
||||
var isNext = false;
|
||||
var canTasks = new int[alivePeers.size()];
|
||||
for(int i = 0; i < peers.length; i++) {
|
||||
var peer = peers[i];
|
||||
WorkerState ws = state.getWorkers().get(peer);
|
||||
if (ws == null) {
|
||||
log.error("WorkerState获取的状态为 {}", ws);
|
||||
continue;
|
||||
}
|
||||
var can = Operate.MAX_TASKS - ws.getTaskQueueSize();
|
||||
canTasks[i] = can;
|
||||
if(!isNext) {
|
||||
isNext = true;
|
||||
}
|
||||
}
|
||||
|
||||
if(!isNext) {
|
||||
break;
|
||||
}
|
||||
|
||||
// 统计每个节点发送多少任务
|
||||
var allocTasks = Utils.allocationTasks(Operate.packetsManager.size(), canTasks);
|
||||
if(closure != null) {
|
||||
closure.setValue(allocTasks);
|
||||
}
|
||||
for(int i = 0; i < peers.length; i++) {
|
||||
var peer = peers[i];
|
||||
if(allocTasks[i] <= 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
WorkerState ws = state.getWorkers().get(peer);
|
||||
ws.setUpdateAt(Instant.now());
|
||||
ws.setTaskQueueSize(ws.getTaskQueueSize() + allocTasks[i]);
|
||||
log.info("剩余能处理的任务数量[{}] :{}", peer, Operate.MAX_TASKS - ws.getTaskQueueSize());
|
||||
}
|
||||
|
||||
|
||||
|
||||
// log.debug("PUT {}", ws.peerId);
|
||||
state.getWorkers().put(ws.peerId, ws);
|
||||
// ws.put(ws.peerId, ws);
|
||||
|
||||
break;
|
||||
case GET_STATE:
|
||||
|
@ -142,24 +200,21 @@ public class StateMachine extends StateMachineAdapter {
|
|||
MasterFactory.getMasterExecute().interrupt();
|
||||
}
|
||||
|
||||
StateFactory.readIndexState(new GenericClosure() {
|
||||
|
||||
|
||||
var ws = this.state.getWorkers().get(StateFactory.getServerId());
|
||||
if (ws == null) {
|
||||
ws = new WorkerState(StateFactory.getServerId());
|
||||
}
|
||||
|
||||
// 更新当前WorkerState
|
||||
StateFactory.applyOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure () {
|
||||
@Override
|
||||
public void run(Status status) {
|
||||
|
||||
var ws = this.<State>getValue().getWorkers().get(StateFactory.getServerId());
|
||||
if (ws == null) {
|
||||
ws = new WorkerState(StateFactory.getServerId());
|
||||
}
|
||||
|
||||
// 更新当前WorkerState
|
||||
StateFactory.applyOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure () {
|
||||
@Override
|
||||
public void run(Status status) {
|
||||
log.debug("master update workerstate: {}", status);
|
||||
}
|
||||
});
|
||||
log.debug("master update workerstate: {}", status);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
// 当成为master时候 必须启动
|
||||
MasterFactory.getMasterExecute().start();
|
||||
|
|
|
@ -22,6 +22,9 @@ public abstract class GenericClosure implements Closure {
|
|||
private Object value;
|
||||
|
||||
public <T> T getValue() {
|
||||
if(this.value == null) {
|
||||
return null;
|
||||
}
|
||||
return (T)this.value;
|
||||
}
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ import com.yuandian.dataflow.statemachine.closure.GenericClosure;
|
|||
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor;
|
||||
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
|
||||
import com.yuandian.dataflow.statemachine.state.WorkerState;
|
||||
import com.yuandian.dataflow.utils.PacketsManager;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
@ -24,12 +25,20 @@ import lombok.extern.slf4j.Slf4j;
|
|||
public class Operate implements Serializable {
|
||||
|
||||
private static int DEFAULT_ASYNC_TIMEOUT = 5000;
|
||||
public static final int MAX_TASKS = 1000;
|
||||
public static PacketsManager packetsManager = new PacketsManager();
|
||||
|
||||
public static enum OperateType {
|
||||
/**
|
||||
* 同步WorkerState状态.
|
||||
*/
|
||||
PUT_WORKERSTATE,
|
||||
|
||||
/**
|
||||
* 分配packets
|
||||
*/
|
||||
ALLOCATE_PACKETS,
|
||||
|
||||
/**
|
||||
* 获取State状态
|
||||
*/
|
||||
|
@ -43,9 +52,11 @@ public class Operate implements Serializable {
|
|||
private OperateType type;
|
||||
private Object value;
|
||||
|
||||
public Operate(OperateType t, WorkerState ws) {
|
||||
|
||||
|
||||
public Operate(OperateType t, Object v) {
|
||||
this.type = t;
|
||||
this.value = ws;
|
||||
this.value = v;
|
||||
}
|
||||
|
||||
@java.lang.SuppressWarnings("unchecked")
|
||||
|
|
Loading…
Reference in New Issue
Block a user