Merge branch 'develop' into 'master'

最新例子

See merge request testdemo/dataflow!2
This commit is contained in:
黄思敏 2022-07-28 08:27:31 +00:00
commit 7113475224
42 changed files with 1663 additions and 15694 deletions

4
.gitignore vendored
View File

@ -79,7 +79,7 @@ local.properties
.classpath
# Annotation Processing
.apt_generated
.apt_generated*
.sts4-cache/
@ -202,4 +202,4 @@ README.html
raftdata
screenlog.*
screenlog.*

1
.gitmodules vendored
View File

@ -1,3 +1,4 @@
[submodule "src/main/proto"]
path = src/main/proto
url = http://git.yuandian.com/project/proto/dataflow
branch = v1.0.3

19
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,19 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "java",
"name": "Launch Server",
"request": "launch",
"mainClass": "com.yuandian.dataflow.Server",
"projectName": "dataflow",
"args": ["2"],
"preLaunchTask": "restart",
"postDebugTask": "stopall",
}
]
}

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"java.configuration.updateBuildConfiguration": "automatic"
}

34
.vscode/tasks.json vendored Normal file
View File

@ -0,0 +1,34 @@
{
"version": "2.0.0",
"tasks": [
{
"label": "restart",
"type": "shell",
"command": "sh restart.sh",
"isBackground": false,
"presentation": {
"echo": true,
"reveal": "always",
"focus": false,
"panel": "new",
"showReuseMessage": true,
"clear": false,
"close": true
},
},
{
"label": "stopall",
"type": "shell",
"command": "sh stop.sh",
"presentation": {
"echo": true,
"reveal": "always",
"focus": false,
"panel": "shared",
"close": true
},
}
]
}

44
assembly.xml Normal file
View File

@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<assembly>
<id>bin</id>
<formats>
<format>dir</format>
<format>tar.gz</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<useProjectArtifact>true</useProjectArtifact>
<outputDirectory>lib</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
<fileSets>
<!-- 文件 -->
<fileSet>
<fileMode>0664</fileMode>
<directory>${project.build.directory}</directory>
<outputDirectory></outputDirectory>
<includes>
<include>*.jar</include>
</includes>
</fileSet>
</fileSets>
<moduleSets>
<moduleSet>
<binaries>
<outputDirectory>${project.directory}</outputDirectory>
<unpack>false</unpack>
<dependencySets>
<dependencySet>
<outputDirectory>lib</outputDirectory>
</dependencySet>
</dependencySets>
</binaries>
</moduleSet>
</moduleSets>
</assembly>

125
pom.xml
View File

@ -11,9 +11,9 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<java.version>8</java.version>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<java.version>11</java.version>
<protobuf.version>3.20.1</protobuf.version>
<protostuff.version>1.7.4</protostuff.version>
@ -21,12 +21,12 @@
<ratis.version>2.3.0</ratis.version>
<grpc.version>1.32.3</grpc.version>
<slf4j.version>1.7.36</slf4j.version>
<jraft.version>1.3.10</jraft.version>
<spring.boot.version>2.7.0</spring.boot.version>
<mongo.driver.version>3.12.11</mongo.driver.version>
<jraft.version>1.3.11</jraft.version>
<spring.boot.version>2.7.1</spring.boot.version>
<mongo.driver.version>4.7.0</mongo.driver.version>
<nacos.version>2.1.0</nacos.version>
<snakeyaml.version>1.30</snakeyaml.version>
<logback.version>1.2.11</logback.version>
<yuandian.common.config.version>1.0.4</yuandian.common.config.version>
</properties>
@ -47,13 +47,27 @@
<version>${yuandian.common.config.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
@ -69,19 +83,28 @@
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>${mongo.driver.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.mongodb/mongodb-driver-sync -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>${mongo.driver.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.reflections/reflections -->
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.10.2</version>
</dependency>
<!-- <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring.boot.version}</version>
</dependency>
</dependency> -->
<!-- protobuf 依赖 -->
@ -128,13 +151,14 @@
<version>${grpc.version}</version>
</dependency>
<!-- proto自动生成java文件所需的编译插件 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
<scope>provided</scope>
</dependency>
</dependencies>
<scm>
@ -191,12 +215,13 @@
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<pluginId>grpc-java</pluginId>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
@ -212,21 +237,77 @@
<goal>test-compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.yuandian.dataflow.Server</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>make-assembly</id>
<phase>assembly</phase>
<goals>
<goal>repackage</goal>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<archive>
<manifest>
<mainClass>com.yuandian.dataflow.Server</mainClass>
</manifest>
</archive>
<descriptors>
<descriptor>assembly.xml</descriptor>
</descriptors>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>

4
restart.sh Executable file
View File

@ -0,0 +1,4 @@
#! /bin/bash
sh stop.sh & rm raftdata/ -rf && mvn package && truncate -s 0 screenlog.0
sh start.sh

View File

@ -1,50 +1,15 @@
package com.yuandian.dataflow;
import com.yuandian.common.Config;
import com.yuandian.dataflow.statemachine.RaftClosure;
import com.yuandian.dataflow.statemachine.StateMachine;
import lombok.val;
import lombok.var;
import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.io.File;
import java.nio.ByteBuffer;
import com.alipay.remoting.rpc.protocol.RpcResponseProcessor;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.NodeManager;
import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.RaftServiceFactory;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.core.IteratorImpl;
import com.alipay.sofa.jraft.core.NodeImpl;
import com.alipay.sofa.jraft.core.ReplicatorGroupImpl;
import com.alipay.sofa.jraft.entity.NodeId;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.rpc.CliClientService;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.RpcRequestProcessor;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.rpc.impl.BoltRpcClient;
import com.alipay.sofa.jraft.util.Endpoint;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.alipay.sofa.jraft.option.ReplicatorGroupOptions;
import lombok.extern.slf4j.Slf4j;
@ -53,67 +18,41 @@ import com.alipay.sofa.jraft.option.ReplicatorGroupOptions;
* Hello world!
*
*/
@SpringBootApplication
@SpringBootConfiguration
@Slf4j
public class Server {
@Autowired
public static Node node;
public static RaftClosure done;
public static Node GetNode() {
return node;
}
public static RaftClosure GetDone() {
return done;
}
public static String peeridstr;
public static String sprPort;
public static Configuration conf ;
public static void main(String[] args) {
public static void main(String[] args) throws Exception {
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
var peeridx = Integer.parseInt(args[0]);
var peeridstr = peers[ peeridx ];
// var peeridstr = peers[2];
// var sprPort = sprPeers[2];
log.info("{} {}", peeridstr, sprPort);
conf = JRaftUtils.getConfiguration(String.join(",", peers));
StateFactory.startStateServer(peeridstr, conf);
/*String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
String[] sprPeers = new String[]{"3440","3441","3442"};
var peeridstr = peers[ Integer.parseInt(args[0] ) ];
var sprPort = sprPeers[Integer.parseInt(args[0] )];
String groupId = "jraft";
Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
PeerId serverId = JRaftUtils.getPeerId(peeridstr);
int port = serverId.getPort();
NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setElectionTimeoutMs(1000);
nodeOptions.setSnapshotLogIndexMargin(3600);
nodeOptions.setInitialConf(conf);
File RaftDataFile = new File(String.format("./raftdata/%d", port) );
System.out.print(RaftDataFile.mkdirs());
nodeOptions.setLogUri( String.format("./raftdata/%d/logs", port) );
nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port));
nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port));
nodeOptions.setFsm(new StateMachine());
RaftGroupService cluster = new RaftGroupService(groupId, serverId, nodeOptions);
node = cluster.start();
done = new RaftClosure();
System.setProperty("server.port", sprPort);*/
System.setProperty("server.port", "3440");
var app = SpringApplication.run(Server.class, args);
app.start();
// node.shutdown(done);
// System.setProperty("server.port", sprPort);
// ConfigurableApplicationContext app = SpringApplication.run(Server.class, args);
// StateServerFactory.setAppCxt(app);
// app.addApplicationListener(new SpringReadyEvent());
// app.start();
}
}

View File

@ -0,0 +1,106 @@
/**
* description
*
* @author eson
*2022年7月21日-13:48:01
*/
package com.yuandian.dataflow.controller;
import java.io.Serializable;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.google.protobuf.Any;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
* 2022年7月21日-13:48:01
*/
@Slf4j
@ProcessorRaft
public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRequest> {
@Setter
@Getter
public static class PacketsRequest implements Serializable {
private ArrayList<Any> packets = new ArrayList<>();
}
public static Random rand = new Random();
@Override
public void handleRequest(RpcContext rpcCtx, PacketsRequest request) {
// StateServerFactory.getStateServer().updateFsmStateAsync(s, onCompleted);
var resp = new RaftResponse<>();
resp.setSuccess(true);
rpcCtx.sendResponse(resp); // 因为jraft是异步. 收到数据直接返回
try {
log.debug("{} handler request.packets.size(): {}", StateFactory.getServerId(), request.packets.size());
// TODO: request.packets 入库,回填, 告警 等操作
Thread.sleep(ThreadLocalRandom .current().nextLong(100, 3000));
} catch (InterruptedException e) {
log.info(e.toString());
} finally { // 确保 更新 最终的任务状态给master.
// 读状态 Closure<State> 里的 getValue<State> State的状态
StateFactory.readIndexState(new GenericClosure<State>() {
@Override
public void run(Status status) {
if (!status.isOk()) {
log.error("失败 readIndexState {}", status);
}
// readIndexState 失败后也需要直接 更新自己状态
var state = this.getValue(); // 获取返回的状态
var ws = state.getWorkers().get(StateFactory.getServerId());
ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量
ws.setUpdateAt(Instant.now()); // 设置更新时间
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws),
new GenericClosure<Operate>() {
@Override
public void run(Status status) {
if (!status.isOk()) {
log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp);
}
}
});
}
});
}
;
}
@Override
public String interest() {
return PacketsRequest.class.getName();
}
}

