mirror of
https://github.com/Luzifer/nginx-sso.git
synced 2024-12-21 05:11:17 +00:00
269 lines
7.3 KiB
Go
269 lines
7.3 KiB
Go
// +build go1.12
|
|
|
|
/*
|
|
*
|
|
* Copyright 2019 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
*/
|
|
|
|
package xds
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
"github.com/golang/protobuf/ptypes"
|
|
structpb "github.com/golang/protobuf/ptypes/struct"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/balancer"
|
|
cdspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/cds"
|
|
basepb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/base"
|
|
discoverypb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/discovery"
|
|
edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds"
|
|
adspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/discovery/v2/ads"
|
|
"google.golang.org/grpc/grpclog"
|
|
"google.golang.org/grpc/internal/backoff"
|
|
"google.golang.org/grpc/internal/channelz"
|
|
)
|
|
|
|
const (
|
|
grpcHostname = "com.googleapis.trafficdirector.grpc_hostname"
|
|
cdsType = "type.googleapis.com/envoy.api.v2.Cluster"
|
|
edsType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
|
|
endpointRequired = "endpoints_required"
|
|
)
|
|
|
|
var (
|
|
defaultBackoffConfig = backoff.Exponential{
|
|
MaxDelay: 120 * time.Second,
|
|
}
|
|
)
|
|
|
|
// client is responsible for connecting to the specified traffic director, passing the received
|
|
// ADS response from the traffic director, and sending notification when communication with the
|
|
// traffic director is lost.
|
|
type client struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
cli adspb.AggregatedDiscoveryServiceClient
|
|
opts balancer.BuildOptions
|
|
balancerName string // the traffic director name
|
|
serviceName string // the user dial target name
|
|
enableCDS bool
|
|
newADS func(ctx context.Context, resp proto.Message) error
|
|
loseContact func(ctx context.Context)
|
|
cleanup func()
|
|
backoff backoff.Strategy
|
|
|
|
mu sync.Mutex
|
|
cc *grpc.ClientConn
|
|
}
|
|
|
|
func (c *client) run() {
|
|
c.dial()
|
|
c.makeADSCall()
|
|
}
|
|
|
|
func (c *client) close() {
|
|
c.cancel()
|
|
c.mu.Lock()
|
|
if c.cc != nil {
|
|
c.cc.Close()
|
|
}
|
|
c.mu.Unlock()
|
|
c.cleanup()
|
|
}
|
|
|
|
func (c *client) dial() {
|
|
var dopts []grpc.DialOption
|
|
if creds := c.opts.DialCreds; creds != nil {
|
|
if err := creds.OverrideServerName(c.balancerName); err == nil {
|
|
dopts = append(dopts, grpc.WithTransportCredentials(creds))
|
|
} else {
|
|
grpclog.Warningf("xds: failed to override the server name in the credentials: %v, using Insecure", err)
|
|
dopts = append(dopts, grpc.WithInsecure())
|
|
}
|
|
} else {
|
|
dopts = append(dopts, grpc.WithInsecure())
|
|
}
|
|
if c.opts.Dialer != nil {
|
|
dopts = append(dopts, grpc.WithContextDialer(c.opts.Dialer))
|
|
}
|
|
// Explicitly set pickfirst as the balancer.
|
|
dopts = append(dopts, grpc.WithBalancerName(grpc.PickFirstBalancerName))
|
|
if channelz.IsOn() {
|
|
dopts = append(dopts, grpc.WithChannelzParentID(c.opts.ChannelzParentID))
|
|
}
|
|
|
|
cc, err := grpc.DialContext(c.ctx, c.balancerName, dopts...)
|
|
// Since this is a non-blocking dial, so if it fails, it due to some serious error (not network
|
|
// related) error.
|
|
if err != nil {
|
|
grpclog.Fatalf("xds: failed to dial: %v", err)
|
|
}
|
|
c.mu.Lock()
|
|
select {
|
|
case <-c.ctx.Done():
|
|
cc.Close()
|
|
default:
|
|
// only assign c.cc when xds client has not been closed, to prevent ClientConn leak.
|
|
c.cc = cc
|
|
}
|
|
c.mu.Unlock()
|
|
}
|
|
|
|
func (c *client) newCDSRequest() *discoverypb.DiscoveryRequest {
|
|
cdsReq := &discoverypb.DiscoveryRequest{
|
|
Node: &basepb.Node{
|
|
Metadata: &structpb.Struct{
|
|
Fields: map[string]*structpb.Value{
|
|
grpcHostname: {
|
|
Kind: &structpb.Value_StringValue{StringValue: c.serviceName},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
TypeUrl: cdsType,
|
|
}
|
|
return cdsReq
|
|
}
|
|
|
|
func (c *client) newEDSRequest() *discoverypb.DiscoveryRequest {
|
|
edsReq := &discoverypb.DiscoveryRequest{
|
|
Node: &basepb.Node{
|
|
Metadata: &structpb.Struct{
|
|
Fields: map[string]*structpb.Value{
|
|
endpointRequired: {
|
|
Kind: &structpb.Value_BoolValue{BoolValue: c.enableCDS},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
ResourceNames: []string{c.serviceName},
|
|
TypeUrl: edsType,
|
|
}
|
|
return edsReq
|
|
}
|
|
|
|
func (c *client) makeADSCall() {
|
|
c.cli = adspb.NewAggregatedDiscoveryServiceClient(c.cc)
|
|
retryCount := 0
|
|
var doRetry bool
|
|
|
|
for {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
if doRetry {
|
|
backoffTimer := time.NewTimer(c.backoff.Backoff(retryCount))
|
|
select {
|
|
case <-backoffTimer.C:
|
|
case <-c.ctx.Done():
|
|
backoffTimer.Stop()
|
|
return
|
|
}
|
|
retryCount++
|
|
}
|
|
|
|
firstRespReceived := c.adsCallAttempt()
|
|
if firstRespReceived {
|
|
retryCount = 0
|
|
doRetry = false
|
|
} else {
|
|
doRetry = true
|
|
}
|
|
c.loseContact(c.ctx)
|
|
}
|
|
}
|
|
|
|
func (c *client) adsCallAttempt() (firstRespReceived bool) {
|
|
firstRespReceived = false
|
|
ctx, cancel := context.WithCancel(c.ctx)
|
|
defer cancel()
|
|
st, err := c.cli.StreamAggregatedResources(ctx, grpc.WaitForReady(true))
|
|
if err != nil {
|
|
grpclog.Infof("xds: failed to initial ADS streaming RPC due to %v", err)
|
|
return
|
|
}
|
|
if c.enableCDS {
|
|
if err := st.Send(c.newCDSRequest()); err != nil {
|
|
// current stream is broken, start a new one.
|
|
return
|
|
}
|
|
}
|
|
if err := st.Send(c.newEDSRequest()); err != nil {
|
|
// current stream is broken, start a new one.
|
|
return
|
|
}
|
|
expectCDS := c.enableCDS
|
|
for {
|
|
resp, err := st.Recv()
|
|
if err != nil {
|
|
// current stream is broken, start a new one.
|
|
return
|
|
}
|
|
firstRespReceived = true
|
|
resources := resp.GetResources()
|
|
if len(resources) < 1 {
|
|
grpclog.Warning("xds: ADS response contains 0 resource info.")
|
|
// start a new call as server misbehaves by sending a ADS response with 0 resource info.
|
|
return
|
|
}
|
|
if resp.GetTypeUrl() == cdsType && !c.enableCDS {
|
|
grpclog.Warning("xds: received CDS response in custom plugin mode.")
|
|
// start a new call as we receive CDS response when in EDS-only mode.
|
|
return
|
|
}
|
|
var adsResp ptypes.DynamicAny
|
|
if err := ptypes.UnmarshalAny(resources[0], &adsResp); err != nil {
|
|
grpclog.Warningf("xds: failed to unmarshal resources due to %v.", err)
|
|
return
|
|
}
|
|
switch adsResp.Message.(type) {
|
|
case *cdspb.Cluster:
|
|
expectCDS = false
|
|
case *edspb.ClusterLoadAssignment:
|
|
if expectCDS {
|
|
grpclog.Warningf("xds: expecting CDS response, got EDS response instead.")
|
|
return
|
|
}
|
|
}
|
|
if err := c.newADS(c.ctx, adsResp.Message); err != nil {
|
|
grpclog.Warningf("xds: processing new ADS message failed due to %v.", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
func newXDSClient(balancerName string, serviceName string, enableCDS bool, opts balancer.BuildOptions, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client {
|
|
c := &client{
|
|
balancerName: balancerName,
|
|
serviceName: serviceName,
|
|
enableCDS: enableCDS,
|
|
opts: opts,
|
|
newADS: newADS,
|
|
loseContact: loseContact,
|
|
cleanup: exitCleanup,
|
|
backoff: defaultBackoffConfig,
|
|
}
|
|
|
|
c.ctx, c.cancel = context.WithCancel(context.Background())
|
|
|
|
return c
|
|
}
|