更新多接口版本

This commit is contained in:
黄思敏 2022-08-25 15:31:51 +08:00
parent e89bd3a51f
commit de29bc292a
9 changed files with 149 additions and 36 deletions

3
.gitignore vendored
View File

@ -1 +1,2 @@
slimming
slimming
*.yaml

36
config.go Normal file
View File

@ -0,0 +1,36 @@
package main
import (
"log"
"os"
"gopkg.in/yaml.v3"
)
type Node struct {
Real string `yaml:"real"`
Virt string `yaml:"virt"`
}
type Config struct {
Network struct {
Self Node `yaml:"self"`
Nodes []Node `yaml:"nodes"`
} `yaml:"network"`
}
var config = NewConfig()
func NewConfig() *Config {
cnf := &Config{}
f, err := os.Open("config.yaml")
if err != nil {
log.Panic(err)
}
dec := yaml.NewDecoder(f)
err = dec.Decode(cnf)
if err != nil {
log.Panic(err)
}
return cnf
}

23
config_test.go Normal file
View File

@ -0,0 +1,23 @@
package main
import (
"log"
"os"
"testing"
"gopkg.in/yaml.v3"
)
func TestConfig(t *testing.T) {
cnf := &Config{}
f, err := os.Open("config.yaml")
if err != nil {
log.Panic(err)
}
dec := yaml.NewDecoder(f)
err = dec.Decode(cnf)
if err != nil {
log.Panic(err)
}
t.Error(cnf)
}

3
go.mod
View File

@ -4,6 +4,8 @@ go 1.19
require github.com/songgao/water v0.0.0-20200317203138-2b4b6d7c09d8
require github.com/klauspost/compress v1.15.9 // indirect
require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/songgao/packets v0.0.0-20160404182456-549a10cd4091 // indirect
@ -13,4 +15,5 @@ require (
google.golang.org/genproto v0.0.0-20220822174746-9e6da59bd2fc // indirect
google.golang.org/grpc v1.48.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/yaml.v3 v3.0.1
)

4
go.sum
View File

