Package zk
Overview ▹
Index ▹
Constants
const ( FlagEphemeral = 1 FlagSequence = 2 )
Constants for ACL permissions
const ( PermRead = 1 << iota PermWrite PermCreate PermDelete PermAdmin PermAll = 0x1f )
const ( DefaultServerTickTime = 2000 DefaultServerInitLimit = 10 DefaultServerSyncLimit = 5 DefaultServerAutoPurgeSnapRetainCount = 3 DefaultPeerPort = 2888 DefaultLeaderElectionPort = 3888 )
const ( DefaultPort = 2181 )
Variables
var ( ErrConnectionClosed = errors.New("zk: connection closed") ErrUnknown = errors.New("zk: unknown error") ErrAPIError = errors.New("zk: api error") ErrNoNode = errors.New("zk: node does not exist") ErrNoAuth = errors.New("zk: not authenticated") ErrBadVersion = errors.New("zk: version conflict") ErrNoChildrenForEphemerals = errors.New("zk: ephemeral nodes may not have children") ErrNodeExists = errors.New("zk: node already exists") ErrNotEmpty = errors.New("zk: node has children") ErrSessionExpired = errors.New("zk: session has been expired by the server") ErrInvalidACL = errors.New("zk: invalid ACL specified") ErrAuthFailed = errors.New("zk: client authentication failed") ErrClosing = errors.New("zk: zookeeper is closing") ErrNothing = errors.New("zk: no server responsees to process") ErrSessionMoved = errors.New("zk: session moved to another server, so operation is ignored") )
var ( // ErrDeadlock is returned by Lock when trying to lock twice without unlocking first ErrDeadlock = errors.New("zk: trying to acquire a lock twice") // ErrNotLocked is returned by Unlock when trying to release a lock that has not first been acquired. ErrNotLocked = errors.New("zk: not locked") )
var ( ErrUnhandledFieldType = errors.New("zk: unhandled field type") ErrPtrExpected = errors.New("zk: encode/decode expect a non-nil pointer to struct") ErrShortBuffer = errors.New("zk: buffer too small") )
ErrInvalidPath indicates that an operation was being attempted on an invalid path. (e.g. empty path)
var ErrInvalidPath = errors.New("zk: invalid path")
ErrNoServer indicates that an operation cannot be completed because attempts to connect to all servers in the list failed.
var ErrNoServer = errors.New("zk: could not connect to a server")
func AuthACL ¶
func AuthACL(perms int32) []ACL
AuthACL produces an ACL list containing a single ACL which uses the provided permissions, with the scheme "auth", and ID "", which is used by ZooKeeper to represent any authenticated user.
func DigestACL ¶
func DigestACL(perms int32, user, password string) []ACL
func FLWCons ¶
func FLWCons(servers []string, timeout time.Duration) ([]*ServerClients, bool)
FLWCons is a FourLetterWord helper function. In particular, this function pulls the cons output from each server.
As with FLWSrvr, the boolean value is false if one of the requests had an issue. Each ServerClients struct has an Error value that can be checked.
func FLWRuok ¶
func FLWRuok(servers []string, timeout time.Duration) []bool
FLWRuok is a FourLetterWord helper function. In particular, this function pulls the ruok output from each server.
func FLWSrvr ¶
func FLWSrvr(servers []string, timeout time.Duration) ([]*ServerStats, bool)
FLWSrvr is a FourLetterWord helper function. In particular, this function pulls the srvr output from the zookeeper instances and parses the output. A slice of *ServerStats structs is returned as well as a boolean value to indicate whether this function processed successfully.
If the boolean value is false there was a problem. If the *ServerStats slice is empty or nil, then the error happened before we started to obtain 'srvr' values. Otherwise, one of the servers had an issue and the "Error" value in the struct should be inspected to determine which server had the issue.
func FormatServers ¶
func FormatServers(servers []string) []string
FormatServers takes a slice of addresses, and makes sure they are in a format that resembles <addr>:<port>. If the server has no port provided, the DefaultPort constant is added to the end.
func WithDialer ¶
func WithDialer(dialer Dialer) connOption
WithDialer returns a connection option specifying a non-default Dialer.
func WithEventCallback ¶
func WithEventCallback(cb EventCallback) connOption
WithEventCallback returns a connection option that specifies an event callback. The callback must not block - doing so would delay the ZK goroutines.
func WithHostProvider ¶
func WithHostProvider(hostProvider HostProvider) connOption
WithHostProvider returns a connection option specifying a non-default HostProvider.
func WorldACL ¶
func WorldACL(perms int32) []ACL
WorldACL produces an ACL list containing a single ACL which uses the provided permissions, with the scheme "world", and ID "anyone", which is used by ZooKeeper to represent any user at all.
type ACL ¶
type ACL struct { Perms int32 Scheme string ID string }
type CheckVersionRequest ¶
type CheckVersionRequest PathVersionRequest
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
func Connect ¶
func Connect(servers []string, sessionTimeout time.Duration, options ...connOption) (*Conn, <-chan Event, error)
Connect establishes a new connection to a pool of zookeeper servers. The provided session timeout sets the amount of time for which a session is considered valid after losing connection to a server. Within the session timeout it's possible to reestablish a connection to a different server and keep the same session. This means any ephemeral nodes and watches are maintained.
func ConnectWithDialer ¶
func ConnectWithDialer(servers []string, sessionTimeout time.Duration, dialer Dialer) (*Conn, <-chan Event, error)
ConnectWithDialer establishes a new connection to a pool of zookeeper servers using a custom Dialer. See Connect for further information about session timeout. This method is deprecated and provided for compatibility: use the WithDialer option instead.
func (*Conn) AddAuth ¶
func (c *Conn) AddAuth(scheme string, auth []byte) error
func (*Conn) Children ¶
func (c *Conn) Children(path string) ([]string, *Stat, error)
func (*Conn) ChildrenW ¶
func (c *Conn) ChildrenW(path string) ([]string, *Stat, <-chan Event, error)
func (*Conn) Close ¶
func (c *Conn) Close()
func (*Conn) Create ¶
func (c *Conn) Create(path string, data []byte, flags int32, acl []ACL) (string, error)
func (*Conn) CreateProtectedEphemeralSequential ¶
func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl []ACL) (string, error)
CreateProtectedEphemeralSequential fixes a race condition if the server crashes after it creates the node. On reconnect the session may still be valid so the ephemeral node still exists. Therefore, on reconnect we need to check if a node with a GUID generated on create exists.
func (*Conn) Delete ¶
func (c *Conn) Delete(path string, version int32) error
func (*Conn) Exists ¶
func (c *Conn) Exists(path string) (bool, *Stat, error)
func (*Conn) ExistsW ¶
func (c *Conn) ExistsW(path string) (bool, *Stat, <-chan Event, error)
func (*Conn) Get ¶
func (c *Conn) Get(path string) ([]byte, *Stat, error)
func (*Conn) GetACL ¶
func (c *Conn) GetACL(path string) ([]ACL, *Stat, error)
func (*Conn) GetW ¶
func (c *Conn) GetW(path string) ([]byte, *Stat, <-chan Event, error)
GetW returns the contents of a znode and sets a watch
func (*Conn) Multi ¶
func (c *Conn) Multi(ops ...interface{}) ([]MultiResponse, error)
Multi executes multiple ZooKeeper operations or none of them. The provided ops must be one of *CreateRequest, *DeleteRequest, *SetDataRequest, or *CheckVersionRequest.
func (*Conn) Server ¶
func (c *Conn) Server() string
Server returns the current or last-connected server name.
func (*Conn) SessionID ¶
func (c *Conn) SessionID() int64
SessionID returns the current session id of the connection.
func (*Conn) Set ¶
func (c *Conn) Set(path string, data []byte, version int32) (*Stat, error)
func (*Conn) SetACL ¶
func (c *Conn) SetACL(path string, acl []ACL, version int32) (*Stat, error)
func (*Conn) SetLogger ¶
func (c *Conn) SetLogger(l Logger)
SetLogger sets the logger to be used for printing errors. Logger is an interface provided by this package.
func (*Conn) State ¶
func (c *Conn) State() State
State returns the current state of the connection.
func (*Conn) Sync ¶
func (c *Conn) Sync(path string) (string, error)
type CreateRequest ¶
type CreateRequest struct { Path string Data []byte Acl []ACL Flags int32 }
type DNSHostProvider ¶
DNSHostProvider is the default HostProvider. It currently matches the Java StaticHostProvider, resolving hosts from DNS once during the call to Init. It could be easily extended to re-query DNS periodically or if there is trouble connecting.
type DNSHostProvider struct {
// contains filtered or unexported fields
}
func (*DNSHostProvider) Connected ¶
func (hp *DNSHostProvider) Connected()
Connected notifies the HostProvider of a successful connection.
func (*DNSHostProvider) Init ¶
func (hp *DNSHostProvider) Init(servers []string) error
Init is called first, with the servers specified in the connection string. It uses DNS to look up addresses for each server, then shuffles them all together.
func (*DNSHostProvider) Len ¶
func (hp *DNSHostProvider) Len() int
Len returns the number of servers available
func (*DNSHostProvider) Next ¶
func (hp *DNSHostProvider) Next() (server string, retryStart bool)
Next returns the next server to connect to. retryStart will be true if we've looped through all known servers without Connected() being called.
type DeleteRequest ¶
type DeleteRequest PathVersionRequest
type Dialer ¶
type Dialer func(network, address string, timeout time.Duration) (net.Conn, error)
type ErrCode ¶
type ErrCode int32
type ErrMissingServerConfigField ¶
type ErrMissingServerConfigField string
func (ErrMissingServerConfigField) Error ¶
func (e ErrMissingServerConfigField) Error() string
type Event ¶
type Event struct { Type EventType State State Path string // For non-session events, the path of the watched node. Err error Server string // For connection events }
type EventCallback ¶
EventCallback is a function that is called when an Event occurs.
type EventCallback func(Event)
type EventType ¶
type EventType int32
const ( EventNodeCreated EventType = 1 EventNodeDeleted EventType = 2 EventNodeDataChanged EventType = 3 EventNodeChildrenChanged EventType = 4 EventSession EventType = -1 EventNotWatching EventType = -2 )
func (EventType) String ¶
func (t EventType) String() string
type HostProvider ¶
HostProvider is used to represent a set of hosts a ZooKeeper client should connect to. It is an analog of the Java equivalent: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/client/HostProvider.java?view=markup
type HostProvider interface { // Init is called first, with the servers specified in the connection string. Init(servers []string) error // Len returns the number of servers. Len() int // Next returns the next server to connect to. retryStart will be true if we've looped through // all known servers without Connected() being called. Next() (server string, retryStart bool) // Notify the HostProvider of a successful connection. Connected() }
type Lock ¶
Lock is a mutual exclusion lock.
type Lock struct {
// contains filtered or unexported fields
}
func NewLock ¶
func NewLock(c *Conn, path string, acl []ACL) *Lock
NewLock creates a new lock instance using the provided connection, path, and acl. The path must be a node that is only used by this lock. A lock instance starts unlocked until Lock() is called.
func (*Lock) Lock ¶
func (l *Lock) Lock() error
Lock attempts to acquire the lock. It will wait to return until the lock is acquired or an error occurs. If this instance already has the lock then ErrDeadlock is returned.
func (*Lock) Unlock ¶
func (l *Lock) Unlock() error
Unlock releases an acquired lock. If the lock is not currently acquired by this Lock instance then ErrNotLocked is returned.
type Logger ¶
Logger is an interface that can be implemented to provide custom log output.
type Logger interface { Printf(string, ...interface{}) }
DefaultLogger uses the stdlib log package for logging.
var DefaultLogger Logger = defaultLogger{}
type Mode ¶
Mode is used to build custom server modes (leader|follower|standalone).
type Mode uint8
const ( ModeUnknown Mode = iota ModeLeader Mode = iota ModeFollower Mode = iota ModeStandalone Mode = iota )
func (Mode) String ¶
func (m Mode) String() string
type MultiResponse ¶
type MultiResponse struct { Stat *Stat String string Error error }
type PathVersionRequest ¶
type PathVersionRequest struct { Path string Version int32 }
type Server ¶
type Server struct {
JarPath string
ConfigPath string
Stdout, Stderr io.Writer
// contains filtered or unexported fields
}
func (*Server) Start ¶
func (srv *Server) Start() error
func (*Server) Stop ¶
func (srv *Server) Stop() error
type ServerClient ¶
ServerClient is the information for a single Zookeeper client and its session. This is used to parse/extract the output of the `cons` command.
type ServerClient struct {
Queued int64
Received int64
Sent int64
SessionID int64
Lcxid int64
Lzxid int64
Timeout int32
LastLatency int32
MinLatency int32
AvgLatency int32
MaxLatency int32
Established time.Time
LastResponse time.Time
Addr string
LastOperation string // maybe?
Error error
}
type ServerClients ¶
ServerClients is a struct for the FLWCons() function. It's used to provide the list of Clients.
This is needed because FLWCons() takes multiple servers.
type ServerClients struct { Clients []*ServerClient Error error }
type ServerConfig ¶
type ServerConfig struct { TickTime int // Number of milliseconds of each tick InitLimit int // Number of ticks that the initial synchronization phase can take SyncLimit int // Number of ticks that can pass between sending a request and getting an acknowledgement DataDir string // Directory where the snapshot is stored ClientPort int // Port at which clients will connect AutoPurgeSnapRetainCount int // Number of snapshots to retain in dataDir AutoPurgePurgeInterval int // Purge task interval in hours (0 to disable auto purge) Servers []ServerConfigServer }
func (ServerConfig) Marshall ¶
func (sc ServerConfig) Marshall(w io.Writer) error
type ServerConfigServer ¶
type ServerConfigServer struct { ID int Host string PeerPort int LeaderElectionPort int }
type ServerStats ¶
ServerStats is the information pulled from the Zookeeper `srvr` command.
type ServerStats struct { Sent int64 Received int64 NodeCount int64 MinLatency int64 AvgLatency int64 MaxLatency int64 Connections int64 Outstanding int64 Epoch int32 Counter int32 BuildTime time.Time Mode Mode Version string Error error }
type SetDataRequest ¶
type SetDataRequest struct { Path string Data []byte Version int32 }
type Stat ¶
type Stat struct { Czxid int64 // The zxid of the change that caused this znode to be created. Mzxid int64 // The zxid of the change that last modified this znode. Ctime int64 // The time in milliseconds from epoch when this znode was created. Mtime int64 // The time in milliseconds from epoch when this znode was last modified. Version int32 // The number of changes to the data of this znode. Cversion int32 // The number of changes to the children of this znode. Aversion int32 // The number of changes to the ACL of this znode. EphemeralOwner int64 // The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero. DataLength int32 // The length of the data field of this znode. NumChildren int32 // The number of children of this znode. Pzxid int64 // last modified children }
type State ¶
type State int32
const ( StateUnknown State = -1 StateDisconnected State = 0 StateConnecting State = 1 StateAuthFailed State = 4 StateConnectedReadOnly State = 5 StateSaslAuthenticated State = 6 StateExpired State = -112 StateConnected = State(100) StateHasSession = State(101) )
func (State) String ¶
func (s State) String() string
type TestCluster ¶
type TestCluster struct { Path string Servers []TestServer }
func StartTestCluster ¶
func StartTestCluster(size int, stdout, stderr io.Writer) (*TestCluster, error)
func (*TestCluster) Connect ¶
func (tc *TestCluster) Connect(idx int) (*Conn, error)
func (*TestCluster) ConnectAll ¶
func (tc *TestCluster) ConnectAll() (*Conn, <-chan Event, error)
func (*TestCluster) ConnectAllTimeout ¶
func (tc *TestCluster) ConnectAllTimeout(sessionTimeout time.Duration) (*Conn, <-chan Event, error)
func (*TestCluster) ConnectWithOptions ¶
func (tc *TestCluster) ConnectWithOptions(sessionTimeout time.Duration, options ...connOption) (*Conn, <-chan Event, error)
func (*TestCluster) StartAllServers ¶
func (tc *TestCluster) StartAllServers() error
func (*TestCluster) StartServer ¶
func (tc *TestCluster) StartServer(server string)
func (*TestCluster) Stop ¶
func (tc *TestCluster) Stop() error
func (*TestCluster) StopAllServers ¶
func (tc *TestCluster) StopAllServers() error
func (*TestCluster) StopServer ¶
func (tc *TestCluster) StopServer(server string)
type TestServer ¶
type TestServer struct { Port int Path string Srv *Server }