From 1ea40571d913a1b48449983d9e73c5c2dbd12173 Mon Sep 17 00:00:00 2001 From: huangsimin <474420502@qq.com> Date: Wed, 27 Jul 2022 16:25:10 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5vscode=E8=B0=83=E8=AF=95?= =?UTF-8?q?=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/launch.json | 18 ++++ .vscode/settings.json | 3 + .vscode/tasks.json | 27 ++++++ restart.sh | 2 +- .../dataflow/controller/PacketsProcessor.java | 51 ++++++------ .../dataflow/statemachine/StateMachine.java | 82 ++++++++----------- .../statemachine/StateServerFactory.java | 9 +- .../statemachine/operate/Operate.java | 2 + .../statemachine/rpc/OperateProcessor.java | 2 +- .../com/yuandian/dataflow/utils/Utils.java | 8 ++ start.sh | 4 +- stop.sh | 1 + 12 files changed, 132 insertions(+), 77 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 .vscode/settings.json create mode 100644 .vscode/tasks.json create mode 100644 src/main/java/com/yuandian/dataflow/utils/Utils.java diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..db80756 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,18 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "java", + "name": "Launch Server", + "request": "launch", + "mainClass": "com.yuandian.dataflow.Server", + "projectName": "dataflow", + "args": "2", + "preLaunchTask": "restart", + "postDebugTask": "stopall", + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..e0f15db --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "java.configuration.updateBuildConfiguration": "automatic" +} \ No newline at end of file diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..7566731 --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,27 @@ +{ + "version": "2.0.0", + "tasks": [ + { + "label": "restart", + "type": "shell", + "command": "sh restart.sh", + "presentation": { + "echo": true, + "reveal": "always", + "focus": false, + "panel": "shared", + "showReuseMessage": true, + "clear": false, + "close": true + } + }, + { + "label": "stopall", + "type": "shell", + "command": "sh stop.sh", + "presentation": { + "close": true + } + } + ] +} \ No newline at end of file diff --git a/restart.sh b/restart.sh index 3bb4f6e..a51605a 100755 --- a/restart.sh +++ b/restart.sh @@ -1,2 +1,2 @@ #! /bin/bash -sh stop.sh & rm raftdata/ -rf && mvn -T4 package && truncate -s 0 screenlog.0 && sh start.sh +sh stop.sh & rm raftdata/ -rf && mvn package && truncate -s 0 screenlog.0 && sh start.sh diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index 14edd27..2c40164 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -52,40 +52,45 @@ public class PacketsProcessor implements RpcProcessor(); + resp.setSuccess(true); + rpcCtx.sendResponse(resp); var ss = StateServerFactory.getStateServer(); - log.info("{} handler request.packets.size(): {}", StateServerFactory.getServerId(), request.packets.size()); + log.debug("{} handler request.packets.size(): {}", StateServerFactory.getServerId(), request.packets.size()); + + ss.readIndexState( new GenericClosure() { @Override public void run(Status status) { - - var state = this.getValue(); - var ws = state.getWorkers().get(StateServerFactory.getServerId()); - ws.setTaskQueueSize( ws.getTaskQueueSize() - request.packets.size()); - ws.setUpdateAt(Instant.now()); - - log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(), request.packets.size(), state.getWorkers().size()); - - Operate.CallOperate(new Operate(OperateType.PUT,ws), new GenericClosure() { - @Override - public void run(Status status) { - var resp = this.getResponse(); - resp.setMsg(rpcCtx.getRemoteAddress()); - if(status.isOk()) { - resp.setSuccess(true); - log.info("{}", resp); - } else { - resp.setSuccess(false); + log.debug("status {}", status); + if(status.isOk()) { + var state = this.getValue(); + var ws = state.getWorkers().get(StateServerFactory.getServerId()); + + + ws.setTaskQueueSize( ws.getTaskQueueSize() - request.packets.size()); + ws.setUpdateAt(Instant.now()); + + log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(), request.packets.size(), state.getWorkers().size()); + + Operate.CallOperate(new Operate(OperateType.PUT,ws), new GenericClosure() { + @Override + public void run(Status status) { + if(status.isOk()) { + log.info("{}", resp); + } } - rpcCtx.sendResponse(resp); - } - }); + }); + } + } } ); - + } @Override diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index cf3b6c1..28fef8e 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -20,7 +20,7 @@ import com.yuandian.dataflow.statemachine.closure.GenericClosure; import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; - + import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest; import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.WorkerState; @@ -38,7 +38,8 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class StateMachine extends StateMachineAdapter { - // private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class); + // private static final Logger LOG = + // LoggerFactory.getLogger(StateMachine.class); /** * State value 全局使用的唯一状态 @@ -70,46 +71,46 @@ public class StateMachine extends StateMachineAdapter { if (iter.done() != null) { // This task is applied by this node, get value from closure to avoid additional // parsing. - closure = (GenericClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交 + closure = (GenericClosure) iter.done(); // 只支持单一个State. 全状态机只支持一种提交 op = closure.getValue(); - + } else { - // Have to parse FetchAddRequest from this user log. + // Have to parse FetchAddRequest from this user log. final ByteBuffer data = iter.getData(); try { - - op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(data.array(),Operate.class.getName()); + + op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( + data.array(), + Operate.class.getName()); } catch (CodecException e) { - log.info("{}", e.toString()); + log.info("{}", e.toString()); } - + } - if(op != null) { - switch(op.getType()) { + if (op != null) { + switch (op.getType()) { case PUT: - WorkerState ws = op.getValue(); + WorkerState ws = op.getValue(); state.getWorkers().put(ws.peerId, ws); - if(closure != null) { + if (closure != null) { closure.success(op); closure.run(Status.OK()); } break; case REMOVE: - if(closure != null) { + if (closure != null) { closure.success(op); closure.run(Status.OK()); } break; default: break; - + } } else { } - - iter.next(); } @@ -136,21 +137,21 @@ public class StateMachine extends StateMachineAdapter { this.leaderTerm.set(term); // 判断是否Master线程还在跑, 如果存在则中断 - if(MasterFactory.getMasterExecute().isAlive()) { + if (MasterFactory.getMasterExecute().isAlive()) { MasterFactory.getMasterExecute().interrupt(); } - - var ss = StateServerFactory.getStateServer(); - ss.readIndexState( new GenericClosure() { + + var ss = StateServerFactory.getStateServer(); + ss.readIndexState(new GenericClosure() { @Override public void run(Status status) { - - var ws = state.getWorkers().get( StateServerFactory.getServerId() ); - if(ws == null) { + + var ws = state.getWorkers().get(StateServerFactory.getServerId()); + if (ws == null) { ws = new WorkerState(StateServerFactory.getServerId()); } - + Operate op = new Operate(OperateType.PUT, ws); ss.applyOperate(op, new GenericClosure() { @Override @@ -159,7 +160,7 @@ public class StateMachine extends StateMachineAdapter { } }); } - + }); // 当成为master时候 必须启动 @@ -175,21 +176,17 @@ public class StateMachine extends StateMachineAdapter { super.onLeaderStop(status); // 判断是否Master线程还在跑, 如果存在则中断 - if(MasterFactory.getMasterExecute().isAlive()) { + if (MasterFactory.getMasterExecute().isAlive()) { MasterFactory.getMasterExecute().interrupt(); } - } - - @Override public void onShutdown() { log.debug("onShutdown"); super.onShutdown(); } - @Override public void onStartFollowing(LeaderChangeContext ctx) { @@ -197,12 +194,12 @@ public class StateMachine extends StateMachineAdapter { try { // 判断是否Master线程还在跑, 如果存在则中断 - if(MasterFactory.getMasterExecute().isAlive()) { + if (MasterFactory.getMasterExecute().isAlive()) { MasterFactory.getMasterExecute().interrupt(); } - + var ws = new WorkerState(StateServerFactory.getServerId()); - log.debug("my: {} leader id {}",StateServerFactory.getServerId(), StateServerFactory.getLeaderId()); + log.debug("my: {} leader id {}", StateServerFactory.getServerId(), StateServerFactory.getLeaderId()); var op = new Operate(OperateType.PUT, ws); @@ -212,10 +209,10 @@ public class StateMachine extends StateMachineAdapter { log.info("{} {}", status, this.getResponse()); } }); - + return; } catch (Exception e) { - log.info("{}", e.toString()); + log.info("{}", e.toString()); } super.onStartFollowing(ctx); @@ -228,12 +225,12 @@ public class StateMachine extends StateMachineAdapter { @Override public void onStopFollowing(LeaderChangeContext ctx) { - log.debug("{} {}",ctx, StateServerFactory.getCluster().getServerId()); + log.debug("{} {}", ctx, StateServerFactory.getCluster().getServerId()); var ss = StateServerFactory.getStateServer(); var ws = new WorkerState(StateServerFactory.getServerId()); - - log.debug("my: {} leader id {}",StateServerFactory.getServerId(), StateServerFactory.getLeaderId()); + + log.debug("my: {} leader id {}", StateServerFactory.getServerId(), StateServerFactory.getLeaderId()); var op = new Operate(OperateType.PUT, ws); Operate.CallOperate(op, new GenericClosure() { @@ -243,14 +240,7 @@ public class StateMachine extends StateMachineAdapter { } }); - super.onStopFollowing(ctx); } - - - - - - } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index c5d724a..7521b7e 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -188,17 +188,18 @@ public class StateServerFactory { getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { @Override public void run(Status status, long index, byte[] reqCtx) { - if( status.isOk()) { + if(status.isOk()) { + // 回调失败 closure.success(ss.fsm.getState()); - closure.run(status); - } + } + closure.run(status); } } ); } - public void applyOperate(Operate op, GenericClosure closure) { + public void applyOperate(Operate op, GenericClosure closure) { // 所有的提交都必须再leader进行 if (!ss.isLeader()) { ss.handlerNotLeaderError(closure); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java index 0194b1a..df2cf67 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java @@ -2,6 +2,7 @@ package com.yuandian.dataflow.statemachine.operate; import java.io.Serializable; +import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.yuandian.dataflow.statemachine.StateServerFactory; @@ -75,6 +76,7 @@ public class Operate implements Serializable { var resp = (RaftResponse) result; closure.setResponse(resp); closure.success(resp.getValue()); + closure.run(Status.OK()); } }, 5000); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java index 96a825a..4ad2efd 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java @@ -64,7 +64,7 @@ public class OperateProcessor implements RpcProcessor closure = new GenericClosure() { @Override public void run(Status status) { rpcCtx.sendResponse(getResponse()); diff --git a/src/main/java/com/yuandian/dataflow/utils/Utils.java b/src/main/java/com/yuandian/dataflow/utils/Utils.java new file mode 100644 index 0000000..1ba5e8f --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/utils/Utils.java @@ -0,0 +1,8 @@ +package com.yuandian.dataflow.utils; + +public class Utils { + + public static void main(String[] args) { + + } +} diff --git a/start.sh b/start.sh index 2b4c76e..3746270 100755 --- a/start.sh +++ b/start.sh @@ -10,10 +10,10 @@ sleep 1 screen -dmS raft-0 -L java -jar target/dataflow-${VERSION}.jar 0 screen -dmS raft-1 -L java -jar target/dataflow-${VERSION}.jar 1 -screen -dmS raft-2 -L java -jar target/dataflow-${VERSION}.jar 2 +# screen -dmS raft-2 -L java -jar target/dataflow-${VERSION}.jar 2 sleep 1 screen -S raft-0 -X logfile flush 0 screen -S raft-1 -X logfile flush 0 -screen -S raft-2 -X logfile flush 0 \ No newline at end of file +# screen -S raft-2 -X logfile flush 0 \ No newline at end of file diff --git a/stop.sh b/stop.sh index 12f7ef1..9c0b857 100755 --- a/stop.sh +++ b/stop.sh @@ -2,3 +2,4 @@ screen -S raft-0 -X quit screen -S raft-1 -X quit screen -S raft-2 -X quit +exit 0