@ -44,6 +44,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
@ -130,5 +132,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -13,7 +13,7 @@ service FrameService {
//
message Request {
int32 Type = 1;
repeated bytes Frames = 2;
bytes Frames = 2;
}
//

View File

@ -31,8 +31,8 @@ type Request struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Type int32 `protobuf:"varint,1,opt,name=Type,proto3" json:"Type,omitempty"`
Frames [][]byte `protobuf:"bytes,2,rep,name=Frames,proto3" json:"Frames,omitempty"`
Type int32 `protobuf:"varint,1,opt,name=Type,proto3" json:"Type,omitempty"`
Frames []byte `protobuf:"bytes,2,opt,name=Frames,proto3" json:"Frames,omitempty"`
}
func (x *Request) Reset() {
@ -74,7 +74,7 @@ func (x *Request) GetType() int32 {
return 0
}
func (x *Request) GetFrames() [][]byte {
func (x *Request) GetFrames() []byte {
if x != nil {
return x.Frames
}
@ -154,7 +154,7 @@ var file_proto_frame_proto_rawDesc = []byte{
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x35, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x12, 0x12, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04,
0x54, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x46, 0x72, 0x61, 0x6d, 0x65, 0x73, 0x18, 0x02,
0x20, 0x03, 0x28, 0x0c, 0x52, 0x06, 0x46, 0x72, 0x61, 0x6d, 0x65, 0x73, 0x22, 0x4c, 0x0a, 0x08,
0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x46, 0x72, 0x61, 0x6d, 0x65, 0x73, 0x22, 0x4c, 0x0a, 0x08,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x43, 0x6f, 0x64, 0x65,
0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, 0x07,
0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x4d,

10
rpc.go
View File

@ -1,7 +1,6 @@
package main
import (
"flag"
"fmt"
"log"
"net"
@ -15,11 +14,7 @@ type RPCServer struct {
netCard *NetCard
}
var (
serverPort = flag.Int("port", 50051, "The server port")
cardIP = flag.String("card", "", "The card ip")
othersAddr = flag.String("addr", "", "The other server addr")
)
var ()
func newRPCServer(netCard *NetCard) *RPCServer {
return &RPCServer{netCard: netCard}
@ -27,7 +22,7 @@ func newRPCServer(netCard *NetCard) *RPCServer {
func (rpc *RPCServer) run() {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *serverPort))
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", config.Network.Self.Real))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
@ -50,6 +45,7 @@ func (s *RPCServer) SendFrames(stream gen.FrameService_SendFramesServer) error {
log.Panic(err)
}
log.Printf("request: %v", len(request.Frames))
s.netCard.FrameChan <- request.GetFrames() // 接受数据 广播到网卡上
}

96
tap.go
View File

@ -1,7 +1,9 @@
package main
import (
"bytes"
"context"
"encoding/gob"
"flag"
"fmt"
"log"
@ -11,6 +13,7 @@ import (
"sync/atomic"
"time"
"github.com/klauspost/compress/zstd"
"github.com/songgao/packets/ethernet"
"github.com/songgao/water"
"google.golang.org/grpc"
@ -18,10 +21,11 @@ import (
)
type NetCard struct {
FrameChan chan [][]byte
FrameChan chan []byte
ifce *water.Interface
cli *RPCClient
server *RPCServer
clientMap map[string]*RPCClient
}
type RPCClient struct {
@ -29,11 +33,11 @@ type RPCClient struct {
conn *grpc.ClientConn
}
func (cli *RPCClient) connect() {
func (cli *RPCClient) connect(addr string) {
log.Println("rpcclient start")
// defer log.Println("rpcclient exit")
conn, err := grpc.Dial(*othersAddr,
conn, err := grpc.Dial(addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
@ -56,18 +60,28 @@ func (cli *RPCClient) run() {
}()
c := gen.NewFrameServiceClient(cli.conn)
stream, err := c.SendFrames(context.Background())
if err != nil {
log.Panic(err)
}
buf := bytes.NewBuffer(nil)
for {
// Contact the server and print out its response.
buf.Reset()
zenc, err := zstd.NewWriter(buf)
if err != nil {
log.Panic(err)
}
enc := gob.NewEncoder(zenc)
enc.Encode(<-cli.FrameChan)
// 发到对面的网卡
err = stream.Send(&gen.Request{
Frames: <-cli.FrameChan,
Frames: buf.Bytes(),
})
if err != nil {
@ -81,8 +95,9 @@ func (nc *NetCard) Run() {
go nc.runWrite()
// go nc.cli.run()
time.Sleep(time.Second)
log.Printf("ip addr add %s/24 dev stap", *cardIP)
cmd := strings.Split(fmt.Sprintf("ip addr add %s/24 dev stap", *cardIP), " ")
log.Printf("ip addr add %s dev stap", config.Network.Self.Virt)
cmd := strings.Split(fmt.Sprintf("ip addr add %s dev stap", config.Network.Self.Virt), " ")
err := exec.Command(cmd[0], cmd[1:]...).Run()
if err != nil {
log.Panic(err)
@ -109,9 +124,9 @@ func NewNetCard() *NetCard {
}
nc := &NetCard{
FrameChan: make(chan [][]byte, 2000),
FrameChan: make(chan []byte, 3000),
ifce: ifce,
cli: &RPCClient{FrameChan: make(chan [][]byte, 2000)},
clientMap: map[string]*RPCClient{},
}
nc.server = newRPCServer(nc)
return nc
@ -124,7 +139,9 @@ func (nc *NetCard) runRead() {
for {
var framesBytes [][]byte
var bytesMap map[string]*[][]byte = make(map[string]*[][]byte)
var ok bool
var isLoop int32 = 1
go func() {
var after = time.NewTimer(time.Millisecond * 20)
@ -138,18 +155,33 @@ func (nc *NetCard) runRead() {
if err != nil {
log.Fatal(err)
}
var buffer *[][]byte
if buffer, ok = bytesMap[rframe.Destination().String()]; !ok {
mbuffer := make([][]byte, 100)
*buffer = mbuffer
bytesMap[rframe.Destination().String()] = buffer
}
rframe = rframe[:n]
framesBytes = append(framesBytes, []byte(rframe))
*buffer = append(*buffer, []byte(rframe))
}
log.Println(len(framesBytes))
if len(framesBytes) > 0 {
if nc.cli.conn == nil {
nc.cli.connect()
for dst, buffer := range bytesMap {
var cli *RPCClient
if cli, ok = nc.clientMap[dst]; !ok {
cli = &RPCClient{FrameChan: make(chan [][]byte, 1000)}
nc.clientMap[dst] = cli
}
if nc.cli.conn != nil {
nc.cli.FrameChan <- framesBytes // 网卡数据 发到对方
if cli.conn == nil {
cli.connect(dst)
} else {
cli.FrameChan <- *buffer // 网卡数据 发到对方
}
}
// 写到grpc服务
@ -165,12 +197,30 @@ func (nc *NetCard) runRead() {
func (nc *NetCard) runWrite() {
var ifce *water.Interface = nc.ifce
for wframes := range nc.FrameChan {
log.Printf("get wframes %d", len(wframes))
for _, wframe := range wframes {
_, err := ifce.Write(wframe)
var buf = bytes.NewBuffer(nil)
for wframe := range nc.FrameChan {
log.Printf("get wframes bytes len: %d", len(wframe))
buf.Reset()
buf.Write(wframe)
zdec, err := zstd.NewReader(buf)
if err != nil {
log.Panic(err)
}
dec := gob.NewDecoder(zdec)
var bufs [][]byte
err = dec.Decode(&bufs)
if err != nil {
log.Panic(err)
}
for _, buf := range bufs {
_, err := ifce.Write(buf)
if err != nil {
panic(err)
log.Panic(err)
}
}
}