tsdb - ActiveState ActiveGo 1.8
...

Package tsdb

import "github.com/influxdata/influxdb/tsdb"
Overview
Index
Subdirectories

Overview ▾

Package tsdb implements a durable time series database.

Index ▾

Constants
Variables
func DecodeStorePath(shardOrWALPath string) (database, retentionPolicy string)
func MarshalTags(tags map[string]string) []byte
func MeasurementFromSeriesKey(key string) string
func NewFieldKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error)
func NewSeriesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error)
func NewShardError(id uint64, err error) error
func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error)
func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error)
func RegisterEngine(name string, fn NewEngineFunc)
func RegisteredEngines() []string
type Config
    func NewConfig() Config
    func (c *Config) Validate() error
type Cursor
type DatabaseIndex
    func NewDatabaseIndex(name string) *DatabaseIndex
    func (d *DatabaseIndex) AssignShard(k string, shardID uint64)
    func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurement
    func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series, forceCopy bool) *Series
    func (d *DatabaseIndex) DropMeasurement(name string)
    func (d *DatabaseIndex) DropSeries(keys []string)
    func (d *DatabaseIndex) Measurement(name string) *Measurement
    func (d *DatabaseIndex) MeasurementSeriesCounts() (nMeasurements int, nSeries int)
    func (d *DatabaseIndex) Measurements() Measurements
    func (d *DatabaseIndex) MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error)
    func (d *DatabaseIndex) MeasurementsByName(names []string) []*Measurement
    func (d *DatabaseIndex) MeasurementsByRegex(re *regexp.Regexp) Measurements
    func (d *DatabaseIndex) RemoveShard(shardID uint64)
    func (d *DatabaseIndex) Series(key string) *Series
    func (d *DatabaseIndex) SeriesBytes(key []byte) *Series
    func (d *DatabaseIndex) SeriesKeys() []string
    func (d *DatabaseIndex) SeriesN() int
    func (d *DatabaseIndex) SeriesShardN(shardID uint64) int
    func (d *DatabaseIndex) Statistics(tags map[string]string) []models.Statistic
    func (d *DatabaseIndex) TagsForSeries(key string) models.Tags
    func (d *DatabaseIndex) UnassignShard(k string, shardID uint64)
type Engine
    func NewEngine(id uint64, path string, walPath string, options EngineOptions) (Engine, error)
type EngineFormat
type EngineOptions
    func NewEngineOptions() EngineOptions
type Field
type FieldCreate
type FilterExprs
    func (fe FilterExprs) DeleteBoolLiteralTrues()
    func (fe FilterExprs) Len() int
type IndexStatistics
type KeyValue
type KeyValues
    func (a KeyValues) Len() int
    func (a KeyValues) Less(i, j int) bool
    func (a KeyValues) Swap(i, j int)
type Measurement
    func NewMeasurement(name string) *Measurement
    func (m *Measurement) AddSeries(s *Series) bool
    func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string
    func (m *Measurement) Cardinality(key string) int
    func (m *Measurement) CardinalityBytes(key []byte) int
    func (m *Measurement) DropSeries(series *Series)
    func (m *Measurement) FieldNames() []string
    func (m *Measurement) HasField(name string) bool
    func (m *Measurement) HasSeries() bool
    func (m *Measurement) HasTagKey(k string) bool
    func (m *Measurement) HasTagKeyValue(k, v []byte) bool
    func (m *Measurement) IDsForExpr(n *influxql.BinaryExpr) SeriesIDs
    func (m *Measurement) SeriesByID(id uint64) *Series
    func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series
    func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error)
    func (m *Measurement) SeriesKeys() []string
    func (m *Measurement) SetFieldName(name string)
    func (m *Measurement) TagKeys() []string
    func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (stringSet, bool, error)
    func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error)
    func (m *Measurement) TagValues(key string) []string
    func (m *Measurement) ValidateGroupBy(stmt *influxql.SelectStatement) error
    func (m *Measurement) WalkTagKeys(fn func(k string))
type MeasurementFields
    func NewMeasurementFields() *MeasurementFields
    func (m *MeasurementFields) CreateFieldIfNotExists(name string, typ influxql.DataType, limitCount bool) error
    func (m *MeasurementFields) Field(name string) *Field
    func (m *MeasurementFields) FieldBytes(name []byte) *Field
    func (m *MeasurementFields) FieldSet() map[string]influxql.DataType
    func (m *MeasurementFields) MarshalBinary() ([]byte, error)
    func (m *MeasurementFields) UnmarshalBinary(buf []byte) error
type Measurements
    func (a Measurements) Len() int
    func (a Measurements) Less(i, j int) bool
    func (a Measurements) Swap(i, j int)
type NewEngineFunc
type PartialWriteError
    func (e PartialWriteError) Error() string
type PointBatcher
    func NewPointBatcher(sz int, bp int, d time.Duration) *PointBatcher
    func (b *PointBatcher) Flush()
    func (b *PointBatcher) In() chan<- models.Point
    func (b *PointBatcher) Out() <-chan []models.Point
    func (b *PointBatcher) Start()
    func (b *PointBatcher) Stats() *PointBatcherStats
    func (b *PointBatcher) Stop()
type PointBatcherStats
type Series
    func NewSeries(key string, tags models.Tags) *Series
    func (s *Series) AssignShard(shardID uint64)
    func (s *Series) Assigned(shardID uint64) bool
    func (s *Series) CopyTags()
    func (s *Series) ForEachTag(fn func(models.Tag))
    func (s *Series) GetTagString(key string) string
    func (s *Series) MarshalBinary() ([]byte, error)
    func (s *Series) ShardN() int
    func (s *Series) Tags() models.Tags
    func (s *Series) UnassignShard(shardID uint64)
    func (s *Series) UnmarshalBinary(buf []byte) error