View File

@ -1,75 +0,0 @@
package com.yuandian.dataflow.controller;
import java.nio.ByteBuffer;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.entity.Task;
import com.google.protobuf.util.JsonFormat;
import com.yuandian.dataflow.Server;
import com.yuandian.dataflow.grpc.MongodbTest;
import com.yuandian.dataflow.projo.Response;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.ObjectUtils.Null;
import org.apache.commons.logging.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import com.alipay.sofa.jraft.Node;
@Slf4j
@Controller
public class TaskLog {
// private static final Logger log = LoggerFactory.getLogger(TaskLog.class);
private static Node node = Server.GetNode();
@PostMapping(path = "/test")
public ResponseEntity<Response> Processing(@RequestBody String json) {
/*Task task = new Task();
log.error(node.toString());
RaftClosure done = new RaftClosure();
task.setData(ByteBuffer.wrap("hello".getBytes()));
task.setDone(done);
Server.GetNode().apply(task);*/
try {
// 1类型转换
BacktrackingFlowOuterClass.BacktrackingFlow.Builder builder = BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder();
JsonFormat.parser().merge(json, builder);
BacktrackingFlowOuterClass.BacktrackingFlow backtrackingFlow = builder.build();
// 2业务处理
// 3数据保存到 mongoDB
MongodbTest.insertMsgToMongoDB(backtrackingFlow);
} catch (Exception e) {
e.printStackTrace();
}
Response response = new Response();
response.Code = HttpStatus.OK;
response.Message = HttpStatus.OK.toString();
return new ResponseEntity<Response>(response, HttpStatus.OK);
}
@GetMapping(path = "/test2")
public ResponseEntity<Response> MongodbTest(@RequestBody int status) {
Response response = new Response();
return new ResponseEntity<Response>(response, HttpStatus.OK);
}
}

View File

@ -0,0 +1,42 @@
package com.yuandian.dataflow.controller;
import java.io.Serializable;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* 例子 强制转换leader
*/
@Slf4j
@ProcessorRaft
public class TransferLeaderProcessor implements RpcProcessor<TransferLeaderProcessor.LeaderRequest> {
@Setter
@Getter
public static class LeaderRequest implements Serializable {
PeerId peer;
}
@Override
public void handleRequest(RpcContext rpcCtx, LeaderRequest request) {
Status status = StateFactory.getCluster().getRaftNode().transferLeadershipTo(request.peer);
rpcCtx.sendResponse(status);
log.debug("[TransferLeader] {} change leader to {}", status, request.peer);
}
@Override
public String interest() {
return LeaderRequest.class.getName();
}
}

View File

@ -16,7 +16,6 @@ import java.util.stream.Collectors;
import com.google.protobuf.*;
import com.google.protobuf.util.JsonFormat;
import com.yuandian.common.Config;
import com.yuandian.dataflow.projo.Response;
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc;
@ -26,10 +25,10 @@ import com.yuandian.dataflow.proto.CollectPacketsServerGrpc.CollectPacketsServer
import com.yuandian.dataflow.proto.msgtype.*;
import io.grpc.ManagedChannelBuilder;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.*;
import org.springframework.web.client.RestTemplate;
// import org.springframework.http.*;
// import org.springframework.web.client.RestTemplate;
/**
* description
@ -81,7 +80,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
log.info("次序:{} 条数: {}, {}:ms", count, i, Duration.between(now, Instant.now()).toMillis());
}
} catch (Exception e) {
e.printStackTrace();
log.info("{}", e.toString());
} finally {
channel.shutdown().awaitTermination(1, TimeUnit.SECONDS);
}
@ -99,7 +98,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
log.info("{}", e.toString());
}
return null;
});
@ -110,7 +109,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
log.info("{}", e.toString());
}
return null;
});*/
@ -120,18 +119,18 @@ public class CollectPackets extends CollectPacketsServerImplBase {
try {
var result = p.unpack(BacktrackingFlowOuterClass.BacktrackingFlow.class);
System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result));
// RestTemplate client = new RestTemplate();
// HttpHeaders headers = new HttpHeaders();
// headers.setContentType(MediaType.APPLICATION_JSON);
RestTemplate client = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
// HttpEntity<Object> requestEntity = new HttpEntity<>(JsonFormat.printer().print(result), headers);
// String url = "http://localhost:3440/test";
// ResponseEntity<Response> response = client.exchange(url, HttpMethod.POST, requestEntity, Response.class);
HttpEntity<Object> requestEntity = new HttpEntity<>(JsonFormat.printer().print(result), headers);
String url = "http://localhost:3440/test";
ResponseEntity<Response> response = client.exchange(url, HttpMethod.POST, requestEntity, Response.class);
System.out.println("result:" + response.getBody());
// System.out.println("result:" + response.getBody());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
log.info("{}", e.toString());
}
return null;
});
@ -143,7 +142,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
log.info("{}", e.toString());
}
return null;
});
@ -154,7 +153,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
log.info("{}", e.toString());
}
return null;
});
@ -165,7 +164,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
log.info("{}", e.toString());
}
return null;
});
@ -176,7 +175,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
log.info("{}", e.toString());
}
return null;
});
@ -187,7 +186,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
log.info("{}", e.toString());
}
return null;
});
@ -198,7 +197,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
log.info("{}", e.toString());
}
return null;
});
@ -209,7 +208,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
log.info("{}", e.toString());
}
return null;
});
@ -220,7 +219,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
log.info("{}", e.toString());
}
return null;
});
@ -231,7 +230,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
log.info("{}", e.toString());
}
return null;
});*/

