From 415ebc5b70439bb994bac18a608630e446fce022 Mon Sep 17 00:00:00 2001 From: Alyona Chalkina Date: Mon, 26 May 2025 14:26:55 +0300 Subject: [PATCH] Add truncate method support The tube_truncate method was created to clean tubes, as well as the truncate method on the router to call it --- sharded_queue/api.lua | 1 + sharded_queue/drivers/fifo.lua | 5 ++++ sharded_queue/drivers/fifottl.lua | 5 ++++ sharded_queue/router/tube.lua | 23 ++++++++++++++++ sharded_queue/stats/storage.lua | 3 +++ sharded_queue/storage/methods.lua | 1 + test/metrics_test.lua | 4 +++ test/simple_test.lua | 44 +++++++++++++++++++++++++++++++ 8 files changed, 86 insertions(+) diff --git a/sharded_queue/api.lua b/sharded_queue/api.lua index e8a076e..dfd6c13 100644 --- a/sharded_queue/api.lua +++ b/sharded_queue/api.lua @@ -89,6 +89,7 @@ return { kick = queue_action_wrapper('kick'), peek = queue_action_wrapper('peek'), drop = queue_action_wrapper('drop'), + truncate = queue_action_wrapper('truncate'), cfg = setmetatable({}, { __index = config, diff --git a/sharded_queue/drivers/fifo.lua b/sharded_queue/drivers/fifo.lua index 16bf653..1afd706 100644 --- a/sharded_queue/drivers/fifo.lua +++ b/sharded_queue/drivers/fifo.lua @@ -181,6 +181,11 @@ function method.peek(args) return normalize_task(get_space(args):get { args.task_id }) end +function method.truncate(args) + update_stat(args.tube_name, "truncate") + return get_space(args):truncate() +end + return { create = tube_create, drop = tube_drop, diff --git a/sharded_queue/drivers/fifottl.lua b/sharded_queue/drivers/fifottl.lua index c7be755..c148d09 100644 --- a/sharded_queue/drivers/fifottl.lua +++ b/sharded_queue/drivers/fifottl.lua @@ -456,6 +456,11 @@ function method.kick(args) return args.count end +function method.truncate(args) + update_stat(args.tube_name, "truncate") + return box.space[args.tube_name]:truncate() +end + return { create = tube_create, drop = tube_drop, diff --git a/sharded_queue/router/tube.lua b/sharded_queue/router/tube.lua index 032be32..ff3e5e5 100644 --- a/sharded_queue/router/tube.lua +++ b/sharded_queue/router/tube.lua @@ -350,6 +350,28 @@ function drop(self) cartridge.config_patch_clusterwide({ tubes = tubes }) end +function truncate(self, options) + options = table.deepcopy(options or {}) + options.tube_name = self.tube_name + + options.extra = { + log_request = utils.normalize.log_request(options.log_request) or self.log_request, + } + + local _, err, alias = vshard.router.map_callrw('tube_truncate', { + options + }) + -- Re-raise storage errors. + if err then + if alias then + error("Error occurred on replicaset \"" .. alias .. "\": " .. err.message) + else + error("Error occurred: " .. err.message) + end + return + end +end + local methods = { put = put, take = take, @@ -360,6 +382,7 @@ local methods = { bury = bury, kick = kick, peek = peek, + truncate = truncate, } -- The Tarantool 3.0 does not support to update dinamically a configuration, so diff --git a/sharded_queue/stats/storage.lua b/sharded_queue/stats/storage.lua index 320e972..d920a99 100644 --- a/sharded_queue/stats/storage.lua +++ b/sharded_queue/stats/storage.lua @@ -15,6 +15,7 @@ local actions = { touch = 8, ack = 9, release = 10, + truncate = 11, } function statistics.init() @@ -32,6 +33,7 @@ function statistics.init() { 'touch', 'unsigned' }, { 'ack', 'unsigned' }, { 'release', 'unsigned' }, + { 'truncate', 'unsigned' }, }) space_stat:create_index('primary', { @@ -65,6 +67,7 @@ function statistics.reset(tube_name) touch = 0, ack = 0, release = 0, + truncate = 0, }) box.space._queue_statistics:replace(default_stat) end diff --git a/sharded_queue/storage/methods.lua b/sharded_queue/storage/methods.lua index 01a622d..2574906 100644 --- a/sharded_queue/storage/methods.lua +++ b/sharded_queue/storage/methods.lua @@ -14,6 +14,7 @@ local methods = { 'release', 'bury', 'kick', + 'truncate', } local function init(metrics, tubes) diff --git a/test/metrics_test.lua b/test/metrics_test.lua index 183cba7..8a868f5 100644 --- a/test/metrics_test.lua +++ b/test/metrics_test.lua @@ -150,6 +150,7 @@ g.test_metrics_router = function() touch = 0, ack = 0, release = 0, + truncate = 0, }) assert_metric(metrics, "tnt_sharded_queue_router_statistics_tasks", "state", { ready = 0, @@ -177,6 +178,7 @@ g.test_metrics_router = function() touch = 0, ack = 0, release = 0, + truncate = 0, }) assert_metric(metrics, "tnt_sharded_queue_storage_statistics_tasks", "state", { ready = 0, @@ -268,6 +270,7 @@ g.test_metrics_storage = function() 'release', 'bury', 'kick', + 'truncate', } -- Some of them will fail, some of them not - it does not metter. We just @@ -301,6 +304,7 @@ g.test_metrics_storage = function() touch = 0, ack = 0, release = 0, + truncate = 1, }) assert_metric(metrics, "tnt_sharded_queue_storage_statistics_tasks", "state", { ready = 0, diff --git a/test/simple_test.lua b/test/simple_test.lua index a7413c3..5df2269 100644 --- a/test/simple_test.lua +++ b/test/simple_test.lua @@ -250,3 +250,47 @@ function g.test_bury_kick() t.assert_equals(cur_stat.calls.kick, bury_task_count) t.assert_equals(cur_stat.tasks.ready, task_count) end + +function g.test_truncate() + local tube_name = 'truncate_test' + helper.create_tube(tube_name) + + -- task data for putting + local task_count = 20 + local tasks_data = {} + + for i = 1, task_count do + table.insert(tasks_data, { + name = 'task_' .. i, + raw = '*' + }) + end + + local task_ids = {} + + -- returned tasks + for _, data in pairs(tasks_data) do + local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { data }) + local peek_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'peek'), { + task[utils.index.task_id] + }) + t.assert_equals(peek_task[utils.index.status], utils.state.READY) + table.insert(task_ids, task[utils.index.task_id]) + end + + -- truncate the tube + g.queue_conn:call(utils.shape_cmd(tube_name, 'truncate')) + + -- check that created tasks don't exist + for _, taskId in pairs(task_ids) do + local peek_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'peek'), { + taskId + }) + t.assert_equals(peek_task, nil) + end + + -- check stats + local stat = g.queue_conn:call('queue.statistics', { tube_name }) + t.assert_equals(stat.calls.put, task_count) + t.assert_equals(stat.calls.truncate, 2) +end \ No newline at end of file