implements
- // @@protoc_insertion_point(builder_implements:dataflow.PacketsProcessorRequest)
- com.yuandian.dataflow.proto.Processor.PacketsProcessorRequestOrBuilder {
+ // @@protoc_insertion_point(builder_implements:com.yuandian.dataflow.proto.PacketsRequest)
+ com.yuandian.dataflow.proto.Processor.PacketsRequestOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
- return com.yuandian.dataflow.proto.Processor.internal_static_dataflow_PacketsProcessorRequest_descriptor;
+ return com.yuandian.dataflow.proto.Processor.internal_static_com_yuandian_dataflow_proto_PacketsRequest_descriptor;
}
@java.lang.Override
protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
internalGetFieldAccessorTable() {
- return com.yuandian.dataflow.proto.Processor.internal_static_dataflow_PacketsProcessorRequest_fieldAccessorTable
+ return com.yuandian.dataflow.proto.Processor.internal_static_com_yuandian_dataflow_proto_PacketsRequest_fieldAccessorTable
.ensureFieldAccessorsInitialized(
- com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.class, com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.Builder.class);
+ com.yuandian.dataflow.proto.Processor.PacketsRequest.class, com.yuandian.dataflow.proto.Processor.PacketsRequest.Builder.class);
}
- // Construct using com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.newBuilder()
+ // Construct using com.yuandian.dataflow.proto.Processor.PacketsRequest.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
@@ -1231,10 +1214,6 @@ public final class Processor {
@java.lang.Override
public Builder clear() {
super.clear();
- type_ = 0;
-
- version_ = "";
-
if (packetsBuilder_ == null) {
packets_ = java.util.Collections.emptyList();
bitField0_ = (bitField0_ & ~0x00000001);
@@ -1247,17 +1226,17 @@ public final class Processor {
@java.lang.Override
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
- return com.yuandian.dataflow.proto.Processor.internal_static_dataflow_PacketsProcessorRequest_descriptor;
+ return com.yuandian.dataflow.proto.Processor.internal_static_com_yuandian_dataflow_proto_PacketsRequest_descriptor;
}
@java.lang.Override
- public com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest getDefaultInstanceForType() {
- return com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.getDefaultInstance();
+ public com.yuandian.dataflow.proto.Processor.PacketsRequest getDefaultInstanceForType() {
+ return com.yuandian.dataflow.proto.Processor.PacketsRequest.getDefaultInstance();
}
@java.lang.Override
- public com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest build() {
- com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest result = buildPartial();
+ public com.yuandian.dataflow.proto.Processor.PacketsRequest build() {
+ com.yuandian.dataflow.proto.Processor.PacketsRequest result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
@@ -1265,11 +1244,9 @@ public final class Processor {
}
@java.lang.Override
- public com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest buildPartial() {
- com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest result = new com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest(this);
+ public com.yuandian.dataflow.proto.Processor.PacketsRequest buildPartial() {
+ com.yuandian.dataflow.proto.Processor.PacketsRequest result = new com.yuandian.dataflow.proto.Processor.PacketsRequest(this);
int from_bitField0_ = bitField0_;
- result.type_ = type_;
- result.version_ = version_;
if (packetsBuilder_ == null) {
if (((bitField0_ & 0x00000001) != 0)) {
packets_ = java.util.Collections.unmodifiableList(packets_);
@@ -1317,23 +1294,16 @@ public final class Processor {
}
@java.lang.Override
public Builder mergeFrom(com.google.protobuf.Message other) {
- if (other instanceof com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest) {
- return mergeFrom((com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest)other);
+ if (other instanceof com.yuandian.dataflow.proto.Processor.PacketsRequest) {
+ return mergeFrom((com.yuandian.dataflow.proto.Processor.PacketsRequest)other);
} else {
super.mergeFrom(other);
return this;
}
}
- public Builder mergeFrom(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest other) {
- if (other == com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.getDefaultInstance()) return this;
- if (other.getType() != 0) {
- setType(other.getType());
- }
- if (!other.getVersion().isEmpty()) {
- version_ = other.version_;
- onChanged();
- }
+ public Builder mergeFrom(com.yuandian.dataflow.proto.Processor.PacketsRequest other) {
+ if (other == com.yuandian.dataflow.proto.Processor.PacketsRequest.getDefaultInstance()) return this;
if (packetsBuilder_ == null) {
if (!other.packets_.isEmpty()) {
if (packets_.isEmpty()) {
@@ -1375,11 +1345,11 @@ public final class Processor {
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
- com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest parsedMessage = null;
+ com.yuandian.dataflow.proto.Processor.PacketsRequest parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
- parsedMessage = (com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest) e.getUnfinishedMessage();
+ parsedMessage = (com.yuandian.dataflow.proto.Processor.PacketsRequest) e.getUnfinishedMessage();
throw e.unwrapIOException();
} finally {
if (parsedMessage != null) {
@@ -1390,145 +1360,6 @@ public final class Processor {
}
private int bitField0_;
- private int type_ ;
- /**
- *
- * 请求类型. 默认 为 1. 暂无意义
- *
- *
- * int32 type = 1;
- * @return The type.
- */
- @java.lang.Override
- public int getType() {
- return type_;
- }
- /**
- *
- * 请求类型. 默认 为 1. 暂无意义
- *
- *
- * int32 type = 1;
- * @param value The type to set.
- * @return This builder for chaining.
- */
- public Builder setType(int value) {
-
- type_ = value;
- onChanged();
- return this;
- }
- /**
- *
- * 请求类型. 默认 为 1. 暂无意义
- *
- *
- * int32 type = 1;
- * @return This builder for chaining.
- */
- public Builder clearType() {
-
- type_ = 0;
- onChanged();
- return this;
- }
-
- private java.lang.Object version_ = "";
- /**
- *
- * 请求版本. 区分版本. 非必要时不使用
- *
- *
- * string version = 2;
- * @return The version.
- */
- public java.lang.String getVersion() {
- java.lang.Object ref = version_;
- if (!(ref instanceof java.lang.String)) {
- com.google.protobuf.ByteString bs =
- (com.google.protobuf.ByteString) ref;
- java.lang.String s = bs.toStringUtf8();
- version_ = s;
- return s;
- } else {
- return (java.lang.String) ref;
- }
- }
- /**
- *
- * 请求版本. 区分版本. 非必要时不使用
- *
- *
- * string version = 2;
- * @return The bytes for version.
- */
- public com.google.protobuf.ByteString
- getVersionBytes() {
- java.lang.Object ref = version_;
- if (ref instanceof String) {
- com.google.protobuf.ByteString b =
- com.google.protobuf.ByteString.copyFromUtf8(
- (java.lang.String) ref);
- version_ = b;
- return b;
- } else {
- return (com.google.protobuf.ByteString) ref;
- }
- }
- /**
- *
- * 请求版本. 区分版本. 非必要时不使用
- *
- *
- * string version = 2;
- * @param value The version to set.
- * @return This builder for chaining.
- */
- public Builder setVersion(
- java.lang.String value) {
- if (value == null) {
- throw new NullPointerException();
- }
-
- version_ = value;
- onChanged();
- return this;
- }
- /**
- *
- * 请求版本. 区分版本. 非必要时不使用
- *
- *
- * string version = 2;
- * @return This builder for chaining.
- */
- public Builder clearVersion() {
-
- version_ = getDefaultInstance().getVersion();
- onChanged();
- return this;
- }
- /**
- *
- * 请求版本. 区分版本. 非必要时不使用
- *
- *
- * string version = 2;
- * @param value The bytes for version to set.
- * @return This builder for chaining.
- */
- public Builder setVersionBytes(
- com.google.protobuf.ByteString value) {
- if (value == null) {
- throw new NullPointerException();
- }
- checkByteStringIsUtf8(value);
-
- version_ = value;
- onChanged();
- return this;
- }
-
private java.util.List packets_ =
java.util.Collections.emptyList();
private void ensurePacketsIsMutable() {
@@ -1542,11 +1373,7 @@ public final class Processor {
com.google.protobuf.Any, com.google.protobuf.Any.Builder, com.google.protobuf.AnyOrBuilder> packetsBuilder_;
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public java.util.List getPacketsList() {
if (packetsBuilder_ == null) {
@@ -1556,11 +1383,7 @@ public final class Processor {
}
}
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public int getPacketsCount() {
if (packetsBuilder_ == null) {
@@ -1570,11 +1393,7 @@ public final class Processor {
}
}
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public com.google.protobuf.Any getPackets(int index) {
if (packetsBuilder_ == null) {
@@ -1584,11 +1403,7 @@ public final class Processor {
}
}
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public Builder setPackets(
int index, com.google.protobuf.Any value) {
@@ -1605,11 +1420,7 @@ public final class Processor {
return this;
}
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public Builder setPackets(
int index, com.google.protobuf.Any.Builder builderForValue) {
@@ -1623,11 +1434,7 @@ public final class Processor {
return this;
}
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public Builder addPackets(com.google.protobuf.Any value) {
if (packetsBuilder_ == null) {
@@ -1643,11 +1450,7 @@ public final class Processor {
return this;
}
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public Builder addPackets(
int index, com.google.protobuf.Any value) {
@@ -1664,11 +1467,7 @@ public final class Processor {
return this;
}
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public Builder addPackets(
com.google.protobuf.Any.Builder builderForValue) {
@@ -1682,11 +1481,7 @@ public final class Processor {
return this;
}
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public Builder addPackets(
int index, com.google.protobuf.Any.Builder builderForValue) {
@@ -1700,11 +1495,7 @@ public final class Processor {
return this;
}
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public Builder addAllPackets(
java.lang.Iterable extends com.google.protobuf.Any> values) {
@@ -1719,11 +1510,7 @@ public final class Processor {
return this;
}
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public Builder clearPackets() {
if (packetsBuilder_ == null) {
@@ -1736,11 +1523,7 @@ public final class Processor {
return this;
}
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public Builder removePackets(int index) {
if (packetsBuilder_ == null) {
@@ -1753,22 +1536,14 @@ public final class Processor {
return this;
}
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public com.google.protobuf.Any.Builder getPacketsBuilder(
int index) {
return getPacketsFieldBuilder().getBuilder(index);
}
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public com.google.protobuf.AnyOrBuilder getPacketsOrBuilder(
int index) {
@@ -1778,11 +1553,7 @@ public final class Processor {
}
}
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public java.util.List extends com.google.protobuf.AnyOrBuilder>
getPacketsOrBuilderList() {
@@ -1793,22 +1564,14 @@ public final class Processor {
}
}
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public com.google.protobuf.Any.Builder addPacketsBuilder() {
return getPacketsFieldBuilder().addBuilder(
com.google.protobuf.Any.getDefaultInstance());
}
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public com.google.protobuf.Any.Builder addPacketsBuilder(
int index) {
@@ -1816,11 +1579,7 @@ public final class Processor {
index, com.google.protobuf.Any.getDefaultInstance());
}
/**
- *
- * 请求的可变参数. 暂不使用
- *
- *
- * repeated .google.protobuf.Any packets = 3;
+ * repeated .google.protobuf.Any packets = 1;
*/
public java.util.List
getPacketsBuilderList() {
@@ -1853,56 +1612,56 @@ public final class Processor {
}
- // @@protoc_insertion_point(builder_scope:dataflow.PacketsProcessorRequest)
+ // @@protoc_insertion_point(builder_scope:com.yuandian.dataflow.proto.PacketsRequest)
}
- // @@protoc_insertion_point(class_scope:dataflow.PacketsProcessorRequest)
- private static final com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest DEFAULT_INSTANCE;
+ // @@protoc_insertion_point(class_scope:com.yuandian.dataflow.proto.PacketsRequest)
+ private static final com.yuandian.dataflow.proto.Processor.PacketsRequest DEFAULT_INSTANCE;
static {
- DEFAULT_INSTANCE = new com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest();
+ DEFAULT_INSTANCE = new com.yuandian.dataflow.proto.Processor.PacketsRequest();
}
- public static com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest getDefaultInstance() {
+ public static com.yuandian.dataflow.proto.Processor.PacketsRequest getDefaultInstance() {
return DEFAULT_INSTANCE;
}
- private static final com.google.protobuf.Parser
- PARSER = new com.google.protobuf.AbstractParser() {
+ private static final com.google.protobuf.Parser
+ PARSER = new com.google.protobuf.AbstractParser() {
@java.lang.Override
- public PacketsProcessorRequest parsePartialFrom(
+ public PacketsRequest parsePartialFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
- return new PacketsProcessorRequest(input, extensionRegistry);
+ return new PacketsRequest(input, extensionRegistry);
}
};
- public static com.google.protobuf.Parser parser() {
+ public static com.google.protobuf.Parser parser() {
return PARSER;
}
@java.lang.Override
- public com.google.protobuf.Parser getParserForType() {
+ public com.google.protobuf.Parser getParserForType() {
return PARSER;
}
@java.lang.Override
- public com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest getDefaultInstanceForType() {
+ public com.yuandian.dataflow.proto.Processor.PacketsRequest getDefaultInstanceForType() {
return DEFAULT_INSTANCE;
}
}
private static final com.google.protobuf.Descriptors.Descriptor
- internal_static_dataflow_ProcessResponse_descriptor;
+ internal_static_com_yuandian_dataflow_proto_Response_descriptor;
private static final
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
- internal_static_dataflow_ProcessResponse_fieldAccessorTable;
+ internal_static_com_yuandian_dataflow_proto_Response_fieldAccessorTable;
private static final com.google.protobuf.Descriptors.Descriptor
- internal_static_dataflow_PacketsProcessorRequest_descriptor;
+ internal_static_com_yuandian_dataflow_proto_PacketsRequest_descriptor;
private static final
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
- internal_static_dataflow_PacketsProcessorRequest_fieldAccessorTable;
+ internal_static_com_yuandian_dataflow_proto_PacketsRequest_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@@ -1912,33 +1671,34 @@ public final class Processor {
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n\017Processor.proto\022\010dataflow\032\031google/prot" +
- "obuf/any.proto\"0\n\017ProcessResponse\022\014\n\004cod" +
- "e\030\001 \001(\005\022\017\n\007message\030\002 \001(\t\"_\n\027PacketsProce" +
- "ssorRequest\022\014\n\004type\030\001 \001(\005\022\017\n\007version\030\002 \001" +
- "(\t\022%\n\007packets\030\003 \003(\0132\024.google.protobuf.An" +
- "y2]\n\017ProcessorServer\022J\n\nGetPackets\022!.dat" +
- "aflow.PacketsProcessorRequest\032\031.dataflow" +
- ".ProcessResponseB4\n\033com.yuandian.dataflo" +
- "w.protoP\000Z\023../grpc-gen;grpcgenb\006proto3"
+ "\n\017Processor.proto\022\033com.yuandian.dataflow" +
+ ".proto\032\031google/protobuf/any.proto\"I\n\010Res" +
+ "ponse\022\014\n\004code\030\001 \001(\005\022\013\n\003msg\030\002 \001(\t\022\"\n\004data" +
+ "\030\003 \001(\0132\024.google.protobuf.Any\"7\n\016PacketsR" +
+ "equest\022%\n\007packets\030\001 \003(\0132\024.google.protobu" +
+ "f.Any2}\n\017ProcessorServer\022j\n\020PacketsProce" +
+ "ssor\022+.com.yuandian.dataflow.proto.Packe" +
+ "tsRequest\032%.com.yuandian.dataflow.proto." +
+ "Response\"\0000\001B\037\n\033com.yuandian.dataflow.pr" +
+ "otoP\000b\006proto3"
};
descriptor = com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
com.google.protobuf.AnyProto.getDescriptor(),
});
- internal_static_dataflow_ProcessResponse_descriptor =
+ internal_static_com_yuandian_dataflow_proto_Response_descriptor =
getDescriptor().getMessageTypes().get(0);
- internal_static_dataflow_ProcessResponse_fieldAccessorTable = new
+ internal_static_com_yuandian_dataflow_proto_Response_fieldAccessorTable = new
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
- internal_static_dataflow_ProcessResponse_descriptor,
- new java.lang.String[] { "Code", "Message", });
- internal_static_dataflow_PacketsProcessorRequest_descriptor =
+ internal_static_com_yuandian_dataflow_proto_Response_descriptor,
+ new java.lang.String[] { "Code", "Msg", "Data", });
+ internal_static_com_yuandian_dataflow_proto_PacketsRequest_descriptor =
getDescriptor().getMessageTypes().get(1);
- internal_static_dataflow_PacketsProcessorRequest_fieldAccessorTable = new
+ internal_static_com_yuandian_dataflow_proto_PacketsRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
- internal_static_dataflow_PacketsProcessorRequest_descriptor,
- new java.lang.String[] { "Type", "Version", "Packets", });
+ internal_static_com_yuandian_dataflow_proto_PacketsRequest_descriptor,
+ new java.lang.String[] { "Packets", });
com.google.protobuf.AnyProto.getDescriptor();
}
diff --git a/src/main/java/com/yuandian/dataflow/proto/ProcessorServerGrpc.java b/src/main/java/com/yuandian/dataflow/proto/ProcessorServerGrpc.java
index 40993e1..60408d5 100644
--- a/src/main/java/com/yuandian/dataflow/proto/ProcessorServerGrpc.java
+++ b/src/main/java/com/yuandian/dataflow/proto/ProcessorServerGrpc.java
@@ -24,38 +24,38 @@ public final class ProcessorServerGrpc {
private ProcessorServerGrpc() {}
- public static final String SERVICE_NAME = "dataflow.ProcessorServer";
+ public static final String SERVICE_NAME = "com.yuandian.dataflow.proto.ProcessorServer";
// Static method descriptors that strictly reflect the proto.
- private static volatile io.grpc.MethodDescriptor getGetPacketsMethod;
+ private static volatile io.grpc.MethodDescriptor getPacketsProcessorMethod;
@io.grpc.stub.annotations.RpcMethod(
- fullMethodName = SERVICE_NAME + '/' + "GetPackets",
- requestType = com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.class,
- responseType = com.yuandian.dataflow.proto.Processor.ProcessResponse.class,
- methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
- public static io.grpc.MethodDescriptor getGetPacketsMethod() {
- io.grpc.MethodDescriptor getGetPacketsMethod;
- if ((getGetPacketsMethod = ProcessorServerGrpc.getGetPacketsMethod) == null) {
+ fullMethodName = SERVICE_NAME + '/' + "PacketsProcessor",
+ requestType = com.yuandian.dataflow.proto.Processor.PacketsRequest.class,
+ responseType = com.yuandian.dataflow.proto.Processor.Response.class,
+ methodType = io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
+ public static io.grpc.MethodDescriptor getPacketsProcessorMethod() {
+ io.grpc.MethodDescriptor getPacketsProcessorMethod;
+ if ((getPacketsProcessorMethod = ProcessorServerGrpc.getPacketsProcessorMethod) == null) {
synchronized (ProcessorServerGrpc.class) {
- if ((getGetPacketsMethod = ProcessorServerGrpc.getGetPacketsMethod) == null) {
- ProcessorServerGrpc.getGetPacketsMethod = getGetPacketsMethod =
- io.grpc.MethodDescriptor.newBuilder()
- .setType(io.grpc.MethodDescriptor.MethodType.UNARY)
- .setFullMethodName(generateFullMethodName(SERVICE_NAME, "GetPackets"))
+ if ((getPacketsProcessorMethod = ProcessorServerGrpc.getPacketsProcessorMethod) == null) {
+ ProcessorServerGrpc.getPacketsProcessorMethod = getPacketsProcessorMethod =
+ io.grpc.MethodDescriptor.newBuilder()
+ .setType(io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
+ .setFullMethodName(generateFullMethodName(SERVICE_NAME, "PacketsProcessor"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
- com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.getDefaultInstance()))
+ com.yuandian.dataflow.proto.Processor.PacketsRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
- com.yuandian.dataflow.proto.Processor.ProcessResponse.getDefaultInstance()))
- .setSchemaDescriptor(new ProcessorServerMethodDescriptorSupplier("GetPackets"))
+ com.yuandian.dataflow.proto.Processor.Response.getDefaultInstance()))
+ .setSchemaDescriptor(new ProcessorServerMethodDescriptorSupplier("PacketsProcessor"))
.build();
}
}
}
- return getGetPacketsMethod;
+ return getPacketsProcessorMethod;
}
/**
@@ -108,20 +108,20 @@ public final class ProcessorServerGrpc {
/**
*/
- public void getPackets(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request,
- io.grpc.stub.StreamObserver responseObserver) {
- asyncUnimplementedUnaryCall(getGetPacketsMethod(), responseObserver);
+ public void packetsProcessor(com.yuandian.dataflow.proto.Processor.PacketsRequest request,
+ io.grpc.stub.StreamObserver responseObserver) {
+ asyncUnimplementedUnaryCall(getPacketsProcessorMethod(), responseObserver);
}
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
- getGetPacketsMethod(),
- asyncUnaryCall(
+ getPacketsProcessorMethod(),
+ asyncServerStreamingCall(
new MethodHandlers<
- com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest,
- com.yuandian.dataflow.proto.Processor.ProcessResponse>(
- this, METHODID_GET_PACKETS)))
+ com.yuandian.dataflow.proto.Processor.PacketsRequest,
+ com.yuandian.dataflow.proto.Processor.Response>(
+ this, METHODID_PACKETS_PROCESSOR)))
.build();
}
}
@@ -142,10 +142,10 @@ public final class ProcessorServerGrpc {
/**
*/
- public void getPackets(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request,
- io.grpc.stub.StreamObserver responseObserver) {
- asyncUnaryCall(
- getChannel().newCall(getGetPacketsMethod(), getCallOptions()), request, responseObserver);
+ public void packetsProcessor(com.yuandian.dataflow.proto.Processor.PacketsRequest request,
+ io.grpc.stub.StreamObserver responseObserver) {
+ asyncServerStreamingCall(
+ getChannel().newCall(getPacketsProcessorMethod(), getCallOptions()), request, responseObserver);
}
}
@@ -165,9 +165,10 @@ public final class ProcessorServerGrpc {
/**
*/
- public com.yuandian.dataflow.proto.Processor.ProcessResponse getPackets(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request) {
- return blockingUnaryCall(
- getChannel(), getGetPacketsMethod(), getCallOptions(), request);
+ public java.util.Iterator packetsProcessor(
+ com.yuandian.dataflow.proto.Processor.PacketsRequest request) {
+ return blockingServerStreamingCall(
+ getChannel(), getPacketsProcessorMethod(), getCallOptions(), request);
}
}
@@ -184,17 +185,9 @@ public final class ProcessorServerGrpc {
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new ProcessorServerFutureStub(channel, callOptions);
}
-
- /**
- */
- public com.google.common.util.concurrent.ListenableFuture getPackets(
- com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request) {
- return futureUnaryCall(
- getChannel().newCall(getGetPacketsMethod(), getCallOptions()), request);
- }
}
- private static final int METHODID_GET_PACKETS = 0;
+ private static final int METHODID_PACKETS_PROCESSOR = 0;
private static final class MethodHandlers implements
io.grpc.stub.ServerCalls.UnaryMethod,
@@ -213,9 +206,9 @@ public final class ProcessorServerGrpc {
@java.lang.SuppressWarnings("unchecked")
public void invoke(Req request, io.grpc.stub.StreamObserver responseObserver) {
switch (methodId) {
- case METHODID_GET_PACKETS:
- serviceImpl.getPackets((com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest) request,
- (io.grpc.stub.StreamObserver) responseObserver);
+ case METHODID_PACKETS_PROCESSOR:
+ serviceImpl.packetsProcessor((com.yuandian.dataflow.proto.Processor.PacketsRequest) request,
+ (io.grpc.stub.StreamObserver) responseObserver);
break;
default:
throw new AssertionError();
@@ -278,7 +271,7 @@ public final class ProcessorServerGrpc {
if (result == null) {
serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
.setSchemaDescriptor(new ProcessorServerFileDescriptorSupplier())
- .addMethod(getGetPacketsMethod())
+ .addMethod(getPacketsProcessorMethod())
.build();
}
}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/Operate.java
new file mode 100644
index 0000000..feed7a4
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/statemachine/Operate.java
@@ -0,0 +1,123 @@
+package com.yuandian.dataflow.statemachine;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.StringUtils;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Getter
+@Setter
+public class Operate implements Message,Serializable {
+
+ private int a = 2;
+
+ public static enum OperateType {
+ /**
+ * 同步WorkerState状态.
+ */
+ PUT_WORKERSTATE,
+
+ /**
+ * 分配packets
+ */
+ ALLOCATE_PACKETS,
+
+
+ /**
+ * 暂无想法
+ */
+ REMOVE;
+ }
+
+ private OperateType type;
+ private Object value;
+
+ public Object getValue() {
+ return this.value;
+ };
+
+ public void setValue(Object value) {
+ this.value = value;
+ return;
+ };
+
+
+ public Operate(OperateType t) {
+ this.type = t;
+ }
+
+ public Operate(OperateType t, Object value) {
+ this.type = t;
+ this.value = value;
+ }
+
+ public Message toMessage() {
+ try {
+
+ var output = ByteString.newOutput();
+ var outputStream = new ObjectOutputStream(output);
+ outputStream.writeObject( this);
+ outputStream.close();
+ output.close();
+
+
+ // var inBytes = new ByteArrayInputStream( output.toByteString().toByteArray() );
+ // var inObject = new ObjectInputStream(inBytes);
+
+
+ // var a = (Operate)inObject.readObject();
+ // log.info("applyTransaction {}", a);
+
+
+ return Message.valueOf(output.toByteString());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ @Override
+ public ByteString getContent() {
+ try {
+
+ var output = ByteString.newOutput();
+ var outputStream = new ObjectOutputStream(output);
+ outputStream.writeObject( this);
+ outputStream.close();
+ output.close();
+
+
+ // var inBytes = new ByteArrayInputStream( output.toByteString().toByteArray() );
+ // var inObject = new ObjectInputStream(inBytes);
+
+
+ // var a = (Operate)inObject.readObject();
+ // log.info("applyTransaction {}", a);
+
+
+ return output.toByteString();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+
+
+
+
+
+
+}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/Query.java b/src/main/java/com/yuandian/dataflow/statemachine/Query.java
new file mode 100644
index 0000000..89f0c46
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/statemachine/Query.java
@@ -0,0 +1,52 @@
+package com.yuandian.dataflow.statemachine;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import javax.servlet.http.PushBuilder;
+
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+import com.yuandian.dataflow.statemachine_old.state.WorkerState;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Getter
+@Setter
+public class Query implements Message,Serializable {
+
+ public static enum Type {
+ /**
+ * 同步WorkerState状态.
+ */
+ GET_WORKER_STATE,
+ }
+
+ private Type type;
+ private Object value;
+ public Query(Type t, WorkerState ws) {
+ this.type = t;
+ this.value = ws;
+ }
+
+ @Override
+ public ByteString getContent() {
+ try {
+ var output = ByteString.newOutput();
+ var outputStream = new ObjectOutputStream(output);
+ outputStream.writeObject(this);
+ outputStream.close();
+ output.close();
+ return output.toByteString();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/CounterStateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java
similarity index 80%
rename from src/main/java/com/yuandian/dataflow/statemachine/CounterStateMachine.java
rename to src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java
index 73f1d98..dafde19 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/CounterStateMachine.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java
@@ -18,6 +18,7 @@
package com.yuandian.dataflow.statemachine;
+import org.apache.http.entity.InputStreamEntity;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.protocol.Message;
@@ -36,7 +37,7 @@ import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.JavaUtils;
-import com.yuandian.dataflow.statemachine_old.operate.Operate;
+import com.yuandian.dataflow.statemachine.Operate.OperateType;
import com.yuandian.dataflow.statemachine_old.state.State;
import com.yuandian.dataflow.statemachine_old.state.WorkerState;
@@ -44,10 +45,12 @@ import lombok.extern.slf4j.Slf4j;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
@@ -65,7 +68,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* will be handled by {@code applyTransaction}.
*/
@Slf4j
-public class CounterStateMachine extends BaseStateMachine {
+public class StateMachine extends BaseStateMachine {
private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
@@ -136,9 +139,10 @@ public class CounterStateMachine extends BaseStateMachine {
storage.getSnapshotFile(last.getTerm(), last.getIndex());
//serialize the counter object and write it into the snapshot file
- try (ObjectOutputStream out = new ObjectOutputStream(
- new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
+ try {
+ ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(snapshotFile)));
out.writeObject(counter);
+ out.close();
} catch (IOException ioe) {
LOG.warn("Failed to write snapshot file \"" + snapshotFile
+ "\", last applied index=" + last);
@@ -209,15 +213,38 @@ public class CounterStateMachine extends BaseStateMachine {
*/
@Override
public CompletableFuture query(Message request) {
- String msg = request.getContent().toString(Charset.defaultCharset());
+ var data = request.getContent();
+
+ var inBytes = new ByteArrayInputStream( data.toByteArray());
+ try (var inObject = new ObjectInputStream(inBytes)) {
+ // log.info("applyTransaction {}", inObject.toString());
+ log.info("{}", request);
+ var op = (Query)inObject.readObject();
+ switch(op.getType()){
+ case GET_WORKER_STATE:
+ try(var rlock = readLock()) {
+ var ws = state.getWorkers().get( ((WorkerState)op.getValue()).getPeerId() );
+ if(ws == null) {
+ return CompletableFuture.completedFuture(Message.valueOf("Peerid is not exist"));
+ }
+ return CompletableFuture.completedFuture(Message.valueOf("Peerid is not exist"));
+ }
+
- if (!msg.equals("GET")) {
- return CompletableFuture.completedFuture(
- Message.valueOf("Invalid Command"));
+ default:
+ if (op.getType() == Query.Type.GET_WORKER_STATE ) {
+ return CompletableFuture.completedFuture(
+ Message.valueOf("Invalid Command"));
+ }
+ break;
+ }
+
+ } catch (ClassNotFoundException | IOException e) {
+ e.printStackTrace();
}
return CompletableFuture.completedFuture(
- Message.valueOf(counter.toString()));
+ Message.valueOf("ok"));
}
@@ -229,17 +256,33 @@ public class CounterStateMachine extends BaseStateMachine {
*/
@Override
public CompletableFuture applyTransaction(TransactionContext trx) {
+ // log.info("applyTransaction");
final RaftProtos.LogEntryProto entry = trx.getLogEntry();
+
//check if the command is valid
// String logData = entry.getStateMachineLogEntry().getLogData()
// .toString(Charset.defaultCharset());
- Operate op ;
+
+ Operate op = null;
try {
- op = (Operate)new ObjectInputStream(entry.getStateMachineLogEntry().getLogData().newInput()).readObject();
+
+ var data = entry.getStateMachineLogEntry().getLogData();
+
+ var inBytes = new ByteArrayInputStream( data.toByteArray());
+ var inObject = new ObjectInputStream(inBytes);
+
+ // log.info("applyTransaction {}", inObject.toString());
+ op = (Operate)inObject.readObject();
+
+
+
+ log.info("applyTransaction {}", data);
+ inObject.close();
+ inBytes.close();
} catch (IOException | ClassNotFoundException e) {
- e.printStackTrace();
+ log.info("{}", e.toString());
return CompletableFuture.completedFuture(Message.valueOf("错误op"));
}
@@ -250,39 +293,36 @@ public class CounterStateMachine extends BaseStateMachine {
//update the last applied term and index
final long index = entry.getIndex();
- try(var r = writeLock()) {
+ try(var wlock = writeLock()) {
+
switch(op.getType()) {
case ALLOCATE_PACKETS:
break;
- case GET_STATE:
- break;
case PUT_WORKERSTATE:
- var ws = op.getValue();
+
+ var ws = (WorkerState)op.getValue();
+ log.info("applyTransaction {}", 3);
state.getWorkers().put(ws.getPeerId() , ws);
+
break;
case REMOVE:
break;
default:
break;
-
}
- updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
+ updateLastAppliedTermIndex(entry.getTerm(), index);
}
-
-
- //actual execution of the command: increment the counter
- // counter.incrementAndGet();
-
+
//return the new value of the counter to the client
final CompletableFuture f =
CompletableFuture.completedFuture(Message.valueOf("put ok"));
//if leader, log the incremented value and it's log index
if (isLeader()) {
- log.info("{}: Increment to {}", index, counter.toString());
+ log.info("{}: getType {}", index, op.getType());
}
-
+ // log.info("applyTransaction {}", 6);
return f;
}
}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/CounterServer.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java
similarity index 88%
rename from src/main/java/com/yuandian/dataflow/statemachine/CounterServer.java
rename to src/main/java/com/yuandian/dataflow/statemachine/StateServer.java
index 392ebca..1eeadf7 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/CounterServer.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java
@@ -41,7 +41,7 @@ import java.util.UUID;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
- * Simplest Ratis server, use a simple state machine {@link CounterStateMachine}
+ * Simplest Ratis server, use a simple state machine {@link StateMachine}
* which maintain a counter across multi server.
* This server application designed to run several times with different
* parameters (1,2 or 3). server addresses hard coded in {@link Constants}
@@ -49,12 +49,12 @@ import static java.nio.charset.StandardCharsets.UTF_8;
* Run this application three times with three different parameter set-up a
* ratis cluster which maintain a counter value replicated in each server memory
*/
-public final class CounterServer implements Closeable {
+public final class StateServer implements Closeable {
private final RaftServer server;
public static final UUID CLUSTER_GROUP_ID = UUID.fromString("79642d72-6166-742d-6461-7461666c6f77");
- public CounterServer(RaftPeer peer, ArrayList peers, File storageDir) throws IOException {
+ public StateServer(RaftPeer peer, ArrayList peers, File storageDir) throws IOException {
//create a property object
RaftProperties properties = new RaftProperties();
@@ -66,7 +66,7 @@ public final class CounterServer implements Closeable {
GrpcConfigKeys.Server.setPort(properties, port);
//create the counter state machine which hold the counter value
- CounterStateMachine counterStateMachine = new CounterStateMachine();
+ StateMachine counterStateMachine = new StateMachine();
RaftGroup raftGroup = RaftGroup.valueOf(
RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers);
@@ -91,7 +91,7 @@ public final class CounterServer implements Closeable {
public static void main(String[] args) throws IOException {
if (args.length < 1) {
- System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}");
+ System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.StateServer {serverIndex}");
System.err.println("{serverIndex} could be 1, 2 or 3");
System.exit(1);
}
@@ -109,9 +109,9 @@ public final class CounterServer implements Closeable {
//start a counter server
final File storageDir = new File("./raftdata/" + currentPeer.getId());
- final CounterServer counterServer = new CounterServer(currentPeer, peers, storageDir);
+ final StateServer stateServer = new StateServer(currentPeer, peers, storageDir);
- counterServer.start();
+ stateServer.start();
@@ -119,6 +119,6 @@ public final class CounterServer implements Closeable {
//exit when any input entered
Scanner scanner = new Scanner(System.in, UTF_8.name());
scanner.nextLine();
- counterServer.close();
+ stateServer.close();
}
}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/annotations/WorkerRegister.java b/src/main/java/com/yuandian/dataflow/statemachine/annotations/GrpcProcessor.java
similarity index 80%
rename from src/main/java/com/yuandian/dataflow/statemachine_old/annotations/WorkerRegister.java
rename to src/main/java/com/yuandian/dataflow/statemachine/annotations/GrpcProcessor.java
index d7eda8a..3bfb88e 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine_old/annotations/WorkerRegister.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/annotations/GrpcProcessor.java
@@ -4,7 +4,7 @@
* @author eson
*2022年7月21日-14:27:49
*/
-package com.yuandian.dataflow.statemachine_old.annotations;
+package com.yuandian.dataflow.statemachine.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@@ -19,5 +19,5 @@ import java.lang.annotation.Target;
*/
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
-public @interface WorkerRegister {
+public @interface GrpcProcessor {
}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/annotations/MasterRegister.java b/src/main/java/com/yuandian/dataflow/statemachine/annotations/MasterRegister.java
similarity index 88%
rename from src/main/java/com/yuandian/dataflow/statemachine_old/annotations/MasterRegister.java
rename to src/main/java/com/yuandian/dataflow/statemachine/annotations/MasterRegister.java
index 497114c..968b73c 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine_old/annotations/MasterRegister.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/annotations/MasterRegister.java
@@ -4,7 +4,7 @@
* @author eson
*2022年7月21日-14:27:49
*/
-package com.yuandian.dataflow.statemachine_old.annotations;
+package com.yuandian.dataflow.statemachine.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java b/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java
index a639c27..9f87896 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java
@@ -32,11 +32,17 @@ import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RaftRpcMessage;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.NetUtils;
import org.springframework.cglib.proxy.CallbackFilter;
-import com.yuandian.dataflow.statemachine.CounterServer;
+import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
+import com.yuandian.dataflow.statemachine.StateServer;
+import com.yuandian.dataflow.statemachine.Operate.OperateType;
+import com.yuandian.dataflow.statemachine_old.state.WorkerState;
+import com.yuandian.dataflow.statemachine.Operate;
+import com.yuandian.dataflow.statemachine.Query;
import lombok.extern.slf4j.Slf4j;
@@ -77,14 +83,17 @@ public final class CounterClient {
// concurrently
ExecutorService executorService = Executors.newFixedThreadPool(10);
- increment = 1000;
+ increment = 10;
CountDownLatch latch = new CountDownLatch(increment);
//send INCREMENT commands concurrently
System.out.printf("Sending %d increment command...%n", increment);
Instant now = Instant.now();
for (int i = 0; i < increment; i++) {
- executorService.submit(() ->
- raftClient.io().send(Message.valueOf("INCREMENT")));
+
+ var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(null));
+ var reply = raftClient.io().send(op);
+ log.info("{}", reply);
+ executorService.submit(() -> raftClient.io().send(op));
latch.countDown();
}
@@ -96,8 +105,8 @@ public final class CounterClient {
log.info("{}", Duration.between(now, Instant.now()).toMillis());
//send GET command and print the response
-
- RaftClientReply count = raftClient.io().sendReadOnly(Message.valueOf("GET"));
+ var query = new Query(Query.Type.GET_WORKER_STATE, new WorkerState(null));
+ RaftClientReply count = raftClient.io().sendReadOnly(query);
String response = count.getMessage().getContent().toString(Charset.defaultCharset());
System.out.println(response);
@@ -126,7 +135,7 @@ public final class CounterClient {
peers.add(RaftPeer.newBuilder().setId("" + port).setAddress(addresses[i]).build());
}
RaftGroup raftGroup = RaftGroup.valueOf(
- RaftGroupId.valueOf(CounterServer.CLUSTER_GROUP_ID), peers);
+ RaftGroupId.valueOf(StateServer.CLUSTER_GROUP_ID), peers);
RaftClient.Builder builder = RaftClient.newBuilder()
.setProperties(raftProperties)
diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java b/src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java
index ea36356..74e5b5f 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java
@@ -16,13 +16,13 @@ import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.google.protobuf.Any;
-import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
+// import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.master.MasterContext;
import com.yuandian.dataflow.statemachine_old.master.MasterExecute;
-import com.yuandian.dataflow.statemachine_old.operate.Operate;
-import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
+import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
+import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import com.yuandian.dataflow.statemachine_old.state.State;
import com.yuandian.dataflow.statemachine_old.state.WorkerState;
diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java
index fdc26b0..1af96f1 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java
@@ -49,12 +49,12 @@ import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
-import com.yuandian.dataflow.statemachine_old.annotations.MasterRegister;
-import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister;
+import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
+import com.yuandian.dataflow.statemachine.annotations.MasterRegister;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.master.MasterExecute;
-import com.yuandian.dataflow.statemachine_old.operate.Operate;
-import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
+import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
+import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine_old.state.State;
@@ -120,7 +120,7 @@ public class StateFactory {
ss.readIndexState(closure);
}
- public static void applyOperate(Operate op, GenericClosure closure) {
+ public static void applyOperate(OperateOld op, GenericClosure closure) {
ss.applyOperate(op, closure);
}
@@ -195,7 +195,7 @@ public class StateFactory {
log.info("获取 {} -> {} 下包的所有注解", clsName, packName);
var refl = new Reflections(packName);
- Set> scans = refl.getTypesAnnotatedWith(WorkerRegister.class);
+ Set> scans = refl.getTypesAnnotatedWith(GrpcProcessor.class);
scans.forEach((pRaftClass) -> {
scansMap.put(pRaftClass.getName(), pRaftClass);
@@ -263,7 +263,7 @@ public class StateFactory {
}
- public void applyOperate(Operate op, GenericClosure closure) {
+ public void applyOperate(OperateOld op, GenericClosure closure) {
// 所有的提交都必须再leader进行
if (!ss.isLeader()) {
ss.handlerNotLeaderError(closure);
diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java
index d0cf88e..ac24784 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java
@@ -17,10 +17,10 @@ import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
-import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
+// import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
-import com.yuandian.dataflow.statemachine_old.operate.Operate;
-import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
+import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
+import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import com.yuandian.dataflow.statemachine_old.state.State;
import com.yuandian.dataflow.statemachine_old.state.WorkerState;
import com.yuandian.dataflow.utils.Utils;
@@ -65,19 +65,19 @@ public class StateMachine extends StateMachineAdapter {
public void onApply(final Iterator iter) {
while (iter.hasNext()) {
- Operate op = null;
+ OperateOld op = null;
GenericClosure closure = null;
if (iter.done() != null) {
// leader可以直接从 回调closure里提取operate
closure = (GenericClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
- op = (Operate)closure.getValue();
+ op = (OperateOld)closure.getValue();
} else {
// 非leader 需要从getData反序列化出来后处理
final ByteBuffer data = iter.getData();
try {
op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(),
- Operate.class.getName());
+ OperateOld.class.getName());
} catch (CodecException e) {
log.info("{}", e.toString());
}
@@ -114,7 +114,7 @@ public class StateMachine extends StateMachineAdapter {
log.error("WorkerState获取的状态为 {}", ws);
continue;
}
- var can = Operate.MAX_TASKS - ws.getTaskQueueSize();
+ var can = OperateOld.MAX_TASKS - ws.getTaskQueueSize();
canTasks[i] = can;
if(!isNext) {
isNext = true;
@@ -127,7 +127,7 @@ public class StateMachine extends StateMachineAdapter {
// log.info("size: {}", Operate.packetsManager.size());
// 统计每个节点发送多少任务
- var allocTasks = Utils.allocationTasks(Operate.packetsManager.size(), canTasks);
+ var allocTasks = Utils.allocationTasks(OperateOld.packetsManager.size(), canTasks);
if(closure != null) {
closure.setValue(allocTasks);
}
@@ -144,10 +144,10 @@ public class StateMachine extends StateMachineAdapter {
}
break;
- case GET_STATE:
- closure.setValue(this.state);
- log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex());
- break;
+ // case GET_STATE:
+ // closure.setValue(this.state);
+ // log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex());
+ // break;
case REMOVE:
break;
default:
@@ -198,7 +198,7 @@ public class StateMachine extends StateMachineAdapter {
}
// 更新当前WorkerState
- StateFactory.applyOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure () {
+ StateFactory.applyOperate(new OperateOld(OperateType.PUT_WORKERSTATE, ws), new GenericClosure () {
@Override
public void run(Status status) {
log.debug("master update workerstate: {}", status);
@@ -242,7 +242,7 @@ public class StateMachine extends StateMachineAdapter {
// 更新当前WorkerState
- Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateFactory.getServerId())), new GenericClosure() {
+ OperateOld.CallOperate(new OperateOld(OperateType.PUT_WORKERSTATE, new WorkerState(StateFactory.getServerId())), new GenericClosure() {
@Override
public void run(Status status) {
log.debug("onStartFollowing update workerstate: {}", status);
@@ -268,8 +268,8 @@ public class StateMachine extends StateMachineAdapter {
log.debug("onStopFollowing[{}] {}", StateFactory.getServerId(), ctx);
var ws = new WorkerState(StateFactory.getServerId());
- var op = new Operate(OperateType.PUT_WORKERSTATE, ws);
- Operate.CallOperate(op, new GenericClosure() {
+ var op = new OperateOld(OperateType.PUT_WORKERSTATE, ws);
+ OperateOld.CallOperate(op, new GenericClosure() {
@Override
public void run(Status status) {
log.info("{} {}", status, this.getResponse());
diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/operate/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine_old/operate/OperateOld.java
similarity index 92%
rename from src/main/java/com/yuandian/dataflow/statemachine_old/operate/Operate.java
rename to src/main/java/com/yuandian/dataflow/statemachine_old/operate/OperateOld.java
index 0a269e7..fe93604 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine_old/operate/Operate.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/operate/OperateOld.java
@@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
@Data
-public class Operate implements Serializable {
+public class OperateOld implements Serializable {
private static int DEFAULT_ASYNC_TIMEOUT = 5000;
public static final int MAX_TASKS = 1000;
@@ -39,10 +39,7 @@ public class Operate implements Serializable {
*/
ALLOCATE_PACKETS,
- /**
- * 获取State状态
- */
- GET_STATE,
+
/**
* 暂无想法
*/
@@ -54,7 +51,7 @@ public class Operate implements Serializable {
- public Operate(OperateType t, Object v) {
+ public OperateOld(OperateType t, Object v) {
this.type = t;
this.value = v;
}
@@ -75,7 +72,7 @@ public class Operate implements Serializable {
* @param closure 回调函数. Operate为返回值
*/
@java.lang.SuppressWarnings("unchecked")
- public static void CallOperate(Operate op, GenericClosure closure) {
+ public static void CallOperate(OperateOld op, GenericClosure closure) {
// log.debug("CallOperate Value {}", op.getValue());
// 如果是leader 就直接提交
if (StateFactory.isLeader()) {
diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/OperateProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/OperateProcessor.java
index 32c8ffc..8a22d8b 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/OperateProcessor.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/OperateProcessor.java
@@ -12,10 +12,10 @@ import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
+import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
import com.yuandian.dataflow.statemachine_old.StateFactory;
-import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
-import com.yuandian.dataflow.statemachine_old.operate.Operate;
+import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import javassist.ClassPath;
@@ -31,7 +31,7 @@ import lombok.extern.slf4j.Slf4j;
*2022年7月12日-11:10:54
*/
@Slf4j
-@WorkerRegister
+
public class OperateProcessor implements RpcProcessor {
/**
@@ -47,7 +47,7 @@ public class OperateProcessor implements RpcProcessor