coordinator - ActiveState ActiveGo 1.8
...

Package coordinator

import "github.com/influxdata/influxdb/coordinator"
Overview
Index

Overview ▾

Package coordinator contains abstractions for writing points, executing statements, and accessing meta data.

Index ▾

Constants
Variables
type BufferedPointsWriter
    func NewBufferedPointsWriter(w pointsWriter, database, retentionPolicy string, capacity int) *BufferedPointsWriter
    func (w *BufferedPointsWriter) Cap() int
    func (w *BufferedPointsWriter) Flush() error
    func (w *BufferedPointsWriter) Len() int
    func (w *BufferedPointsWriter) WritePointsInto(req *IntoWriteRequest) error
type Config
    func NewConfig() Config
type IntoWriteRequest
type IteratorCreator
type LocalShardMapper
    func (e *LocalShardMapper) MapShards(sources influxql.Sources, opt *influxql.SelectOptions) (IteratorCreator, error)
type LocalShardMapping
    func (a *LocalShardMapping) Close() error
    func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error)
    func (a *LocalShardMapping) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
    func (a *LocalShardMapping) MapType(m *influxql.Measurement, field string) influxql.DataType
type LocalTSDBStore
type MetaClient
type PointsWriter
    func NewPointsWriter() *PointsWriter
    func (w *PointsWriter) Close() error
    func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
    func (w *PointsWriter) Open() error
    func (w *PointsWriter) Statistics(tags map[string]string) []models.Statistic
    func (w *PointsWriter) WithLogger(log zap.Logger)
    func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
    func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error
type ShardIteratorCreator
type ShardMapper
type ShardMapping
    func NewShardMapping() *ShardMapping
    func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point)
type Source
type StatementExecutor
    func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influxql.ExecutionContext) error
    func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, defaultDatabase string) (err error)
type TSDBStore
type WritePointsRequest
    func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string)
type WriteStatistics

Package files

config.go meta_client.go points_writer.go shard_mapper.go statement_executor.go

Constants

const (
    // DefaultWriteTimeout is the default timeout for a complete write to succeed.
    DefaultWriteTimeout = 10 * time.Second

    // DefaultMaxConcurrentQueries is the maximum number of running queries.
    // A value of zero will make the maximum query limit unlimited.
    DefaultMaxConcurrentQueries = 0

    // DefaultMaxSelectPointN is the maximum number of points a SELECT can process.
    // A value of zero will make the maximum point count unlimited.
    DefaultMaxSelectPointN = 0

    // DefaultMaxSelectSeriesN is the maximum number of series a SELECT can run.
    // A value of zero will make the maximum series count unlimited.
    DefaultMaxSelectSeriesN = 0
)

Variables

var (
    // ErrTimeout is returned when a write times out.
    ErrTimeout = errors.New("timeout")

    // ErrPartialWrite is returned when a write partially succeeds but does
    // not meet the requested consistency level.
    ErrPartialWrite = errors.New("partial write")

    // ErrWriteFailed is returned when no writes succeeded.
    ErrWriteFailed = errors.New("write failed")
)

ErrDatabaseNameRequired is returned when executing statements that require a database, when a database has not been provided.

var ErrDatabaseNameRequired = errors.New("database name required")

type BufferedPointsWriter

BufferedPointsWriter adds buffering to a pointsWriter so that SELECT INTO queries write their points to the destination in batches.

type BufferedPointsWriter struct {
    // contains filtered or unexported fields
}

func NewBufferedPointsWriter

func NewBufferedPointsWriter(w pointsWriter, database, retentionPolicy string, capacity int) *BufferedPointsWriter

NewBufferedPointsWriter returns a new BufferedPointsWriter.

func (*BufferedPointsWriter) Cap

func (w *BufferedPointsWriter) Cap() int

Cap returns the capacity (in points) of the buffer.

func (*BufferedPointsWriter) Flush

func (w *BufferedPointsWriter) Flush() error

Flush writes all buffered points to the underlying writer.

func (*BufferedPointsWriter) Len

func (w *BufferedPointsWriter) Len() int

Len returns the number of points buffered.

func (*BufferedPointsWriter) WritePointsInto

func (w *BufferedPointsWriter) WritePointsInto(req *IntoWriteRequest) error

WritePointsInto implements pointsWriter for BufferedPointsWriter.

