This commit is contained in:
huangsimin 2022-08-05 17:56:22 +08:00
parent cf4ff63b31
commit f2c14d9bd6
4 changed files with 2373 additions and 122 deletions

View File

@ -14,7 +14,7 @@ import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.google.protobuf.Any;
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
// import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import com.yuandian.dataflow.statemachine_old.StateFactory;
import com.yuandian.dataflow.statemachine_old.annotations.MasterRegister;
@ -32,104 +32,108 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@MasterRegister
public class MasterProcessor implements MasterExecute {
/**
* 最大任务数限制
*/
private final int MAX_TASKS = 1000;
private final int DEFAULT_ASYNC_TIMEOUT = 5000;
@Override
public class MasterProcessor implements MasterExecute {@Override
public void loop(MasterContext cxt) {
// ArrayList<Any> packets = new ArrayList<>();
// 模拟发送包的数据到该节点上
for (int i = 0; i < ThreadLocalRandom.current().nextLong(50, 100); i++) {
var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
.newBuilder()
.setTableId(10086)
.build());
Operate.packetsManager.addPacket(p);
// packets.add(p);
}
// 必须复制. raft有一直使用该list
var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers());
if ( Operate.packetsManager.size() >= 100000) {
log.error("告警 数据流无法正常消耗: 缓冲packets:{} 直接放弃一部分数据", Operate.packetsManager.size());
Operate.packetsManager.discardPackets(50000);
log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers,
Operate.packetsManager.size());
cxt.sleep(5000);
} else {
// log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(),
// alivePeers, Operate.packetsManager.size());
}
if (alivePeers == null) {
cxt.sleep(100); // 休眠100毫秒.
return;
}
PeerId[] peers = new PeerId[alivePeers.size()];
alivePeers.toArray(peers);
// 等待全部反馈后才能进入下次循环
Operate.CallOperate(new Operate(OperateType.ALLOCATE_PACKETS, alivePeers), new GenericClosure() {
@Override
public void run(Status status) {
int[] allocTasks = this.getValue();
if(allocTasks == null) {
cxt.sleep(5000);;
return;
}
var all = 0;
for(var i : allocTasks) {
all += i;
}
if(all != 0) {
log.info("需要处理的任务数量[{}] :{}", StateFactory.getLeaderId(), allocTasks);
}
for (int i = 0; i < peers.length; i++) {
var peer = peers[i];
if (allocTasks[i] <= 0) {
continue;
}
var packets = Operate.packetsManager.popPackets(allocTasks[i]);
// 先提交 节点的 剩余能处理的任务数量. 然后再处理
var request = new PacketsRequest(); // 数据包切片
request.setPackets(packets);
try {
StateFactory.rpcClientInvokeAsync(peer.getEndpoint(), request,
new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
if (err != null) {
// TODO: 如果错误, 需要让节点恢复任务处理的状态
log.debug("{}", err);
}
// log.debug("PacketsRequest: {}", result);
}
}, DEFAULT_ASYNC_TIMEOUT);
} catch (InterruptedException | RemotingException e) {
log.info("error send packets {}", e.toString());
}
}
}
});
// TODO Auto-generated method stub
}
// /**
// * 最大任务数限制
// */
// private final int MAX_TASKS = 1000;
// private final int DEFAULT_ASYNC_TIMEOUT = 5000;
// @Override
// public void loop(MasterContext cxt) {
// // ArrayList<Any> packets = new ArrayList<>();
// // 模拟发送包的数据到该节点上
// for (int i = 0; i < ThreadLocalRandom.current().nextLong(50, 100); i++) {
// var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
// .newBuilder()
// .setTableId(10086)
// .build());
// Operate.packetsManager.addPacket(p);
// // packets.add(p);
// }
// // 必须复制. raft有一直使用该list
// var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers());
// if ( Operate.packetsManager.size() >= 100000) {
// log.error("告警 数据流无法正常消耗: 缓冲packets:{} 直接放弃一部分数据", Operate.packetsManager.size());
// Operate.packetsManager.discardPackets(50000);
// log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers,
// Operate.packetsManager.size());
// cxt.sleep(5000);
// } else {
// // log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(),
// // alivePeers, Operate.packetsManager.size());
// }
// if (alivePeers == null) {
// cxt.sleep(100); // 休眠100毫秒.
// return;
// }
// PeerId[] peers = new PeerId[alivePeers.size()];
// alivePeers.toArray(peers);
// // 等待全部反馈后才能进入下次循环
// Operate.CallOperate(new Operate(OperateType.ALLOCATE_PACKETS, alivePeers), new GenericClosure() {
// @Override
// public void run(Status status) {
// int[] allocTasks = this.getValue();
// if(allocTasks == null) {
// cxt.sleep(5000);;
// return;
// }
// var all = 0;
// for(var i : allocTasks) {
// all += i;
// }
// if(all != 0) {
// log.info("需要处理的任务数量[{}] :{}", StateFactory.getLeaderId(), allocTasks);
// }
// for (int i = 0; i < peers.length; i++) {
// var peer = peers[i];
// if (allocTasks[i] <= 0) {
// continue;
// }
// var packets = Operate.packetsManager.popPackets(allocTasks[i]);
// // 先提交 节点的 剩余能处理的任务数量. 然后再处理
// var request = new PacketsRequest(); // 数据包切片
// request.setPackets(packets);
// try {
// StateFactory.rpcClientInvokeAsync(peer.getEndpoint(), request,
// new InvokeCallback() {
// @Override
// public void complete(Object result, Throwable err) {
// if (err != null) {
// // TODO: 如果错误, 需要让节点恢复任务处理的状态
// log.debug("{}", err);
// }
// // log.debug("PacketsRequest: {}", result);
// }
// }, DEFAULT_ASYNC_TIMEOUT);
// } catch (InterruptedException | RemotingException e) {
// log.info("error send packets {}", e.toString());
// }
// }
// }
// });
// }
}

