Query调试通

This commit is contained in:
huangsimin 2022-08-08 00:22:43 +08:00
parent f2c14d9bd6
commit 67dd6f4da2
19 changed files with 802 additions and 833 deletions

13
.vscode/launch.json vendored
View File

@ -11,18 +11,11 @@
"mainClass": "com.yuandian.dataflow.statemachine.client.CounterClient",
"projectName": "dataflow"
},
{
"type": "java",
"name": "Launch Utils",
"request": "launch",
"mainClass": "com.yuandian.dataflow.utils.Utils",
"projectName": "dataflow"
},
{
"type": "java",
"name": "Raft-0",
"request": "launch",
"mainClass": "com.yuandian.dataflow.statemachine.CounterServer",
"mainClass": "com.yuandian.dataflow.statemachine.StateServer",
"projectName": "dataflow",
"console": "integratedTerminal",
"args": [
@ -40,7 +33,7 @@
"type": "java",
"name": "Raft-1",
"request": "launch",
"mainClass": "com.yuandian.dataflow.statemachine.CounterServer",
"mainClass": "com.yuandian.dataflow.statemachine.StateServer",
"projectName": "dataflow",
"console": "integratedTerminal",
"args": [
@ -58,7 +51,7 @@
"type": "java",
"name": "Raft-2",
"request": "launch",
"mainClass": "com.yuandian.dataflow.statemachine.CounterServer",
"mainClass": "com.yuandian.dataflow.statemachine.StateServer",
"projectName": "dataflow",
"console": "integratedTerminal",
"args": [

View File

@ -300,7 +300,7 @@
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.yuandian.dataflow.statemachine.CounterServer</mainClass>
<mainClass>com.yuandian.dataflow.statemachine.StateServer</mainClass>
</manifest>
</archive>
@ -344,7 +344,7 @@
<appendAssemblyId>false</appendAssemblyId>
<archive>
<manifest>
<mainClass>com.yuandian.dataflow.statemachine.CounterServer</mainClass>
<mainClass>com.yuandian.dataflow.statemachine.StateServer</mainClass>
</manifest>
</archive>
<descriptors>

View File

@ -16,13 +16,13 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.google.protobuf.Any;
// import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import com.yuandian.dataflow.statemachine.annotations.MasterRegister;
import com.yuandian.dataflow.statemachine_old.StateFactory;
import com.yuandian.dataflow.statemachine_old.annotations.MasterRegister;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.master.MasterContext;
import com.yuandian.dataflow.statemachine_old.master.MasterExecute;
import com.yuandian.dataflow.statemachine_old.operate.Operate;
import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import com.yuandian.dataflow.statemachine_old.state.State;
import com.yuandian.dataflow.statemachine_old.state.WorkerState;
import com.yuandian.dataflow.utils.PacketsManager;
@ -32,7 +32,8 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@MasterRegister
public class MasterProcessor implements MasterExecute {@Override
public class MasterProcessor implements MasterExecute {
@Override
public void loop(MasterContext cxt) {
// TODO Auto-generated method stub

View File

@ -17,14 +17,14 @@ import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.google.protobuf.Any;
import com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest;
import com.yuandian.dataflow.proto.Processor.ProcessResponse;
import com.yuandian.dataflow.proto.Processor.PacketsRequest;
import com.yuandian.dataflow.proto.Processor.Response;
import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerImplBase;
import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
import com.yuandian.dataflow.statemachine_old.StateFactory;
import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.operate.Operate;
import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine_old.state.State;
@ -41,18 +41,19 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
@WorkerRegister
public class PacketsProcessor extends ProcessorServerImplBase {@Override
public void getPackets(PacketsProcessorRequest request, StreamObserver<ProcessResponse> responseObserver) {
// TODO Auto-generated method stub
@GrpcProcessor
public class PacketsProcessor extends ProcessorServerImplBase {
@Override
public void packetsProcessor(PacketsRequest request, StreamObserver<Response> responseObserver) {
request.getPacketsList();
super.getPackets(request, responseObserver);
responseObserver.onNext( Response.newBuilder().build() );
responseObserver.onCompleted();
}
// @Setter
// @Getter
// public static class PacketsRequest implements Serializable {

View File

@ -6,8 +6,8 @@ import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
import com.yuandian.dataflow.statemachine_old.StateFactory;
import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister;
import lombok.Getter;
import lombok.Setter;
@ -18,7 +18,7 @@ import lombok.extern.slf4j.Slf4j;
* 例子 强制转换leader
*/
@Slf4j
@WorkerRegister
@GrpcProcessor
public class TransferLeaderProcessor implements RpcProcessor<TransferLeaderProcessor.LeaderRequest> {
@Setter

File diff suppressed because it is too large Load Diff

View File

@ -24,38 +24,38 @@ public final class ProcessorServerGrpc {
private ProcessorServerGrpc() {}
public static final String SERVICE_NAME = "dataflow.ProcessorServer";
public static final String SERVICE_NAME = "com.yuandian.dataflow.proto.ProcessorServer";
// Static method descriptors that strictly reflect the proto.
private static volatile io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest,
com.yuandian.dataflow.proto.Processor.ProcessResponse> getGetPacketsMethod;
private static volatile io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Processor.PacketsRequest,
com.yuandian.dataflow.proto.Processor.Response> getPacketsProcessorMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "GetPackets",
requestType = com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.class,
responseType = com.yuandian.dataflow.proto.Processor.ProcessResponse.class,
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
public static io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest,
com.yuandian.dataflow.proto.Processor.ProcessResponse> getGetPacketsMethod() {
io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest, com.yuandian.dataflow.proto.Processor.ProcessResponse> getGetPacketsMethod;
if ((getGetPacketsMethod = ProcessorServerGrpc.getGetPacketsMethod) == null) {
fullMethodName = SERVICE_NAME + '/' + "PacketsProcessor",
requestType = com.yuandian.dataflow.proto.Processor.PacketsRequest.class,
responseType = com.yuandian.dataflow.proto.Processor.Response.class,
methodType = io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
public static io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Processor.PacketsRequest,
com.yuandian.dataflow.proto.Processor.Response> getPacketsProcessorMethod() {
io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Processor.PacketsRequest, com.yuandian.dataflow.proto.Processor.Response> getPacketsProcessorMethod;
if ((getPacketsProcessorMethod = ProcessorServerGrpc.getPacketsProcessorMethod) == null) {
synchronized (ProcessorServerGrpc.class) {
if ((getGetPacketsMethod = ProcessorServerGrpc.getGetPacketsMethod) == null) {
ProcessorServerGrpc.getGetPacketsMethod = getGetPacketsMethod =
io.grpc.MethodDescriptor.<com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest, com.yuandian.dataflow.proto.Processor.ProcessResponse>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "GetPackets"))
if ((getPacketsProcessorMethod = ProcessorServerGrpc.getPacketsProcessorMethod) == null) {
ProcessorServerGrpc.getPacketsProcessorMethod = getPacketsProcessorMethod =
io.grpc.MethodDescriptor.<com.yuandian.dataflow.proto.Processor.PacketsRequest, com.yuandian.dataflow.proto.Processor.Response>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "PacketsProcessor"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.getDefaultInstance()))
com.yuandian.dataflow.proto.Processor.PacketsRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
com.yuandian.dataflow.proto.Processor.ProcessResponse.getDefaultInstance()))
.setSchemaDescriptor(new ProcessorServerMethodDescriptorSupplier("GetPackets"))
com.yuandian.dataflow.proto.Processor.Response.getDefaultInstance()))
.setSchemaDescriptor(new ProcessorServerMethodDescriptorSupplier("PacketsProcessor"))
.build();
}
}
}
return getGetPacketsMethod;
return getPacketsProcessorMethod;
}
/**
@ -108,20 +108,20 @@ public final class ProcessorServerGrpc {
/**
*/
public void getPackets(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request,
io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Processor.ProcessResponse> responseObserver) {
asyncUnimplementedUnaryCall(getGetPacketsMethod(), responseObserver);
public void packetsProcessor(com.yuandian.dataflow.proto.Processor.PacketsRequest request,
io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Processor.Response> responseObserver) {
asyncUnimplementedUnaryCall(getPacketsProcessorMethod(), responseObserver);
}
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
getGetPacketsMethod(),
asyncUnaryCall(
getPacketsProcessorMethod(),
asyncServerStreamingCall(
new MethodHandlers<
com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest,
com.yuandian.dataflow.proto.Processor.ProcessResponse>(
this, METHODID_GET_PACKETS)))
com.yuandian.dataflow.proto.Processor.PacketsRequest,
com.yuandian.dataflow.proto.Processor.Response>(
this, METHODID_PACKETS_PROCESSOR)))
.build();
}
}
@ -142,10 +142,10 @@ public final class ProcessorServerGrpc {
/**
*/
public void getPackets(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request,
io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Processor.ProcessResponse> responseObserver) {
asyncUnaryCall(
getChannel().newCall(getGetPacketsMethod(), getCallOptions()), request, responseObserver);
public void packetsProcessor(com.yuandian.dataflow.proto.Processor.PacketsRequest request,
io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Processor.Response> responseObserver) {
asyncServerStreamingCall(
getChannel().newCall(getPacketsProcessorMethod(), getCallOptions()), request, responseObserver);
}
}
@ -165,9 +165,10 @@ public final class ProcessorServerGrpc {
/**
*/
public com.yuandian.dataflow.proto.Processor.ProcessResponse getPackets(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request) {
return blockingUnaryCall(
getChannel(), getGetPacketsMethod(), getCallOptions(), request);
public java.util.Iterator<com.yuandian.dataflow.proto.Processor.Response> packetsProcessor(
com.yuandian.dataflow.proto.Processor.PacketsRequest request) {
return blockingServerStreamingCall(
getChannel(), getPacketsProcessorMethod(), getCallOptions(), request);
}
}
@ -184,17 +185,9 @@ public final class ProcessorServerGrpc {
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new ProcessorServerFutureStub(channel, callOptions);
}
/**
*/
public com.google.common.util.concurrent.ListenableFuture<com.yuandian.dataflow.proto.Processor.ProcessResponse> getPackets(
com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request) {
return futureUnaryCall(
getChannel().newCall(getGetPacketsMethod(), getCallOptions()), request);
}
}
private static final int METHODID_GET_PACKETS = 0;
private static final int METHODID_PACKETS_PROCESSOR = 0;
private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
@ -213,9 +206,9 @@ public final class ProcessorServerGrpc {
@java.lang.SuppressWarnings("unchecked")
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_GET_PACKETS:
serviceImpl.getPackets((com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest) request,
(io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Processor.ProcessResponse>) responseObserver);
case METHODID_PACKETS_PROCESSOR:
serviceImpl.packetsProcessor((com.yuandian.dataflow.proto.Processor.PacketsRequest) request,
(io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Processor.Response>) responseObserver);
break;
default:
throw new AssertionError();
@ -278,7 +271,7 @@ public final class ProcessorServerGrpc {
if (result == null) {
serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
.setSchemaDescriptor(new ProcessorServerFileDescriptorSupplier())
.addMethod(getGetPacketsMethod())
.addMethod(getPacketsProcessorMethod())
.build();
}
}

View File

@ -0,0 +1,123 @@
package com.yuandian.dataflow.statemachine;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.StringUtils;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Getter
@Setter
public class Operate implements Message,Serializable {
private int a = 2;
public static enum OperateType {
/**
* 同步WorkerState状态.
*/
PUT_WORKERSTATE,
/**
* 分配packets
*/
ALLOCATE_PACKETS,
/**
* 暂无想法
*/
REMOVE;
}
private OperateType type;
private Object value;
public Object getValue() {
return this.value;
};
public void setValue(Object value) {
this.value = value;
return;
};
public Operate(OperateType t) {
this.type = t;
}
public Operate(OperateType t, Object value) {
this.type = t;
this.value = value;
}
public Message toMessage() {
try {
var output = ByteString.newOutput();
var outputStream = new ObjectOutputStream(output);
outputStream.writeObject( this);
outputStream.close();
output.close();
// var inBytes = new ByteArrayInputStream( output.toByteString().toByteArray() );
// var inObject = new ObjectInputStream(inBytes);
// var a = (Operate)inObject.readObject();
// log.info("applyTransaction {}", a);
return Message.valueOf(output.toByteString());
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
@Override
public ByteString getContent() {
try {
var output = ByteString.newOutput();
var outputStream = new ObjectOutputStream(output);
outputStream.writeObject( this);
outputStream.close();
output.close();
// var inBytes = new ByteArrayInputStream( output.toByteString().toByteArray() );
// var inObject = new ObjectInputStream(inBytes);
// var a = (Operate)inObject.readObject();
// log.info("applyTransaction {}", a);
return output.toByteString();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}

View File

@ -0,0 +1,52 @@
package com.yuandian.dataflow.statemachine;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import javax.servlet.http.PushBuilder;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import com.yuandian.dataflow.statemachine_old.state.WorkerState;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Getter
@Setter
public class Query implements Message,Serializable {
public static enum Type {
/**
* 同步WorkerState状态.
*/
GET_WORKER_STATE,
}
private Type type;
private Object value;
public Query(Type t, WorkerState ws) {
this.type = t;
this.value = ws;
}
@Override
public ByteString getContent() {
try {
var output = ByteString.newOutput();
var outputStream = new ObjectOutputStream(output);
outputStream.writeObject(this);
outputStream.close();
output.close();
return output.toByteString();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}

View File

@ -18,6 +18,7 @@
package com.yuandian.dataflow.statemachine;
import org.apache.http.entity.InputStreamEntity;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.protocol.Message;
@ -36,7 +37,7 @@ import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.JavaUtils;
import com.yuandian.dataflow.statemachine_old.operate.Operate;
import com.yuandian.dataflow.statemachine.Operate.OperateType;
import com.yuandian.dataflow.statemachine_old.state.State;
import com.yuandian.dataflow.statemachine_old.state.WorkerState;
@ -44,10 +45,12 @@ import lombok.extern.slf4j.Slf4j;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
@ -65,7 +68,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* will be handled by {@code applyTransaction}.
*/
@Slf4j
public class CounterStateMachine extends BaseStateMachine {
public class StateMachine extends BaseStateMachine {
private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
@ -136,9 +139,10 @@ public class CounterStateMachine extends BaseStateMachine {
storage.getSnapshotFile(last.getTerm(), last.getIndex());
//serialize the counter object and write it into the snapshot file
try (ObjectOutputStream out = new ObjectOutputStream(
new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
try {
ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(snapshotFile)));
out.writeObject(counter);
out.close();
} catch (IOException ioe) {
LOG.warn("Failed to write snapshot file \"" + snapshotFile
+ "\", last applied index=" + last);
@ -209,15 +213,38 @@ public class CounterStateMachine extends BaseStateMachine {
*/
@Override
public CompletableFuture<Message> query(Message request) {
String msg = request.getContent().toString(Charset.defaultCharset());
var data = request.getContent();
if (!msg.equals("GET")) {
return CompletableFuture.completedFuture(
Message.valueOf("Invalid Command"));
var inBytes = new ByteArrayInputStream( data.toByteArray());
try (var inObject = new ObjectInputStream(inBytes)) {
// log.info("applyTransaction {}", inObject.toString());
log.info("{}", request);
var op = (Query)inObject.readObject();
switch(op.getType()){
case GET_WORKER_STATE:
try(var rlock = readLock()) {
var ws = state.getWorkers().get( ((WorkerState)op.getValue()).getPeerId() );
if(ws == null) {
return CompletableFuture.completedFuture(Message.valueOf("Peerid is not exist"));
}
return CompletableFuture.completedFuture(Message.valueOf("Peerid is not exist"));
}
default:
if (op.getType() == Query.Type.GET_WORKER_STATE ) {
return CompletableFuture.completedFuture(
Message.valueOf("Invalid Command"));
}
break;
}
} catch (ClassNotFoundException | IOException e) {
e.printStackTrace();
}
return CompletableFuture.completedFuture(
Message.valueOf(counter.toString()));
Message.valueOf("ok"));
}
@ -229,17 +256,33 @@ public class CounterStateMachine extends BaseStateMachine {
*/
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
// log.info("applyTransaction");
final RaftProtos.LogEntryProto entry = trx.getLogEntry();
//check if the command is valid
// String logData = entry.getStateMachineLogEntry().getLogData()
// .toString(Charset.defaultCharset());
Operate op ;
Operate op = null;
try {
op = (Operate)new ObjectInputStream(entry.getStateMachineLogEntry().getLogData().newInput()).readObject();
var data = entry.getStateMachineLogEntry().getLogData();
var inBytes = new ByteArrayInputStream( data.toByteArray());
var inObject = new ObjectInputStream(inBytes);
// log.info("applyTransaction {}", inObject.toString());
op = (Operate)inObject.readObject();
log.info("applyTransaction {}", data);
inObject.close();
inBytes.close();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
log.info("{}", e.toString());
return CompletableFuture.completedFuture(Message.valueOf("错误op"));
}
@ -250,39 +293,36 @@ public class CounterStateMachine extends BaseStateMachine {
//update the last applied term and index
final long index = entry.getIndex();
try(var r = writeLock()) {
try(var wlock = writeLock()) {
switch(op.getType()) {
case ALLOCATE_PACKETS:
break;
case GET_STATE:
break;
case PUT_WORKERSTATE:
var ws = op.<WorkerState>getValue();
var ws = (WorkerState)op.getValue();
log.info("applyTransaction {}", 3);
state.getWorkers().put(ws.getPeerId() , ws);
break;
case REMOVE:
break;
default:
break;
}
updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
updateLastAppliedTermIndex(entry.getTerm(), index);
}
//actual execution of the command: increment the counter
// counter.incrementAndGet();
//return the new value of the counter to the client
final CompletableFuture<Message> f =
CompletableFuture.completedFuture(Message.valueOf("put ok"));
//if leader, log the incremented value and it's log index
if (isLeader()) {
log.info("{}: Increment to {}", index, counter.toString());
log.info("{}: getType {}", index, op.getType());
}
// log.info("applyTransaction {}", 6);
return f;
}
}

View File

@ -41,7 +41,7 @@ import java.util.UUID;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
* Simplest Ratis server, use a simple state machine {@link CounterStateMachine}
* Simplest Ratis server, use a simple state machine {@link StateMachine}
* which maintain a counter across multi server.
* This server application designed to run several times with different
* parameters (1,2 or 3). server addresses hard coded in {@link Constants}
@ -49,12 +49,12 @@ import static java.nio.charset.StandardCharsets.UTF_8;
* Run this application three times with three different parameter set-up a
* ratis cluster which maintain a counter value replicated in each server memory
*/
public final class CounterServer implements Closeable {
public final class StateServer implements Closeable {
private final RaftServer server;
public static final UUID CLUSTER_GROUP_ID = UUID.fromString("79642d72-6166-742d-6461-7461666c6f77");
public CounterServer(RaftPeer peer, ArrayList<RaftPeer> peers, File storageDir) throws IOException {
public StateServer(RaftPeer peer, ArrayList<RaftPeer> peers, File storageDir) throws IOException {
//create a property object
RaftProperties properties = new RaftProperties();
@ -66,7 +66,7 @@ public final class CounterServer implements Closeable {
GrpcConfigKeys.Server.setPort(properties, port);
//create the counter state machine which hold the counter value
CounterStateMachine counterStateMachine = new CounterStateMachine();
StateMachine counterStateMachine = new StateMachine();
RaftGroup raftGroup = RaftGroup.valueOf(
RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers);
@ -91,7 +91,7 @@ public final class CounterServer implements Closeable {
public static void main(String[] args) throws IOException {
if (args.length < 1) {
System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}");
System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.StateServer {serverIndex}");
System.err.println("{serverIndex} could be 1, 2 or 3");
System.exit(1);
}
@ -109,9 +109,9 @@ public final class CounterServer implements Closeable {
//start a counter server
final File storageDir = new File("./raftdata/" + currentPeer.getId());
final CounterServer counterServer = new CounterServer(currentPeer, peers, storageDir);
final StateServer stateServer = new StateServer(currentPeer, peers, storageDir);
counterServer.start();
stateServer.start();
@ -119,6 +119,6 @@ public final class CounterServer implements Closeable {
//exit when any input entered
Scanner scanner = new Scanner(System.in, UTF_8.name());
scanner.nextLine();
counterServer.close();
stateServer.close();
}
}

View File

@ -4,7 +4,7 @@
* @author eson
*2022年7月21日-14:27:49
*/
package com.yuandian.dataflow.statemachine_old.annotations;
package com.yuandian.dataflow.statemachine.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@ -19,5 +19,5 @@ import java.lang.annotation.Target;
*/
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
public @interface WorkerRegister {
public @interface GrpcProcessor {
}

View File

@ -4,7 +4,7 @@
* @author eson
*2022年7月21日-14:27:49
*/
package com.yuandian.dataflow.statemachine_old.annotations;
package com.yuandian.dataflow.statemachine.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;

View File

@ -32,11 +32,17 @@ import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RaftRpcMessage;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.NetUtils;
import org.springframework.cglib.proxy.CallbackFilter;
import com.yuandian.dataflow.statemachine.CounterServer;
import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
import com.yuandian.dataflow.statemachine.StateServer;
import com.yuandian.dataflow.statemachine.Operate.OperateType;
import com.yuandian.dataflow.statemachine_old.state.WorkerState;
import com.yuandian.dataflow.statemachine.Operate;
import com.yuandian.dataflow.statemachine.Query;
import lombok.extern.slf4j.Slf4j;
@ -77,14 +83,17 @@ public final class CounterClient {
// concurrently
ExecutorService executorService = Executors.newFixedThreadPool(10);
increment = 1000;
increment = 10;
CountDownLatch latch = new CountDownLatch(increment);
//send INCREMENT commands concurrently
System.out.printf("Sending %d increment command...%n", increment);
Instant now = Instant.now();
for (int i = 0; i < increment; i++) {
executorService.submit(() ->
raftClient.io().send(Message.valueOf("INCREMENT")));
var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(null));
var reply = raftClient.io().send(op);
log.info("{}", reply);
executorService.submit(() -> raftClient.io().send(op));
latch.countDown();
}
@ -96,8 +105,8 @@ public final class CounterClient {
log.info("{}", Duration.between(now, Instant.now()).toMillis());
//send GET command and print the response
RaftClientReply count = raftClient.io().sendReadOnly(Message.valueOf("GET"));
var query = new Query(Query.Type.GET_WORKER_STATE, new WorkerState(null));
RaftClientReply count = raftClient.io().sendReadOnly(query);
String response = count.getMessage().getContent().toString(Charset.defaultCharset());
System.out.println(response);
@ -126,7 +135,7 @@ public final class CounterClient {
peers.add(RaftPeer.newBuilder().setId("" + port).setAddress(addresses[i]).build());
}
RaftGroup raftGroup = RaftGroup.valueOf(
RaftGroupId.valueOf(CounterServer.CLUSTER_GROUP_ID), peers);
RaftGroupId.valueOf(StateServer.CLUSTER_GROUP_ID), peers);
RaftClient.Builder builder = RaftClient.newBuilder()
.setProperties(raftProperties)

View File

@ -16,13 +16,13 @@ import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.google.protobuf.Any;
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
// import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.master.MasterContext;
import com.yuandian.dataflow.statemachine_old.master.MasterExecute;
import com.yuandian.dataflow.statemachine_old.operate.Operate;
import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import com.yuandian.dataflow.statemachine_old.state.State;
import com.yuandian.dataflow.statemachine_old.state.WorkerState;

View File

@ -49,12 +49,12 @@ import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.yuandian.dataflow.statemachine_old.annotations.MasterRegister;
import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister;
import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
import com.yuandian.dataflow.statemachine.annotations.MasterRegister;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.master.MasterExecute;
import com.yuandian.dataflow.statemachine_old.operate.Operate;
import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine_old.state.State;
@ -120,7 +120,7 @@ public class StateFactory {
ss.readIndexState(closure);
}
public static void applyOperate(Operate op, GenericClosure closure) {
public static void applyOperate(OperateOld op, GenericClosure closure) {
ss.applyOperate(op, closure);
}
@ -195,7 +195,7 @@ public class StateFactory {
log.info("获取 {} -> {} 下包的所有注解", clsName, packName);
var refl = new Reflections(packName);
Set<Class<?>> scans = refl.getTypesAnnotatedWith(WorkerRegister.class);
Set<Class<?>> scans = refl.getTypesAnnotatedWith(GrpcProcessor.class);
scans.forEach((pRaftClass) -> {
scansMap.put(pRaftClass.getName(), pRaftClass);
@ -263,7 +263,7 @@ public class StateFactory {
}
public void applyOperate(Operate op, GenericClosure closure) {
public void applyOperate(OperateOld op, GenericClosure closure) {
// 所有的提交都必须再leader进行
if (!ss.isLeader()) {
ss.handlerNotLeaderError(closure);

View File

@ -17,10 +17,10 @@ import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
// import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.operate.Operate;
import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import com.yuandian.dataflow.statemachine_old.state.State;
import com.yuandian.dataflow.statemachine_old.state.WorkerState;
import com.yuandian.dataflow.utils.Utils;
@ -65,19 +65,19 @@ public class StateMachine extends StateMachineAdapter {
public void onApply(final Iterator iter) {
while (iter.hasNext()) {
Operate op = null;
OperateOld op = null;
GenericClosure closure = null;
if (iter.done() != null) {
// leader可以直接从 回调closure里提取operate
closure = (GenericClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
op = (Operate)closure.getValue();
op = (OperateOld)closure.getValue();
} else {
// 非leader 需要从getData反序列化出来后处理
final ByteBuffer data = iter.getData();
try {
op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(),
Operate.class.getName());
OperateOld.class.getName());
} catch (CodecException e) {
log.info("{}", e.toString());
}
@ -114,7 +114,7 @@ public class StateMachine extends StateMachineAdapter {
log.error("WorkerState获取的状态为 {}", ws);
continue;
}
var can = Operate.MAX_TASKS - ws.getTaskQueueSize();
var can = OperateOld.MAX_TASKS - ws.getTaskQueueSize();
canTasks[i] = can;
if(!isNext) {
isNext = true;
@ -127,7 +127,7 @@ public class StateMachine extends StateMachineAdapter {
// log.info("size: {}", Operate.packetsManager.size());
// 统计每个节点发送多少任务
var allocTasks = Utils.allocationTasks(Operate.packetsManager.size(), canTasks);
var allocTasks = Utils.allocationTasks(OperateOld.packetsManager.size(), canTasks);
if(closure != null) {
closure.setValue(allocTasks);
}
@ -144,10 +144,10 @@ public class StateMachine extends StateMachineAdapter {
}
break;
case GET_STATE:
closure.setValue(this.state);
log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex());
break;
// case GET_STATE:
// closure.setValue(this.state);
// log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex());
// break;
case REMOVE:
break;
default:
@ -198,7 +198,7 @@ public class StateMachine extends StateMachineAdapter {
}
// 更新当前WorkerState
StateFactory.applyOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure () {
StateFactory.applyOperate(new OperateOld(OperateType.PUT_WORKERSTATE, ws), new GenericClosure () {
@Override
public void run(Status status) {
log.debug("master update workerstate: {}", status);
@ -242,7 +242,7 @@ public class StateMachine extends StateMachineAdapter {
// 更新当前WorkerState
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateFactory.getServerId())), new GenericClosure() {
OperateOld.CallOperate(new OperateOld(OperateType.PUT_WORKERSTATE, new WorkerState(StateFactory.getServerId())), new GenericClosure() {
@Override
public void run(Status status) {
log.debug("onStartFollowing update workerstate: {}", status);
@ -268,8 +268,8 @@ public class StateMachine extends StateMachineAdapter {
log.debug("onStopFollowing[{}] {}", StateFactory.getServerId(), ctx);
var ws = new WorkerState(StateFactory.getServerId());
var op = new Operate(OperateType.PUT_WORKERSTATE, ws);
Operate.CallOperate(op, new GenericClosure() {
var op = new OperateOld(OperateType.PUT_WORKERSTATE, ws);
OperateOld.CallOperate(op, new GenericClosure() {
@Override
public void run(Status status) {
log.info("{} {}", status, this.getResponse());

View File

@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
@Data
public class Operate implements Serializable {
public class OperateOld implements Serializable {
private static int DEFAULT_ASYNC_TIMEOUT = 5000;
public static final int MAX_TASKS = 1000;
@ -39,10 +39,7 @@ public class Operate implements Serializable {
*/
ALLOCATE_PACKETS,
/**
* 获取State状态
*/
GET_STATE,
/**
* 暂无想法
*/
@ -54,7 +51,7 @@ public class Operate implements Serializable {
public Operate(OperateType t, Object v) {
public OperateOld(OperateType t, Object v) {
this.type = t;
this.value = v;
}
@ -75,7 +72,7 @@ public class Operate implements Serializable {
* @param closure 回调函数. Operate为返回值
*/
@java.lang.SuppressWarnings("unchecked")
public static void CallOperate(Operate op, GenericClosure closure) {
public static void CallOperate(OperateOld op, GenericClosure closure) {
// log.debug("CallOperate Value {}", op.<WorkerState>getValue());
// 如果是leader 就直接提交
if (StateFactory.isLeader()) {

View File

@ -12,10 +12,10 @@ import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
import com.yuandian.dataflow.statemachine_old.StateFactory;
import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.operate.Operate;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import javassist.ClassPath;
@ -31,7 +31,7 @@ import lombok.extern.slf4j.Slf4j;
*2022年7月12日-11:10:54
*/
@Slf4j
@WorkerRegister
public class OperateProcessor implements RpcProcessor<OperateProcessor.OperateRequest> {
/**
@ -47,7 +47,7 @@ public class OperateProcessor implements RpcProcessor<OperateProcessor.OperateRe
private static final long serialVersionUID = 1L;
private Operate operate;
private OperateOld operate;
}