在多线程编程中,各个线程之间往往需要协作,而协作可以采用任务队列模式:把一个线程的输出作为另一个线程的输入,并通过任务队列将上下游线程的输入与输出衔接起来。
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
// Shared state of the four-stage pipeline ("task queue" pattern): each stage
// pops from the queue in front of it, transforms the value, and pushes the
// result to the queue behind it.
std::queue<int> nSourceQueue;           // Raw input data. Must be filled before the stage threads start; afterwards only FirstThread touches it, so it needs no lock.
std::queue<int> nFirstStepResultQueue;  // Results of FirstThread: written by FirstThread, read by SecondThread.
std::queue<int> nSecondStepResultQueue; // Results of SecondThread: written by SecondThread, read by ThirdThread.
std::queue<int> nThirdStepResultQueue;  // Results of ThirdThread: written by ThirdThread, read by FourthThread.
std::queue<int> nFourthStepResultQueue; // Results of FourthThread: written by FourthThread (final output).
std::mutex firstMutex;  // Guards nFirstStepResultQueue and bFirstStepDone.
std::mutex secondMutex; // Guards nSecondStepResultQueue and bSecondStepDone.
std::mutex thirdMutex;  // Guards nThirdStepResultQueue and bThirdStepDone.
std::mutex fourthMutex; // Guards nFourthStepResultQueue.

namespace
{
// Signalled whenever the matching queue gains an item or its producer finishes.
std::condition_variable firstCond;
std::condition_variable secondCond;
std::condition_variable thirdCond;

// "The producer of this queue will push nothing more." Each flag is set by the
// producer and cleared again by the (single) consumer when it shuts down, so
// the pipeline can be run more than once.
bool bFirstStepDone = false;
bool bSecondStepDone = false;
bool bThirdStepDone = false;

// Appends nValue to resultQueue under queueMutex and wakes the waiting consumer.
void PushResult(std::queue<int>& resultQueue, std::mutex& queueMutex,
                std::condition_variable& queueCond, int nValue)
{
    {
        std::lock_guard<std::mutex> lock(queueMutex);
        resultQueue.push(nValue);
    }
    // Notify after releasing the lock so the consumer can take it immediately.
    queueCond.notify_one();
}

// Records under queueMutex that the producer has finished and wakes the consumer.
void MarkStepDone(bool& bDone, std::mutex& queueMutex, std::condition_variable& queueCond)
{
    {
        std::lock_guard<std::mutex> lock(queueMutex);
        bDone = true;
    }
    queueCond.notify_all();
}

// Blocks until inputQueue has an item or the upstream stage has finished.
//
// On success pops the front item into nValue and returns true. Returns false
// once the queue is drained AND the upstream stage is done; in that case the
// done flag is cleared so a later run starts from a clean state.
// Each time the caller has to go to sleep, one "<stage> Thread Idle Run <n>"
// line is printed and nIdleCount is incremented.
bool WaitAndPop(std::queue<int>& inputQueue, std::mutex& queueMutex,
                std::condition_variable& queueCond, bool& bUpstreamDone,
                const char* szStageName, int& nIdleCount, int& nValue)
{
    std::unique_lock<std::mutex> lock(queueMutex);
    // Loop (not "if") so that spurious wake-ups re-check the condition.
    while (inputQueue.empty() && !bUpstreamDone)
    {
        std::cout << szStageName << " Thread Idle Run " << nIdleCount << std::endl;
        nIdleCount++;
        queueCond.wait(lock);
    }
    if (inputQueue.empty())
    {
        bUpstreamDone = false;
        return false;
    }
    nValue = inputQueue.front();
    inputQueue.pop();
    return true;
}
} // namespace

