From 182d38595f714a5f7fc5728aef6fc3b9d134e3c1 Mon Sep 17 00:00:00 2001 From: huangsimin Date: Wed, 20 Jul 2022 18:27:00 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=BB=E9=99=A4Spring=20Boot?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- assembly.xml | 44 ++++++++ pom.xml | 103 +++++++++++++++++- .../java/com/yuandian/dataflow/Server.java | 53 ++++----- .../yuandian/dataflow/controller/TaskLog.java | 89 ++++++++------- .../dataflow/grpc/CollectPackets.java | 18 +-- .../com/yuandian/dataflow/projo/Response.java | 4 +- .../dataflow/statemachine/StateMachine.java | 43 ++++---- .../statemachine/StateServerFactory.java | 88 +++------------ .../statemachine/state/StateFactory.java | 78 +++++++++++++ .../statemachine/state/WorkerState.java | 13 ++- src/main/resources/logback.xml | 12 +- .../java/com/yuandian/dataflow/AppTest.java | 2 +- start.sh | 9 +- 13 files changed, 363 insertions(+), 193 deletions(-) create mode 100644 assembly.xml create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java diff --git a/assembly.xml b/assembly.xml new file mode 100644 index 0000000..d0a0aa8 --- /dev/null +++ b/assembly.xml @@ -0,0 +1,44 @@ + + + bin + + dir + tar.gz + + false + + + true + lib + runtime + + + + + unix + 0755 + + bin/** + config/** + + + **/src/** + **/target/** + **/.*/** + + + + + + + lib/ + false + + + lib + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 384412f..b6954ae 100644 --- a/pom.xml +++ b/pom.xml @@ -21,8 +21,8 @@ 2.3.0 1.32.3 1.7.36 - 1.3.10 - 2.7.0 + 1.3.11 + 2.7.1 3.12.11 2.1.0 1.30 @@ -47,6 +47,20 @@ ${yuandian.common.config.version} + + + + + ch.qos.logback + logback-classic + 1.2.11 + + + + ch.qos.logback + logback-core + 1.2.11 + org.slf4j @@ -54,6 +68,27 @@ ${slf4j.version} + + + + + + + + + + + + + org.yaml @@ -77,11 +112,11 @@ - + @@ -214,7 +249,7 @@ --> - + + + + org.apache.maven.plugins + maven-jar-plugin + + + + true + com.yuandian.dataflow.Server + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + make-assembly + package + + single + + + + + + assembly.xml + + + + + + + + + org.apache.maven.plugins diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index 6258617..86232a4 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -1,26 +1,10 @@ package com.yuandian.dataflow; -import java.io.File; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.SpringBootConfiguration; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration; -import org.springframework.context.ConfigurableApplicationContext; - -import com.alipay.remoting.serialization.SerializerManager; import com.alipay.sofa.jraft.JRaftUtils; -import com.alipay.sofa.jraft.Node; -import com.alipay.sofa.jraft.RaftGroupService; import com.alipay.sofa.jraft.conf.Configuration; -import com.alipay.sofa.jraft.entity.PeerId; -import com.alipay.sofa.jraft.option.NodeOptions; -import com.yuandian.dataflow.statemachine.SyncClosure; -import com.yuandian.dataflow.statemachine.StateMachine; import com.yuandian.dataflow.statemachine.StateServerFactory; -import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor; import lombok.var; import lombok.extern.slf4j.Slf4j; @@ -33,31 +17,38 @@ import lombok.extern.slf4j.Slf4j; * */ @Slf4j -@SpringBootApplication(exclude = {MongoAutoConfiguration.class}) -@SpringBootConfiguration public class Server { - - public static void main(String[] args) throws Exception { - + public static String peeridstr; + public static String sprPort; + public static Configuration conf ; + public static void main(String[] args) throws Exception { + String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; String[] sprPeers = new String[]{"3440","3441","3442"}; - var peeridstr = peers[ Integer.parseInt(args[0] )]; - var sprPort = sprPeers[Integer.parseInt(args[0] )]; - + var peeridx = Integer.parseInt(args[0]); + var peeridstr = peers[ peeridx ]; + var sprPort = sprPeers[ peeridx ]; // var peeridstr = peers[2]; // var sprPort = sprPeers[2]; - + log.info("{} {}", peeridstr, sprPort); - Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442"); - - StateServerFactory.initStateServer(peeridstr, conf); + conf = JRaftUtils.getConfiguration(String.join(",", peers)); + StateServerFactory.startStateServer(peeridstr, conf); + + // System.setProperty("server.port", sprPort); + // ConfigurableApplicationContext app = SpringApplication.run(Server.class, args); + // StateServerFactory.setAppCxt(app); + // app.addApplicationListener(new SpringReadyEvent()); + // app.start(); - System.setProperty("server.port", sprPort); - ConfigurableApplicationContext app = SpringApplication.run(Server.class, args); - app.start(); } + + + + + } diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java index 4589103..626217a 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java +++ b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java @@ -1,10 +1,13 @@ package com.yuandian.dataflow.controller; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.stereotype.Controller; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestParam; +// import org.springframework.http.HttpStatus; +// import org.springframework.http.MediaType; +// import org.springframework.http.ResponseEntity; +// import org.springframework.stereotype.Controller; +// import org.springframework.web.bind.annotation.GetMapping; +// import org.springframework.web.bind.annotation.PostMapping; +// import org.springframework.web.bind.annotation.RequestBody; +// import org.springframework.web.bind.annotation.RequestParam; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.entity.PeerId; @@ -19,46 +22,52 @@ import lombok.var; import lombok.extern.slf4j.Slf4j; @Slf4j -@Controller +// @Controller public class TaskLog { - @GetMapping(path = "/test") - public ResponseEntity Processing() throws InterruptedException, RemotingException { - // var ws = new WorkerState(new PeerId()); - // StateServerFactory.getStateServer().updateFsmWorkerState(ws); - Response response = new Response(); - StateServerFactory.getStateServer().useFsmState((fsmState)->{ - log.debug(fsmState.toString() ); - log.debug( StateServerFactory.getNode().getLeaderId().toString() ); - response.Message = fsmState.toString(); - return null; - }); - response.Code = HttpStatus.OK; - return new ResponseEntity(response, HttpStatus.OK); - } + // @PostMapping(path = "/test", produces={MediaType.APPLICATION_JSON_VALUE}) + // public ResponseEntity Processing(@RequestBody String Count) throws InterruptedException, RemotingException { + // // var ws = new WorkerState(new PeerId()); + // // StateServerFactory.getStateServer().updateFsmWorkerState(ws); + // log.debug("{}", Count); + // Response response = new Response(); + // synchronized(response) { + // StateServerFactory.getStateServer().useFsmStateAsync((fsmState)->{ + // log.debug("http: {}",StateServerFactory.getServerId() ); + // response.Message = fsmState.toString(); + // response.notify(); + // return null; + // }); + // } + + // response.Code = HttpStatus.OK; + // return new ResponseEntity(response, HttpStatus.OK); + // } - @GetMapping(path = "/test2") - public ResponseEntity MongodbTest() { - Response response = new Response(); - StateServerFactory.getStateServer().useFsmStateAsync((fsmState)->{ - log.debug("{} {}", fsmState.toString()); - // response.Message = fsmState.toString(); - return null; - }); + // @GetMapping(path = "/test2") + // public ResponseEntity MongodbTest() { + // Response response = new Response(); + // StateServerFactory.getStateServer().useFsmStateAsync((fsmState)->{ + // log.debug("{} {}", fsmState.toString()); + // // response.Message = fsmState.toString(); + // return null; + // }); - return new ResponseEntity(response, HttpStatus.OK); - } + // return new ResponseEntity(response, HttpStatus.OK); + // } - @GetMapping(path = "/test3") - public ResponseEntity RemoveLeader() { - Response response = new Response(); - try { - StateServerFactory.getNode().shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } + // @GetMapping(path = "/test3") + // public ResponseEntity RemoveLeader() { + // Response response = new Response(); + // try { + // StateServerFactory.getNode().shutdown(); + // } catch (Exception e) { + // e.printStackTrace(); + // response.Message = e.getMessage(); + // return new ResponseEntity(response, HttpStatus.INTERNAL_SERVER_ERROR); + // } - return new ResponseEntity(response, HttpStatus.OK); - } + // return new ResponseEntity(response, HttpStatus.OK); + // } } diff --git a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java index 45ce4ea..74b3a8b 100644 --- a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java +++ b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java @@ -28,8 +28,8 @@ import com.yuandian.dataflow.proto.msgtype.*; import io.grpc.ManagedChannelBuilder; import lombok.var; import lombok.extern.slf4j.Slf4j; -import org.springframework.http.*; -import org.springframework.web.client.RestTemplate; +// import org.springframework.http.*; +// import org.springframework.web.client.RestTemplate; /** * description @@ -121,15 +121,15 @@ public class CollectPackets extends CollectPacketsServerImplBase { var result = p.unpack(BacktrackingFlowOuterClass.BacktrackingFlow.class); System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result)); - RestTemplate client = new RestTemplate(); - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.APPLICATION_JSON); + // RestTemplate client = new RestTemplate(); + // HttpHeaders headers = new HttpHeaders(); + // headers.setContentType(MediaType.APPLICATION_JSON); - HttpEntity requestEntity = new HttpEntity<>(JsonFormat.printer().print(result), headers); - String url = "http://localhost:3440/test"; - ResponseEntity response = client.exchange(url, HttpMethod.POST, requestEntity, Response.class); + // HttpEntity requestEntity = new HttpEntity<>(JsonFormat.printer().print(result), headers); + // String url = "http://localhost:3440/test"; + // ResponseEntity response = client.exchange(url, HttpMethod.POST, requestEntity, Response.class); - System.out.println("result:" + response.getBody()); + // System.out.println("result:" + response.getBody()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } diff --git a/src/main/java/com/yuandian/dataflow/projo/Response.java b/src/main/java/com/yuandian/dataflow/projo/Response.java index 8936a92..05bfe65 100644 --- a/src/main/java/com/yuandian/dataflow/projo/Response.java +++ b/src/main/java/com/yuandian/dataflow/projo/Response.java @@ -1,12 +1,12 @@ package com.yuandian.dataflow.projo; -import org.springframework.http.HttpStatus; +import org.apache.http.HttpStatus; import com.fasterxml.jackson.annotation.JsonProperty; public class Response { @JsonProperty("code") - public HttpStatus Code; + public org.apache.http.HttpStatus Code; @JsonProperty("message") public String Message; @JsonProperty("data") diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 90eddb4..f4e6595 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -9,7 +9,7 @@ import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.boot.SpringApplication; + import com.alipay.remoting.exception.CodecException; import com.alipay.remoting.serialization.SerializerManager; @@ -34,6 +34,7 @@ import com.yuandian.dataflow.statemachine.rpc.ResponseSM; import com.yuandian.dataflow.statemachine.rpc.RequestCondition; import com.yuandian.dataflow.statemachine.rpc.RequestState; import com.yuandian.dataflow.statemachine.state.State; +import com.yuandian.dataflow.statemachine.state.StateFactory; import com.yuandian.dataflow.statemachine.state.WorkerState; import lombok.var; @@ -76,9 +77,7 @@ public class StateMachine extends StateMachineAdapter { * Returns current value. 读取修改都在这个函数域内进行 */ public void useState(Function dofunc) { - synchronized(this.state) { - dofunc.apply(this.state); - } + dofunc.apply(this.state); } /** @@ -87,7 +86,7 @@ public class StateMachine extends StateMachineAdapter { * @throws InterruptedException */ public void updateState(Function dofunc) throws InterruptedException, RemotingException { - synchronized(this.state) { + var newstate = dofunc.apply(this.state); var ss = StateServerFactory.getStateServer(); if(!isLeader()) { @@ -109,7 +108,7 @@ public class StateMachine extends StateMachineAdapter { colsure.setValue(newstate); StateServerFactory.getStateServer().applyState(newstate, colsure); } - } + } @@ -123,9 +122,9 @@ public class StateMachine extends StateMachineAdapter { // parsing. var closure = (SyncClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交 log.info("SyncDataClosure(done) taskState:{} leaderTerm:{} {}",this.state, this.leaderTerm, closure); - synchronized(this.state) { + this.state = closure.getValue(); - } + closure.success(state); closure.run(Status.OK()); @@ -174,6 +173,9 @@ public class StateMachine extends StateMachineAdapter { state.getWorkers().put(ws.peerId, ws); return state; }); + if(!StateFactory.getMasterExecute().isAlive()) { + StateFactory.getMasterExecute().start(); + } } catch (InterruptedException e) { e.printStackTrace(); } catch (RemotingException e) { @@ -188,6 +190,10 @@ public class StateMachine extends StateMachineAdapter { this.leaderTerm.set(-1); super.onLeaderStop(status); + if(StateFactory.getMasterExecute().isAlive()) { + StateFactory.getMasterExecute().interrupt(); + } + try { updateState((state)->{ state.getWorkers().remove( StateServerFactory.getServerId() ); @@ -206,27 +212,19 @@ public class StateMachine extends StateMachineAdapter { @Override public void onShutdown() { log.debug("onShutdown"); - try { - updateState((state)->{ - state.getWorkers().remove( StateServerFactory.getServerId() ); - return state; - }); - - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (RemotingException e) { - e.printStackTrace(); - } - super.onShutdown(); - } @Override public void onStartFollowing(LeaderChangeContext ctx) { - log.debug("{} {}", ctx, StateServerFactory.getCluster().getServerId()); + log.debug("[onStartFollowing] {} {}", ctx, StateServerFactory.getCluster().getServerId()); try { + + if(StateFactory.getMasterExecute().isAlive()) { + StateFactory.getMasterExecute().interrupt(); + } + var ss = StateServerFactory.getStateServer(); var ws = new WorkerState(ss.getCluster().getServerId()); var request = new RequestCondition(); @@ -248,7 +246,6 @@ public class StateMachine extends StateMachineAdapter { @Override public void onConfigurationCommitted(Configuration conf) { - // TODO Auto-generated method stub super.onConfigurationCommitted(conf); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index 5fc2b2e..d7d28ab 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -13,7 +13,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.function.Function; -import org.springframework.context.ConfigurableApplicationContext; + import com.alipay.remoting.NamedThreadFactory; import com.alipay.remoting.exception.CodecException; @@ -60,23 +60,20 @@ import lombok.extern.slf4j.Slf4j; public class StateServerFactory { private static StateServer ss; - private static ConfigurableApplicationContext appCxt; - public static void initStateServer(String peerstr, Configuration conf) throws Exception { + private static String myPeerStr; + private static Configuration raftConf; + + public static void startStateServer(String peerstr, Configuration conf) throws Exception { if(ss != null) { throw new Exception("重复初始化 InitStateServer"); } ss = new StateServerFactory.StateServer(peerstr, conf); - log.debug("init peerid {}", ss.node.getNodeId().getPeerId()); } - public static void setAppCxt(ConfigurableApplicationContext cxt) { - appCxt = cxt; - } + - public static ConfigurableApplicationContext getAppCxt() { - return appCxt; - } + public static PeerId getServerId() { @@ -114,15 +111,15 @@ public class StateServerFactory { private Executor readIndexExecutor = createReadIndexExecutor(); public StateServer(String addr, Configuration conf) { - String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; - String[] sprPeers = new String[]{"3440","3441","3442"}; + // String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; + // String[] sprPeers = new String[]{"3440","3441","3442"}; // var peeridstr = peers[Integer.parseInt(serverId)]; // var sprPort = sprPeers[Integer.parseInt(args[0])]; // String groupId = "jraft"; - // Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442"); + // conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442"); PeerId serverId = JRaftUtils.getPeerId(addr); int port = serverId.getPort(); @@ -146,76 +143,17 @@ public class StateServerFactory { cluster.getRpcServer().registerProcessor(new SyncConditionProcessor()); node = cluster.start(); - - rpcClient = new BoltRaftRpcFactory().createRpcClient(); rpcClient.init(new CliOptions()); } + + public boolean isLeader() { return this.fsm.isLeader(); } - - /** - * 同步 可以使用follow使用, 但是可能延迟于leader. 只读 - * @param dofunc - */ - public void useFsmState(Function dofunc) { - - SyncClosure closure = new SyncClosure() { - @Override - public void run(Status status) { - - } - }; - - getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { - @Override - public void run(Status status, long index, byte[] reqCtx) { - - getFsm().useState((fsmState)->{ - if(status.isOk()){ - closure.success(fsmState); - closure.run(Status.OK()); - synchronized(dofunc) { - dofunc.apply(fsmState); - } - return null; - } - - readIndexExecutor.execute(() -> { - if(isLeader()){ - log.debug("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", getNode()); - applyState(fsmState, closure); - }else { - handlerNotLeaderError(closure); - } - }); - return null; - }); - - synchronized(dofunc) { - // log.debug("dofunc notify {}", getNode()); - dofunc.notify(); - } - } - }); - - try { - synchronized(dofunc) { - // log.debug("dofunc wait"); - dofunc.wait(5000); - // log.debug("dofunc unwait"); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - return ; - - } - public void useFsmStateAsync(Function dofunc) { SyncClosure closure = new SyncClosure() { @@ -402,7 +340,7 @@ public class StateServerFactory { } - useFsmState((fsmState)->{ + useFsmStateAsync((fsmState)->{ var wmap = fsmState.getWorkers(); var wstate = wmap.get(state.getPeerId()); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java new file mode 100644 index 0000000..6f49539 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java @@ -0,0 +1,78 @@ +/** + * description + * + * @author eson + *2022年7月20日-10:00:05 + */ +package com.yuandian.dataflow.statemachine.state; + + +import com.alibaba.nacos.api.cmdb.pojo.Entity; +import com.yuandian.dataflow.statemachine.StateServerFactory; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.var; +import lombok.extern.slf4j.Slf4j; + +/** + * description + * + * @author eson + *2022年7月20日-10:00:05 + */ +@Slf4j +@Getter +@Setter +@ToString +public class StateFactory { + + public static Thread masterExecute = new Thread(new Runnable() { + @Override + public void run() { + try { + while(true) { + log.debug("master execute {}", StateServerFactory.getServerId()); + var alivePeers = StateServerFactory.getCluster().getRaftNode().listAlivePeers(); + log.debug("master execute {}", StateServerFactory.getCluster().getRaftNode().listAlivePeers()); + if(alivePeers != null) { + synchronized(alivePeers) { + StateServerFactory.getStateServer().useFsmStateAsync((state)->{ + alivePeers.forEach((peer)->{ + var ws = state.getWorkers().get(peer); + if(ws != null) { + var cap = 10000 - ws.getTaskQueueSize(); + if(cap > 0) { + log.debug("{}", cap); + + + } + } + }); + alivePeers.notify(); + return null; + }); + } + + } + + Thread.sleep(2000); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + }); + + public static Thread getMasterExecute() { + return masterExecute; + } + + public static void Init() { + + } + + +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java b/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java index 2d46d94..5e16daf 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java @@ -7,6 +7,7 @@ package com.yuandian.dataflow.statemachine.state; import java.io.Serializable; +import java.time.Instant; import com.alipay.sofa.jraft.entity.PeerId; @@ -26,9 +27,19 @@ import lombok.ToString; public class WorkerState implements Serializable { private static final long serialVersionUID = -1L; - // 节点的对应peerID + + /** + * 节点的对应peerID + */ public PeerId peerId; + /** + * 任务队列的数量 + */ public long taskQueueSize; + /** + * 更新时间 + */ + public Instant updateAt; public WorkerState(PeerId pid) { this.peerId = pid; diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index f3f6718..85b4e0c 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -10,15 +10,21 @@ DEBUG ACCEPT DENY + + + + ERROR + ACCEPT + DENY --> - + - + \ No newline at end of file diff --git a/src/test/java/com/yuandian/dataflow/AppTest.java b/src/test/java/com/yuandian/dataflow/AppTest.java index 0d6d478..1117839 100644 --- a/src/test/java/com/yuandian/dataflow/AppTest.java +++ b/src/test/java/com/yuandian/dataflow/AppTest.java @@ -21,7 +21,7 @@ import javax.swing.text.AbstractDocument.BranchElement; import org.bson.Document; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; -import org.springframework.expression.spel.ast.FunctionReference; +// import org.springframework.expression.spel.ast.FunctionReference; import com.mongodb.MongoClient; import com.mongodb.client.model.InsertManyOptions; diff --git a/start.sh b/start.sh index 748c796..366b3f1 100755 --- a/start.sh +++ b/start.sh @@ -2,11 +2,14 @@ screen -S raft-0 -X quit screen -S raft-1 -X quit screen -S raft-2 -X quit + +version=1.0.0-SNAPSHOT + sleep 1 -screen -dmS raft-0 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 0 -screen -dmS raft-1 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 1 -screen -dmS raft-2 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 2 +screen -dmS raft-0 -L java -cp target/dataflow-${version}.jar-bin/lib/*jar -jar target/dataflow-${version}.jar 0 +screen -dmS raft-1 -L java -cp target/dataflow-${version}.jar-bin/lib/*jar -jar target/dataflow-${version}.jar 1 +screen -dmS raft-2 -L java -cp target/dataflow-${version}.jar-bin/lib/*jar -jar target/dataflow-${version}.jar 2 sleep 1