View File

@ -16,19 +16,19 @@
package com.yuandian.dataflow.grpc;
import com.google.protobuf.Any;
import com.yuandian.dataflow.proto.Base;
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc;
import com.yuandian.dataflow.proto.msgtype.AppFlowOuterClass;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import com.google.protobuf.Any;
import com.yuandian.dataflow.proto.Base;
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
/**
* Server that manages startup/shutdown of a {@code Greeter} server.
*/

View File

@ -1,86 +0,0 @@
package com.yuandian.dataflow.grpc;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
public class MongodbTest {
public static <T> void insertMsgToMongoDB(T obj) {
try {
ServerAddress serverAddress = new ServerAddress("192.168.1.113", 27017);
List<ServerAddress> addrs = new ArrayList<>();
addrs.add(serverAddress);
MongoCredential credential = MongoCredential.createScramSha1Credential("yd-rw", "admin", "yuandian123".toCharArray());
List<MongoCredential> credentials = new ArrayList<>();
credentials.add(credential);
MongoClient mongoClient = new MongoClient(addrs, credentials);
MongoDatabase db = mongoClient.getDatabase("yd-base");
// todo 修改名字
MongoCollection<Document> collection = db.getCollection("lxy-test");
collection.insertOne(obj2Doc(obj));
System.err.println("insert success");
} catch (Exception e) {
e.printStackTrace();
}
}
public static <T> Document obj2Doc(T obj) throws Exception {
Document doc = new Document();
Field[] fields = obj.getClass().getDeclaredFields();
for (Field field : fields) {
String varName = field.getName();
boolean accessFlag = field.isAccessible();
if (!accessFlag) {
field.setAccessible(true);
}
Object param = field.get(obj);
if (param == null) {
continue;
} else if (param instanceof Integer) {
int value = ((Integer) param).intValue();
doc.put(varName, value);
} else if (param instanceof String) {
String value = (String) param;
doc.put(varName, value);
} else if (param instanceof Double) {
double value = ((Double) param).doubleValue();
doc.put(varName, value);
} else if (param instanceof Float) {
float value = ((Float) param).floatValue();
doc.put(varName, value);
} else if (param instanceof Long) {
long value = ((Long) param).longValue();
doc.put(varName, value);
} else if (param instanceof Boolean) {
boolean value = ((Boolean) param).booleanValue();
doc.put(varName, value);
}
field.setAccessible(accessFlag);
}
return doc;
}
public static <T> T doc2Obj(Document doc, Class<T> clazz) throws Exception {
T obj = clazz.newInstance();
for (String key : doc.keySet()) {
Field field = clazz.getDeclaredField(key);
field.setAccessible(true);
field.set(obj, doc.get(key));
}
return obj;
}
}

View File

@ -1,137 +0,0 @@
package com.yuandian.dataflow.master;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
// import com.yuandian.dataflow.proto.decode.PacketBase;
// import com.yuandian.dataflow.proto.decode.PacketHeader;
// import com.yuandian.dataflow.proto.decode.utils;
import io.netty.handler.codec.compression.ZlibDecoder;
import lombok.Cleanup;
import lombok.Getter;
import lombok.Setter;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
* Header
*/
@Slf4j
public class Header {
public static void main(String[] args) throws Exception {
var addr = new InetSocketAddress("192.168.1.248", 60001);
@Cleanup
var sock = new Socket();
sock.setReceiveBufferSize(10 * 1024 * 1024);// socket接收缓存
sock.setSoTimeout(1000 * 30);
// 设置超时
sock.connect(addr, 10 * 1000);
var in = new DataInputStream(sock.getInputStream());
var out = new DataOutputStream(sock.getOutputStream());
// 发送验证字符串
// out.write("public".getBytes());
// log.error("{}", PacketHeader.PacketCode(in));
// var pheader = new PacketHeader(in);
// log.error("{}", pheader);
// var pbase = PacketBase.createPacketBase(pheader);
// log.error("{}",pbase);
// //60010流需要解压
// byte[] unzipbodydata = null;
// if (pheader.getTableID() == 20) {
// pheader.parseNextHeader_60010(in);
// byte[] zipbodydata = in.readNBytes(pheader.getMsgLen());
// unzipbodydata = utils.Inflate(zipbodydata);
// }
// for (int i = 0; i < pheader.getRecCount(); i++) {
// PacketBase dataBean = null;
// if (pheader.getTableID() >= 22 && pheader.getTableID() <= 24) {
// // 读取具体数据头信息获取前四个字段值第四个字段为整条数据的长度 字段长度分别为 4 1 4 4
// // bodyhead = new byte[13];
// // readTillLength(bodyhead, 13);
// var p1 = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
// var p2 = ByteBuffer.wrap(in.readNBytes(1)).order(ByteOrder.LITTLE_ENDIAN).getInt();
// var p3 = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
// var length = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
// if (length <= 13) {
// log.error("[{}:{}][数据总条数:{}][当前解析第{}条][length<13][探针发包有问题]");
// throw new Exception("数据解析异常");
// }
// // 解析数据
// dataBean = pbase.Parse(pheader, ByteBuffer.wrap(in.readNBytes(length - 13)));
// } else if (pheader.getTableID() == 25) {
// var nowtype = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
// var p2 = ByteBuffer.wrap(in.readNBytes(1)).order(ByteOrder.LITTLE_ENDIAN).getInt();
// var p3 = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
// var length = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
// pheader.setNowType(nowtype);
// if (length <= 13) {
// log.error("[{}:{}][数据总条数:{}][当前解析第{}条][length<13][探针发包有问题]");
// throw new Exception("数据解析异常");
// }
// // 读取具体数据体信息
// byte[] bodydata = new byte[length - 13];
// readTillLength(bodydata, length - 13);
// // 解析数据
// dataBean = instance.Parse(pheader, bodydata, 0);
// } else if (pheader.getTableID() == 28 || pheader.getTableID() == 29) { //28或29为Apm流统计
// if (pheader.getTableID() == 28) {
// length = ApmBaseDataFlow.SIZE;
// } else {
// length = BasicTrafficFlow.SIZE;
// }
// byte[] bodydata = new byte[length];
// readTillLength(bodydata, length);
// dataBean = instance.Parse(pheader, bodydata, 0);
// } else if (pheader.getTableID() == 17 || pheader.getTableID() == 18) { //18 为网络性能流
// if (pheader.getTableID() == 17) {
// length = AppFlow.SIZE;
// } else if (pheader.getTableID() == 18) {
// length = QoeFlow.SIZE;
// }
// byte[] bodydata = new byte[length];
// readTillLength(bodydata, length);
// dataBean = instance.Parse(pheader, bodydata, 0);
// } else if (pheader.getTableID() == 20) {
// int offset = i * SstFlow.SIZE;
// dataBean = instance.Parse(pheader, unzipbodydata, offset);
// } else {
// logger.info("不需要的数据类型:" + pheader.getTableID());
// break;
// }
// if (dataBean != null) {
// tempBaseDatas.add(dataBean);
// }
// }
}
}

