书接上回 -> 实现基于环形队列的生产消费模型
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
Ring Queue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <pthread.h>
#include <semaphore.h>
template <typename T>
class RingQueue
{
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
public:
RingQueue(int max_cap)
: _ringqueue(max_cap), _max_cap(max_cap), _c_step(0), _p_step(0)
{
sem_init(&_data_sem, 0, 0);
sem_init(&_space_sem, 0, max_cap);
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
void Push(const T &in) //生产者
{
//信号量 : 是一个计数器, 是资源的预定机制, 预定 : 在外部, 可以不判断资源时候满足, 就可以知道内部资源的情况
P(_space_sem);//信号量这里, 对资源进行使用, 申请, 为什么不判断一下条件是否满足???信号量本身就是判断条件
pthread_mutex_lock(&_p_mutex);
_ringqueue[_p_step] = in;
_p_step++;
_p_step %= _max_cap;
pthread_mutex_unlock(&_p_mutex);
V(_data_sem);
}
void Pop(T *out)
{
P(_data_sem);
pthread_mutex_lock(&_c_mutex);
*out = _ringqueue[_c_step];
_c_step %= _max_cap;
pthread_mutex_unlock(&_c_mutex);
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
std::vector<T> _ringqueue;
int _max_cap;
int _c_step;
int _p_step;
sem_t _data_sem;//消费者关心
sem_t _space_sem;//生产者关心
pthread_mutex_t _c_mutex;
pthread_mutex_t _p_mutex;
};
类成员变量 :
_ringqueue : 存储队列元素的动态数组, 使用 std::vector 来实现
_max_cap : 队列的最大容量
_c_step : 消费者指针, 指向下一个要被消费的元素的位置
_p_step : 生产者指针, 指向下一个要被生产的元素的位置
_data_sem : 计数信号量, 表示队列中可供消费的元素数量
_space_sem : 计数信号量, 表示队列中可供生产的空间数量
_c_mutex : 互斥锁, 用于保护消费者操作
_p_mutex : 互斥锁, 用于保护生产者操作
构造函数 :
RingQueue(int max_cap) : 初始化环形队列的最大容量, 并设置初始值, 初始化信号量和互斥锁
sem_init : 用于初始化信号量, _data_sem 初始化为0, 因为队列开始为空, _space_sem 初始化为 max_cap : 因为队列一开始是空的.
pthread_mutex_init : 初始化互斥锁
生产者方法 :
功能 : 向队列中添加一个元素
1. 调用 P(_space_sem), 等待有空余空间 (空间信号量减少)
2. 加锁 _p_mutex, 确保线程安全
3. 将输入元素存储到 _ringqueue 的 _p_step 位置
4. 更新 _p_step , 并使用取模操作确保它在容量范围内
5. 解锁 _p_mutex
6. 调用 V(_data_sem), 增加可用元素的计数
消费者方法 :
功能 : 从队列中取出一个元素.
1. 调用 P(_data_sem) , 等待有可用数据 (数据信号量减少)
2. 加锁 _c_mutex , 确保线程安全
3. 将 _ringqueue 中, _c_step 位置的元素赋给 out
4. 更新 _c_step, 并使用取模操作确保它在容量范围内
5. 解锁 _c_mutex
6. 调用 V(_space_sem), 增加可用空间的计数
析构函数 :
Task.hpp
#pragma once
#include <iostream>
#include <functional>
class Task
{
public:
Task()
{
}
Task(int x, int y) : _x(x), _y(y)
{
}
void Excute()
{
_result = _x + _y;
}
void operator()()
{
Excute();
}
std::string debug()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";
return msg;
}
std::string result()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);
return msg;
}
private:
int _x;
int _y;
int _result;
};
#include "RingQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
void *Consumer(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
Task t;
// 1. 消费
rq->Pop(&t);
//2. 处理数据
t();
std::cout << "Consumer->" << t.result() << std::endl;
}
}
void *Productor(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while(true)
{
sleep(1);
//1. 构造数据
int x = rand() % 10 + 1;//[1, 10]
usleep(x * 1000);
int y = rand() % 10 + 1;
Task t(x, y);
//2. 生产
rq->Push(t);
std::cout << "Productor ->" << t.debug() << std::endl;
}
}
int main()
{
srand(time(nullptr) ^ getpid());
RingQueue<Task> *rq = new RingQueue<Task>(5);
pthread_t c1, c2, p1, p2, p3;
pthread_create(&c1, nullptr, Consumer, rq);
pthread_create(&c2, nullptr, Consumer, rq);
pthread_create(&p1, nullptr, Productor, rq);
pthread_create(&p2, nullptr, Productor, rq);
pthread_create(&p3, nullptr, Productor, rq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
return 0;
}