subscriber - ActiveState ActiveGo 1.8

Package subscriber

import ""

Overview ▾

Package subscriber implements the subscriber service to forward incoming data to remote services.


const (
    // DefaultHTTPTimeout is the default HTTP timeout for a Config.
    DefaultHTTPTimeout = 30 * time.Second

    // DefaultWriteConcurrency is the default write concurrency for a Config.
    DefaultWriteConcurrency = 40

    // DefaultWriteBufferSize is the default write buffer size for a Config.
    DefaultWriteBufferSize = 1000

type BalanceMode

BalanceMode specifies what balance mode to use on a subscription.

type BalanceMode int
const (
    // ALL indicates to send writes to all subscriber destinations.
    ALL BalanceMode = iota

    // ANY indicates to send writes to a single subscriber destination, round robin.

type Config

Config represents a configuration of the subscriber service.

type Config struct {
    // Whether to enable to Subscriber service
    Enabled bool `toml:"enabled"`

    HTTPTimeout toml.Duration `toml:"http-timeout"`

    // InsecureSkipVerify gets passed to the http client, if true, it will
    // skip https certificate verification. Defaults to false
    InsecureSkipVerify bool `toml:"insecure-skip-verify"`

    // configure the path to the PEM encoded CA certs file. If the
    // empty string, the default system certs will be used
    CaCerts string `toml:"ca-certs"`

    // The number of writer goroutines processing the write channel.
    WriteConcurrency int `toml:"write-concurrency"`

    // The number of in-flight writes buffered in the write channel.
    WriteBufferSize int `toml:"write-buffer-size"`

func NewConfig

func NewConfig() Config

NewConfig returns a new instance of a subscriber config.

func (Config) Validate

func (c Config) Validate() error

Validate returns an error if the config is invalid.

type HTTP

HTTP supports writing points over HTTP using the line protocol.

type HTTP struct {
    // contains filtered or unexported fields

func NewHTTP

func NewHTTP(addr string, timeout time.Duration) (*HTTP, error)

NewHTTP returns a new HTTP points writer with default options.

func NewHTTPS

func NewHTTPS(addr string, timeout time.Duration, unsafeSsl bool, caCerts string) (*HTTP, error)

NewHTTPS returns a new HTTPS points writer with default options and HTTPS configured.

func (*HTTP) WritePoints

func (h *HTTP) WritePoints(p *coordinator.WritePointsRequest) (err error)

WritePoints writes points over HTTP transport.

type PointsWriter

PointsWriter is an interface for writing points to a subscription destination. Only WritePoints() needs to be satisfied. PointsWriter implementations must be goroutine safe.

type PointsWriter interface {
    WritePoints(p *coordinator.WritePointsRequest) error

type Service

Service manages forking the incoming data from InfluxDB to defined third party destinations. Subscriptions are defined per database and retention policy.

type Service struct {
    MetaClient interface {
        Databases() []meta.DatabaseInfo
        WaitForDataChanged() chan struct{}
    NewPointsWriter func(u url.URL) (PointsWriter, error)
    Logger          zap.Logger
    // contains filtered or unexported fields

func NewService

func NewService(c Config) *Service

NewService returns a subscriber service with given settings

func (*Service) Close

func (s *Service) Close() error

Close terminates the subscription service. It will panic if called multiple times or without first opening the service.

func (*Service) Open

func (s *Service) Open() error

Open starts the subscription service.

func (*Service) Points

func (s *Service) Points() chan<- *coordinator.WritePointsRequest

Points returns a channel into which write point requests can be sent.

func (*Service) Statistics

func (s *Service) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for periodic monitoring.

func (*Service) Update

func (s *Service) Update() error

Update will start new and stop deleted subscriptions.

func (*Service) WithLogger

func (s *Service) WithLogger(log zap.Logger)

WithLogger sets the logger on the service.

type Statistics

Statistics maintains the statistics for the subscriber service.

type Statistics struct {
    CreateFailures int64
    PointsWritten  int64
    WriteFailures  int64

type UDP

UDP supports writing points over UDP using the line protocol.

type UDP struct {
    // contains filtered or unexported fields

func NewUDP

func NewUDP(addr string) *UDP

NewUDP returns a new UDP listener with default options.

func (*UDP) WritePoints

func (u *UDP) WritePoints(p *coordinator.WritePointsRequest) (err error)

WritePoints writes points over UDP transport.