diff --git a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java index 91a3d38..b6e2c70 100644 --- a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java @@ -69,7 +69,10 @@ public class MasterProcessor implements MasterExecute { } var canTasks = MAX_TASKS - ws.getTaskQueueSize(); - log.info("剩余能处理的任务数量[{}] :{}", peer, canTasks); + if(canTasks > 0) { + log.info("剩余能处理的任务数量[{}] :{}", peer, canTasks); + } + if (canTasks <= 0) { return; } @@ -93,19 +96,18 @@ public class MasterProcessor implements MasterExecute { new GenericClosure() { @Override public void run(Status status) { - log.info("PacketsRequest run {}", status); + // log.info("PacketsRequest run {}", status); try { StateFactory.rpcClientInvokeAsync(peer.getEndpoint(), request, new InvokeCallback() { @Override public void complete(Object result, Throwable err) { latch.countDown(); - log.debug("countDown {}", latch.getCount()); if (err != null) { // TODO: 如果错误, 需要让节点恢复任务处理的状态 log.debug("{}", err); } - log.debug("PacketsRequest: {}", result); + // log.debug("PacketsRequest: {}", result); } }, 5000); } catch (InterruptedException | RemotingException e) { diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java index 15d8c86..ceffe49 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java @@ -237,10 +237,10 @@ public class StateFactory { public void readIndexState(GenericClosure closure) { - getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure(2000) { + getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure(5000) { @Override public void run(Status status, long index, byte[] reqCtx) { - log.debug("readIndexState({}) {}", getServerId(), status); + // log.debug("readIndexState({}) {}", getServerId(), status); if (status.isOk()) { closure.success(ss.fsm.getState()); closure.setValue(ss.fsm.getState()); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 0a551af..938ba78 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -91,13 +91,13 @@ public class StateMachine extends StateMachineAdapter { case PUT_WORKERSTATE: WorkerState ws = op.getValue(); - log.debug("PUT {}", ws.peerId); + // log.debug("PUT {}", ws.peerId); state.getWorkers().put(ws.peerId, ws); break; case GET_STATE: closure.setValue(this.state); - log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex()); + // log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex()); break; case REMOVE: diff --git a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java index 44064dd..d5b3e42 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java @@ -63,7 +63,7 @@ public class Operate implements Serializable { */ @java.lang.SuppressWarnings("unchecked") public static void CallOperate(Operate op, GenericClosure closure) { - log.debug("CallOperate Value {}", op.getValue()); + // log.debug("CallOperate Value {}", op.getValue()); // 如果是leader 就直接提交 if (StateFactory.isLeader()) { StateFactory.applyOperate(op, closure); @@ -78,7 +78,7 @@ public class Operate implements Serializable { StateFactory.rpcClientInvokeAsync(StateFactory.getLeaderId().getEndpoint(), request, new InvokeCallback() { @Override public void complete(Object result, Throwable err) { - log.debug("Object result {}", result); + // log.debug("Object result {}", result); var resp = (RaftResponse) result; closure.setResponse(resp); closure.success(resp.getValue()); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java index 4c6daf0..b75bdc5 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java @@ -54,14 +54,14 @@ public class OperateProcessor implements RpcProcessor