TODO: 新模式

This commit is contained in:
huangsimin 2022-07-20 01:47:49 +08:00
parent d7cd8ed758
commit 62dd411739
5 changed files with 124 additions and 24 deletions

View File

@ -8,6 +8,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.JRaftUtils;
@ -56,7 +57,7 @@ public class Server {
StateServerFactory.initStateServer(peeridstr, conf);
System.setProperty("server.port", sprPort);
var app = SpringApplication.run(Server.class, args);
ConfigurableApplicationContext app = SpringApplication.run(Server.class, args);
app.start();
}
}

View File

@ -53,7 +53,12 @@ public class TaskLog {
@GetMapping(path = "/test3")
public ResponseEntity<Response> RemoveLeader() {
Response response = new Response();
try {
StateServerFactory.getNode().shutdown();
} catch (Exception e) {
e.printStackTrace();
}
return new ResponseEntity<Response>(response, HttpStatus.OK);
}
}

View File

@ -9,11 +9,14 @@ import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.core.StateMachineAdapter;
import com.alipay.sofa.jraft.entity.LeaderChangeContext;
import com.alipay.sofa.jraft.entity.PeerId;
@ -153,7 +156,7 @@ public class StateMachine extends StateMachineAdapter {
@Override
public void onError(final RaftException e) {
log.error("Raft error: {}", e, e);
log.debug("Raft error: {}", e, e);
}
@Override
@ -163,6 +166,7 @@ public class StateMachine extends StateMachineAdapter {
@Override
public void onLeaderStart(final long term) {
log.debug("onLeaderStart {}", StateServerFactory.getCluster().getServerId());
this.leaderTerm.set(term);
try {
updateState((state)->{
@ -180,6 +184,7 @@ public class StateMachine extends StateMachineAdapter {
@Override
public void onLeaderStop(final Status status) {
log.debug("onLeaderStop {}", StateServerFactory.getCluster().getServerId());
this.leaderTerm.set(-1);
super.onLeaderStop(status);
@ -188,6 +193,7 @@ public class StateMachine extends StateMachineAdapter {
state.getWorkers().remove( StateServerFactory.getServerId() );
return state;
});
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
@ -196,11 +202,30 @@ public class StateMachine extends StateMachineAdapter {
}
@Override
public void onShutdown() {
log.debug("onShutdown");
try {
updateState((state)->{
state.getWorkers().remove( StateServerFactory.getServerId() );
return state;
});
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
}
super.onShutdown();
}
@Override
public void onStartFollowing(LeaderChangeContext ctx) {
log.debug("{} {}", ctx, StateServerFactory.getCluster().getServerId());
try {
var ss = StateServerFactory.getStateServer();
var ws = new WorkerState(ss.getCluster().getServerId());
@ -210,7 +235,7 @@ public class StateMachine extends StateMachineAdapter {
ResponseSM resp;
resp = (ResponseSM)StateServerFactory.getStateServer().getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000);
if(resp == null) {
log.error("{} set WorkerState is error", resp);
log.debug("{} set WorkerState is error", resp);
}
log.debug("WorkerState is {}", resp);
return;
@ -221,13 +246,22 @@ public class StateMachine extends StateMachineAdapter {
super.onStartFollowing(ctx);
}
public static void main(String[] args) throws InterruptedException, RemotingException {
@Override
public void onConfigurationCommitted(Configuration conf) {
// TODO Auto-generated method stub
super.onConfigurationCommitted(conf);
}
@Override
public void onStopFollowing(LeaderChangeContext ctx) {
log.debug("{} {}",ctx, StateServerFactory.getCluster().getServerId());
super.onStopFollowing(ctx);
}
}

View File

@ -13,6 +13,8 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
import org.springframework.context.ConfigurableApplicationContext;
import com.alipay.remoting.NamedThreadFactory;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
@ -28,6 +30,7 @@ import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil;
@ -57,6 +60,7 @@ import lombok.extern.slf4j.Slf4j;
public class StateServerFactory {
private static StateServer ss;
private static ConfigurableApplicationContext appCxt;
public static void initStateServer(String peerstr, Configuration conf) throws Exception {
if(ss != null) {
@ -66,6 +70,15 @@ public class StateServerFactory {
log.debug("init peerid {}", ss.node.getNodeId().getPeerId());
}
public static void setAppCxt(ConfigurableApplicationContext cxt) {
appCxt = cxt;
}
public static ConfigurableApplicationContext getAppCxt() {
return appCxt;
}
public static PeerId getServerId() {
return ss.getCluster().getServerId();
}
@ -133,6 +146,8 @@ public class StateServerFactory {
cluster.getRpcServer().registerProcessor(new SyncConditionProcessor());
node = cluster.start();
rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
}
@ -206,8 +221,10 @@ public class StateServerFactory {
SyncClosure<State> closure = new SyncClosure<State>() {
@Override
public void run(Status status) {
if(status.isOk()) {
dofunc.apply(this.getValue());
}
}
};
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@ -251,7 +268,7 @@ public class StateServerFactory {
var closure = new SyncClosure<State>() {
@Override
public void run(Status status) {
log.debug("leader {}", status);
// log.debug("leader {}", status);
this.synclock.notify();
}
};
@ -267,11 +284,11 @@ public class StateServerFactory {
request.setWorkerState(ws);
log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId());
ResponseSM resp;
resp = (ResponseSM)StateServerFactory.getStateServer().getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000);
resp = (ResponseSM)StateServerFactory.getRpcClient().invokeSync(StateServerFactory.getNode().getLeaderId().getEndpoint(), request, 5000);
if(resp == null) {
log.error("{} set WorkerState is error", resp);
}
log.debug("follow is {}", resp);
// log.debug("follow is {}", resp);
return;
} catch (InterruptedException | RemotingException e) {
e.printStackTrace();
@ -289,13 +306,12 @@ public class StateServerFactory {
var closure = new SyncClosure<State>() {
@Override
public void run(Status status) {
log.debug("leader {}", status);
// log.debug("leader update {}", status);
synclock.notify();
}
};
StateServerFactory.getStateServer().applyState(s, closure);
closure.synclock.wait(5000);
return;
}
@ -308,7 +324,51 @@ public class StateServerFactory {
if(resp == null) {
log.error("{} set State is error", resp);
}
log.debug("follow is {}", resp);
// log.debug("follow is {}", resp);
return;
}
public void updateFsmStateAsync(State s, Function<Status, Void> onCompleted) throws InterruptedException, RemotingException {
if(isLeader()) {
var closure = new SyncClosure<State>() {
@Override
public void run(Status status) {
// log.debug("leader update {}", status);
if(status.isOk()) {
onCompleted.apply(status);
}
}
};
StateServerFactory.getStateServer().applyState(s, closure);
return;
}
var ss = StateServerFactory.getStateServer();
var request = new RequestState();
request.setState(s);
log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId());
StateServerFactory
.getStateServer()
.getRpcClient()
.invokeAsync(ss.getNode().getLeaderId().getEndpoint(), request, new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
if(result != null){
onCompleted.apply(Status.OK());
} else {
var status = new Status(10000, "rpc invokeAsync with request: {}", request);
log.debug("{}", status);
onCompleted.apply(status);
}
}
}, 5000);
// log.debug("follow is {}", resp);
return;
}

View File

@ -14,11 +14,11 @@
</appender>
<!-- <root level="INFO">
<root level="INFO">
<appender-ref ref="CONSOLE" />
</root> -->
</root>
<logger name="com.yuandian.dataflow" level="debug">
<!-- <logger name="com.yuandian.dataflow" level="info">
<appender-ref ref="CONSOLE" />
</logger>
</logger> -->
</configuration>