TODO: 重新设计 均衡处理方案
This commit is contained in:
parent
f7dbbb35cb
commit
644e65603f
|
@ -97,7 +97,6 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
|
|||
log.debug("finsh tasks size {}, size: {}", status, request.packets.size());
|
||||
rpcCtx.sendResponse(resp);
|
||||
}
|
||||
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -205,6 +205,7 @@ public class StateServerFactory {
|
|||
}
|
||||
|
||||
public void applyWorkerState(WorkerState state, SyncClosure<State> closure) {
|
||||
log.debug("applyWorkerState");
|
||||
if (!ss.isLeader()) {
|
||||
ss.handlerNotLeaderError(closure);
|
||||
return;
|
||||
|
@ -213,9 +214,10 @@ public class StateServerFactory {
|
|||
StateServerFactory.getStateServer().useFsmStateNotLock((fsmState)->{
|
||||
var wmap = fsmState.getWorkers();
|
||||
var wstate = wmap.get(state.getPeerId());
|
||||
|
||||
if(wstate == null) {
|
||||
wmap.put(state.getPeerId(), state);
|
||||
}
|
||||
|
||||
try {
|
||||
final Task task = new Task();
|
||||
closure.setValue(fsmState);
|
||||
|
@ -228,10 +230,6 @@ public class StateServerFactory {
|
|||
closure.failure(errorMsg, PeerId.emptyPeer());
|
||||
closure.run(new Status(RaftError.EINTERNAL, errorMsg));
|
||||
}
|
||||
} else {
|
||||
closure.success(fsmState);
|
||||
closure.run(Status.OK());
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
|
|
|
@ -56,7 +56,7 @@ public class SyncWorkerStateProcessor implements RpcProcessor<SyncWorkerStatePro
|
|||
@Override
|
||||
public void handleRequest(RpcContext rpcCtx, RequestWorkerState request) {
|
||||
|
||||
log.info("request: {}", request);
|
||||
log.info("RequestWorkerState: {}", request);
|
||||
final SyncClosure<State> closure = new SyncClosure< State>() {
|
||||
@Override
|
||||
public void run(Status status) {
|
||||
|
|
|
@ -7,11 +7,13 @@
|
|||
package com.yuandian.dataflow.statemachine.state;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.handler.codec.http.HttpContentEncoder.Result;
|
||||
import com.alipay.sofa.jraft.Status;
|
||||
import com.alipay.sofa.jraft.entity.PeerId;
|
||||
import com.alipay.sofa.jraft.error.RemotingException;
|
||||
import com.alipay.sofa.jraft.rpc.InvokeCallback;
|
||||
import com.google.protobuf.Any;
|
||||
|
@ -38,6 +40,18 @@ import lombok.extern.slf4j.Slf4j;
|
|||
@ToString
|
||||
public class StateFactory {
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@ToString
|
||||
public static class PeerIdCap {
|
||||
private PeerId peer;
|
||||
private long cap;
|
||||
public PeerIdCap(PeerId pid, long cap) {
|
||||
this.peer = pid;
|
||||
this.cap = cap;
|
||||
}
|
||||
}
|
||||
|
||||
public static Thread masterExecute = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -50,19 +64,45 @@ public class StateFactory {
|
|||
|
||||
var ss = StateServerFactory.getStateServer();
|
||||
// var state = ss.getFsm().getState();
|
||||
|
||||
ArrayList<PeerIdCap> pclist = new ArrayList<PeerIdCap>();
|
||||
|
||||
ss.useFsmStateNotLock((state) -> {
|
||||
alivePeers.forEach((peer) -> {
|
||||
WorkerState ws = state.getWorkers().get(peer);
|
||||
|
||||
if (ws != null) {
|
||||
var cap = 100 - ws.getTaskQueueSize();
|
||||
log.debug("cap :{} peer: {}", cap, peer);
|
||||
if (cap <= 0) {
|
||||
return ;
|
||||
}
|
||||
ws.setUpdateAt(Instant.now());
|
||||
ws.setTaskQueueSize(100);
|
||||
var pc = new PeerIdCap(peer, cap);
|
||||
pc.setCap(cap);
|
||||
;
|
||||
pclist.add(pc);
|
||||
}
|
||||
});
|
||||
|
||||
ss.applyState(state, new SyncClosure<State>() {
|
||||
@Override
|
||||
public void run(Status status) {
|
||||
log.info("任务队列更新成功 {}", this.getValue().getWorkers());
|
||||
|
||||
pclist.forEach((peer) -> {
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
if (peer.getCap() <= 0) {
|
||||
return ;
|
||||
}
|
||||
|
||||
var request = new PacketsRequest();
|
||||
for (int i = 0; i < cap; i++) {
|
||||
for (int i = 0; i < peer.getCap(); i++) {
|
||||
var p = Any.pack(
|
||||
BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder()
|
||||
.setTableId(10086)
|
||||
|
@ -73,7 +113,7 @@ public class StateFactory {
|
|||
|
||||
try {
|
||||
log.debug("rpc {}", peer);
|
||||
ss.getRpcClient().invokeAsync(peer.getEndpoint(),
|
||||
ss.getRpcClient().invokeAsync(peer.peer.getEndpoint() ,
|
||||
request, new InvokeCallback() {
|
||||
@Override
|
||||
public void complete(Object result, Throwable err) {
|
||||
|
@ -88,15 +128,9 @@ public class StateFactory {
|
|||
} catch (InterruptedException | RemotingException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
ws.setUpdateAt(Instant.now());
|
||||
ws.setTaskQueueSize(100);
|
||||
}
|
||||
});
|
||||
|
||||
ss.applyState(state, new SyncClosure<State>() {
|
||||
@Override
|
||||
public void run(Status status) {
|
||||
log.info("任务队列更新成功 {}", this.getValue().getWorkers());
|
||||
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -113,7 +147,7 @@ public class StateFactory {
|
|||
|
||||
}
|
||||
|
||||
Thread.sleep(2000);
|
||||
Thread.sleep(5000);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
|
|
Loading…
Reference in New Issue
Block a user