grpc3--流传输 examples route-guide

流传输就是一节一节的传递,就是加一个for循环

首先看proto文件, 流传输就是在参数的前面加一个 stream ,进行声明

syntax = "proto3";

package routeguide;

// Interface exported by the server.
service RouteGuide {<br></br>  
  rpc GetFeature(Point) returns (Feature) {}<br></br> // 服务端流传输,单向流
  rpc ListFeatures(Rectangle) returns (stream Feature) {}<br></br> // 客户端流传输,单向流
  rpc RecordRoute(stream Point) returns (RouteSummary) {}<br></br> // 双向流
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}
<br></br>// 这里面lo的类型指定的事 Point, 也就是调用上面的Poing类型,没有什么特殊的
message Rectangle {
  Point lo = 1;
  Point hi = 2;
}

message Feature {
  string name = 1;
  Point location = 2;
}

// A RouteNote is a message sent while at a given point.
message RouteNote {
  Point location = 1;
  string message = 2;
}
message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

server.go

看的时候就从mian函数开始看起,一步一步就能看明白,从这里面可以发现流传输并不一定是必须要在函数的返回值哪里表明返回的结构,直接使用Send方法进行流传输,声明结构就好

package main

import (
    "context"
    "encoding/json"
    "flag"
    "fmt"
    "io"
    "io/ioutil"
    "log"
    "math"
    "net"
    "sync"
    "time"

    "google.golang.org/grpc"

    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/testdata"

    "github.com/golang/protobuf/proto"

    pb "google.golang.org/grpc/examples/route_guide/routeguide"
)

var (
    tls        = flag.Bool("tls", false, "Connection uses TLS if true, else plain TCP")
    certFile   = flag.String("cert_file", "", "The TLS cert file")
    keyFile    = flag.String("key_file", "", "The TLS key file")
    jsonDBFile = flag.String("json_db_file", "", "A json file containing a list of features")
    port       = flag.Int("port", 10000, "The server port")
)

type routeGuideServer struct {
    savedFeatures []*pb.Feature // read-only after initialized

    mu         sync.Mutex // protects routeNotes
    routeNotes map[string][]*pb.RouteNote
}

// GetFeature returns the feature at the given point.
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {<br></br>  // 看看s里面有没有数据,有的话直接返回,没有的话就直接拿收到的参数返回,数据结构是引用的,数据在初始化是s里面已经赋值数据了,详情看newServer方法
    for _, feature := range s.savedFeatures {
        if proto.Equal(feature.Location, point) {
            return feature, nil
        }
    }
    // No feature was found, return an unnamed feature
    return &pb.Feature{Location: point}, nil
}

// 这个就是服务端的流传输.
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {<br></br>  // 将s.saveFeatures的数据循环发送
    for _, feature := range s.savedFeatures {
        if inRange(feature.Location, rect) {
            if err := stream.Send(feature); err != nil {
                return err
            }
        }
    }
    return nil
}

//这个就是客户端的发送来的数据,使用流传输
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
    var pointCount, featureCount, distance int32
    var lastPoint *pb.Point
    startTime := time.Now()<br></br>  // 起一个死循环
    for {
        point, err := stream.Recv()<br></br>     // 如果没有数据,就会跑出EOF的错误
        if err == io.EOF {<br></br>       // 在这里可以进行完成接受数据之后的操作
            endTime := time.Now()<br></br>       // 这个sendANDclose就是告诉客户端我接受完啦,同时将数据传递给客户端
            return stream.SendAndClose(&pb.RouteSummary{
                PointCount:   pointCount,
                FeatureCount: featureCount,
                Distance:     distance,
                ElapsedTime:  int32(endTime.Sub(startTime).Seconds()),
            })
        }
        if err != nil {
            return err
        }
        pointCount++<br></br>     // 每次接受数据,就将数据与服务端的数据进行比较爱哦是否相等
        for _, feature := range s.savedFeatures {
            if proto.Equal(feature.Location, point) {
                featureCount++
            }
        }
        if lastPoint != nil {
            distance += calcDistance(lastPoint, point)
        }
        lastPoint = point
    }
}

// RouteChat receives a stream of message/location pairs, and responds with a stream of all
// previous messages at each of those locations.
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
    for {<br></br>     // 循环接受数据
        in, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }<br></br>     // 其实就是相当于一个坐标
        key := serialize(in.Location)
<br></br>     // 锁一下
        s.mu.Lock()<br></br>     // map结构,就是python中的字典,将接受的数据,传到对应的列表中,就是key相同的列表
        s.routeNotes[key] = append(s.routeNotes[key], in)
        // 创建一个与s.royeNotes[key]等长的列表,然后将里面的数据复制给rn   
        rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
        copy(rn, s.routeNotes[key])
        s.mu.Unlock()
     // 循环发送,数据结构一样就直接发送,不需要重新建造数据结构啦
        for _, note := range rn {
            if err := stream.Send(note); err != nil {
                return err
            }
        }
    }
}