type SeriesCreate
type SeriesIDs
    func (a SeriesIDs) Equals(other SeriesIDs) bool
    func (a SeriesIDs) Intersect(other SeriesIDs) SeriesIDs
    func (a SeriesIDs) Len() int
    func (a SeriesIDs) Less(i, j int) bool
    func (a SeriesIDs) Reject(other SeriesIDs) SeriesIDs
    func (a SeriesIDs) Swap(i, j int)
    func (a SeriesIDs) Union(other SeriesIDs) SeriesIDs
type Shard
    func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, options EngineOptions) *Shard
    func (s *Shard) Close() error
    func (s *Shard) ContainsSeries(seriesKeys []string) (map[string]bool, error)
    func (s *Shard) CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error)
    func (s *Shard) CreateSnapshot() (string, error)
    func (s *Shard) DeleteMeasurement(name string, seriesKeys []string) error
    func (s *Shard) DeleteSeries(seriesKeys []string) error
    func (s *Shard) DeleteSeriesRange(seriesKeys []string, min, max int64) error
    func (s *Shard) DiskSize() (int64, error)
    func (s *Shard) ExpandSources(sources influxql.Sources) (influxql.Sources, error)
    func (s *Shard) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
    func (s *Shard) LastModified() time.Time
    func (s *Shard) MapType(measurement, field string) influxql.DataType
    func (s *Shard) MeasurementsByRegex(re *regexp.Regexp) []string
    func (s *Shard) Open() error
    func (s *Shard) Path() string
    func (s *Shard) Restore(r io.Reader, basePath string) error
    func (s *Shard) SeriesCount() (int, error)
    func (s *Shard) SetEnabled(enabled bool)
    func (s *Shard) Statistics(tags map[string]string) []models.Statistic
    func (s *Shard) UnloadIndex()
    func (s *Shard) WithLogger(log zap.Logger)
    func (s *Shard) WritePoints(points []models.Point) error
    func (s *Shard) WriteTo(w io.Writer) (int64, error)
type ShardError
    func (e ShardError) Error() string
type ShardGroup
type ShardStatistics
type Shards
    func (a Shards) CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error)
    func (a Shards) ExpandSources(sources influxql.Sources) (influxql.Sources, error)
    func (a Shards) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
    func (a Shards) Len() int
    func (a Shards) Less(i, j int) bool
    func (a Shards) MapType(measurement, field string) influxql.DataType
    func (a Shards) MeasurementsByRegex(re *regexp.Regexp) []string
    func (a Shards) Swap(i, j int)
type Store
    func NewStore(path string) *Store
    func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error
    func (s *Store) Close() error
    func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error
    func (s *Store) CreateShardSnapshot(id uint64) (string, error)
    func (s *Store) DatabaseIndex(name string) *DatabaseIndex
    func (s *Store) DatabaseIndexN() int
    func (s *Store) Databases() []string
    func (s *Store) DeleteDatabase(name string) error
    func (s *Store) DeleteMeasurement(database, name string) error
    func (s *Store) DeleteRetentionPolicy(database, name string) error
    func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
    func (s *Store) DeleteShard(shardID uint64) error
    func (s *Store) DiskSize() (int64, error)
    func (s *Store) ExpandSources(sources influxql.Sources) (influxql.Sources, error)
    func (s *Store) Measurement(database, name string) *Measurement
    func (s *Store) Measurements(database string, cond influxql.Expr) ([]string, error)
    func (s *Store) Open() error
    func (s *Store) Path() string
    func (s *Store) RestoreShard(id uint64, r io.Reader) error
    func (s *Store) SetShardEnabled(shardID uint64, enabled bool) error
    func (s *Store) Shard(id uint64) *Shard
    func (s *Store) ShardGroup(ids []uint64) ShardGroup
    func (s *Store) ShardIDs() []uint64
    func (s *Store) ShardN() int
    func (s *Store) ShardRelativePath(id uint64) (string, error)
    func (s *Store) Shards(ids []uint64) []*Shard
    func (s *Store) Statistics(tags map[string]string) []models.Statistic
    func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, error)
    func (s *Store) WithLogger(log zap.Logger)
    func (s *Store) WriteToShard(shardID uint64, points []models.Point) error
type TagFilter
type TagValues

Package files

batcher.go config.go cursor.go doc.go engine.go meta.go shard.go store.go

Constants

const (
    // DefaultEngine is the default engine for new shards
    DefaultEngine = "tsm1"

    // DefaultCacheMaxMemorySize is the maximum size a shard's cache can
    // reach before it starts rejecting writes.
    DefaultCacheMaxMemorySize = 1024 * 1024 * 1024 // 1GB

    // DefaultCacheSnapshotMemorySize is the size at which the engine will
    // snapshot the cache and write it to a TSM file, freeing up memory
    DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB

    // DefaultCacheSnapshotWriteColdDuration is the length of time at which
    // the engine will snapshot the cache and write it to a new TSM file if
    // the shard hasn't received writes or deletes
    DefaultCacheSnapshotWriteColdDuration = time.Duration(10 * time.Minute)

    // DefaultCompactFullWriteColdDuration is the duration at which the engine
    // will compact all TSM files in a shard if it hasn't received a write or delete
    DefaultCompactFullWriteColdDuration = time.Duration(4 * time.Hour)

    // DefaultMaxPointsPerBlock is the maximum number of points in an encoded
    // block in a TSM file
    DefaultMaxPointsPerBlock = 1000

    // DefaultMaxSeriesPerDatabase is the maximum number of series a node can hold per database.
    DefaultMaxSeriesPerDatabase = 1000000

    // DefaultMaxValuesPerTag is the maximum number of values a tag can have within a measurement.
    DefaultMaxValuesPerTag = 100000
)

