This commit is contained in:
huangsimin 2022-08-01 23:25:01 +08:00
parent cbf0308841
commit c339e6e3f0
5 changed files with 14 additions and 12 deletions

View File

@ -69,7 +69,10 @@ public class MasterProcessor implements MasterExecute {
}
var canTasks = MAX_TASKS - ws.getTaskQueueSize();
if(canTasks > 0) {
log.info("剩余能处理的任务数量[{}] :{}", peer, canTasks);
}
if (canTasks <= 0) {
return;
}
@ -93,19 +96,18 @@ public class MasterProcessor implements MasterExecute {
new GenericClosure() {
@Override
public void run(Status status) {
log.info("PacketsRequest run {}", status);
// log.info("PacketsRequest run {}", status);
try {
StateFactory.rpcClientInvokeAsync(peer.getEndpoint(), request,
new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
latch.countDown();
log.debug("countDown {}", latch.getCount());
if (err != null) {
// TODO: 如果错误, 需要让节点恢复任务处理的状态
log.debug("{}", err);
}
log.debug("PacketsRequest: {}", result);
// log.debug("PacketsRequest: {}", result);
}
}, 5000);
} catch (InterruptedException | RemotingException e) {

View File

@ -237,10 +237,10 @@ public class StateFactory {
public void readIndexState(GenericClosure closure) {
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure(2000) {
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure(5000) {
@Override
public void run(Status status, long index, byte[] reqCtx) {
log.debug("readIndexState({}) {}", getServerId(), status);
// log.debug("readIndexState({}) {}", getServerId(), status);
if (status.isOk()) {
closure.success(ss.fsm.getState());
closure.setValue(ss.fsm.getState());

View File

@ -91,13 +91,13 @@ public class StateMachine extends StateMachineAdapter {
case PUT_WORKERSTATE:
WorkerState ws = op.getValue();
log.debug("PUT {}", ws.peerId);
// log.debug("PUT {}", ws.peerId);
state.getWorkers().put(ws.peerId, ws);
break;
case GET_STATE:
closure.setValue(this.state);
log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex());
// log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex());
break;
case REMOVE:

View File

@ -63,7 +63,7 @@ public class Operate implements Serializable {
*/
@java.lang.SuppressWarnings("unchecked")
public static void CallOperate(Operate op, GenericClosure closure) {
log.debug("CallOperate Value {}", op.<WorkerState>getValue());
// log.debug("CallOperate Value {}", op.<WorkerState>getValue());
// 如果是leader 就直接提交
if (StateFactory.isLeader()) {
StateFactory.applyOperate(op, closure);
@ -78,7 +78,7 @@ public class Operate implements Serializable {
StateFactory.rpcClientInvokeAsync(StateFactory.getLeaderId().getEndpoint(), request, new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
log.debug("Object result {}", result);
// log.debug("Object result {}", result);
var resp = (RaftResponse) result;
closure.setResponse(resp);
closure.success(resp.getValue());

View File

@ -54,14 +54,14 @@ public class OperateProcessor implements RpcProcessor<OperateProcessor.OperateRe
@Override
public void handleRequest(RpcContext rpcCtx, OperateRequest request) {
log.info("request: {}", request);
// log.info("request: {}", request);
final GenericClosure closure = new GenericClosure () {
@Override
public void run(Status status) {
if(status.isOk()) {
log.info("{}", status);
// log.info("{}", status);
rpcCtx.sendResponse(getResponse());
return;
}