View File

@ -2,45 +2,57 @@ package com.yuandian.dataflow.projo;
import java.io.Serializable;
import java.lang.annotation.Documented;
import java.time.LocalDateTime;
import static com.mongodb.MongoClientSettings.getDefaultCodecRegistry;
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
import org.bson.Document;
import org.bson.codecs.pojo.annotations.BsonCreator;
import org.bson.codecs.pojo.annotations.BsonDiscriminator;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.bson.codecs.pojo.annotations.BsonProperty;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.util.JSONPObject;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@BsonDiscriminator
@Getter
@Slf4j
@Setter
public final class Doc extends Document {
@Getter
@ToString
public final class Doc {
@JsonProperty("code")
@BsonProperty("code")
public int Code ;
@BsonProperty("retryPackets")
public int retryPackets ;
@JsonProperty("ts")
@BsonProperty("ts")
public LocalDateTime TS;
@BsonProperty("serverResponseTime")
public int serverResponseTime ;
@JsonProperty("desc")
@BsonProperty("desc")
public String Desc;
@BsonProperty("requestBytes")
public int requestBytes ;
@JsonProperty("data")
@BsonProperty("data")
public Document Data;
@BsonProperty("businessName")
public String businessName ;
@BsonProperty("responseIp")
public int responseIp ;
public static void main(String[] args) {
MongoClient mgo = MongoClients.create("mongodb://yuandian:yuandian123@192.168.1.113:27017");
CodecProvider pojoCodecProvider = PojoCodecProvider.builder().register("com.yuandian.dataflow.projo").build();
CodecRegistry pojoCodecRegistry = fromRegistries(getDefaultCodecRegistry(),fromProviders(pojoCodecProvider));
MongoDatabase oriDatabase = mgo.getDatabase("ori-database").withCodecRegistry(pojoCodecRegistry);
MongoCollection<Doc> db = oriDatabase.getCollection("network_performace_flow_2022072400", Doc.class);
log.debug("{}", db.countDocuments( new BsonDocument("serverResponseTime", new BsonDocument("$gt", new BsonInt64(2083478517) )) ));
}
}

View File

@ -1,14 +0,0 @@
package com.yuandian.dataflow.projo;
import org.springframework.http.HttpStatus;
import com.fasterxml.jackson.annotation.JsonProperty;
public class Response {
@JsonProperty("code")
public HttpStatus Code;
@JsonProperty("message")
public String Message;
@JsonProperty("data")
public Object Data;
}

View File

@ -20,7 +20,7 @@ public final class UsrFlowOuterClass {
/**
* <pre>
*编号19
*编号19
* </pre>
*
* <code>int32 table_id = 1;</code>
@ -501,7 +501,7 @@ public final class UsrFlowOuterClass {
private int tableId_;
/**
* <pre>
*编号19
*编号19
* </pre>
*
* <code>int32 table_id = 1;</code>
@ -1793,7 +1793,7 @@ public final class UsrFlowOuterClass {
private int tableId_ ;
/**
* <pre>
*编号19
*编号19
* </pre>
*
* <code>int32 table_id = 1;</code>
@ -1805,7 +1805,7 @@ public final class UsrFlowOuterClass {
}
/**
* <pre>
*编号19
*编号19
* </pre>
*
* <code>int32 table_id = 1;</code>
@ -1820,7 +1820,7 @@ public final class UsrFlowOuterClass {
}
/**
* <pre>
*编号19
*编号19
* </pre>
*
* <code>int32 table_id = 1;</code>

View File

@ -0,0 +1,142 @@
/**
* description
*
* @author eson
*2022年7月20日-10:00:05
*/
package com.yuandian.dataflow.statemachine;
import java.time.Instant;
import java.util.List;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.google.protobuf.Any;
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**
* Master主线程, 用于接收packets
*
* @author eson
* 2022年7月20日-10:00:05
*/
@Slf4j
@Getter
@Setter
@ToString
public class MasterFactory {
public static final int MAX_TASKS = 100;
public static Thread masterExecute = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers());
log.debug("master({}) execute {}", StateFactory.getServerId(),
StateFactory.getRaftNode().listAlivePeers());
if (alivePeers != null) {
// 读一致性
StateFactory.readIndexState(new GenericClosure<State>() {
@Override
public void run(Status status) {
var state = this.getValue();
// log.debug("masterExecute start {} {}", status, alivePeers);
alivePeers.forEach((peer) -> {
if (state == null) {
log.error("readIndexState获取的状态为 {}", state);
return;
}
WorkerState ws = state.getWorkers().get(peer);
if (ws == null) {
log.error("WorkerState获取的状态为 {}", ws);
return;
}
var canTasks = MAX_TASKS - ws.getTaskQueueSize();
log.info("剩余能处理的任务数量[{}] :{}", peer, canTasks);
if (canTasks <= 0) {
return;
}
ws.setUpdateAt(Instant.now());
ws.setTaskQueueSize(MAX_TASKS);
// 模拟发送包的数据到该节点上
var request = new PacketsRequest();
for (int i = 0; i < canTasks; i++) {
var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
.newBuilder()
.setTableId(10086)
.build());
request.getPackets().add(p);
}
// 先提交 节点的 剩余能处理的任务数量. 然后再处理
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws),
new GenericClosure<Operate>() {
@Override
public void run(Status status) {
log.info("PacketsRequest run {}", status);
try {
StateFactory.rpcClientInvokeAsync(peer.getEndpoint(), request,
new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
if (err != null) {
// TODO: 如果错误, 需要让节点恢复任务处理的状态
log.debug("{}", err);
}
log.debug("PacketsRequest: {}", result);
}
}, 5000);
} catch (InterruptedException | RemotingException e) {
log.info("error send packets {}", e.toString());
}
}
});
});
}
});
}
Thread.sleep(2000);
}
} catch (InterruptedException e) {
log.info("{}", e.toString());
}
}
});
public static Thread getMasterExecute() {
return masterExecute;
}
public static void Init() {
}
}

