// Copyright 2014 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package longtest

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"sync"
	"testing"
	"time"

	"cloud.google.com/go/internal/testutil"
	"cloud.google.com/go/pubsub"
	"google.golang.org/api/option"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

const (
	timeout                 = time.Minute * 10
	ackDeadline             = time.Second * 10
	nMessages               = 1e4
	acceptableDupPercentage = 1
	numAcceptableDups       = int(nMessages * acceptableDupPercentage / 100)
)

// The end-to-end test pumps many messages into a topic and tests that they are
// all delivered to each subscription for the topic. It also tests that
// messages are not unexpectedly redelivered.
func TestEndToEnd_Dupes(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	client, topic, cleanup := prepareEndToEndTest(ctx, t)
	defer cleanup()
	subPrefix := fmt.Sprintf("endtoend-%d", time.Now().UnixNano())

	// Two subscriptions to the same topic.
	var err error
	var subs [2]*pubsub.Subscription
	for i := 0; i < len(subs); i++ {
		subs[i], err = client.CreateSubscription(ctx, fmt.Sprintf("%s-%d", subPrefix, i), pubsub.SubscriptionConfig{
			Topic:       topic,
			AckDeadline: ackDeadline,
		})
		if err != nil {
			t.Fatalf("CreateSub error: %v", err)
		}
		defer subs[i].Delete(ctx)
	}

	err = publish(ctx, topic, nMessages)
	topic.Stop()
	if err != nil {
		t.Fatalf("publish: %v", err)
	}

	// recv provides an indication that messages are still arriving.
	recv := make(chan struct{})
	// We have two subscriptions to our topic.
	// Each subscription will get a copy of each published message.
	var wg sync.WaitGroup
	cctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
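	// Consumer 0 runs a single long Receive; consumer 1 cycles through several
	// short Receives before a final long one, so messages still in flight when
	// a Receive is canceled may be redelivered and count toward numAcceptableDups.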
	consumers := []*consumer{
		{counts: make(map[string]int), recv: recv, durations: []time.Duration{time.Hour}},
		{counts: make(map[string]int), recv: recv,
			durations: []time.Duration{ackDeadline, ackDeadline, ackDeadline / 2, ackDeadline / 2, time.Hour}},
	}
	for i, con := range consumers {
		con := con
		sub := subs[i]
		wg.Add(1)
		go func() {
			defer wg.Done()
			con.consume(cctx, t, sub)
		}()
	}
	// Wait for a while after the last message before declaring quiescence.
	// We wait a multiple of the ack deadline, for two reasons:
	// 1. To detect if messages are redelivered after having their ack
	// deadline extended.
	// 2. To wait for redelivery of messages that were en route when a Receive
	// is canceled. This can take considerably longer than the ack deadline.
	quiescenceDur := ackDeadline * 6
	quiescenceTimer := time.NewTimer(quiescenceDur)

loop:
	for {
		select {
		case <-recv:
			// Reset timer so we wait quiescenceDur after the last message.
			// See https://godoc.org/time#Timer.Reset for why the Stop
			// and channel drain are necessary.
			if !quiescenceTimer.Stop() {
				<-quiescenceTimer.C
			}
			quiescenceTimer.Reset(quiescenceDur)

		case <-quiescenceTimer.C:
			cancel()
			log.Println("quiesced")
			break loop

		case <-cctx.Done():
			t.Fatal("timed out")
		}
	}
	wg.Wait()
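	// Each subscription should have received every message exactly once, so any
	// count above one is a duplicate. A limited number of duplicates is tolerated
	// because Pub/Sub delivery is at-least-once.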
	for i, con := range consumers {
		var numDups int
		var zeroes int
		for _, v := range con.counts {
			if v == 0 {
				zeroes++
			}
			numDups += v - 1
		}

		if zeroes > 0 {
			t.Errorf("Consumer %d: %d messages never arrived", i, zeroes)
		} else if numDups > numAcceptableDups {
			t.Errorf("Consumer %d: Willing to accept %d dups (%v%% duplicated of %d messages), but got %d", i, numAcceptableDups, acceptableDupPercentage, int(nMessages), numDups)
		}
	}
}

func TestEndToEnd_LongProcessingTime(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	client, topic, cleanup := prepareEndToEndTest(ctx, t)
	defer cleanup()
	subPrefix := fmt.Sprintf("endtoend-%d", time.Now().UnixNano())

	// A single subscription to the topic.
	sub, err := client.CreateSubscription(ctx, subPrefix+"-00", pubsub.SubscriptionConfig{
		Topic:       topic,
		AckDeadline: ackDeadline,
	})
	if err != nil {
		t.Fatalf("CreateSub error: %v", err)
	}
	defer sub.Delete(ctx)

	// Tests the issue found in https://github.com/googleapis/google-cloud-go/issues/1247.
	sub.ReceiveSettings.Synchronous = true
	sub.ReceiveSettings.MaxOutstandingMessages = 500

	err = publish(ctx, topic, 1000)
	topic.Stop()
	if err != nil {
		t.Fatalf("publish: %v", err)
	}

	// recv provides an indication that messages are still arriving.
	recv := make(chan struct{})
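	// Simulate handlers that take 1 to 120 seconds per message, far longer than
	// the 10s ack deadline, so ack deadlines must be extended while the messages
	// are outstanding.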
	consumer := consumer{
		counts:    make(map[string]int),
		recv:      recv,
		durations: []time.Duration{time.Hour},
		processingDelay: func() time.Duration {
			return time.Duration(1+rand.Int63n(120)) * time.Second
		},
	}
	go consumer.consume(ctx, t, sub)
	// Wait for a while after the last message before declaring quiescence.
	// We wait a multiple of the ack deadline, for two reasons:
	// 1. To detect if messages are redelivered after having their ack
	// deadline extended.
	// 2. To wait for redelivery of messages that were en route when a Receive
	// is canceled. This can take considerably longer than the ack deadline.
	quiescenceDur := ackDeadline * 6
	quiescenceTimer := time.NewTimer(quiescenceDur)
loop:
	for {
		select {
		case <-recv:
			// Reset timer so we wait quiescenceDur after the last message.
			// See https://godoc.org/time#Timer.Reset for why the Stop
			// and channel drain are necessary.
			if !quiescenceTimer.Stop() {
				<-quiescenceTimer.C
			}
			quiescenceTimer.Reset(quiescenceDur)

		case <-quiescenceTimer.C:
			cancel()
			log.Println("quiesced")
			break loop

		case <-ctx.Done():
			t.Fatal("timed out")
		}
	}
	var numDups int
	var zeroes int
	for _, v := range consumer.counts {
		if v == 0 {
			zeroes++
		}
		numDups += v - 1
	}

	if zeroes > 0 {
		t.Errorf("%d messages never arrived", zeroes)
	} else if numDups > numAcceptableDups {
		t.Errorf("Willing to accept %d dups (%v%% duplicated of %d messages), but got %d", numAcceptableDups, acceptableDupPercentage, int(nMessages), numDups)
	}
}

// publish publishes n messages to topic.
func publish(ctx context.Context, topic *pubsub.Topic, n int) error {
	var rs []*pubsub.PublishResult
	for i := 0; i < n; i++ {
		m := &pubsub.Message{Data: []byte(fmt.Sprintf("msg %d", i))}
		rs = append(rs, topic.Publish(ctx, m))
	}
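	// Publish is asynchronous; block on every result so any publish error
	// surfaces before the test starts consuming.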
	for _, r := range rs {
		_, err := r.Get(ctx)
		if err != nil {
			return err
		}
	}
	return nil
}

// consumer consumes messages according to its configuration.
type consumer struct {
	// A consumer spins up one Receive per duration; each Receive is canceled
	// when its duration elapses and the next one is started. For example, five
	// 3-second durations produce five 3-second Receives.
	durations []time.Duration

	// A value is sent to recv each time a message is received (see process).
	recv chan struct{}

	// How long to wait before acking.
	processingDelay func() time.Duration

	mu         sync.Mutex
	counts     map[string]int // how many times each message ID was received
	totalRecvd int
}

// consume reads messages from a subscription and records what it receives in
// c.counts. After consume returns, the caller should wait for the goroutine
// running it to finish before reading c.counts.
func (c *consumer) consume(ctx context.Context, t *testing.T, sub *pubsub.Subscription) {
	for _, dur := range c.durations {
		ctx2, cancel := context.WithTimeout(ctx, dur)
		defer cancel()
		// Use the last character of the subscription name as a short log label.
		id := sub.String()[len(sub.String())-1:]
		t.Logf("%s: start receive", id)
		prev := c.totalRecvd
		err := sub.Receive(ctx2, c.process)
		t.Logf("%s: end receive; read %d", id, c.totalRecvd-prev)
		if serr, _ := status.FromError(err); err != nil && serr.Code() != codes.Canceled {
			panic(err)
		}
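		// If the parent context is done, the test has quiesced or timed out;
		// stop. Otherwise start the next Receive window.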
		select {
		case <-ctx.Done():
			return
		default:
		}
	}
}

// process handles a message and records it in c.counts.
func (c *consumer) process(_ context.Context, m *pubsub.Message) {
	c.mu.Lock()
	c.counts[m.ID]++
	c.totalRecvd++
	c.mu.Unlock()
	c.recv <- struct{}{}

	var delay time.Duration
	if c.processingDelay == nil {
		delay = time.Duration(rand.Intn(int(ackDeadline * 3)))
	} else {
		delay = c.processingDelay()
	}

	// Simulate time taken to process m, while continuing to process more messages.
	// Some messages will need to have their ack deadline extended due to this delay.
	time.AfterFunc(delay, func() {
		m.Ack()
	})
}

// prepareEndToEndTest creates a Pub/Sub client and a test topic.
// Remember to call the returned cleanup function!
func prepareEndToEndTest(ctx context.Context, t *testing.T) (*pubsub.Client, *pubsub.Topic, func()) {
	if testing.Short() {
		t.Skip("Integration tests skipped in short mode")
	}
	ts := testutil.TokenSource(ctx, pubsub.ScopePubSub, pubsub.ScopeCloudPlatform)
	if ts == nil {
		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
	}

	now := time.Now()
	topicName := fmt.Sprintf("endtoend-%d", now.UnixNano())

	client, err := pubsub.NewClient(ctx, testutil.ProjID(), option.WithTokenSource(ts))
	if err != nil {
		t.Fatalf("Creating client error: %v", err)
	}

	var topic *pubsub.Topic
	if topic, err = client.CreateTopic(ctx, topicName); err != nil {
		t.Fatalf("CreateTopic error: %v", err)
	}

	return client, topic, func() {
		topic.Delete(ctx)
		client.Close()
	}
}