EOF represents a "not found" key returned by a Cursor.

const EOF = influxql.ZeroTime

Variables

var (
    // ErrFormatNotFound is returned when no format can be determined from a path.
    ErrFormatNotFound = errors.New("format not found")

    // ErrUnknownEngineFormat is returned when the engine format is
    // unknown. ErrUnknownEngineFormat is currently returned if a format
    // other than tsm1 is encountered.
    ErrUnknownEngineFormat = errors.New("unknown engine format")
)
var (
    // ErrFieldOverflow is returned when too many fields are created on a measurement.
    ErrFieldOverflow = errors.New("field overflow")

    // ErrFieldTypeConflict is returned when a new field already exists with a different type.
    ErrFieldTypeConflict = errors.New("field type conflict")

    // ErrFieldNotFound is returned when a field cannot be found.
    ErrFieldNotFound = errors.New("field not found")

    // ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID
    // there is no mapping for.
    ErrFieldUnmappedID = errors.New("field ID not mapped")

    // ErrEngineClosed is returned when a caller attempts indirectly to
    // access the shard's underlying engine.
    ErrEngineClosed = errors.New("engine is closed")

    // ErrShardDisabled is returned when the shard is not available for
    // queries or writes.
    ErrShardDisabled = errors.New("shard is disabled")
)
var (
    // ErrShardNotFound is returned when trying to get a non existing shard.
    ErrShardNotFound = fmt.Errorf("shard not found")
    // ErrStoreClosed is returned when trying to use a closed Store.
    ErrStoreClosed = fmt.Errorf("store is closed")
)

func DecodeStorePath

func DecodeStorePath(shardOrWALPath string) (database, retentionPolicy string)

DecodeStorePath extracts the database and retention policy names from a given shard or WAL path.

func MarshalTags

func MarshalTags(tags map[string]string) []byte

MarshalTags converts a tag set to bytes for use as a lookup key.

func MeasurementFromSeriesKey

func MeasurementFromSeriesKey(key string) string

MeasurementFromSeriesKey returns the name of the measurement from a key that contains a measurement name.

func NewFieldKeysIterator

func NewFieldKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error)

NewFieldKeysIterator returns an iterator that can be iterated over to retrieve field keys.

func NewSeriesIterator

func NewSeriesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error)

NewSeriesIterator returns a new instance of SeriesIterator.

func NewShardError

func NewShardError(id uint64, err error) error

NewShardError returns a new ShardError.

func NewTagKeysIterator

func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error)

NewTagKeysIterator returns a new instance of TagKeysIterator.

func NewTagValuesIterator

func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error)

NewTagValuesIterator returns a new instance of TagValuesIterator.

func RegisterEngine

func RegisterEngine(name string, fn NewEngineFunc)

RegisterEngine registers a storage engine initializer by name.

func RegisteredEngines

func RegisteredEngines() []string

RegisteredEngines returns the slice of currently registered engines.

type Config

Config holds the configuration for the tsdb package.

type Config struct {
    Dir    string `toml:"dir"`
    Engine string `toml:"-"`

    // General WAL configuration options
    WALDir string `toml:"wal-dir"`

    // Query logging
    QueryLogEnabled bool `toml:"query-log-enabled"`

    // Compaction options for tsm1 (descriptions above with defaults)
    CacheMaxMemorySize             uint64        `toml:"cache-max-memory-size"`
    CacheSnapshotMemorySize        uint64        `toml:"cache-snapshot-memory-size"`
    CacheSnapshotWriteColdDuration toml.Duration `toml:"cache-snapshot-write-cold-duration"`
    CompactFullWriteColdDuration   toml.Duration `toml:"compact-full-write-cold-duration"`

    // MaxSeriesPerDatabase is the maximum number of series a node can hold per database.
    // When this limit is exceeded, writes return a 'max series per database exceeded' error.
    // A value of 0 disables the limit.
    MaxSeriesPerDatabase int `toml:"max-series-per-database"`

    // MaxValuesPerTag is the maximum number of tag values a single tag key can have within
    // a measurement.  When the limit is exceeded, writes return an error.
    // A value of 0 disables the limit.
    MaxValuesPerTag int `toml:"max-values-per-tag"`

    TraceLoggingEnabled bool `toml:"trace-logging-enabled"`
}

func NewConfig

func NewConfig() Config

NewConfig returns the default configuration for tsdb.

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration held by c.

type Cursor

Cursor represents an iterator over a series.

type Cursor interface {
    SeekTo(seek int64) (key int64, value interface{})
    Next() (key int64, value interface{})
    Ascending() bool
}

type DatabaseIndex

DatabaseIndex is the in memory index of a collection of measurements, time series, and their tags. Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks.

type DatabaseIndex struct {
    // contains filtered or unexported fields
}

func NewDatabaseIndex

func NewDatabaseIndex(name string) *DatabaseIndex

NewDatabaseIndex returns a new initialized DatabaseIndex.

func (*DatabaseIndex) AssignShard

func (d *DatabaseIndex) AssignShard(k string, shardID uint64)

AssignShard updates the index to indicate that series k exists in the given shardID.

func (*DatabaseIndex) CreateMeasurementIndexIfNotExists

func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurement

CreateMeasurementIndexIfNotExists creates or retrieves an in-memory index object for the measurement.

func (*DatabaseIndex) CreateSeriesIndexIfNotExists

func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series, forceCopy bool) *Series

CreateSeriesIndexIfNotExists adds the series for the given measurement to the index and sets its ID or returns the existing series object.

func (*DatabaseIndex) DropMeasurement

func (d *DatabaseIndex) DropMeasurement(name string)

