Note that there is some explanatory text on larger screens.

plurals
  1. Integer queue with compare-and-swap has a race condition
    text
    copied!<p>I have written a synchronised queue for holding integers and am faced with a weird race condition that I cannot seem to understand.</p> <p>Please do <strong>NOT</strong> post solutions; I know how to fix the code and make it work. I want to know what the race condition is and why the code is not working as intended. Please help me understand what is going wrong and why.</p> <p>First, the important part of the code:</p> <p>This assumes that the application will never put in more than the buffer can hold, so there is no check for the current buffer size.</p> <pre><code>static inline void int_queue_put_sync(struct int_queue_s * const __restrict int_queue, const long int value ) { if (value) { // 0 values are not allowed to be put in size_t write_offset; // holds a current copy of the array index where to put the element for (;;) { // retrieve up to date write_offset copy and apply power-of-two modulus write_offset = int_queue-&gt;write_offset &amp; int_queue-&gt;modulus; // if that cell currently holds 0 (thus is empty) if (!int_queue-&gt;int_container[write_offset]) // Attempt to compare and swap the new value in if (__sync_bool_compare_and_swap(&amp;(int_queue-&gt;int_container[write_offset]), (long int)0, value)) // if successful then this thread was the first to do this, terminate the loop, else try again break; } // increment write offset signaling other threads where the next free cell is int_queue-&gt;write_offset++; // doing a synchronised increment here does not fix the race condition } } </code></pre> <p>This seems to have a rare race condition in which <code>write_offset</code> is apparently not incremented. Tested on OS X gcc 4.2, Intel Core i5 quadcore and Linux Intel C Compiler 12 on RedHat 2.6.32 Intel(R) Xeon(R). 
Both produce race conditions.</p> <p>Full source with test cases:</p> <pre><code>#include &lt;pthread.h&gt; #include &lt;stdlib.h&gt; #include &lt;stdio.h&gt; #include &lt;unistd.h&gt; #include &lt;stdint.h&gt; // #include "int_queue.h" #include &lt;stddef.h&gt; #include &lt;string.h&gt; #include &lt;unistd.h&gt; #include &lt;sys/mman.h&gt; #ifndef INT_QUEUE_H #define INT_QUEUE_H #ifndef MAP_ANONYMOUS #define MAP_ANONYMOUS MAP_ANON #endif struct int_queue_s { size_t size; size_t modulus; volatile size_t read_offset; volatile size_t write_offset; volatile long int int_container[0]; }; static inline void int_queue_put(struct int_queue_s * const __restrict int_queue, const long int value ) { if (value) { int_queue-&gt;int_container[int_queue-&gt;write_offset &amp; int_queue-&gt;modulus] = value; int_queue-&gt;write_offset++; } } static inline void int_queue_put_sync(struct int_queue_s * const __restrict int_queue, const long int value ) { if (value) { size_t write_offset; for (;;) { write_offset = int_queue-&gt;write_offset &amp; int_queue-&gt;modulus; if (!int_queue-&gt;int_container[write_offset]) if (__sync_bool_compare_and_swap(&amp;(int_queue-&gt;int_container[write_offset]), (long int)0, value)) break; } int_queue-&gt;write_offset++; } } static inline long int int_queue_get(struct int_queue_s * const __restrict int_queue) { size_t read_offset = int_queue-&gt;read_offset &amp; int_queue-&gt;modulus; if (int_queue-&gt;write_offset != int_queue-&gt;read_offset) { const long int value = int_queue-&gt;int_container[read_offset]; int_queue-&gt;int_container[read_offset] = 0; int_queue-&gt;read_offset++; return value; } else return 0; } static inline long int int_queue_get_sync(struct int_queue_s * const __restrict int_queue) { size_t read_offset; long int volatile value; for (;;) { read_offset = int_queue-&gt;read_offset; if (int_queue-&gt;write_offset == read_offset) return 0; read_offset &amp;= int_queue-&gt;modulus; value = int_queue-&gt;int_container[read_offset]; 
if (value) if (__sync_bool_compare_and_swap(&amp;(int_queue-&gt;int_container[read_offset]), (long int)value, (long int)0)) break; } int_queue-&gt;read_offset++; return value; } static inline struct int_queue_s * int_queue_create(size_t num_values) { struct int_queue_s * int_queue; size_t modulus; size_t temp = num_values + 1; do { modulus = temp; temp--; temp &amp;= modulus; } while (temp); modulus &lt;&lt;= 1; size_t int_queue_mem = sizeof(*int_queue) + ( sizeof(int_queue-&gt;int_container[0]) * modulus); if (int_queue_mem % sysconf(_SC_PAGE_SIZE)) int_queue_mem += sysconf(_SC_PAGE_SIZE) - (int_queue_mem % sysconf(_SC_PAGE_SIZE)); int_queue = mmap(NULL, int_queue_mem, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE , -1, 0); if (int_queue == MAP_FAILED) return NULL; int_queue-&gt;modulus = modulus-1; int_queue-&gt;read_offset = 0; int_queue-&gt;write_offset = 0; int_queue-&gt;size = num_values; memset((void*)int_queue-&gt;int_container, 0, sizeof(int_queue-&gt;int_container[0]) * modulus); size_t i; for (i = 0; i &lt; num_values; ) { int_queue_put(int_queue, ++i ); } return int_queue; } #endif void * test_int_queue_thread(struct int_queue_s * int_queue) { long int value; size_t i; for (i = 0; i &lt; 10000000; i++) { int waited = -1; do { value = int_queue_get_sync(int_queue); waited++; } while (!value); if (waited &gt; 0) { printf("waited %d cycles to get a new value\n", waited); // continue; } // else { printf("thread %p got value %ld, i = %zu\n", (void *)pthread_self(), value, i); // } int timesleep = rand(); timesleep &amp;= 0xFFF; usleep(timesleep); int_queue_put_sync(int_queue, value); printf("thread %p put value %ld back, i = %zu\n", (void *)pthread_self(), value, i); } return NULL; } int main(int argc, char ** argv) { struct int_queue_s * int_queue = int_queue_create(2); if (!int_queue) { fprintf(stderr, "error initializing int_queue\n"); return -1; } srand(0); long int value[100]; size_t i; for (i = 0; i &lt; 100; i++) { value[0] = 
int_queue_get(int_queue); if (!value[0]) { printf("error getting value\n"); } else { printf("got value %ld\n", value[0]); } int_queue_put(int_queue, value[0]); printf("put value %ld back successfully\n", value[0]); } pthread_t threads[100]; for (i = 0; i &lt; 4; i++) { pthread_create(threads + i, NULL, (void * (*)(void *))test_int_queue_thread, int_queue); } for (i = 0; i &lt; 4; i++) { pthread_join(threads[i], NULL); } return 0; } </code></pre>
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious, you may find further information in the browser console, which is accessible through the devtools (F12).

Reload