spin_lock(&producer_lock);
unsigned long head = buffer->head;
+ /* The spin_unlock() and next spin_lock() provide needed ordering. */
unsigned long tail = ACCESS_ONCE(buffer->tail);
if (CIRC_SPACE(head, tail, buffer->size) >= 1) {
produce_item(item);
- smp_wmb(); /* commit the item before incrementing the head */
-
- buffer->head = (head + 1) & (buffer->size - 1);
+ smp_store_release(buffer->head,
+ (head + 1) & (buffer->size - 1));
/* wake_up() will make sure that the head is committed before
* waking anyone up */
before the head index makes it available to the consumer and then instructs the
CPU that the revised head index must be written before the consumer is woken.
-Note that wake_up() doesn't have to be the exact mechanism used, but whatever
-is used must guarantee a (write) memory barrier between the update of the head
-index and the change of state of the consumer, if a change of state occurs.
+Note that wake_up() does not guarantee any sort of barrier unless something
+is actually awakened. We therefore cannot rely on it for ordering. However,
+there is always one element of the array left empty. Therefore, the
+producer must produce two elements before it could possibly corrupt the
+element currently being read by the consumer. Therefore, the unlock-lock
+pair between consecutive invocations of the consumer provides the necessary
+ordering between the read of the index indicating that the consumer has
+vacated a given element and the write by the producer to that same element.
THE CONSUMER
spin_lock(&consumer_lock);
- unsigned long head = ACCESS_ONCE(buffer->head);
+ /* Read index before reading contents at that index. */
+ unsigned long head = smp_load_acquire(buffer->head);
unsigned long tail = buffer->tail;
if (CIRC_CNT(head, tail, buffer->size) >= 1) {
- /* read index before reading contents at that index */
- smp_read_barrier_depends();
/* extract one item from the buffer */
struct item *item = buffer[tail];
consume_item(item);
- smp_mb(); /* finish reading descriptor before incrementing tail */
-
- buffer->tail = (tail + 1) & (buffer->size - 1);
+ /* Finish reading descriptor before incrementing tail. */
+ smp_store_release(buffer->tail,
+ (tail + 1) & (buffer->size - 1));
}
spin_unlock(&consumer_lock);
the new item, and then it shall make sure the CPU has finished reading the item
before it writes the new tail pointer, which will erase the item.
-
-Note the use of ACCESS_ONCE() in both algorithms to read the opposition index.
-This prevents the compiler from discarding and reloading its cached value -
-which some compilers will do across smp_read_barrier_depends(). This isn't
-strictly needed if you can be sure that the opposition index will _only_ be
-used the once.
+Note the use of ACCESS_ONCE() and smp_load_acquire() to read the
+opposition index. This prevents the compiler from discarding and
+reloading its cached value - which some compilers will do across
+smp_read_barrier_depends(). This isn't strictly needed if you can
+be sure that the opposition index will _only_ be used the once.
+The smp_load_acquire() additionally forces the CPU to order against
+subsequent memory references. Similarly, smp_store_release() is used
+in both algorithms to write the thread's index. This documents the
+fact that we are writing to something that can be read concurrently,
+prevents the compiler from tearing the store, and enforces ordering
+against previous accesses.
===============