View File

@ -1,27 +0,0 @@
package com.yuandian.dataflow.statemachine;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RaftClosure implements Closure {
private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class);
@Override
public void run(Status status) {
LOG.info("Task completed with status"+status.getCode());
LOG.info("Task completed with "+status.getErrorMsg());
LOG.info("Task completed with "+status.getRaftError());
}
// @Override
// public void onCommitted() {
// System.out.println("Task onCommitted");
// }
}

View File

@ -0,0 +1,267 @@
/**
* description
*
* @author eson
*2022年7月12日-13:36:24
*/
package com.yuandian.dataflow.statemachine;
import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.reflections.Reflections;
import com.alipay.remoting.NamedThreadFactory;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.InvokeContext;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
* 2022年7月12日-13:36:24
*/
@Slf4j
public class StateFactory {
private static StateServer ss;
public static void startStateServer(String peerstr, Configuration conf) throws Exception {
if (ss != null) {
throw new Exception("重复初始化 InitStateServer");
}
ss = new StateFactory.StateServer(peerstr, conf);
}
public static boolean isLeader() {
return ss.node.isLeader();
}
public static PeerId getLeaderId() {
return ss.node.getLeaderId();
}
public static PeerId getServerId() {
return ss.cluster.getServerId();
}
public static Node getNode() {
return ss.node;
}
public static Node getRaftNode() {
return ss.cluster.getRaftNode();
}
public static RpcClient getRpcClient() {
return ss.getRpcClient();
}
public static RaftGroupService getCluster() {
return ss.getCluster();
}
// 获取状态服务的对象
public static StateServer getStateServer() {
return ss;
}
public static void readIndexState(GenericClosure<State> closure) {
ss.readIndexState(closure);
}
public static void applyOperate(Operate op, GenericClosure<Operate> closure) {
ss.applyOperate(op, closure);
}
public static void rpcClientInvokeAsync(final Endpoint endpoint,final Object request,final InvokeCallback callback,final long timeoutMs)
throws InterruptedException, RemotingException {
ss.getRpcClient().invokeAsync(endpoint, request, callback, timeoutMs);
}
public static Object rpcClientInvokeSync(final Endpoint endpoint, final Object request, final long timeoutMs)
throws InterruptedException, RemotingException {
return ss.getRpcClient().invokeSync(endpoint, request, timeoutMs);
}
public static Object rpcClientInvokeSync(final Endpoint endpoint, final Object request, final InvokeContext ctx,
final long timeoutMs) throws InterruptedException, RemotingException {
return ss.getRpcClient().invokeSync(endpoint, request, ctx, timeoutMs);
}
@Getter
@Setter
public static class StateServer {
RpcClient rpcClient;
private Node node;
private RaftGroupService cluster;
private StateMachine fsm;
private String groupId = "dataflow";
private Executor readIndexExecutor = createReadIndexExecutor();
public StateServer(String addr, Configuration conf) {
// String[] peers = new
// String[]{"localhost:4440","localhost:4441","localhost:4442"};
// String[] sprPeers = new String[]{"3440","3441","3442"};
// var peeridstr = peers[Integer.parseInt(serverId)];
// var sprPort = sprPeers[Integer.parseInt(args[0])];
// String groupId = "jraft";
// conf =
// JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
PeerId serverId = JRaftUtils.getPeerId(addr);
int port = serverId.getPort();
NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setElectionTimeoutMs(1000);
nodeOptions.setSnapshotLogIndexMargin(3600);
nodeOptions.setInitialConf(conf);
File RaftDataFile = new File(String.format("./raftdata/%d", port));
log.info("mkdirs: {}", RaftDataFile.mkdirs());
nodeOptions.setLogUri(String.format("./raftdata/%d/logs", port));
nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port));
nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port));
fsm = new StateMachine(); // 状态实例初始化
nodeOptions.setFsm(fsm);
cluster = new RaftGroupService(groupId, serverId, nodeOptions);
Set<Class<?>> scans = new Reflections("com.yuandian.dataflow").getTypesAnnotatedWith(ProcessorRaft.class);
scans.forEach((pRaftClass) -> {
try {
cluster.getRpcServer()
.registerProcessor((RpcProcessor<?>) pRaftClass.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e) {
log.info("{}", e.toString());
}
});
node = cluster.start();
rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
}
public boolean isLeader() {
return this.fsm.isLeader();
}
public void readIndexState(GenericClosure<State> closure) {
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
log.debug("readIndexState({}) {}", getServerId(), status);
if (status.isOk()) {
// 回调失败
closure.success(ss.fsm.getState());
closure.setValue(ss.fsm.getState());
}
closure.run(status);
}
});
}
public void applyOperate(Operate op, GenericClosure<Operate> closure) {
// 所有的提交都必须再leader进行
if (!ss.isLeader()) {
ss.handlerNotLeaderError(closure);
return;
}
try {
closure.setValue(op);
final Task task = new Task();
task.setData(
ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op)));
task.setDone(closure); // 确认所有数据 一致, 不需要加锁
StateFactory.getStateServer().getNode().apply(task);
} catch (CodecException e) {
String errorMsg = "Fail to encode TaskState";
log.debug(errorMsg, e);
closure.failure(errorMsg, PeerId.emptyPeer());
closure.run(new Status(RaftError.EINTERNAL, errorMsg));
}
}
public <T> RaftResponse<T> redirect() {
final RaftResponse<T> response = new RaftResponse<T>();
response.setSuccess(false);
if (this.node != null) {
final PeerId leader = this.node.getLeaderId();
if (leader != null) {
response.setRedirect(leader);
}
}
return response;
}
public <T> void handlerNotLeaderError(final GenericClosure<T> closure) {
closure.failure("Not leader.", redirect().getRedirect());
closure.run(new Status(RaftError.EPERM, "Not leader"));
}
private Executor createReadIndexExecutor() {
return ThreadPoolUtil.newBuilder() //
.poolName("ReadIndexPool") //
.enableMetric(true) //
.coreThreads(4) //
.maximumThreads(4) //
.keepAliveSeconds(60L) //
.workQueue(new SynchronousQueue<>()) //
.threadFactory(new NamedThreadFactory("ReadIndexService", true)) //
.rejectedHandler(new ThreadPoolExecutor.AbortPolicy()) //
.build();
}
}
public static void main(String[] args) throws InterruptedException, RemotingException {
var rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
var resp = rpcClient.invokeSync(new Endpoint("localhost", 4441), new OperateRequest(), 5000);
log.info("{}", resp);
}
}

View File

