CREATE TABLE `jobs` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`queue` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '不同渠道队列执行的优先级,可以是high,default,low等。',
`payload` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '队列进程所需要的数据,序列化后的json字符串,包含了框架所需要的序列化数据和业务数据。',
`attempts` tinyint unsigned NOT NULL COMMENT '记录任务已经被尝试执行的次数。每次任务失败后,attempts的值会递增。当attempts达到设定的最大重试次数时,任务将被标记为失败。',
`reserved_at` int unsigned DEFAULT NULL COMMENT '表示任务被锁定(reserved)的时间戳。当一个worker开始处理任务时,任务会被锁定,以防止其它 worker同时处理相同的任务。reserved_at存储的是记录被锁定时的时间戳。',
`available_at` int unsigned NOT NULL COMMENT '记录任务应该变为可执行状态的时间戳。将任务推送到队列时,可以选择延迟任务的执行时间。',
`created_at` int unsigned NOT NULL COMMENT '队列被创建的时间戳。',
PRIMARY KEY (`id`) USING BTREE,
KEY `jobs_queue_index` (`queue`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ROW_FORMAT=DYNAMIC;
INSERT INTO `jobs` (`id`, `queue`, `payload`, `attempts`, `reserved_at`, `available_at`, `created_at`) VALUES (1, 'default', '这里是要被处理的任务:xxxxx', 1, NULL, 1735097169, 1735097169);
INSERT INTO `jobs` (`id`, `queue`, `payload`, `attempts`, `reserved_at`, `available_at`, `created_at`) VALUES (2, 'default', '这里是要被处理的任务:xxxxx', 1, NULL, 1735097179, 1735097179);
INSERT INTO `jobs` (`id`, `queue`, `payload`, `attempts`, `reserved_at`, `available_at`, `created_at`) VALUES (3, 'default', '这里是要被处理的任务:xxxxx', 1, NULL, 1735097189, 1735097189);
select * from `jobs` where `queue` = 'high' and ((`reserved_at` is null and `available_at`
证明跳过被锁定的行
步骤 | 会话1 | 会话2 | 说明 |
---|---|---|---|
1 | start transaction; | start transaction; | 双方开启事务 |
2 | select * from jobs where queue = 'default' and ((reserved_at is null and available_at reserved_at id asc limit 1 FOR UPDATE SKIP LOCKED |
/ | 会话1返回id为1的数据 |
3 | / | select * from jobs where queue = 'default' and ((reserved_at is null and available_at reserved_at id asc limit 1 FOR UPDATE SKIP LOCKED |
会话2返回id为2的数据,直接跳过加锁的1 |
4 | update jobs set reserved_at = UNIX_TIMESTAMP() where id = 1; | update jobs set reserved_at = UNIX_TIMESTAMP() where id = 2; | 将找到的任务标记为队列正在处理 |
5 | commit; | commit; | 提交事务,结束流程 |
证明锁定相同范围的数据会报错。
步骤 | 会话1 | 会话2 | 说明 |
---|---|---|---|
1 | start transaction; | start transaction; | 双方开启事务 |
2 | select * from jobs where id = 3 FOR UPDATE nowait; |
/ | 可正常加锁 |
3 | / | select * from jobs where id = 3 FOR UPDATE nowait; |
报错:3572 - Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set. |
4 | commit; | commit; | 提交事务,结束流程 |
参与评论
手机查看
返回顶部