Person 的定义与之前 protobuf 示例中的一致, 只是新增了一些用于 gRPC 调用的消息结构体; 这些结构体很简单, 这里不再赘述. service Manage 中定义的是该服务对外提供的 rpc 调用接口.
rpc 的定义很直观, 参照上面的写法应该就能写出自己需要的 rpc. 需要注意的是, 据我了解, 每个 rpc 只能有一个输入参数和一个输出参数.
package main
import (
"context"
"fmt"
"io"
"log"
"net"
"sync"
pb "personservice/tutorial"
"google.golang.org/grpc"
)
// personServer is the server side of the person-information service.
// persons holds the stored entries; the handlers below key it by the
// person's Name and store *pb.Person values in it.
type personServer struct {
persons sync.Map
}
// AddPerson stores a single person under its Name and replies with
// Success=true. If an entry already exists under that name it is left
// untouched (LoadOrStore), and the reply is still Success=true.
// ctx is not consulted.
func (s *personServer) AddPerson(ctx context.Context, person *pb.Person) (*pb.Result, error) {
	s.persons.LoadOrStore(person.Name, person)

	reply := new(pb.Result)
	reply.Success = true
	return reply, nil
}
// AddPersons consumes a client stream of persons and stores each one under
// its Name (existing entries are kept). When the client finishes sending
// (io.EOF), a single Result with Success=true is sent and the stream is
// closed. Any other receive error ends the call with that error.
func (s *personServer) AddPersons(stream pb.Manage_AddPersonsServer) error {
	for {
		incoming, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			return stream.SendAndClose(&pb.Result{Success: true})
		case recvErr != nil:
			return recvErr
		}
		s.persons.LoadOrStore(incoming.Name, incoming)
	}
}
// GetPersonsLimit streams at most limitNum.Num stored persons to the client.
//
// Entries are visited in whatever order sync.Map.Range yields them, so which
// persons are returned when more than the limit are stored is unspecified.
// A non-positive limit sends nothing. Stored values that are not *pb.Person
// are skipped, matching GetPersons. The first stream.Send error stops the
// iteration and is returned; otherwise the result is nil.
func (s *personServer) GetPersonsLimit(limitNum *pb.ReqNum, stream pb.Manage_GetPersonsLimitServer) error {
	limit := limitNum.GetNum()
	if limit <= 0 {
		// The counter is only compared with the limit after a send, so
		// without this guard a limit of zero would still return one person.
		return nil
	}

	var (
		sendErr error
		sent    int32
	)
	s.persons.Range(func(key, value interface{}) bool {
		person, ok := value.(*pb.Person)
		if !ok {
			// Unexpected value type: skip it instead of silently
			// truncating the rest of the response.
			return true
		}
		if sendErr = stream.Send(person); sendErr != nil {
			return false
		}
		sent++
		// Continue only while the limit has not been reached.
		return sent < limit
	})
	return sendErr
}
// GetPersons serves a bidirectional stream. For every name request received
// it looks the name up and, when a *pb.Person is stored under it, sends that
// person back; names with no matching entry are skipped without a reply.
// The call returns nil once the client closes its sending side (io.EOF), or
// the first Recv/Send error otherwise.
func (s *personServer) GetPersons(stream pb.Manage_GetPersonsServer) error {
	for {
		req, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}

		stored, found := s.persons.Load(req.Name)
		if !found {
			continue
		}
		if person, isPerson := stored.(*pb.Person); isPerson {
			if err := stream.Send(person); err != nil {
				return err
			}
		}
	}
}
// newServer returns a personServer with an empty person store.
func newServer() *personServer {
	return new(personServer)
}
// main listens on localhost:50001, registers the Manage service on a new
// gRPC server and serves requests. Failing to listen or to serve is fatal.
func main() {
	address := "localhost:50001"
	lis, err := net.Listen("tcp", address)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	var opts []grpc.ServerOption
	grpcServer := grpc.NewServer(opts...)
	pb.RegisterManageServer(grpcServer, newServer())

	fmt.Println("Server listening on:", address)
	// Serve blocks while the server is running; report its error instead of
	// discarding it so a failed server does not exit silently with status 0.
	if err := grpcServer.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
package main
import (
"context"
"fmt"
"io"
"log"
pb "personservice/tutorial"
"time"
"google.golang.org/grpc"
)
const (
// rpcTimeOut is the timeout applied to each RPC made by this client,
// expressed in seconds: call sites multiply it by time.Second.
rpcTimeOut = 10
)
// addPerson sends one person to the server through the unary AddPerson RPC,
// bounded by a timeout of rpcTimeOut seconds. It returns the Success flag of
// the reply, or false (after logging the error) when the call fails.
func addPerson(client pb.ManageClient, person *pb.Person) bool {
	timeout := rpcTimeOut * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	reply, callErr := client.AddPerson(ctx, person)
	if callErr == nil {
		return reply.Success
	}
	log.Printf("client.AddPerson failed, error: %v\n", callErr)
	return false
}
// addPersons uploads several persons over the client-streaming AddPersons
// RPC, bounded by a timeout of rpcTimeOut seconds. It returns the Success
// flag of the server's final reply. Any failure to open the stream, send a
// person, or receive the reply is logged and reported as false.
func addPersons(client pb.ManageClient, persons []*pb.Person) bool {
	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeOut*time.Second)
	defer cancel()

	stream, err := client.AddPersons(ctx)
	if err != nil {
		log.Printf("client.AddPersons failed, error: %v\n", err)
		return false
	}

	for idx := range persons {
		sendErr := stream.Send(persons[idx])
		if sendErr != nil {
			log.Printf("stream.Send failed, error: %v\n", sendErr)
			return false
		}
	}

	// Close the sending side and wait for the single summary reply.
	reply, err := stream.CloseAndRecv()
	if err != nil {
		log.Printf("stream.CloseAndRecv failed, error: %v\n", err)
		return false
	}
	return reply.Success
}
// getPersonsLimit asks the server for up to limitNum persons through the
// server-streaming GetPersonsLimit RPC, bounded by a timeout of rpcTimeOut
// seconds. It collects replies until the stream ends (io.EOF). On a failure
// the error is logged and returned together with whatever was collected so
// far.
func getPersonsLimit(client pb.ManageClient, limitNum int32) ([]*pb.Person, error) {
	var collected []*pb.Person

	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeOut*time.Second)
	defer cancel()

	stream, err := client.GetPersonsLimit(ctx, &pb.ReqNum{Num: limitNum})
	if err != nil {
		log.Printf("client.GetPersonsLimit failed, error: %v\n", err)
		return collected, err
	}

	for {
		next, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			return collected, nil
		case recvErr != nil:
			log.Printf("stream.Recv failed, error: %v\n", recvErr)
			return collected, recvErr
		}
		collected = append(collected, next)
	}
}
// getPersons fetches the persons stored under the given names using the
// bidirectional GetPersons RPC, bounded by a timeout of rpcTimeOut seconds.
//
// The names are sent from a separate goroutine while the calling goroutine
// receives replies, so sending and receiving overlap. The function waits for
// the sender goroutine to finish before returning.
//
// It returns the received persons when the stream ends normally (io.EOF).
// If the stream cannot be opened, or receiving fails with any other error,
// the error is logged and returned with a nil slice. The process is never
// terminated from here; the caller decides how to react.
func getPersons(client pb.ManageClient, personNames []string) ([]*pb.Person, error) {
	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeOut*time.Second)
	defer cancel()

	stream, err := client.GetPersons(ctx)
	if err != nil {
		log.Printf("client.GetPersons failed, error: %v\n", err)
		return nil, err
	}

	waitc := make(chan struct{})
	// Send the requested names, then close the sending side so the server
	// knows no more requests are coming.
	go func() {
		defer close(waitc)
		for _, personName := range personNames {
			if err := stream.Send(&pb.ReqName{Name: personName}); err != nil {
				log.Printf("stream.Send failed, error: %v\n", err)
				break
			}
		}
		if err := stream.CloseSend(); err != nil {
			log.Printf("stream.CloseSend failed, error: %v\n", err)
		}
	}()

	// Collect replies until Recv reports an error; io.EOF is the normal end.
	var (
		persons []*pb.Person
		recvErr error
	)
	for {
		var in *pb.Person
		if in, recvErr = stream.Recv(); recvErr != nil {
			break
		}
		persons = append(persons, in)
	}
	<-waitc

	if recvErr == io.EOF {
		return persons, nil
	}
	// Log and return instead of log.Fatalf: exiting here would skip the
	// deferred cancel and take the error decision away from the caller.
	log.Printf("stream.Recv failed, error: %v\n", recvErr)
	return nil, recvErr
}
// makePerson builds a pb.Person value with the given name, id and email.
func makePerson(name string, id int32, email string) pb.Person {
	var p pb.Person
	p.Name = name
	p.Id = id
	p.Email = email
	return p
}
// printPersons writes every person to standard output, one per line in %+v
// form, followed by a single blank line.
func printPersons(persons []*pb.Person) {
	for i := range persons {
		fmt.Printf("%+v\n", persons[i])
	}
	fmt.Println("")
}
// main is a small demo driver: it connects to the server on localhost:50001
// without transport security and exercises each RPC in turn. It adds one
// person, adds two more over a stream, lists up to five stored persons, and
// finally looks up the two streamed persons by name. Any failure is fatal.
func main() {
	dialOpts := []grpc.DialOption{grpc.WithInsecure()}
	conn, err := grpc.Dial("localhost:50001", dialOpts...)
	if err != nil {
		log.Fatalf("grpc.Dial failed, error: %v\n", err)
	}
	defer conn.Close()
	client := pb.NewManageClient(conn)

	// Unary call: add a single person.
	tom := makePerson("Tom", 1, "tom@gmail.com")
	if !addPerson(client, &tom) {
		log.Fatalf("addPerson failed.\n")
	}

	// Client-streaming call: add two persons in one stream.
	lilly := makePerson("Lilly", 2, "lilly@gmail.com")
	jim := makePerson("Jim", 3, "jim@gmail.com")
	batch := []*pb.Person{&lilly, &jim}
	if !addPersons(client, batch) {
		log.Fatalf("addPersons failed.\n")
	}

	// Server-streaming call: list at most five persons.
	limited, err := getPersonsLimit(client, 5)
	if err != nil {
		log.Fatalf("getPersonsLimit failed, error: %v\n", err)
	}
	fmt.Println("getPersonsLimit output:")
	printPersons(limited)

	// Bidirectional call: look up the streamed persons by name.
	var names []string
	for i := range batch {
		names = append(names, batch[i].GetName())
	}
	byName, err := getPersons(client, names)
	if err != nil {
		log.Fatalf("getPersons failed, error: %v\n", err)
	}
	fmt.Println("getPersons output:")
	printPersons(byName)
}
这里我没有编写单元测试; 使用单元测试可能会更好, 不过通过运行客户端代码并检查其输出, 也可以验证服务的正确性.