DropMeasurement removes the measurement and all of its underlying series from the database index.

func (*DatabaseIndex) DropSeries

func (d *DatabaseIndex) DropSeries(keys []string)

DropSeries removes the series keys and their tags from the index.

func (*DatabaseIndex) Measurement

func (d *DatabaseIndex) Measurement(name string) *Measurement

Measurement returns the measurement object from the index by the name.

func (*DatabaseIndex) MeasurementSeriesCounts

func (d *DatabaseIndex) MeasurementSeriesCounts() (nMeasurements int, nSeries int)

MeasurementSeriesCounts returns the number of measurements and series currently indexed by the database. Useful for reporting and monitoring.

func (*DatabaseIndex) Measurements

func (d *DatabaseIndex) Measurements() Measurements

Measurements returns a list of all measurements.

func (*DatabaseIndex) MeasurementsByExpr

func (d *DatabaseIndex) MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error)

MeasurementsByExpr takes an expression containing only tags and returns a list of matching *Measurement. The bool return argument returns if the expression was a measurement expression. It is used to differentiate a list of no measurements because all measurements were filtered out (when the bool is true) against when there are no measurements because the expression wasn't evaluated (when the bool is false).

func (*DatabaseIndex) MeasurementsByName

func (d *DatabaseIndex) MeasurementsByName(names []string) []*Measurement

MeasurementsByName returns a list of all the measurements in the index that match any entry in names.

func (*DatabaseIndex) MeasurementsByRegex

func (d *DatabaseIndex) MeasurementsByRegex(re *regexp.Regexp) Measurements

MeasurementsByRegex returns the measurements that match the regex.

func (*DatabaseIndex) RemoveShard

func (d *DatabaseIndex) RemoveShard(shardID uint64)

RemoveShard removes all references to shardID from any series or measurements in the index. If the shard was the only owner of data for the series, the series is removed from the index.

func (*DatabaseIndex) Series

func (d *DatabaseIndex) Series(key string) *Series

Series returns a series by key.

func (*DatabaseIndex) SeriesBytes

func (d *DatabaseIndex) SeriesBytes(key []byte) *Series

SeriesBytes returns a series by key.

func (*DatabaseIndex) SeriesKeys

func (d *DatabaseIndex) SeriesKeys() []string

SeriesKeys returns a sorted slice of strings indicating all the series keys in the index.

func (*DatabaseIndex) SeriesN

func (d *DatabaseIndex) SeriesN() int

SeriesN returns the number of series.

func (*DatabaseIndex) SeriesShardN

func (d *DatabaseIndex) SeriesShardN(shardID uint64) int

SeriesShardN returns the series count for a shard.

func (*DatabaseIndex) Statistics

func (d *DatabaseIndex) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for periodic monitoring.

func (*DatabaseIndex) TagsForSeries

func (d *DatabaseIndex) TagsForSeries(key string) models.Tags

TagsForSeries returns the tag map for the passed in series

func (*DatabaseIndex) UnassignShard

func (d *DatabaseIndex) UnassignShard(k string, shardID uint64)

UnassignShard updates the index to indicate that series k does not exist in the given shardID.

type Engine

Engine represents a swappable storage engine for the shard.

type Engine interface {
    Open() error
    Close() error

    WithLogger(zap.Logger)
    LoadMetadataIndex(shardID uint64, index *DatabaseIndex) error

    Backup(w io.Writer, basePath string, since time.Time) error
    Restore(r io.Reader, basePath string) error

    CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error)
    WritePoints(points []models.Point) error
    ContainsSeries(keys []string) (map[string]bool, error)
    DeleteSeries(keys []string) error
    DeleteSeriesRange(keys []string, min, max int64) error
    DeleteMeasurement(name string, seriesKeys []string) error
    SeriesCount() (n int, err error)
    MeasurementFields(measurement string) *MeasurementFields
    CreateSnapshot() (string, error)
    SetEnabled(enabled bool)

    // Format will return the format for the engine
    Format() EngineFormat

    // Statistics will return statistics relevant to this engine.
    Statistics(tags map[string]string) []models.Statistic
    LastModified() time.Time

    io.WriterTo
}

func NewEngine

func NewEngine(id uint64, path string, walPath string, options EngineOptions) (Engine, error)

NewEngine returns an instance of an engine based on its format. If the path does not exist then the DefaultFormat is used.

type EngineFormat

EngineFormat represents the format for an engine.

type EngineFormat int
const (
    // TSM1Format is the format used by the tsm1 engine.
    TSM1Format EngineFormat = 2
)

type EngineOptions

EngineOptions represents the options used to initialize the engine.

type EngineOptions struct {
    EngineVersion string
    ShardID       uint64

    Config Config
}

func NewEngineOptions

func NewEngineOptions() EngineOptions

NewEngineOptions returns the default options.

type Field

Field represents a series field.

type Field struct {
    ID   uint8             `json:"id,omitempty"`
    Name string            `json:"name,omitempty"`
    Type influxql.DataType `json:"type,omitempty"`
}

type FieldCreate

FieldCreate holds information for a field to create on a measurement.

type FieldCreate struct {
    Measurement string
    Field       *Field
}

type FilterExprs

FilterExprs represents a map of series IDs to filter expressions.

type FilterExprs map[uint64]influxql.Expr

func (FilterExprs) DeleteBoolLiteralTrues

func (fe FilterExprs) DeleteBoolLiteralTrues()

DeleteBoolLiteralTrues deletes all elements whose filter expression is a boolean literal true.

func (FilterExprs) Len

func (fe FilterExprs) Len() int

Len returns the number of elements.

type IndexStatistics

IndexStatistics maintains statistics for the index.