type Config

Config represents the configuration for the coordinator service.

type Config struct {
    WriteTimeout         toml.Duration `toml:"write-timeout"`
    MaxConcurrentQueries int           `toml:"max-concurrent-queries"`
    QueryTimeout         toml.Duration `toml:"query-timeout"`
    LogQueriesAfter      toml.Duration `toml:"log-queries-after"`
    MaxSelectPointN      int           `toml:"max-select-point"`
    MaxSelectSeriesN     int           `toml:"max-select-series"`
    MaxSelectBucketsN    int           `toml:"max-select-buckets"`
}

func NewConfig

func NewConfig() Config

NewConfig returns an instance of Config with defaults.

type IntoWriteRequest

IntoWriteRequest is a partial copy of cluster.WriteRequest

type IntoWriteRequest struct {
    Database        string
    RetentionPolicy string
    Points          []models.Point
}

type IteratorCreator

IteratorCreator is an interface that combines mapping fields and creating iterators.

type IteratorCreator interface {
    influxql.IteratorCreator
    influxql.FieldMapper
    io.Closer
}

type LocalShardMapper

LocalShardMapper implements a ShardMapper for local shards.

type LocalShardMapper struct {
    MetaClient interface {
        ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
    }

    TSDBStore interface {
        ShardGroup(ids []uint64) tsdb.ShardGroup
    }
}

func (*LocalShardMapper) MapShards

func (e *LocalShardMapper) MapShards(sources influxql.Sources, opt *influxql.SelectOptions) (IteratorCreator, error)

MapShards maps the sources to the appropriate shards into an IteratorCreator.

type LocalShardMapping

ShardMapper maps data sources to a list of shard information.

type LocalShardMapping struct {
    ShardMap map[Source]tsdb.ShardGroup
}

func (*LocalShardMapping) Close

func (a *LocalShardMapping) Close() error

Close does nothing for a LocalShardMapping.

func (*LocalShardMapping) CreateIterator

func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error)

func (*LocalShardMapping) FieldDimensions

func (a *LocalShardMapping) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)

func (*LocalShardMapping) MapType

func (a *LocalShardMapping) MapType(m *influxql.Measurement, field string) influxql.DataType

type LocalTSDBStore

LocalTSDBStore embeds a tsdb.Store and implements IteratorCreator to satisfy the TSDBStore interface.

type LocalTSDBStore struct {
    *tsdb.Store
}

type MetaClient

MetaClient is an interface for accessing meta data.

type MetaClient interface {
    CreateContinuousQuery(database, name, query string) error
    CreateDatabase(name string) (*meta.DatabaseInfo, error)
    CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
    CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
    CreateSubscription(database, rp, name, mode string, destinations []string) error
    CreateUser(name, password string, admin bool) (*meta.UserInfo, error)
    Database(name string) *meta.DatabaseInfo
    Databases() []meta.DatabaseInfo
    DropShard(id uint64) error
    DropContinuousQuery(database, name string) error
    DropDatabase(name string) error
    DropRetentionPolicy(database, name string) error
    DropSubscription(database, rp, name string) error
    DropUser(name string) error
    RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
    SetAdminPrivilege(username string, admin bool) error
    SetPrivilege(username, database string, p influxql.Privilege) error
    ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
    UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
    UpdateUser(name, password string) error
    UserPrivilege(username, database string) (*influxql.Privilege, error)
    UserPrivileges(username string) (map[string]influxql.Privilege, error)
    Users() []meta.UserInfo
}

type PointsWriter

PointsWriter handles writes across multiple local and remote data nodes.

type PointsWriter struct {
    WriteTimeout time.Duration
    Logger       zap.Logger

    Node *influxdb.Node

    MetaClient interface {
        Database(name string) (di *meta.DatabaseInfo)
        RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
        CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
        ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo)
    }

    TSDBStore interface {
        CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error
        WriteToShard(shardID uint64, points []models.Point) error
    }

    ShardWriter interface {
        WriteShard(shardID, ownerID uint64, points []models.Point) error
    }

    Subscriber interface {
        Points() chan<- *WritePointsRequest
    }
    // contains filtered or unexported fields
}

func NewPointsWriter

func NewPointsWriter() *PointsWriter

NewPointsWriter returns a new instance of PointsWriter for a node.

func (*PointsWriter) Close

