1
0
Fork 0
mirror of https://github.com/Luzifer/cloudkeys-go.git synced 2024-11-14 17:02:43 +00:00
cloudkeys-go/vendor/github.com/xuyu/goredis/pubsub.go

148 lines
4.1 KiB
Go

package goredis
import (
"errors"
"strconv"
"strings"
)
// Publish posts a message to the given channel.
// Integer reply: the number of clients that received the message.
func (r *Redis) Publish(channel, message string) (int64, error) {
rp, err := r.ExecuteCommand("PUBLISH", channel, message)
if err != nil {
return 0, err
}
return rp.IntegerValue()
}
// PubSub doc: http://redis.io/topics/pubsub
type PubSub struct {
redis *Redis
conn *connection
Patterns map[string]bool
Channels map[string]bool
}
// PubSub new a PubSub from *redis.
func (r *Redis) PubSub() (*PubSub, error) {
c, err := r.pool.Get()
if err != nil {
return nil, err
}
return &PubSub{
redis: r,
conn: c,
Patterns: make(map[string]bool),
Channels: make(map[string]bool),
}, nil
}
// Close closes current pubsub command.
func (p *PubSub) Close() error {
return p.conn.Conn.Close()
}
// Receive returns the reply of pubsub command.
// A message is a Multi-bulk reply with three elements.
// The first element is the kind of message:
// 1) subscribe: means that we successfully subscribed to the channel given as the second element in the reply.
// The third argument represents the number of channels we are currently subscribed to.
// 2) unsubscribe: means that we successfully unsubscribed from the channel given as second element in the reply.
// third argument represents the number of channels we are currently subscribed to.
// When the last argument is zero, we are no longer subscribed to any channel,
// and the client can issue any kind of Redis command as we are outside the Pub/Sub state.
// 3) message: it is a message received as result of a PUBLISH command issued by another client.
// The second element is the name of the originating channel, and the third argument is the actual message payload.
func (p *PubSub) Receive() ([]string, error) {
rp, err := p.conn.RecvReply()
if err != nil {
return nil, err
}
command, err := rp.Multi[0].StringValue()
if err != nil {
return nil, err
}
switch strings.ToLower(command) {
case "psubscribe", "punsubscribe":
pattern, err := rp.Multi[1].StringValue()
if err != nil {
return nil, err
}
if command == "psubscribe" {
p.Patterns[pattern] = true
} else {
delete(p.Patterns, pattern)
}
number, err := rp.Multi[2].IntegerValue()
if err != nil {
return nil, err
}
return []string{command, pattern, strconv.FormatInt(number, 10)}, nil
case "subscribe", "unsubscribe":
channel, err := rp.Multi[1].StringValue()
if err != nil {
return nil, err
}
if command == "subscribe" {
p.Channels[channel] = true
} else {
delete(p.Channels, channel)
}
number, err := rp.Multi[2].IntegerValue()
if err != nil {
return nil, err
}
return []string{command, channel, strconv.FormatInt(number, 10)}, nil
case "pmessage":
pattern, err := rp.Multi[1].StringValue()
if err != nil {
return nil, err
}
channel, err := rp.Multi[2].StringValue()
if err != nil {
return nil, err
}
message, err := rp.Multi[3].StringValue()
if err != nil {
return nil, err
}
return []string{command, pattern, channel, message}, nil
case "message":
channel, err := rp.Multi[1].StringValue()
if err != nil {
return nil, err
}
message, err := rp.Multi[2].StringValue()
if err != nil {
return nil, err
}
return []string{command, channel, message}, nil
}
return nil, errors.New("pubsub protocol error")
}
// Subscribe channel [channel ...]
func (p *PubSub) Subscribe(channels ...string) error {
args := packArgs("SUBSCRIBE", channels)
return p.conn.SendCommand(args...)
}
// PSubscribe pattern [pattern ...]
func (p *PubSub) PSubscribe(patterns ...string) error {
args := packArgs("PSUBSCRIBE", patterns)
return p.conn.SendCommand(args...)
}
// UnSubscribe [channel [channel ...]]
func (p *PubSub) UnSubscribe(channels ...string) error {
args := packArgs("UNSUBSCRIBE", channels)
return p.conn.SendCommand(args...)
}
// PUnSubscribe [pattern [pattern ...]]
func (p *PubSub) PUnSubscribe(patterns ...string) error {
args := packArgs("PUNSUBSCRIBE", patterns)
return p.conn.SendCommand(args...)
}