1.整理代码

2.添加protoc自动处理的protoc
This commit is contained in:
huangsimin 2022-07-28 16:10:55 +08:00
parent 7381a55174
commit faa20d8bf8
19 changed files with 49 additions and 15188 deletions

26
pom.xml
View File

@ -216,6 +216,32 @@
<version>1.6.2</version>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<pluginId>grpc-java</pluginId>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<outputDirectory>${basedir}/src/main/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
<goal>test-compile</goal>
<goal>test-compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>

View File

@ -10,13 +10,7 @@ import java.io.Serializable;
import java.time.Instant;
import java.util.ArrayList;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.google.protobuf.Any;
@ -25,14 +19,11 @@ import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
@ -63,7 +54,7 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
log.debug("{} handler request.packets.size(): {}", StateFactory.getServerId(), request.packets.size());
// TODO: request.packets 入库,回填, 告警 等操作
} finally { // 确保 更新 最终的任务状态给master.

View File

@ -6,7 +6,6 @@ import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;

View File

@ -16,19 +16,19 @@
package com.yuandian.dataflow.grpc;
import com.google.protobuf.Any;
import com.yuandian.dataflow.proto.Base;
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc;
import com.yuandian.dataflow.proto.msgtype.AppFlowOuterClass;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import com.google.protobuf.Any;
import com.yuandian.dataflow.proto.Base;
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
/**
* Server that manages startup/shutdown of a {@code Greeter} server.
*/

View File

@ -2,29 +2,21 @@ package com.yuandian.dataflow.projo;
import java.time.LocalDateTime;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import static com.mongodb.MongoClientSettings.getDefaultCodecRegistry;
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
import org.bson.codecs.pojo.annotations.BsonProperty;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.mongodb.client.FindIterable;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.bson.codecs.pojo.annotations.BsonProperty;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import lombok.Getter;
import lombok.Setter;

View File

@ -13,8 +13,6 @@ import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reflections.Reflections;
@ -43,16 +41,13 @@ import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**

View File

@ -2,7 +2,6 @@ package com.yuandian.dataflow.statemachine;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
@ -13,22 +12,14 @@ import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.core.StateMachineAdapter;
import com.alipay.sofa.jraft.entity.LeaderChangeContext;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.yuandian.dataflow.Server;
import com.yuandian.dataflow.controller.TransferLeaderProcessor;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.extern.slf4j.Slf4j;
/**
@ -227,16 +218,16 @@ public class StateMachine extends StateMachineAdapter {
}
@Override
public void onStopFollowing(LeaderChangeContext ctx) {
log.debug("{} {}", ctx, StateFactory.getCluster().getServerId());
var ss = StateFactory.getStateServer();
var ws = new WorkerState(StateFactory.getServerId());
log.debug("my: {} leader id {}", StateFactory.getServerId(), StateFactory.getLeaderId());
var op = new Operate(OperateType.PUT_WORKERSTATE, ws);
Operate.CallOperate(op, new GenericClosure() {
Operate.CallOperate(op, new GenericClosure<Operate>() {
@Override
public void run(Status status) {
log.info("{} {}", status, this.getResponse());

View File

@ -1,21 +1,14 @@
package com.yuandian.dataflow.statemachine.closure;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slf4j
@Getter
@Setter

View File

@ -3,8 +3,6 @@ package com.yuandian.dataflow.statemachine.operate;
import java.io.Serializable;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.yuandian.dataflow.statemachine.StateFactory;
@ -14,7 +12,6 @@ import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
/**
@ -45,6 +42,7 @@ public class Operate implements Serializable {
this.value = ws;
}
@java.lang.SuppressWarnings("unchecked")
public <T> T getValue() {
return (T) this.value;
};
@ -59,6 +57,7 @@ public class Operate implements Serializable {
* @param op 传入的操作类
* @param closure 回调函数. Operate为返回值
*/
@java.lang.SuppressWarnings("unchecked")
public static void CallOperate(Operate op, GenericClosure<Operate> closure) {
log.debug("CallOperate Value {}", op.<WorkerState>getValue());
var ss = StateFactory.getStateServer();
@ -80,7 +79,7 @@ public class Operate implements Serializable {
@Override
public void complete(Object result, Throwable err) {
log.debug("Object result {}", result);
var resp = (RaftResponse<Operate>) result;
closure.setResponse(resp);
closure.success(resp.getValue());

View File

@ -7,29 +7,19 @@
package com.yuandian.dataflow.statemachine.rpc;
import java.io.Serializable;
import java.nio.ByteBuffer;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.yuandian.dataflow.statemachine.StateFactory.StateServer;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
import com.alipay.sofa.jraft.entity.PeerId;
import org.apache.commons.lang.StringUtils;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**

View File

@ -9,8 +9,6 @@ package com.yuandian.dataflow.statemachine.rpc;
import java.io.Serializable;
import com.alipay.sofa.jraft.entity.PeerId;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.Getter;
import lombok.Setter;

View File

@ -2,36 +2,12 @@ package com.yuandian.dataflow;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.swing.text.AbstractDocument.BranchElement;
import org.bson.Document;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
// import org.springframework.expression.spel.ast.FunctionReference;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.InsertManyOptions;
import com.yuandian.dataflow.projo.Doc;
import io.netty.handler.codec.dns.DatagramDnsQuery;
import lombok.Cleanup;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

View File

@ -1,19 +1,8 @@
package com.yuandian.dataflow;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import lombok.extern.slf4j.Slf4j;
import org.bson.Document;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class MongodbTest {

View File

@ -1,19 +1,9 @@
package com.yuandian.dataflow.statemachine;
import java.util.concurrent.Executor;
import org.junit.jupiter.api.Test;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.Endpoint;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.extern.slf4j.Slf4j;
@Slf4j