feat: 新增流程创建任务并发控制 --story=128208486#469
feat: 新增流程创建任务并发控制 --story=128208486#469guohelu wants to merge 2 commits intoTencentBlueKing:masterfrom
Conversation
# Reviewed, transaction id: 65429
There was a problem hiding this comment.
🔍 代码审查总结
此 PR 实现了流程并发控制功能。发现几个需要修复的关键问题:
🚨 严重问题:
count_running_tasks()存在 N+1 查询和竞态条件,可能导致并发计数不准确- Redis 操作缺少错误处理,lpop/rpush 失败可能导致任务丢失
extra_info直接赋值会覆盖现有数据(应使用 update)
⚡ 性能问题:
- 每次操作都调用
get_space_infos查询配置,建议添加缓存层
- callback.py:83 使用
>而非>=判断并发限制,与其他位置不一致
✨ 改进建议:
- 在
count_running_tasks中使用 select_for_update 或原子计数器 - 为 space config 添加本地缓存(TTL 60s)
- 统一使用
task.extra_info.update()而非直接赋值
建议在合并前修复竞态条件和数据丢失风险。
# Reviewed, transaction id: 65437
There was a problem hiding this comment.
🔍 代码审查更新
✅ 已改进的部分:
extra_info改用.update()避免覆盖现有数据TaskCallBacker优化了参数传递减少查询
🚨 仍需修复的关键问题:
- 竞态条件:
count_running_tasks()仍存在并发计数不准确风险 (utils.py:215) - 数据丢失风险: Redis 操作缺少异常处理 (utils.py:183, utils.py:195)
- 并发判断不一致: callback.py:83 使用
>而其他位置用>= - 性能问题: 多处重复调用
get_space_infos未缓存 (views.py:200, tasks.py:201, callback.py:76, v1_0_0.py:215)
建议在合并前修复竞态条件和错误处理。
| {"space_id": task_instance.space_id, "config_names": "concurrency_control"} | ||
| ) | ||
| space_configs = space_infos_result.get("data", {}).get("configs", {}) | ||
| concurrency_control = space_configs.get("concurrency_control", 0) |
There was a problem hiding this comment.
现在代码里会越来越多需要读取空间配置来做调度控制的逻辑,这里获取空间配置的逻辑最好抽象一下,并作为后台worker的一个全局单例来复用配置,空间配置不是一个经常变的数据,可以适当加一个缓存(1分钟),来防止任务并发量大后,频繁调接口访问interface的问题
| return task_instance | ||
|
|
||
|
|
||
| def count_running_tasks(task_instance): |
There was a problem hiding this comment.
这个实现还是太重了,并发量一大,每个任务都需要统计一遍当前task_instances的数据量,db扛不住的
| create_time = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S%z") | ||
| start_time = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S%z") | ||
| finish_time = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S%z") | ||
| is_wait = serializers.SerializerMethodField() |
| node_operation = TaskNodeOperation(task_instance, task_data.get("node_id")) | ||
| operation_method = getattr(node_operation, operation, None) | ||
|
|
||
| operation_method(operator=operation, **task_data.get("node_data", {})) |
There was a problem hiding this comment.
这里确认下是否是一个同步操作,如果是同步操作,这里的任务从队列取出并执行的动作,不应该在bamboo_engine_eri_post_set_state_handler这个用于设置状态的逻辑里(而且这里会导致队列里的worker始终跟上一个无关任务在同一个worker中,使任务不够分散),应该重新把任务通过celery来分发出去执行
There was a problem hiding this comment.
在分发任务出去执行时,需要关注下trace的传递是正确的,否则对于这种api或者页面触发变成后台触发的任务,就会丢掉trace相关的信息了
|
|
||
| template_id = TaskInstance.objects.get(instance_id=instance_id).template_id | ||
| redis_key = f"task_wait_{template_id}" | ||
| task_json = redis_cli.lpop(redis_key) |
There was a problem hiding this comment.
这里取出任务后,如果调度有问题,任务就丢了,需要加一下保证机制
|
|
||
|
|
||
| @redis_inst_check | ||
| def push_task_to_queue(redis_cli, task, operation, node_id=None, data=None): |
There was a problem hiding this comment.
需要加一个最大限制,以免打爆redis的队列,如果超出这个上限了,可以在触发执行任务的地方给用户返回错误,告知当前流程达到最大的执行等待上限
| from bkflow.contrib.api.collections.interface import InterfaceModuleClient | ||
| from bkflow.exceptions import ValidationError | ||
| from bkflow.pipeline_plugins.components.collections.base import BKFlowBaseService | ||
| from bkflow.task.utils import push_task_to_queue |
There was a problem hiding this comment.
整体方案还需要补充一些可观测性的指标,来发现当前系统中,某些流程的队列在不断增加,有达到上限的风险,这个可以通过with start_trace里注入流程当前队列长度来实现
Reviewed, transaction id: 65429