servers - ActiveState ActiveGo 1.8
...

Package servers

import "github.com/hashicorp/consul/consul/servers"
Overview
Index

Overview ▾

Package servers provides a Manager interface for Manager managed agent.Server objects. The servers package manages servers from a Consul client's perspective (i.e. a list of servers that a client talks with for RPCs). The servers package does not provide any API guarantees and should be called only by `hashicorp/consul`.

Index ▾

func FloodJoins(logger *log.Logger, portFn FloodPortFn, localDatacenter string, localSerf *serf.Serf, globalSerf *serf.Serf)
func HandleSerfEvents(logger *log.Logger, router *Router, areaID types.AreaID, shutdownCh <-chan struct{}, eventCh <-chan serf.Event)
type FloodPortFn
type Manager
    func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCluster, connPoolPinger Pinger) (m *Manager)
    func (m *Manager) AddServer(s *agent.Server)
    func (m *Manager) FindServer() *agent.Server
    func (m *Manager) IsOffline() bool
    func (m *Manager) NotifyFailedServer(s *agent.Server)
    func (m *Manager) NumServers() int
    func (m *Manager) RebalanceServers()
    func (m *Manager) RemoveServer(s *agent.Server)
    func (m *Manager) ResetRebalanceTimer()
    func (m *Manager) Start()
type ManagerSerfCluster
type Pinger
type Router
    func NewRouter(logger *log.Logger, localDatacenter string) *Router
    func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger Pinger) error
    func (r *Router) AddServer(areaID types.AreaID, s *agent.Server) error
    func (r *Router) FailServer(areaID types.AreaID, s *agent.Server) error
    func (r *Router) FindRoute(datacenter string) (*Manager, *agent.Server, bool)
    func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error)
    func (r *Router) GetDatacenters() []string
    func (r *Router) GetDatacentersByDistance() ([]string, error)
    func (r *Router) RemoveArea(areaID types.AreaID) error
    func (r *Router) RemoveServer(areaID types.AreaID, s *agent.Server) error
    func (r *Router) Shutdown()
type RouterSerfCluster

Package files

manager.go router.go serf_adapter.go serf_flooder.go

func FloodJoins

func FloodJoins(logger *log.Logger, portFn FloodPortFn,
    localDatacenter string, localSerf *serf.Serf, globalSerf *serf.Serf)

FloodJoins attempts to make sure all Consul servers in the local Serf instance are joined in the global Serf instance. It assumes names in the local area are of the form <node> and those in the global area are of the form <node>.<dc> as is done for WAN and general network areas in Consul Enterprise.

func HandleSerfEvents

func HandleSerfEvents(logger *log.Logger, router *Router, areaID types.AreaID, shutdownCh <-chan struct{}, eventCh <-chan serf.Event)

HandleSerfEvents is a long-running goroutine that pushes incoming events from a Serf manager's channel into the given router. This will return when the shutdown channel is closed.

type FloodPortFn

FloodPortFn gets the port to use for a given server when flood-joining. This will return false if it doesn't have one.

type FloodPortFn func(*agent.Server) (int, bool)

type Manager

type Manager struct {
    // contains filtered or unexported fields
}

func New

func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCluster, connPoolPinger Pinger) (m *Manager)

New is the only way to safely create a new Manager struct.

func (*Manager) AddServer

func (m *Manager) AddServer(s *agent.Server)

AddServer takes out an internal write lock and adds a new server. If the server is not known, appends the server to the list. The new server will begin seeing use after the rebalance timer fires or enough servers fail organically. If the server is already known, merge the new server details.

func (*Manager) FindServer

func (m *Manager) FindServer() *agent.Server

FindServer takes out an internal "read lock" and searches through the list of servers to find a "healthy" server. If the server is actually unhealthy, we rely on Serf to detect this and remove the node from the server list. If the server at the front of the list has failed or fails during an RPC call, it is rotated to the end of the list. If there are no servers available, return nil.

func (*Manager) IsOffline

func (m *Manager) IsOffline() bool

IsOffline checks to see if all the known servers have failed their ping test during the last rebalance.

func (*Manager) NotifyFailedServer

func (m *Manager) NotifyFailedServer(s *agent.Server)

NotifyFailedServer marks the passed in server as "failed" by rotating it to the end of the server list.

func (*Manager) NumServers

func (m *Manager) NumServers() int

NumServers takes out an internal "read lock" and returns the number of servers. numServers includes both healthy and unhealthy servers.

func (*Manager) RebalanceServers

func (m *Manager) RebalanceServers()

RebalanceServers shuffles the list of servers on this agent. The server at the front of the list is selected for the next RPC. RPC calls that fail for a particular server are rotated to the end of the list. This method reshuffles the list periodically in order to redistribute work across all known consul servers (i.e. guarantee that the order of servers in the server list is not positively correlated with the age of a server in the Consul cluster). Periodically shuffling the server list prevents long-lived clients from fixating on long-lived servers.

Unhealthy servers are removed when serf notices the server has been deregistered. Before the newly shuffled server list is saved, the new remote endpoint is tested to ensure its responsive.

func (*Manager) RemoveServer

func (m *Manager) RemoveServer(s *agent.Server)

RemoveServer takes out an internal write lock and removes a server from the server list.

func (*Manager) ResetRebalanceTimer

func (m *Manager) ResetRebalanceTimer()

ResetRebalanceTimer resets the rebalance timer. This method exists for testing and should not be used directly.

func (*Manager) Start

func (m *Manager) Start()

Start is used to start and manage the task of automatically shuffling and rebalancing the list of Consul servers. This maintenance only happens periodically based on the expiration of the timer. Failed servers are automatically cycled to the end of the list. New servers are appended to the list. The order of the server list must be shuffled periodically to distribute load across all known and available Consul servers.

type ManagerSerfCluster

ManagerSerfCluster is an interface wrapper around Serf in order to make this easier to unit test.

type ManagerSerfCluster interface {
    NumNodes() int
}

type Pinger

Pinger is an interface wrapping client.ConnPool to prevent a cyclic import dependency.

type Pinger interface {
    PingConsulServer(s *agent.Server) (bool, error)
}

type Router

Router keeps track of a set of network areas and their associated Serf membership of Consul servers. It then indexes this by datacenter to provide healthy routes to servers by datacenter.

type Router struct {

    // This top-level lock covers all the internal state.
    sync.RWMutex
    // contains filtered or unexported fields
}

func NewRouter

func NewRouter(logger *log.Logger, localDatacenter string) *Router

NewRouter returns a new Router with the given configuration.

func (*Router) AddArea

func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger Pinger) error

AddArea registers a new network area with the router.

func (*Router) AddServer

func (r *Router) AddServer(areaID types.AreaID, s *agent.Server) error

AddServer should be called whenever a new server joins an area. This is typically hooked into the Serf event handler area for this area.

func (*Router) FailServer

func (r *Router) FailServer(areaID types.AreaID, s *agent.Server) error

FailServer should be called whenever a server is failed in an area. This is typically hooked into the Serf event handler area for this area. We will immediately shift traffic away from this server, but it will remain in the list of servers.

func (*Router) FindRoute

func (r *Router) FindRoute(datacenter string) (*Manager, *agent.Server, bool)

FindRoute returns a healthy server with a route to the given datacenter. The Boolean return parameter will indicate if a server was available. In some cases this may return a best-effort unhealthy server that can be used for a connection attempt. If any problem occurs with the given server, the caller should feed that back to the manager associated with the server, which is also returned, by calling NofifyFailedServer().

func (*Router) GetDatacenterMaps

func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error)

GetDatacenterMaps returns a structure with the raw network coordinates of each known server, organized by datacenter and network area.

func (*Router) GetDatacenters

func (r *Router) GetDatacenters() []string

GetDatacenters returns a list of datacenters known to the router, sorted by name.

func (*Router) GetDatacentersByDistance

func (r *Router) GetDatacentersByDistance() ([]string, error)

GetDatacentersByDeistance returns a list of datacenters known to the router, sorted by median RTT from this server to the servers in each datacenter. If there are multiple areas that reach a given datacenter, this will use the lowest RTT for the sort.

func (*Router) RemoveArea

func (r *Router) RemoveArea(areaID types.AreaID) error

RemoveArea removes an existing network area from the router.

func (*Router) RemoveServer

func (r *Router) RemoveServer(areaID types.AreaID, s *agent.Server) error

RemoveServer should be called whenever a server is removed from an area. This is typically hooked into the Serf event handler area for this area.

func (*Router) Shutdown

func (r *Router) Shutdown()

Shutdown removes all areas from the router, which stops all their respective managers. No new areas can be added after the router is shut down.

type RouterSerfCluster

RouterSerfCluster is an interface wrapper around Serf in order to make this easier to unit test.

type RouterSerfCluster interface {
    NumNodes() int
    Members() []serf.Member
    GetCoordinate() (*coordinate.Coordinate, error)
    GetCachedCoordinate(name string) (*coordinate.Coordinate, bool)
}