diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index 4a49a81..6258617 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -8,6 +8,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringBootConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration; +import org.springframework.context.ConfigurableApplicationContext; import com.alipay.remoting.serialization.SerializerManager; import com.alipay.sofa.jraft.JRaftUtils; @@ -49,14 +50,14 @@ public class Server { // var peeridstr = peers[2]; // var sprPort = sprPeers[2]; - + Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442"); StateServerFactory.initStateServer(peeridstr, conf); System.setProperty("server.port", sprPort); - var app = SpringApplication.run(Server.class, args); + ConfigurableApplicationContext app = SpringApplication.run(Server.class, args); app.start(); } } diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java index 4a48904..4589103 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java +++ b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java @@ -53,7 +53,12 @@ public class TaskLog { @GetMapping(path = "/test3") public ResponseEntity RemoveLeader() { Response response = new Response(); - StateServerFactory.getNode().shutdown(); + try { + StateServerFactory.getNode().shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + return new ResponseEntity(response, HttpStatus.OK); } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 4d292da..90eddb4 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -9,11 +9,14 @@ import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.boot.SpringApplication; + import com.alipay.remoting.exception.CodecException; import com.alipay.remoting.serialization.SerializerManager; import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Iterator; import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.conf.Configuration; import com.alipay.sofa.jraft.core.StateMachineAdapter; import com.alipay.sofa.jraft.entity.LeaderChangeContext; import com.alipay.sofa.jraft.entity.PeerId; @@ -153,7 +156,7 @@ public class StateMachine extends StateMachineAdapter { @Override public void onError(final RaftException e) { - log.error("Raft error: {}", e, e); + log.debug("Raft error: {}", e, e); } @Override @@ -163,6 +166,7 @@ public class StateMachine extends StateMachineAdapter { @Override public void onLeaderStart(final long term) { + log.debug("onLeaderStart {}", StateServerFactory.getCluster().getServerId()); this.leaderTerm.set(term); try { updateState((state)->{ @@ -180,6 +184,7 @@ public class StateMachine extends StateMachineAdapter { @Override public void onLeaderStop(final Status status) { + log.debug("onLeaderStop {}", StateServerFactory.getCluster().getServerId()); this.leaderTerm.set(-1); super.onLeaderStop(status); @@ -188,19 +193,39 @@ public class StateMachine extends StateMachineAdapter { state.getWorkers().remove( StateServerFactory.getServerId() ); return state; }); + } catch (InterruptedException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } } + + + + @Override + public void onShutdown() { + log.debug("onShutdown"); + try { + updateState((state)->{ + state.getWorkers().remove( StateServerFactory.getServerId() ); + return state; + }); + + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (RemotingException e) { + e.printStackTrace(); + } + + super.onShutdown(); + + } @Override public void onStartFollowing(LeaderChangeContext ctx) { - - - + log.debug("{} {}", ctx, StateServerFactory.getCluster().getServerId()); try { var ss = StateServerFactory.getStateServer(); var ws = new WorkerState(ss.getCluster().getServerId()); @@ -210,7 +235,7 @@ public class StateMachine extends StateMachineAdapter { ResponseSM resp; resp = (ResponseSM)StateServerFactory.getStateServer().getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000); if(resp == null) { - log.error("{} set WorkerState is error", resp); + log.debug("{} set WorkerState is error", resp); } log.debug("WorkerState is {}", resp); return; @@ -221,13 +246,22 @@ public class StateMachine extends StateMachineAdapter { super.onStartFollowing(ctx); } - + @Override + public void onConfigurationCommitted(Configuration conf) { + // TODO Auto-generated method stub + super.onConfigurationCommitted(conf); + } + + @Override + public void onStopFollowing(LeaderChangeContext ctx) { + log.debug("{} {}",ctx, StateServerFactory.getCluster().getServerId()); + super.onStopFollowing(ctx); + } - - public static void main(String[] args) throws InterruptedException, RemotingException { - - } + + + } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index 5f01dc3..5fc2b2e 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -13,6 +13,8 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.function.Function; +import org.springframework.context.ConfigurableApplicationContext; + import com.alipay.remoting.NamedThreadFactory; import com.alipay.remoting.exception.CodecException; import com.alipay.remoting.serialization.SerializerManager; @@ -28,6 +30,7 @@ import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.option.CliOptions; import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.alipay.sofa.jraft.rpc.RpcClient; import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; import com.alipay.sofa.jraft.util.BytesUtil; @@ -57,6 +60,7 @@ import lombok.extern.slf4j.Slf4j; public class StateServerFactory { private static StateServer ss; + private static ConfigurableApplicationContext appCxt; public static void initStateServer(String peerstr, Configuration conf) throws Exception { if(ss != null) { @@ -66,6 +70,15 @@ public class StateServerFactory { log.debug("init peerid {}", ss.node.getNodeId().getPeerId()); } + public static void setAppCxt(ConfigurableApplicationContext cxt) { + appCxt = cxt; + } + + public static ConfigurableApplicationContext getAppCxt() { + return appCxt; + } + + public static PeerId getServerId() { return ss.getCluster().getServerId(); } @@ -132,6 +145,8 @@ public class StateServerFactory { cluster.getRpcServer().registerProcessor(new SyncStateProcessor()); cluster.getRpcServer().registerProcessor(new SyncConditionProcessor()); node = cluster.start(); + + rpcClient = new BoltRaftRpcFactory().createRpcClient(); rpcClient.init(new CliOptions()); @@ -206,7 +221,9 @@ public class StateServerFactory { SyncClosure closure = new SyncClosure() { @Override public void run(Status status) { - dofunc.apply(this.getValue()); + if(status.isOk()) { + dofunc.apply(this.getValue()); + } } }; @@ -251,7 +268,7 @@ public class StateServerFactory { var closure = new SyncClosure() { @Override public void run(Status status) { - log.debug("leader {}", status); + // log.debug("leader {}", status); this.synclock.notify(); } }; @@ -267,11 +284,11 @@ public class StateServerFactory { request.setWorkerState(ws); log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId()); ResponseSM resp; - resp = (ResponseSM)StateServerFactory.getStateServer().getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000); + resp = (ResponseSM)StateServerFactory.getRpcClient().invokeSync(StateServerFactory.getNode().getLeaderId().getEndpoint(), request, 5000); if(resp == null) { log.error("{} set WorkerState is error", resp); } - log.debug("follow is {}", resp); + // log.debug("follow is {}", resp); return; } catch (InterruptedException | RemotingException e) { e.printStackTrace(); @@ -289,13 +306,12 @@ public class StateServerFactory { var closure = new SyncClosure() { @Override public void run(Status status) { - log.debug("leader {}", status); + // log.debug("leader update {}", status); synclock.notify(); } }; StateServerFactory.getStateServer().applyState(s, closure); closure.synclock.wait(5000); - return; } @@ -308,7 +324,51 @@ public class StateServerFactory { if(resp == null) { log.error("{} set State is error", resp); } - log.debug("follow is {}", resp); + // log.debug("follow is {}", resp); + return; + + } + + + public void updateFsmStateAsync(State s, Function onCompleted) throws InterruptedException, RemotingException { + if(isLeader()) { + var closure = new SyncClosure() { + @Override + public void run(Status status) { + // log.debug("leader update {}", status); + if(status.isOk()) { + onCompleted.apply(status); + } + } + }; + StateServerFactory.getStateServer().applyState(s, closure); + return; + } + + var ss = StateServerFactory.getStateServer(); + var request = new RequestState(); + request.setState(s); + log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId()); + + StateServerFactory + .getStateServer() + .getRpcClient() + .invokeAsync(ss.getNode().getLeaderId().getEndpoint(), request, new InvokeCallback() { + + @Override + public void complete(Object result, Throwable err) { + if(result != null){ + onCompleted.apply(Status.OK()); + } else { + var status = new Status(10000, "rpc invokeAsync with request: {}", request); + log.debug("{}", status); + onCompleted.apply(status); + } + } + + }, 5000); + + // log.debug("follow is {}", resp); return; } diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 4b84335..f3f6718 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -14,11 +14,11 @@ - + - + \ No newline at end of file