// 内部的方法,就是在server端初始化的时候,如果传进来有path就是文件读取数据,没有的话就用最下面的数据
func (s *routeGuideServer) loadFeatures(filePath string) {
    var data []byte
    if filePath != "" {
        var err error
        data, err = ioutil.ReadFile(filePath)
        if err != nil {
            log.Fatalf("Failed to load default features: %v", err)
        }
    } else {
        data = exampleData
    }
    if err := json.Unmarshal(data, &s.savedFeatures); err != nil {
        log.Fatalf("Failed to load default features: %v", err)
    }
}

func toRadians(num float64) float64 {
    return num * math.Pi / float64(180)
}

// calcDistance calculates the distance between two points using the "haversine" formula.
// The formula is based on http://mathforum.org/library/drmath/view/51879.html.
func calcDistance(p1 *pb.Point, p2 *pb.Point) int32 {
    const CordFactor float64 = 1e7
    const R = float64(6371000) // earth radius in metres
    lat1 := toRadians(float64(p1.Latitude) / CordFactor)
    lat2 := toRadians(float64(p2.Latitude) / CordFactor)
    lng1 := toRadians(float64(p1.Longitude) / CordFactor)
    lng2 := toRadians(float64(p2.Longitude) / CordFactor)
    dlat := lat2 - lat1
    dlng := lng2 - lng1

    a := math.Sin(dlat/2)*math.Sin(dlat/2) +
        math.Cos(lat1)*math.Cos(lat2)*
            math.Sin(dlng/2)*math.Sin(dlng/2)
    c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))

    distance := R * c
    return int32(distance)
}

func inRange(point *pb.Point, rect *pb.Rectangle) bool {
    left := math.Min(float64(rect.Lo.Longitude), float64(rect.Hi.Longitude))
    right := math.Max(float64(rect.Lo.Longitude), float64(rect.Hi.Longitude))
    top := math.Max(float64(rect.Lo.Latitude), float64(rect.Hi.Latitude))
    bottom := math.Min(float64(rect.Lo.Latitude), float64(rect.Hi.Latitude))

    if float64(point.Longitude) >= left &&
        float64(point.Longitude) <= right &&
        float64(point.Latitude) >= bottom &&
        float64(point.Latitude) <= top {
        return true
    }
    return false
}

func serialize(point *pb.Point) string {
    return fmt.Sprintf("%d %d", point.Latitude, point.Longitude)
}
// 初始化数据用的
func newServer() *routeGuideServer {
    s := &routeGuideServer{routeNotes: make(map[string][]*pb.RouteNote)}
    s.loadFeatures(*jsonDBFile)
    return s
}

func main() {<br></br>  // flag包,初始化一些参数
    flag.Parse()
    lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }<br></br>  // 一些grpc其服务之前的一些配置
    var opts []grpc.ServerOption
    if *tls {
        if *certFile == "" {
            *certFile = testdata.Path("server1.pem")
        }
        if *keyFile == "" {
            *keyFile = testdata.Path("server1.key")
        }<br></br>     // 配置认证方式
        creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
        if err != nil {
            log.Fatalf("Failed to generate credentials %v", err)
        }
        opts = []grpc.ServerOption{grpc.Creds(creds)}
    }<br></br>  //实例化
    grpcServer := grpc.NewServer(opts...)<br></br>  // 注册服务
    pb.RegisterRouteGuideServer(grpcServer, newServer())<br></br>  // 开启服务
    grpcServer.Serve(lis)
}

// exampleData is a copy of testdata/route_guide_db.json. It's to avoid
// specifying file path with `go run`.
var exampleData = []byte(`[{
    "location": {
        "latitude": 407838351,
        "longitude": -746143763
    },
    "name": "Patriots Path, Mendham, NJ 07945, USA"
}, ...]`)

client.go

package main

import (
    "context"
    "flag"
    "io"
    "log"
    "math/rand"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    pb "google.golang.org/grpc/examples/route_guide/routeguide"
    "google.golang.org/grpc/testdata"
)

var (
    tls                = flag.Bool("tls", false, "Connection uses TLS if true, else plain TCP")
    caFile             = flag.String("ca_file", "", "The file containning the CA root cert file")
    serverAddr         = flag.String("server_addr", "127.0.0.1:10000", "The server address in the format of host:port")
    serverHostOverride = flag.String("server_host_override", "x.test.youtube.com", "The server name use to verify the hostname returned by TLS handshake")
)

// printFeature gets the feature for the given point.
func printFeature(client pb.RouteGuideClient, point *pb.Point) {
    log.Printf("Getting feature for point (%d, %d)", point.Latitude, point.Longitude)<br></br>  // 生成一个上下文,以及关闭句柄
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()<br></br>  // 调用getfeature方法
    feature, err := client.GetFeature(ctx, point)
    if err != nil {<br></br>     // 遇到错误停止执行,注意fatalf与panic的区别
        log.Fatalf("%v.GetFeatures(_) = _, %v: ", client, err)
    }
    log.Println(feature)
}

