初步完成

This commit is contained in:
eson 2022-06-01 18:43:01 +08:00
parent ff849d0737
commit 10e599b276
8 changed files with 130 additions and 2396 deletions

4
.gitignore vendored
View File

@ -199,3 +199,7 @@ README.html
*.iml
.idea
.exercism
raftdata
screenlog.*

47
pom.xml
View File

@ -22,6 +22,8 @@
<grpc.version>1.46.0</grpc.version>
<slf4j.version>1.7.36</slf4j.version>
<jraft.version>1.3.10</jraft.version>
<spring.boot.web>2.7.0</spring.boot.web>
<gson.version>2.9.0</gson.version>
</properties>
<dependencies>
@ -64,29 +66,15 @@
<version>${ratis.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring.boot.web}</version>
</dependency>
<!-- protobuf 依赖 -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>${protostuff.version}</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>${protostuff.version}</version>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
@ -94,10 +82,15 @@
<version>${javax.annotation.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
<!-- 添加grpc相关依赖包 -->
<dependency>
<!-- <dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
@ -116,7 +109,7 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>${grpc.version}</version>
</dependency>
</dependency> -->
<!-- proto自动生成java文件所需的编译插件 -->
@ -161,6 +154,18 @@
</execution>
</executions> -->
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -1,14 +1,17 @@
package com.yuandian.dataflow;
import com.yuandian.dataflow.rpc.DataFlowGrpc;
import com.yuandian.dataflow.rpc.DataFlowGrpc.DataFlowImplBase;
import com.yuandian.dataflow.rpc.Scheduler.Response;
import com.yuandian.dataflow.rpc.Scheduler.State;
import com.yuandian.dataflow.statemachine.RaftClosure;
import com.yuandian.dataflow.statemachine.StateMachine;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.io.File;
import java.nio.ByteBuffer;
import com.alipay.remoting.rpc.protocol.RpcResponseProcessor;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.JRaftUtils;
@ -26,8 +29,10 @@ import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.rpc.CliClientService;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.RpcRequestProcessor;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
@ -36,74 +41,61 @@ import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.option.ReplicatorGroupOptions;
import io.grpc.stub.StreamObserver;
/**
* Hello world!
*
*/
public class Server
{
public class ServerImpl extends DataFlowImplBase {
@SpringBootApplication
public class Server {
@Override
public void update(State request, StreamObserver<Response> responseObserver) {
// TODO Auto-generated method stub
super.update(request, responseObserver);
public static void main(String[] args) {
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
String[] sprPeers = new String[]{"3440","3441","3442"};
}
}
public static void main( String[] args )
{
var peeridstr = peers[ Integer.parseInt(args[0] ) ];
var sprPort = sprPeers[Integer.parseInt(args[0] )];
String groupId = "jraft";
// Endpoint addr = new Endpoint("localhost", 8080);
// String s = addr.toString(); // 结果为 localhost:8080
// PeerId peer = new PeerId();
// boolean success = peer.parse(s);
Endpoint addr = JRaftUtils.getEndPoint("localhost:8080");
PeerId serverId = JRaftUtils.getPeerId("localhost:8080");
Configuration conf = JRaftUtils.getConfiguration("localhost:8081,localhost:8080");
// //IteratorImpl it = new IteratorImpl();
Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
PeerId serverId = JRaftUtils.getPeerId(peeridstr);
int port = serverId.getPort();
NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setElectionTimeoutMs(1000);
nodeOptions.setSnapshotLogIndexMargin(3600);
nodeOptions.setInitialConf(conf);
File RaftDataFile = new File(String.format("./raftdata/%d", port) );
System.out.print(RaftDataFile.mkdirs());
nodeOptions.setLogUri( String.format("./raftdata/%d/logs", port) );
nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port));
nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port));
nodeOptions.setFsm(new StateMachine());
RaftGroupService cluster = new RaftGroupService(groupId, serverId, nodeOptions);
Node node = cluster.start();
Closure done = new RaftClosure();
Task task = new Task();
task.setData(ByteBuffer.wrap("Hello! This is a sample message to test raft..".getBytes()));
task.setData(ByteBuffer.wrap("hello".getBytes()));
task.setDone(done);
NodeOptions opts = new NodeOptions();
opts.setElectionTimeoutMs(1000);
opts.setLogUri("logs");
opts.setRaftMetaUri("raftMeta");
opts.setSnapshotUri("snapshots");
opts.setFsm(new StateMachine());
opts.setInitialConf(conf);
NodeImpl node = (NodeImpl) RaftServiceFactory.createRaftNode(groupId, serverId);
NodeId nodeId = node.getNodeId();
NodeManager.getInstance().addAddress(serverId.getEndpoint());
BoltRpcServer rpcServer = (BoltRpcServer) RaftRpcServerFactory.createAndStartRaftRpcServer(serverId.getEndpoint());
RaftGroupService cluster = new RaftGroupService(groupId, serverId, opts);
//RpcServer rpcServer2 = cluster.getRpcServer();
//Node node2 = cluster.start();
BoltRaftRpcFactory boltRaftRpcFactory = new BoltRaftRpcFactory();
RpcClient boltRpcClient = boltRaftRpcFactory.createRpcClient();
//boltRpcClient.registerConnectEventListener(null);
//boltRpcClient.invokeSync(addr, s, 0)
ReplicatorGroupOptions replicatorGroupOptions = new ReplicatorGroupOptions();
RaftOptions raftOptions = new RaftOptions();
replicatorGroupOptions.setRaftOptions(raftOptions);
replicatorGroupOptions.setNode(node);
ReplicatorGroupImpl replicatorGroupImpl = new ReplicatorGroupImpl();
replicatorGroupImpl.init(nodeId, replicatorGroupOptions);
boltRpcClient.registerConnectEventListener(replicatorGroupImpl);
node.apply(task);
System.setProperty("server.port", sprPort);
SpringApplication.run(Server.class, args);
node.shutdown(done);
}
}