@ -1,22 +1,24 @@
package com.yuandian.dataflow.statemachine;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.core.StateMachineAdapter;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.entity.LeaderChangeContext;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.Utils;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.extern.slf4j.Slf4j;
@ -30,12 +32,13 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class StateMachine extends StateMachineAdapter {
// private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class);
// private static final Logger LOG =
// LoggerFactory.getLogger(StateMachine.class);
/**
* Counter value
* State value 全局使用的唯一状态
*/
private final AtomicLong value = new AtomicLong(0);
private State state = new State();
/**
* Leader term
*/
@ -46,27 +49,68 @@ public class StateMachine extends StateMachineAdapter {
}
/**
* Returns current value.
* Returns current value. 只有Get 操作状态由协议流程决定 Apply
*/
public long getValue() {
return this.value.get();
public State getState() {
return state;
}
@Override
@SuppressWarnings("unchecked")
public void onApply(final Iterator iter) {
while (iter.hasNext()) {
Operate op = null;
GenericClosure<Operate> closure = null;
if (iter.done() != null) {
// This task is applied by this node, get value from closure to avoid additional
// parsing.
log.error("done:%1$s",iter.getData().toString());
// leader可以直接从 回调closure里提取operate
closure = (GenericClosure<Operate>)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
op = closure.getValue();
} else {
// Have to parse FetchAddRequest from this user log.
log.error("null:%1$s",iter.getData().toString());
// 非leader 需要从getData反序列化出来后处理
final ByteBuffer data = iter.getData();
try {
op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(),
Operate.class.getName());
} catch (CodecException e) {
log.info("{}", e.toString());
}
}
if (op == null) {
log.error("op 为 {}. 存在错误, 可能版本不一致", op);
continue;
}
switch (op.getType()) {
case PUT_WORKERSTATE:
WorkerState ws = op.getValue();
log.debug("PUT {}", ws.peerId);
state.getWorkers().put(ws.peerId, ws);
if (closure != null) {
closure.success(op);
closure.run(Status.OK());
}
break;
case REMOVE:
if (closure != null) {
closure.success(op);
closure.run(Status.OK());
}
break;
default:
break;
}
iter.next();
}
}
@ -78,26 +122,119 @@ public class StateMachine extends StateMachineAdapter {
@Override
public void onError(final RaftException e) {
log.error("Raft error: {}", e, e);
log.debug("Raft error: {}", e, e);
}
@Override
public boolean onSnapshotLoad(final SnapshotReader reader) {
return true;
}
@Override
public void onLeaderStart(final long term) {
log.debug("onLeaderStart {}", StateFactory.getServerId());
this.leaderTerm.set(term);
super.onLeaderStart(term);
// 判断是否Master线程还在跑, 如果存在则中断
if (MasterFactory.getMasterExecute().isAlive()) {
MasterFactory.getMasterExecute().interrupt();
}
StateFactory.readIndexState(new GenericClosure<State>() {
@Override
public void run(Status status) {
var ws = state.getWorkers().get(StateFactory.getServerId());
if (ws == null) {
ws = new WorkerState(StateFactory.getServerId());
}
StateFactory.applyOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure<Operate>() {
@Override
public void run(Status status) {
log.debug("master update workerstate: {}", status);
}
});
}
});
// 当成为master时候 必须启动
MasterFactory.getMasterExecute().start();
super.onLeaderStart(term);
}
@Override
public void onLeaderStop(final Status status) {
log.debug("onLeaderStop {}", StateFactory.getCluster().getServerId());
this.leaderTerm.set(-1);
// 判断是否Master线程还在跑, 如果存在则中断
if (MasterFactory.getMasterExecute().isAlive()) {
MasterFactory.getMasterExecute().interrupt();
}
super.onLeaderStop(status);
}
@Override
public void onShutdown() {
log.debug("onShutdown");
super.onShutdown();
}
@Override
public void onStartFollowing(LeaderChangeContext ctx) {
log.debug("[onStartFollowing] {} {}", ctx, StateFactory.getCluster().getServerId());
try {
// 判断是否Master线程还在跑, 如果存在则中断
if (MasterFactory.getMasterExecute().isAlive()) {
MasterFactory.getMasterExecute().interrupt();
}
var ws = new WorkerState(StateFactory.getServerId());
log.debug("my: {} leader id {}", StateFactory.getServerId(), StateFactory.getLeaderId());
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure<Operate>() {
@Override
public void run(Status status) {
log.info("onStartFollowing CallOperate {} {}", status, this.getResponse());
}
});
return;
} catch (Exception e) {
log.info("{}", e.toString());
}
super.onStartFollowing(ctx);
}
@Override
public void onConfigurationCommitted(Configuration conf) {
super.onConfigurationCommitted(conf);
}
@Override
public void onStopFollowing(LeaderChangeContext ctx) {
log.debug("{} {}", ctx, StateFactory.getCluster().getServerId());
var ws = new WorkerState(StateFactory.getServerId());
log.debug("my: {} leader id {}", StateFactory.getServerId(), StateFactory.getLeaderId());
var op = new Operate(OperateType.PUT_WORKERSTATE, ws);
Operate.CallOperate(op, new GenericClosure<Operate>() {
@Override
public void run(Status status) {
log.info("{} {}", status, this.getResponse());
}
});
super.onStopFollowing(ctx);
}
}

View File

@ -0,0 +1,51 @@
package com.yuandian.dataflow.statemachine.closure;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.entity.PeerId;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Getter
@Setter
@ToString
public abstract class GenericClosure<T> implements Closure {
// 状态机的统一响应
private RaftResponse<T> response;
// 代表任务状态
private T value;
public GenericClosure() {
}
/**
* 错误的时候返回错误信息. 自动装配response
* @param errorMsg
* @param redirect
*/
public void failure(final String errorMsg, final PeerId redirect) {
final RaftResponse<T> response = new RaftResponse<T>();
response.setSuccess(false);
response.setMsg(errorMsg);
response.setRedirect(redirect);
setResponse(response);
}
/**
* 成功时调用该方法. 自动装配response
* @param value
*/
public void success(final T value) {
final RaftResponse<T> response = new RaftResponse<T>();
response.setValue(value);
response.setSuccess(true);
setResponse(response);
}
}

View File

@ -0,0 +1,97 @@
package com.yuandian.dataflow.statemachine.operate;
import java.io.Serializable;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
/**
* 操作
*
* @author eson
*/
@Slf4j
@Data
public class Operate implements Serializable {
public static enum OperateType {
/**
* 同步WorkerState状态.
*/
PUT_WORKERSTATE,
/**
* 暂无想法
*/
REMOVE;
}
private OperateType type;
private Object value;
public Operate(OperateType t, WorkerState ws) {
this.type = t;
this.value = ws;
}
@java.lang.SuppressWarnings("unchecked")
public <T> T getValue() {
return (T) this.value;
};
public <T> void setValue(T value) {
this.value = value;
return;
};
/**
* 调用操作设置
* @param op 传入的操作类
* @param closure 回调函数. Operate为返回值
*/
@java.lang.SuppressWarnings("unchecked")
public static void CallOperate(Operate op, GenericClosure<Operate> closure) {
log.debug("CallOperate Value {}", op.<WorkerState>getValue());
var ss = StateFactory.getStateServer();
// 如果是leader 就直接提交
if (StateFactory.isLeader()) {
ss.applyOperate(op, closure);
return;
}
// 非leader 转发请求 统一有leader处理
var request = new OperateProcessor.OperateRequest();
request.setOperate(op);
var leaderId = StateFactory.getLeaderId();
try {
ss.getRpcClient().invokeAsync(leaderId.getEndpoint(),
request, new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
log.debug("Object result {}", result);
var resp = (RaftResponse<Operate>) result;
closure.setResponse(resp);
closure.success(resp.getValue());
closure.run(Status.OK());
}
}, 5000);
} catch (InterruptedException | RemotingException e) {
closure.failure(e.getMessage(), null);
closure.run(new Status(100000, "invokeAsync fail"));
log.info("{}", e.toString());
}
}
}