View File

@ -17,6 +17,9 @@ import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.google.protobuf.Any;
import com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest;
import com.yuandian.dataflow.proto.Processor.ProcessResponse;
import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerImplBase;
import com.yuandian.dataflow.statemachine_old.StateFactory;
import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
@ -25,6 +28,7 @@ import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine_old.state.State;
import io.grpc.stub.StreamObserver;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@ -38,62 +42,71 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@WorkerRegister
public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRequest> {
public class PacketsProcessor extends ProcessorServerImplBase {@Override
public void getPackets(PacketsProcessorRequest request, StreamObserver<ProcessResponse> responseObserver) {
// TODO Auto-generated method stub
@Setter
@Getter
public static class PacketsRequest implements Serializable {
private ArrayList<Any> packets = new ArrayList<>();
request.getPacketsList();
super.getPackets(request, responseObserver);
}
@Override
public void handleRequest(RpcContext rpcCtx, PacketsRequest request) {
log.debug("{} handler request.packets.size(): {}", StateFactory.getServerId(), request.packets.size());
var now = Instant.now();
var resp = new RaftResponse();
resp.setSuccess(true);
rpcCtx.sendResponse(resp); // 因为jraft是异步. 收到数据直接返回
try {
// log.info("response {} ms", Duration.between(now, Instant.now()).toMillis()); // 返回response的时间
// @Setter
// @Getter
// public static class PacketsRequest implements Serializable {
// private ArrayList<Any> packets = new ArrayList<>();
// }
// TODO: request.packets 入库,回填, 告警 等操作
Thread.sleep(ThreadLocalRandom.current().nextLong(10, 100));
// @Override
// public void handleRequest(RpcContext rpcCtx, PacketsRequest request) {
// log.debug("{} handler request.packets.size(): {}", StateFactory.getServerId(), request.packets.size());
} catch (InterruptedException e) {
log.info(e.toString());
} finally { // 确保 更新 最终的任务状态给master.
// var now = Instant.now();
// var resp = new RaftResponse();
// resp.setSuccess(true);
// rpcCtx.sendResponse(resp); // 因为jraft是异步. 收到数据直接返回
// 读状态 Closure<State> 里的 getValue<State> State的状态
var state = StateFactory.getStateServer().getFsm().getState() ; // 获取返回的状态
// try {
// // log.info("response {} ms", Duration.between(now, Instant.now()).toMillis()); // 返回response的时间
var ws = state.getWorkers().get(StateFactory.getServerId());
ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量
ws.setUpdateAt(Instant.now()); // 设置更新时间
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws),
new GenericClosure() {
@Override
public void run(Status status) {
// 处理完数据 更新工作状态的时间
log.info("update workerstate {} ms", Duration.between(now, Instant.now()).toMillis());
if (!status.isOk()) {
log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp);
// // TODO: request.packets 入库,回填, 告警 等操作
}
}
});
// Thread.sleep(ThreadLocalRandom.current().nextLong(10, 100));
} ;
}
// } catch (InterruptedException e) {
// log.info(e.toString());
// } finally { // 确保 更新 最终的任务状态给master.
@Override
public String interest() {
return PacketsRequest.class.getName();
}
// // 读状态 Closure<State> 里的 getValue<State> State的状态
// var state = StateFactory.getStateServer().getFsm().getState() ; // 获取返回的状态
// var ws = state.getWorkers().get(StateFactory.getServerId());
// ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量
// ws.setUpdateAt(Instant.now()); // 设置更新时间
// Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws),
// new GenericClosure() {
// @Override
// public void run(Status status) {
// // 处理完数据 更新工作状态的时间
// log.info("update workerstate {} ms", Duration.between(now, Instant.now()).toMillis());
// if (!status.isOk()) {
// log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp);
// }
// }
// });
// } ;
// }
// @Override
// public String interest() {
// return PacketsRequest.class.getName();
// }
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,288 @@
package com.yuandian.dataflow.proto;
import static io.grpc.MethodDescriptor.generateFullMethodName;
import static io.grpc.stub.ClientCalls.asyncBidiStreamingCall;
import static io.grpc.stub.ClientCalls.asyncClientStreamingCall;
import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
import static io.grpc.stub.ClientCalls.asyncUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.futureUnaryCall;
import static io.grpc.stub.ServerCalls.asyncBidiStreamingCall;
import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
import static io.grpc.stub.ServerCalls.asyncUnaryCall;
import static io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall;
import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall;
/**
*/
@javax.annotation.Generated(
value = "by gRPC proto compiler (version 1.32.3)",
comments = "Source: Processor.proto")
public final class ProcessorServerGrpc {
private ProcessorServerGrpc() {}
public static final String SERVICE_NAME = "dataflow.ProcessorServer";
// Static method descriptors that strictly reflect the proto.
private static volatile io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest,
com.yuandian.dataflow.proto.Processor.ProcessResponse> getGetPacketsMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "GetPackets",
requestType = com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.class,
responseType = com.yuandian.dataflow.proto.Processor.ProcessResponse.class,
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
public static io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest,
com.yuandian.dataflow.proto.Processor.ProcessResponse> getGetPacketsMethod() {
io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest, com.yuandian.dataflow.proto.Processor.ProcessResponse> getGetPacketsMethod;
if ((getGetPacketsMethod = ProcessorServerGrpc.getGetPacketsMethod) == null) {
synchronized (ProcessorServerGrpc.class) {
if ((getGetPacketsMethod = ProcessorServerGrpc.getGetPacketsMethod) == null) {
ProcessorServerGrpc.getGetPacketsMethod = getGetPacketsMethod =
io.grpc.MethodDescriptor.<com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest, com.yuandian.dataflow.proto.Processor.ProcessResponse>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "GetPackets"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
com.yuandian.dataflow.proto.Processor.ProcessResponse.getDefaultInstance()))
.setSchemaDescriptor(new ProcessorServerMethodDescriptorSupplier("GetPackets"))
.build();
}
}
}
return getGetPacketsMethod;
}
/**
* Creates a new async stub that supports all call types for the service
*/
public static ProcessorServerStub newStub(io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<ProcessorServerStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<ProcessorServerStub>() {
@java.lang.Override
public ProcessorServerStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new ProcessorServerStub(channel, callOptions);
}
};
return ProcessorServerStub.newStub(factory, channel);
}
/**
* Creates a new blocking-style stub that supports unary and streaming output calls on the service
*/
public static ProcessorServerBlockingStub newBlockingStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<ProcessorServerBlockingStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<ProcessorServerBlockingStub>() {
@java.lang.Override
public ProcessorServerBlockingStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new ProcessorServerBlockingStub(channel, callOptions);
}
};
return ProcessorServerBlockingStub.newStub(factory, channel);
}
/**
* Creates a new ListenableFuture-style stub that supports unary calls on the service
*/
public static ProcessorServerFutureStub newFutureStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<ProcessorServerFutureStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<ProcessorServerFutureStub>() {
@java.lang.Override
public ProcessorServerFutureStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new ProcessorServerFutureStub(channel, callOptions);
}
};
return ProcessorServerFutureStub.newStub(factory, channel);
}
/**
*/
public static abstract class ProcessorServerImplBase implements io.grpc.BindableService {
/**
*/
public void getPackets(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request,
io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Processor.ProcessResponse> responseObserver) {
asyncUnimplementedUnaryCall(getGetPacketsMethod(), responseObserver);
}
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
getGetPacketsMethod(),
asyncUnaryCall(
new MethodHandlers<
com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest,
com.yuandian.dataflow.proto.Processor.ProcessResponse>(
this, METHODID_GET_PACKETS)))
.build();
}
}
/**
*/
public static final class ProcessorServerStub extends io.grpc.stub.AbstractAsyncStub<ProcessorServerStub> {
private ProcessorServerStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected ProcessorServerStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new ProcessorServerStub(channel, callOptions);
}
/**
*/
public void getPackets(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request,
io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Processor.ProcessResponse> responseObserver) {
asyncUnaryCall(
getChannel().newCall(getGetPacketsMethod(), getCallOptions()), request, responseObserver);
}
}
/**
*/
public static final class ProcessorServerBlockingStub extends io.grpc.stub.AbstractBlockingStub<ProcessorServerBlockingStub> {
private ProcessorServerBlockingStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected ProcessorServerBlockingStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new ProcessorServerBlockingStub(channel, callOptions);
}
/**
*/
public com.yuandian.dataflow.proto.Processor.ProcessResponse getPackets(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request) {
return blockingUnaryCall(
getChannel(), getGetPacketsMethod(), getCallOptions(), request);
}
}
/**
*/
public static final class ProcessorServerFutureStub extends io.grpc.stub.AbstractFutureStub<ProcessorServerFutureStub> {
private ProcessorServerFutureStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected ProcessorServerFutureStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new ProcessorServerFutureStub(channel, callOptions);
}
/**
*/
public com.google.common.util.concurrent.ListenableFuture<com.yuandian.dataflow.proto.Processor.ProcessResponse> getPackets(
com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request) {
return futureUnaryCall(
getChannel().newCall(getGetPacketsMethod(), getCallOptions()), request);
}
}
private static final int METHODID_GET_PACKETS = 0;
private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
private final ProcessorServerImplBase serviceImpl;
private final int methodId;
MethodHandlers(ProcessorServerImplBase serviceImpl, int methodId) {
this.serviceImpl = serviceImpl;
this.methodId = methodId;
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_GET_PACKETS:
serviceImpl.getPackets((com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest) request,
(io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Processor.ProcessResponse>) responseObserver);
break;
default:
throw new AssertionError();
}
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public io.grpc.stub.StreamObserver<Req> invoke(
io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
default:
throw new AssertionError();
}
}
}
private static abstract class ProcessorServerBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier {
ProcessorServerBaseDescriptorSupplier() {}
@java.lang.Override
public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() {
return com.yuandian.dataflow.proto.Processor.getDescriptor();
}
@java.lang.Override
public com.google.protobuf.Descriptors.ServiceDescriptor getServiceDescriptor() {
return getFileDescriptor().findServiceByName("ProcessorServer");
}
}
private static final class ProcessorServerFileDescriptorSupplier
extends ProcessorServerBaseDescriptorSupplier {
ProcessorServerFileDescriptorSupplier() {}
}
private static final class ProcessorServerMethodDescriptorSupplier
extends ProcessorServerBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
private final String methodName;
ProcessorServerMethodDescriptorSupplier(String methodName) {
this.methodName = methodName;
}
@java.lang.Override
public com.google.protobuf.Descriptors.MethodDescriptor getMethodDescriptor() {
return getServiceDescriptor().findMethodByName(methodName);
}
}
private static volatile io.grpc.ServiceDescriptor serviceDescriptor;
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
io.grpc.ServiceDescriptor result = serviceDescriptor;
if (result == null) {
synchronized (ProcessorServerGrpc.class) {
result = serviceDescriptor;
if (result == null) {
serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
.setSchemaDescriptor(new ProcessorServerFileDescriptorSupplier())
.addMethod(getGetPacketsMethod())
.build();
}
}
}
return result;
}
}