continuous_querier - ActiveState ActiveGo 1.8

Package continuous_querier

import ""

Overview ▾

Package continuous_querier provides the continuous query service.


Default values for aspects of interval computation.

const (
    // The default value of how often to check whether any CQs need to be run.
    DefaultRunInterval = time.Second
const (
    // NoChunkingSize specifies when not to chunk results. When planning
    // a select statement, passing zero tells it not to chunk results.
    // Only applies to raw queries.
    NoChunkingSize = 0

type Config

Config represents a configuration for the continuous query service.

type Config struct {
    // Enables logging in CQ service to display when CQ's are processed and how many points are wrote.
    LogEnabled bool `toml:"log-enabled"`

    // If this flag is set to false, both the brokers and data nodes should ignore any CQ processing.
    Enabled bool `toml:"enabled"`

    // Run interval for checking continuous queries. This should be set to the least common factor
    // of the interval for running continuous queries. If you only aggregate continuous queries
    // every minute, this should be set to 1 minute. The default is set to '1s' so the interval
    // is compatible with most aggregations.
    RunInterval toml.Duration `toml:"run-interval"`

func NewConfig

func NewConfig() Config

NewConfig returns a new instance of Config with defaults.

func (Config) Validate

func (c Config) Validate() error

Validate returns an error if the Config is invalid.

type ContinuousQuerier

ContinuousQuerier represents a service that executes continuous queries.

type ContinuousQuerier interface {
    // Run executes the named query in the named database.  Blank database or name matches all.
    Run(database, name string, t time.Time) error

type ContinuousQuery

ContinuousQuery is a local wrapper / helper around continuous queries.

type ContinuousQuery struct {
    Database string
    Info     *meta.ContinuousQueryInfo
    HasRun   bool
    LastRun  time.Time
    Resample ResampleOptions
    // contains filtered or unexported fields

func NewContinuousQuery

func NewContinuousQuery(database string, cqi *meta.ContinuousQueryInfo) (*ContinuousQuery, error)

NewContinuousQuery returns a ContinuousQuery object with a parsed influxql.CreateContinuousQueryStatement.

type ResampleOptions

ResampleOptions controls the resampling intervals and duration of this continuous query.

type ResampleOptions struct {
    // The query will be resampled at this time interval. The first query will be
    // performed at this time interval. If this option is not given, the resample
    // interval is set to the group by interval.
    Every time.Duration

    // The query will continue being resampled for this time duration. If this
    // option is not given, the resample duration is the same as the group by
    // interval. A bucket's time is calculated based on the bucket's start time,
    // so a 40m resample duration with a group by interval of 10m will resample
    // the bucket 4 times (using the default time interval).
    For time.Duration

type RunRequest

RunRequest is a request to run one or more CQs.

type RunRequest struct {
    // Now tells the CQ serivce what the current time is.
    Now time.Time
    // CQs tells the CQ service which queries to run.
    // If nil, all queries will be run.
    CQs []string

type Service

Service manages continuous query execution.

type Service struct {
    MetaClient    metaClient
    QueryExecutor *influxql.QueryExecutor
    Config        *Config
    RunInterval   time.Duration
    // RunCh can be used by clients to signal service to run CQs.
    RunCh  chan *RunRequest
    Logger zap.Logger
    // contains filtered or unexported fields

func NewService

func NewService(c Config) *Service

NewService returns a new instance of Service.

func (*Service) Close

func (s *Service) Close() error

Close stops the service.

func (*Service) ExecuteContinuousQuery

func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo, now time.Time) error

ExecuteContinuousQuery executes a single CQ.

func (*Service) Open

func (s *Service) Open() error

Open starts the service.

func (*Service) Run

func (s *Service) Run(database, name string, t time.Time) error

Run runs the specified continuous query, or all CQs if none is specified.

func (*Service) Statistics

func (s *Service) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for periodic monitoring.

func (*Service) WithLogger

func (s *Service) WithLogger(log zap.Logger)

WithLogger sets the logger on the service.

type Statistics

Statistics maintains the statistics for the continuous query service.

type Statistics struct {
    QueryOK   int64
    QueryFail int64