View File

@ -0,0 +1,85 @@
/**
* description
*
* @author eson
*2022年7月12日-11:10:54
*/
package com.yuandian.dataflow.statemachine.rpc;
import java.io.Serializable;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
*2022年7月12日-11:10:54
*/
@Slf4j
@ProcessorRaft
public class OperateProcessor implements RpcProcessor<OperateProcessor.OperateRequest> {
/**
* 同步状态时, 需要用的结构类. 新增的状态可以在 Operate结构里添加
*
* @author eson
*2022年7月11日-16:01:07
*/
@Getter
@Setter
@ToString
public static class OperateRequest implements Serializable {
private static final long serialVersionUID = 1L;
private Operate operate;
}
@Override
public void handleRequest(RpcContext rpcCtx, OperateRequest request) {
log.info("request: {}", request);
final GenericClosure<Operate> closure = new GenericClosure<Operate>() {
@Override
public void run(Status status) {
if(status.isOk()) {
log.info("{}", status);
rpcCtx.sendResponse(getResponse());
return;
}
if(status.getRaftError() == RaftError.EPERM) {
//TODO: Not leader 需要转发
log.info("{}", status);
}
}
};
StateFactory.getStateServer().applyOperate(request.getOperate(), closure);
}
@Override
public String interest() {
return OperateRequest.class.getName();
}
}

View File

@ -0,0 +1,41 @@
/**
* description
*
* @author eson
*2022年7月13日-09:07:22
*/
package com.yuandian.dataflow.statemachine.rpc;
import java.io.Serializable;
import com.alipay.sofa.jraft.entity.PeerId;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
*2022年7月13日-09:07:22
*/
@Slf4j
@Getter
@Setter
@ToString
public class RaftResponse<T> implements Serializable {
private static final long serialVersionUID = 1L;
private T value;
private boolean success;
/**
* redirect peer id
*/
private PeerId redirect;
private String msg;
}

View File

@ -0,0 +1,23 @@
/**
* description
*
* @author eson
*2022年7月21日-14:27:49
*/
package com.yuandian.dataflow.statemachine.rpc.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* raft自定注册到全局唯一raft服务上
*
* @author eson
*2022年7月21日-14:27:49
*/
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
public @interface ProcessorRaft {
}

View File

@ -0,0 +1,39 @@
/**
* description
*
* @author eson
*2022年7月13日-09:11:26
*/
package com.yuandian.dataflow.statemachine.state;
import java.io.Serializable;
import com.alipay.sofa.jraft.entity.PeerId;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.HashMap;
/**
* 代表任务状态 暂时全局使用这个结构. 添加新增状态
*
* @author eson
*2022年7月13日-09:11:26
*/
@Slf4j
@Getter
@Setter
@ToString
public class State implements Serializable {
private static final long serialVersionUID = -1L;
private HashMap<PeerId,WorkerState> workers = new HashMap<>();
}

View File

@ -0,0 +1,53 @@
/**
* description
*
* @author eson
*2022年7月15日-10:04:00
*/
package com.yuandian.dataflow.statemachine.state;
import java.io.Serializable;
import java.time.Instant;
import com.alipay.sofa.jraft.entity.PeerId;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
* WorkerState 每个节点进行任务操作的状态, 用于同步状态机
*
* @author eson
*2022年7月15日-10:04:00
*/
@Getter
@Setter
@ToString
public class WorkerState implements Serializable {
private static final long serialVersionUID = -1L;
/**
* 节点的对应peerID
*/
public PeerId peerId;
/**
* 任务队列的数量
*/
public long taskQueueSize;
/**
* 更新时间
*/
public Instant updateAt;
/**
* 初始化 并构造 updateAt时间
* @param peer 传入当前服务的peer
*/
public WorkerState(PeerId peer) {
this.peerId = peer;
this.updateAt = Instant.now();
}
}

View File

@ -0,0 +1,8 @@
package com.yuandian.dataflow.utils;
public class Utils {
public static void main(String[] args) {
}
}

View File

@ -3,11 +3,28 @@
class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
%d{yyyyMMdd HH:mm:ss.SSS} %-5level%thread\(%file:%line\): %msg%n
%d{yyyy-MM-dd HH:mm:ss.SSS} %red(%level) %cyan(%thread\(%file:%line\)): %msg%n
</pattern>
</encoder>
<!-- <filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>DEBUG</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter> -->
</appender>
<root level="INFO">
<!-- <root level="debug">
<appender-ref ref="CONSOLE" />
</root>
</root> -->
<logger name="com.yuandian.dataflow" level="debug|info">
<appender-ref ref="CONSOLE"/>
</logger>
</configuration>

View File