// printFeatures lists all the features within the given bounding Rectangle.
func printFeatures(client pb.RouteGuideClient, rect *pb.Rectangle) {
    log.Printf("Looking for features within %v", rect)
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()<br></br>  // 调用ListFeature,并发送数据,该方法是流传输接受来自服务端的数据
    stream, err := client.ListFeatures(ctx, rect)
    if err != nil {
        log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
    }<br></br>  // 循环接收来自客户端的数据
    for {
        feature, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
        }
        log.Println(feature)
    }
}

// runRecordRoute sends a sequence of points to server and expects to get a RouteSummary from server.
func runRecordRoute(client pb.RouteGuideClient) {<br></br>  // 这些就是创建一些发送参数的循环结构
    r := rand.New(rand.NewSource(time.Now().UnixNano()))
    pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
    var points []*pb.Point
    for i := 0; i < pointCount; i++ {
        points = append(points, randomPoint(r))
    }
    log.Printf("Traversing %d points.", len(points))
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()<br></br>  // 调用recordroute方法,这个参数是流传输,直接用stream.send循环发送
    stream, err := client.RecordRoute(ctx)
    if err != nil {
        log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
    }
    for _, point := range points {
        if err := stream.Send(point); err != nil {
            log.Fatalf("%v.Send(%v) = %v", stream, point, err)
        }
    }<br></br>  // 发送完毕通知服务端,并接受返回的数据
    reply, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
    }
    log.Printf("Route summary: %v", reply)
}

// runRouteChat receives a sequence of route notes, while sending notes for various locations.
func runRouteChat(client pb.RouteGuideClient) {
    notes := []*pb.RouteNote{
        {Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "First message"},
        {Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Second message"},
        {Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Third message"},
        {Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "Fourth message"},
        {Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Fifth message"},
        {Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Sixth message"},
    }
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    stream, err := client.RouteChat(ctx)
    if err != nil {
        log.Fatalf("%v.RouteChat(_) = _, %v", client, err)
    }<br></br>  // 起一个管道,这个管道就是为了等待在该函数中新起的go线程执行完毕
    waitc := make(chan struct{})<br></br>  // 起一个线程循环接受
    go func() {
        for {
            in, err := stream.Recv()
            if err == io.EOF {        <br></br>// 接受完毕,关闭管道.
                close(waitc)
                return
            }
            if err != nil {
                log.Fatalf("Failed to receive a note : %v", err)
            }
            log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
        }
    }()<br></br>  // 循环发送数据
    for _, note := range notes {
        if err := stream.Send(note); err != nil {
            log.Fatalf("Failed to send a note: %v", err)
        }
    }<br></br>  // 通知服务端发送完毕
    stream.CloseSend()<br></br>  // 阻塞
    <-waitc
}

func randomPoint(r *rand.Rand) *pb.Point {
    lat := (r.Int31n(180) - 90) * 1e7
    long := (r.Int31n(360) - 180) * 1e7
    return &pb.Point{Latitude: lat, Longitude: long}
}

func main() {<br></br>  // flag
    flag.Parse()<br></br>  // 配置rpc的认证
    var opts []grpc.DialOption
    if *tls {
        if *caFile == "" {
            *caFile = testdata.Path("ca.pem")
        }
        creds, err := credentials.NewClientTLSFromFile(*caFile, *serverHostOverride)
        if err != nil {
            log.Fatalf("Failed to create TLS credentials %v", err)
        }
        opts = append(opts, grpc.WithTransportCredentials(creds))
    } else {
        opts = append(opts, grpc.WithInsecure())
    }
    conn, err := grpc.Dial(*serverAddr, opts...)
    if err != nil {
        log.Fatalf("fail to dial: %v", err)
    }
    defer conn.Close()<br></br>  // 实例化一个客户端
    client := pb.NewRouteGuideClient(conn)

    // 调取一个函数,模拟访问getfeatures,然后得到返回值
    printFeature(client, &pb.Point{Latitude: 409146138, Longitude: -746188906})

    // 模拟访问getfeatures,这次的参数是(0,0,下面所有的都是模拟访问proto里面的方法.
    printFeature(client, &pb.Point{Latitude: 0, Longitude: 0})

    // Looking for features between 40, -75 and 42, -73.
    printFeatures(client, &pb.Rectangle{
        Lo: &pb.Point{Latitude: 400000000, Longitude: -750000000},
        Hi: &pb.Point{Latitude: 420000000, Longitude: -730000000},
    })

    // RecordRoute
    runRecordRoute(client)

    // RouteChat
    runRouteChat(client)
}

声明:该文章系转载,转载该文章的目的在于更广泛的传递信息,并不代表本网站赞同其观点,文章内容仅供参考。

本站是一个个人学习和交流平台,网站上部分文章为网站管理员和网友从相关媒体转载而来,并不用于任何商业目的,内容为作者个人观点, 并不代表本网站赞同其观点和对其真实性负责。

我们已经尽可能的对作者和来源进行了通告,但是可能由于能力有限或疏忽,导致作者和来源有误,亦可能您并不期望您的作品在我们的网站上发布。我们为这些问题向您致歉,如果您在我站上发现此类问题,请及时联系我们,我们将根据您的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。