type IndexStatistics struct {
    NumSeries       int64
    NumMeasurements int64
}

type KeyValue

KeyValue holds a string key and a string value.

type KeyValue struct {
    Key, Value string
}

type KeyValues

KeyValues is a sortable slice of KeyValue.

type KeyValues []KeyValue

func (KeyValues) Len

func (a KeyValues) Len() int

Len implements sort.Interface.

func (KeyValues) Less

func (a KeyValues) Less(i, j int) bool

Less implements sort.Interface. Keys are compared before values.

func (KeyValues) Swap

func (a KeyValues) Swap(i, j int)

Swap implements sort.Interface.

type Measurement

Measurement represents a collection of time series in a database. It also contains in-memory structures for indexing tags. Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks.

type Measurement struct {
    Name string `json:"name,omitempty"`
    // contains filtered or unexported fields
}

func NewMeasurement

func NewMeasurement(name string) *Measurement

NewMeasurement allocates and initializes a new Measurement.

func (*Measurement) AddSeries

func (m *Measurement) AddSeries(s *Series) bool

AddSeries adds a series to the measurement's index. It returns true if the series was added successfully or false if the series was already present.

func (*Measurement) AppendSeriesKeysByID

func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string

AppendSeriesKeysByID appends keys for a list of series ids to a buffer.

func (*Measurement) Cardinality

func (m *Measurement) Cardinality(key string) int

Cardinality returns the number of values associated with the given tag key.

func (*Measurement) CardinalityBytes

func (m *Measurement) CardinalityBytes(key []byte) int

CardinalityBytes returns the number of values associated with the given tag key.

func (*Measurement) DropSeries

func (m *Measurement) DropSeries(series *Series)

DropSeries removes a series from the measurement's index.

func (*Measurement) FieldNames

func (m *Measurement) FieldNames() []string

FieldNames returns a list of the measurement's field names, in an arbitrary order.

func (*Measurement) HasField

func (m *Measurement) HasField(name string) bool

HasField returns true if the measurement has a field by the given name.

func (*Measurement) HasSeries

func (m *Measurement) HasSeries() bool

HasSeries returns true if there is at least 1 series under this measurement.

func (*Measurement) HasTagKey

func (m *Measurement) HasTagKey(k string) bool

HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key.

func (*Measurement) HasTagKeyValue

func (m *Measurement) HasTagKeyValue(k, v []byte) bool

HasTagKeyValue returns true if at least one series in this measurement has written a value for the given tag key and tag value.

func (*Measurement) IDsForExpr

func (m *Measurement) IDsForExpr(n *influxql.BinaryExpr) SeriesIDs

IDsForExpr returns the series IDs that are candidates to match the given expression.

func (*Measurement) SeriesByID

func (m *Measurement) SeriesByID(id uint64) *Series

SeriesByID returns a series by identifier.

func (*Measurement) SeriesByIDSlice

func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series

SeriesByIDSlice returns a list of series by identifiers.

func (*Measurement) SeriesIDsAllOrByExpr

func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error)

SeriesIDsAllOrByExpr walks an expression for matching series IDs or, if no expression is given, returns all series IDs for the measurement.

func (*Measurement) SeriesKeys

func (m *Measurement) SeriesKeys() []string

SeriesKeys returns the keys of every series in this measurement.

func (*Measurement) SetFieldName

func (m *Measurement) SetFieldName(name string)

SetFieldName adds the field name to the measurement.

func (*Measurement) TagKeys

func (m *Measurement) TagKeys() []string

TagKeys returns a list of the measurement's tag names, in sorted order.

func (*Measurement) TagKeysByExpr

func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (stringSet, bool, error)

TagKeysByExpr extracts the tag keys wanted by the expression.

func (*Measurement) TagSets

func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error)

TagSets returns the unique tag sets that exist for the given tag keys. This is used to determine what composite series will be created by a group by. i.e. "group by region" should return:

{"region":"uswest"}, {"region":"useast"}

or region, service returns

{"region": "uswest", "service": "redis"}, {"region": "uswest", "service": "mysql"}, ...

This will also populate the TagSet objects with the series IDs that match each tagset and any influx filter expression that goes with the series. TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.

func (*Measurement) TagValues

func (m *Measurement) TagValues(key string) []string

TagValues returns all the values for the given tag key, in an arbitrary order.

func (*Measurement) ValidateGroupBy

func (m *Measurement) ValidateGroupBy(stmt *influxql.SelectStatement) error

ValidateGroupBy ensures that the GROUP BY is not a field.

func (*Measurement) WalkTagKeys

func (m *Measurement) WalkTagKeys(fn func(k string))

WalkTagKeys calls fn for each tag key associated with m. The order of the keys is undefined.

type MeasurementFields

MeasurementFields holds the fields of a measurement and their codec.

type MeasurementFields struct {
    // contains filtered or unexported fields
}

func NewMeasurementFields

func NewMeasurementFields() *MeasurementFields

NewMeasurementFields returns an initialised *MeasurementFields value.

func (*MeasurementFields) CreateFieldIfNotExists

func (m *MeasurementFields) CreateFieldIfNotExists(name string, typ influxql.DataType, limitCount bool) error

CreateFieldIfNotExists creates a new field with an autoincrementing ID. Returns an error if 255 fields have already been created on the measurement or the field already exists with a different type.

func (*MeasurementFields) Field

func (m *MeasurementFields) Field(name string) *Field

Field returns the field for name, or nil if there is no field for name.

func (*MeasurementFields) FieldBytes

func (m *MeasurementFields) FieldBytes(name []byte) *Field

FieldBytes returns the field for name, or nil if there is no field for name. FieldBytes should be preferred to Field when the caller has a []byte, because it avoids a string allocation, which can't be avoided if the caller converts the []byte to a string and calls Field.

