更换apache的raft
This commit is contained in:
parent
f2fdc44a44
commit
f54b3257d7
|
@ -40,7 +40,7 @@ public class MasterProcessor implements MasterExecute {
|
||||||
private final int MAX_TASKS = 1000;
|
private final int MAX_TASKS = 1000;
|
||||||
private final int DEFAULT_ASYNC_TIMEOUT = 5000;
|
private final int DEFAULT_ASYNC_TIMEOUT = 5000;
|
||||||
|
|
||||||
public static PacketsManager packetsManager = new PacketsManager();
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void loop(MasterContext cxt) {
|
public void loop(MasterContext cxt) {
|
||||||
|
@ -53,22 +53,22 @@ public class MasterProcessor implements MasterExecute {
|
||||||
.newBuilder()
|
.newBuilder()
|
||||||
.setTableId(10086)
|
.setTableId(10086)
|
||||||
.build());
|
.build());
|
||||||
packetsManager.addPacket(p);
|
Operate.packetsManager.addPacket(p);
|
||||||
// packets.add(p);
|
// packets.add(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 必须复制. raft有一直使用该list
|
// 必须复制. raft有一直使用该list
|
||||||
var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers());
|
var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers());
|
||||||
|
|
||||||
if (packetsManager.size() >= 100000) {
|
if ( Operate.packetsManager.size() >= 100000) {
|
||||||
log.error("告警 数据流无法正常消耗: 缓冲packets:{} 直接放弃一部分数据", packetsManager.size());
|
log.error("告警 数据流无法正常消耗: 缓冲packets:{} 直接放弃一部分数据", Operate.packetsManager.size());
|
||||||
packetsManager.discardPackets(50000);
|
Operate.packetsManager.discardPackets(50000);
|
||||||
log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers,
|
log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers,
|
||||||
packetsManager.size());
|
Operate.packetsManager.size());
|
||||||
cxt.sleep(5000);
|
cxt.sleep(5000);
|
||||||
} else {
|
} else {
|
||||||
// log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(),
|
// log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(),
|
||||||
// alivePeers, packetsManager.size());
|
// alivePeers, Operate.packetsManager.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (alivePeers == null) {
|
if (alivePeers == null) {
|
||||||
|
@ -91,8 +91,14 @@ public class MasterProcessor implements MasterExecute {
|
||||||
cxt.sleep(5000);;
|
cxt.sleep(5000);;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("需要处理的任务数量[{}] :{}", StateFactory.getLeaderId(), allocTasks);
|
var all = 0;
|
||||||
|
for(var i : allocTasks) {
|
||||||
|
all += i;
|
||||||
|
}
|
||||||
|
if(all != 0) {
|
||||||
|
log.info("需要处理的任务数量[{}] :{}", StateFactory.getLeaderId(), allocTasks);
|
||||||
|
}
|
||||||
for (int i = 0; i < peers.length; i++) {
|
for (int i = 0; i < peers.length; i++) {
|
||||||
|
|
||||||
var peer = peers[i];
|
var peer = peers[i];
|
||||||
|
|
|
@ -71,36 +71,25 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
|
||||||
|
|
||||||
// 读状态 Closure<State> 里的 getValue<State>为 State的状态
|
// 读状态 Closure<State> 里的 getValue<State>为 State的状态
|
||||||
var state = StateFactory.getStateServer().getFsm().getState() ; // 获取返回的状态
|
var state = StateFactory.getStateServer().getFsm().getState() ; // 获取返回的状态
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
var ws = state.getWorkers().get(StateFactory.getServerId());
|
||||||
var ws = state.getWorkers().get(StateFactory.getServerId());
|
ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量
|
||||||
ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量
|
ws.setUpdateAt(Instant.now()); // 设置更新时间
|
||||||
ws.setUpdateAt(Instant.now()); // 设置更新时间
|
|
||||||
|
|
||||||
|
|
||||||
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws),
|
|
||||||
new GenericClosure() {
|
|
||||||
@Override
|
|
||||||
public void run(Status status) {
|
|
||||||
// 处理完数据 更新工作状态的时间
|
|
||||||
log.info("update workerstate {} ms", Duration.between(now, Instant.now()).toMillis());
|
|
||||||
if (!status.isOk()) {
|
|
||||||
log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp);
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws),
|
||||||
|
new GenericClosure() {
|
||||||
|
@Override
|
||||||
|
public void run(Status status) {
|
||||||
|
// 处理完数据 更新工作状态的时间
|
||||||
|
log.info("update workerstate {} ms", Duration.between(now, Instant.now()).toMillis());
|
||||||
|
if (!status.isOk()) {
|
||||||
|
log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
} ;
|
} ;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -51,7 +51,7 @@ public final class Doc {
|
||||||
CodecRegistry pojoCodecRegistry = fromRegistries(getDefaultCodecRegistry(),fromProviders(pojoCodecProvider));
|
CodecRegistry pojoCodecRegistry = fromRegistries(getDefaultCodecRegistry(),fromProviders(pojoCodecProvider));
|
||||||
|
|
||||||
MongoDatabase oriDatabase = mgo.getDatabase("ori-database").withCodecRegistry(pojoCodecRegistry);
|
MongoDatabase oriDatabase = mgo.getDatabase("ori-database").withCodecRegistry(pojoCodecRegistry);
|
||||||
MongoCollection<Doc> db = oriDatabase.getCollection("business_alarm_20220803", Doc.class);
|
MongoCollection<Doc> db = oriDatabase.getCollection("network_performace_flow_2022072400", Doc.class);
|
||||||
|
|
||||||
log.debug("{}", db.find(new BsonDocument("serverResponseTime", new BsonDocument("$gt", new BsonInt64(45601335571100803L)))));
|
log.debug("{}", db.find(new BsonDocument("serverResponseTime", new BsonDocument("$gt", new BsonInt64(45601335571100803L)))));
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,23 +68,19 @@ public class StateMachine extends StateMachineAdapter {
|
||||||
Operate op = null;
|
Operate op = null;
|
||||||
GenericClosure closure = null;
|
GenericClosure closure = null;
|
||||||
if (iter.done() != null) {
|
if (iter.done() != null) {
|
||||||
|
|
||||||
// leader可以直接从 回调closure里提取operate
|
// leader可以直接从 回调closure里提取operate
|
||||||
closure = (GenericClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
|
closure = (GenericClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
|
||||||
op = (Operate)closure.getValue();
|
op = (Operate)closure.getValue();
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// 非leader 需要从getData反序列化出来后处理
|
// 非leader 需要从getData反序列化出来后处理
|
||||||
final ByteBuffer data = iter.getData();
|
final ByteBuffer data = iter.getData();
|
||||||
try {
|
try {
|
||||||
|
|
||||||
op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
|
op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
|
||||||
data.array(),
|
data.array(),
|
||||||
Operate.class.getName());
|
Operate.class.getName());
|
||||||
} catch (CodecException e) {
|
} catch (CodecException e) {
|
||||||
log.info("{}", e.toString());
|
log.info("{}", e.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (op == null) {
|
if (op == null) {
|
||||||
|
@ -104,7 +100,6 @@ public class StateMachine extends StateMachineAdapter {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case ALLOCATE_PACKETS:
|
case ALLOCATE_PACKETS:
|
||||||
|
|
||||||
List<PeerId> alivePeers = op.getValue();
|
List<PeerId> alivePeers = op.getValue();
|
||||||
PeerId[] peers = new PeerId[alivePeers.size()];
|
PeerId[] peers = new PeerId[alivePeers.size()];
|
||||||
alivePeers.toArray(peers);
|
alivePeers.toArray(peers);
|
||||||
|
@ -130,6 +125,7 @@ public class StateMachine extends StateMachineAdapter {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// log.info("size: {}", Operate.packetsManager.size());
|
||||||
// 统计每个节点发送多少任务
|
// 统计每个节点发送多少任务
|
||||||
var allocTasks = Utils.allocationTasks(Operate.packetsManager.size(), canTasks);
|
var allocTasks = Utils.allocationTasks(Operate.packetsManager.size(), canTasks);
|
||||||
if(closure != null) {
|
if(closure != null) {
|
||||||
|
@ -144,21 +140,15 @@ public class StateMachine extends StateMachineAdapter {
|
||||||
WorkerState ws = state.getWorkers().get(peer);
|
WorkerState ws = state.getWorkers().get(peer);
|
||||||
ws.setUpdateAt(Instant.now());
|
ws.setUpdateAt(Instant.now());
|
||||||
ws.setTaskQueueSize(ws.getTaskQueueSize() + allocTasks[i]);
|
ws.setTaskQueueSize(ws.getTaskQueueSize() + allocTasks[i]);
|
||||||
log.info("剩余能处理的任务数量[{}] :{}", peer, Operate.MAX_TASKS - ws.getTaskQueueSize());
|
// log.info("剩余能处理的任务数量[{}] :{}", peer, Operate.MAX_TASKS - ws.getTaskQueueSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// log.debug("PUT {}", ws.peerId);
|
|
||||||
// ws.put(ws.peerId, ws);
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
case GET_STATE:
|
case GET_STATE:
|
||||||
closure.setValue(this.state);
|
closure.setValue(this.state);
|
||||||
log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex());
|
log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex());
|
||||||
break;
|
break;
|
||||||
case REMOVE:
|
case REMOVE:
|
||||||
|
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
|
|
Loading…
Reference in New Issue
Block a user