@ -2,36 +2,13 @@ package com.yuandian.dataflow;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.swing.text.AbstractDocument.BranchElement;
import org.bson.Document;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.expression.spel.ast.FunctionReference;
import com.mongodb.MongoClient;
import com.mongodb.client.model.InsertManyOptions;
import com.yuandian.dataflow.projo.Doc;
import io.netty.handler.codec.dns.DatagramDnsQuery;
import lombok.Cleanup;
import lombok.Getter;
import lombok.Setter;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
@ -39,8 +16,10 @@ import lombok.extern.slf4j.Slf4j;
*/
@DisplayName("AppTest")
@Slf4j
public class AppTest {
@FunctionalInterface
public interface FuncReturn {
public float Execute();
@ -121,75 +100,77 @@ public class AppTest {
}
}
@Test
public void Mongodb() throws InterruptedException {
// @Test
// public void Mongodb() throws InterruptedException {
ArrayList<Thread> execs = new ArrayList<>();
// ArrayList<Thread> execs = new ArrayList<>();
final Metric metric = new Metric();
metric.start();
for (int c = 0; c < 10; c++) {
Thread exec = new Thread(() -> {
// final Metric metric = new Metric();
// metric.start();
// for (int c = 0; c < 10; c++) {
// Thread exec = new Thread(() -> {
@Cleanup
MongoClient mgo = new MongoClient("localhost", 27017);
// @Cleanup
// MongoClient mgo = MongoClients.create("mongodb://yuandian:yuandian123@192.168.1.113:27017");
log.info("msg");
// log.info("msg");
long LoopNumber = 5;
long BatchSize = 20000;
// long LoopNumber = 5;
// long BatchSize = 20000;
var db = mgo.getDatabase("yuandian");
var cltdoc = db.getCollection("doc");
// var db = mgo.getDatabase("yuandian");
// var cltdoc = db.getCollection("doc");
for (int n = 0; n < LoopNumber; n++) {
// for (int n = 0; n < LoopNumber; n++) {
metric.push(() -> {
// metric.push(() -> {
List<Doc> documents = new ArrayList<>();
Random r = new Random();
// List<Doc> documents = new ArrayList<>();
// Random r = new Random();
for (int i = 0; i < BatchSize; i++) {
// for (int i = 0; i < BatchSize; i++) {
var doc = new Doc();
var datadoc = new Document();
// var doc = new Doc();
// var datadoc = new Document();
doc.append("code", r.nextInt(100));
doc.append("desc", "desc");
doc.append("ts", Instant.now());
// doc.append("code", r.nextInt(100));
// doc.append("desc", "desc");
// doc.append("ts", Instant.now());
for (int ii = 0; ii < 24; ii++) {
UUID uid = UUID.randomUUID();
datadoc
.append(uid.toString(), uid.toString());
}
// for (int ii = 0; ii < 24; ii++) {
// UUID uid = UUID.randomUUID();
// datadoc
// .append(uid.toString(), uid.toString());
// }
doc.append("data", datadoc);
documents.add(doc);
}
// doc.append("data", datadoc);
// documents.add(doc);
// }
var opt = new InsertManyOptions();
cltdoc.insertMany(documents, opt);
return BatchSize;
});
}
});
exec.start();
execs.add(exec);
}
;
// var opt = new InsertManyOptions();
// cltdoc.insertMany(documents, opt);
// return BatchSize;
// });
// }
// });
// exec.start();
// execs.add(exec);
// }
// ;
execs.forEach((e) -> {
try {
e.join();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
});
// execs.forEach((e) -> {
// try {
// e.join();
// } catch (InterruptedException e1) {
// e1.printStackTrace();
// }
// });
metric.close();
// metric.close();
}
// }

View File

@ -0,0 +1,79 @@
package com.yuandian.dataflow;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MongodbTest {
public static <T> void insertMsgToMongoDB(T obj) {
// try {
// ServerAddress serverAddress = new ServerAddress("192.168.1.113", 27017);
// List<ServerAddress> addrs = new ArrayList<>();
// addrs.add(serverAddress);
// MongoCredential credential = MongoCredential.createScramSha1Credential("yd-rw", "admin", "yuandian123".toCharArray());
// List<MongoCredential> credentials = new ArrayList<>();
// credentials.add(credential);
// MongoClient mongoClient = new MongoClient(addrs, credentials);
// MongoDatabase db = mongoClient.getDatabase("yd-base");
// // todo 修改名字
// MongoCollection<Document> collection = db.getCollection("lxy-test");
// collection.insertOne(obj2Doc(obj));
// System.err.println("insert success");
// } catch (Exception e) {
// log.info("{}", e.toString());
// }
// }
// public static <T> Document obj2Doc(T obj) throws Exception {
// Document doc = new Document();
// Field[] fields = obj.getClass().getDeclaredFields();
// for (Field field : fields) {
// String varName = field.getName();
// boolean accessFlag = field.isAccessible();
// if (!accessFlag) {
// field.setAccessible(true);
// }
// Object param = field.get(obj);
// if (param == null) {
// continue;
// } else if (param instanceof Integer) {
// int value = ((Integer) param).intValue();
// doc.put(varName, value);
// } else if (param instanceof String) {
// String value = (String) param;
// doc.put(varName, value);
// } else if (param instanceof Double) {
// double value = ((Double) param).doubleValue();
// doc.put(varName, value);
// } else if (param instanceof Float) {
// float value = ((Float) param).floatValue();
// doc.put(varName, value);
// } else if (param instanceof Long) {
// long value = ((Long) param).longValue();
// doc.put(varName, value);
// } else if (param instanceof Boolean) {
// boolean value = ((Boolean) param).booleanValue();
// doc.put(varName, value);
// }
// field.setAccessible(accessFlag);
// }
// return doc;
// }
// public static <T> T doc2Obj(Document doc, Class<T> clazz) throws Exception {
// T obj = clazz.newInstance();
// for (String key : doc.keySet()) {
// Field field = clazz.getDeclaredField(key);
// field.setAccessible(true);
// field.set(obj, doc.get(key));
// }
// return obj;
}
}

View File

@ -0,0 +1,62 @@
package com.yuandian.dataflow.statemachine;
import org.junit.jupiter.api.Test;
import com.alipay.sofa.jraft.error.RemotingException;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class StateMachineTest {
@Test
void testOnApply() throws InterruptedException, RemotingException {
// var rpcClient = new BoltRaftRpcFactory().createRpcClient();
// rpcClient.init(new CliOptions());
// var fstate = new State();
// var fdata = new RequestOperate();
// fdata.setOperate(fstate);
// var leader = new Endpoint("localhost",4441);
// RaftResponse resp = (RaftResponse)rpcClient.invokeSync(leader, fdata
// , 5000);
// log.info("{}", resp);
// if( resp != null && !resp.isSuccess() ) {
// leader = resp.getRedirect().getEndpoint();
// resp = (RaftResponse)rpcClient.invokeSync(resp.getRedirect().getEndpoint(), fdata
// , 5000);
// log.info("{}", resp);
// }
// int i = 0 ;
// while(true) {
// var state = new State();
// var request = new RequestOperate(); // 创建请求
// request.setState(state); // 添加请求的参数
// var wstate = state.getWorkers();
// // state.getWorker().setPeerId( PeerId.parsePeer("localhost:2222") );
// // state.getWorker().setTaskQueueSize(i);
// var pi = i ;
// i++;
// if (i >= 1000) {
// break;
// }
// rpcClient.invokeAsync(leader, request, new InvokeCallback() {
// @Override
// public void complete(Object result, Throwable err) {
// // ResponseSM resp = (ResponseSM)result;
// log.info("{} {} {}", result, err, pi);
// }
// @Override
// public Executor executor() {
// return null;
// }
// } , 5000);
// }
}
}

View File

@ -1,9 +1,17 @@
#! /bin/bash
screen -S raft-0 -X quit
screen -S raft-1 -X quit
screen -S raft-2 -X quit
# screen -S raft-2 -X quit
sleep 2
sleep 1s
screen -dmS raft-0 -L java -jar target/dataflow-1.0-SNAPSHOT.jar 0
screen -dmS raft-1 -L java -jar target/dataflow-1.0-SNAPSHOT.jar 1
screen -dmS raft-2 -L java -jar target/dataflow-1.0-SNAPSHOT.jar 2
VERSION=1.0.0-SNAPSHOT
screen -dmS raft-0 -L java -jar target/dataflow-${VERSION}.jar 0
screen -dmS raft-1 -L java -jar target/dataflow-${VERSION}.jar 1
# screen -dmS raft-2 -L java -jar target/dataflow-${VERSION}.jar 2
sleep 0.5s
screen -S raft-0 -X logfile flush 0
screen -S raft-1 -X logfile flush 0
# screen -S raft-2 -X logfile flush 0

View File

@ -1,3 +1,5 @@
#! /bin/bash
screen -S raft-0 -X quit
screen -S raft-1 -X quit
screen -S raft-2 -X quit
exit 0