func (*MeasurementFields) FieldSet

func (m *MeasurementFields) FieldSet() map[string]influxql.DataType

FieldSet returns the set of fields and their types for the measurement.

func (*MeasurementFields) MarshalBinary

func (m *MeasurementFields) MarshalBinary() ([]byte, error)

MarshalBinary encodes the object to a binary format.

func (*MeasurementFields) UnmarshalBinary

func (m *MeasurementFields) UnmarshalBinary(buf []byte) error

UnmarshalBinary decodes the object from a binary format.

type Measurements

Measurements represents a list of *Measurement.

type Measurements []*Measurement

func (Measurements) Len

func (a Measurements) Len() int

Len implements sort.Interface.

func (Measurements) Less

func (a Measurements) Less(i, j int) bool

Less implements sort.Interface.

func (Measurements) Swap

func (a Measurements) Swap(i, j int)

Swap implements sort.Interface.

type NewEngineFunc

NewEngineFunc creates a new engine.

type NewEngineFunc func(id uint64, path string, walPath string, options EngineOptions) Engine

type PartialWriteError

PartialWriteError indicates a write request could only write a portion of the requested values.

type PartialWriteError struct {
    Reason  string
    Dropped int
}

func (PartialWriteError) Error

func (e PartialWriteError) Error() string

type PointBatcher

PointBatcher accepts Points and will emit a batch of those points when either a) the batch reaches a certain size, or b) a certain time passes.

type PointBatcher struct {
    // contains filtered or unexported fields
}

func NewPointBatcher

func NewPointBatcher(sz int, bp int, d time.Duration) *PointBatcher

NewPointBatcher returns a new PointBatcher. sz is the batching size, bp is the maximum number of batches that may be pending. d is the time after which a batch will be emitted after the first point is received for the batch, regardless of its size.

func (*PointBatcher) Flush

func (b *PointBatcher) Flush()

Flush instructs the batcher to emit any pending points in a batch, regardless of batch size. If there are no pending points, no batch is emitted.

func (*PointBatcher) In

func (b *PointBatcher) In() chan<- models.Point

In returns the channel to which points should be written.

func (*PointBatcher) Out

func (b *PointBatcher) Out() <-chan []models.Point

Out returns the channel from which batches should be read.

func (*PointBatcher) Start

func (b *PointBatcher) Start()

Start starts the batching process. The channels for incoming points and outgoing point-batches are obtained from In and Out respectively.

func (*PointBatcher) Stats

func (b *PointBatcher) Stats() *PointBatcherStats

Stats returns a PointBatcherStats object for the PointBatcher. While each statistic should be closely correlated with every other statistic, this is not guaranteed.

func (*PointBatcher) Stop

func (b *PointBatcher) Stop()

Stop stops the batching process. Stop waits for the batching routine to stop before returning.

type PointBatcherStats

PointBatcherStats are the statistics each batcher tracks.

type PointBatcherStats struct {
    BatchTotal   uint64 // Total count of batches transmitted.
    PointTotal   uint64 // Total count of points processed.
    SizeTotal    uint64 // Number of batches that reached size threshold.
    TimeoutTotal uint64 // Number of timeouts that occurred.
}

type Series

Series belong to a Measurement and represent unique time series in a database.

type Series struct {
    Key string

    ID uint64
    // contains filtered or unexported fields
}

func NewSeries

func NewSeries(key string, tags models.Tags) *Series

NewSeries returns an initialized series struct.

func (*Series) AssignShard

func (s *Series) AssignShard(shardID uint64)

AssignShard adds shardID to the list of shards this series is assigned to.

func (*Series) Assigned

func (s *Series) Assigned(shardID uint64) bool

Assigned returns whether this series is assigned to the given shard.

func (*Series) CopyTags

func (s *Series) CopyTags()

CopyTags clones the tags on the series in-place.

func (*Series) ForEachTag

func (s *Series) ForEachTag(fn func(models.Tag))

ForEachTag executes fn for every tag. Iteration occurs under lock.

func (*Series) GetTagString

func (s *Series) GetTagString(key string) string

GetTagString returns a tag value under lock.

func (*Series) MarshalBinary

func (s *Series) MarshalBinary() ([]byte, error)

MarshalBinary encodes the object to a binary format.

func (*Series) ShardN

func (s *Series) ShardN() int

ShardN returns the number of shards this series is assigned to.

func (*Series) Tags

func (s *Series) Tags() models.Tags

Tags returns a copy of the tags under lock.

func (*Series) UnassignShard

func (s *Series) UnassignShard(shardID uint64)

UnassignShard removes the shardID from the list of shards this series is assigned to.

func (*Series) UnmarshalBinary

func (s *Series) UnmarshalBinary(buf []byte) error

UnmarshalBinary decodes the object from a binary format.

type SeriesCreate

SeriesCreate holds information for a series to create.

type SeriesCreate struct {
    Measurement string
    Series      *Series
}

type SeriesIDs

SeriesIDs is a convenience type for sorting, checking equality, and doing union and intersection of collections of series ids.

type SeriesIDs []uint64

func (SeriesIDs) Equals

func (a SeriesIDs) Equals(other SeriesIDs) bool

Equals assumes that both are sorted.

func (SeriesIDs) Intersect

func (a SeriesIDs) Intersect(other SeriesIDs) SeriesIDs

Intersect returns a new collection of series ids in sorted order that is the intersection of the two. The two collections must already be sorted.

func (SeriesIDs) Len

func (a SeriesIDs) Len() int

Len implements sort.Interface.

func (SeriesIDs) Less

func (a SeriesIDs) Less(i, j int) bool

Less implements sort.Interface.

func (SeriesIDs) Reject

