这是一个基于 生产者-消费者模式 的线程安全 Token 管理系统,使用 C++11 标准库实现。系统模拟了多个生产者线程定期生成 Token,多个消费者线程并发消费 Token 的场景,展示了现代 C++ 并发编程的核心概念和实践。
本项目作为 C++ 并发编程面试题,主要考察以下能力:
- 多线程编程基础:线程创建、同步、通信
- 线程安全设计:互斥锁、条件变量的使用
- 现代 C++ 特性:智能指针、lambda 表达式、原子操作
- 资源管理:RAII 原则、异常安全
- 问题解决能力:线程阻塞、死锁预防、优雅停止
┌─────────────────┐
│ TokenManager │ ← 核心管理器(线程安全的 Token 存储)
└────────┬────────┘
│
┌────┴────┐
│ │
┌───▼───┐ ┌──▼──────┐
│Producer│ │Customer │ ← 多个生产者和消费者线程
└────────┘ └─────────┘
-
TokenManager (
token_manager.h)- 线程安全的 Token 存储和管理
- 使用
std::mutex保护共享数据 - 使用
std::condition_variable实现线程间通信 - 支持阻塞和非阻塞的消费操作
- 关键特性:可中断的消费操作(避免永久阻塞)
-
TokenProducer (
token_producer.h)- 独立线程运行的生产者
- 每 500ms 生产一个 Token
- 支持优雅停止
-
TokenCustomer (
token_customer.h)- 独立线程运行的消费者
- 持续尝试消费指定数量的 Token
- 支持回调函数通知消费成功
- 支持优雅停止(可中断等待)
互斥锁 (Mutex)
std::mutex mtx_; // 保护共享数据 current_tokens_
std::lock_guard<std::mutex> lock(mtx_); // RAII 自动加锁解锁条件变量 (Condition Variable)
std::condition_variable cond_;
cond_.wait(lock, predicate); // 阻塞等待直到条件满足
cond_.wait_for(lock, timeout, predicate); // 可中断的等待std::atomic<bool> running_{false}; // 线程安全的标志位
running_.load(); // 原子读取
running_ = false; // 原子写入std::shared_ptr<TokenManager> token_manager_; // 共享所有权
std::unique_ptr<TokenProducer> producer; // 独占所有权如果不支持中断会发生什么?
假设消费者线程正在等待 token,但此时程序需要关闭:
// ❌ 问题代码:无法中断的等待
void consumer_thread() {
while (running_) {
// 如果 token 不足,这里会永久阻塞
token_manager_->ConsumeTokens(3); // 无法响应 stop() 信号!
}
}
// main 函数中
consumer->stop(); // 设置 running_ = false
consumer->join(); // 等待线程结束... ❌ 永远等不到!线程在等待中!问题场景:
-
程序无法正常退出
- 主程序调用
stop()后,消费者线程仍在wait()中阻塞 join()会一直等待,程序无法退出- 用户按 Ctrl+C 也无法立即响应
- 主程序调用
-
资源无法释放
- 线程对象无法销毁,资源泄漏
- 如果消费者对象在析构函数中调用
stop(),析构函数会永久阻塞
-
用户体验差
- 服务器程序需要重启时,无法优雅关闭
- 应用程序退出需要等待很长时间(甚至永远)
-
实际生产环境问题
- 服务更新时无法平滑重启
- 系统关闭时可能被强制 kill,导致数据丢失
解决方案:使用 wait_for() 定期检查停止标志
bool ConsumeTokensWithStopCheck(size_t n, std::atomic<bool>* stop_flag) {
std::unique_lock<std::mutex> lock(mtx_);
while (current_tokens_ < n) {
// ✅ 定期检查停止标志
if (stop_flag && stop_flag->load()) {
return false; // 立即响应停止信号
}
// ✅ 使用 wait_for 而不是 wait,每 100ms 检查一次
cond_.wait_for(lock, std::chrono::milliseconds(100), [this, n, stop_flag] () {
return current_tokens_ >= n || (stop_flag && stop_flag->load());
});
// ✅ 再次检查,确保及时响应
if (stop_flag && stop_flag->load()) {
return false;
}
}
current_tokens_ -= n;
return true;
}中断机制的优势:
- ✅ 优雅退出:程序可以快速响应停止信号
- ✅ 资源管理:确保线程能够正确退出,资源得到释放
- ✅ 用户体验:程序关闭时无需长时间等待
- ✅ 生产可用:满足实际生产环境的需求
性能权衡:
- 使用
wait_for(100ms)而不是wait()会在每 100ms 检查一次停止标志 - 这带来微小的性能开销,但换来了可中断性
- 对于大多数应用场景,这个开销是可以接受的
~TokenProducer() { stop(); } // 析构函数自动清理
~TokenCustomer() { stop(); } // 确保线程正确退出- C++11 或更高版本的编译器
- 支持多线程的 C++ 标准库
Linux/macOS:
g++ -std=c++11 -pthread main.cpp -o token_systemWindows (MinGW):
g++ -std=c++11 main.cpp -o token_system.exeWindows (MSVC):
cl /EHsc /std:c++11 main.cpp./token_system # Linux/macOS
token_system.exe # Windowstoken manager show case
initialize the manager, the max_tokens is :10
active the token manager
initial the consumer, per 3:
1
2
3
4
5
Waiting for consumers and producers to run...
consumer 1 success consume: 3 tokens
consumer 2 success consume: 3 tokens
...
total time: 10000
consumer[1]
consumer[2]
...
last tokens: X
case finish
考察点:
- 如何保护共享数据?
- 为什么使用
mutable修饰互斥锁? - 死锁如何预防?
答案要点:
- 使用互斥锁保护所有对
current_tokens_的访问 mutable允许在const方法中使用互斥锁- 使用
lock_guard和unique_lock的 RAII 特性避免死锁
考察点:
- 为什么使用条件变量而不是轮询?
wait()和wait_for()的区别?- 为什么需要
notify_all()?
答案要点:
- 条件变量避免 CPU 空转,提高效率
wait()无限等待,wait_for()可设置超时notify_all()唤醒所有等待的线程(多个消费者场景)
考察点:
- 如何优雅地停止线程?
- 如何避免线程永久阻塞?
- 为什么需要
joinable()检查?
答案要点:
- 使用原子标志位
running_控制线程循环 - 使用可中断的等待机制(
ConsumeTokensWithStopCheck) joinable()检查避免重复 join 导致的未定义行为
考察点:
- 智能指针的选择(
shared_ptrvsunique_ptr)? - Lambda 表达式的使用场景?
- 原子操作的作用?
答案要点:
shared_ptr用于多线程共享资源,unique_ptr用于独占所有权- Lambda 用于线程函数,捕获
this访问成员变量 - 原子操作保证标志位的读写是线程安全的,无需加锁
考察点:
- RAII 原则的应用?
- 析构函数中为什么需要
stop()? - 如何确保异常安全?
答案要点:
- 使用智能指针和
lock_guard实现 RAII - 析构函数中停止线程,确保资源正确释放
- 使用 RAII 确保即使发生异常也能正确清理
- 使用读写锁(
shared_mutex)优化读多写少的场景 - 实现无锁队列(lock-free queue)
- 使用线程池管理生产者和消费者
- 支持不同的消费优先级
- 支持 Token 的过期机制
- 添加统计信息(生产/消费速率、等待时间等)
- 支持配置文件动态调整参数
- 添加异常处理机制
- 实现超时机制
- 添加日志记录
- 单元测试(gtest)
- 并发测试(压力测试、竞态条件测试)
- 性能基准测试
token_/
├── main.cpp # 主程序入口
├── token_manager.h # Token管理器(核心类)
├── token_producer.h # 生产者类
├── token_customer.h # 消费者类
└── README.md # 项目说明文档
- 代码审查能力:能够指出代码中的潜在问题(如线程阻塞、资源泄漏)
- 设计思路:能够解释为什么选择某种设计(如为什么用
shared_ptr) - 问题解决:能够提出改进方案(如可中断等待的实现)
- 性能意识:能够分析性能瓶颈和优化方向
- 调试能力:能够分析多线程问题的调试方法
A: 如果使用 ConsumeTokens() 而不是 ConsumeTokensWithStopCheck(),线程在等待时会无法响应停止信号。解决方案是使用可中断的等待机制。
A: 不中断会导致以下严重问题:
- 程序无法退出:
stop()后线程仍在wait()中,join()会永久等待 - 资源泄漏:线程无法正常销毁,资源无法释放
- 用户体验差:程序关闭需要很长时间或永远无法关闭
- 生产环境问题:服务无法平滑重启,系统关闭时可能被强制 kill
中断机制允许线程在等待时定期检查停止标志,确保能够及时响应停止信号,实现优雅退出。
A: 对已经 join 过的线程再次 join 会导致未定义行为。joinable() 检查确保线程是可 join 的。
A: 多个生产者和消费者都需要访问同一个 TokenManager,需要共享所有权,因此使用 shared_ptr。
作者: robll1
日期: 2025
版本: 1.0