Content of this lecture
int pipe(int pipefd[2]);
pipefd[0]
for reading, pipefd[1]
for
writingint mkfifo(const char *pathname, mode_t mode);
lseek
is impossible
You have already handled pipes without necessarily realizing it: in
bash
, the sequence of commands linked by pipes is
done via anonymous pipes created by the bash
process.
So when we run cmd1 | cmd2 | cmd3
, bash
creates 2 anonymous pipes and 3 processes, then redirects (thanks to the
dup2
system call, see Lecture #11) standard input and
output of processes to the different tubes.
int shm_open(const char *name, int oflag, mode_t mode);
name
is a key of the form /key
int ftruncate(int fd, off_t length);
void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
flags
must contain MAP_SHARED
We will see later (during lecture 11 on I/O) another use of
mmap
.
sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);
name
is a key of the form /key
int sem_init(sem_t *sem, int pshared, unsigned int value);
pshared != 0
, ca be used by several processes (using
a shared memory segment)int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
int sem_post(sem_t *sem);
pthread_mutex_t
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int pthread_mutex_init(ptread_mutex_t *m, const pthread_mutexattr_t *attr);
int pthread_mutex_lock(pthread_mutex_t *mutex));
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
It is possible to synchronize threads from several processes with a
pthread_mutex_t
if it is in a shared memory area. For this,
it is necessary to position the PTHREAD_PROCESS_SHARED
attribute of the mutex with the function
int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr, int pshared);
(&l);
pthread_mutex_lockwhile(!condition) {
(&c, &l);
pthread_cond_wait}
();
process_data(&l); pthread_mutex_unlock
(&l);
pthread_mutex_lock();
produce_data(&c);
pthread_cond_signal(&l); pthread_mutex_unlock
Here are the prototypes of the functions associated with the conditions:
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
int pthread_cond_destroy(pthread_cond_t *cond);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
The mutex ensures that between testing for the condition (
while (! condition)
) and wait
(pthread_cond_wait()
), no thread performs the
condition.
To synchronize multiple processes with a monitor, it is necessary to set the following attributes:
PTHREAD_MUTEX_SHARED
of the mutex (using
int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr, int pshared)
).PTHREAD_PROCESS_SHARED
of the condition
(using
int pthread_condattr_setpshared(pthread_condattr_t *attr, int pshared)
).int pthread_barrier_init(pthread_barrier_t *barrier, const pthread_barrierattr_t *restrict attr, unsigned count);
int pthread_barrier_wait(pthread_barrier_t *barrier);
count
threads reach
pthread_barrier_wait
count
threads
Once all the threads have reached the barrier, they are all unblocked
and pthread_barrier_wait
returns 0 except for one thread
which returns PTHREAD_BARRIER_SERIAL_THREAD
.
To synchronize threads from multiple processes with a barrier, it is
necessary to set the attribute PTHREAD_PROCESS_SHARED
with
int pthread_barrierattr_setpshared(pthread_barrierattr_t *attr, int pshared);
pthread_rwlock_t
int pthread_rwlock_rdlock(pthread_rwlock_t* lock)
int pthread_rwlock_wrlock(pthread_rwlock_t* lock)
int pthread_rwlock_unlock(pthread_rwlock_t* lock)
In the literature, these problems are usually solved by using semaphores. This is because these problems have been theorized in the 1960s and 1970s by Dijkstra based on semaphores. In addition, semaphores have the advantage of being able to be used for inter-process synchronizations or intra-process.
However, modern operating systems implement many synchronization primitives which are much more efficient than semaphores. In the next slides, we will therefore rely on these mechanisms rather than semaphores.
m
initializedmutex_lock(m)
at the start of the critical
sectionmutex_unlock(m)
at the end of the critical
sectionm
initialized
Prog1(m)
mutex_lock=read (account)
x= x + 10
x (account=x)
write (m) mutex_unlock
Prog2 (m)
mutex_lock=read (account)
x= x - 100
x (account=x)
write(m) mutex_unlock
In a multi-threaded process, we just need to use a mutex of type
pthread_mutex_t
.
To implement a mutual exclusion between several processes, several solutions exist
using a pthread_mutex_t
in a shared memory segment
between processes. For this, it is necessary to set the attribute
PTHREAD_MUTEX_SHARED
in the mutex (using
pthread_mutexattr_setpshared
);
using a semaphore initialized to 1. The entry in section critical
is protected by sem_wait
, and we call sem_post
when leaving the critical section.
N
, and a monitor
m
to protect the counter Prog Vehicule
...
mutex_lock(m);
while(cpt == 0){ cond_wait(m); }
cpt--;
mutex_unlock(m);
|...
mutex_lock(m);
cpt++;
cond_signal(m);
mutex_unlock(m);
N
blocks buffer
Produc
: produces info0
Produc
: produces info1
Conso
: consumes info0
Produc
: produces info2
available_spots
monitor initialized to
N
ready_info
monitor initialized to 0
Producer: Consumer:
repeat repeat
... ...
mutex_lock(available_spots); mutex_lock(ready_info);
while(available_spots<=0) while(ready_info<=0)
cond_wait(available_spots); cond_wait(ready_info);
reserve_slot(); extract(info)
mutex_unlock(available_spots); mutex_unlock(ready_info);
calcul(info) mutex_lock(available_spots);
free_slot();
mutex_lock(ready_info); cond_signal(available_spots)
push(info); mutex_unlock(available_spots);
cond_signal(ready_info);
mutex_unlock(ready_info); ...
... endRepeat
endRepeat
It is of course possible to implement a producer / consumer scheme between processes using conditions and mutexes. Another simpler solution is to use a pipe: since writing in a pipe being atomic, the deposit of a data boils down to writing into the pipe, and reading from the pipe extracts the data.
pthread_rwlock_t
int pthread_rwlock_rdlock(pthread_rwlock_t* lock)
to
protect read operationsint pthread_rwlock_wrlock(pthread_rwlock_t* lock)
to
protect write operationsint pthread_rwlock_unlock(pthread_rwlock_t* lock)
to
release the lock
It is possible to implement the reader / writer synchronization
pattern using a mutex instead of rwlock
: read and write
operations are protected by a mutex. However, this implementation does
not not allow multiple readers to work in parallel.
The implementation of the monitor-based reader / writer is more
complex. It mainly requires: * an integer readers
which
counts the number of threads reading * a boolean writing
which indicates that a thread is writing * a cond
condition
to notify changes to these variables * a mutex mutex
to
protect concurrent access
Here is an implementation of the reader / writer using a monitor:
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <pthread.h>
// This program simulates operations on a set of bank accounts
// Two kinds of operations are available:
// - read operation: compute the global balance (ie. the sum of all accounts)
// - write operation: transfer money from one account to another
//
// Here's an example of the program output:
//
// $ ./rw_threads_condition
// Balance: 0 (expected: 0)
// 3982358 operation, including:
// 3581969 read operations (89.945932 % )
// 400389 write operations (10.054068 % )
#define N 200
int n_loops = 1000000;
int accounts[N];
int nb_read = 0;
int nb_write = 0;
= PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_COND_INITIALIZER;
pthread_cond_t cond
int readers=0;
int writing=0;
/* read all the accounts */
int read_accounts() {
(&mutex);
pthread_mutex_lockwhile(writing)
(&cond, &mutex);
pthread_cond_wait++;
readers(&mutex);
pthread_mutex_unlock
++;
nb_readint sum = 0;
for(int i=0; i<N; i++) {
+= accounts[i];
sum }
(&mutex);
pthread_mutex_lock--;
readersif(!readers) {
(&cond);
pthread_cond_signal}
(&mutex);
pthread_mutex_unlockreturn sum;
}
/* transfer amount units from account src to account dest */
void transfer(int src, int dest, int amount) {
(&mutex);
pthread_mutex_lockwhile(writing || readers)
(&cond, &mutex);
pthread_cond_wait= 1;
writing (&mutex);
pthread_mutex_unlock
++;
nb_write[dest] += amount;
accounts[src] -= amount;
accounts
(&mutex);
pthread_mutex_lock=0;
writing(&cond);
pthread_cond_signal(&mutex);
pthread_mutex_unlock}
void* thread_function(void*arg) {
for(int i=0; i<n_loops; i++) {
/* randomly perform an operation
* threshold sets the proportion of read operation.
* here, 90% of all the operations are read operation
* and 10% are write operations
*/
int threshold = 90;
int x = rand()%100;
if(x < threshold) {
/* read */
int balance = read_accounts();
if(balance != 0) {
(stderr, "Error : balance = %d !\n", balance);
fprintf();
abort}
} else {
/* write */
int src = rand()%N;
int dest = rand()%N;
int amount = rand()%100;
(src, dest, amount);
transfer}
}
return NULL;
}
int main(int argc, char**argv) {
for(int i = 0; i<N; i++) {
[i] = 0;
accounts}
int nthreads=4;
[nthreads];
pthread_t tid
for(int i=0; i<nthreads; i++) {
(&tid[i], NULL, thread_function, NULL);
pthread_create}
for(int i=0; i<nthreads; i++) {
(tid[i], NULL);
pthread_join}
int balance = read_accounts();
("Balance: %d (expected: 0)\n", balance);
printf
int nb_op = nb_read+nb_write;
("%d operation, including:\n",nb_op);
printf("\t%d read operations (%f %% )\n", nb_read, 100.*nb_read/nb_op);
printf("\t%d write operations (%f %% )\n", nb_write, 100.*nb_write/nb_op);
printf
return EXIT_SUCCESS;
}