diff --git a/assembly.xml b/assembly.xml
new file mode 100644
index 0000000..d0a0aa8
--- /dev/null
+++ b/assembly.xml
@@ -0,0 +1,44 @@
+
+
+ bin
+
+ dir
+ tar.gz
+
+ false
+
+
+ true
+ lib
+ runtime
+
+
+
+
+ unix
+ 0755
+
+ bin/**
+ config/**
+
+
+ **/src/**
+ **/target/**
+ **/.*/**
+
+
+
+
+
+
+ lib/
+ false
+
+
+ lib
+
+
+
+
+
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 384412f..b6954ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,8 +21,8 @@
2.3.0
1.32.3
1.7.36
- 1.3.10
- 2.7.0
+ 1.3.11
+ 2.7.1
3.12.11
2.1.0
1.30
@@ -47,6 +47,20 @@
${yuandian.common.config.version}
+
+
+
+
+ ch.qos.logback
+ logback-classic
+ 1.2.11
+
+
+
+ ch.qos.logback
+ logback-core
+ 1.2.11
+
org.slf4j
@@ -54,6 +68,27 @@
${slf4j.version}
+
+
+
+
+
+
+
+
+
+
+
+
+
org.yaml
@@ -77,11 +112,11 @@
-
+
@@ -214,7 +249,7 @@
-->
-
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ true
+ com.yuandian.dataflow.Server
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+ assembly.xml
+
+
+
+
+
+
+
+
+
org.apache.maven.plugins
diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java
index 6258617..86232a4 100644
--- a/src/main/java/com/yuandian/dataflow/Server.java
+++ b/src/main/java/com/yuandian/dataflow/Server.java
@@ -1,26 +1,10 @@
package com.yuandian.dataflow;
-import java.io.File;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.SpringBootConfiguration;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
-import org.springframework.context.ConfigurableApplicationContext;
-
-import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.JRaftUtils;
-import com.alipay.sofa.jraft.Node;
-import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.conf.Configuration;
-import com.alipay.sofa.jraft.entity.PeerId;
-import com.alipay.sofa.jraft.option.NodeOptions;
-import com.yuandian.dataflow.statemachine.SyncClosure;
-import com.yuandian.dataflow.statemachine.StateMachine;
import com.yuandian.dataflow.statemachine.StateServerFactory;
-import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
@@ -33,31 +17,38 @@ import lombok.extern.slf4j.Slf4j;
*
*/
@Slf4j
-@SpringBootApplication(exclude = {MongoAutoConfiguration.class})
-@SpringBootConfiguration
public class Server {
-
- public static void main(String[] args) throws Exception {
-
+ public static String peeridstr;
+ public static String sprPort;
+ public static Configuration conf ;
+ public static void main(String[] args) throws Exception {
+
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
String[] sprPeers = new String[]{"3440","3441","3442"};
- var peeridstr = peers[ Integer.parseInt(args[0] )];
- var sprPort = sprPeers[Integer.parseInt(args[0] )];
-
+ var peeridx = Integer.parseInt(args[0]);
+ var peeridstr = peers[ peeridx ];
+ var sprPort = sprPeers[ peeridx ];
// var peeridstr = peers[2];
// var sprPort = sprPeers[2];
-
+ log.info("{} {}", peeridstr, sprPort);
- Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
-
- StateServerFactory.initStateServer(peeridstr, conf);
+ conf = JRaftUtils.getConfiguration(String.join(",", peers));
+ StateServerFactory.startStateServer(peeridstr, conf);
+
+ // System.setProperty("server.port", sprPort);
+ // ConfigurableApplicationContext app = SpringApplication.run(Server.class, args);
+ // StateServerFactory.setAppCxt(app);
+ // app.addApplicationListener(new SpringReadyEvent());
+ // app.start();
- System.setProperty("server.port", sprPort);
- ConfigurableApplicationContext app = SpringApplication.run(Server.class, args);
- app.start();
}
+
+
+
+
+
}
diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java
index 4589103..626217a 100644
--- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java
+++ b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java
@@ -1,10 +1,13 @@
package com.yuandian.dataflow.controller;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Controller;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestParam;
+// import org.springframework.http.HttpStatus;
+// import org.springframework.http.MediaType;
+// import org.springframework.http.ResponseEntity;
+// import org.springframework.stereotype.Controller;
+// import org.springframework.web.bind.annotation.GetMapping;
+// import org.springframework.web.bind.annotation.PostMapping;
+// import org.springframework.web.bind.annotation.RequestBody;
+// import org.springframework.web.bind.annotation.RequestParam;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
@@ -19,46 +22,52 @@ import lombok.var;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-@Controller
+// @Controller
public class TaskLog {
- @GetMapping(path = "/test")
- public ResponseEntity Processing() throws InterruptedException, RemotingException {
- // var ws = new WorkerState(new PeerId());
- // StateServerFactory.getStateServer().updateFsmWorkerState(ws);
- Response response = new Response();
- StateServerFactory.getStateServer().useFsmState((fsmState)->{
- log.debug(fsmState.toString() );
- log.debug( StateServerFactory.getNode().getLeaderId().toString() );
- response.Message = fsmState.toString();
- return null;
- });
- response.Code = HttpStatus.OK;
- return new ResponseEntity(response, HttpStatus.OK);
- }
+ // @PostMapping(path = "/test", produces={MediaType.APPLICATION_JSON_VALUE})
+ // public ResponseEntity Processing(@RequestBody String Count) throws InterruptedException, RemotingException {
+ // // var ws = new WorkerState(new PeerId());
+ // // StateServerFactory.getStateServer().updateFsmWorkerState(ws);
+ // log.debug("{}", Count);
+ // Response response = new Response();
+ // synchronized(response) {
+ // StateServerFactory.getStateServer().useFsmStateAsync((fsmState)->{
+ // log.debug("http: {}",StateServerFactory.getServerId() );
+ // response.Message = fsmState.toString();
+ // response.notify();
+ // return null;
+ // });
+ // }
+
+ // response.Code = HttpStatus.OK;
+ // return new ResponseEntity(response, HttpStatus.OK);
+ // }
- @GetMapping(path = "/test2")
- public ResponseEntity MongodbTest() {
- Response response = new Response();
- StateServerFactory.getStateServer().useFsmStateAsync((fsmState)->{
- log.debug("{} {}", fsmState.toString());
- // response.Message = fsmState.toString();
- return null;
- });
+ // @GetMapping(path = "/test2")
+ // public ResponseEntity MongodbTest() {
+ // Response response = new Response();
+ // StateServerFactory.getStateServer().useFsmStateAsync((fsmState)->{
+ // log.debug("{} {}", fsmState.toString());
+ // // response.Message = fsmState.toString();
+ // return null;
+ // });
- return new ResponseEntity(response, HttpStatus.OK);
- }
+ // return new ResponseEntity(response, HttpStatus.OK);
+ // }
- @GetMapping(path = "/test3")
- public ResponseEntity RemoveLeader() {
- Response response = new Response();
- try {
- StateServerFactory.getNode().shutdown();
- } catch (Exception e) {
- e.printStackTrace();
- }
+ // @GetMapping(path = "/test3")
+ // public ResponseEntity RemoveLeader() {
+ // Response response = new Response();
+ // try {
+ // StateServerFactory.getNode().shutdown();
+ // } catch (Exception e) {
+ // e.printStackTrace();
+ // response.Message = e.getMessage();
+ // return new ResponseEntity(response, HttpStatus.INTERNAL_SERVER_ERROR);
+ // }
- return new ResponseEntity(response, HttpStatus.OK);
- }
+ // return new ResponseEntity(response, HttpStatus.OK);
+ // }
}
diff --git a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java
index 45ce4ea..74b3a8b 100644
--- a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java
+++ b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java
@@ -28,8 +28,8 @@ import com.yuandian.dataflow.proto.msgtype.*;
import io.grpc.ManagedChannelBuilder;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.http.*;
-import org.springframework.web.client.RestTemplate;
+// import org.springframework.http.*;
+// import org.springframework.web.client.RestTemplate;
/**
* description
@@ -121,15 +121,15 @@ public class CollectPackets extends CollectPacketsServerImplBase {
var result = p.unpack(BacktrackingFlowOuterClass.BacktrackingFlow.class);
System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result));
- RestTemplate client = new RestTemplate();
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_JSON);
+ // RestTemplate client = new RestTemplate();
+ // HttpHeaders headers = new HttpHeaders();
+ // headers.setContentType(MediaType.APPLICATION_JSON);
- HttpEntity