View File

@ -0,0 +1,25 @@
package com.yuandian.dataflow.controller;
import com.google.gson.JsonObject;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@Controller
public class TaskLog {
@PostMapping(path = "/test", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<JsonObject> greeting(@RequestBody JsonObject input) {
String message = (String) input.get("message").getAsString();
System.out.println(message);
JsonObject response = new JsonObject();
response.addProperty("status", "success");
return new ResponseEntity<JsonObject>(response, HttpStatus.OK);
}
}

View File

@ -1,276 +0,0 @@
package com.yuandian.dataflow.rpc;
import static io.grpc.MethodDescriptor.generateFullMethodName;
/**
*/
@javax.annotation.Generated(
value = "by gRPC proto compiler (version 1.46.0)",
comments = "Source: scheduler.proto")
@io.grpc.stub.annotations.GrpcGenerated
public final class DataFlowGrpc {
private DataFlowGrpc() {}
public static final String SERVICE_NAME = "com.yuandian.dataflow.rpc.DataFlow";
// Static method descriptors that strictly reflect the proto.
private static volatile io.grpc.MethodDescriptor<com.yuandian.dataflow.rpc.Scheduler.State,
com.yuandian.dataflow.rpc.Scheduler.Response> getUpdateMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "Update",
requestType = com.yuandian.dataflow.rpc.Scheduler.State.class,
responseType = com.yuandian.dataflow.rpc.Scheduler.Response.class,
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
public static io.grpc.MethodDescriptor<com.yuandian.dataflow.rpc.Scheduler.State,
com.yuandian.dataflow.rpc.Scheduler.Response> getUpdateMethod() {
io.grpc.MethodDescriptor<com.yuandian.dataflow.rpc.Scheduler.State, com.yuandian.dataflow.rpc.Scheduler.Response> getUpdateMethod;
if ((getUpdateMethod = DataFlowGrpc.getUpdateMethod) == null) {
synchronized (DataFlowGrpc.class) {
if ((getUpdateMethod = DataFlowGrpc.getUpdateMethod) == null) {
DataFlowGrpc.getUpdateMethod = getUpdateMethod =
io.grpc.MethodDescriptor.<com.yuandian.dataflow.rpc.Scheduler.State, com.yuandian.dataflow.rpc.Scheduler.Response>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "Update"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
com.yuandian.dataflow.rpc.Scheduler.State.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
com.yuandian.dataflow.rpc.Scheduler.Response.getDefaultInstance()))
.setSchemaDescriptor(new DataFlowMethodDescriptorSupplier("Update"))
.build();
}
}
}
return getUpdateMethod;
}
/**
* Creates a new async stub that supports all call types for the service
*/
public static DataFlowStub newStub(io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<DataFlowStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<DataFlowStub>() {
@java.lang.Override
public DataFlowStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new DataFlowStub(channel, callOptions);
}
};
return DataFlowStub.newStub(factory, channel);
}
/**
* Creates a new blocking-style stub that supports unary and streaming output calls on the service
*/
public static DataFlowBlockingStub newBlockingStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<DataFlowBlockingStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<DataFlowBlockingStub>() {
@java.lang.Override
public DataFlowBlockingStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new DataFlowBlockingStub(channel, callOptions);
}
};
return DataFlowBlockingStub.newStub(factory, channel);
}
/**
* Creates a new ListenableFuture-style stub that supports unary calls on the service
*/
public static DataFlowFutureStub newFutureStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<DataFlowFutureStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<DataFlowFutureStub>() {
@java.lang.Override
public DataFlowFutureStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new DataFlowFutureStub(channel, callOptions);
}
};
return DataFlowFutureStub.newStub(factory, channel);
}
/**
*/
public static abstract class DataFlowImplBase implements io.grpc.BindableService {
/**
*/
public void update(com.yuandian.dataflow.rpc.Scheduler.State request,
io.grpc.stub.StreamObserver<com.yuandian.dataflow.rpc.Scheduler.Response> responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getUpdateMethod(), responseObserver);
}
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
getUpdateMethod(),
io.grpc.stub.ServerCalls.asyncUnaryCall(
new MethodHandlers<
com.yuandian.dataflow.rpc.Scheduler.State,
com.yuandian.dataflow.rpc.Scheduler.Response>(
this, METHODID_UPDATE)))
.build();
}
}
/**
*/
public static final class DataFlowStub extends io.grpc.stub.AbstractAsyncStub<DataFlowStub> {
private DataFlowStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected DataFlowStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new DataFlowStub(channel, callOptions);
}
/**
*/
public void update(com.yuandian.dataflow.rpc.Scheduler.State request,
io.grpc.stub.StreamObserver<com.yuandian.dataflow.rpc.Scheduler.Response> responseObserver) {
io.grpc.stub.ClientCalls.asyncUnaryCall(
getChannel().newCall(getUpdateMethod(), getCallOptions()), request, responseObserver);
}
}
/**
*/
public static final class DataFlowBlockingStub extends io.grpc.stub.AbstractBlockingStub<DataFlowBlockingStub> {
private DataFlowBlockingStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected DataFlowBlockingStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new DataFlowBlockingStub(channel, callOptions);
}
/**
*/
public com.yuandian.dataflow.rpc.Scheduler.Response update(com.yuandian.dataflow.rpc.Scheduler.State request) {
return io.grpc.stub.ClientCalls.blockingUnaryCall(
getChannel(), getUpdateMethod(), getCallOptions(), request);
}
}
/**
*/
public static final class DataFlowFutureStub extends io.grpc.stub.AbstractFutureStub<DataFlowFutureStub> {
private DataFlowFutureStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected DataFlowFutureStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new DataFlowFutureStub(channel, callOptions);
}
/**
*/
public com.google.common.util.concurrent.ListenableFuture<com.yuandian.dataflow.rpc.Scheduler.Response> update(
com.yuandian.dataflow.rpc.Scheduler.State request) {
return io.grpc.stub.ClientCalls.futureUnaryCall(
getChannel().newCall(getUpdateMethod(), getCallOptions()), request);
}
}
private static final int METHODID_UPDATE = 0;
private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
private final DataFlowImplBase serviceImpl;
private final int methodId;
MethodHandlers(DataFlowImplBase serviceImpl, int methodId) {
this.serviceImpl = serviceImpl;
this.methodId = methodId;
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_UPDATE:
serviceImpl.update((com.yuandian.dataflow.rpc.Scheduler.State) request,
(io.grpc.stub.StreamObserver<com.yuandian.dataflow.rpc.Scheduler.Response>) responseObserver);
break;
default:
throw new AssertionError();
}
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public io.grpc.stub.StreamObserver<Req> invoke(
io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
default:
throw new AssertionError();
}
}
}
private static abstract class DataFlowBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier {
DataFlowBaseDescriptorSupplier() {}
@java.lang.Override
public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() {
return com.yuandian.dataflow.rpc.Scheduler.getDescriptor();
}
@java.lang.Override
public com.google.protobuf.Descriptors.ServiceDescriptor getServiceDescriptor() {
return getFileDescriptor().findServiceByName("DataFlow");
}
}
private static final class DataFlowFileDescriptorSupplier
extends DataFlowBaseDescriptorSupplier {
DataFlowFileDescriptorSupplier() {}
}
private static final class DataFlowMethodDescriptorSupplier
extends DataFlowBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
private final String methodName;
DataFlowMethodDescriptorSupplier(String methodName) {
this.methodName = methodName;
}
@java.lang.Override
public com.google.protobuf.Descriptors.MethodDescriptor getMethodDescriptor() {
return getServiceDescriptor().findMethodByName(methodName);
}
}
private static volatile io.grpc.ServiceDescriptor serviceDescriptor;
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
io.grpc.ServiceDescriptor result = serviceDescriptor;
if (result == null) {
synchronized (DataFlowGrpc.class) {
result = serviceDescriptor;
if (result == null) {
serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
.setSchemaDescriptor(new DataFlowFileDescriptorSupplier())
.addMethod(getUpdateMethod())
.build();
}
}
}
return result;
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1 @@
server.port=3440

9
start.sh Normal file
View File

@ -0,0 +1,9 @@
screen -S raft-0 -X quit
screen -S raft-1 -X quit
screen -S raft-2 -X quit
sleep 2
screen -dmS raft-0 -L java -jar target/dataflow-1.0-SNAPSHOT.jar 0
screen -dmS raft-1 -L java -jar target/dataflow-1.0-SNAPSHOT.jar 1
screen -dmS raft-2 -L java -jar target/dataflow-1.0-SNAPSHOT.jar 2