func (w *PointsWriter) Close() error

Close closes the communication channel with the point writer.

func (*PointsWriter) MapShards

func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)

MapShards maps the points contained in wp to a ShardMapping. If a point maps to a shard group or shard that does not currently exist, it will be created before returning the mapping.

func (*PointsWriter) Open

func (w *PointsWriter) Open() error

Open opens the communication channel with the point writer.

func (*PointsWriter) Statistics

func (w *PointsWriter) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for periodic monitoring.

func (*PointsWriter) WithLogger

func (w *PointsWriter) WithLogger(log zap.Logger)

WithLogger sets the Logger on w.

func (*PointsWriter) WritePoints

func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error

WritePoints writes across multiple local and remote data nodes according the consistency level.

func (*PointsWriter) WritePointsInto

func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error

WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of a cluster structure for information. This is to avoid a circular dependency.

type ShardIteratorCreator

ShardIteratorCreator is an interface for creating an IteratorCreator to access a specific shard.

type ShardIteratorCreator interface {
    ShardIteratorCreator(id uint64) influxql.IteratorCreator
}

type ShardMapper

ShardMapper retrieves and maps shards into an IteratorCreator that can later be used for executing queries.

type ShardMapper interface {
    MapShards(sources influxql.Sources, opt *influxql.SelectOptions) (IteratorCreator, error)
}

type ShardMapping

ShardMapping contains a mapping of shards to points.

type ShardMapping struct {
    Points map[uint64][]models.Point  // The points associated with a shard ID
    Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID
}

func NewShardMapping

func NewShardMapping() *ShardMapping

NewShardMapping creates an empty ShardMapping.

func (*ShardMapping) MapPoint

func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point)

MapPoint adds the point to the ShardMapping, associated with the given shardInfo.

type Source

Source contains the database and retention policy source for data.

type Source struct {
    Database        string
    RetentionPolicy string
}

type StatementExecutor

StatementExecutor executes a statement in the query.

type StatementExecutor struct {
    MetaClient MetaClient

    // TaskManager holds the StatementExecutor that handles task-related commands.
    TaskManager influxql.StatementExecutor

    // TSDB storage for local node.
    TSDBStore TSDBStore

    // ShardMapper for mapping shards when executing a SELECT statement.
    ShardMapper ShardMapper

    // Holds monitoring data for SHOW STATS and SHOW DIAGNOSTICS.
    Monitor *monitor.Monitor

    // Used for rewriting points back into system for SELECT INTO statements.
    PointsWriter pointsWriter

    // Select statement limits
    MaxSelectPointN   int
    MaxSelectSeriesN  int
    MaxSelectBucketsN int
}

func (*StatementExecutor) ExecuteStatement

func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influxql.ExecutionContext) error

ExecuteStatement executes the given statement with the given execution context.

func (*StatementExecutor) NormalizeStatement

func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, defaultDatabase string) (err error)

NormalizeStatement adds a default database and policy to the measurements in statement.

type TSDBStore

TSDBStore is an interface for accessing the time series data store.

type TSDBStore interface {
    CreateShard(database, policy string, shardID uint64, enabled bool) error
    WriteToShard(shardID uint64, points []models.Point) error

    RestoreShard(id uint64, r io.Reader) error
    BackupShard(id uint64, since time.Time, w io.Writer) error

    DeleteDatabase(name string) error
    DeleteMeasurement(database, name string) error
    DeleteRetentionPolicy(database, name string) error
    DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
    DeleteShard(id uint64) error

    Measurements(database string, cond influxql.Expr) ([]string, error)
    TagValues(database string, cond influxql.Expr) ([]tsdb.TagValues, error)
}

type WritePointsRequest

WritePointsRequest represents a request to write point data to the cluster.

type WritePointsRequest struct {
    Database        string
    RetentionPolicy string
    Points          []models.Point
}

func (*WritePointsRequest) AddPoint

func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string)

AddPoint adds a point to the WritePointRequest with field key 'value'

type WriteStatistics

WriteStatistics keeps statistics related to the PointsWriter.

type WriteStatistics struct {
    WriteReq           int64
    PointWriteReq      int64
    PointWriteReqLocal int64
    WriteOK            int64
    WriteDropped       int64
    WriteTimeout       int64
    WriteErr           int64
    SubWriteOK         int64
    SubWriteDrop       int64
}