func (a SeriesIDs) Reject(other SeriesIDs) SeriesIDs

Reject returns a new collection of series ids in sorted order with the passed in set removed from the original. This is useful for the NOT operator. The two collections must already be sorted.

func (SeriesIDs) Swap

func (a SeriesIDs) Swap(i, j int)

Swap implements sort.Interface.

func (SeriesIDs) Union

func (a SeriesIDs) Union(other SeriesIDs) SeriesIDs

Union returns a new collection of series ids in sorted order that is the union of the two. The two collections must already be sorted.

type Shard

Shard represents a self-contained time series database. An inverted index of the measurement and tag data is kept along with the raw time series data. Data can be split across many shards. The query engine in TSDB is responsible for combining the output of many shards into a single query result.

type Shard struct {
    EnableOnOpen bool
    // contains filtered or unexported fields
}

func NewShard

func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, options EngineOptions) *Shard

NewShard returns a new initialized Shard.

func (*Shard) Close

func (s *Shard) Close() error

Close shuts down the shard's store.

func (*Shard) ContainsSeries

func (s *Shard) ContainsSeries(seriesKeys []string) (map[string]bool, error)

ContainsSeries determines if the shard contains the provided series keys. The returned map contains all the provided keys that are in the shard, and the value for each key will be true if the shard has values for that key.

func (*Shard) CreateIterator

func (s *Shard) CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error)

CreateIterator returns an iterator for the data in the shard.

func (*Shard) CreateSnapshot

func (s *Shard) CreateSnapshot() (string, error)

CreateSnapshot will return a path to a temp directory containing hard links to the underlying shard files.

func (*Shard) DeleteMeasurement

func (s *Shard) DeleteMeasurement(name string, seriesKeys []string) error

DeleteMeasurement deletes a measurement and all underlying series.

func (*Shard) DeleteSeries

func (s *Shard) DeleteSeries(seriesKeys []string) error

DeleteSeries deletes a list of series.

func (*Shard) DeleteSeriesRange

func (s *Shard) DeleteSeriesRange(seriesKeys []string, min, max int64) error

DeleteSeriesRange deletes all values from seriesKeys with timestamps between min and max (inclusive).

func (*Shard) DiskSize

func (s *Shard) DiskSize() (int64, error)

DiskSize returns the size on disk of this shard.

func (*Shard) ExpandSources

func (s *Shard) ExpandSources(sources influxql.Sources) (influxql.Sources, error)

ExpandSources expands regex sources and removes duplicates. NOTE: sources must be normalized (db and rp set) before calling this function.

func (*Shard) FieldDimensions

func (s *Shard) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)

FieldDimensions returns unique sets of fields and dimensions across a list of sources.

func (*Shard) LastModified

func (s *Shard) LastModified() time.Time

LastModified returns the time when this shard was last modified.

func (*Shard) MapType

func (s *Shard) MapType(measurement, field string) influxql.DataType

MapType returns the data type for the field within the measurement.

func (*Shard) MeasurementsByRegex

func (s *Shard) MeasurementsByRegex(re *regexp.Regexp) []string

func (*Shard) Open

func (s *Shard) Open() error

Open initializes and opens the shard's store.

func (*Shard) Path

func (s *Shard) Path() string

Path returns the path set on the shard when it was created.

func (*Shard) Restore

func (s *Shard) Restore(r io.Reader, basePath string) error

Restore restores data to the underlying engine for the shard. The shard is reopened after restore.

func (*Shard) SeriesCount

func (s *Shard) SeriesCount() (int, error)

SeriesCount returns the number of series buckets on the shard.

func (*Shard) SetEnabled

func (s *Shard) SetEnabled(enabled bool)

SetEnabled enables or disables the shard for queries and writes. When disabled, all writes and queries return an error and compactions are stopped for the shard.

func (*Shard) Statistics

func (s *Shard) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for periodic monitoring.

func (*Shard) UnloadIndex

func (s *Shard) UnloadIndex()

UnloadIndex removes all references to this shard from the DatabaseIndex.

func (*Shard) WithLogger

func (s *Shard) WithLogger(log zap.Logger)

WithLogger sets the logger on the shard.

func (*Shard) WritePoints

func (s *Shard) WritePoints(points []models.Point) error

WritePoints will write the raw data points and any new metadata to the index in the shard.

func (*Shard) WriteTo

func (s *Shard) WriteTo(w io.Writer) (int64, error)

WriteTo writes the shard's data to w.

type ShardError

A ShardError implements the error interface, and contains extra context about the shard that generated the error.

type ShardError struct {
    Err error
    // contains filtered or unexported fields
}

func (ShardError) Error

func (e ShardError) Error() string

Error returns the string representation of the error, to satisfy the error interface.

type ShardGroup

type ShardGroup interface {
    MeasurementsByRegex(re *regexp.Regexp) []string
    FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
    MapType(measurement, field string) influxql.DataType
    CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error)
    ExpandSources(sources influxql.Sources) (influxql.Sources, error)
}

type ShardStatistics

ShardStatistics maintains statistics for a shard.

type ShardStatistics struct {
    WriteReq           int64
    WriteReqOK         int64
    WriteReqErr        int64
    SeriesCreated      int64
    FieldsCreated      int64
    WritePointsErr     int64
    WritePointsDropped int64
    WritePointsOK      int64
    BytesWritten       int64
    DiskBytes          int64
}

type Shards

Shards represents a sortable list of shards.

type Shards []*Shard

func (Shards) CreateIterator

func (a Shards) CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error)

func (Shards) ExpandSources

func (a Shards) ExpandSources(sources influxql.Sources) (influxql.Sources, error)

func (Shards) FieldDimensions

func (a Shards) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)