// Stage 1: drains nSourceQueue, adds 1 to every value and pushes the result to
// nFirstStepResultQueue. When the source is exhausted it marks the first step
// as done so SecondThread knows when to stop.
void FirstThread()
{
    std::cout << "First Thread Start " << std::endl;
    while (!nSourceQueue.empty())
    {
        const int nTop = nSourceQueue.front() + 1;
        nSourceQueue.pop();
        std::cout << "First Thread: " << nTop << std::endl;
        PushResult(nFirstStepResultQueue, firstMutex, firstCond, nTop);
    }
    MarkStepDone(bFirstStepDone, firstMutex, firstCond);
    std::cout << "First Thread Stop" << std::endl;
}

// Stage 2: takes values from nFirstStepResultQueue, adds 2 and pushes them to
// nSecondStepResultQueue. Blocks while its input is empty and returns only
// after FirstThread has finished and the input queue is drained.
void SecondThread()
{
    std::cout << "SecondThread Start " << std::endl;
    int nTop = 0;
    int nIndex = 0;
    while (WaitAndPop(nFirstStepResultQueue, firstMutex, firstCond, bFirstStepDone, "Second", nIndex, nTop))
    {
        nTop = nTop + 2;
        std::cout << "Second Thread: " << nTop << std::endl;
        PushResult(nSecondStepResultQueue, secondMutex, secondCond, nTop);
    }
    MarkStepDone(bSecondStepDone, secondMutex, secondCond);
    std::cout << "Second Thread Stop" << std::endl;
}

// Stage 3: takes values from nSecondStepResultQueue, adds 3 and pushes them to
// nThirdStepResultQueue. Blocks while its input is empty and returns only
// after SecondThread has finished and the input queue is drained.
void ThirdThread()
{
    std::cout << "Third Thread Start " << std::endl;
    int nTop = 0;
    int nIndex = 0;
    while (WaitAndPop(nSecondStepResultQueue, secondMutex, secondCond, bSecondStepDone, "Third", nIndex, nTop))
    {
        nTop = nTop + 3;
        std::cout << "Third Thread: " << nTop << std::endl;
        PushResult(nThirdStepResultQueue, thirdMutex, thirdCond, nTop);
    }
    MarkStepDone(bThirdStepDone, thirdMutex, thirdCond);
    std::cout << "Third Thread Stop" << std::endl;
}

// Stage 4: takes values from nThirdStepResultQueue, adds 4 and stores them in
// nFourthStepResultQueue. Blocks while its input is empty and returns only
// after ThirdThread has finished and the input queue is drained.
void FourthThread()
{
    std::cout << "Fourth Thread Start " << std::endl;
    int nTop = 0;
    int nIndex = 0;
    while (WaitAndPop(nThirdStepResultQueue, thirdMutex, thirdCond, bThirdStepDone, "Fourth", nIndex, nTop))
    {
        nTop = nTop + 4;
        std::cout << "Fourth Thread: " << nTop << std::endl;
        std::lock_guard<std::mutex> lock(fourthMutex);
        nFourthStepResultQueue.push(nTop);
    }
    std::cout << "Fourth Thread Stop" << std::endl;
}
// Program entry point: seeds nSourceQueue with the values 0..99, runs the four
// pipeline stage functions on their own threads, waits for all of them to
// finish and then reports that the main thread is quitting.
int main()
{
    // Fill the source queue before any stage thread exists.
    int nValue = 0;
    while (nValue < 100)
    {
        nSourceQueue.push(nValue);
        ++nValue;
    }

    // Launch the stages in pipeline order.
    void (*const stageEntries[4])() = { FirstThread, SecondThread, ThirdThread, FourthThread };
    std::thread threads[4];
    for (int nStage = 0; nStage < 4; ++nStage)
    {
        threads[nStage] = std::thread(stageEntries[nStage]);
    }

    // Wait for every stage to finish, in the same order they were started.
    for (int nStage = 0; nStage < 4; ++nStage)
    {
        threads[nStage].join();
    }

    std::cout << "Main Thread Quit" << std::endl;
    return 0;
}