package main import ( "bytes" "context" "encoding/binary" "encoding/gob" "fmt" "log" "os/exec" gen "slimming/proto/gen" "strings" "sync/atomic" "time" "github.com/klauspost/compress/zstd" "github.com/songgao/packets/ethernet" "github.com/songgao/water" "github.com/songgao/water/waterutil" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) type NetCard struct { FrameChan chan []byte ifce *water.Interface server *RPCServer clientMap map[string]*RPCClient } type RPCClient struct { FrameChan chan [][]byte conn *grpc.ClientConn } func (cli *RPCClient) connect(realAddr string) { log.Println("rpcclient start") // defer log.Println("rpcclient exit") conn, err := grpc.Dial(realAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { log.Printf("did not connect: %v", err) return } cli.conn = conn go cli.run() time.Sleep(time.Millisecond) } func (cli *RPCClient) run() { defer log.Println("rpcclient exit") defer func() { if err := recover(); err != nil { log.Println("recover") cli.conn = nil } }() c := gen.NewFrameServiceClient(cli.conn) stream, err := c.SendFrames(context.Background()) if err != nil { log.Panic(err) } buf := bytes.NewBuffer(nil) for { // Contact the server and print out its response. buf.Reset() zenc, err := zstd.NewWriter(buf) if err != nil { log.Panic(err) } enc := gob.NewEncoder(zenc) enc.Encode(<-cli.FrameChan) // 发到对面的网卡 err = stream.Send(&gen.Request{ Frames: buf.Bytes(), }) if err != nil { log.Panic(err) } } } func (nc *NetCard) Run() { go nc.runRead() go nc.runWrite() // go nc.cli.run() time.Sleep(time.Second) cmdstr := fmt.Sprintf("ip addr add %s dev stun", config.Network.Self.Virt) log.Println(cmdstr) cmd := strings.Split(cmdstr, " ") err := exec.Command(cmd[0], cmd[1:]...).Run() if err != nil { log.Panic(err) } cmdstr = "ip link set dev stun up" cmd = strings.Split(cmdstr, " ") err = exec.Command(cmd[0], cmd[1:]...).Run() if err != nil { log.Panic(err) } nc.server.run() } func NewNetCard() *NetCard { config := water.Config{ DeviceType: water.TUN, } config.Name = "stun" ifce, err := water.New(config) if err != nil { log.Panic(err) } nc := &NetCard{ FrameChan: make(chan []byte, 3000), ifce: ifce, clientMap: map[string]*RPCClient{}, } nc.server = newRPCServer(nc) return nc } func (nc *NetCard) runRead() { log.Println("start netcard read") var ifce *water.Interface = nc.ifce for { var bytesMap map[string]*[][]byte = make(map[string]*[][]byte) var ok bool var isLoop int32 = 1 // 20 微秒后停止收集数据 go func() { var after = time.NewTimer(time.Millisecond * 20) <-after.C atomic.StoreInt32(&isLoop, 0) }() for atomic.LoadInt32(&isLoop) > 0 { var rframe ethernet.Frame rframe.Resize(1500) n, err := ifce.Read([]byte(rframe)) if err != nil { log.Fatal(err) } rframe = rframe[:n] if !waterutil.IsIPv4(rframe) { continue } var realAddr string config.Lock.Lock() if _, ok = config.IPv4Nodes[binary.LittleEndian.Uint32([]byte(waterutil.IPv4Destination(rframe)))]; !ok { continue } config.Lock.Unlock() // log.Printf("Payload: % x\n", rframe.Payload()) log.Printf("Dst: %s Src %s\n", waterutil.IPv4Destination(rframe), waterutil.IPv4Source(rframe)) log.Printf("Ethertype: % x %v\n", rframe.Ethertype(), waterutil.IsIPv4(rframe)) var buffer *[][]byte if buffer, ok = bytesMap[realAddr]; !ok { mbuffer := make([][]byte, 0, 100) buffer = &mbuffer bytesMap[realAddr] = buffer } *buffer = append(*buffer, []byte(rframe)) } for dst, buffer := range bytesMap { var cli *RPCClient if cli, ok = nc.clientMap[dst]; !ok { cli = &RPCClient{FrameChan: make(chan [][]byte, 1000)} nc.clientMap[dst] = cli } if cli.conn == nil { cli.connect(dst) } else { cli.FrameChan <- *buffer // 网卡数据 发到对方 *buffer = (*buffer)[0:] } } // 写到grpc服务 // log.Printf("Dst: %s\n", rframe.Destination()[0:4]) // log.Printf("Src: %s\n", rframe.Source()[0:4]) // log.Printf("Ethertype: % x\n", rframe.Ethertype()) // log.Printf("Payload: % x\n", rframe.Payload()) } } func (nc *NetCard) runWrite() { var ifce *water.Interface = nc.ifce var buf = bytes.NewBuffer(nil) for wframe := range nc.FrameChan { log.Printf("get wframes bytes len: %d", len(wframe)) buf.Reset() buf.Write(wframe) zdec, err := zstd.NewReader(buf) if err != nil { log.Panic(err) } dec := gob.NewDecoder(zdec) var bufs [][]byte err = dec.Decode(&bufs) if err != nil { log.Panic(err) } for _, buf := range bufs { _, err := ifce.Write(buf) if err != nil { log.Panic(err) } } } }