标签:取出 签名 transport 大数据 cep contex dock Opens ssi
Protobuf
的角度看,gRPC只不过是一个针对service接口生成代码的生成器。我们在本章的第二节中手工实现了一个简单的Protobuf代码生成器插件,只不过当时生成的代码是适配标准库的RPC框架的。现在我们将学习gRPC的用法。syntax = "proto3";
package main;
message String {
string value = 1;
}
service HelloService {
rpc Hello (String) returns (String);
}
protoc-gen-go
内置的gRPC插件生成gRPC代码:$ protoc --go_out=plugins=grpc:. hello.proto
type HelloServiceServer interface {
Hello(context.Context, *String) (*String, error)
}
type HelloServiceClient interface {
Hello(context.Context, *String, ...grpc.CallOption) (*String, error)
}
HelloServiceServer
接口可以重新实现HelloService服务:type HelloServiceImpl struct{}
func (p *HelloServiceImpl) Hello(
ctx context.Context, args *String,
) (*String, error) {
reply := &String{Value: "hello:" + args.GetValue()}
return reply, nil
}
func main() {
grpcServer := grpc.NewServer()
RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))
lis, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal(err)
}
grpcServer.Serve(lis)
}
grpc.NewServer()
构造一个gRPC服务对象,然后通过gRPC插件生成的RegisterHelloServiceServer
函数注册我们实现的HelloServiceImpl服务。然后通过grpcServer.Serve(lis)
在一个监听端口上提供gRPC服务。unc main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := NewHelloServiceClient(conn)
reply, err := client.Hello(context.Background(), &String{Value: "hello"})
if err != nil {
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
service HelloService {
rpc Hello (String) returns (String);
rpc Channel (stream String) returns (stream String);
}
stream
指定启用流特性,参数部分是接收客户端参数的流,返回值是返回给客户端的流。type HelloServiceServer interface {
Hello(context.Context, *String) (*String, error)
Channel(HelloService_ChannelServer) error
}
type HelloServiceClient interface {
Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (
*String, error,
)
Channel(ctx context.Context, opts ...grpc.CallOption) (
HelloService_ChannelClient, error,
)
}
HelloService_ChannelServer
类型的参数,可以用于和客户端双向通信
。客户端的Channel方法返回一个HelloService_ChannelClient
类型的返回值,可以用于和服务端进行双向通信
。type HelloService_ChannelServer interface {
Send(*String) error
Recv() (*String, error)
grpc.ServerStream
}
type HelloService_ChannelClient interface {
Send(*String) error
Recv() (*String, error)
grpc.ClientStream
}
func (p *HelloServiceImpl) Channel(stream HelloService_ChannelServer) error {
for {
args, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
reply := &String{Value: "hello:" + args.GetValue()}
err = stream.Send(reply)
if err != nil {
return err
}
}
}
stream, err := client.Channel(context.Background())
if err != nil {
log.Fatal(err)
}
go func() {
for {
if err := stream.Send(&String{Value: "hi"}); err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
}
}()
for {
reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
import (
"github.com/moby/moby/pkg/pubsub"
)
func main() {
p := pubsub.NewPublisher(100*time.Millisecond, 10)
golang := p.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.HasPrefix(key, "golang:") {
return true
}
}
return false
})
docker := p.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.HasPrefix(key, "docker:") {
return true
}
}
return false
})
go p.Publish("hi")
go p.Publish("golang: https://golang.org")
go p.Publish("docker: https://www.docker.com/")
time.Sleep(1)
go func() {
fmt.Println("golang topic:", <-golang)
}()
go func() {
fmt.Println("docker topic:", <-docker)
}()
<-make(chan bool)
}
pubsub.NewPublisher
构造一个发布对象,p.SubscribeTopic()
可以通过函数筛选感兴趣的主题进行订阅。现在尝试基于gRPC和pubsub包,提供一个跨网络的发布和订阅系统。首先通过Protobuf定义一个发布订阅服务接口:
service PubsubService {
rpc Publish (String) returns (String);
rpc Subscribe (String) returns (stream String);
}
type PubsubServiceServer interface {
Publish(context.Context, *String) (*String, error)
Subscribe(*String, PubsubService_SubscribeServer) error
}
type PubsubServiceClient interface {
Publish(context.Context, *String, ...grpc.CallOption) (*String, error)
Subscribe(context.Context, *String, ...grpc.CallOption) (
PubsubService_SubscribeClient, error,
)
}
type PubsubService_SubscribeServer interface {
Send(*String) error
grpc.ServerStream
}
单向流
,因此生成的HelloService_SubscribeServer接口中只有Send方法。type PubsubService struct {
pub *pubsub.Publisher
}
func NewPubsubService() *PubsubService {
return &PubsubService{
pub: pubsub.NewPublisher(100*time.Millisecond, 10),
}
}
func (p *PubsubService) Publish(
ctx context.Context, arg *String,
) (*String, error) {
p.pub.Publish(arg.GetValue())
return &String{}, nil
}
func (p *PubsubService) Subscribe(
arg *String, stream PubsubService_SubscribeServer,
) error {
ch := p.pub.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.HasPrefix(key,arg.GetValue()) {
return true
}
}
return false
})
for v := range ch {
if err := stream.Send(&String{Value: v.(string)}); err != nil {
return err
}
}
return nil
}
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := NewPubsubServiceClient(conn)
_, err = client.Publish(
context.Background(), &String{Value: "golang: hello Go"},
)
if err != nil {
log.Fatal(err)
}
_, err = client.Publish(
context.Background(), &String{Value: "docker: hello Docker"},
)
if err != nil {
log.Fatal(err)
}
}
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := NewPubsubServiceClient(conn)
stream, err := client.Subscribe(
context.Background(), &String{Value: "golang:"},
)
if err != nil {
log.Fatal(err)
}
for {
reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
}
$ openssl genrsa -out server.key 2048
$ openssl req -new -x509 -days 3650 -subj "/C=GB/L=China/O=grpc-server/CN=server.grpc.io" -key server.key -out server.crt
$ openssl genrsa -out client.key 2048
$ openssl req -new -x509 -days 3650 -subj "/C=GB/L=China/O=grpc-client/CN=client.grpc.io" -key client.key -out client.crt
server.key、server.crt、client.key和client.crt
四个文件。其中以.key为后缀名的是私钥文件
,需要妥善保管。以.crt为后缀名是证书文件,也可以简单理解为公钥文件
,并不需要秘密保存。在subj
参数中的/CN=server.grpc.io
表示服务器的名字为server.grpc.io
,在验证服务器的证书时需要用到该信息。func main() {
// 创建证书对象 秘钥+公钥
creds, err := credentials.NewServerTLSFromFile("server.crt", "server.key")
if err != nil {
log.Fatal(err)
}
// 使用秘钥开启服务
server := grpc.NewServer(grpc.Creds(creds))
...
}
credentials.NewServerTLSFromFile
函数是从文件为服务器构造证书对象,然后通过grpc.Creds(creds)
函数将证书包装为选项后作为参数传入grpc.NewServer
函数。func main() {
creds, err := credentials.NewClientTLSFromFile(
"server.crt", "server.grpc.io",
)
if err != nil {
log.Fatal(err)
}
conn, err := grpc.Dial("localhost:5000",
grpc.WithTransportCredentials(creds),
)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
...
}
redentials.NewClientTLSFromFile
是构造客户端用的证书对象,第一个参数是服务器的证书文件
,第二个参数是签发证书的服务器的名字
。然后通过grpc.WithTransportCredentials(creds)
将证书对象转为参数选项传人grpc.Dial
函数。$ openssl genrsa -out ca.key 2048
$ openssl req -new -x509 -days 3650 -subj "/C=GB/L=China/O=gobook/CN=github.com" -key ca.key -out ca.crt
$ openssl req -new -subj "/C=GB/L=China/O=server/CN=server.io" -key server.key -out server.csr
$ openssl x509 -req -sha256 -CA ca.crt -CAkey ca.key -CAcreateserial -days 3650 -in server.csr -out server.crt
.csr
为后缀名的文件,它表示证书签名请求文件。在证书签名完成之后可以删除.csr文件。func main() {
certificate, err := tls.LoadX509KeyPair("client.crt", "client.key")
if err != nil {
log.Fatal(err)
}
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile("ca.crt")
if err != nil {
log.Fatal(err)
}
if ok := certPool.AppendCertsFromPEM(ca); !ok {
log.Fatal("failed to append ca certs")
}
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{certificate},
ServerName: tlsServerName, // NOTE: this is required! tlsServerName取决于生成server秘钥的 -subj "/C=GB/L=China/O=server/CN=server.io" \ 中的CN选项
RootCAs: certPool,
})
conn, err := grpc.Dial(
"localhost:5000", grpc.WithTransportCredentials(creds),
)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
...
}
credentials.NewTLS
函数调用中,客户端通过引入一个CA根证书和服务器的名字来实现对服务器进行验证。客户端在链接服务器时会首先请求服务器的证书,然后使用CA根证书对收到的服务器端证书进行验证。$ openssl req -new -subj "/C=GB/L=China/O=client/CN=client.io" -key client.key -out client.csr
$ openssl x509 -req -sha256 -CA ca.crt -CAkey ca.key -CAcreateserial -days 3650 -in client.csr -out client.crt
func main() {
certificate, err := tls.LoadX509KeyPair("server.crt", "server.key")
if err != nil {
log.Fatal(err)
}
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile("ca.crt")
if err != nil {
log.Fatal(err)
}
if ok := certPool.AppendCertsFromPEM(ca); !ok {
log.Fatal("failed to append certs")
}
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{certificate},
ClientAuth: tls.RequireAndVerifyClientCert, // NOTE: this is optional!
ClientCAs: certPool,
})
server := grpc.NewServer(grpc.Creds(creds))
...
}
credentials.NewTLS
函数生成证书,通过ClientCAs
选择CA根证书,并通过ClientAuth
选项启用对客户端进行验证。grpc.PerRPCCredentials
接口:type PerRPCCredentials interface {
// GetRequestMetadata gets the current request metadata, refreshing
// tokens if required. This should be called by the transport layer on
// each request, and the data should be populated in headers or other
// context. If a status code is returned, it will be used as the status
// for the RPC. uri is the URI of the entry point for the request.
// When supported by the underlying implementation, ctx can be used for
// timeout and cancellation.
// TODO(zhaoq): Define the set of the qualified keys instead of leaving
// it as an arbitrary string.
GetRequestMetadata(ctx context.Context, uri ...string) (
map[string]string, error,
)
// RequireTransportSecurity indicates whether the credentials requires
// transport security.
RequireTransportSecurity() bool
}
GetRequestMetadata
方法中返回认证需要的必要信息。RequireTransportSecurity
方法表示是否要求底层使用安全链接。在真实的环境中建议必须要求底层启用安全的链接,否则认证信息有泄露和被篡改的风险。type Authentication struct {
User string
Password string
}
func (a *Authentication) GetRequestMetadata(context.Context, ...string) (
map[string]string, error,
) {
return map[string]string{"user":a.User, "password": a.Password}, nil
}
func (a *Authentication) RequireTransportSecurity() bool {
return false
}
GetRequestMetadata
方法中,我们返回地认证信息包装login和password两个信息。为了演示代码简单,RequireTransportSecurity方法表示不要求底层使用安全链接。func main() {
auth := Authentication{
Login: "gopher",
Password: "password",
}
conn, err := grpc.Dial("localhost"+port, grpc.WithInsecure(), grpc.WithPerRPCCredentials(&auth))
if err != nil {
log.Fatal(err)
}
defer conn.Close()
...
}
grpc.WithPerRPCCredentials
函数将Authentication对象转为grpc.Dial参数。因为这里没有启用安全链接,需要传人grpc.WithInsecure()表示忽略证书认证。type grpcServer struct { auth *Authentication }
func (p *grpcServer) SomeMethod(
ctx context.Context, in *HelloRequest,
) (*HelloReply, error) {
if err := p.auth.Auth(ctx); err != nil {
return nil, err
}
return &HelloReply{Message: "Hello " + in.Name}, nil
}
func (a *Authentication) Auth(ctx context.Context) error {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return fmt.Errorf("missing credentials")
}
var appid string
var appkey string
if val, ok := md["user"]; ok { appid = val[0] }
if val, ok := md["password"]; ok { appkey = val[0] }
if appid != a.User || appkey != a.Password {
return grpc.Errorf(codes.Unauthenticated, "invalid token")
}
return nil
}
grpc.UnaryInterceptor
和grpc.StreamInterceptor
分别对普通方法和流方法提供了截取器的支持。我们这里简单介绍普通方法的截取器用法。grpc.UnaryInterceptor
的参数实现一个函数:func filter(ctx context.Context,
req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
log.Println("filter:", info)
return handler(ctx, req)
}
ctx
和req
参数就是每个普通的RPC方法的前两个参数。第三个info
参数表示当前是对应的那个gRPC方法,第四个handler
参数对应当前的gRPC方法函数。上面的函数中首先是日志输出info参数,然后调用handler对应的gRPC方法函数。server := grpc.NewServer(grpc.UnaryInterceptor(filter))
如果截取器函数返回了错误,那么该次gRPC方法调用将被视作失败处理
。因此,我们可以在截取器中对输入的参数做一些简单的验证工作。同样,也可以对handler返回的结果做一些验证工作。截取器也非常适合前面对Token认证工作。func filter(
ctx context.Context, req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
log.Println("filter:", info)
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic: %v", r)
}
}()
return handler(ctx, req)
}
import "github.com/grpc-ecosystem/go-grpc-middleware"
myServer := grpc.NewServer(
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
filter1, filter2, ...
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
filter1, filter2, ...
)),
)
func main() {
mux := http.NewServeMux()
h2Handler := h2c.NewHandler(mux, &http2.Server{})
server = &http.Server{Addr: ":3999", Handler: h2Handler}
server.ListenAndServe()
}
func main() {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
fmt.Fprintln(w, "hello")
})
http.ListenAndServeTLS(port, "server.crt", "server.key",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mux.ServeHTTP(w, r)
return
}),
)
}
func main() {
creds, err := credentials.NewServerTLSFromFile("server.crt", "server.key")
if err != nil {
log.Fatal(err)
}
grpcServer := grpc.NewServer(grpc.Creds(creds))
...
}
func main() {
...
http.ListenAndServeTLS(port, "server.crt", "server.key",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.ProtoMajor != 2 {
mux.ServeHTTP(w, r)
return
}
if strings.Contains(
r.Header.Get("Content-Type"), "application/grpc",
) {
grpcServer.ServeHTTP(w, r) // gRPC Server
return
}
mux.ServeHTTP(w, r)
return
}),
)
}
标签:取出 签名 transport 大数据 cep contex dock Opens ssi
原文地址:https://www.cnblogs.com/binHome/p/13058116.html