func (Shards) Len

func (a Shards) Len() int

Len implements sort.Interface.

func (Shards) Less

func (a Shards) Less(i, j int) bool

Less implements sort.Interface.

func (Shards) MapType

func (a Shards) MapType(measurement, field string) influxql.DataType

func (Shards) MeasurementsByRegex

func (a Shards) MeasurementsByRegex(re *regexp.Regexp) []string

func (Shards) Swap

func (a Shards) Swap(i, j int)

Swap implements sort.Interface.

type Store

Store manages shards and indexes for databases.

type Store struct {
    EngineOptions EngineOptions

    Logger zap.Logger
    // contains filtered or unexported fields
}

func NewStore

func NewStore(path string) *Store

NewStore returns a new store with the given path and a default configuration. The returned store must be initialized by calling Open before using it.

func (*Store) BackupShard

func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error

BackupShard will get the shard and have the engine back up since the passed-in time to the writer.

func (*Store) Close

func (s *Store) Close() error

Close closes the store and all associated shards. After calling Close accessing shards through the Store will result in ErrStoreClosed being returned.

func (*Store) CreateShard

func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error

CreateShard creates a shard with the given id and retention policy on a database.

func (*Store) CreateShardSnapshot

func (s *Store) CreateShardSnapshot(id uint64) (string, error)

CreateShardSnapshot will create a hard link to the underlying shard and return a path. The caller is responsible for cleaning up (removing) the file path returned.

func (*Store) DatabaseIndex

func (s *Store) DatabaseIndex(name string) *DatabaseIndex

DatabaseIndex returns the index for a database by its name.

func (*Store) DatabaseIndexN

func (s *Store) DatabaseIndexN() int

DatabaseIndexN returns the number of database indices in the store.

func (*Store) Databases

func (s *Store) Databases() []string

Databases returns all the databases in the indexes.

func (*Store) DeleteDatabase

func (s *Store) DeleteDatabase(name string) error

DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.

func (*Store) DeleteMeasurement

func (s *Store) DeleteMeasurement(database, name string) error

DeleteMeasurement removes a measurement and all associated series from a database.

func (*Store) DeleteRetentionPolicy

func (s *Store) DeleteRetentionPolicy(database, name string) error

DeleteRetentionPolicy will close all shards associated with the provided retention policy, remove the retention policy directories on both the DB and WAL, and remove all shard files from disk.

func (*Store) DeleteSeries

func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error

DeleteSeries loops through the local shards and deletes the series data and metadata for the passed in series keys.

func (*Store) DeleteShard

func (s *Store) DeleteShard(shardID uint64) error

DeleteShard removes a shard from disk.

func (*Store) DiskSize

func (s *Store) DiskSize() (int64, error)

DiskSize returns the size of all the shard files in bytes. This size does not include the WAL size.

func (*Store) ExpandSources

func (s *Store) ExpandSources(sources influxql.Sources) (influxql.Sources, error)

ExpandSources expands sources against all local shards.

func (*Store) Measurement

func (s *Store) Measurement(database, name string) *Measurement

Measurement returns a measurement by name from the given database.

func (*Store) Measurements

func (s *Store) Measurements(database string, cond influxql.Expr) ([]string, error)

Measurements returns a slice of sorted measurement names in the given database, matching the given condition.

func (*Store) Open

func (s *Store) Open() error

Open initializes the store, creating all necessary directories, loading all shards and indexes and initializing periodic maintenance of all shards.

func (*Store) Path

func (s *Store) Path() string

Path returns the store's root path.

func (*Store) RestoreShard

func (s *Store) RestoreShard(id uint64, r io.Reader) error

RestoreShard restores a backup from r to a given shard. This will only overwrite files included in the backup.

func (*Store) SetShardEnabled

func (s *Store) SetShardEnabled(shardID uint64, enabled bool) error

SetShardEnabled enables or disables a shard for reads and writes.

func (*Store) Shard

func (s *Store) Shard(id uint64) *Shard

Shard returns a shard by id.

func (*Store) ShardGroup

func (s *Store) ShardGroup(ids []uint64) ShardGroup

ShardGroup returns a ShardGroup with a list of shards by id.

func (*Store) ShardIDs

func (s *Store) ShardIDs() []uint64

ShardIDs returns a slice of all ShardIDs under management, in arbitrary order.

func (*Store) ShardN

func (s *Store) ShardN() int

ShardN returns the number of shards in the store.

func (*Store) ShardRelativePath

func (s *Store) ShardRelativePath(id uint64) (string, error)

ShardRelativePath will return the relative path to the shard, i.e. <database>/<retention>/<id>.

func (*Store) Shards

func (s *Store) Shards(ids []uint64) []*Shard

Shards returns a list of shards by id.

func (*Store) Statistics

func (s *Store) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for periodic monitoring.

func (*Store) TagValues

func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, error)

TagValues returns the tag keys and values in the given database, matching the condition.

func (*Store) WithLogger

func (s *Store) WithLogger(log zap.Logger)

WithLogger sets the logger for the store.

func (*Store) WriteToShard

func (s *Store) WriteToShard(shardID uint64, points []models.Point) error

WriteToShard writes a list of points to a shard identified by its ID.

type TagFilter

TagFilter represents a tag filter when looking up other tags or measurements.

type TagFilter struct {
    Op    influxql.Token
    Key   string
    Value string
    Regex *regexp.Regexp
}

type TagValues

TagValues represents the tag keys and values in a measurement.

type TagValues struct {
    Measurement string
    Values      []KeyValue
}

Subdirectories

Name Synopsis
..
engine Package engine can be imported to initialize and register all available TSDB engines.
tsm1 Package tsm1 provides a TSDB in the Time Structured Merge tree format.