Package sarama
Overview ▹
Index ▹
Constants
const ( // OffsetNewest stands for the log head offset, i.e. the offset that will be // assigned to the next message that will be produced to the partition. You // can send this to a client's GetOffset method to get this offset, or when // calling ConsumePartition to start consuming new messages. OffsetNewest int64 = -1 // OffsetOldest stands for the oldest offset available on the broker for a // partition. You can send this to a client's GetOffset method to get this // offset, or when calling ConsumePartition to start consuming from the // oldest offset that is still available on the broker. OffsetOldest int64 = -2 )
GroupGenerationUndefined is a special value for the group generation field of Offset Commit Requests that should be used when a consumer group does not rely on Kafka for partition management.
const GroupGenerationUndefined = -1
ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the request was received. The timestamp is only used if message version 1 is used, which requires kafka 0.8.2.
const ReceiveTime int64 = -1
Variables
Effective constants defining the supported kafka versions.
var ( V0_8_2_0 = newKafkaVersion(0, 8, 2, 0) V0_8_2_1 = newKafkaVersion(0, 8, 2, 1) V0_8_2_2 = newKafkaVersion(0, 8, 2, 2) V0_9_0_0 = newKafkaVersion(0, 9, 0, 0) V0_9_0_1 = newKafkaVersion(0, 9, 0, 1) V0_10_0_0 = newKafkaVersion(0, 10, 0, 0) V0_10_0_1 = newKafkaVersion(0, 10, 0, 1) V0_10_1_0 = newKafkaVersion(0, 10, 1, 0) V0_10_2_0 = newKafkaVersion(0, 10, 2, 0) )
ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.
var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")
ErrClosedClient is the error returned when a method is called on a client that has been closed.
var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")
ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.
var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks")
ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.
var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")
ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).
var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")
ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")
ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
var ErrNotConnected = errors.New("kafka: broker not connected")
ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.
var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to (Is your cluster reachable?)")
ErrShuttingDown is returned when a producer receives a message during shutdown.
var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")
MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying to send a request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt to process.
var MaxRequestSize int32 = 100 * 1024 * 1024
MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to protect the client from running out of memory. Please note that brokers do not have any natural limit on the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers (see https://issues.apache.org/jira/browse/KAFKA-2063).
var MaxResponseSize int32 = 100 * 1024 * 1024
PanicHandler is called for recovering from panics spawned internally to the library (and thus not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
var PanicHandler func(interface{})
type ApiVersionsRequest ¶
type ApiVersionsRequest struct { }
type ApiVersionsResponse ¶
type ApiVersionsResponse struct { Err KError ApiVersions []*ApiVersionsResponseBlock }
type ApiVersionsResponseBlock ¶
type ApiVersionsResponseBlock struct { ApiKey int16 MinVersion int16 MaxVersion int16 }
type AsyncProducer ¶
AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages to the correct broker for the provided topic-partition, refreshing metadata as appropriate, and parses responses for errors. You must read from the Errors() channel or the producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid leaks: it will not be garbage-collected automatically when it passes out of scope.
type AsyncProducer interface { // AsyncClose triggers a shutdown of the producer. The shutdown has completed // when both the Errors and Successes channels have been closed. When calling // AsyncClose, you *must* continue to read from those channels in order to // drain the results of any messages in flight. AsyncClose() // Close shuts down the producer and waits for any buffered messages to be // flushed. You must call this function before a producer object passes out of // scope, as it may otherwise leak memory. You must call this before calling // Close on the underlying client. Close() error // Input is the input channel for the user to write messages to that they // wish to send. Input() chan<- *ProducerMessage // Successes is the success output channel back to the user when Return.Successes is // enabled. If Return.Successes is true, you MUST read from this channel or the // Producer will deadlock. It is suggested that you send and read messages // together in a single select statement. Successes() <-chan *ProducerMessage // Errors is the error output channel back to the user. You MUST read from this // channel or the Producer will deadlock when the channel is full. Alternatively, // you can set Producer.Return.Errors in your config to false, which prevents // errors to be returned. Errors() <-chan *ProducerError }
▹ Example (Goroutines)
▹ Example (Select)
func NewAsyncProducer ¶
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error)
NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
func NewAsyncProducerFromClient ¶
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error)
NewAsyncProducerFromClient creates a new Producer using the given client. It is still necessary to call Close() on the underlying client when shutting down this producer.
type Broker ¶
Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
type Broker struct {
// contains filtered or unexported fields
}
▹ Example
func NewBroker ¶
func NewBroker(addr string) *Broker
NewBroker creates and returns a Broker targeting the given host:port address. This does not attempt to actually connect, you have to call Open() for that.
func (*Broker) Addr ¶
func (b *Broker) Addr() string
Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
func (*Broker) ApiVersions ¶
func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error)
func (*Broker) Close ¶
func (b *Broker) Close() error
func (*Broker) CommitOffset ¶
func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error)
func (*Broker) Connected ¶
func (b *Broker) Connected() (bool, error)
Connected returns true if the broker is connected and false otherwise. If the broker is not connected but it had tried to connect, the error from that connection attempt is also returned.
func (*Broker) DescribeGroups ¶
func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error)
func (*Broker) Fetch ¶
func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error)
func (*Broker) FetchOffset ¶
func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error)
func (*Broker) GetAvailableOffsets ¶
func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error)
func (*Broker) GetConsumerMetadata ¶
func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error)
func (*Broker) GetMetadata ¶
func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error)
func (*Broker) Heartbeat ¶
func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error)
func (*Broker) ID ¶
func (b *Broker) ID() int32
ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
func (*Broker) JoinGroup ¶
func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error)
func (*Broker) LeaveGroup ¶
func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error)
func (*Broker) ListGroups ¶
func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error)
func (*Broker) Open ¶
func (b *Broker) Open(conf *Config) error
Open tries to connect to the Broker if it is not already connected or connecting, but does not block waiting for the connection to complete. This means that any subsequent operations on the broker will block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call, follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or AlreadyConnected. If conf is nil, the result of NewConfig() is used.
func (*Broker) Produce ¶
func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error)
func (*Broker) SyncGroup ¶
func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error)
type ByteEncoder ¶
ByteEncoder implements the Encoder interface for Go byte slices so that they can be used as the Key or Value in a ProducerMessage.
type ByteEncoder []byte
func (ByteEncoder) Encode ¶
func (b ByteEncoder) Encode() ([]byte, error)
func (ByteEncoder) Length ¶
func (b ByteEncoder) Length() int
type Client ¶
Client is a generic Kafka client. It manages connections to one or more Kafka brokers. You MUST call Close() on a client to avoid leaks, it will not be garbage-collected automatically when it passes out of scope. It is safe to share a client amongst many users, however Kafka will process requests from a single client strictly in serial, so it is generally more efficient to use the default one client per producer/consumer.
type Client interface { // Config returns the Config struct of the client. This struct should not be // altered after it has been created. Config() *Config // Brokers returns the current set of active brokers as retrieved from cluster metadata. Brokers() []*Broker // Topics returns the set of available topics as retrieved from cluster metadata. Topics() ([]string, error) // Partitions returns the sorted list of all partition IDs for the given topic. Partitions(topic string) ([]int32, error) // WritablePartitions returns the sorted list of all writable partition IDs for // the given topic, where "writable" means "having a valid leader accepting // writes". WritablePartitions(topic string) ([]int32, error) // Leader returns the broker object that is the leader of the current // topic/partition, as determined by querying the cluster metadata. Leader(topic string, partitionID int32) (*Broker, error) // Replicas returns the set of all replica IDs for the given partition. Replicas(topic string, partitionID int32) ([]int32, error) // InSyncReplicas returns the set of all in-sync replica IDs for the given // partition. In-sync replicas are replicas which are fully caught up with // the partition leader. InSyncReplicas(topic string, partitionID int32) ([]int32, error) // RefreshMetadata takes a list of topics and queries the cluster to refresh the // available metadata for those topics. If no topics are provided, it will refresh // metadata for all topics. RefreshMetadata(topics ...string) error // GetOffset queries the cluster to get the most recent available offset at the // given time on the topic/partition combination. Time should be OffsetOldest for // the earliest available offset, OffsetNewest for the offset of the message that // will be produced next, or a time. GetOffset(topic string, partitionID int32, time int64) (int64, error) // Coordinator returns the coordinating broker for a consumer group. It will // return a locally cached value if it's available. You can call // RefreshCoordinator to update the cached value. This function only works on // Kafka 0.8.2 and higher. Coordinator(consumerGroup string) (*Broker, error) // RefreshCoordinator retrieves the coordinator for a consumer group and stores it // in local cache. This function only works on Kafka 0.8.2 and higher. RefreshCoordinator(consumerGroup string) error // Close shuts down all broker connections managed by this client. It is required // to call this function before a client object passes out of scope, as it will // otherwise leak memory. You must close any Producers or Consumers using a client // before you close the client. Close() error // Closed returns true if the client has already had Close called on it Closed() bool }
func NewClient ¶
func NewClient(addrs []string, conf *Config) (Client, error)
NewClient creates a new Client. It connects to one of the given broker addresses and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot be retrieved from any of the given broker addresses, the client is not created.
type CompressionCodec ¶
CompressionCodec represents the various compression codecs recognized by Kafka in messages.
type CompressionCodec int8
const ( CompressionNone CompressionCodec = 0 CompressionGZIP CompressionCodec = 1 CompressionSnappy CompressionCodec = 2 CompressionLZ4 CompressionCodec = 3 )
type Config ¶
Config is used to pass multiple configuration options to Sarama's constructors.
type Config struct { // Net is the namespace for network-level properties used by the Broker, and // shared by the Client/Producer/Consumer. Net struct { // How many outstanding requests a connection is allowed to have before // sending on it blocks (default 5). MaxOpenRequests int // All three of the below configurations are similar to the // `socket.timeout.ms` setting in JVM kafka. All of them default // to 30 seconds. DialTimeout time.Duration // How long to wait for the initial connection. ReadTimeout time.Duration // How long to wait for a response. WriteTimeout time.Duration // How long to wait for a transmit. TLS struct { // Whether or not to use TLS when connecting to the broker // (defaults to false). Enable bool // The TLS configuration to use for secure connections if // enabled (defaults to nil). Config *tls.Config } // SASL based authentication with broker. While there are multiple SASL authentication methods // the current implementation is limited to plaintext (SASL/PLAIN) authentication SASL struct { // Whether or not to use SASL authentication when connecting to the broker // (defaults to false). Enable bool // Whether or not to send the Kafka SASL handshake first if enabled // (defaults to true). You should only set this to false if you're using // a non-Kafka SASL proxy. Handshake bool //username and password for SASL/PLAIN authentication User string Password string } // KeepAlive specifies the keep-alive period for an active network connection. // If zero, keep-alives are disabled. (default is 0: disabled). KeepAlive time.Duration } // Metadata is the namespace for metadata management properties used by the // Client, and shared by the Producer/Consumer. Metadata struct { Retry struct { // The total number of times to retry a metadata request when the // cluster is in the middle of a leader election (default 3). Max int // How long to wait for leader election to occur before retrying // (default 250ms). Similar to the JVM's `retry.backoff.ms`. Backoff time.Duration } // How frequently to refresh the cluster metadata in the background. // Defaults to 10 minutes. Set to 0 to disable. Similar to // `topic.metadata.refresh.interval.ms` in the JVM version. RefreshFrequency time.Duration } // Producer is the namespace for configuration related to producing messages, // used by the Producer. Producer struct { // The maximum permitted size of a message (defaults to 1000000). Should be // set equal to or smaller than the broker's `message.max.bytes`. MaxMessageBytes int // The level of acknowledgement reliability needed from the broker (defaults // to WaitForLocal). Equivalent to the `request.required.acks` setting of the // JVM producer. RequiredAcks RequiredAcks // The maximum duration the broker will wait the receipt of the number of // RequiredAcks (defaults to 10 seconds). This is only relevant when // RequiredAcks is set to WaitForAll or a number > 1. Only supports // millisecond resolution, nanoseconds will be truncated. Equivalent to // the JVM producer's `request.timeout.ms` setting. Timeout time.Duration // The type of compression to use on messages (defaults to no compression). // Similar to `compression.codec` setting of the JVM producer. Compression CompressionCodec // Generates partitioners for choosing the partition to send messages to // (defaults to hashing the message key). Similar to the `partitioner.class` // setting for the JVM producer. Partitioner PartitionerConstructor // Return specifies what channels will be populated. If they are set to true, // you must read from the respective channels to prevent deadlock. Return struct { // If enabled, successfully delivered messages will be returned on the // Successes channel (default disabled). Successes bool // If enabled, messages that failed to deliver will be returned on the // Errors channel, including error (default enabled). Errors bool } // The following config options control how often messages are batched up and // sent to the broker. By default, messages are sent as fast as possible, and // all messages received while the current batch is in-flight are placed // into the subsequent batch. Flush struct { // The best-effort number of bytes needed to trigger a flush. Use the // global sarama.MaxRequestSize to set a hard upper limit. Bytes int // The best-effort number of messages needed to trigger a flush. Use // `MaxMessages` to set a hard upper limit. Messages int // The best-effort frequency of flushes. Equivalent to // `queue.buffering.max.ms` setting of JVM producer. Frequency time.Duration // The maximum number of messages the producer will send in a single // broker request. Defaults to 0 for unlimited. Similar to // `queue.buffering.max.messages` in the JVM producer. MaxMessages int } Retry struct { // The total number of times to retry sending a message (default 3). // Similar to the `message.send.max.retries` setting of the JVM producer. Max int // How long to wait for the cluster to settle between retries // (default 100ms). Similar to the `retry.backoff.ms` setting of the // JVM producer. Backoff time.Duration } } // Consumer is the namespace for configuration related to consuming messages, // used by the Consumer. // // Note that Sarama's Consumer type does not currently support automatic // consumer-group rebalancing and offset tracking. For Zookeeper-based // tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka // library builds on Sarama to add this support. For Kafka-based tracking // (Kafka 0.9 and later), the https://github.com/bsm/sarama-cluster library // builds on Sarama to add this support. Consumer struct { Retry struct { // How long to wait after a failing to read from a partition before // trying again (default 2s). Backoff time.Duration } // Fetch is the namespace for controlling how many bytes are retrieved by any // given request. Fetch struct { // The minimum number of message bytes to fetch in a request - the broker // will wait until at least this many are available. The default is 1, // as 0 causes the consumer to spin when no messages are available. // Equivalent to the JVM's `fetch.min.bytes`. Min int32 // The default number of message bytes to fetch from the broker in each // request (default 32768). This should be larger than the majority of // your messages, or else the consumer will spend a lot of time // negotiating sizes and not actually consuming. Similar to the JVM's // `fetch.message.max.bytes`. Default int32 // The maximum number of message bytes to fetch from the broker in a // single request. Messages larger than this will return // ErrMessageTooLarge and will not be consumable, so you must be sure // this is at least as large as your largest message. Defaults to 0 // (no limit). Similar to the JVM's `fetch.message.max.bytes`. The // global `sarama.MaxResponseSize` still applies. Max int32 } // The maximum amount of time the broker will wait for Consumer.Fetch.Min // bytes to become available before it returns fewer than that anyways. The // default is 250ms, since 0 causes the consumer to spin when no events are // available. 100-500ms is a reasonable range for most cases. Kafka only // supports precision up to milliseconds; nanoseconds will be truncated. // Equivalent to the JVM's `fetch.wait.max.ms`. MaxWaitTime time.Duration // The maximum amount of time the consumer expects a message takes to process // for the user. If writing to the Messages channel takes longer than this, // that partition will stop fetching more messages until it can proceed again. // Note that, since the Messages channel is buffered, the actual grace time is // (MaxProcessingTime * ChanneBufferSize). Defaults to 100ms. MaxProcessingTime time.Duration // Return specifies what channels will be populated. If they are set to true, // you must read from them to prevent deadlock. Return struct { // If enabled, any errors that occurred while consuming are returned on // the Errors channel (default disabled). Errors bool } // Offsets specifies configuration for how and when to commit consumed // offsets. This currently requires the manual use of an OffsetManager // but will eventually be automated. Offsets struct { // How frequently to commit updated offsets. Defaults to 1s. CommitInterval time.Duration // The initial offset to use if no offset was previously committed. // Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest. Initial int64 // The retention duration for committed offsets. If zero, disabled // (in which case the `offsets.retention.minutes` option on the // broker will be used). Kafka only supports precision up to // milliseconds; nanoseconds will be truncated. Requires Kafka // broker version 0.9.0 or later. // (default is 0: disabled). Retention time.Duration } } // A user-provided string sent with every request to the brokers for logging, // debugging, and auditing purposes. Defaults to "sarama", but you should // probably set it to something specific to your application. ClientID string // The number of events to buffer in internal and external channels. This // permits the producer and consumer to continue processing some messages // in the background while user code is working, greatly improving throughput. // Defaults to 256. ChannelBufferSize int // The version of Kafka that Sarama will assume it is running against. // Defaults to the oldest supported stable version. Since Kafka provides // backwards-compatibility, setting it to a version older than you have // will not break anything, although it may prevent you from using the // latest features. Setting it to a version greater than you are actually // running may lead to random breakage. Version KafkaVersion // The registry to define metrics into. // Defaults to a local registry. // If you want to disable metrics gathering, set "metrics.UseNilMetrics" to "true" // prior to starting Sarama. // See Examples on how to use the metrics registry MetricRegistry metrics.Registry }
▹ Example (Metrics)
func NewConfig ¶
func NewConfig() *Config
NewConfig returns a new configuration instance with sane defaults.
func (*Config) Validate ¶
func (c *Config) Validate() error
Validate checks a Config instance. It will return a ConfigurationError if the specified values don't make sense.
type ConfigurationError ¶
ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer) when the specified configuration is invalid.
type ConfigurationError string
func (ConfigurationError) Error ¶
func (err ConfigurationError) Error() string
type Consumer ¶
Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of scope.
Sarama's Consumer type does not currently support automatic consumer-group rebalancing and offset tracking. For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka library builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9 and later), the https://github.com/bsm/sarama-cluster library builds on Sarama to add this support.
type Consumer interface { // Topics returns the set of available topics as retrieved from the cluster // metadata. This method is the same as Client.Topics(), and is provided for // convenience. Topics() ([]string, error) // Partitions returns the sorted list of all partition IDs for the given topic. // This method is the same as Client.Partitions(), and is provided for convenience. Partitions(topic string) ([]int32, error) // ConsumePartition creates a PartitionConsumer on the given topic/partition with // the given offset. It will return an error if this Consumer is already consuming // on the given topic/partition. Offset can be a literal offset, or OffsetNewest // or OffsetOldest ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) // HighWaterMarks returns the current high water marks for each topic and partition. // Consistency between partitions is not guaranteed since high water marks are updated separately. HighWaterMarks() map[string]map[int32]int64 // Close shuts down the consumer. It must be called after all child // PartitionConsumers have already been closed. Close() error }
▹ Example
func NewConsumer ¶
func NewConsumer(addrs []string, config *Config) (Consumer, error)
NewConsumer creates a new consumer using the given broker addresses and configuration.
func NewConsumerFromClient ¶
func NewConsumerFromClient(client Client) (Consumer, error)
NewConsumerFromClient creates a new consumer using the given client. It is still necessary to call Close() on the underlying client when shutting down this consumer.
type ConsumerError ¶
ConsumerError is what is provided to the user when an error occurs. It wraps an error and includes the topic and partition.
type ConsumerError struct { Topic string Partition int32 Err error }
func (ConsumerError) Error ¶
func (ce ConsumerError) Error() string
type ConsumerErrors ¶
ConsumerErrors is a type that wraps a batch of errors and implements the Error interface. It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors when stopping.
type ConsumerErrors []*ConsumerError
func (ConsumerErrors) Error ¶
func (ce ConsumerErrors) Error() string
type ConsumerGroupMemberAssignment ¶
type ConsumerGroupMemberAssignment struct { Version int16 Topics map[string][]int32 UserData []byte }
type ConsumerGroupMemberMetadata ¶
type ConsumerGroupMemberMetadata struct { Version int16 Topics []string UserData []byte }
type ConsumerMessage ¶
ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMessage struct {
Key, Value []byte
Topic string
Partition int32
Offset int64
Timestamp time.Time // only set if kafka is version 0.10+
}
type ConsumerMetadataRequest ¶
type ConsumerMetadataRequest struct { ConsumerGroup string }
type ConsumerMetadataResponse ¶
type ConsumerMetadataResponse struct { Err KError Coordinator *Broker CoordinatorID int32 // deprecated: use Coordinator.ID() CoordinatorHost string // deprecated: use Coordinator.Addr() CoordinatorPort int32 // deprecated: use Coordinator.Addr() }
type DescribeGroupsRequest ¶
type DescribeGroupsRequest struct { Groups []string }
func (*DescribeGroupsRequest) AddGroup ¶
func (r *DescribeGroupsRequest) AddGroup(group string)
type DescribeGroupsResponse ¶
type DescribeGroupsResponse struct { Groups []*GroupDescription }
type Encoder ¶
Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().
type Encoder interface { Encode() ([]byte, error) Length() int }
type FetchRequest ¶
type FetchRequest struct {
MaxWaitTime int32
MinBytes int32
Version int16
// contains filtered or unexported fields
}
func (*FetchRequest) AddBlock ¶
func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32)
type FetchResponse ¶
type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
Version int16 // v1 requires 0.9+, v2 requires 0.10+
}
func (*FetchResponse) AddError ¶
func (r *FetchResponse) AddError(topic string, partition int32, err KError)
func (*FetchResponse) AddMessage ¶
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64)
func (*FetchResponse) GetBlock ¶
func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock
type FetchResponseBlock ¶
type FetchResponseBlock struct { Err KError HighWaterMarkOffset int64 MsgSet MessageSet }
type GroupDescription ¶
type GroupDescription struct { Err KError GroupId string State string ProtocolType string Protocol string Members map[string]*GroupMemberDescription }
type GroupMemberDescription ¶
type GroupMemberDescription struct { ClientId string ClientHost string MemberMetadata []byte MemberAssignment []byte }
func (*GroupMemberDescription) GetMemberAssignment ¶
func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)
func (*GroupMemberDescription) GetMemberMetadata ¶
func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error)
type GroupProtocol ¶
type GroupProtocol struct { Name string Metadata []byte }
type HeartbeatRequest ¶
type HeartbeatRequest struct { GroupId string GenerationId int32 MemberId string }
type HeartbeatResponse ¶
type HeartbeatResponse struct { Err KError }
type JoinGroupRequest ¶
type JoinGroupRequest struct {
GroupId string
SessionTimeout int32
MemberId string
ProtocolType string
GroupProtocols map[string][]byte // deprecated; use OrderedGroupProtocols
OrderedGroupProtocols []*GroupProtocol
}
func (*JoinGroupRequest) AddGroupProtocol ¶
func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte)
func (*JoinGroupRequest) AddGroupProtocolMetadata ¶
func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error
type JoinGroupResponse ¶
type JoinGroupResponse struct { Err KError GenerationId int32 GroupProtocol string LeaderId string MemberId string Members map[string][]byte }
func (*JoinGroupResponse) GetMembers ¶
func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error)
type KError ¶
KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
type KError int16
Numeric error codes returned by the Kafka server.
const ( ErrNoError KError = 0 ErrUnknown KError = -1 ErrOffsetOutOfRange KError = 1 ErrInvalidMessage KError = 2 ErrUnknownTopicOrPartition KError = 3 ErrInvalidMessageSize KError = 4 ErrLeaderNotAvailable KError = 5 ErrNotLeaderForPartition KError = 6 ErrRequestTimedOut KError = 7 ErrBrokerNotAvailable KError = 8 ErrReplicaNotAvailable KError = 9 ErrMessageSizeTooLarge KError = 10 ErrStaleControllerEpochCode KError = 11 ErrOffsetMetadataTooLarge KError = 12 ErrNetworkException KError = 13 ErrOffsetsLoadInProgress KError = 14 ErrConsumerCoordinatorNotAvailable KError = 15 ErrNotCoordinatorForConsumer KError = 16 ErrInvalidTopic KError = 17 ErrMessageSetSizeTooLarge KError = 18 ErrNotEnoughReplicas KError = 19 ErrNotEnoughReplicasAfterAppend KError = 20 ErrInvalidRequiredAcks KError = 21 ErrIllegalGeneration KError = 22 ErrInconsistentGroupProtocol KError = 23 ErrInvalidGroupId KError = 24 ErrUnknownMemberId KError = 25 ErrInvalidSessionTimeout KError = 26 ErrRebalanceInProgress KError = 27 ErrInvalidCommitOffsetSize KError = 28 ErrTopicAuthorizationFailed KError = 29 ErrGroupAuthorizationFailed KError = 30 ErrClusterAuthorizationFailed KError = 31 ErrInvalidTimestamp KError = 32 ErrUnsupportedSASLMechanism KError = 33 ErrIllegalSASLState KError = 34 ErrUnsupportedVersion KError = 35 ErrTopicAlreadyExists KError = 36 ErrInvalidPartitions KError = 37 ErrInvalidReplicationFactor KError = 38 ErrInvalidReplicaAssignment KError = 39 ErrInvalidConfig KError = 40 ErrNotController KError = 41 ErrInvalidRequest KError = 42 ErrUnsupportedForMessageFormat KError = 43 ErrPolicyViolation KError = 44 )
func (KError) Error ¶
func (err KError) Error() string
type KafkaVersion ¶
KafkaVersion instances represent versions of the upstream Kafka broker.
type KafkaVersion struct {
// contains filtered or unexported fields
}
func (KafkaVersion) IsAtLeast ¶
func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool
IsAtLeast return true if and only if the version it is called on is greater than or equal to the version passed in:
V1.IsAtLeast(V2) // false V2.IsAtLeast(V1) // true
type LeaveGroupRequest ¶
type LeaveGroupRequest struct { GroupId string MemberId string }
type LeaveGroupResponse ¶
type LeaveGroupResponse struct { Err KError }
type ListGroupsRequest ¶
type ListGroupsRequest struct { }
type ListGroupsResponse ¶
type ListGroupsResponse struct { Err KError Groups map[string]string }
type Message ¶
type Message struct { Codec CompressionCodec // codec used to compress the message contents Key []byte // the message key, may be nil Value []byte // the message contents Set *MessageSet // the message set a message might wrap Version int8 // v1 requires Kafka 0.10 Timestamp time.Time // the timestamp of the message (version 1+ only) // contains filtered or unexported fields }
type MessageBlock ¶
type MessageBlock struct { Offset int64 Msg *Message }
func (*MessageBlock) Messages ¶
func (msb *MessageBlock) Messages() []*MessageBlock
Messages convenience helper which returns either all the messages that are wrapped in this block
type MessageSet ¶
type MessageSet struct {
PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
Messages []*MessageBlock
}
type MetadataRequest ¶
type MetadataRequest struct { Topics []string }
type MetadataResponse ¶
type MetadataResponse struct { Brokers []*Broker Topics []*TopicMetadata }
func (*MetadataResponse) AddBroker ¶
func (r *MetadataResponse) AddBroker(addr string, id int32)
func (*MetadataResponse) AddTopic ¶
func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata
func (*MetadataResponse) AddTopicPartition ¶
func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError)
type MockBroker ¶
MockBroker is a mock Kafka broker that is used in unit tests. It is exposed to facilitate testing of higher level or specialized consumers and producers built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol, but rather provides a facility to do that. It takes care of the TCP transport, request unmarshaling, response marshaling, and makes it the test writer responsibility to program correct according to the Kafka API protocol MockBroker behaviour.
MockBroker is implemented as a TCP server listening on a kernel-selected localhost port that can accept many connections. It reads Kafka requests from that connection and returns responses programmed by the SetHandlerByMap function. If a MockBroker receives a request that it has no programmed response for, then it returns nothing and the request times out.
A set of MockRequest builders to define mappings used by MockBroker is provided by Sarama. But users can develop MockRequests of their own and use them along with or instead of the standard ones.
When running tests with MockBroker it is strongly recommended to specify a timeout to `go test` so that if the broker hangs waiting for a response, the test panics.
It is not necessary to prefix message length or correlation ID to your response bytes, the server does that automatically as a convenience.
type MockBroker struct {
// contains filtered or unexported fields
}
func NewMockBroker ¶
func NewMockBroker(t TestReporter, brokerID int32) *MockBroker
NewMockBroker launches a fake Kafka broker. It takes a TestReporter as provided by the test framework and a channel of responses to use. If an error occurs it is simply logged to the TestReporter and the broker exits.
func NewMockBrokerAddr ¶
func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker
NewMockBrokerAddr behaves like newMockBroker but listens on the address you give it rather than just some ephemeral port.
func (*MockBroker) Addr ¶
func (b *MockBroker) Addr() string
Addr returns the broker connection string in the form "<address>:<port>".
func (*MockBroker) BrokerID ¶
func (b *MockBroker) BrokerID() int32
BrokerID returns broker ID assigned to the broker.
func (*MockBroker) Close ¶
func (b *MockBroker) Close()
Close terminates the broker blocking until it stops internal goroutines and releases all resources.
func (*MockBroker) History ¶
func (b *MockBroker) History() []RequestResponse
History returns a slice of RequestResponse pairs in the order they were processed by the broker. Note that in case of multiple connections to the broker the order expected by a test can be different from the order recorded in the history, unless some synchronization is implemented in the test.
func (*MockBroker) Port ¶
func (b *MockBroker) Port() int32
Port returns the TCP port number the broker is listening for requests on.
func (*MockBroker) Returns ¶
func (b *MockBroker) Returns(e encoder)
func (*MockBroker) SetHandlerByMap ¶
func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse)
SetHandlerByMap defines mapping of Request types to MockResponses. When a request is received by the broker, it looks up the request type in the map and uses the found MockResponse instance to generate an appropriate reply. If the request type is not found in the map then nothing is sent.
func (*MockBroker) SetLatency ¶
func (b *MockBroker) SetLatency(latency time.Duration)
SetLatency makes broker pause for the specified period every time before replying.
func (*MockBroker) SetNotifier ¶
func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc)
SetNotifier set a function that will get invoked whenever a request has been processed successfully and will provide the number of bytes read and written
type MockConsumerMetadataResponse ¶
MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
type MockConsumerMetadataResponse struct {
// contains filtered or unexported fields
}
func NewMockConsumerMetadataResponse ¶
func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse
func (*MockConsumerMetadataResponse) For ¶
func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder
func (*MockConsumerMetadataResponse) SetCoordinator ¶
func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse
func (*MockConsumerMetadataResponse) SetError ¶
func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse
type MockFetchResponse ¶
MockFetchResponse is a `FetchResponse` builder.
type MockFetchResponse struct {
// contains filtered or unexported fields
}
func NewMockFetchResponse ¶
func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse
func (*MockFetchResponse) For ¶
func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder
func (*MockFetchResponse) SetHighWaterMark ¶
func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse
func (*MockFetchResponse) SetMessage ¶
func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse
type MockMetadataResponse ¶
MockMetadataResponse is a `MetadataResponse` builder.
type MockMetadataResponse struct {
// contains filtered or unexported fields
}
func NewMockMetadataResponse ¶
func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse
func (*MockMetadataResponse) For ¶
func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder
func (*MockMetadataResponse) SetBroker ¶
func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse
func (*MockMetadataResponse) SetLeader ¶
func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse
type MockOffsetCommitResponse ¶
MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
type MockOffsetCommitResponse struct {
// contains filtered or unexported fields
}
func NewMockOffsetCommitResponse ¶
func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse
func (*MockOffsetCommitResponse) For ¶
func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder
func (*MockOffsetCommitResponse) SetError ¶
func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse
type MockOffsetFetchResponse ¶
MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
type MockOffsetFetchResponse struct {
// contains filtered or unexported fields
}
func NewMockOffsetFetchResponse ¶
func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse
func (*MockOffsetFetchResponse) For ¶
func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder
func (*MockOffsetFetchResponse) SetOffset ¶
func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse
type MockOffsetResponse ¶
MockOffsetResponse is an `OffsetResponse` builder.
type MockOffsetResponse struct {
// contains filtered or unexported fields
}
func NewMockOffsetResponse ¶
func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse
func (*MockOffsetResponse) For ¶
func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder
func (*MockOffsetResponse) SetOffset ¶
func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse
type MockProduceResponse ¶
MockProduceResponse is a `ProduceResponse` builder.
type MockProduceResponse struct {
// contains filtered or unexported fields
}
func NewMockProduceResponse ¶
func NewMockProduceResponse(t TestReporter) *MockProduceResponse
func (*MockProduceResponse) For ¶
func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder
func (*MockProduceResponse) SetError ¶
func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse
type MockResponse ¶
MockResponse is a response builder interface it defines one method that allows generating a response based on a request body. MockResponses are used to program behavior of MockBroker in tests.
type MockResponse interface { For(reqBody versionedDecoder) (res encoder) }
type MockSequence ¶
MockSequence is a mock response builder that is created from a sequence of concrete responses. Every time when a `MockBroker` calls its `For` method the next response from the sequence is returned. When the end of the sequence is reached the last element from the sequence is returned.
type MockSequence struct {
// contains filtered or unexported fields
}
func NewMockSequence ¶
func NewMockSequence(responses ...interface{}) *MockSequence
func (*MockSequence) For ¶
func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder)
type MockWrapper ¶
MockWrapper is a mock response builder that returns a particular concrete response regardless of the actual request passed to the `For` method.
type MockWrapper struct {
// contains filtered or unexported fields
}
func NewMockWrapper ¶
func NewMockWrapper(res encoder) *MockWrapper
func (*MockWrapper) For ¶
func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder)
type OffsetCommitRequest ¶
type OffsetCommitRequest struct { ConsumerGroup string ConsumerGroupGeneration int32 // v1 or later ConsumerID string // v1 or later RetentionTime int64 // v2 or later // Version can be: // - 0 (kafka 0.8.1 and later) // - 1 (kafka 0.8.2 and later) // - 2 (kafka 0.9.0 and later) Version int16 // contains filtered or unexported fields }
func (*OffsetCommitRequest) AddBlock ¶
func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string)
type OffsetCommitResponse ¶
type OffsetCommitResponse struct { Errors map[string]map[int32]KError }
func (*OffsetCommitResponse) AddError ¶
func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError)
type OffsetFetchRequest ¶
type OffsetFetchRequest struct {
ConsumerGroup string
Version int16
// contains filtered or unexported fields
}
func (*OffsetFetchRequest) AddPartition ¶
func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)
type OffsetFetchResponse ¶
type OffsetFetchResponse struct { Blocks map[string]map[int32]*OffsetFetchResponseBlock }
func (*OffsetFetchResponse) AddBlock ¶
func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock)
func (*OffsetFetchResponse) GetBlock ¶
func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock
type OffsetFetchResponseBlock ¶
type OffsetFetchResponseBlock struct { Offset int64 Metadata string Err KError }
type OffsetManager ¶
OffsetManager uses Kafka to store and fetch consumed partition offsets.
type OffsetManager interface { // ManagePartition creates a PartitionOffsetManager on the given topic/partition. // It will return an error if this OffsetManager is already managing the given // topic/partition. ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) // Close stops the OffsetManager from managing offsets. It is required to call // this function before an OffsetManager object passes out of scope, as it // will otherwise leak memory. You must call this after all the // PartitionOffsetManagers are closed. Close() error }
func NewOffsetManagerFromClient ¶
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error)
NewOffsetManagerFromClient creates a new OffsetManager from the given client. It is still necessary to call Close() on the underlying client when finished with the partition manager.
type OffsetRequest ¶
type OffsetRequest struct {
Version int16
// contains filtered or unexported fields
}
func (*OffsetRequest) AddBlock ¶
func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32)
type OffsetResponse ¶
type OffsetResponse struct { Version int16 Blocks map[string]map[int32]*OffsetResponseBlock }
func (*OffsetResponse) AddTopicPartition ¶
func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64)
func (*OffsetResponse) GetBlock ¶
func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock
type OffsetResponseBlock ¶
type OffsetResponseBlock struct { Err KError Offsets []int64 // Version 0 Offset int64 // Version 1 Timestamp int64 // Version 1 }
type PacketDecodingError ¶
PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.
type PacketDecodingError struct { Info string }
func (PacketDecodingError) Error ¶
func (err PacketDecodingError) Error() string
type PacketEncodingError ¶
PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
type PacketEncodingError struct { Info string }
func (PacketEncodingError) Error ¶
func (err PacketEncodingError) Error() string
type PartitionConsumer ¶
PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close() or AsyncClose() on a PartitionConsumer to avoid leaks, it will not be garbage-collected automatically when it passes out of scope.
The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported as out of range by the brokers. In this case you should decide what you want to do (try a different offset, notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying. By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
type PartitionConsumer interface { // AsyncClose initiates a shutdown of the PartitionConsumer. This method will // return immediately, after which you should wait until the 'messages' and // 'errors' channel are drained. It is required to call this function, or // Close before a consumer object passes out of scope, as it will otherwise // leak memory. You must call this before calling Close on the underlying client. AsyncClose() // Close stops the PartitionConsumer from fetching messages. It is required to // call this function (or AsyncClose) before a consumer object passes out of // scope, as it will otherwise leak memory. You must call this before calling // Close on the underlying client. Close() error // Messages returns the read channel for the messages that are returned by // the broker. Messages() <-chan *ConsumerMessage // Errors returns a read channel of errors that occurred during consuming, if // enabled. By default, errors are logged and not returned over this channel. // If you want to implement any custom error handling, set your config's // Consumer.Return.Errors setting to true, and read from this channel. Errors() <-chan *ConsumerError // HighWaterMarkOffset returns the high water mark offset of the partition, // i.e. the offset that will be used for the next message that will be produced. // You can use this to determine how far behind the processing is. HighWaterMarkOffset() int64 }
type PartitionMetadata ¶
type PartitionMetadata struct { Err KError ID int32 Leader int32 Replicas []int32 Isr []int32 }
type PartitionOffsetManager ¶
PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close() on a partition offset manager to avoid leaks, it will not be garbage-collected automatically when it passes out of scope.
type PartitionOffsetManager interface { // NextOffset returns the next offset that should be consumed for the managed // partition, accompanied by metadata which can be used to reconstruct the state // of the partition consumer when it resumes. NextOffset() will return // `config.Consumer.Offsets.Initial` and an empty metadata string if no offset // was committed for this partition yet. NextOffset() (int64, string) // MarkOffset marks the provided offset, alongside a metadata string // that represents the state of the partition consumer at that point in time. The // metadata string can be used by another consumer to restore that state, so it // can resume consumption. // // To follow upstream conventions, you are expected to mark the offset of the // next message to read, not the last message read. Thus, when calling `MarkOffset` // you should typically add one to the offset of the last consumed message. // // Note: calling MarkOffset does not necessarily commit the offset to the backend // store immediately for efficiency reasons, and it may never be committed if // your application crashes. This means that you may end up processing the same // message twice, and your processing should ideally be idempotent. MarkOffset(offset int64, metadata string) // Errors returns a read channel of errors that occur during offset management, if // enabled. By default, errors are logged and not returned over this channel. If // you want to implement any custom error handling, set your config's // Consumer.Return.Errors setting to true, and read from this channel. Errors() <-chan *ConsumerError // AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will // return immediately, after which you should wait until the 'errors' channel has // been drained and closed. It is required to call this function, or Close before // a consumer object passes out of scope, as it will otherwise leak memory. You // must call this before calling Close on the underlying client. AsyncClose() // Close stops the PartitionOffsetManager from managing offsets. It is required to // call this function (or AsyncClose) before a PartitionOffsetManager object // passes out of scope, as it will otherwise leak memory. You must call this // before calling Close on the underlying client. Close() error }
type Partitioner ¶
Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1], decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided as simple default implementations.
type Partitioner interface { // Partition takes a message and partition count and chooses a partition Partition(message *ProducerMessage, numPartitions int32) (int32, error) // RequiresConsistency indicates to the user of the partitioner whether the // mapping of key->partition is consistent or not. Specifically, if a // partitioner requires consistency then it must be allowed to choose from all // partitions (even ones known to be unavailable), and its choice must be // respected by the caller. The obvious example is the HashPartitioner. RequiresConsistency() bool }
▹ Example (Manual)
▹ Example (Random)
func NewHashPartitioner ¶
func NewHashPartitioner(topic string) Partitioner
NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used, modulus the number of partitions. This ensures that messages with the same key always end up on the same partition.
func NewManualPartitioner ¶
func NewManualPartitioner(topic string) Partitioner
NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided ProducerMessage's Partition field as the partition to produce to.
func NewRandomPartitioner ¶
func NewRandomPartitioner(topic string) Partitioner
NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
func NewRoundRobinPartitioner ¶
func NewRoundRobinPartitioner(topic string) Partitioner
NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.
type PartitionerConstructor ¶
PartitionerConstructor is the type for a function capable of constructing new Partitioners.
type PartitionerConstructor func(topic string) Partitioner
func NewCustomHashPartitioner ¶
func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor
NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher. The argument is a function providing the instance, implementing the hash.Hash32 interface. This is to ensure that each partition dispatcher gets its own hasher, to avoid concurrency issues by sharing an instance.
type ProduceRequest ¶
type ProduceRequest struct { RequiredAcks RequiredAcks Timeout int32 Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10 // contains filtered or unexported fields }
func (*ProduceRequest) AddMessage ¶
func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)
func (*ProduceRequest) AddSet ¶
func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet)
type ProduceResponse ¶
type ProduceResponse struct {
Blocks map[string]map[int32]*ProduceResponseBlock
Version int16
ThrottleTime time.Duration // only provided if Version >= 1
}
func (*ProduceResponse) AddTopicPartition ¶
func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError)
func (*ProduceResponse) GetBlock ¶
func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock
type ProduceResponseBlock ¶
type ProduceResponseBlock struct {
Err KError
Offset int64
// only provided if Version >= 2 and the broker is configured with `LogAppendTime`
Timestamp time.Time
}
type ProducerError ¶
ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct { Msg *ProducerMessage Err error }
func (ProducerError) Error ¶
func (pe ProducerError) Error() string
type ProducerErrors ¶
ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface. It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel when closing a producer.
type ProducerErrors []*ProducerError
func (ProducerErrors) Error ¶
func (pe ProducerErrors) Error() string
type ProducerMessage ¶
ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct { Topic string // The Kafka topic for this message. // The partitioning key for this message. Pre-existing Encoders include // StringEncoder and ByteEncoder. Key Encoder // The actual message to store in Kafka. Pre-existing Encoders include // StringEncoder and ByteEncoder. Value Encoder // This field is used to hold arbitrary data you wish to include so it // will be available when receiving on the Successes and Errors channels. // Sarama completely ignores this field and is only to be used for // pass-through data. Metadata interface{} // Offset is the offset of the message stored on the broker. This is only // guaranteed to be defined if the message was successfully delivered and // RequiredAcks is not NoResponse. Offset int64 // Partition is the partition that the message was sent to. This is only // guaranteed to be defined if the message was successfully delivered. Partition int32 // Timestamp is the timestamp assigned to the message by the broker. This // is only guaranteed to be defined if the message was successfully // delivered, RequiredAcks is not NoResponse, and the Kafka broker is at // least version 0.10.0. Timestamp time.Time // contains filtered or unexported fields }
type RequestNotifierFunc ¶
RequestNotifierFunc is invoked when a mock broker processes a request successfully and will provides the number of bytes read and written.
type RequestNotifierFunc func(bytesRead, bytesWritten int)
type RequestResponse ¶
RequestResponse represents a Request/Response pair processed by MockBroker.
type RequestResponse struct { Request protocolBody Response encoder }
type RequiredAcks ¶
RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.isr` value in the brokers configuration).
type RequiredAcks int16
const ( // NoResponse doesn't send any response, the TCP ACK is all you get. NoResponse RequiredAcks = 0 // WaitForLocal waits for only the local commit to succeed before responding. WaitForLocal RequiredAcks = 1 // WaitForAll waits for all in-sync replicas to commit before responding. // The minimum number of in-sync replicas is configured on the broker via // the `min.insync.replicas` configuration key. WaitForAll RequiredAcks = -1 )
type SaslHandshakeRequest ¶
type SaslHandshakeRequest struct { Mechanism string }
type SaslHandshakeResponse ¶
type SaslHandshakeResponse struct { Err KError EnabledMechanisms []string }
type StdLogger ¶
StdLogger is used to log error messages.
type StdLogger interface { Print(v ...interface{}) Printf(format string, v ...interface{}) Println(v ...interface{}) }
Logger is the instance of a StdLogger interface that Sarama writes connection management events to. By default it is set to discard all log messages via ioutil.Discard, but you can set it to redirect wherever you want.
var Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)
type StringEncoder ¶
StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.
type StringEncoder string
func (StringEncoder) Encode ¶
func (s StringEncoder) Encode() ([]byte, error)
func (StringEncoder) Length ¶
func (s StringEncoder) Length() int
type SyncGroupRequest ¶
type SyncGroupRequest struct { GroupId string GenerationId int32 MemberId string GroupAssignments map[string][]byte }
func (*SyncGroupRequest) AddGroupAssignment ¶
func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte)
func (*SyncGroupRequest) AddGroupAssignmentMember ¶
func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error
type SyncGroupResponse ¶
type SyncGroupResponse struct { Err KError MemberAssignment []byte }
func (*SyncGroupResponse) GetMemberAssignment ¶
func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)
type SyncProducer ¶
SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when it passes out of scope.
The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to be set to true in its configuration.
type SyncProducer interface { // SendMessage produces a given message, and returns only when it either has // succeeded or failed to produce. It will return the partition and the offset // of the produced message, or an error if the message failed to produce. SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) // SendMessages produces a given set of messages, and returns only when all // messages in the set have either succeeded or failed. Note that messages // can succeed and fail individually; if some succeed and some fail, // SendMessages will return an error. SendMessages(msgs []*ProducerMessage) error // Close shuts down the producer and waits for any buffered messages to be // flushed. You must call this function before a producer object passes out of // scope, as it may otherwise leak memory. You must call this before calling // Close on the underlying client. Close() error }
▹ Example
func NewSyncProducer ¶
func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error)
NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
func NewSyncProducerFromClient ¶
func NewSyncProducerFromClient(client Client) (SyncProducer, error)
NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still necessary to call Close() on the underlying client when shutting down this producer.
type TestReporter ¶
TestReporter has methods matching go's testing.T to avoid importing `testing` in the main part of the library.
type TestReporter interface { Error(...interface{}) Errorf(string, ...interface{}) Fatal(...interface{}) Fatalf(string, ...interface{}) }
type TopicMetadata ¶
type TopicMetadata struct { Err KError Name string Partitions []*PartitionMetadata }
Subdirectories
Name | Synopsis |
---|---|
.. | |
examples | |
http_server | |
mocks | Package mocks provides mocks that can be used for testing applications that use Sarama. |
tools | |
kafka-console-consumer | |
kafka-console-partitionconsumer | |
kafka-console-producer |