Finally rid of capital Sirupsen??

This commit is contained in:
Travis Reeder
2017-09-18 23:33:47 -07:00
parent f335d34636
commit 3aecebdf48
1127 changed files with 41199 additions and 41383 deletions

View File

@@ -612,6 +612,9 @@ func (client *client) backgroundMetadataUpdater() {
if specificTopics, err := client.Topics(); err != nil {
Logger.Println("Client background metadata topic load:", err)
break
} else if len(specificTopics) == 0 {
Logger.Println("Client background metadata update: no specific topics to update")
break
} else {
topics = specificTopics
}

View File

@@ -196,11 +196,23 @@ type Config struct {
// Equivalent to the JVM's `fetch.wait.max.ms`.
MaxWaitTime time.Duration
// The maximum amount of time the consumer expects a message takes to process
// for the user. If writing to the Messages channel takes longer than this,
// that partition will stop fetching more messages until it can proceed again.
// The maximum amount of time the consumer expects a message takes to
// process for the user. If writing to the Messages channel takes longer
// than this, that partition will stop fetching more messages until it
// can proceed again.
// Note that, since the Messages channel is buffered, the actual grace time is
// (MaxProcessingTime * ChanneBufferSize). Defaults to 100ms.
// If a message is not written to the Messages channel between two ticks
// of the expiryTicker then a timeout is detected.
// Using a ticker instead of a timer to detect timeouts should typically
// result in many fewer calls to Timer functions which may result in a
// significant performance improvement if many messages are being sent
// and timeouts are infrequent.
// The disadvantage of using a ticker instead of a timer is that
// timeouts will be less accurate. That is, the effective timeout could
// be between `MaxProcessingTime` and `2 * MaxProcessingTime`. For
// example, if `MaxProcessingTime` is 100ms then a delay of 180ms
// between two messages being sent may not be recognized as a timeout.
MaxProcessingTime time.Duration
// Return specifies what channels will be populated. If they are set to true,

View File

@@ -440,35 +440,37 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
func (child *partitionConsumer) responseFeeder() {
var msgs []*ConsumerMessage
expiryTimer := time.NewTimer(child.conf.Consumer.MaxProcessingTime)
expireTimedOut := false
msgSent := false
feederLoop:
for response := range child.feeder {
msgs, child.responseResult = child.parseResponse(response)
expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
for i, msg := range msgs {
if !expiryTimer.Stop() && !expireTimedOut {
// expiryTimer was expired; clear out the waiting msg
<-expiryTimer.C
}
expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
expireTimedOut = false
messageSelect:
select {
case child.messages <- msg:
case <-expiryTimer.C:
expireTimedOut = true
child.responseResult = errTimedOut
child.broker.acks.Done()
for _, msg = range msgs[i:] {
child.messages <- msg
msgSent = true
case <-expiryTicker.C:
if !msgSent {
child.responseResult = errTimedOut
child.broker.acks.Done()
for _, msg = range msgs[i:] {
child.messages <- msg
}
child.broker.input <- child
continue feederLoop
} else {
// current message has not been sent, return to select
// statement
msgSent = false
goto messageSelect
}
child.broker.input <- child
continue feederLoop
}
}
expiryTicker.Stop()
child.broker.acks.Done()
}

View File

@@ -803,6 +803,48 @@ func TestConsumerOffsetOutOfRange(t *testing.T) {
broker0.Close()
}
func TestConsumerExpiryTicker(t *testing.T) {
// Given
broker0 := NewMockBroker(t, 0)
fetchResponse1 := &FetchResponse{}
for i := 1; i <= 8; i++ {
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
}
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 1),
"FetchRequest": NewMockSequence(fetchResponse1),
})
config := NewConfig()
config.ChannelBufferSize = 0
config.Consumer.MaxProcessingTime = 10 * time.Millisecond
master, err := NewConsumer([]string{broker0.Addr()}, config)
if err != nil {
t.Fatal(err)
}
// When
consumer, err := master.ConsumePartition("my_topic", 0, 1)
if err != nil {
t.Fatal(err)
}
// Then: messages with offsets 1 through 8 are read
for i := 1; i <= 8; i++ {
assertMessageOffset(t, <-consumer.Messages(), int64(i))
time.Sleep(2 * time.Millisecond)
}
safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}
func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
if msg.Offset != expectedOffset {
t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)

View File

@@ -2,6 +2,7 @@ package sarama
import (
"runtime"
"strings"
"testing"
"time"
)
@@ -106,7 +107,7 @@ func TestMessageEncoding(t *testing.T) {
message.Value = []byte{}
message.Codec = CompressionGZIP
if runtime.Version() == "go1.8" {
if runtime.Version() == "go1.8" || strings.HasPrefix(runtime.Version(), "go1.8.") {
testEncodable(t, "empty gzip", &message, emptyGzipMessage18)
} else {
testEncodable(t, "empty gzip", &message, emptyGzipMessage)

View File

@@ -151,6 +151,13 @@ type PartitionOffsetManager interface {
// message twice, and your processing should ideally be idempotent.
MarkOffset(offset int64, metadata string)
// ResetOffset resets to the provided offset, alongside a metadata string that
// represents the state of the partition consumer at that point in time. Reset
// acts as a counterpart to MarkOffset, the difference being that it allows to
// reset an offset to an earlier or smaller value, where MarkOffset only
// allows incrementing the offset. cf MarkOffset for more details.
ResetOffset(offset int64, metadata string)
// Errors returns a read channel of errors that occur during offset management, if
// enabled. By default, errors are logged and not returned over this channel. If
// you want to implement any custom error handling, set your config's
@@ -329,6 +336,17 @@ func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
}
}
func (pom *partitionOffsetManager) ResetOffset(offset int64, metadata string) {
pom.lock.Lock()
defer pom.lock.Unlock()
if offset < pom.offset {
pom.offset = offset
pom.metadata = metadata
pom.dirty = true
}
}
func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
pom.lock.Lock()
defer pom.lock.Unlock()

View File

@@ -204,6 +204,70 @@ func TestPartitionOffsetManagerNextOffset(t *testing.T) {
safeClose(t, testClient)
}
func TestPartitionOffsetManagerResetOffset(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
ocResponse := new(OffsetCommitResponse)
ocResponse.AddError("my_topic", 0, ErrNoError)
coordinator.Returns(ocResponse)
expected := int64(1)
pom.ResetOffset(expected, "modified_meta")
actual, meta := pom.NextOffset()
if actual != expected {
t.Errorf("Expected offset %v. Actual: %v", expected, actual)
}
if meta != "modified_meta" {
t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
}
safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}
func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
testClient.Config().Consumer.Offsets.Retention = time.Hour
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
ocResponse := new(OffsetCommitResponse)
ocResponse.AddError("my_topic", 0, ErrNoError)
handler := func(req *request) (res encoder) {
if req.body.version() != 2 {
t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
}
offsetCommitRequest := req.body.(*OffsetCommitRequest)
if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
}
return ocResponse
}
coordinator.setHandler(handler)
expected := int64(1)
pom.ResetOffset(expected, "modified_meta")
actual, meta := pom.NextOffset()
if actual != expected {
t.Errorf("Expected offset %v. Actual: %v", expected, actual)
}
if meta != "modified_meta" {
t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
}
safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}
func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")