Package subscriber
Overview ▹
Index ▹
Constants
const (
// DefaultHTTPTimeout is the default HTTP timeout for a Config.
DefaultHTTPTimeout = 30 * time.Second
// DefaultWriteConcurrency is the default write concurrency for a Config.
DefaultWriteConcurrency = 40
// DefaultWriteBufferSize is the default write buffer size for a Config.
DefaultWriteBufferSize = 1000
)
type BalanceMode ¶
BalanceMode specifies what balance mode to use on a subscription.
type BalanceMode int
const (
// ALL sends writes to all subscriber destinations.
ALL BalanceMode = iota
// ANY sends each write to a single subscriber destination, chosen round robin.
ANY
)
type Config ¶
Config represents a configuration of the subscriber service.
type Config struct {
// Whether to enable the Subscriber service.
Enabled bool `toml:"enabled"`
HTTPTimeout toml.Duration `toml:"http-timeout"`
// InsecureSkipVerify is passed to the HTTP client; if true, the client
// skips HTTPS certificate verification. Defaults to false.
InsecureSkipVerify bool `toml:"insecure-skip-verify"`
// CaCerts is the path to the PEM-encoded CA certs file. If it is the
// empty string, the default system certs will be used.
CaCerts string `toml:"ca-certs"`
// The number of writer goroutines processing the write channel.
WriteConcurrency int `toml:"write-concurrency"`
// The number of in-flight writes buffered in the write channel.
WriteBufferSize int `toml:"write-buffer-size"`
}
func NewConfig ¶
func NewConfig() Config
NewConfig returns a new instance of a subscriber config.
func (Config) Validate ¶
func (c Config) Validate() error
Validate returns an error if the config is invalid.
type HTTP ¶
HTTP supports writing points over HTTP using the line protocol.
type HTTP struct {
// contains filtered or unexported fields
}
func NewHTTP ¶
func NewHTTP(addr string, timeout time.Duration) (*HTTP, error)
NewHTTP returns a new HTTP points writer with default options.
func NewHTTPS ¶
func NewHTTPS(addr string, timeout time.Duration, unsafeSsl bool, caCerts string) (*HTTP, error)
NewHTTPS returns a new HTTPS points writer with default options and HTTPS configured.
func (*HTTP) WritePoints ¶
func (h *HTTP) WritePoints(p *coordinator.WritePointsRequest) (err error)
WritePoints writes points over HTTP transport.
type PointsWriter ¶
PointsWriter is an interface for writing points to a subscription destination. Only WritePoints() needs to be satisfied. PointsWriter implementations must be goroutine safe.
type PointsWriter interface {
WritePoints(p *coordinator.WritePointsRequest) error
}
type Service ¶
Service manages forking the incoming data from InfluxDB to defined third party destinations. Subscriptions are defined per database and retention policy.
type Service struct {
MetaClient interface {
Databases() []meta.DatabaseInfo
WaitForDataChanged() chan struct{}
}
NewPointsWriter func(u url.URL) (PointsWriter, error)
Logger zap.Logger
// contains filtered or unexported fields
}
func NewService ¶
func NewService(c Config) *Service
NewService returns a subscriber service with the given settings.
func (*Service) Close ¶
func (s *Service) Close() error
Close terminates the subscription service. It will panic if called multiple times or without first opening the service.
func (*Service) Open ¶
func (s *Service) Open() error
Open starts the subscription service.
func (*Service) Points ¶
func (s *Service) Points() chan<- *coordinator.WritePointsRequest
Points returns a channel into which write point requests can be sent.
func (*Service) Statistics ¶
func (s *Service) Statistics(tags map[string]string) []models.Statistic
Statistics returns statistics for periodic monitoring.
func (*Service) Update ¶
func (s *Service) Update() error
Update will start new and stop deleted subscriptions.
func (*Service) WithLogger ¶
func (s *Service) WithLogger(log zap.Logger)
WithLogger sets the logger on the service.
type Statistics ¶
Statistics maintains the statistics for the subscriber service.
type Statistics struct {
CreateFailures int64
PointsWritten int64
WriteFailures int64
}
type UDP ¶
UDP supports writing points over UDP using the line protocol.
type UDP struct {
// contains filtered or unexported fields
}
func NewUDP ¶
func NewUDP(addr string) *UDP
NewUDP returns a new UDP points writer with default options.
func (*UDP) WritePoints ¶
func (u *UDP) WritePoints(p *coordinator.WritePointsRequest) (err error)
WritePoints writes points over UDP transport.
ActiveGo 1.8