Package bundler
Constants
const (
	DefaultDelayThreshold       = time.Second
	DefaultBundleCountThreshold = 10
	DefaultBundleByteThreshold  = 1e6 // 1M
	DefaultBufferedByteLimit    = 1e9 // 1G
)
Variables
var (
	// ErrOverflow indicates that Bundler's stored bytes exceed its BufferedByteLimit.
	ErrOverflow = errors.New("bundler reached buffered byte limit")

	// ErrOversizedItem indicates that an item's size exceeds the maximum bundle size.
	ErrOversizedItem = errors.New("item size exceeds bundle byte limit")
)
type Bundler
A Bundler collects items added to it into a bundle until the bundle exceeds a given size, then calls a user-provided function to handle the bundle.
type Bundler struct {
	// Starting from the time that the first message is added to a bundle, once
	// this delay has passed, handle the bundle. The default is DefaultDelayThreshold.
	DelayThreshold time.Duration

	// Once a bundle has this many items, handle the bundle. Since only one
	// item at a time is added to a bundle, no bundle will exceed this
	// threshold, so it also serves as a limit. The default is
	// DefaultBundleCountThreshold.
	BundleCountThreshold int

	// Once the number of bytes in the current bundle reaches this threshold, handle
	// the bundle. The default is DefaultBundleByteThreshold. This triggers handling,
	// but does not cap the total size of a bundle.
	BundleByteThreshold int

	// The maximum size of a bundle, in bytes. Zero means unlimited.
	BundleByteLimit int

	// The maximum number of bytes that the Bundler will keep in memory before
	// returning ErrOverflow. The default is DefaultBufferedByteLimit.
	BufferedByteLimit int
	// contains filtered or unexported fields
}
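The difference between BundleByteThreshold (a trigger) and BundleByteLimit (a cap) is easy to miss. The following is a simplified model of that distinction, not the package's code: splitBySize is a hypothetical helper, it ignores the delay and count thresholds, and it assumes every item fits within the limit on its own.

```go
package main

import "fmt"

// splitBySize is a hypothetical model of how item sizes could be grouped
// into bundles. byteThreshold closes a bundle once its size reaches the
// threshold; byteLimit (zero means unlimited) is never exceeded.
// It returns the total byte size of each resulting bundle.
func splitBySize(sizes []int, byteThreshold, byteLimit int) []int {
	var bundles []int
	cur := 0
	for _, s := range sizes {
		// Start a new bundle first if this item would break the hard limit.
		if byteLimit > 0 && cur > 0 && cur+s > byteLimit {
			bundles = append(bundles, cur)
			cur = 0
		}
		cur += s
		// The threshold is checked only after the item is already in the
		// bundle, so a bundle may end up larger than the threshold.
		if cur >= byteThreshold {
			bundles = append(bundles, cur)
			cur = 0
		}
	}
	if cur > 0 {
		bundles = append(bundles, cur) // leftover items, as a flush would deliver them
	}
	return bundles
}

func main() {
	sizes := []int{400, 400, 400}
	fmt.Println(splitBySize(sizes, 1000, 0))    // [1200]
	fmt.Println(splitBySize(sizes, 1000, 1000)) // [800 400]
}
```

With no limit, the third item pushes the bundle past the threshold and the whole 1200 bytes are handled together; with a limit of 1000, the third item starts a new bundle instead.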
func NewBundler
func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler
NewBundler creates a new Bundler.
itemExample is a value of the type that will be bundled. For example, if you want to create bundles of *Entry, you could pass &Entry{} for itemExample.
handler is a function that will be called on each bundle. If itemExample is of type T, the argument to handler is of type []T. handler is always called sequentially for each bundle, and never in parallel.
Configure the Bundler by setting its thresholds and limits before calling any of its methods.
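Because handler receives its bundle as an interface{}, it has to recover the slice type itself. The sketch below shows one way to write such a handler; Entry and newHandler are hypothetical names introduced for illustration, and the handler is called directly here rather than through a Bundler so that the example runs on its own.

```go
package main

import "fmt"

// Entry is a hypothetical item type used only for illustration.
type Entry struct {
	Msg string
}

// newHandler returns a function with the signature NewBundler expects for
// its handler, plus a pointer to the messages that handler has seen so far.
func newHandler() (func(interface{}), *[]string) {
	var seen []string
	h := func(bundle interface{}) {
		// If the item example was &Entry{}, the bundle arrives as []*Entry.
		entries, ok := bundle.([]*Entry)
		if !ok {
			return // not the slice type we expect; ignore it
		}
		for _, e := range entries {
			seen = append(seen, e.Msg)
		}
	}
	return h, &seen
}

func main() {
	handler, seen := newHandler()
	// With the real package, handler would be passed to
	// NewBundler(&Entry{}, handler). Here it is invoked directly.
	handler([]*Entry{{Msg: "a"}, {Msg: "b"}})
	fmt.Println(*seen) // [a b]
}
```

Using the two-value form of the type assertion keeps a mismatched item example from causing a panic inside the handler.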
func (*Bundler) Add
func (b *Bundler) Add(item interface{}, size int) error
Add adds item to the current bundle. It marks the bundle for handling and starts a new one if any of the thresholds or limits are exceeded.
If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then the item can never be handled. Add returns ErrOversizedItem in this case.
If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for memory, Add returns ErrOverflow.
Add never blocks.
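The two error cases can be summarized as a pair of size checks. The following is a model of the documented rules only, not the package's implementation: admit is a hypothetical function, the lower-case error values are local stand-ins for ErrOverflow and ErrOversizedItem, the order of the checks is an assumption, and the case of a blocked AddWait call is left out.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's ErrOverflow and ErrOversizedItem,
// so that this sketch compiles on its own.
var (
	errOverflow      = errors.New("bundler reached buffered byte limit")
	errOversizedItem = errors.New("item size exceeds bundle byte limit")
)

// admit models the documented checks for a non-blocking Add call.
// buffered is the number of bytes currently held in memory; a
// bundleByteLimit of zero means unlimited.
func admit(size, buffered, bundleByteLimit, bufferedByteLimit int) error {
	if bundleByteLimit > 0 && size > bundleByteLimit {
		return errOversizedItem // can never fit in any bundle
	}
	if buffered+size > bufferedByteLimit {
		return errOverflow // no room right now; the caller may retry later
	}
	return nil
}

func main() {
	fmt.Println(admit(10, 0, 100, 1000))   // <nil>
	fmt.Println(admit(200, 0, 100, 1000))  // item size exceeds bundle byte limit
	fmt.Println(admit(10, 995, 100, 1000)) // bundler reached buffered byte limit
}
```

The practical difference for callers is that an oversized item will fail every time, whereas an overflow may clear once earlier bundles have been handled.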
func (*Bundler) AddWait
func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error
AddWait adds item to the current bundle. It marks the bundle for handling and starts a new one if any of the thresholds or limits are exceeded.
If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then the item can never be handled. AddWait returns ErrOversizedItem in this case.
If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit), AddWait blocks until space is available or ctx is done.
Calls to Add and AddWait should not be mixed on the same Bundler.
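Since AddWait can block for as long as the buffer stays full, callers will often want to bound the wait with a context. The sketch below shows that calling pattern against a small interface that matches the documented AddWait signature. The names waitAdder, addWithTimeout, fullBuffer and tryAdd are hypothetical, and fullBuffer is a stand-in for a Bundler whose buffer never frees up; returning ctx.Err() from it is an assumption, since the documentation does not name the error returned when ctx is done.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitAdder captures the documented AddWait signature, so the helper below
// works with a *Bundler or with the stand-in defined further down.
type waitAdder interface {
	AddWait(ctx context.Context, item interface{}, size int) error
}

// addWithTimeout bounds how long a single AddWait call may block.
func addWithTimeout(b waitAdder, item interface{}, size int, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return b.AddWait(ctx, item, size)
}

// fullBuffer is a hypothetical stand-in that never has space available:
// it blocks until ctx is done and then returns ctx.Err() (an assumption).
type fullBuffer struct{}

func (fullBuffer) AddWait(ctx context.Context, item interface{}, size int) error {
	<-ctx.Done()
	return ctx.Err()
}

// tryAdd makes one bounded call and reports whether it timed out.
func tryAdd() bool {
	err := addWithTimeout(fullBuffer{}, "payload", 7, 20*time.Millisecond)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(tryAdd()) // true
}
```

Programming against the one-method interface also makes it straightforward to substitute a fake in tests.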
func (*Bundler) Flush
func (b *Bundler) Flush()
Flush invokes the handler for all remaining items in the Bundler and waits for it to return.
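Items that have not yet crossed a threshold sit in the current bundle until something triggers handling, which is why a Flush before shutdown matters. The following toy model illustrates this with a count threshold only. toyBundler is hypothetical and deliberately simplified: it is synchronous and ignores the delay and byte settings, so it should not be read as the package's implementation.

```go
package main

import "fmt"

// toyBundler is a simplified, synchronous model of count-based bundling.
type toyBundler struct {
	countThreshold int
	handler        func([]string)
	cur            []string
}

// add appends an item and hands the bundle off once the threshold is hit.
func (t *toyBundler) add(item string) {
	t.cur = append(t.cur, item)
	if len(t.cur) >= t.countThreshold {
		t.flush()
	}
}

// flush hands any remaining items to the handler and waits for it to return.
func (t *toyBundler) flush() {
	if len(t.cur) == 0 {
		return
	}
	bundle := t.cur
	t.cur = nil
	t.handler(bundle)
}

func main() {
	var sizes []int // number of items in each handled bundle
	b := &toyBundler{countThreshold: 3, handler: func(bundle []string) {
		sizes = append(sizes, len(bundle))
	}}
	for _, s := range []string{"a", "b", "c", "d"} {
		b.add(s)
	}
	fmt.Println(sizes) // [3]
	b.flush()
	fmt.Println(sizes) // [3 1]
}
```

In this model the fourth item is only delivered by the explicit flush; in the real Bundler the delay threshold would eventually deliver it as well, but a program that exits first would lose it.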