[Linux] Producer-Consumer Model, Semaphores, and Deadlock

Table of contents

Deadlock

Deadlock scenarios

Analyzing deadlock with gdb

1. Debugging the executable program

2. Debugging the running process

Necessary conditions for deadlock

Deadlock Prevention

Producer-Consumer Model

The 1-2-3 rule

Application Scenarios and Features

Code:

Semaphore

Principle:

Interface:

Initialization interface:

Wait interface:

Release interface:

Destroy interface:

Notes:

1. For producers and consumers, in what order should the semaphore and the mutex be acquired?

2. A semaphore can guarantee both synchronization and mutual exclusion

Implementing the producer-consumer model with semaphores

Deadlock

Deadlock scenarios

  1. A thread acquires a mutex and then exits without releasing it

Therefore, release the mutex at every point where the thread can exit.

  2. Two threads each hold one lock and request the lock held by the other (see the sketch below)
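A minimal sketch of scenario 2 (thread and lock names are illustrative): each thread locks one mutex and then requests the one held by the other, so both block forever.

#include <pthread.h>
#include <unistd.h>

pthread_mutex_t lock1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t lock2 = PTHREAD_MUTEX_INITIALIZER;

void* thread_a(void* arg){
    pthread_mutex_lock(&lock1);    //A holds lock 1
    sleep(1);                      //give B time to take lock 2
    pthread_mutex_lock(&lock2);    //A requests lock 2, held by B -> blocks forever
    pthread_mutex_unlock(&lock2);
    pthread_mutex_unlock(&lock1);
    return NULL;
}
void* thread_b(void* arg){
    pthread_mutex_lock(&lock2);    //B holds lock 2
    sleep(1);                      //give A time to take lock 1
    pthread_mutex_lock(&lock1);    //B requests lock 1, held by A -> blocks forever
    pthread_mutex_unlock(&lock1);
    pthread_mutex_unlock(&lock2);
    return NULL;
}
int main(){
    pthread_t a, b;
    pthread_create(&a, NULL, thread_a, NULL);
    pthread_create(&b, NULL, thread_b, NULL);
    pthread_join(a, NULL);    //never returns: the two threads are deadlocked
    pthread_join(b, NULL);
    return 0;
}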

Analyzing deadlock with gdb

When a deadlock occurs, you can debug the program with gdb to work out the cause. The following debugs scenario 2 above.

1. Debugging the executable program

1. Set a breakpoint with b + line number

2. Use the thread apply all bt command to display the call stacks of all threads

3. Use p + mutex variable to view the contents of the mutex

2. Debugging the running process

gdb attach + process ID

This drops us into gdb, where we can use the commands above to view the thread call stacks and inspect the contents of the mutex.
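For example, supposing the deadlocked program is already running with an illustrative PID of 1234 and a mutex variable named lock1 (both names are placeholders), the session looks roughly like this; on glibc, the printed mutex's __data.__owner field records the ID of the thread that currently holds it:

gdb -p 1234                    # attach to the running, deadlocked process (gdb attach + PID works the same way)
(gdb) thread apply all bt      # show the call stack of every thread to see where each one is blocked
(gdb) p lock1                  # print the contents of a mutex variable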

Necessary conditions for deadlock

  1. No preemption

After a thread acquires a mutex, no other thread can release it; only the owning thread can.

  2. Circular wait

Thread A holds lock 1 and requests lock 2, while thread B holds lock 2 and requests lock 1.

  3. Mutual exclusion

A mutex can only be owned by one thread at a time.

  4. Hold and wait

A thread keeps the resources it already holds while requesting new ones ("eating what's in the bowl while eyeing what's in the pot"), which is exactly deadlock scenario 2 above.

Deadlock Prevention

1. Break the necessary conditions for deadlock

  • Break circular wait
  • Break hold and wait
  • The remaining two conditions, no preemption and mutual exclusion, are inherent properties of the lock and cannot be broken

2. Keep the locking order consistent

All threads acquire the locks in the same order: take the same lock first, then the other lock (see the sketch below).
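A minimal sketch of this rule (lock and thread names are illustrative): both threads take lock1 before lock2, so a circular wait cannot form.

#include <pthread.h>

pthread_mutex_t lock1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t lock2 = PTHREAD_MUTEX_INITIALIZER;

//Both threads follow the same order: lock1 first, then lock2.
void* thread_a(void* arg){
    pthread_mutex_lock(&lock1);
    pthread_mutex_lock(&lock2);
    /* ... use resource A and resource B ... */
    pthread_mutex_unlock(&lock2);
    pthread_mutex_unlock(&lock1);
    return NULL;
}
void* thread_b(void* arg){
    pthread_mutex_lock(&lock1);    //same order as thread_a, never lock2 first
    pthread_mutex_lock(&lock2);
    /* ... use resource A and resource B ... */
    pthread_mutex_unlock(&lock2);
    pthread_mutex_unlock(&lock1);
    return NULL;
}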

3. Avoid locks that are never released

Unlock at every place where the thread can exit.

4. Allocate resources in one step

Code may use multiple resources, each protected by its own lock: for example, resource A by lock 1 and resource B by lock 2. If several threads use both resources, circular wait can occur.

Instead, multiple resources can share the same lock, so that a thread acquires everything it needs in one step (see the sketch below).
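A small sketch of one-time allocation (resource and variable names are illustrative): resources A and B share a single lock, so a thread never holds one resource while waiting for the other.

#include <pthread.h>

//One lock covers both resource A and resource B.
pthread_mutex_t res_lock = PTHREAD_MUTEX_INITIALIZER;

void use_both_resources(void){
    pthread_mutex_lock(&res_lock);    //acquire everything needed in one step
    /* ... operate on resource A and resource B ... */
    pthread_mutex_unlock(&res_lock);
}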

Producer-Consumer Model

The 1-2-3 rule

1 thread-safe queue: a first-in, first-out data structure, guaranteed by mutual exclusion + synchronization

Threads with 2 roles: producer and consumer

3 relationships:

  • Mutual exclusion between producers
  • Mutual exclusion between consumers
  • Mutual exclusion + synchronization between producers and consumers

Application Scenarios and Features

Generally used in back-end programs, such as WeChat's back-end services:

  1. The thread that receives messages puts them into a queue
  2. Multiple threads act as consumers, reading data from the queue and processing it
  3. Those same threads act as producers, putting the processed results into another queue
  4. The sending thread takes data from that queue and sends it out

Advantages:

  1. Handles uneven busyness: the queue buffers data when producers and consumers work at different speeds
  2. Producers and consumers are highly decoupled
  3. Supports high concurrency

Code:

#include <stdio.h>
#include <queue>
#include <pthread.h>
#include <unistd.h>
using namespace std;

#define THREAD_COUNT 2

class RingQueue{
    public:
        RingQueue(){
            capacity = 10;
            pthread_mutex_init(&_que_lock, NULL);
            pthread_cond_init(&_cons_cond, NULL);
            pthread_cond_init(&_prod_cond, NULL);
        }
        ~RingQueue(){
            pthread_mutex_destroy(&_que_lock);
            pthread_cond_destroy(&_cons_cond);
            pthread_cond_destroy(&_prod_cond);
        }
        void push(int data){
            pthread_mutex_lock(&_que_lock);
            //Wait while the queue is full
            while(_que.size() == capacity){
                pthread_cond_wait(&_prod_cond, &_que_lock);
            }
            _que.push(data);
            printf("I am produce thread %p: I produce %d\n", (void*)pthread_self(), data);
            pthread_mutex_unlock(&_que_lock);
            //Notify a consumer thread to consume
            pthread_cond_signal(&_cons_cond);
        }
        //Provided to the consumer threads for consumption
        void pop(int* data){
            pthread_mutex_lock(&_que_lock);
            //Wait while the queue is empty
            while(_que.size() == 0){
                pthread_cond_wait(&_cons_cond, &_que_lock);
            }
            *data = _que.front();
            _que.pop();
            printf("I am consume thread %p: I consume %d\n", (void*)pthread_self(), *data);
            pthread_mutex_unlock(&_que_lock);
            //Notify a producer thread to produce
            pthread_cond_signal(&_prod_cond);
        }
    private:
        queue<int> _que;
        size_t capacity;
        //mutex
        pthread_mutex_t _que_lock;
        //synchronization
        pthread_cond_t _cons_cond;
        pthread_cond_t _prod_cond;
};

int g_data = 0;
//Note: g_lock protects g_data so that each producer's push + increment is atomic.
//It must not be defined inside the thread function, otherwise every producer would
//have its own mutex and g_data would not actually be protected.
pthread_mutex_t g_lock = PTHREAD_MUTEX_INITIALIZER;

void* pro_thread_start(void* arg){
    RingQueue* rq = (RingQueue*)arg;
    while(1){
        pthread_mutex_lock(&g_lock);
        rq->push(g_data);
        g_data++;
        //sleep(1);
        pthread_mutex_unlock(&g_lock);
    }
    return NULL;
}
void* con_thread_start(void* arg){
    RingQueue* rq = (RingQueue*)arg;
    while(1){
        int data;
        rq->pop(&data);
    }
    return NULL;
}
int main(){
    RingQueue* que = new RingQueue();
    pthread_t pro[THREAD_COUNT], con[THREAD_COUNT];
    for(int i = 0; i < THREAD_COUNT; i++){
        int ret = pthread_create(&pro[i], NULL, pro_thread_start, que);
        if(ret != 0){
            perror("pthread_create");
        }
        ret = pthread_create(&con[i], NULL, con_thread_start, que);
        if(ret != 0){
            perror("pthread_create");
        }
    }
    for(int i = 0; i < THREAD_COUNT; i++){
        pthread_join(pro[i], NULL);
        pthread_join(con[i], NULL);
    }
    delete que;
    return 0;
}

Result:

Semaphore

Principle:

A semaphore consists of a resource counter and a PCB waiting queue.

PCB waiting queue: the same as the PCB waiting queue used by condition variables for synchronization.

Resource counter:

When an execution flow acquires the semaphore:

Acquired successfully: the resource counter is decremented by 1.

Acquisition failed: the execution flow is placed in the PCB waiting queue.

When an execution flow releases the semaphore, the resource counter is incremented by 1.

When the producer semaphore is initialized, the value of the resource counter is generally the capacity of the thread-safe queue. When the consumer semaphore is initialized, the value of the resource counter is generally 0, because at the beginning there are no resources available in the thread-safe queue.
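As a conceptual sketch only (this is not how glibc actually implements sem_t, and for simplicity the counter here never goes below zero, whereas the description below lets it go negative), the principle can be pictured as a counter plus a waiting queue, built here from a mutex and a condition variable:

#include <pthread.h>

//Conceptual counting semaphore: a resource counter plus a waiting queue.
struct MySem{
    int count;                   //resource counter
    pthread_mutex_t lock;        //protects count
    pthread_cond_t wait_queue;   //stands in for the PCB waiting queue
};

void my_sem_wait(MySem* s){
    pthread_mutex_lock(&s->lock);
    while(s->count <= 0){        //no resource available: block in the waiting queue
        pthread_cond_wait(&s->wait_queue, &s->lock);
    }
    s->count--;                  //acquired successfully: counter - 1
    pthread_mutex_unlock(&s->lock);
}

void my_sem_post(MySem* s){
    pthread_mutex_lock(&s->lock);
    s->count++;                  //released: counter + 1
    pthread_mutex_unlock(&s->lock);
    pthread_cond_signal(&s->wait_queue);   //wake one waiting execution flow, if any
}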

Interface:

Initialization interface:
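int sem_init(sem_t* sem, int pshared, unsigned int value)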

sem: the semaphore; sem_t is the semaphore type

pshared: indicates how the semaphore is shared. 0 means it is shared between the threads of one process (e.g., a global variable); non-zero means it is shared between processes, which involves inter-process communication, so the semaphore must be located somewhere every process can access, such as shared memory.

value: the number of resources, used to initialize the semaphore's resource counter

Wait interface:
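int sem_wait(sem_t* sem)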

1. Decrement the resource counter by 1

2. Check whether the value of the resource counter is less than 0

Less than 0: block and wait; the execution flow is placed in the PCB waiting queue

Not less than 0: the call returns

Release interface:
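int sem_post(sem_t* sem)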

  1. Increment the resource counter by 1

  2. Check whether the value of the resource counter is less than or equal to 0

Yes: notify the PCB waiting queue (there are threads waiting in it)

No: no need to notify the PCB waiting queue, because no thread is waiting

Destroy interface:

int sem_destroy(sem_t* sem)

Semaphores are dynamically initialized and therefore need to be destroyed

Notes:

1. For producers and consumers, in what order should the semaphore and the mutex be acquired?

Scenario: a thread-safe queue with a capacity of 2; three producer threads A, B and C, with the producer semaphore's resource counter initialized to 2 (the queue capacity); and one consumer thread, with the consumer semaphore's resource counter initialized to 0.

Case 1: acquire the mutex first, then the semaphore

Suppose producer thread A acquires the mutex first and then the semaphore, decrementing the producer's resource counter by 1 so that its value becomes 1. Thread A then accesses the critical resource, the thread-safe queue; when it finishes, it releases the semaphore by adding 1 to the consumer semaphore's resource counter, which is now 1.

Suppose producer thread B then acquires the mutex and the semaphore. The producer resource counter becomes 0, and after the same sequence of operations the consumer resource counter becomes 2.

At this point there is no free space left in the thread-safe queue.

Suppose producer C now acquires the mutex and then tries to acquire the semaphore. The producer's resource counter is decremented by 1 and becomes less than 0, so C blocks and is placed in the producer's PCB waiting queue.

Thread C has entered the PCB waiting queue while still holding the mutex, waiting for another thread to wake it. But no other thread can acquire the mutex, so none of them can ever wake it; the whole program is stuck and cannot continue to execute.

So this order is wrong.

Case 2: acquire the semaphore first, then the mutex

Threads A, B and C compete for the semaphore; acquiring the semaphore and modifying its counter is atomic.

Assume A and B acquire the semaphore first, each decrementing the resource counter by 1, so the counter is now 0. Of the two, assume thread A acquires the mutex first; it accesses the critical resource, releases the semaphore, and increments the consumer resource counter by 1.

Suppose producer thread B gets the mutex next; after it finishes, the thread-safe queue is full.

Next, the consumer thread acquires the mutex. After it finishes accessing the critical section, it releases the semaphore, adding 1 to the producer's resource counter; since the value is still less than or equal to 0, it notifies the producer's PCB waiting queue, and the awakened producer thread can then acquire the mutex and continue.

Therefore, acquire the semaphore first, and then the lock that guarantees mutual exclusion (the lock can be either a mutex or a semaphore).

2. A semaphore can guarantee both synchronization and mutual exclusion

That a semaphore guarantees synchronization was already shown in the analysis of the previous question.

To guarantee mutual exclusion, simply set the initial value of the resource counter in the semaphore to 1.

Setting the resource counter to 1 means that only one thread can acquire the semaphore at a time; when other threads try, the counter drops below 0 and they are all placed in the semaphore's PCB waiting queue. Only after the thread that holds the semaphore finishes accessing the critical resource and releases it can another thread acquire the semaphore and access the critical resource (see the sketch below).
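A minimal sketch of this idea (names are illustrative): with the counter initialized to 1, at most one thread can be between sem_wait and sem_post at any moment, which is exactly mutual exclusion.

#include <semaphore.h>

sem_t lock_sem;               //binary semaphore used as a mutex

void access_critical_resource(void){
    sem_wait(&lock_sem);      //counter 1 -> 0; any other thread now blocks here
    /* ... access the critical resource ... */
    sem_post(&lock_sem);      //counter back to 1; wakes one waiting thread, if any
}

int main(){
    sem_init(&lock_sem, 0, 1);    //initial value 1 guarantees mutual exclusion
    /* ... create threads that call access_critical_resource() ... */
    sem_destroy(&lock_sem);
    return 0;
}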

Implementing the producer-consumer model with semaphores

Use a ring queue (simulated with an array) as the thread-safe queue:

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <vector>
#include <semaphore.h>
using namespace std;
/*
 * Define a thread-safe queue:
 * a ring queue (emulated with an array).
 * Thread safety:
 *   synchronization: semaphore
 *   mutual exclusion: semaphore
 */
#define CAPACITY 2
#define THREAD_COUNT 2

class RingQueue{
    public:
        RingQueue()
        :_vec(CAPACITY){
            _capacity = CAPACITY;
            sem_init(&_sem_lock, 0, 1);         //mutex semaphore, initial value 1
            sem_init(&_cons_sem, 0, 0);         //consumer semaphore: nothing to consume yet
            sem_init(&_prod_sem, 0, CAPACITY);  //producer semaphore: all slots free
            write_pos = 0;
            read_pos = 0;
        }
        ~RingQueue(){
            sem_destroy(&_sem_lock);
            sem_destroy(&_cons_sem);
            sem_destroy(&_prod_sem);
        }
        void push(int data){
            //Acquire the producer semaphore first
            sem_wait(&_prod_sem);
            //Then acquire the mutex semaphore
            sem_wait(&_sem_lock);
            printf("I am produce thread %p: I produce %d\n", (void*)pthread_self(), data);
            _vec[write_pos] = data;
            write_pos = (write_pos + 1) % _capacity;
            sem_post(&_sem_lock);
            //Notify consumers to consume
            sem_post(&_cons_sem);
        }
        void pop(){
            //Acquire the consumer semaphore first
            sem_wait(&_cons_sem);
            sem_wait(&_sem_lock);
            int data = _vec[read_pos];
            printf("I am consume thread %p: I consume %d\n", (void*)pthread_self(), data);
            read_pos = (read_pos + 1) % _capacity;
            sem_post(&_sem_lock);
            //Notify producers to produce
            sem_post(&_prod_sem);
        }
    private:
        vector<int> _vec;
        size_t _capacity;
        //Semaphore guaranteeing mutual exclusion
        sem_t _sem_lock;
        sem_t _cons_sem;    //consumer's semaphore
        sem_t _prod_sem;    //producer's semaphore
        int write_pos;
        int read_pos;
};

int g_data = 0;
sem_t g_lock;    //binary semaphore protecting g_data
void* prod_thread_start(void* arg){
    RingQueue* rq = (RingQueue*)arg;
    while(1){
        sem_wait(&g_lock);
        rq->push(g_data);
        g_data++;
        sem_post(&g_lock);
    }
    return NULL;
}
void* cons_thread_start(void* arg){
    RingQueue* rq = (RingQueue*)arg;
    while(1){
        rq->pop();
    }
    return NULL;
}
int main(){
    RingQueue* rq = new RingQueue();
    sem_init(&g_lock, 0, 1);
    pthread_t cons[THREAD_COUNT], prod[THREAD_COUNT];
    for(int i = 0; i < THREAD_COUNT; ++i){
        int ret = pthread_create(&prod[i], NULL, prod_thread_start, (void*)rq);
        if(ret != 0){
            perror("pthread_create");
        }
        ret = pthread_create(&cons[i], NULL, cons_thread_start, (void*)rq);
        if(ret != 0){
            perror("pthread_create");
        }
    }
    for(int i = 0; i < THREAD_COUNT; ++i){
        pthread_join(prod[i], NULL);
        pthread_join(cons[i], NULL);
    }
    sem_destroy(&g_lock);
    delete rq;
    return 0;
}
