去除Spring Boot
This commit is contained in:
parent
62dd411739
commit
182d38595f
44
assembly.xml
Normal file
44
assembly.xml
Normal file
|
@ -0,0 +1,44 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<assembly>
|
||||
<id>bin</id>
|
||||
<formats>
|
||||
<format>dir</format>
|
||||
<format>tar.gz</format>
|
||||
</formats>
|
||||
<includeBaseDirectory>false</includeBaseDirectory>
|
||||
<dependencySets>
|
||||
<dependencySet>
|
||||
<useProjectArtifact>true</useProjectArtifact>
|
||||
<outputDirectory>lib</outputDirectory>
|
||||
<scope>runtime</scope>
|
||||
</dependencySet>
|
||||
</dependencySets>
|
||||
<fileSets>
|
||||
<fileSet>
|
||||
<lineEnding>unix</lineEnding>
|
||||
<fileMode>0755</fileMode>
|
||||
<includes>
|
||||
<include>bin/**</include>
|
||||
<include>config/**</include>
|
||||
</includes>
|
||||
<excludes>
|
||||
<exclude>**/src/**</exclude>
|
||||
<exclude>**/target/**</exclude>
|
||||
<exclude>**/.*/**</exclude>
|
||||
</excludes>
|
||||
</fileSet>
|
||||
</fileSets>
|
||||
<moduleSets>
|
||||
<moduleSet>
|
||||
<binaries>
|
||||
<outputDirectory>lib/</outputDirectory>
|
||||
<unpack>false</unpack>
|
||||
<dependencySets>
|
||||
<dependencySet>
|
||||
<outputDirectory>lib</outputDirectory>
|
||||
</dependencySet>
|
||||
</dependencySets>
|
||||
</binaries>
|
||||
</moduleSet>
|
||||
</moduleSets>
|
||||
</assembly>
|
103
pom.xml
103
pom.xml
|
@ -21,8 +21,8 @@
|
|||
<ratis.version>2.3.0</ratis.version>
|
||||
<grpc.version>1.32.3</grpc.version>
|
||||
<slf4j.version>1.7.36</slf4j.version>
|
||||
<jraft.version>1.3.10</jraft.version>
|
||||
<spring.boot.version>2.7.0</spring.boot.version>
|
||||
<jraft.version>1.3.11</jraft.version>
|
||||
<spring.boot.version>2.7.1</spring.boot.version>
|
||||
<mongo.driver.version>3.12.11</mongo.driver.version>
|
||||
<nacos.version>2.1.0</nacos.version>
|
||||
<snakeyaml.version>1.30</snakeyaml.version>
|
||||
|
@ -48,6 +48,20 @@
|
|||
</dependency>
|
||||
|
||||
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
<version>1.2.11</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-core</artifactId>
|
||||
<version>1.2.11</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
|
@ -55,6 +69,27 @@
|
|||
</dependency>
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
<!-- <dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-simple</artifactId>
|
||||
<version>${slf4j.version}</version>
|
||||
</dependency> -->
|
||||
|
||||
|
||||
<!-- <dependency>
|
||||
<groupId>ch.qos.reload4j</groupId>
|
||||
<artifactId>reload4j</artifactId>
|
||||
<version>1.2.21</version>
|
||||
</dependency> -->
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.yaml</groupId>
|
||||
<artifactId>snakeyaml</artifactId>
|
||||
|
@ -77,11 +112,11 @@
|
|||
|
||||
|
||||
|
||||
<dependency>
|
||||
<!-- <dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
<version>${spring.boot.version}</version>
|
||||
</dependency>
|
||||
</dependency> -->
|
||||
|
||||
|
||||
<!-- protobuf 依赖 -->
|
||||
|
@ -214,7 +249,7 @@
|
|||
</execution>
|
||||
</executions>
|
||||
</plugin> -->
|
||||
<plugin>
|
||||
<!-- <plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<version>${spring.boot.version}</version>
|
||||
|
@ -225,7 +260,65 @@
|
|||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin> -->
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<configuration>
|
||||
<archive>
|
||||
<manifest>
|
||||
<addClasspath>true</addClasspath>
|
||||
<mainClass>com.yuandian.dataflow.Server</mainClass>
|
||||
</manifest>
|
||||
</archive>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>make-assembly</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<descriptors>
|
||||
<descriptor>assembly.xml</descriptor>
|
||||
</descriptors>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
|
||||
|
||||
<!-- <plugin>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<configuration>
|
||||
<archive>
|
||||
<manifest>
|
||||
<mainClass>com.yuandian.dataflow.Server</mainClass>
|
||||
</manifest>
|
||||
</archive>
|
||||
<descriptorRefs>
|
||||
<descriptorRef>jar-with-dependencies</descriptorRef>
|
||||
</descriptorRefs>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>make-assembly</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin> -->
|
||||
|
||||
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
|
|
|
@ -1,26 +1,10 @@
|
|||
package com.yuandian.dataflow;
|
||||
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.SpringBootConfiguration;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
|
||||
import com.alipay.remoting.serialization.SerializerManager;
|
||||
import com.alipay.sofa.jraft.JRaftUtils;
|
||||
import com.alipay.sofa.jraft.Node;
|
||||
import com.alipay.sofa.jraft.RaftGroupService;
|
||||
import com.alipay.sofa.jraft.conf.Configuration;
|
||||
import com.alipay.sofa.jraft.entity.PeerId;
|
||||
import com.alipay.sofa.jraft.option.NodeOptions;
|
||||
import com.yuandian.dataflow.statemachine.SyncClosure;
|
||||
import com.yuandian.dataflow.statemachine.StateMachine;
|
||||
import com.yuandian.dataflow.statemachine.StateServerFactory;
|
||||
import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor;
|
||||
|
||||
import lombok.var;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
@ -33,31 +17,38 @@ import lombok.extern.slf4j.Slf4j;
|
|||
*
|
||||
*/
|
||||
@Slf4j
|
||||
@SpringBootApplication(exclude = {MongoAutoConfiguration.class})
|
||||
@SpringBootConfiguration
|
||||
public class Server {
|
||||
|
||||
public static String peeridstr;
|
||||
public static String sprPort;
|
||||
public static Configuration conf ;
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
|
||||
|
||||
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
|
||||
String[] sprPeers = new String[]{"3440","3441","3442"};
|
||||
|
||||
var peeridstr = peers[ Integer.parseInt(args[0] )];
|
||||
var sprPort = sprPeers[Integer.parseInt(args[0] )];
|
||||
|
||||
var peeridx = Integer.parseInt(args[0]);
|
||||
var peeridstr = peers[ peeridx ];
|
||||
var sprPort = sprPeers[ peeridx ];
|
||||
|
||||
// var peeridstr = peers[2];
|
||||
// var sprPort = sprPeers[2];
|
||||
log.info("{} {}", peeridstr, sprPort);
|
||||
|
||||
conf = JRaftUtils.getConfiguration(String.join(",", peers));
|
||||
StateServerFactory.startStateServer(peeridstr, conf);
|
||||
|
||||
Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
|
||||
// System.setProperty("server.port", sprPort);
|
||||
// ConfigurableApplicationContext app = SpringApplication.run(Server.class, args);
|
||||
// StateServerFactory.setAppCxt(app);
|
||||
// app.addApplicationListener(new SpringReadyEvent());
|
||||
// app.start();
|
||||
|
||||
StateServerFactory.initStateServer(peeridstr, conf);
|
||||
|
||||
System.setProperty("server.port", sprPort);
|
||||
ConfigurableApplicationContext app = SpringApplication.run(Server.class, args);
|
||||
app.start();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -1,10 +1,13 @@
|
|||
package com.yuandian.dataflow.controller;
|
||||
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.stereotype.Controller;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
// import org.springframework.http.HttpStatus;
|
||||
// import org.springframework.http.MediaType;
|
||||
// import org.springframework.http.ResponseEntity;
|
||||
// import org.springframework.stereotype.Controller;
|
||||
// import org.springframework.web.bind.annotation.GetMapping;
|
||||
// import org.springframework.web.bind.annotation.PostMapping;
|
||||
// import org.springframework.web.bind.annotation.RequestBody;
|
||||
// import org.springframework.web.bind.annotation.RequestParam;
|
||||
|
||||
import com.alipay.sofa.jraft.Status;
|
||||
import com.alipay.sofa.jraft.entity.PeerId;
|
||||
|
@ -19,46 +22,52 @@ import lombok.var;
|
|||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Controller
|
||||
// @Controller
|
||||
public class TaskLog {
|
||||
|
||||
@GetMapping(path = "/test")
|
||||
public ResponseEntity<Response> Processing() throws InterruptedException, RemotingException {
|
||||
// var ws = new WorkerState(new PeerId());
|
||||
// StateServerFactory.getStateServer().updateFsmWorkerState(ws);
|
||||
Response response = new Response();
|
||||
StateServerFactory.getStateServer().useFsmState((fsmState)->{
|
||||
log.debug(fsmState.toString() );
|
||||
log.debug( StateServerFactory.getNode().getLeaderId().toString() );
|
||||
response.Message = fsmState.toString();
|
||||
return null;
|
||||
});
|
||||
response.Code = HttpStatus.OK;
|
||||
return new ResponseEntity<Response>(response, HttpStatus.OK);
|
||||
}
|
||||
|
||||
@GetMapping(path = "/test2")
|
||||
public ResponseEntity<Response> MongodbTest() {
|
||||
Response response = new Response();
|
||||
StateServerFactory.getStateServer().useFsmStateAsync((fsmState)->{
|
||||
log.debug("{} {}", fsmState.toString());
|
||||
// @PostMapping(path = "/test", produces={MediaType.APPLICATION_JSON_VALUE})
|
||||
// public ResponseEntity<Response> Processing(@RequestBody String Count) throws InterruptedException, RemotingException {
|
||||
// // var ws = new WorkerState(new PeerId());
|
||||
// // StateServerFactory.getStateServer().updateFsmWorkerState(ws);
|
||||
// log.debug("{}", Count);
|
||||
// Response response = new Response();
|
||||
// synchronized(response) {
|
||||
// StateServerFactory.getStateServer().useFsmStateAsync((fsmState)->{
|
||||
// log.debug("http: {}",StateServerFactory.getServerId() );
|
||||
// response.Message = fsmState.toString();
|
||||
return null;
|
||||
});
|
||||
// response.notify();
|
||||
// return null;
|
||||
// });
|
||||
// }
|
||||
|
||||
return new ResponseEntity<Response>(response, HttpStatus.OK);
|
||||
}
|
||||
// response.Code = HttpStatus.OK;
|
||||
// return new ResponseEntity<Response>(response, HttpStatus.OK);
|
||||
// }
|
||||
|
||||
// @GetMapping(path = "/test2")
|
||||
// public ResponseEntity<Response> MongodbTest() {
|
||||
// Response response = new Response();
|
||||
// StateServerFactory.getStateServer().useFsmStateAsync((fsmState)->{
|
||||
// log.debug("{} {}", fsmState.toString());
|
||||
// // response.Message = fsmState.toString();
|
||||
// return null;
|
||||
// });
|
||||
|
||||
// return new ResponseEntity<Response>(response, HttpStatus.OK);
|
||||
// }
|
||||
|
||||
|
||||
@GetMapping(path = "/test3")
|
||||
public ResponseEntity<Response> RemoveLeader() {
|
||||
Response response = new Response();
|
||||
try {
|
||||
StateServerFactory.getNode().shutdown();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
// @GetMapping(path = "/test3")
|
||||
// public ResponseEntity<Response> RemoveLeader() {
|
||||
// Response response = new Response();
|
||||
// try {
|
||||
// StateServerFactory.getNode().shutdown();
|
||||
// } catch (Exception e) {
|
||||
// e.printStackTrace();
|
||||
// response.Message = e.getMessage();
|
||||
// return new ResponseEntity<Response>(response, HttpStatus.INTERNAL_SERVER_ERROR);
|
||||
// }
|
||||
|
||||
return new ResponseEntity<Response>(response, HttpStatus.OK);
|
||||
}
|
||||
// return new ResponseEntity<Response>(response, HttpStatus.OK);
|
||||
// }
|
||||
}
|
||||
|
|
|
@ -28,8 +28,8 @@ import com.yuandian.dataflow.proto.msgtype.*;
|
|||
import io.grpc.ManagedChannelBuilder;
|
||||
import lombok.var;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.*;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
// import org.springframework.http.*;
|
||||
// import org.springframework.web.client.RestTemplate;
|
||||
|
||||
/**
|
||||
* description
|
||||
|
@ -121,15 +121,15 @@ public class CollectPackets extends CollectPacketsServerImplBase {
|
|||
var result = p.unpack(BacktrackingFlowOuterClass.BacktrackingFlow.class);
|
||||
System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result));
|
||||
|
||||
RestTemplate client = new RestTemplate();
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||
// RestTemplate client = new RestTemplate();
|
||||
// HttpHeaders headers = new HttpHeaders();
|
||||
// headers.setContentType(MediaType.APPLICATION_JSON);
|
||||
|
||||
HttpEntity<Object> requestEntity = new HttpEntity<>(JsonFormat.printer().print(result), headers);
|
||||
String url = "http://localhost:3440/test";
|
||||
ResponseEntity<Response> response = client.exchange(url, HttpMethod.POST, requestEntity, Response.class);
|
||||
// HttpEntity<Object> requestEntity = new HttpEntity<>(JsonFormat.printer().print(result), headers);
|
||||
// String url = "http://localhost:3440/test";
|
||||
// ResponseEntity<Response> response = client.exchange(url, HttpMethod.POST, requestEntity, Response.class);
|
||||
|
||||
System.out.println("result:" + response.getBody());
|
||||
// System.out.println("result:" + response.getBody());
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
package com.yuandian.dataflow.projo;
|
||||
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.apache.http.HttpStatus;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class Response {
|
||||
@JsonProperty("code")
|
||||
public HttpStatus Code;
|
||||
public org.apache.http.HttpStatus Code;
|
||||
@JsonProperty("message")
|
||||
public String Message;
|
||||
@JsonProperty("data")
|
||||
|
|
|
@ -9,7 +9,7 @@ import java.util.function.Function;
|
|||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
|
||||
|
||||
import com.alipay.remoting.exception.CodecException;
|
||||
import com.alipay.remoting.serialization.SerializerManager;
|
||||
|
@ -34,6 +34,7 @@ import com.yuandian.dataflow.statemachine.rpc.ResponseSM;
|
|||
import com.yuandian.dataflow.statemachine.rpc.RequestCondition;
|
||||
import com.yuandian.dataflow.statemachine.rpc.RequestState;
|
||||
import com.yuandian.dataflow.statemachine.state.State;
|
||||
import com.yuandian.dataflow.statemachine.state.StateFactory;
|
||||
import com.yuandian.dataflow.statemachine.state.WorkerState;
|
||||
|
||||
import lombok.var;
|
||||
|
@ -76,10 +77,8 @@ public class StateMachine extends StateMachineAdapter {
|
|||
* Returns current value. 读取修改都在这个函数域内进行
|
||||
*/
|
||||
public void useState(Function<State, Void> dofunc) {
|
||||
synchronized(this.state) {
|
||||
dofunc.apply(this.state);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns current value. 读取修改都在这个函数域内进行
|
||||
|
@ -87,7 +86,7 @@ public class StateMachine extends StateMachineAdapter {
|
|||
* @throws InterruptedException
|
||||
*/
|
||||
public void updateState(Function<State, State> dofunc) throws InterruptedException, RemotingException {
|
||||
synchronized(this.state) {
|
||||
|
||||
var newstate = dofunc.apply(this.state);
|
||||
var ss = StateServerFactory.getStateServer();
|
||||
if(!isLeader()) {
|
||||
|
@ -109,7 +108,7 @@ public class StateMachine extends StateMachineAdapter {
|
|||
colsure.setValue(newstate);
|
||||
StateServerFactory.getStateServer().applyState(newstate, colsure);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -123,9 +122,9 @@ public class StateMachine extends StateMachineAdapter {
|
|||
// parsing.
|
||||
var closure = (SyncClosure<State>)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
|
||||
log.info("SyncDataClosure(done) taskState:{} leaderTerm:{} {}",this.state, this.leaderTerm, closure);
|
||||
synchronized(this.state) {
|
||||
|
||||
this.state = closure.getValue();
|
||||
}
|
||||
|
||||
|
||||
closure.success(state);
|
||||
closure.run(Status.OK());
|
||||
|
@ -174,6 +173,9 @@ public class StateMachine extends StateMachineAdapter {
|
|||
state.getWorkers().put(ws.peerId, ws);
|
||||
return state;
|
||||
});
|
||||
if(!StateFactory.getMasterExecute().isAlive()) {
|
||||
StateFactory.getMasterExecute().start();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (RemotingException e) {
|
||||
|
@ -188,6 +190,10 @@ public class StateMachine extends StateMachineAdapter {
|
|||
this.leaderTerm.set(-1);
|
||||
super.onLeaderStop(status);
|
||||
|
||||
if(StateFactory.getMasterExecute().isAlive()) {
|
||||
StateFactory.getMasterExecute().interrupt();
|
||||
}
|
||||
|
||||
try {
|
||||
updateState((state)->{
|
||||
state.getWorkers().remove( StateServerFactory.getServerId() );
|
||||
|
@ -206,27 +212,19 @@ public class StateMachine extends StateMachineAdapter {
|
|||
@Override
|
||||
public void onShutdown() {
|
||||
log.debug("onShutdown");
|
||||
try {
|
||||
updateState((state)->{
|
||||
state.getWorkers().remove( StateServerFactory.getServerId() );
|
||||
return state;
|
||||
});
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (RemotingException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
super.onShutdown();
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onStartFollowing(LeaderChangeContext ctx) {
|
||||
log.debug("{} {}", ctx, StateServerFactory.getCluster().getServerId());
|
||||
log.debug("[onStartFollowing] {} {}", ctx, StateServerFactory.getCluster().getServerId());
|
||||
try {
|
||||
|
||||
if(StateFactory.getMasterExecute().isAlive()) {
|
||||
StateFactory.getMasterExecute().interrupt();
|
||||
}
|
||||
|
||||
var ss = StateServerFactory.getStateServer();
|
||||
var ws = new WorkerState(ss.getCluster().getServerId());
|
||||
var request = new RequestCondition();
|
||||
|
@ -248,7 +246,6 @@ public class StateMachine extends StateMachineAdapter {
|
|||
|
||||
@Override
|
||||
public void onConfigurationCommitted(Configuration conf) {
|
||||
// TODO Auto-generated method stub
|
||||
super.onConfigurationCommitted(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ import java.util.concurrent.SynchronousQueue;
|
|||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
|
||||
|
||||
import com.alipay.remoting.NamedThreadFactory;
|
||||
import com.alipay.remoting.exception.CodecException;
|
||||
|
@ -60,23 +60,20 @@ import lombok.extern.slf4j.Slf4j;
|
|||
public class StateServerFactory {
|
||||
|
||||
private static StateServer ss;
|
||||
private static ConfigurableApplicationContext appCxt;
|
||||
|
||||
public static void initStateServer(String peerstr, Configuration conf) throws Exception {
|
||||
private static String myPeerStr;
|
||||
private static Configuration raftConf;
|
||||
|
||||
public static void startStateServer(String peerstr, Configuration conf) throws Exception {
|
||||
if(ss != null) {
|
||||
throw new Exception("重复初始化 InitStateServer");
|
||||
}
|
||||
ss = new StateServerFactory.StateServer(peerstr, conf);
|
||||
log.debug("init peerid {}", ss.node.getNodeId().getPeerId());
|
||||
}
|
||||
|
||||
public static void setAppCxt(ConfigurableApplicationContext cxt) {
|
||||
appCxt = cxt;
|
||||
}
|
||||
|
||||
public static ConfigurableApplicationContext getAppCxt() {
|
||||
return appCxt;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
public static PeerId getServerId() {
|
||||
|
@ -114,15 +111,15 @@ public class StateServerFactory {
|
|||
private Executor readIndexExecutor = createReadIndexExecutor();
|
||||
|
||||
public StateServer(String addr, Configuration conf) {
|
||||
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
|
||||
String[] sprPeers = new String[]{"3440","3441","3442"};
|
||||
// String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
|
||||
// String[] sprPeers = new String[]{"3440","3441","3442"};
|
||||
|
||||
// var peeridstr = peers[Integer.parseInt(serverId)];
|
||||
// var sprPort = sprPeers[Integer.parseInt(args[0])];
|
||||
|
||||
// String groupId = "jraft";
|
||||
|
||||
// Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
|
||||
// conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
|
||||
|
||||
PeerId serverId = JRaftUtils.getPeerId(addr);
|
||||
int port = serverId.getPort();
|
||||
|
@ -146,76 +143,17 @@ public class StateServerFactory {
|
|||
cluster.getRpcServer().registerProcessor(new SyncConditionProcessor());
|
||||
node = cluster.start();
|
||||
|
||||
|
||||
|
||||
rpcClient = new BoltRaftRpcFactory().createRpcClient();
|
||||
rpcClient.init(new CliOptions());
|
||||
}
|
||||
|
||||
|
||||
|
||||
public boolean isLeader() {
|
||||
return this.fsm.isLeader();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 同步 可以使用follow使用, 但是可能延迟于leader. 只读
|
||||
* @param dofunc
|
||||
*/
|
||||
public void useFsmState(Function<State, Void> dofunc) {
|
||||
|
||||
SyncClosure<State> closure = new SyncClosure<State>() {
|
||||
@Override
|
||||
public void run(Status status) {
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
|
||||
@Override
|
||||
public void run(Status status, long index, byte[] reqCtx) {
|
||||
|
||||
getFsm().useState((fsmState)->{
|
||||
if(status.isOk()){
|
||||
closure.success(fsmState);
|
||||
closure.run(Status.OK());
|
||||
synchronized(dofunc) {
|
||||
dofunc.apply(fsmState);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
readIndexExecutor.execute(() -> {
|
||||
if(isLeader()){
|
||||
log.debug("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", getNode());
|
||||
applyState(fsmState, closure);
|
||||
}else {
|
||||
handlerNotLeaderError(closure);
|
||||
}
|
||||
});
|
||||
return null;
|
||||
});
|
||||
|
||||
synchronized(dofunc) {
|
||||
// log.debug("dofunc notify {}", getNode());
|
||||
dofunc.notify();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
synchronized(dofunc) {
|
||||
// log.debug("dofunc wait");
|
||||
dofunc.wait(5000);
|
||||
// log.debug("dofunc unwait");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return ;
|
||||
|
||||
}
|
||||
|
||||
public void useFsmStateAsync(Function<State, Void> dofunc) {
|
||||
|
||||
SyncClosure<State> closure = new SyncClosure<State>() {
|
||||
|
@ -402,7 +340,7 @@ public class StateServerFactory {
|
|||
}
|
||||
|
||||
|
||||
useFsmState((fsmState)->{
|
||||
useFsmStateAsync((fsmState)->{
|
||||
|
||||
var wmap = fsmState.getWorkers();
|
||||
var wstate = wmap.get(state.getPeerId());
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
/**
|
||||
* description
|
||||
*
|
||||
* @author eson
|
||||
*2022年7月20日-10:00:05
|
||||
*/
|
||||
package com.yuandian.dataflow.statemachine.state;
|
||||
|
||||
|
||||
import com.alibaba.nacos.api.cmdb.pojo.Entity;
|
||||
import com.yuandian.dataflow.statemachine.StateServerFactory;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.ToString;
|
||||
import lombok.var;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* description
|
||||
*
|
||||
* @author eson
|
||||
*2022年7月20日-10:00:05
|
||||
*/
|
||||
@Slf4j
|
||||
@Getter
|
||||
@Setter
|
||||
@ToString
|
||||
public class StateFactory {
|
||||
|
||||
public static Thread masterExecute = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while(true) {
|
||||
log.debug("master execute {}", StateServerFactory.getServerId());
|
||||
var alivePeers = StateServerFactory.getCluster().getRaftNode().listAlivePeers();
|
||||
log.debug("master execute {}", StateServerFactory.getCluster().getRaftNode().listAlivePeers());
|
||||
if(alivePeers != null) {
|
||||
synchronized(alivePeers) {
|
||||
StateServerFactory.getStateServer().useFsmStateAsync((state)->{
|
||||
alivePeers.forEach((peer)->{
|
||||
var ws = state.getWorkers().get(peer);
|
||||
if(ws != null) {
|
||||
var cap = 10000 - ws.getTaskQueueSize();
|
||||
if(cap > 0) {
|
||||
log.debug("{}", cap);
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
});
|
||||
alivePeers.notify();
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Thread.sleep(2000);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
public static Thread getMasterExecute() {
|
||||
return masterExecute;
|
||||
}
|
||||
|
||||
public static void Init() {
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -7,6 +7,7 @@
|
|||
package com.yuandian.dataflow.statemachine.state;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.time.Instant;
|
||||
|
||||
import com.alipay.sofa.jraft.entity.PeerId;
|
||||
|
||||
|
@ -26,9 +27,19 @@ import lombok.ToString;
|
|||
public class WorkerState implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = -1L;
|
||||
// 节点的对应peerID
|
||||
|
||||
/**
|
||||
* 节点的对应peerID
|
||||
*/
|
||||
public PeerId peerId;
|
||||
/**
|
||||
* 任务队列的数量
|
||||
*/
|
||||
public long taskQueueSize;
|
||||
/**
|
||||
* 更新时间
|
||||
*/
|
||||
public Instant updateAt;
|
||||
|
||||
public WorkerState(PeerId pid) {
|
||||
this.peerId = pid;
|
||||
|
|
|
@ -10,15 +10,21 @@
|
|||
<level>DEBUG</level>
|
||||
<onMatch>ACCEPT</onMatch>
|
||||
<onMismatch>DENY</onMismatch>
|
||||
</filter>
|
||||
|
||||
<filter class="ch.qos.logback.classic.filter.LevelFilter">
|
||||
<level>ERROR</level>
|
||||
<onMatch>ACCEPT</onMatch>
|
||||
<onMismatch>DENY</onMismatch>
|
||||
</filter> -->
|
||||
</appender>
|
||||
|
||||
|
||||
<root level="INFO">
|
||||
<root level="info">
|
||||
<appender-ref ref="CONSOLE" />
|
||||
</root>
|
||||
|
||||
<!-- <logger name="com.yuandian.dataflow" level="info">
|
||||
<logger name="com.yuandian.dataflow" level="debug">
|
||||
<appender-ref ref="CONSOLE" />
|
||||
</logger> -->
|
||||
</logger>
|
||||
</configuration>
|
|
@ -21,7 +21,7 @@ import javax.swing.text.AbstractDocument.BranchElement;
|
|||
import org.bson.Document;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.expression.spel.ast.FunctionReference;
|
||||
// import org.springframework.expression.spel.ast.FunctionReference;
|
||||
|
||||
import com.mongodb.MongoClient;
|
||||
import com.mongodb.client.model.InsertManyOptions;
|
||||
|
|
9
start.sh
9
start.sh
|
@ -2,11 +2,14 @@ screen -S raft-0 -X quit
|
|||
screen -S raft-1 -X quit
|
||||
screen -S raft-2 -X quit
|
||||
|
||||
|
||||
version=1.0.0-SNAPSHOT
|
||||
|
||||
sleep 1
|
||||
|
||||
screen -dmS raft-0 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 0
|
||||
screen -dmS raft-1 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 1
|
||||
screen -dmS raft-2 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 2
|
||||
screen -dmS raft-0 -L java -cp target/dataflow-${version}.jar-bin/lib/*jar -jar target/dataflow-${version}.jar 0
|
||||
screen -dmS raft-1 -L java -cp target/dataflow-${version}.jar-bin/lib/*jar -jar target/dataflow-${version}.jar 1
|
||||
screen -dmS raft-2 -L java -cp target/dataflow-${version}.jar-bin/lib/*jar -jar target/dataflow-${version}.jar 2
|
||||
|
||||
sleep 1
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user