This commit is contained in:
huangsimin 2022-07-19 00:04:59 +08:00
parent 36fe0e6428
commit 20e50d142d
8 changed files with 61 additions and 33 deletions

View File

@ -7,6 +7,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.JRaftUtils;
@ -31,7 +32,7 @@ import lombok.extern.slf4j.Slf4j;
*
*/
@Slf4j
@SpringBootApplication
@SpringBootApplication(exclude = {MongoAutoConfiguration.class})
@SpringBootConfiguration
public class Server {

View File

@ -7,10 +7,13 @@ import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RemotingException;
import com.yuandian.dataflow.projo.Response;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.SyncClosure;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
@ -20,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
public class TaskLog {
@GetMapping(path = "/test")
public ResponseEntity<Response> Processing() throws InterruptedException {
public ResponseEntity<Response> Processing() throws InterruptedException, RemotingException {
StateServerFactory.getStateServer().useFsmState((fsmState)->{
@ -31,6 +34,13 @@ public class TaskLog {
// state.getWorker().setPeerId(StateServerFactory.getStateServer().getNode().getNodeId().getPeerId());
// state.getWorker().setTaskQueueSize(1);
StateServerFactory.getStateServer().updateFsmState((fsmState)->{
log.error(fsmState.toString() );
fsmState.getWorkers().put(new PeerId(), new WorkerState());
return fsmState;
});

View File

@ -2,27 +2,20 @@ package com.yuandian.dataflow.projo;
import java.io.Serializable;
import java.lang.annotation.Documented;
import java.time.LocalDateTime;
import org.bson.Document;
import org.bson.codecs.pojo.annotations.BsonCreator;
import org.bson.codecs.pojo.annotations.BsonDiscriminator;
import org.bson.codecs.pojo.annotations.BsonProperty;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.util.JSONPObject;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public final class Doc extends Document {
@Getter
public final class Doc extends Document {
@JsonProperty("code")
@BsonProperty("code")
@ -33,14 +26,11 @@ public final class Doc extends Document {
public LocalDateTime TS;
@JsonProperty("desc")
@BsonProperty("desc")
@BsonProperty("desc")
public String Desc;
@JsonProperty("data")
@BsonProperty("data")
public Document Data;
@BsonProperty("data")
public Document Data;
}

View File

@ -80,11 +80,32 @@ public class StateMachine extends StateMachineAdapter {
/**
* Returns current value. 读取修改都在这个函数域内进行
* @throws RemotingException
* @throws InterruptedException
*/
public void updateState(Function<State, State> dofunc) {
public void updateState(Function<State, State> dofunc) throws InterruptedException, RemotingException {
synchronized(this.state) {
var newstate = dofunc.apply(this.state);
this.state = newstate;
var ss = StateServerFactory.getStateServer();
if(!isLeader()) {
var request = new RequestState();
request.setState(newstate);
var result = ss.getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000);
log.info("{}", result);
return;
}
// this.state = newstate;
if(newstate != null) {
var colsure = new SyncClosure<State>() {
@Override
public void run(Status status) {
}
};
colsure.setValue(newstate);
StateServerFactory.getStateServer().applyState(newstate, colsure);
}
}
}
@ -98,19 +119,22 @@ public class StateMachine extends StateMachineAdapter {
// This task is applied by this node, get value from closure to avoid additional
// parsing.
var closure = (SyncClosure<State>)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
state = closure.getValue();
log.info("SyncDataClosure(done) taskState:{} leaderTerm:{}",state, this.leaderTerm);
log.info("SyncDataClosure(done) taskState:{} leaderTerm:{} {}",this.state, this.leaderTerm, closure);
synchronized(this.state) {
this.state = closure.getValue();
}
closure.success(state);
closure.run(Status.OK());
} else {
// Have to parse FetchAddRequest from this user log.
final ByteBuffer data = iter.getData();
try {
synchronized(state) {
state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(), State.class.getName());
log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm);
}
} catch (CodecException e) {
e.printStackTrace();

View File

@ -171,6 +171,10 @@ public class StateServerFactory {
}
public void updateFsmState(Function<State, State> dofunc) throws InterruptedException, RemotingException {
this.getFsm().updateState(dofunc);
}
public void applyState(State state, SyncClosure<State> closure) {
// 所有的提交都必须再leader进行
@ -193,7 +197,7 @@ public class StateServerFactory {
}
}
public void applyWorkerState(WorkerState state, SyncClosure<WorkerState> closure) {
public void applyWorkerState(WorkerState state, SyncClosure<State> closure) {
if (!ss.isLeader()) {
ss.handlerNotLeaderError(closure);
return;
@ -209,13 +213,13 @@ public class StateServerFactory {
wmap.put(state.getPeerId(), state);
try {
final Task task = new Task();
log.error("{}", fsmState);
closure.setValue(fsmState);
task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(fsmState)));
task.setDone(closure);
StateServerFactory.getStateServer().getNode().apply(task); // 提交数据
} catch (CodecException e) {
String errorMsg = "Fail to encode TaskState";
log.error(errorMsg, e);
log.error("{}:{}",errorMsg, e);
closure.failure(errorMsg, PeerId.emptyPeer());
closure.run(new Status(RaftError.EINTERNAL, errorMsg));
}

View File

@ -19,6 +19,7 @@ import com.lmax.disruptor.WorkProcessor;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.SyncClosure;
import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.alipay.sofa.jraft.entity.PeerId;
@ -40,7 +41,7 @@ public class SyncConditionProcessor implements RpcProcessor<RequestCondition> {
public void handleRequest(RpcContext rpcCtx, RequestCondition request) {
log.info("request: {}", request);
final SyncClosure<WorkerState> closure = new SyncClosure<WorkerState>() {
final SyncClosure<State> closure = new SyncClosure< State>() {
@Override
public void run(Status status) {
rpcCtx.sendResponse(getResponse());

View File

@ -1,4 +1,4 @@
package com.yuandian.dataflow.grpc;
package com.yuandian.dataflow;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;

View File

@ -23,8 +23,7 @@ public class StateMachineTest {
void testOnApply() throws InterruptedException, RemotingException {
var rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
var fstate = new State();
var fdata = new RequestState();
fdata.setState(fstate);
@ -39,8 +38,7 @@ public class StateMachineTest {
, 5000);
log.info("{}", resp);
}
int i = 0 ;
while(true) {