Package broker
Overview ▹
Index ▹
Variables
var ( DefaultSubPath = "/_sub" )
func Connect ¶
func Connect() error
func Disconnect ¶
func Disconnect() error
func Init ¶
func Init(opts ...Option) error
func Publish ¶
func Publish(topic string, msg *Message, opts ...PublishOption) error
func String ¶
func String() string
type Broker ¶
Broker is an interface used for asynchronous messaging. It is an abstraction over various message brokers {NATS, RabbitMQ, Kafka, ...}.
type Broker interface { Options() Options Address() string Connect() error Disconnect() error Init(...Option) error Publish(string, *Message, ...PublishOption) error Subscribe(string, Handler, ...SubscribeOption) (Subscriber, error) String() string }
var ( DefaultBroker Broker = newHttpBroker() )
func NewBroker ¶
func NewBroker(opts ...Option) Broker
type Handler ¶
Handler is used to process messages via a subscription to a topic. The handler is passed a publication interface which contains the message and an optional Ack method to acknowledge receipt of the message.
type Handler func(Publication) error
type Message ¶
type Message struct { Header map[string]string Body []byte }
type Option ¶
type Option func(*Options)
func Addrs ¶
func Addrs(addrs ...string) Option
Addrs sets the host addresses to be used by the broker
func Codec ¶
func Codec(c codec.Codec) Option
Codec sets the codec used for encoding/decoding where a broker does not support headers.
func Registry ¶
func Registry(r registry.Registry) Option
func Secure ¶
func Secure(b bool) Option
Secure sets whether communication with the broker is secured.
func TLSConfig ¶
func TLSConfig(t *tls.Config) Option
TLSConfig specifies the TLS config.
type Options ¶
type Options struct { Addrs []string Secure bool Codec codec.Codec TLSConfig *tls.Config // Other options for implementations of the interface // can be stored in a context Context context.Context }
type Publication ¶
Publication is given to a subscription handler for processing
type Publication interface { Topic() string Message() *Message Ack() error }
type PublishOption ¶
type PublishOption func(*PublishOptions)
type PublishOptions ¶
type PublishOptions struct { // Other options for implementations of the interface // can be stored in a context Context context.Context }
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
func DisableAutoAck ¶
func DisableAutoAck() SubscribeOption
DisableAutoAck will disable auto acking of messages after they have been handled.
func Queue ¶
func Queue(name string) SubscribeOption
Queue sets the name of the queue to share messages on
type SubscribeOptions ¶
type SubscribeOptions struct { // AutoAck defaults to true. When a handler returns // with a nil error the message is acked. AutoAck bool // Subscribers with the same queue name // will create a shared subscription where each // receives a subset of messages. Queue string // Other options for implementations of the interface // can be stored in a context Context context.Context }
type Subscriber ¶
Subscriber is a convenience return type for the Subscribe method
type Subscriber interface { Options() SubscribeOptions Topic() string Unsubscribe() error }
func Subscribe ¶
func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error)