mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
176 lines
3.6 KiB
Go
176 lines
3.6 KiB
Go
// Copyright (c) 2012-2016 Eli Janssen
|
|
// Use of this source code is governed by an MIT-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package statsd
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
var senderPool = newBufferPool()
|
|
|
|
// BufferedSender provides a buffered statsd udp, sending multiple
|
|
// metrics, where possible.
|
|
type BufferedSender struct {
|
|
sender Sender
|
|
flushBytes int
|
|
flushInterval time.Duration
|
|
// buffers
|
|
bufmx sync.Mutex
|
|
buffer *bytes.Buffer
|
|
bufs chan *bytes.Buffer
|
|
// lifecycle
|
|
runmx sync.RWMutex
|
|
shutdown chan chan error
|
|
running bool
|
|
}
|
|
|
|
// Send bytes.
|
|
func (s *BufferedSender) Send(data []byte) (int, error) {
|
|
s.runmx.RLock()
|
|
if !s.running {
|
|
s.runmx.RUnlock()
|
|
return 0, fmt.Errorf("BufferedSender is not running")
|
|
}
|
|
|
|
s.withBufferLock(func() {
|
|
blen := s.buffer.Len()
|
|
if blen > 0 && blen+len(data)+1 >= s.flushBytes {
|
|
s.swapnqueue()
|
|
}
|
|
|
|
s.buffer.Write(data)
|
|
s.buffer.WriteByte('\n')
|
|
|
|
if s.buffer.Len() >= s.flushBytes {
|
|
s.swapnqueue()
|
|
}
|
|
})
|
|
s.runmx.RUnlock()
|
|
return len(data), nil
|
|
}
|
|
|
|
// Close Buffered Sender
|
|
func (s *BufferedSender) Close() error {
|
|
// since we are running, write lock during cleanup
|
|
s.runmx.Lock()
|
|
defer s.runmx.Unlock()
|
|
if !s.running {
|
|
return nil
|
|
}
|
|
|
|
errChan := make(chan error)
|
|
s.running = false
|
|
s.shutdown <- errChan
|
|
return <-errChan
|
|
}
|
|
|
|
// Start Buffered Sender
|
|
// Begins ticker and read loop
|
|
func (s *BufferedSender) Start() {
|
|
// write lock to start running
|
|
s.runmx.Lock()
|
|
defer s.runmx.Unlock()
|
|
if s.running {
|
|
return
|
|
}
|
|
|
|
s.running = true
|
|
s.bufs = make(chan *bytes.Buffer, 32)
|
|
go s.run()
|
|
}
|
|
|
|
func (s *BufferedSender) withBufferLock(fn func()) {
|
|
s.bufmx.Lock()
|
|
fn()
|
|
s.bufmx.Unlock()
|
|
}
|
|
|
|
func (s *BufferedSender) swapnqueue() {
|
|
if s.buffer.Len() == 0 {
|
|
return
|
|
}
|
|
ob := s.buffer
|
|
nb := senderPool.Get()
|
|
s.buffer = nb
|
|
s.bufs <- ob
|
|
}
|
|
|
|
func (s *BufferedSender) run() {
|
|
ticker := time.NewTicker(s.flushInterval)
|
|
defer ticker.Stop()
|
|
|
|
doneChan := make(chan bool)
|
|
go func() {
|
|
for buf := range s.bufs {
|
|
s.flush(buf)
|
|
senderPool.Put(buf)
|
|
}
|
|
doneChan <- true
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
s.withBufferLock(func() {
|
|
s.swapnqueue()
|
|
})
|
|
case errChan := <-s.shutdown:
|
|
s.withBufferLock(func() {
|
|
s.swapnqueue()
|
|
})
|
|
close(s.bufs)
|
|
<-doneChan
|
|
errChan <- s.sender.Close()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// send to remove endpoint and truncate buffer
|
|
func (s *BufferedSender) flush(b *bytes.Buffer) (int, error) {
|
|
bb := b.Bytes()
|
|
bbl := len(bb)
|
|
if bb[bbl-1] == '\n' {
|
|
bb = bb[:bbl-1]
|
|
}
|
|
//n, err := s.sender.Send(bytes.TrimSuffix(b.Bytes(), []byte("\n")))
|
|
n, err := s.sender.Send(bb)
|
|
b.Truncate(0) // clear the buffer
|
|
return n, err
|
|
}
|
|
|
|
// NewBufferedSender returns a new BufferedSender
|
|
//
|
|
// addr is a string of the format "hostname:port", and must be parsable by
|
|
// net.ResolveUDPAddr.
|
|
//
|
|
// flushInterval is a time.Duration, and specifies the maximum interval for
|
|
// packet sending. Note that if you send lots of metrics, you will send more
|
|
// often. This is just a maximal threshold.
|
|
//
|
|
// flushBytes specifies the maximum udp packet size you wish to send. If adding
|
|
// a metric would result in a larger packet than flushBytes, the packet will
|
|
// first be send, then the new data will be added to the next packet.
|
|
func NewBufferedSender(addr string, flushInterval time.Duration, flushBytes int) (Sender, error) {
|
|
simpleSender, err := NewSimpleSender(addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sender := &BufferedSender{
|
|
flushBytes: flushBytes,
|
|
flushInterval: flushInterval,
|
|
sender: simpleSender,
|
|
buffer: senderPool.Get(),
|
|
shutdown: make(chan chan error),
|
|
}
|
|
|
|
sender.Start()
|
|
return sender, nil
|
|
}
|