diff --git a/.vscode/launch.json b/.vscode/launch.json index 73e21c5..b5e0972 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -4,6 +4,13 @@ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ + { + "type": "java", + "name": "Launch ProcessorServer", + "request": "launch", + "mainClass": "com.yuandian.dataflow.statemachine.grpc.ProcessorServer", + "projectName": "dataflow" + }, { "type": "java", "name": "Launch CounterClient", diff --git a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java index 69927b8..bc89d93 100644 --- a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java @@ -17,14 +17,14 @@ import com.google.protobuf.Any; // import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass; import com.yuandian.dataflow.statemachine.annotations.MasterRegister; +import com.yuandian.dataflow.statemachine.master.MasterContext; +import com.yuandian.dataflow.statemachine.master.MasterExecute; +import com.yuandian.dataflow.statemachine.state.State; +import com.yuandian.dataflow.statemachine.state.WorkerState; import com.yuandian.dataflow.statemachine_old.StateFactory; import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; -import com.yuandian.dataflow.statemachine_old.master.MasterContext; -import com.yuandian.dataflow.statemachine_old.master.MasterExecute; import com.yuandian.dataflow.statemachine_old.operate.OperateOld; import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType; -import com.yuandian.dataflow.statemachine_old.state.State; -import com.yuandian.dataflow.statemachine_old.state.WorkerState; import com.yuandian.dataflow.utils.PacketsManager; import com.yuandian.dataflow.utils.Utils; diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index 5485aea..e21e623 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -17,16 +17,16 @@ import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; import com.google.protobuf.Any; -import com.yuandian.dataflow.proto.Processor.PacketsRequest; -import com.yuandian.dataflow.proto.Processor.Response; +import com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest; +import com.yuandian.dataflow.proto.Processor.ProcessorResponse; import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerImplBase; import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor; +import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine_old.StateFactory; import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; import com.yuandian.dataflow.statemachine_old.operate.OperateOld; import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType; import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse; -import com.yuandian.dataflow.statemachine_old.state.State; import io.grpc.stub.StreamObserver; import lombok.Getter; @@ -43,17 +43,22 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @GrpcProcessor public class PacketsProcessor extends ProcessorServerImplBase { + @Override - public void packetsProcessor(PacketsRequest request, StreamObserver responseObserver) { + public void allPackets(PacketsProcessorRequest request, StreamObserver responseObserver) { - responseObserver.onNext( Response.newBuilder().build() ); + var response = ProcessorResponse.newBuilder(); + responseObserver.onNext( response.build() ); responseObserver.onCompleted(); - } - - - + super.allPackets(request, responseObserver); + log.info("packets {}", request.getPacketsList().size()); + + + } + + // @Setter // @Getter // public static class PacketsRequest implements Serializable { diff --git a/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java b/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java index b5a0db3..379a6c9 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java @@ -18,7 +18,6 @@ import lombok.extern.slf4j.Slf4j; * 例子 强制转换leader */ @Slf4j -@GrpcProcessor public class TransferLeaderProcessor implements RpcProcessor { @Setter diff --git a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java index 7030e64..c6bb7d5 100644 --- a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java +++ b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java @@ -13,6 +13,9 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.thirdparty.com.google.rpc.context.AttributeContext.Peer; + import com.google.protobuf.*; import com.google.protobuf.util.JsonFormat; import com.yuandian.common.Config; @@ -48,10 +51,12 @@ public class CollectPackets extends CollectPacketsServerImplBase { return null; }); + var managedChannelBuilder = ManagedChannelBuilder.forAddress("10.1.1.117", 60017); // var managedChannelBuilder = ManagedChannelBuilder.forAddress("127.0.0.1", 60017); var channel = managedChannelBuilder.maxInboundMessageSize(Integer.MAX_VALUE).usePlaintext().build(); + Map> domap = getHandleFuncMap(); try { var stub = CollectPacketsServerGrpc.newBlockingStub(channel); diff --git a/src/main/java/com/yuandian/dataflow/proto/Processor.java b/src/main/java/com/yuandian/dataflow/proto/Processor.java index cfd71e9..bfc5385 100644 --- a/src/main/java/com/yuandian/dataflow/proto/Processor.java +++ b/src/main/java/com/yuandian/dataflow/proto/Processor.java @@ -14,64 +14,61 @@ public final class Processor { registerAllExtensions( (com.google.protobuf.ExtensionRegistryLite) registry); } - public interface ResponseOrBuilder extends - // @@protoc_insertion_point(interface_extends:com.yuandian.dataflow.proto.Response) + public interface ProcessorResponseOrBuilder extends + // @@protoc_insertion_point(interface_extends:dataflow.ProcessorResponse) com.google.protobuf.MessageOrBuilder { /** + *
+     * 返回的状态码
+     * 
+ * * int32 code = 1; * @return The code. */ int getCode(); /** - * string msg = 2; - * @return The msg. + *
+     * 消息
+     * 
+ * + * string message = 2; + * @return The message. */ - java.lang.String getMsg(); + java.lang.String getMessage(); /** - * string msg = 2; - * @return The bytes for msg. + *
+     * 消息
+     * 
+ * + * string message = 2; + * @return The bytes for message. */ com.google.protobuf.ByteString - getMsgBytes(); - - /** - * .google.protobuf.Any data = 3; - * @return Whether the data field is set. - */ - boolean hasData(); - /** - * .google.protobuf.Any data = 3; - * @return The data. - */ - com.google.protobuf.Any getData(); - /** - * .google.protobuf.Any data = 3; - */ - com.google.protobuf.AnyOrBuilder getDataOrBuilder(); + getMessageBytes(); } /** - * Protobuf type {@code com.yuandian.dataflow.proto.Response} + * Protobuf type {@code dataflow.ProcessorResponse} */ - public static final class Response extends + public static final class ProcessorResponse extends com.google.protobuf.GeneratedMessageV3 implements - // @@protoc_insertion_point(message_implements:com.yuandian.dataflow.proto.Response) - ResponseOrBuilder { + // @@protoc_insertion_point(message_implements:dataflow.ProcessorResponse) + ProcessorResponseOrBuilder { private static final long serialVersionUID = 0L; - // Use Response.newBuilder() to construct. - private Response(com.google.protobuf.GeneratedMessageV3.Builder builder) { + // Use ProcessorResponse.newBuilder() to construct. + private ProcessorResponse(com.google.protobuf.GeneratedMessageV3.Builder builder) { super(builder); } - private Response() { - msg_ = ""; + private ProcessorResponse() { + message_ = ""; } @java.lang.Override @SuppressWarnings({"unused"}) protected java.lang.Object newInstance( UnusedPrivateParameter unused) { - return new Response(); + return new ProcessorResponse(); } @java.lang.Override @@ -79,7 +76,7 @@ public final class Processor { getUnknownFields() { return this.unknownFields; } - private Response( + private ProcessorResponse( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { @@ -105,20 +102,7 @@ public final class Processor { case 18: { java.lang.String s = input.readStringRequireUtf8(); - msg_ = s; - break; - } - case 26: { - com.google.protobuf.Any.Builder subBuilder = null; - if (data_ != null) { - subBuilder = data_.toBuilder(); - } - data_ = input.readMessage(com.google.protobuf.Any.parser(), extensionRegistry); - if (subBuilder != null) { - subBuilder.mergeFrom(data_); - data_ = subBuilder.buildPartial(); - } - + message_ = s; break; } default: { @@ -144,20 +128,24 @@ public final class Processor { } public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return com.yuandian.dataflow.proto.Processor.internal_static_com_yuandian_dataflow_proto_Response_descriptor; + return com.yuandian.dataflow.proto.Processor.internal_static_dataflow_ProcessorResponse_descriptor; } @java.lang.Override protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internalGetFieldAccessorTable() { - return com.yuandian.dataflow.proto.Processor.internal_static_com_yuandian_dataflow_proto_Response_fieldAccessorTable + return com.yuandian.dataflow.proto.Processor.internal_static_dataflow_ProcessorResponse_fieldAccessorTable .ensureFieldAccessorsInitialized( - com.yuandian.dataflow.proto.Processor.Response.class, com.yuandian.dataflow.proto.Processor.Response.Builder.class); + com.yuandian.dataflow.proto.Processor.ProcessorResponse.class, com.yuandian.dataflow.proto.Processor.ProcessorResponse.Builder.class); } public static final int CODE_FIELD_NUMBER = 1; private int code_; /** + *
+     * 返回的状态码
+     * 
+ * * int32 code = 1; * @return The code. */ @@ -166,70 +154,52 @@ public final class Processor { return code_; } - public static final int MSG_FIELD_NUMBER = 2; - private volatile java.lang.Object msg_; + public static final int MESSAGE_FIELD_NUMBER = 2; + private volatile java.lang.Object message_; /** - * string msg = 2; - * @return The msg. + *
+     * 消息
+     * 
+ * + * string message = 2; + * @return The message. */ @java.lang.Override - public java.lang.String getMsg() { - java.lang.Object ref = msg_; + public java.lang.String getMessage() { + java.lang.Object ref = message_; if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); - msg_ = s; + message_ = s; return s; } } /** - * string msg = 2; - * @return The bytes for msg. + *
+     * 消息
+     * 
+ * + * string message = 2; + * @return The bytes for message. */ @java.lang.Override public com.google.protobuf.ByteString - getMsgBytes() { - java.lang.Object ref = msg_; + getMessageBytes() { + java.lang.Object ref = message_; if (ref instanceof java.lang.String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - msg_ = b; + message_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } - public static final int DATA_FIELD_NUMBER = 3; - private com.google.protobuf.Any data_; - /** - * .google.protobuf.Any data = 3; - * @return Whether the data field is set. - */ - @java.lang.Override - public boolean hasData() { - return data_ != null; - } - /** - * .google.protobuf.Any data = 3; - * @return The data. - */ - @java.lang.Override - public com.google.protobuf.Any getData() { - return data_ == null ? com.google.protobuf.Any.getDefaultInstance() : data_; - } - /** - * .google.protobuf.Any data = 3; - */ - @java.lang.Override - public com.google.protobuf.AnyOrBuilder getDataOrBuilder() { - return getData(); - } - private byte memoizedIsInitialized = -1; @java.lang.Override public final boolean isInitialized() { @@ -247,11 +217,8 @@ public final class Processor { if (code_ != 0) { output.writeInt32(1, code_); } - if (!com.google.protobuf.GeneratedMessageV3.isStringEmpty(msg_)) { - com.google.protobuf.GeneratedMessageV3.writeString(output, 2, msg_); - } - if (data_ != null) { - output.writeMessage(3, getData()); + if (!com.google.protobuf.GeneratedMessageV3.isStringEmpty(message_)) { + com.google.protobuf.GeneratedMessageV3.writeString(output, 2, message_); } unknownFields.writeTo(output); } @@ -266,12 +233,8 @@ public final class Processor { size += com.google.protobuf.CodedOutputStream .computeInt32Size(1, code_); } - if (!com.google.protobuf.GeneratedMessageV3.isStringEmpty(msg_)) { - size += com.google.protobuf.GeneratedMessageV3.computeStringSize(2, msg_); - } - if (data_ != null) { - size += com.google.protobuf.CodedOutputStream - .computeMessageSize(3, getData()); + if (!com.google.protobuf.GeneratedMessageV3.isStringEmpty(message_)) { + size += com.google.protobuf.GeneratedMessageV3.computeStringSize(2, message_); } size += unknownFields.getSerializedSize(); memoizedSize = size; @@ -283,20 +246,15 @@ public final class Processor { if (obj == this) { return true; } - if (!(obj instanceof com.yuandian.dataflow.proto.Processor.Response)) { + if (!(obj instanceof com.yuandian.dataflow.proto.Processor.ProcessorResponse)) { return super.equals(obj); } - com.yuandian.dataflow.proto.Processor.Response other = (com.yuandian.dataflow.proto.Processor.Response) obj; + com.yuandian.dataflow.proto.Processor.ProcessorResponse other = (com.yuandian.dataflow.proto.Processor.ProcessorResponse) obj; if (getCode() != other.getCode()) return false; - if (!getMsg() - .equals(other.getMsg())) return false; - if (hasData() != other.hasData()) return false; - if (hasData()) { - if (!getData() - .equals(other.getData())) return false; - } + if (!getMessage() + .equals(other.getMessage())) return false; if (!unknownFields.equals(other.unknownFields)) return false; return true; } @@ -310,80 +268,76 @@ public final class Processor { hash = (19 * hash) + getDescriptor().hashCode(); hash = (37 * hash) + CODE_FIELD_NUMBER; hash = (53 * hash) + getCode(); - hash = (37 * hash) + MSG_FIELD_NUMBER; - hash = (53 * hash) + getMsg().hashCode(); - if (hasData()) { - hash = (37 * hash) + DATA_FIELD_NUMBER; - hash = (53 * hash) + getData().hashCode(); - } + hash = (37 * hash) + MESSAGE_FIELD_NUMBER; + hash = (53 * hash) + getMessage().hashCode(); hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; } - public static com.yuandian.dataflow.proto.Processor.Response parseFrom( + public static com.yuandian.dataflow.proto.Processor.ProcessorResponse parseFrom( java.nio.ByteBuffer data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } - public static com.yuandian.dataflow.proto.Processor.Response parseFrom( + public static com.yuandian.dataflow.proto.Processor.ProcessorResponse parseFrom( java.nio.ByteBuffer data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } - public static com.yuandian.dataflow.proto.Processor.Response parseFrom( + public static com.yuandian.dataflow.proto.Processor.ProcessorResponse parseFrom( com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } - public static com.yuandian.dataflow.proto.Processor.Response parseFrom( + public static com.yuandian.dataflow.proto.Processor.ProcessorResponse parseFrom( com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } - public static com.yuandian.dataflow.proto.Processor.Response parseFrom(byte[] data) + public static com.yuandian.dataflow.proto.Processor.ProcessorResponse parseFrom(byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } - public static com.yuandian.dataflow.proto.Processor.Response parseFrom( + public static com.yuandian.dataflow.proto.Processor.ProcessorResponse parseFrom( byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } - public static com.yuandian.dataflow.proto.Processor.Response parseFrom(java.io.InputStream input) + public static com.yuandian.dataflow.proto.Processor.ProcessorResponse parseFrom(java.io.InputStream input) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseWithIOException(PARSER, input); } - public static com.yuandian.dataflow.proto.Processor.Response parseFrom( + public static com.yuandian.dataflow.proto.Processor.ProcessorResponse parseFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseWithIOException(PARSER, input, extensionRegistry); } - public static com.yuandian.dataflow.proto.Processor.Response parseDelimitedFrom(java.io.InputStream input) + public static com.yuandian.dataflow.proto.Processor.ProcessorResponse parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseDelimitedWithIOException(PARSER, input); } - public static com.yuandian.dataflow.proto.Processor.Response parseDelimitedFrom( + public static com.yuandian.dataflow.proto.Processor.ProcessorResponse parseDelimitedFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseDelimitedWithIOException(PARSER, input, extensionRegistry); } - public static com.yuandian.dataflow.proto.Processor.Response parseFrom( + public static com.yuandian.dataflow.proto.Processor.ProcessorResponse parseFrom( com.google.protobuf.CodedInputStream input) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseWithIOException(PARSER, input); } - public static com.yuandian.dataflow.proto.Processor.Response parseFrom( + public static com.yuandian.dataflow.proto.Processor.ProcessorResponse parseFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { @@ -396,7 +350,7 @@ public final class Processor { public static Builder newBuilder() { return DEFAULT_INSTANCE.toBuilder(); } - public static Builder newBuilder(com.yuandian.dataflow.proto.Processor.Response prototype) { + public static Builder newBuilder(com.yuandian.dataflow.proto.Processor.ProcessorResponse prototype) { return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); } @java.lang.Override @@ -412,26 +366,26 @@ public final class Processor { return builder; } /** - * Protobuf type {@code com.yuandian.dataflow.proto.Response} + * Protobuf type {@code dataflow.ProcessorResponse} */ public static final class Builder extends com.google.protobuf.GeneratedMessageV3.Builder implements - // @@protoc_insertion_point(builder_implements:com.yuandian.dataflow.proto.Response) - com.yuandian.dataflow.proto.Processor.ResponseOrBuilder { + // @@protoc_insertion_point(builder_implements:dataflow.ProcessorResponse) + com.yuandian.dataflow.proto.Processor.ProcessorResponseOrBuilder { public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return com.yuandian.dataflow.proto.Processor.internal_static_com_yuandian_dataflow_proto_Response_descriptor; + return com.yuandian.dataflow.proto.Processor.internal_static_dataflow_ProcessorResponse_descriptor; } @java.lang.Override protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internalGetFieldAccessorTable() { - return com.yuandian.dataflow.proto.Processor.internal_static_com_yuandian_dataflow_proto_Response_fieldAccessorTable + return com.yuandian.dataflow.proto.Processor.internal_static_dataflow_ProcessorResponse_fieldAccessorTable .ensureFieldAccessorsInitialized( - com.yuandian.dataflow.proto.Processor.Response.class, com.yuandian.dataflow.proto.Processor.Response.Builder.class); + com.yuandian.dataflow.proto.Processor.ProcessorResponse.class, com.yuandian.dataflow.proto.Processor.ProcessorResponse.Builder.class); } - // Construct using com.yuandian.dataflow.proto.Processor.Response.newBuilder() + // Construct using com.yuandian.dataflow.proto.Processor.ProcessorResponse.newBuilder() private Builder() { maybeForceBuilderInitialization(); } @@ -451,31 +405,25 @@ public final class Processor { super.clear(); code_ = 0; - msg_ = ""; + message_ = ""; - if (dataBuilder_ == null) { - data_ = null; - } else { - data_ = null; - dataBuilder_ = null; - } return this; } @java.lang.Override public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { - return com.yuandian.dataflow.proto.Processor.internal_static_com_yuandian_dataflow_proto_Response_descriptor; + return com.yuandian.dataflow.proto.Processor.internal_static_dataflow_ProcessorResponse_descriptor; } @java.lang.Override - public com.yuandian.dataflow.proto.Processor.Response getDefaultInstanceForType() { - return com.yuandian.dataflow.proto.Processor.Response.getDefaultInstance(); + public com.yuandian.dataflow.proto.Processor.ProcessorResponse getDefaultInstanceForType() { + return com.yuandian.dataflow.proto.Processor.ProcessorResponse.getDefaultInstance(); } @java.lang.Override - public com.yuandian.dataflow.proto.Processor.Response build() { - com.yuandian.dataflow.proto.Processor.Response result = buildPartial(); + public com.yuandian.dataflow.proto.Processor.ProcessorResponse build() { + com.yuandian.dataflow.proto.Processor.ProcessorResponse result = buildPartial(); if (!result.isInitialized()) { throw newUninitializedMessageException(result); } @@ -483,15 +431,10 @@ public final class Processor { } @java.lang.Override - public com.yuandian.dataflow.proto.Processor.Response buildPartial() { - com.yuandian.dataflow.proto.Processor.Response result = new com.yuandian.dataflow.proto.Processor.Response(this); + public com.yuandian.dataflow.proto.Processor.ProcessorResponse buildPartial() { + com.yuandian.dataflow.proto.Processor.ProcessorResponse result = new com.yuandian.dataflow.proto.Processor.ProcessorResponse(this); result.code_ = code_; - result.msg_ = msg_; - if (dataBuilder_ == null) { - result.data_ = data_; - } else { - result.data_ = dataBuilder_.build(); - } + result.message_ = message_; onBuilt(); return result; } @@ -530,26 +473,23 @@ public final class Processor { } @java.lang.Override public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof com.yuandian.dataflow.proto.Processor.Response) { - return mergeFrom((com.yuandian.dataflow.proto.Processor.Response)other); + if (other instanceof com.yuandian.dataflow.proto.Processor.ProcessorResponse) { + return mergeFrom((com.yuandian.dataflow.proto.Processor.ProcessorResponse)other); } else { super.mergeFrom(other); return this; } } - public Builder mergeFrom(com.yuandian.dataflow.proto.Processor.Response other) { - if (other == com.yuandian.dataflow.proto.Processor.Response.getDefaultInstance()) return this; + public Builder mergeFrom(com.yuandian.dataflow.proto.Processor.ProcessorResponse other) { + if (other == com.yuandian.dataflow.proto.Processor.ProcessorResponse.getDefaultInstance()) return this; if (other.getCode() != 0) { setCode(other.getCode()); } - if (!other.getMsg().isEmpty()) { - msg_ = other.msg_; + if (!other.getMessage().isEmpty()) { + message_ = other.message_; onChanged(); } - if (other.hasData()) { - mergeData(other.getData()); - } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -565,11 +505,11 @@ public final class Processor { com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { - com.yuandian.dataflow.proto.Processor.Response parsedMessage = null; + com.yuandian.dataflow.proto.Processor.ProcessorResponse parsedMessage = null; try { parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); } catch (com.google.protobuf.InvalidProtocolBufferException e) { - parsedMessage = (com.yuandian.dataflow.proto.Processor.Response) e.getUnfinishedMessage(); + parsedMessage = (com.yuandian.dataflow.proto.Processor.ProcessorResponse) e.getUnfinishedMessage(); throw e.unwrapIOException(); } finally { if (parsedMessage != null) { @@ -581,6 +521,10 @@ public final class Processor { private int code_ ; /** + *
+       * 返回的状态码
+       * 
+ * * int32 code = 1; * @return The code. */ @@ -589,6 +533,10 @@ public final class Processor { return code_; } /** + *
+       * 返回的状态码
+       * 
+ * * int32 code = 1; * @param value The code to set. * @return This builder for chaining. @@ -600,6 +548,10 @@ public final class Processor { return this; } /** + *
+       * 返回的状态码
+       * 
+ * * int32 code = 1; * @return This builder for chaining. */ @@ -610,200 +562,101 @@ public final class Processor { return this; } - private java.lang.Object msg_ = ""; + private java.lang.Object message_ = ""; /** - * string msg = 2; - * @return The msg. + *
+       * 消息
+       * 
+ * + * string message = 2; + * @return The message. */ - public java.lang.String getMsg() { - java.lang.Object ref = msg_; + public java.lang.String getMessage() { + java.lang.Object ref = message_; if (!(ref instanceof java.lang.String)) { com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); - msg_ = s; + message_ = s; return s; } else { return (java.lang.String) ref; } } /** - * string msg = 2; - * @return The bytes for msg. + *
+       * 消息
+       * 
+ * + * string message = 2; + * @return The bytes for message. */ public com.google.protobuf.ByteString - getMsgBytes() { - java.lang.Object ref = msg_; + getMessageBytes() { + java.lang.Object ref = message_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - msg_ = b; + message_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } /** - * string msg = 2; - * @param value The msg to set. + *
+       * 消息
+       * 
+ * + * string message = 2; + * @param value The message to set. * @return This builder for chaining. */ - public Builder setMsg( + public Builder setMessage( java.lang.String value) { if (value == null) { throw new NullPointerException(); } - msg_ = value; + message_ = value; onChanged(); return this; } /** - * string msg = 2; + *
+       * 消息
+       * 
+ * + * string message = 2; * @return This builder for chaining. */ - public Builder clearMsg() { + public Builder clearMessage() { - msg_ = getDefaultInstance().getMsg(); + message_ = getDefaultInstance().getMessage(); onChanged(); return this; } /** - * string msg = 2; - * @param value The bytes for msg to set. + *
+       * 消息
+       * 
+ * + * string message = 2; + * @param value The bytes for message to set. * @return This builder for chaining. */ - public Builder setMsgBytes( + public Builder setMessageBytes( com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } checkByteStringIsUtf8(value); - msg_ = value; + message_ = value; onChanged(); return this; } - - private com.google.protobuf.Any data_; - private com.google.protobuf.SingleFieldBuilderV3< - com.google.protobuf.Any, com.google.protobuf.Any.Builder, com.google.protobuf.AnyOrBuilder> dataBuilder_; - /** - * .google.protobuf.Any data = 3; - * @return Whether the data field is set. - */ - public boolean hasData() { - return dataBuilder_ != null || data_ != null; - } - /** - * .google.protobuf.Any data = 3; - * @return The data. - */ - public com.google.protobuf.Any getData() { - if (dataBuilder_ == null) { - return data_ == null ? com.google.protobuf.Any.getDefaultInstance() : data_; - } else { - return dataBuilder_.getMessage(); - } - } - /** - * .google.protobuf.Any data = 3; - */ - public Builder setData(com.google.protobuf.Any value) { - if (dataBuilder_ == null) { - if (value == null) { - throw new NullPointerException(); - } - data_ = value; - onChanged(); - } else { - dataBuilder_.setMessage(value); - } - - return this; - } - /** - * .google.protobuf.Any data = 3; - */ - public Builder setData( - com.google.protobuf.Any.Builder builderForValue) { - if (dataBuilder_ == null) { - data_ = builderForValue.build(); - onChanged(); - } else { - dataBuilder_.setMessage(builderForValue.build()); - } - - return this; - } - /** - * .google.protobuf.Any data = 3; - */ - public Builder mergeData(com.google.protobuf.Any value) { - if (dataBuilder_ == null) { - if (data_ != null) { - data_ = - com.google.protobuf.Any.newBuilder(data_).mergeFrom(value).buildPartial(); - } else { - data_ = value; - } - onChanged(); - } else { - dataBuilder_.mergeFrom(value); - } - - return this; - } - /** - * .google.protobuf.Any data = 3; - */ - public Builder clearData() { - if (dataBuilder_ == null) { - data_ = null; - onChanged(); - } else { - data_ = null; - dataBuilder_ = null; - } - - return this; - } - /** - * .google.protobuf.Any data = 3; - */ - public com.google.protobuf.Any.Builder getDataBuilder() { - - onChanged(); - return getDataFieldBuilder().getBuilder(); - } - /** - * .google.protobuf.Any data = 3; - */ - public com.google.protobuf.AnyOrBuilder getDataOrBuilder() { - if (dataBuilder_ != null) { - return dataBuilder_.getMessageOrBuilder(); - } else { - return data_ == null ? - com.google.protobuf.Any.getDefaultInstance() : data_; - } - } - /** - * .google.protobuf.Any data = 3; - */ - private com.google.protobuf.SingleFieldBuilderV3< - com.google.protobuf.Any, com.google.protobuf.Any.Builder, com.google.protobuf.AnyOrBuilder> - getDataFieldBuilder() { - if (dataBuilder_ == null) { - dataBuilder_ = new com.google.protobuf.SingleFieldBuilderV3< - com.google.protobuf.Any, com.google.protobuf.Any.Builder, com.google.protobuf.AnyOrBuilder>( - getData(), - getParentForChildren(), - isClean()); - data_ = null; - } - return dataBuilder_; - } @java.lang.Override public final Builder setUnknownFields( final com.google.protobuf.UnknownFieldSet unknownFields) { @@ -817,87 +670,138 @@ public final class Processor { } - // @@protoc_insertion_point(builder_scope:com.yuandian.dataflow.proto.Response) + // @@protoc_insertion_point(builder_scope:dataflow.ProcessorResponse) } - // @@protoc_insertion_point(class_scope:com.yuandian.dataflow.proto.Response) - private static final com.yuandian.dataflow.proto.Processor.Response DEFAULT_INSTANCE; + // @@protoc_insertion_point(class_scope:dataflow.ProcessorResponse) + private static final com.yuandian.dataflow.proto.Processor.ProcessorResponse DEFAULT_INSTANCE; static { - DEFAULT_INSTANCE = new com.yuandian.dataflow.proto.Processor.Response(); + DEFAULT_INSTANCE = new com.yuandian.dataflow.proto.Processor.ProcessorResponse(); } - public static com.yuandian.dataflow.proto.Processor.Response getDefaultInstance() { + public static com.yuandian.dataflow.proto.Processor.ProcessorResponse getDefaultInstance() { return DEFAULT_INSTANCE; } - private static final com.google.protobuf.Parser - PARSER = new com.google.protobuf.AbstractParser() { + private static final com.google.protobuf.Parser + PARSER = new com.google.protobuf.AbstractParser() { @java.lang.Override - public Response parsePartialFrom( + public ProcessorResponse parsePartialFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { - return new Response(input, extensionRegistry); + return new ProcessorResponse(input, extensionRegistry); } }; - public static com.google.protobuf.Parser parser() { + public static com.google.protobuf.Parser parser() { return PARSER; } @java.lang.Override - public com.google.protobuf.Parser getParserForType() { + public com.google.protobuf.Parser getParserForType() { return PARSER; } @java.lang.Override - public com.yuandian.dataflow.proto.Processor.Response getDefaultInstanceForType() { + public com.yuandian.dataflow.proto.Processor.ProcessorResponse getDefaultInstanceForType() { return DEFAULT_INSTANCE; } } - public interface PacketsRequestOrBuilder extends - // @@protoc_insertion_point(interface_extends:com.yuandian.dataflow.proto.PacketsRequest) + public interface PacketsProcessorRequestOrBuilder extends + // @@protoc_insertion_point(interface_extends:dataflow.PacketsProcessorRequest) com.google.protobuf.MessageOrBuilder { /** - * repeated .google.protobuf.Any packets = 1; + *
+     * 请求类型.  默认 为 1.  暂无意义
+     * 
+ * + * int32 type = 1; + * @return The type. + */ + int getType(); + + /** + *
+     * 请求版本.  区分版本. 非必要时不使用 
+     * 
+ * + * string version = 2; + * @return The version. + */ + java.lang.String getVersion(); + /** + *
+     * 请求版本.  区分版本. 非必要时不使用 
+     * 
+ * + * string version = 2; + * @return The bytes for version. + */ + com.google.protobuf.ByteString + getVersionBytes(); + + /** + *
+     * 请求的可变参数. 暂不使用
+     * 
+ * + * repeated .google.protobuf.Any packets = 3; */ java.util.List getPacketsList(); /** - * repeated .google.protobuf.Any packets = 1; + *
+     * 请求的可变参数. 暂不使用
+     * 
+ * + * repeated .google.protobuf.Any packets = 3; */ com.google.protobuf.Any getPackets(int index); /** - * repeated .google.protobuf.Any packets = 1; + *
+     * 请求的可变参数. 暂不使用
+     * 
+ * + * repeated .google.protobuf.Any packets = 3; */ int getPacketsCount(); /** - * repeated .google.protobuf.Any packets = 1; + *
+     * 请求的可变参数. 暂不使用
+     * 
+ * + * repeated .google.protobuf.Any packets = 3; */ java.util.List getPacketsOrBuilderList(); /** - * repeated .google.protobuf.Any packets = 1; + *
+     * 请求的可变参数. 暂不使用
+     * 
+ * + * repeated .google.protobuf.Any packets = 3; */ com.google.protobuf.AnyOrBuilder getPacketsOrBuilder( int index); } /** - * Protobuf type {@code com.yuandian.dataflow.proto.PacketsRequest} + * Protobuf type {@code dataflow.PacketsProcessorRequest} */ - public static final class PacketsRequest extends + public static final class PacketsProcessorRequest extends com.google.protobuf.GeneratedMessageV3 implements - // @@protoc_insertion_point(message_implements:com.yuandian.dataflow.proto.PacketsRequest) - PacketsRequestOrBuilder { + // @@protoc_insertion_point(message_implements:dataflow.PacketsProcessorRequest) + PacketsProcessorRequestOrBuilder { private static final long serialVersionUID = 0L; - // Use PacketsRequest.newBuilder() to construct. - private PacketsRequest(com.google.protobuf.GeneratedMessageV3.Builder builder) { + // Use PacketsProcessorRequest.newBuilder() to construct. + private PacketsProcessorRequest(com.google.protobuf.GeneratedMessageV3.Builder builder) { super(builder); } - private PacketsRequest() { + private PacketsProcessorRequest() { + version_ = ""; packets_ = java.util.Collections.emptyList(); } @@ -905,7 +809,7 @@ public final class Processor { @SuppressWarnings({"unused"}) protected java.lang.Object newInstance( UnusedPrivateParameter unused) { - return new PacketsRequest(); + return new PacketsProcessorRequest(); } @java.lang.Override @@ -913,7 +817,7 @@ public final class Processor { getUnknownFields() { return this.unknownFields; } - private PacketsRequest( + private PacketsProcessorRequest( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { @@ -932,7 +836,18 @@ public final class Processor { case 0: done = true; break; - case 10: { + case 8: { + + type_ = input.readInt32(); + break; + } + case 18: { + java.lang.String s = input.readStringRequireUtf8(); + + version_ = s; + break; + } + case 26: { if (!((mutable_bitField0_ & 0x00000001) != 0)) { packets_ = new java.util.ArrayList(); mutable_bitField0_ |= 0x00000001; @@ -967,28 +882,97 @@ public final class Processor { } public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return com.yuandian.dataflow.proto.Processor.internal_static_com_yuandian_dataflow_proto_PacketsRequest_descriptor; + return com.yuandian.dataflow.proto.Processor.internal_static_dataflow_PacketsProcessorRequest_descriptor; } @java.lang.Override protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internalGetFieldAccessorTable() { - return com.yuandian.dataflow.proto.Processor.internal_static_com_yuandian_dataflow_proto_PacketsRequest_fieldAccessorTable + return com.yuandian.dataflow.proto.Processor.internal_static_dataflow_PacketsProcessorRequest_fieldAccessorTable .ensureFieldAccessorsInitialized( - com.yuandian.dataflow.proto.Processor.PacketsRequest.class, com.yuandian.dataflow.proto.Processor.PacketsRequest.Builder.class); + com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.class, com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.Builder.class); } - public static final int PACKETS_FIELD_NUMBER = 1; + public static final int TYPE_FIELD_NUMBER = 1; + private int type_; + /** + *
+     * 请求类型.  默认 为 1.  暂无意义
+     * 
+ * + * int32 type = 1; + * @return The type. + */ + @java.lang.Override + public int getType() { + return type_; + } + + public static final int VERSION_FIELD_NUMBER = 2; + private volatile java.lang.Object version_; + /** + *
+     * 请求版本.  区分版本. 非必要时不使用 
+     * 
+ * + * string version = 2; + * @return The version. + */ + @java.lang.Override + public java.lang.String getVersion() { + java.lang.Object ref = version_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + version_ = s; + return s; + } + } + /** + *
+     * 请求版本.  区分版本. 非必要时不使用 
+     * 
+ * + * string version = 2; + * @return The bytes for version. + */ + @java.lang.Override + public com.google.protobuf.ByteString + getVersionBytes() { + java.lang.Object ref = version_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + version_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + public static final int PACKETS_FIELD_NUMBER = 3; private java.util.List packets_; /** - * repeated .google.protobuf.Any packets = 1; + *
+     * 请求的可变参数. 暂不使用
+     * 
+ * + * repeated .google.protobuf.Any packets = 3; */ @java.lang.Override public java.util.List getPacketsList() { return packets_; } /** - * repeated .google.protobuf.Any packets = 1; + *
+     * 请求的可变参数. 暂不使用
+     * 
+ * + * repeated .google.protobuf.Any packets = 3; */ @java.lang.Override public java.util.List @@ -996,21 +980,33 @@ public final class Processor { return packets_; } /** - * repeated .google.protobuf.Any packets = 1; + *
+     * 请求的可变参数. 暂不使用
+     * 
+ * + * repeated .google.protobuf.Any packets = 3; */ @java.lang.Override public int getPacketsCount() { return packets_.size(); } /** - * repeated .google.protobuf.Any packets = 1; + *
+     * 请求的可变参数. 暂不使用
+     * 
+ * + * repeated .google.protobuf.Any packets = 3; */ @java.lang.Override public com.google.protobuf.Any getPackets(int index) { return packets_.get(index); } /** - * repeated .google.protobuf.Any packets = 1; + *
+     * 请求的可变参数. 暂不使用
+     * 
+ * + * repeated .google.protobuf.Any packets = 3; */ @java.lang.Override public com.google.protobuf.AnyOrBuilder getPacketsOrBuilder( @@ -1032,8 +1028,14 @@ public final class Processor { @java.lang.Override public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { + if (type_ != 0) { + output.writeInt32(1, type_); + } + if (!com.google.protobuf.GeneratedMessageV3.isStringEmpty(version_)) { + com.google.protobuf.GeneratedMessageV3.writeString(output, 2, version_); + } for (int i = 0; i < packets_.size(); i++) { - output.writeMessage(1, packets_.get(i)); + output.writeMessage(3, packets_.get(i)); } unknownFields.writeTo(output); } @@ -1044,9 +1046,16 @@ public final class Processor { if (size != -1) return size; size = 0; + if (type_ != 0) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(1, type_); + } + if (!com.google.protobuf.GeneratedMessageV3.isStringEmpty(version_)) { + size += com.google.protobuf.GeneratedMessageV3.computeStringSize(2, version_); + } for (int i = 0; i < packets_.size(); i++) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(1, packets_.get(i)); + .computeMessageSize(3, packets_.get(i)); } size += unknownFields.getSerializedSize(); memoizedSize = size; @@ -1058,11 +1067,15 @@ public final class Processor { if (obj == this) { return true; } - if (!(obj instanceof com.yuandian.dataflow.proto.Processor.PacketsRequest)) { + if (!(obj instanceof com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest)) { return super.equals(obj); } - com.yuandian.dataflow.proto.Processor.PacketsRequest other = (com.yuandian.dataflow.proto.Processor.PacketsRequest) obj; + com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest other = (com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest) obj; + if (getType() + != other.getType()) return false; + if (!getVersion() + .equals(other.getVersion())) return false; if (!getPacketsList() .equals(other.getPacketsList())) return false; if (!unknownFields.equals(other.unknownFields)) return false; @@ -1076,6 +1089,10 @@ public final class Processor { } int hash = 41; hash = (19 * hash) + getDescriptor().hashCode(); + hash = (37 * hash) + TYPE_FIELD_NUMBER; + hash = (53 * hash) + getType(); + hash = (37 * hash) + VERSION_FIELD_NUMBER; + hash = (53 * hash) + getVersion().hashCode(); if (getPacketsCount() > 0) { hash = (37 * hash) + PACKETS_FIELD_NUMBER; hash = (53 * hash) + getPacketsList().hashCode(); @@ -1085,69 +1102,69 @@ public final class Processor { return hash; } - public static com.yuandian.dataflow.proto.Processor.PacketsRequest parseFrom( + public static com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest parseFrom( java.nio.ByteBuffer data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } - public static com.yuandian.dataflow.proto.Processor.PacketsRequest parseFrom( + public static com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest parseFrom( java.nio.ByteBuffer data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } - public static com.yuandian.dataflow.proto.Processor.PacketsRequest parseFrom( + public static com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest parseFrom( com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } - public static com.yuandian.dataflow.proto.Processor.PacketsRequest parseFrom( + public static com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest parseFrom( com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } - public static com.yuandian.dataflow.proto.Processor.PacketsRequest parseFrom(byte[] data) + public static com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest parseFrom(byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } - public static com.yuandian.dataflow.proto.Processor.PacketsRequest parseFrom( + public static com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest parseFrom( byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } - public static com.yuandian.dataflow.proto.Processor.PacketsRequest parseFrom(java.io.InputStream input) + public static com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest parseFrom(java.io.InputStream input) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseWithIOException(PARSER, input); } - public static com.yuandian.dataflow.proto.Processor.PacketsRequest parseFrom( + public static com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest parseFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseWithIOException(PARSER, input, extensionRegistry); } - public static com.yuandian.dataflow.proto.Processor.PacketsRequest parseDelimitedFrom(java.io.InputStream input) + public static com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseDelimitedWithIOException(PARSER, input); } - public static com.yuandian.dataflow.proto.Processor.PacketsRequest parseDelimitedFrom( + public static com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest parseDelimitedFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseDelimitedWithIOException(PARSER, input, extensionRegistry); } - public static com.yuandian.dataflow.proto.Processor.PacketsRequest parseFrom( + public static com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest parseFrom( com.google.protobuf.CodedInputStream input) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseWithIOException(PARSER, input); } - public static com.yuandian.dataflow.proto.Processor.PacketsRequest parseFrom( + public static com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest parseFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { @@ -1160,7 +1177,7 @@ public final class Processor { public static Builder newBuilder() { return DEFAULT_INSTANCE.toBuilder(); } - public static Builder newBuilder(com.yuandian.dataflow.proto.Processor.PacketsRequest prototype) { + public static Builder newBuilder(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest prototype) { return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); } @java.lang.Override @@ -1176,26 +1193,26 @@ public final class Processor { return builder; } /** - * Protobuf type {@code com.yuandian.dataflow.proto.PacketsRequest} + * Protobuf type {@code dataflow.PacketsProcessorRequest} */ public static final class Builder extends com.google.protobuf.GeneratedMessageV3.Builder implements - // @@protoc_insertion_point(builder_implements:com.yuandian.dataflow.proto.PacketsRequest) - com.yuandian.dataflow.proto.Processor.PacketsRequestOrBuilder { + // @@protoc_insertion_point(builder_implements:dataflow.PacketsProcessorRequest) + com.yuandian.dataflow.proto.Processor.PacketsProcessorRequestOrBuilder { public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return com.yuandian.dataflow.proto.Processor.internal_static_com_yuandian_dataflow_proto_PacketsRequest_descriptor; + return com.yuandian.dataflow.proto.Processor.internal_static_dataflow_PacketsProcessorRequest_descriptor; } @java.lang.Override protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internalGetFieldAccessorTable() { - return com.yuandian.dataflow.proto.Processor.internal_static_com_yuandian_dataflow_proto_PacketsRequest_fieldAccessorTable + return com.yuandian.dataflow.proto.Processor.internal_static_dataflow_PacketsProcessorRequest_fieldAccessorTable .ensureFieldAccessorsInitialized( - com.yuandian.dataflow.proto.Processor.PacketsRequest.class, com.yuandian.dataflow.proto.Processor.PacketsRequest.Builder.class); + com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.class, com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.Builder.class); } - // Construct using com.yuandian.dataflow.proto.Processor.PacketsRequest.newBuilder() + // Construct using com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.newBuilder() private Builder() { maybeForceBuilderInitialization(); } @@ -1214,6 +1231,10 @@ public final class Processor { @java.lang.Override public Builder clear() { super.clear(); + type_ = 0; + + version_ = ""; + if (packetsBuilder_ == null) { packets_ = java.util.Collections.emptyList(); bitField0_ = (bitField0_ & ~0x00000001); @@ -1226,17 +1247,17 @@ public final class Processor { @java.lang.Override public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { - return com.yuandian.dataflow.proto.Processor.internal_static_com_yuandian_dataflow_proto_PacketsRequest_descriptor; + return com.yuandian.dataflow.proto.Processor.internal_static_dataflow_PacketsProcessorRequest_descriptor; } @java.lang.Override - public com.yuandian.dataflow.proto.Processor.PacketsRequest getDefaultInstanceForType() { - return com.yuandian.dataflow.proto.Processor.PacketsRequest.getDefaultInstance(); + public com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest getDefaultInstanceForType() { + return com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.getDefaultInstance(); } @java.lang.Override - public com.yuandian.dataflow.proto.Processor.PacketsRequest build() { - com.yuandian.dataflow.proto.Processor.PacketsRequest result = buildPartial(); + public com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest build() { + com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest result = buildPartial(); if (!result.isInitialized()) { throw newUninitializedMessageException(result); } @@ -1244,9 +1265,11 @@ public final class Processor { } @java.lang.Override - public com.yuandian.dataflow.proto.Processor.PacketsRequest buildPartial() { - com.yuandian.dataflow.proto.Processor.PacketsRequest result = new com.yuandian.dataflow.proto.Processor.PacketsRequest(this); + public com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest buildPartial() { + com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest result = new com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest(this); int from_bitField0_ = bitField0_; + result.type_ = type_; + result.version_ = version_; if (packetsBuilder_ == null) { if (((bitField0_ & 0x00000001) != 0)) { packets_ = java.util.Collections.unmodifiableList(packets_); @@ -1294,16 +1317,23 @@ public final class Processor { } @java.lang.Override public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof com.yuandian.dataflow.proto.Processor.PacketsRequest) { - return mergeFrom((com.yuandian.dataflow.proto.Processor.PacketsRequest)other); + if (other instanceof com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest) { + return mergeFrom((com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest)other); } else { super.mergeFrom(other); return this; } } - public Builder mergeFrom(com.yuandian.dataflow.proto.Processor.PacketsRequest other) { - if (other == com.yuandian.dataflow.proto.Processor.PacketsRequest.getDefaultInstance()) return this; + public Builder mergeFrom(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest other) { + if (other == com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.getDefaultInstance()) return this; + if (other.getType() != 0) { + setType(other.getType()); + } + if (!other.getVersion().isEmpty()) { + version_ = other.version_; + onChanged(); + } if (packetsBuilder_ == null) { if (!other.packets_.isEmpty()) { if (packets_.isEmpty()) { @@ -1345,11 +1375,11 @@ public final class Processor { com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { - com.yuandian.dataflow.proto.Processor.PacketsRequest parsedMessage = null; + com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest parsedMessage = null; try { parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); } catch (com.google.protobuf.InvalidProtocolBufferException e) { - parsedMessage = (com.yuandian.dataflow.proto.Processor.PacketsRequest) e.getUnfinishedMessage(); + parsedMessage = (com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest) e.getUnfinishedMessage(); throw e.unwrapIOException(); } finally { if (parsedMessage != null) { @@ -1360,6 +1390,145 @@ public final class Processor { } private int bitField0_; + private int type_ ; + /** + *
+       * 请求类型.  默认 为 1.  暂无意义
+       * 
+ * + * int32 type = 1; + * @return The type. + */ + @java.lang.Override + public int getType() { + return type_; + } + /** + *
+       * 请求类型.  默认 为 1.  暂无意义
+       * 
+ * + * int32 type = 1; + * @param value The type to set. + * @return This builder for chaining. + */ + public Builder setType(int value) { + + type_ = value; + onChanged(); + return this; + } + /** + *
+       * 请求类型.  默认 为 1.  暂无意义
+       * 
+ * + * int32 type = 1; + * @return This builder for chaining. + */ + public Builder clearType() { + + type_ = 0; + onChanged(); + return this; + } + + private java.lang.Object version_ = ""; + /** + *
+       * 请求版本.  区分版本. 非必要时不使用 
+       * 
+ * + * string version = 2; + * @return The version. + */ + public java.lang.String getVersion() { + java.lang.Object ref = version_; + if (!(ref instanceof java.lang.String)) { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + version_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + *
+       * 请求版本.  区分版本. 非必要时不使用 
+       * 
+ * + * string version = 2; + * @return The bytes for version. + */ + public com.google.protobuf.ByteString + getVersionBytes() { + java.lang.Object ref = version_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + version_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + *
+       * 请求版本.  区分版本. 非必要时不使用 
+       * 
+ * + * string version = 2; + * @param value The version to set. + * @return This builder for chaining. + */ + public Builder setVersion( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + + version_ = value; + onChanged(); + return this; + } + /** + *
+       * 请求版本.  区分版本. 非必要时不使用 
+       * 
+ * + * string version = 2; + * @return This builder for chaining. + */ + public Builder clearVersion() { + + version_ = getDefaultInstance().getVersion(); + onChanged(); + return this; + } + /** + *
+       * 请求版本.  区分版本. 非必要时不使用 
+       * 
+ * + * string version = 2; + * @param value The bytes for version to set. + * @return This builder for chaining. + */ + public Builder setVersionBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + checkByteStringIsUtf8(value); + + version_ = value; + onChanged(); + return this; + } + private java.util.List packets_ = java.util.Collections.emptyList(); private void ensurePacketsIsMutable() { @@ -1373,7 +1542,11 @@ public final class Processor { com.google.protobuf.Any, com.google.protobuf.Any.Builder, com.google.protobuf.AnyOrBuilder> packetsBuilder_; /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public java.util.List getPacketsList() { if (packetsBuilder_ == null) { @@ -1383,7 +1556,11 @@ public final class Processor { } } /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public int getPacketsCount() { if (packetsBuilder_ == null) { @@ -1393,7 +1570,11 @@ public final class Processor { } } /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public com.google.protobuf.Any getPackets(int index) { if (packetsBuilder_ == null) { @@ -1403,7 +1584,11 @@ public final class Processor { } } /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public Builder setPackets( int index, com.google.protobuf.Any value) { @@ -1420,7 +1605,11 @@ public final class Processor { return this; } /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public Builder setPackets( int index, com.google.protobuf.Any.Builder builderForValue) { @@ -1434,7 +1623,11 @@ public final class Processor { return this; } /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public Builder addPackets(com.google.protobuf.Any value) { if (packetsBuilder_ == null) { @@ -1450,7 +1643,11 @@ public final class Processor { return this; } /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public Builder addPackets( int index, com.google.protobuf.Any value) { @@ -1467,7 +1664,11 @@ public final class Processor { return this; } /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public Builder addPackets( com.google.protobuf.Any.Builder builderForValue) { @@ -1481,7 +1682,11 @@ public final class Processor { return this; } /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public Builder addPackets( int index, com.google.protobuf.Any.Builder builderForValue) { @@ -1495,7 +1700,11 @@ public final class Processor { return this; } /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public Builder addAllPackets( java.lang.Iterable values) { @@ -1510,7 +1719,11 @@ public final class Processor { return this; } /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public Builder clearPackets() { if (packetsBuilder_ == null) { @@ -1523,7 +1736,11 @@ public final class Processor { return this; } /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public Builder removePackets(int index) { if (packetsBuilder_ == null) { @@ -1536,14 +1753,22 @@ public final class Processor { return this; } /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public com.google.protobuf.Any.Builder getPacketsBuilder( int index) { return getPacketsFieldBuilder().getBuilder(index); } /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public com.google.protobuf.AnyOrBuilder getPacketsOrBuilder( int index) { @@ -1553,7 +1778,11 @@ public final class Processor { } } /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public java.util.List getPacketsOrBuilderList() { @@ -1564,14 +1793,22 @@ public final class Processor { } } /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public com.google.protobuf.Any.Builder addPacketsBuilder() { return getPacketsFieldBuilder().addBuilder( com.google.protobuf.Any.getDefaultInstance()); } /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public com.google.protobuf.Any.Builder addPacketsBuilder( int index) { @@ -1579,7 +1816,11 @@ public final class Processor { index, com.google.protobuf.Any.getDefaultInstance()); } /** - * repeated .google.protobuf.Any packets = 1; + *
+       * 请求的可变参数. 暂不使用
+       * 
+ * + * repeated .google.protobuf.Any packets = 3; */ public java.util.List getPacketsBuilderList() { @@ -1612,56 +1853,56 @@ public final class Processor { } - // @@protoc_insertion_point(builder_scope:com.yuandian.dataflow.proto.PacketsRequest) + // @@protoc_insertion_point(builder_scope:dataflow.PacketsProcessorRequest) } - // @@protoc_insertion_point(class_scope:com.yuandian.dataflow.proto.PacketsRequest) - private static final com.yuandian.dataflow.proto.Processor.PacketsRequest DEFAULT_INSTANCE; + // @@protoc_insertion_point(class_scope:dataflow.PacketsProcessorRequest) + private static final com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest DEFAULT_INSTANCE; static { - DEFAULT_INSTANCE = new com.yuandian.dataflow.proto.Processor.PacketsRequest(); + DEFAULT_INSTANCE = new com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest(); } - public static com.yuandian.dataflow.proto.Processor.PacketsRequest getDefaultInstance() { + public static com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest getDefaultInstance() { return DEFAULT_INSTANCE; } - private static final com.google.protobuf.Parser - PARSER = new com.google.protobuf.AbstractParser() { + private static final com.google.protobuf.Parser + PARSER = new com.google.protobuf.AbstractParser() { @java.lang.Override - public PacketsRequest parsePartialFrom( + public PacketsProcessorRequest parsePartialFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { - return new PacketsRequest(input, extensionRegistry); + return new PacketsProcessorRequest(input, extensionRegistry); } }; - public static com.google.protobuf.Parser parser() { + public static com.google.protobuf.Parser parser() { return PARSER; } @java.lang.Override - public com.google.protobuf.Parser getParserForType() { + public com.google.protobuf.Parser getParserForType() { return PARSER; } @java.lang.Override - public com.yuandian.dataflow.proto.Processor.PacketsRequest getDefaultInstanceForType() { + public com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest getDefaultInstanceForType() { return DEFAULT_INSTANCE; } } private static final com.google.protobuf.Descriptors.Descriptor - internal_static_com_yuandian_dataflow_proto_Response_descriptor; + internal_static_dataflow_ProcessorResponse_descriptor; private static final com.google.protobuf.GeneratedMessageV3.FieldAccessorTable - internal_static_com_yuandian_dataflow_proto_Response_fieldAccessorTable; + internal_static_dataflow_ProcessorResponse_fieldAccessorTable; private static final com.google.protobuf.Descriptors.Descriptor - internal_static_com_yuandian_dataflow_proto_PacketsRequest_descriptor; + internal_static_dataflow_PacketsProcessorRequest_descriptor; private static final com.google.protobuf.GeneratedMessageV3.FieldAccessorTable - internal_static_com_yuandian_dataflow_proto_PacketsRequest_fieldAccessorTable; + internal_static_dataflow_PacketsProcessorRequest_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -1671,34 +1912,34 @@ public final class Processor { descriptor; static { java.lang.String[] descriptorData = { - "\n\017Processor.proto\022\033com.yuandian.dataflow" + - ".proto\032\031google/protobuf/any.proto\"I\n\010Res" + - "ponse\022\014\n\004code\030\001 \001(\005\022\013\n\003msg\030\002 \001(\t\022\"\n\004data" + - "\030\003 \001(\0132\024.google.protobuf.Any\"7\n\016PacketsR" + - "equest\022%\n\007packets\030\001 \003(\0132\024.google.protobu" + - "f.Any2}\n\017ProcessorServer\022j\n\020PacketsProce" + - "ssor\022+.com.yuandian.dataflow.proto.Packe" + - "tsRequest\032%.com.yuandian.dataflow.proto." + - "Response\"\0000\001B\037\n\033com.yuandian.dataflow.pr" + - "otoP\000b\006proto3" + "\n\017Processor.proto\022\010dataflow\032\031google/prot" + + "obuf/any.proto\"2\n\021ProcessorResponse\022\014\n\004c" + + "ode\030\001 \001(\005\022\017\n\007message\030\002 \001(\t\"_\n\027PacketsPro" + + "cessorRequest\022\014\n\004type\030\001 \001(\005\022\017\n\007version\030\002" + + " \001(\t\022%\n\007packets\030\003 \003(\0132\024.google.protobuf." + + "Any2_\n\017ProcessorServer\022L\n\nAllPackets\022!.d" + + "ataflow.PacketsProcessorRequest\032\033.datafl" + + "ow.ProcessorResponseB4\n\033com.yuandian.dat" + + "aflow.protoP\000Z\023../grpc-gen;grpcgenb\006prot" + + "o3" }; descriptor = com.google.protobuf.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, new com.google.protobuf.Descriptors.FileDescriptor[] { com.google.protobuf.AnyProto.getDescriptor(), }); - internal_static_com_yuandian_dataflow_proto_Response_descriptor = + internal_static_dataflow_ProcessorResponse_descriptor = getDescriptor().getMessageTypes().get(0); - internal_static_com_yuandian_dataflow_proto_Response_fieldAccessorTable = new + internal_static_dataflow_ProcessorResponse_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( - internal_static_com_yuandian_dataflow_proto_Response_descriptor, - new java.lang.String[] { "Code", "Msg", "Data", }); - internal_static_com_yuandian_dataflow_proto_PacketsRequest_descriptor = + internal_static_dataflow_ProcessorResponse_descriptor, + new java.lang.String[] { "Code", "Message", }); + internal_static_dataflow_PacketsProcessorRequest_descriptor = getDescriptor().getMessageTypes().get(1); - internal_static_com_yuandian_dataflow_proto_PacketsRequest_fieldAccessorTable = new + internal_static_dataflow_PacketsProcessorRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( - internal_static_com_yuandian_dataflow_proto_PacketsRequest_descriptor, - new java.lang.String[] { "Packets", }); + internal_static_dataflow_PacketsProcessorRequest_descriptor, + new java.lang.String[] { "Type", "Version", "Packets", }); com.google.protobuf.AnyProto.getDescriptor(); } diff --git a/src/main/java/com/yuandian/dataflow/proto/ProcessorServerGrpc.java b/src/main/java/com/yuandian/dataflow/proto/ProcessorServerGrpc.java index 60408d5..640ef49 100644 --- a/src/main/java/com/yuandian/dataflow/proto/ProcessorServerGrpc.java +++ b/src/main/java/com/yuandian/dataflow/proto/ProcessorServerGrpc.java @@ -24,38 +24,38 @@ public final class ProcessorServerGrpc { private ProcessorServerGrpc() {} - public static final String SERVICE_NAME = "com.yuandian.dataflow.proto.ProcessorServer"; + public static final String SERVICE_NAME = "dataflow.ProcessorServer"; // Static method descriptors that strictly reflect the proto. - private static volatile io.grpc.MethodDescriptor getPacketsProcessorMethod; + private static volatile io.grpc.MethodDescriptor getAllPacketsMethod; @io.grpc.stub.annotations.RpcMethod( - fullMethodName = SERVICE_NAME + '/' + "PacketsProcessor", - requestType = com.yuandian.dataflow.proto.Processor.PacketsRequest.class, - responseType = com.yuandian.dataflow.proto.Processor.Response.class, - methodType = io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING) - public static io.grpc.MethodDescriptor getPacketsProcessorMethod() { - io.grpc.MethodDescriptor getPacketsProcessorMethod; - if ((getPacketsProcessorMethod = ProcessorServerGrpc.getPacketsProcessorMethod) == null) { + fullMethodName = SERVICE_NAME + '/' + "AllPackets", + requestType = com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.class, + responseType = com.yuandian.dataflow.proto.Processor.ProcessorResponse.class, + methodType = io.grpc.MethodDescriptor.MethodType.UNARY) + public static io.grpc.MethodDescriptor getAllPacketsMethod() { + io.grpc.MethodDescriptor getAllPacketsMethod; + if ((getAllPacketsMethod = ProcessorServerGrpc.getAllPacketsMethod) == null) { synchronized (ProcessorServerGrpc.class) { - if ((getPacketsProcessorMethod = ProcessorServerGrpc.getPacketsProcessorMethod) == null) { - ProcessorServerGrpc.getPacketsProcessorMethod = getPacketsProcessorMethod = - io.grpc.MethodDescriptor.newBuilder() - .setType(io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING) - .setFullMethodName(generateFullMethodName(SERVICE_NAME, "PacketsProcessor")) + if ((getAllPacketsMethod = ProcessorServerGrpc.getAllPacketsMethod) == null) { + ProcessorServerGrpc.getAllPacketsMethod = getAllPacketsMethod = + io.grpc.MethodDescriptor.newBuilder() + .setType(io.grpc.MethodDescriptor.MethodType.UNARY) + .setFullMethodName(generateFullMethodName(SERVICE_NAME, "AllPackets")) .setSampledToLocalTracing(true) .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( - com.yuandian.dataflow.proto.Processor.PacketsRequest.getDefaultInstance())) + com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.getDefaultInstance())) .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( - com.yuandian.dataflow.proto.Processor.Response.getDefaultInstance())) - .setSchemaDescriptor(new ProcessorServerMethodDescriptorSupplier("PacketsProcessor")) + com.yuandian.dataflow.proto.Processor.ProcessorResponse.getDefaultInstance())) + .setSchemaDescriptor(new ProcessorServerMethodDescriptorSupplier("AllPackets")) .build(); } } } - return getPacketsProcessorMethod; + return getAllPacketsMethod; } /** @@ -108,20 +108,20 @@ public final class ProcessorServerGrpc { /** */ - public void packetsProcessor(com.yuandian.dataflow.proto.Processor.PacketsRequest request, - io.grpc.stub.StreamObserver responseObserver) { - asyncUnimplementedUnaryCall(getPacketsProcessorMethod(), responseObserver); + public void allPackets(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request, + io.grpc.stub.StreamObserver responseObserver) { + asyncUnimplementedUnaryCall(getAllPacketsMethod(), responseObserver); } @java.lang.Override public final io.grpc.ServerServiceDefinition bindService() { return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor()) .addMethod( - getPacketsProcessorMethod(), - asyncServerStreamingCall( + getAllPacketsMethod(), + asyncUnaryCall( new MethodHandlers< - com.yuandian.dataflow.proto.Processor.PacketsRequest, - com.yuandian.dataflow.proto.Processor.Response>( - this, METHODID_PACKETS_PROCESSOR))) + com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest, + com.yuandian.dataflow.proto.Processor.ProcessorResponse>( + this, METHODID_ALL_PACKETS))) .build(); } } @@ -142,10 +142,10 @@ public final class ProcessorServerGrpc { /** */ - public void packetsProcessor(com.yuandian.dataflow.proto.Processor.PacketsRequest request, - io.grpc.stub.StreamObserver responseObserver) { - asyncServerStreamingCall( - getChannel().newCall(getPacketsProcessorMethod(), getCallOptions()), request, responseObserver); + public void allPackets(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request, + io.grpc.stub.StreamObserver responseObserver) { + asyncUnaryCall( + getChannel().newCall(getAllPacketsMethod(), getCallOptions()), request, responseObserver); } } @@ -165,10 +165,9 @@ public final class ProcessorServerGrpc { /** */ - public java.util.Iterator packetsProcessor( - com.yuandian.dataflow.proto.Processor.PacketsRequest request) { - return blockingServerStreamingCall( - getChannel(), getPacketsProcessorMethod(), getCallOptions(), request); + public com.yuandian.dataflow.proto.Processor.ProcessorResponse allPackets(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request) { + return blockingUnaryCall( + getChannel(), getAllPacketsMethod(), getCallOptions(), request); } } @@ -185,9 +184,17 @@ public final class ProcessorServerGrpc { io.grpc.Channel channel, io.grpc.CallOptions callOptions) { return new ProcessorServerFutureStub(channel, callOptions); } + + /** + */ + public com.google.common.util.concurrent.ListenableFuture allPackets( + com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request) { + return futureUnaryCall( + getChannel().newCall(getAllPacketsMethod(), getCallOptions()), request); + } } - private static final int METHODID_PACKETS_PROCESSOR = 0; + private static final int METHODID_ALL_PACKETS = 0; private static final class MethodHandlers implements io.grpc.stub.ServerCalls.UnaryMethod, @@ -206,9 +213,9 @@ public final class ProcessorServerGrpc { @java.lang.SuppressWarnings("unchecked") public void invoke(Req request, io.grpc.stub.StreamObserver responseObserver) { switch (methodId) { - case METHODID_PACKETS_PROCESSOR: - serviceImpl.packetsProcessor((com.yuandian.dataflow.proto.Processor.PacketsRequest) request, - (io.grpc.stub.StreamObserver) responseObserver); + case METHODID_ALL_PACKETS: + serviceImpl.allPackets((com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest) request, + (io.grpc.stub.StreamObserver) responseObserver); break; default: throw new AssertionError(); @@ -271,7 +278,7 @@ public final class ProcessorServerGrpc { if (result == null) { serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME) .setSchemaDescriptor(new ProcessorServerFileDescriptorSupplier()) - .addMethod(getPacketsProcessorMethod()) + .addMethod(getAllPacketsMethod()) .build(); } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/PeerId.java b/src/main/java/com/yuandian/dataflow/statemachine/PeerId.java new file mode 100644 index 0000000..20b188b --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/PeerId.java @@ -0,0 +1,32 @@ +package com.yuandian.dataflow.statemachine; + +import org.apache.ratis.protocol.RaftPeer; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Getter +@Setter +public class PeerId { + + public PeerId(RaftPeer raftPeer, int processorPort) { + this.raftPeer = raftPeer; + this.processorPort = processorPort; + } + + private RaftPeer raftPeer; + private int processorPort; + + @Override + public boolean equals(Object arg0) { + return getRaftPeer().getId().toString() == ((PeerId)arg0).getRaftPeer().getId().toString(); + } + + @Override + public int hashCode() { + return getRaftPeer().getId().hashCode() ; + } + +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/Query.java b/src/main/java/com/yuandian/dataflow/statemachine/Query.java index 89f0c46..622e109 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/Query.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/Query.java @@ -9,7 +9,7 @@ import javax.servlet.http.PushBuilder; import org.apache.ratis.protocol.Message; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import com.yuandian.dataflow.statemachine_old.state.WorkerState; +import com.yuandian.dataflow.statemachine.state.WorkerState; import lombok.Getter; import lombok.Setter; @@ -24,7 +24,7 @@ public class Query implements Message,Serializable { /** * 同步WorkerState状态. */ - GET_WORKER_STATE, + GET_STATE, } private Type type; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index dafde19..2f712be 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -38,8 +38,9 @@ import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.JavaUtils; import com.yuandian.dataflow.statemachine.Operate.OperateType; -import com.yuandian.dataflow.statemachine_old.state.State; -import com.yuandian.dataflow.statemachine_old.state.WorkerState; +import com.yuandian.dataflow.statemachine.state.State; +import com.yuandian.dataflow.statemachine.state.WorkerState; +import com.yuandian.dataflow.statemachine_old.MasterFactory; import lombok.extern.slf4j.Slf4j; @@ -197,11 +198,32 @@ public class StateMachine extends BaseStateMachine { @Override public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) { - log.info("newLeaderId: {} groupMemberId: {}", newLeaderId , groupMemberId.getPeerId()); + leader.set(newLeaderId == groupMemberId.getPeerId()); + log.info("newLeaderId: {} groupMemberId: {} leader {}", newLeaderId , groupMemberId.getPeerId(), isLeader()); + + + var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.CurrentPeerId())); + try { + var reply = StateServerFactory.send(op); + log.info("{}", reply); + } catch (IOException e) { + e.printStackTrace(); + } + + if (MasterFactory.getMasterExecute().isAlive()) + MasterFactory.getMasterExecute().interrupt(); + + if(isLeader()) + MasterFactory.getMasterExecute().start(); + + super.notifyLeaderChanged(groupMemberId, newLeaderId); } + + + @@ -221,7 +243,7 @@ public class StateMachine extends BaseStateMachine { log.info("{}", request); var op = (Query)inObject.readObject(); switch(op.getType()){ - case GET_WORKER_STATE: + case GET_STATE: try(var rlock = readLock()) { var ws = state.getWorkers().get( ((WorkerState)op.getValue()).getPeerId() ); if(ws == null) { @@ -232,11 +254,11 @@ public class StateMachine extends BaseStateMachine { default: - if (op.getType() == Query.Type.GET_WORKER_STATE ) { - return CompletableFuture.completedFuture( - Message.valueOf("Invalid Command")); - } - break; + + return CompletableFuture.completedFuture( + Message.valueOf("Invalid Command")); + + } } catch (ClassNotFoundException | IOException e) { @@ -272,13 +294,8 @@ public class StateMachine extends BaseStateMachine { var inBytes = new ByteArrayInputStream( data.toByteArray()); var inObject = new ObjectInputStream(inBytes); - - // log.info("applyTransaction {}", inObject.toString()); op = (Operate)inObject.readObject(); - - - - log.info("applyTransaction {}", data); + // log.info("applyTransaction {}", data); inObject.close(); inBytes.close(); } catch (IOException | ClassNotFoundException e) { @@ -301,7 +318,7 @@ public class StateMachine extends BaseStateMachine { case PUT_WORKERSTATE: var ws = (WorkerState)op.getValue(); - log.info("applyTransaction {}", 3); + // log.info("applyTransaction {}", OperateType.PUT_WORKERSTATE); state.getWorkers().put(ws.getPeerId() , ws); break; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java index 1eeadf7..cecd728 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java @@ -18,9 +18,13 @@ package com.yuandian.dataflow.statemachine; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; // import org.apache.ratis.examples.common.Constants; import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.grpc.GrpcFactory; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; @@ -28,13 +32,19 @@ import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.util.NetUtils; +import com.yuandian.dataflow.statemachine.grpc.ProcessorServer; +import com.yuandian.dataflow.statemachine_old.MasterFactory; + import io.netty.util.internal.StringUtil; +import lombok.Getter; +import lombok.Setter; import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Scanner; import java.util.UUID; @@ -49,12 +59,24 @@ import static java.nio.charset.StandardCharsets.UTF_8; * Run this application three times with three different parameter set-up a * ratis cluster which maintain a counter value replicated in each server memory */ +@Getter +@Setter public final class StateServer implements Closeable { - private final RaftServer server; + + public static HashMap activesPeers = new HashMap<>(); + + private final RaftClient raftClient; + private final RaftServer raftServer; + private final ProcessorServer processorServer; + + private final PeerId peer; public static final UUID CLUSTER_GROUP_ID = UUID.fromString("79642d72-6166-742d-6461-7461666c6f77"); - public StateServer(RaftPeer peer, ArrayList peers, File storageDir) throws IOException { + public StateServer(RaftPeer curpeer, ArrayList peers) throws IOException { + + final File storageDir = new File("./raftdata/" + curpeer.getId()); + //create a property object RaftProperties properties = new RaftProperties(); @@ -62,63 +84,85 @@ public final class StateServer implements Closeable { RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir)); //set the port which server listen to in RaftProperty object - final int port = NetUtils.createSocketAddr(peer.getAddress()).getPort(); + final int port = NetUtils.createSocketAddr(curpeer.getAddress()).getPort(); GrpcConfigKeys.Server.setPort(properties, port); - + //create the counter state machine which hold the counter value - StateMachine counterStateMachine = new StateMachine(); - + RaftGroup raftGroup = RaftGroup.valueOf( RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers); + + + StateMachine stateMachine = new StateMachine(); //create and start the Raft server - this.server = RaftServer.newBuilder() + this.raftServer = RaftServer.newBuilder() .setGroup(raftGroup) .setProperties(properties) - .setServerId(peer.getId()) - .setStateMachine(counterStateMachine) + .setServerId(curpeer.getId()) + .setStateMachine(stateMachine) .build(); + + + // create RaftClient + raftClient = buildClient(peers,raftGroup); + + this.processorServer = new ProcessorServer(); + this.processorServer.getGrpcServer().start(); + this.peer = new PeerId(curpeer, this.processorServer.getGrpcServer().getPort()); } - public void start() throws IOException { - server.start(); + // block + public void start() throws IOException, InterruptedException { + raftServer.start(); + this.processorServer.getGrpcServer().awaitTermination(); } @Override public void close() throws IOException { - server.close(); + MasterFactory.getMasterExecute().interrupt(); + raftServer.close(); + this.processorServer.getGrpcServer().shutdown(); + } - public static void main(String[] args) throws IOException { + private static RaftClient buildClient( ArrayList peers, RaftGroup raftGroup) { + RaftProperties raftProperties = new RaftProperties(); + + RaftClient.Builder builder = RaftClient.newBuilder() + .setProperties(raftProperties) + .setRaftGroup(raftGroup) + .setClientRpc(new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), raftProperties)); + return builder.build(); + } + + public static void main(String[] args) throws IOException, InterruptedException { if (args.length < 1) { System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.StateServer {serverIndex}"); System.err.println("{serverIndex} could be 1, 2 or 3"); System.exit(1); } - var peers = new ArrayList(); - String[] addresses = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; + StateServerFactory.startServer(args[0]); - for (int i = 0; i < addresses.length; i++) { - var port = addresses[i].split(":")[1]; - peers.add(RaftPeer.newBuilder().setId("" + port).setAddress(addresses[i]).build()); - } + // var peers = new ArrayList(); + // String[] addresses = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; - //find current peer object based on application parameter - final RaftPeer currentPeer = peers.get(Integer.parseInt(args[0])); + // for (int i = 0; i < addresses.length; i++) { + // // var port = addresses[i].split(":")[1]; + // peers.add(RaftPeer.newBuilder().setId("yd-" + args[0]).setAddress(addresses[i]).build()); + // } - //start a counter server - final File storageDir = new File("./raftdata/" + currentPeer.getId()); - final StateServer stateServer = new StateServer(currentPeer, peers, storageDir); - - stateServer.start(); - - + // //find current peer object based on application parameter + // final RaftPeer currentPeer = peers.get(Integer.parseInt(args[0])); + // //start a counter server + // final File storageDir = new File("./raftdata/" + currentPeer.getId()); + // final StateServer stateServer = new StateServer(currentPeer, peers, storageDir); - //exit when any input entered - Scanner scanner = new Scanner(System.in, UTF_8.name()); - scanner.nextLine(); - stateServer.close(); + + + // stateServer.start(); + // stateServer.close(); } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java new file mode 100644 index 0000000..9a5cf26 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -0,0 +1,44 @@ +package com.yuandian.dataflow.statemachine; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftPeer; + +public class StateServerFactory { + public static StateServer stateServer; + + public static void startServer(String sid) throws IOException, InterruptedException { + + var peers = new ArrayList(); + String[] addresses = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; + + for (int i = 0; i < addresses.length; i++) { + // var port = addresses[i].split(":")[1]; + peers.add(RaftPeer.newBuilder().setId("yd-" + sid).setAddress(addresses[i]).build()); + } + + //find current peer object based on application parameter + final RaftPeer currentPeer = peers.get(Integer.parseInt(sid)); + + //start a counter server + + final StateServer stateServer = new StateServer(currentPeer, peers); + + stateServer.start(); + stateServer.close(); + } + + + public static PeerId CurrentPeerId() { + return stateServer.getPeer(); + } + + public static RaftClientReply send(Message msg) throws IOException { + + return stateServer.getRaftClient().io().send(msg); + } +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java b/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java index 9f87896..f18cbb0 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java @@ -40,7 +40,7 @@ import org.springframework.cglib.proxy.CallbackFilter; import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder; import com.yuandian.dataflow.statemachine.StateServer; import com.yuandian.dataflow.statemachine.Operate.OperateType; -import com.yuandian.dataflow.statemachine_old.state.WorkerState; +import com.yuandian.dataflow.statemachine.state.WorkerState; import com.yuandian.dataflow.statemachine.Operate; import com.yuandian.dataflow.statemachine.Query; @@ -92,6 +92,7 @@ public final class CounterClient { var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(null)); var reply = raftClient.io().send(op); + log.info("{}", reply); executorService.submit(() -> raftClient.io().send(op)); latch.countDown(); @@ -105,10 +106,11 @@ public final class CounterClient { log.info("{}", Duration.between(now, Instant.now()).toMillis()); //send GET command and print the response - var query = new Query(Query.Type.GET_WORKER_STATE, new WorkerState(null)); + var query = new Query(Query.Type.GET_STATE, new WorkerState(null)); RaftClientReply count = raftClient.io().sendReadOnly(query); String response = count.getMessage().getContent().toString(Charset.defaultCharset()); - System.out.println(response); + // System.out.println(response); + log.info("{}", response); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/grpc/ProcessorServer.java b/src/main/java/com/yuandian/dataflow/statemachine/grpc/ProcessorServer.java new file mode 100644 index 0000000..426cd89 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/grpc/ProcessorServer.java @@ -0,0 +1,102 @@ +package com.yuandian.dataflow.statemachine.grpc; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.reflections.Reflections; + +import com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest; +import com.yuandian.dataflow.proto.Processor.ProcessorResponse; +import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerImplBase; +import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor; +import com.yuandian.dataflow.statemachine.annotations.MasterRegister; +import com.yuandian.dataflow.statemachine.master.MasterExecute; +import com.yuandian.dataflow.statemachine_old.MasterFactory; +import com.yuandian.dataflow.utils.Utils; + +import io.grpc.BindableService; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.ServerServiceDefinition; +import io.grpc.stub.StreamObserver; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Getter +@Setter +public class ProcessorServer { + + public static void main(String[] args) throws Exception { + // ServerBuilder builder = ServerBuilder.forPort(0); + // var server = builder.build().start(); + // log.info("{}", server.getPort()); + var server = new ProcessorServer(); + server.grpcServer.awaitTermination(); + } + + private Server grpcServer; + + + public ProcessorServer() { + ServerBuilder builder = ServerBuilder.forPort(0); + + // 扫描注解RaftProccessor 注册 + var now = Instant.now(); + HashMap> scansMap = new HashMap<>(); + var traces = Thread.currentThread().getStackTrace(); + var clsName = traces[traces.length - 1].getClassName(); + var packName = clsName.substring(0, Utils.indexOfAsPossible(clsName, ".", 3)); + log.info("获取 {} -> {} 下包的所有注解", clsName, packName); + + var refl = new Reflections(packName); + Set> scans = refl.getTypesAnnotatedWith(GrpcProcessor.class); + + scans.forEach((pRaftClass) -> { + scansMap.put(pRaftClass.getName(), pRaftClass); + }); + log.info("扫描注解的时间(scan annotations cost time): {} ms", Duration.between(now, Instant.now()).toMillis()); + scansMap.forEach((name, pRaftClass) -> { + try { + builder.addService( (BindableService)pRaftClass.getDeclaredConstructor().newInstance() ); + } catch (InstantiationException | IllegalAccessException | IllegalArgumentException + | InvocationTargetException | NoSuchMethodException | SecurityException e) { + log.info("{}",e.toString()); + } + }); + + refl.getTypesAnnotatedWith(MasterRegister.class).forEach((pClass) -> { + try { + MasterExecute execute = (MasterExecute) pClass.getDeclaredConstructor().newInstance(); + MasterFactory.registerMasterLoop(execute); + } catch (InstantiationException | IllegalAccessException | IllegalArgumentException + | InvocationTargetException | NoSuchMethodException | SecurityException e) { + log.info("{}", e.toString()); + + } + }); + + + grpcServer = builder.build(); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + log.info("*** shutting down gRPC server since JVM is shutting down"); + try { + grpcServer.shutdown().awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.error("{}", e.toString()); + } + System.err.println("*** server shut down"); + } + }); + } + +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterContext.java b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java similarity index 95% rename from src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterContext.java rename to src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java index 9db19bd..1ddd5df 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterContext.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java @@ -1,4 +1,4 @@ -package com.yuandian.dataflow.statemachine_old.master; +package com.yuandian.dataflow.statemachine.master; import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; @@ -12,9 +12,6 @@ public class MasterContext { private AtomicBoolean isExit = new AtomicBoolean(false); private Duration lastLoopExecuteTime = Duration.ZERO; - - - private Object share; public Boolean getIsExit() { diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterExecute.java b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterExecute.java similarity index 66% rename from src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterExecute.java rename to src/main/java/com/yuandian/dataflow/statemachine/master/MasterExecute.java index d34411b..d1542eb 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterExecute.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterExecute.java @@ -1,4 +1,4 @@ -package com.yuandian.dataflow.statemachine_old.master; +package com.yuandian.dataflow.statemachine.master; /** * Master的主线程循环 diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/state/State.java b/src/main/java/com/yuandian/dataflow/statemachine/state/State.java similarity index 87% rename from src/main/java/com/yuandian/dataflow/statemachine_old/state/State.java rename to src/main/java/com/yuandian/dataflow/statemachine/state/State.java index eb3b361..2449ab8 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine_old/state/State.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/State.java @@ -4,11 +4,11 @@ * @author eson *2022年7月13日-09:11:26 */ -package com.yuandian.dataflow.statemachine_old.state; +package com.yuandian.dataflow.statemachine.state; import java.io.Serializable; -import com.alipay.sofa.jraft.entity.PeerId; + import lombok.Getter; import lombok.Setter; @@ -21,6 +21,8 @@ import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.HashMap; +import com.yuandian.dataflow.statemachine.PeerId; + /** * 代表任务状态 暂时全局使用这个结构. 添加新增状态 * diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/state/WorkerState.java b/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java similarity index 89% rename from src/main/java/com/yuandian/dataflow/statemachine_old/state/WorkerState.java rename to src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java index ec2f5f2..e4869ff 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine_old/state/WorkerState.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java @@ -4,12 +4,12 @@ * @author eson *2022年7月15日-10:04:00 */ -package com.yuandian.dataflow.statemachine_old.state; +package com.yuandian.dataflow.statemachine.state; import java.io.Serializable; import java.time.Instant; -import com.alipay.sofa.jraft.entity.PeerId; +import com.yuandian.dataflow.statemachine.PeerId; import lombok.Getter; import lombok.Setter; @@ -50,4 +50,6 @@ public class WorkerState implements Serializable { this.peerId = peer; this.updateAt = Instant.now(); } + + } diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java b/src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java index 74e5b5f..d647256 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java @@ -18,13 +18,13 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.google.protobuf.Any; // import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass; +import com.yuandian.dataflow.statemachine.master.MasterContext; +import com.yuandian.dataflow.statemachine.master.MasterExecute; +import com.yuandian.dataflow.statemachine.state.State; +import com.yuandian.dataflow.statemachine.state.WorkerState; import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; -import com.yuandian.dataflow.statemachine_old.master.MasterContext; -import com.yuandian.dataflow.statemachine_old.master.MasterExecute; import com.yuandian.dataflow.statemachine_old.operate.OperateOld; import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType; -import com.yuandian.dataflow.statemachine_old.state.State; -import com.yuandian.dataflow.statemachine_old.state.WorkerState; import lombok.Getter; import lombok.Setter; @@ -56,7 +56,7 @@ public class MasterFactory { public static Thread masterExecuteThread = new Thread(new Runnable() { @Override public void run() { - + log.info("master execute"); MasterContext cxt = new MasterContext(); while (!cxt.getIsExit()) { Instant now = Instant.now(); @@ -71,9 +71,5 @@ public class MasterFactory { public static Thread getMasterExecute() { return masterExecuteThread; } - - public static void Init() { - - } - + } diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java index 1af96f1..ad306f4 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java @@ -51,13 +51,13 @@ import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.ThreadPoolUtil; import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor; import com.yuandian.dataflow.statemachine.annotations.MasterRegister; +import com.yuandian.dataflow.statemachine.master.MasterExecute; +import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; -import com.yuandian.dataflow.statemachine_old.master.MasterExecute; import com.yuandian.dataflow.statemachine_old.operate.OperateOld; import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType; import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse; import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor.OperateRequest; -import com.yuandian.dataflow.statemachine_old.state.State; import com.yuandian.dataflow.utils.Utils; import lombok.Getter; diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java index ac24784..d642e43 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java @@ -17,12 +17,12 @@ import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.error.RaftException; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; +import com.yuandian.dataflow.statemachine.state.State; +import com.yuandian.dataflow.statemachine.state.WorkerState; // import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; import com.yuandian.dataflow.statemachine_old.operate.OperateOld; import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType; -import com.yuandian.dataflow.statemachine_old.state.State; -import com.yuandian.dataflow.statemachine_old.state.WorkerState; import com.yuandian.dataflow.utils.Utils; import lombok.extern.slf4j.Slf4j; @@ -93,7 +93,7 @@ public class StateMachine extends StateMachineAdapter { case PUT_WORKERSTATE: WorkerState opws = op.getValue(); log.debug("PUT {}", opws.peerId); - state.getWorkers().put(opws.peerId, opws); + // state.getWorkers().put(opws.peerId, opws); if (closure != null) { closure.success(op); closure.run(Status.OK()); @@ -194,7 +194,7 @@ public class StateMachine extends StateMachineAdapter { var ws = this.state.getWorkers().get(StateFactory.getServerId()); if (ws == null) { - ws = new WorkerState(StateFactory.getServerId()); + // ws = new WorkerState(StateFactory.getServerId()); } // 更新当前WorkerState @@ -242,12 +242,12 @@ public class StateMachine extends StateMachineAdapter { // 更新当前WorkerState - OperateOld.CallOperate(new OperateOld(OperateType.PUT_WORKERSTATE, new WorkerState(StateFactory.getServerId())), new GenericClosure() { - @Override - public void run(Status status) { - log.debug("onStartFollowing update workerstate: {}", status); - } - }); + // OperateOld.CallOperate(new OperateOld(OperateType.PUT_WORKERSTATE, new WorkerState(StateFactory.getServerId())), new GenericClosure() { + // @Override + // public void run(Status status) { + // log.debug("onStartFollowing update workerstate: {}", status); + // } + // }); return; } catch (Exception e) { @@ -267,14 +267,14 @@ public class StateMachine extends StateMachineAdapter { public void onStopFollowing(LeaderChangeContext ctx) { log.debug("onStopFollowing[{}] {}", StateFactory.getServerId(), ctx); - var ws = new WorkerState(StateFactory.getServerId()); - var op = new OperateOld(OperateType.PUT_WORKERSTATE, ws); - OperateOld.CallOperate(op, new GenericClosure() { - @Override - public void run(Status status) { - log.info("{} {}", status, this.getResponse()); - } - }); + // var ws = new WorkerState(StateFactory.getServerId()); + // var op = new OperateOld(OperateType.PUT_WORKERSTATE, ws); + // OperateOld.CallOperate(op, new GenericClosure() { + // @Override + // public void run(Status status) { + // log.info("{} {}", status, this.getResponse()); + // } + // }); super.onStopFollowing(ctx); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/operate/OperateOld.java b/src/main/java/com/yuandian/dataflow/statemachine_old/operate/OperateOld.java index fe93604..fb4eae4 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine_old/operate/OperateOld.java +++ b/src/main/java/com/yuandian/dataflow/statemachine_old/operate/OperateOld.java @@ -5,11 +5,11 @@ import java.io.Serializable; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.rpc.InvokeCallback; +import com.yuandian.dataflow.statemachine.state.WorkerState; import com.yuandian.dataflow.statemachine_old.StateFactory; import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor; import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse; -import com.yuandian.dataflow.statemachine_old.state.WorkerState; import com.yuandian.dataflow.utils.PacketsManager; import lombok.Data;