TODO: 分片模式处理数据

This commit is contained in:
huangsimin 2022-07-26 00:25:56 +08:00
parent 644e65603f
commit bf415d45e8
3 changed files with 27 additions and 22 deletions

View File

@ -54,20 +54,17 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
var resp = new RaftResponse(); var resp = new RaftResponse();
resp.setMsg(rpcCtx.getRemoteAddress()); resp.setMsg(rpcCtx.getRemoteAddress());
resp.setSuccess(true); resp.setSuccess(true);
var ss = StateServerFactory.getStateServer(); var ss = StateServerFactory.getStateServer();
ss.useFsmStateNotLock((state)->{ ss.useFsmStateNotLock((state)->{
var work = state.getWorkers().get( ss.getCluster().getServerId()); var work = state.getWorkers().get( ss.getCluster().getServerId());
work.setTaskQueueSize( work.getTaskQueueSize() - request.packets.size()); work.setTaskQueueSize( work.getTaskQueueSize() - request.packets.size());
work.setUpdateAt(Instant.now()); work.setUpdateAt(Instant.now());
log.debug("workerState taskQueueSize: {} psize: {}", work.getTaskQueueSize(), request.packets.size());
if(!ss.isLeader()) { if(!ss.isLeader()) {
var requestUpdateState = new SyncWorkerStateProcessor.RequestWorkerState(); var requestUpdateState = new SyncWorkerStateProcessor.RequestWorkerState();
requestUpdateState.setWorkerState(work); requestUpdateState.setWorkerState(work);
@ -80,7 +77,7 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
if (err != null) { if (err != null) {
err.printStackTrace(); err.printStackTrace();
} else { } else {
log.debug("转发 udate workerState {}", result); log.debug("转发 update WorkerState {}", result);
} }
rpcCtx.sendResponse(resp); rpcCtx.sendResponse(resp);
}}, 5000); }}, 5000);
@ -90,7 +87,6 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
return; return;
} }
StateServerFactory.getStateServer().applyWorkerState(work, new SyncClosure<State>() { StateServerFactory.getStateServer().applyWorkerState(work, new SyncClosure<State>() {
@Override @Override
public void run(Status status) { public void run(Status status) {

View File

@ -177,8 +177,22 @@ public class StateServerFactory {
} }
public void useFsmStateNotLock(Consumer<State> dofunc) { public void useFsmStateNotLock(Consumer<State> dofunc) {
var state = ss.fsm.getState(); var state = ss.fsm.getState();
dofunc.accept(state); synchronized(state) {
dofunc.accept(state);
}
// getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
// @Override
// public void run(Status status, long index, byte[] reqCtx) {
// var state = ss.fsm.getState();
// dofunc.accept(state);
// }
// } );
} }

View File

@ -90,19 +90,14 @@ public class StateFactory {
public void run(Status status) { public void run(Status status) {
log.info("任务队列更新成功 {}", this.getValue().getWorkers()); log.info("任务队列更新成功 {}", this.getValue().getWorkers());
pclist.forEach((peer) -> { pclist.forEach((peercap) -> {
if (peercap.getCap() <= 0) {
if (peer.getCap() <= 0) {
return ; return ;
} }
var request = new PacketsRequest(); var request = new PacketsRequest();
for (int i = 0; i < peer.getCap(); i++) { for (int i = 0; i < peercap.getCap(); i++) {
var p = Any.pack( var p = Any.pack(
BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder() BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder()
.setTableId(10086) .setTableId(10086)
@ -112,8 +107,8 @@ public class StateFactory {
try { try {
log.debug("rpc {}", peer); log.debug("rpc {}", peercap);
ss.getRpcClient().invokeAsync(peer.peer.getEndpoint() , ss.getRpcClient().invokeAsync(peercap.peer.getEndpoint() ,
request, new InvokeCallback() { request, new InvokeCallback() {
@Override @Override
public void complete(Object result, Throwable err) { public void complete(Object result, Throwable err) {