Note that there are some explanatory texts on larger screens.

plurals
  1. POBest way to implement global counters for highly concurrent applications?
    primarykey
    data
    text
    <p>What is the best way to implement global counters for a highly concurrent application? In my case I may have 10K-20K go routines performing "work", and I want to count the number and types of items that the routines are working on collectively...</p> <p>The "classic" synchronous coding style would look like:</p> <pre><code>var work_counter int func GoWorkerRoutine() { for { // do work atomic.AddInt32(&amp;work_counter,1) } } </code></pre> <p>Now this gets more complicated because I want to track the "type" of work being done, so really I'd need something like this:</p> <pre><code>var work_counter map[string]int var work_mux sync.Mutex func GoWorkerRoutine() { for { // do work work_mux.Lock() work_counter["type1"]++ work_mux.Unlock() } } </code></pre> <p>It seems like there should be a "go" optimized way using channels or something similar to this:</p> <pre><code>var work_counter int var work_chan chan int // make() called somewhere else (buffered) // started somewher else func GoCounterRoutine() { for { select { case c := &lt;- work_chan: work_counter += c break } } } func GoWorkerRoutine() { for { // do work work_chan &lt;- 1 } } </code></pre> <p>This last example is still missing the map, but that's easy enough to add. Will this style provide better performance than just a simple atomic increment? I can't tell if this is more or less complicated when we're talking about concurrent access to a global value versus something that may block on I/O to complete...</p> <p>Thoughts are appreciated.</p> <p>Update 5/28/2013:</p> <p>I tested a couple implementations, and the results were not what I expected, here's my counter source code:</p> <pre><code>package helpers import ( ) type CounterIncrementStruct struct { bucket string value int } type CounterQueryStruct struct { bucket string channel chan int } var counter map[string]int var counterIncrementChan chan CounterIncrementStruct var counterQueryChan chan CounterQueryStruct var counterListChan chan chan map[string]int func CounterInitialize() { counter = make(map[string]int) counterIncrementChan = make(chan CounterIncrementStruct,0) counterQueryChan = make(chan CounterQueryStruct,100) counterListChan = make(chan chan map[string]int,100) go goCounterWriter() } func goCounterWriter() { for { select { case ci := &lt;- counterIncrementChan: if len(ci.bucket)==0 { return } counter[ci.bucket]+=ci.value break case cq := &lt;- counterQueryChan: val,found:=counter[cq.bucket] if found { cq.channel &lt;- val } else { cq.channel &lt;- -1 } break case cl := &lt;- counterListChan: nm := make(map[string]int) for k, v := range counter { nm[k] = v } cl &lt;- nm break } } } func CounterIncrement(bucket string, counter int) { if len(bucket)==0 || counter==0 { return } counterIncrementChan &lt;- CounterIncrementStruct{bucket,counter} } func CounterQuery(bucket string) int { if len(bucket)==0 { return -1 } reply := make(chan int) counterQueryChan &lt;- CounterQueryStruct{bucket,reply} return &lt;- reply } func CounterList() map[string]int { reply := make(chan map[string]int) counterListChan &lt;- reply return &lt;- reply } </code></pre> <p>It uses channels for both writes and reads which seems logical.</p> <p>Here are my test cases:</p> <pre><code>func bcRoutine(b *testing.B,e chan bool) { for i := 0; i &lt; b.N; i++ { CounterIncrement("abc123",5) CounterIncrement("def456",5) CounterIncrement("ghi789",5) CounterIncrement("abc123",5) CounterIncrement("def456",5) CounterIncrement("ghi789",5) } e&lt;-true } func BenchmarkChannels(b *testing.B) { b.StopTimer() CounterInitialize() e:=make(chan bool) b.StartTimer() go bcRoutine(b,e) go bcRoutine(b,e) go bcRoutine(b,e) go bcRoutine(b,e) go bcRoutine(b,e) &lt;-e &lt;-e &lt;-e &lt;-e &lt;-e } var mux sync.Mutex var m map[string]int func bmIncrement(bucket string, value int) { mux.Lock() m[bucket]+=value mux.Unlock() } func bmRoutine(b *testing.B,e chan bool) { for i := 0; i &lt; b.N; i++ { bmIncrement("abc123",5) bmIncrement("def456",5) bmIncrement("ghi789",5) bmIncrement("abc123",5) bmIncrement("def456",5) bmIncrement("ghi789",5) } e&lt;-true } func BenchmarkMutex(b *testing.B) { b.StopTimer() m=make(map[string]int) e:=make(chan bool) b.StartTimer() for i := 0; i &lt; b.N; i++ { bmIncrement("abc123",5) bmIncrement("def456",5) bmIncrement("ghi789",5) bmIncrement("abc123",5) bmIncrement("def456",5) bmIncrement("ghi789",5) } go bmRoutine(b,e) go bmRoutine(b,e) go bmRoutine(b,e) go bmRoutine(b,e) go bmRoutine(b,e) &lt;-e &lt;-e &lt;-e &lt;-e &lt;-e } </code></pre> <p>I implemented a simple benchmark with just a mutex around the map (just testing writes), and benchmarked both with 5 goroutines running in parallel. Here are the results:</p> <pre><code>$ go test --bench=. helpers PASS BenchmarkChannels 100000 15560 ns/op BenchmarkMutex 1000000 2669 ns/op ok helpers 4.452s </code></pre> <p>I would not have expected the mutex to be that much faster...</p> <p>Further thoughts?</p>
    singulars
    1. This table or related slice is empty.
    plurals
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload