If you want to study further synchronization primitives, and to understand memory models, the blog post We Make a std::shared_mutex 10 Times Faster <https://www.codeproject.com/Articles/1183423/We-Make-a-std-shared-mutex-10-Times-Faster> discusses in details atomic operations, instruction reordering, C++ memory model and various synchronization primitives.
By default, an instruction modifying a variable is non-atomic
example : x++
gives :
register = load(x)
register ++
x = store (register)
\(\rightarrow\) Problem if the variable is modified by a other thread simultaneously
volatile
?volatile
does not ensure atomicityHere is an example of a program that may suffer from overly aggressive optimization by the compiler:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#if USE_VOLATILE
volatile int a = 0;
#else
int a = 0;
#endif
void* thread1(void*arg) {
while(a == 0) ;
("Hello\n");
printfreturn NULL;
}
void* thread2(void*arg) {
= 1;
a return NULL;
}
int main(int argc, char**argv) {
, t2;
pthread_t t1(&t1, NULL, thread1, NULL);
pthread_create(&t2, NULL, thread2, NULL);
pthread_create
(t1, NULL);
pthread_join(t2, NULL);
pthread_joinreturn EXIT_SUCCESS;
}
When compiled with the optimization level -O0
(i.e. without any optimization), thread1
spins waiting, and
when thread2
modifies the variable a
, it
unlocks thread1
which displays Hello
:
$ gcc -o volatile volatile.c -Wall -pthread -O0
$ ./volatile
Hello
$
When compiled with the optimization level -O1
, the
generated code no longer works:
$ gcc -o volatile volatile.c -Wall -pthread -O1
$ ./volatile
[waits indefinitely]
^C
$
Analyzing the code generated by the compiler reveals the problem:
$ gcc -o volatile volatile.c -Wall -pthread -O2
$ gdb ./volatile
[...]
(gdb) disassemble thread1
Dump of assembler code for function thread1:
0x0000000000000756 <+0>: auipc a5,0x2
0x000000000000075a <+4>: lw a5,-1778(a5) # 0x2064 <a>
0x000000000000075e <+8>: bnez a5,0x762 <thread1+12>
0x0000000000000760 <+10>: j 0x760 <thread1+10>
0x0000000000000762 <+12>: add sp,sp,-16
0x0000000000000764 <+14>: auipc a0,0x0
0x0000000000000768 <+18>: add a0,a0,36 # 0x788
0x000000000000076c <+22>: sd ra,8(sp)
0x000000000000076e <+24>: jal 0x620 <puts@plt>
0x0000000000000772 <+28>: ld ra,8(sp)
0x0000000000000774 <+30>: li a0,0
0x0000000000000776 <+32>: add sp,sp,16
0x0000000000000778 <+34>: ret
nd of assembler dump.
$
We see here that at the address 0x760
, the program jumps
to the address 0x760
. So it jumps in place
indefinitely.
This is explained by the fact that the variable a
is not
volatile
. The compiler therefore thinks it can optimize
access to this variable: since the thread1
function only
accesses the variable in read-mode, the program loads the variable in a
register (here, the a5
register, see the instruction
0x75a
), then consults the registry. When
thread2
modifies the variable a
, the
modification is therefore not perceived by thread1
!
Declaring the variable as volatile
forces the compiler
to read the variable each time:
$ gcc -o volatile volatile.c -Wall -pthread -O2 -DUSE_VOLATILE=1
$ gdb volatile
(gdb) disassemble thread1
Dump of assembler code for function thread1:
0x0000000000000756 <+0>: add sp,sp,-16
0x0000000000000758 <+2>: sd ra,8(sp)
0x000000000000075a <+4>: auipc a4,0x2
0x000000000000075e <+8>: add a4,a4,-1782 # 0x2064 <a>
0x0000000000000762 <+12>: lw a5,0(a4)
0x0000000000000764 <+14>: beqz a5,0x762 <thread1+12>
0x0000000000000766 <+16>: auipc a0,0x0
0x000000000000076a <+20>: add a0,a0,34 # 0x788
0x000000000000076e <+24>: jal 0x620 <puts@plt>
0x0000000000000772 <+28>: ld ra,8(sp)
0x0000000000000774 <+30>: li a0,0
0x0000000000000776 <+32>: add sp,sp,16
0x0000000000000778 <+34>: ret
End of assembler dump.
Here, the loop while (a == 0)
is translated to the lines
from 0x762
to 0x764
. At each loop iteration,
the value of a
is loaded, then tested.
C11 provides a set of atomic operations, including
atomic_flag_test_and_set
atomic_compare_exchange_strong
atomic_fetch_add
atomic_thread_fence
_Bool atomic_flag_test_and_set(volatile atomic_flag* obj);
Performs atomically:
int atomic_flag_test_and_set(int* flag) {
int old = *flag;
*flag = 1;
return old;
}
Implementing a lock:
void lock(int* lock) {
while(atomic_flag_test_and_set(lock) == 1) ;
}
Here is an example of a program using a test_and_set
based lock:
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdatomic.h>
#define NITER 1000000
#define NTHREADS 4
volatile int lock=0;
int x = 0;
#ifdef NOT_THREAD_SAFE
/* thread-unsafe version */
void do_lock() {
while(lock) ;
= 1;
lock }
void do_unlock() {
= 0;
lock }
#else
/* thread-safe version */
void do_lock() {
while(atomic_flag_test_and_set(&lock)) ;
}
void do_unlock() {
= 0;
lock }
#endif /* NOT_THREAD_SAFE */
void* thread_function(void* arg) {
for(int i=0; i<NITER; i++) {
();
do_lock++;
x();
do_unlock}
return NULL;
}
int main(int argc, char**argv) {
[NTHREADS];
pthread_t tidsint ret;
for(int i = 0; i<NTHREADS; i++) {
= pthread_create(&tids[i], NULL, thread_function, NULL);
ret (ret == 0);
assert}
for(int i = 0; i<NTHREADS; i++) {
= pthread_join(tids[i], NULL);
ret (ret == 0);
assert}
("x = %d\n", x);
printfreturn EXIT_SUCCESS;
}
_Bool atomic_compare_exchange_strong(volatile A* obj, C* expected, C desired);
compares *obj
and *expected
if equal, copy desired
into *obj
and
return true
else, copy the value of *obj
into
*expected
and return false
Performs atomically:
bool CAS(int* obj, int* expected, int desired) {
if(*obj != *expected) {
*expected = *obj;
return false;
} else {
*obj = desired;
return true;
}
}
Here is an example of a program handling a lock-free list
thanks to compare_and_swap
:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdatomic.h>
#define NITER 1000000
#define NTHREADS 4
struct node {
int value;
struct node* next;
};
struct node *stack = NULL;
#ifdef NOT_THREAD_SAFE
/* thread-unsafe version */
void push(int value) {
struct node* n = malloc(sizeof(struct node));
->value = value;
n->next = stack;
n= n;
stack }
int pop() {
struct node* n = stack;
int value = 0;
if(n) {
= n->value;
value = n->next;
stack (n);
free}
return value;
}
#else
/* thread-safe version */
void push(int value) {
struct node* n = malloc(sizeof(struct node));
->value = value;
n->next = stack;
n
int done = 0;
do {
= atomic_compare_exchange_strong(&stack, &n->next, n);
done } while(!done);
}
int pop() {
int value = 0;
struct node* old_head = NULL;
struct node* new_head = NULL;
int done = 0;
do {
/* Warning: this function still suffers a race condition (search for
* "ABA problem" for more information).
* Fixing this would be too complicated for this simple example.
*/
= stack;
old_head if(old_head)
= old_head->next;
new_head = atomic_compare_exchange_strong(&stack, &old_head, new_head);
done } while (!done);
if(old_head) {
= old_head->value;
value (old_head);
free}
return value;
}
#endif /* NOT_THREAD_SAFE */
_Atomic int sum = 0;
void* thread_function(void* arg) {
for(int i=0; i<NITER; i++) {
(1);
push}
int value;
while((value=pop()) != 0) {
+=value;
sum}
return NULL;
}
int main(int argc, char**argv) {
[NTHREADS];
pthread_t tidsfor(int i = 0; i<NTHREADS; i++) {
(&tids[i], NULL, thread_function, NULL);
pthread_create}
for(int i = 0; i<NTHREADS; i++) {
(tids[i], NULL);
pthread_join}
("sum = %d\n", sum);
printfreturn EXIT_SUCCESS;
}
C atomic_fetch_add( volatile A* obj, M arg );
obj
with arg+obj
obj
Performs atomically:
int fetch_and_add(int* obj, int value) {
int old = *obj;
*obj = old+value;
return old;
}
Here is an example of a program using fetch_and_add
to
atomically increment a variable:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdatomic.h>
#define NITER 1000000
#define NTHREADS 4
volatile int x = 0;
#ifdef NOT_THREAD_SAFE
/* thread-unsafe version */
void inc(volatile int * obj) {
*obj = (*obj)+1;
}
#else
/* thread-safe version */
void inc(volatile int * obj) {
(obj, 1);
atomic_fetch_add}
#endif /* NOT_THREAD_SAFE */
void* thread_function(void* arg) {
for(int i=0; i<NITER; i++) {
(&x);
inc}
return NULL;
}
int main(int argc, char**argv) {
[NTHREADS];
pthread_t tidsfor(int i = 0; i<NTHREADS; i++) {
(&tids[i], NULL, thread_function, NULL);
pthread_create}
for(int i = 0; i<NTHREADS; i++) {
(tids[i], NULL);
pthread_join}
("x = %d\n", x);
printfreturn EXIT_SUCCESS;
}
C atomic_thread_fence( memory_order order );
Properties to consider when choosing a synchronization primitive
int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_unlock(pthread_spinlock_t *lock);
Benefits
test_and_set
)Disadvantages
It is also possible to implement a spinlock using an atomic operation:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdatomic.h>
#include <assert.h>
#define NITER 1000000
#define NTHREADS 4
struct lock {
/* if flag=0, the lock is available
* if flag=1, the lock is taken
*/
volatile int flag;
};
typedef struct lock lock_t;
void lock(lock_t *l) {
/* try to set flag to 1.
* if the flag is already 1, loop and try again
*/
while(atomic_flag_test_and_set(&l->flag)) ;
}
void unlock(lock_t *l) {
->flag = 0;
l}
void lock_init(lock_t *l) {
->flag = 0;
l}
;
lock_t lint x;
void* thread_function(void*arg){
for(int i=0; i<NITER; i++) {
(&l);
lock++;
x(&l);
unlock}
return NULL;
}
int main(int argc, char**argv) {
(&l);
lock_init
[NTHREADS];
pthread_t tidsint ret;
for(int i = 0; i<NTHREADS; i++) {
= pthread_create(&tids[i], NULL, thread_function, NULL);
ret (ret == 0);
assert}
for(int i = 0; i<NTHREADS; i++) {
= pthread_join(tids[i], NULL);
ret (ret == 0);
assert}
("x = %d\n", x);
printf("expected: %d\n", NTHREADS*NITER);
printfreturn EXIT_SUCCESS;
}
System call allowing to build synchronization mechanisms in userland
Allows waiting without monopolizing the CPU
A futex is made up of:
Available operations (among others)
WAIT(int *addr, int value)
while(*addr == value) { sleep(); }
WAKE(int *addr, int value, int num)
*addr = value
num
threads waiting on addr
mutex: an integer with two possible values: 1
(unlocked), or 0
(locked)
mutex_lock(m)
:
0
, call FUTEX_WAIT
mutex_unlock(m)
:
FUTEX_WAKE
to wake up a thread from the waiting
listHere is an example of a program implementing a mutex using futex:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdatomic.h>
#include <linux/futex.h>
#include <sys/time.h>
#include <sys/syscall.h>
#include <errno.h>
#include <assert.h>
#define NITER 1000000
#define NTHREADS 4
struct lock {
int flag;
};
typedef struct lock lock_t;
static int futex(int *uaddr, int futex_op, int val,
const struct timespec *timeout, int *uaddr2, int val3) {
return syscall(SYS_futex, uaddr, futex_op, val,
, uaddr2, val3);
timeout}
void lock(lock_t *l) {
while (1) {
/* Is the futex available? */
int expected = 1;
if (atomic_compare_exchange_strong(&l->flag, &expected, 0))
return; /* Yes */
/* Futex is not available; wait */
int s = futex(&l->flag, FUTEX_WAIT, 0, NULL, NULL, 0);
if (s == -1 && errno != EAGAIN) {
("futex_wait failed");
perror();
abort}
}
}
void unlock(lock_t *l) {
int expected = 0;
(&l->flag, &expected, 1);
atomic_compare_exchange_strongint s = futex(&l->flag, FUTEX_WAKE, 1, NULL, NULL, 0);
if (s == -1) {
("futex_wake failed");
perror();
abort}
}
void lock_init(lock_t *l) {
->flag = 1;
l}
;
lock_t lint x;
void* thread_function(void*arg){
for(int i=0; i<NITER; i++) {
// printf("%d\n", i);
(&l);
lock++;
x(&l);
unlock}
return NULL;
}
int main(int argc, char**argv) {
(&l);
lock_init
[NTHREADS];
pthread_t tidsint ret;
for(int i = 0; i<NTHREADS; i++) {
= pthread_create(&tids[i], NULL, thread_function, NULL);
ret (ret == 0);
assert}
for(int i = 0; i<NTHREADS; i++) {
= pthread_join(tids[i], NULL);
ret (ret == 0);
assert}
("x = %d\n", x);
printf("expected: %d\n", NTHREADS*NITER);
printfreturn EXIT_SUCCESS;
}
struct cond {
int cpt;
};
void cond_wait(cond_t *c, pthread_mutex_t *m) {
int value = atomic_load(&c->value);
(m);
pthread_mutex_unlock(&c->value, FUTEX_WAIT, value);
futex(m);
pthread_mutex_lock}
void cond_signal(cond_t *c) {
(&c->value, 1);
atomic_fetch_add(&c->value, FUTEX_WAKE, 0);
futex}
Here is an example of a program implementing a condition using futex:
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <pthread.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <stdatomic.h>
#include <assert.h>
#define N 10
int n_loops = 20;
struct cond {
int cpt;
};
typedef struct cond cond_t;
static int futex(int *uaddr, int futex_op, int val) {
return syscall(SYS_futex, uaddr, futex_op, val, NULL, uaddr, 0);
}
void cond_init(cond_t *c) {
->cpt = 0;
c}
void cond_wait(cond_t *c, pthread_mutex_t *m) {
int cpt = atomic_load(&c->cpt);
(m);
pthread_mutex_unlock(&c->cpt, FUTEX_WAIT, cpt);
futex(m);
pthread_mutex_lock}
void cond_signal(cond_t *c) {
(&c->cpt, 1);
atomic_fetch_add(&c->cpt, FUTEX_WAKE, 0);
futex}
struct monitor{
int value;
;
pthread_mutex_t mutex;
cond_t cond};
int infos[N];
int i_depot, i_extrait;
int nb_produits = 0;
struct monitor places_dispo;
struct monitor info_prete;
void* function_prod(void*arg) {
static _Atomic int nb_threads=0;
int my_rank = nb_threads++;
for(int i=0; i<n_loops; i++) {
int cur_indice;
int product_id;
(100);
usleep(&places_dispo.mutex);
pthread_mutex_lockwhile(places_dispo.value == 0) {
(&places_dispo.cond, &places_dispo.mutex);
cond_wait}
.value--;
places_dispo= i_depot++;
cur_indice = i_depot % N;
i_depot
= nb_produits++;
product_id (&places_dispo.mutex);
pthread_mutex_unlock
(500000);
usleep("P%d produit %d dans %d\n", my_rank, product_id, cur_indice);
printf
(&info_prete.mutex);
pthread_mutex_lock[cur_indice] = product_id;
infos.value ++;
info_prete(&info_prete.cond);
cond_signal(&info_prete.mutex);
pthread_mutex_unlock}
return NULL;
}
void* function_cons(void*arg) {
static _Atomic int nb_threads=0;
int my_rank = nb_threads++;
for(int i=0; i<n_loops; i++) {
int cur_indice;
int product_id;
(100);
usleep(&info_prete.mutex);
pthread_mutex_lockwhile(info_prete.value == 0) {
(&info_prete.cond, &info_prete.mutex);
cond_wait}
.value--;
info_prete= infos[i_extrait];
product_id = i_extrait;
cur_indice = (i_extrait+1) % N;
i_extrait (&info_prete.mutex);
pthread_mutex_unlock
(100000);
usleep("C%d consomme %d depuis %d\n", my_rank, product_id, cur_indice);
printf
(&places_dispo.mutex);
pthread_mutex_lock.value ++;
places_dispo(&places_dispo.cond);
cond_signal(&places_dispo.mutex);
pthread_mutex_unlock}
return NULL;
}
void init_monitor(struct monitor *m, int value) {
->value = value;
m(&m->mutex, NULL);
pthread_mutex_init(&m->cond);
cond_init}
int main(int argc, char**argv) {
(&places_dispo, N);
init_monitor(&info_prete, 0);
init_monitor= 0;
i_depot = 0;
i_extrait
int nthreads_prod=2;
int nthreads_cons=2;
[nthreads_prod];
pthread_t tid_prod[nthreads_cons];
pthread_t tid_consint ret;
for(int i=0; i<nthreads_prod; i++) {
= pthread_create(&tid_prod[i], NULL, function_prod, NULL);
ret (ret == 0);
assert}
for(int i=0; i<nthreads_cons; i++) {
= pthread_create(&tid_cons[i], NULL, function_cons, NULL);
ret (ret == 0);
assert}
for(int i=0; i<nthreads_prod; i++) {
= pthread_join(tid_prod[i], NULL);
ret (ret == 0);
assert}
for(int i=0; i<nthreads_cons; i++) {
= pthread_join(tid_cons[i], NULL);
ret (ret == 0);
assert}
return EXIT_SUCCESS;
}
pthread_mutex_timedlock
)Coarse grain locking
Fine grain locking
Each lock protects a small portion of the program
Advantage: possibility of using various resources in parallel
Disadvantages:
The notion of scalability is discussed in more detail in the module CSC5001 High Performance Systems.