Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sharded_queue/api.lua
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ return {
kick = queue_action_wrapper('kick'),
peek = queue_action_wrapper('peek'),
drop = queue_action_wrapper('drop'),
truncate = queue_action_wrapper('truncate'),

cfg = setmetatable({}, {
__index = config,
Expand Down
5 changes: 5 additions & 0 deletions sharded_queue/drivers/fifo.lua
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ function method.peek(args)
return normalize_task(get_space(args):get { args.task_id })
end

function method.truncate(args)
update_stat(args.tube_name, "truncate")
return get_space(args):truncate()
end

return {
create = tube_create,
drop = tube_drop,
Expand Down
5 changes: 5 additions & 0 deletions sharded_queue/drivers/fifottl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,11 @@ function method.kick(args)
return args.count
end

function method.truncate(args)
update_stat(args.tube_name, "truncate")
return box.space[args.tube_name]:truncate()
end

return {
create = tube_create,
drop = tube_drop,
Expand Down
23 changes: 23 additions & 0 deletions sharded_queue/router/tube.lua
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,28 @@ function drop(self)
cartridge.config_patch_clusterwide({ tubes = tubes })
end

function truncate(self, options)
options = table.deepcopy(options or {})
options.tube_name = self.tube_name

options.extra = {
log_request = utils.normalize.log_request(options.log_request) or self.log_request,
}

local _, err, alias = vshard.router.map_callrw('tube_truncate', {
options
})
-- Re-raise storage errors.
if err then
if alias then
error("Error occurred on replicaset \"" .. alias .. "\": " .. err.message)
else
error("Error occurred: " .. err.message)
end
return
end
end

local methods = {
put = put,
take = take,
Expand All @@ -360,6 +382,7 @@ local methods = {
bury = bury,
kick = kick,
peek = peek,
truncate = truncate,
}

-- The Tarantool 3.0 does not support to update dinamically a configuration, so
Expand Down
3 changes: 3 additions & 0 deletions sharded_queue/stats/storage.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ local actions = {
touch = 8,
ack = 9,
release = 10,
truncate = 11,
}

function statistics.init()
Expand All @@ -32,6 +33,7 @@ function statistics.init()
{ 'touch', 'unsigned' },
{ 'ack', 'unsigned' },
{ 'release', 'unsigned' },
{ 'truncate', 'unsigned' },
})

space_stat:create_index('primary', {
Expand Down Expand Up @@ -65,6 +67,7 @@ function statistics.reset(tube_name)
touch = 0,
ack = 0,
release = 0,
truncate = 0,
})
box.space._queue_statistics:replace(default_stat)
end
Expand Down
1 change: 1 addition & 0 deletions sharded_queue/storage/methods.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ local methods = {
'release',
'bury',
'kick',
'truncate',
}

local function init(metrics, tubes)
Expand Down
4 changes: 4 additions & 0 deletions test/metrics_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ g.test_metrics_router = function()
touch = 0,
ack = 0,
release = 0,
truncate = 0,
})
assert_metric(metrics, "tnt_sharded_queue_router_statistics_tasks", "state", {
ready = 0,
Expand Down Expand Up @@ -177,6 +178,7 @@ g.test_metrics_router = function()
touch = 0,
ack = 0,
release = 0,
truncate = 0,
})
assert_metric(metrics, "tnt_sharded_queue_storage_statistics_tasks", "state", {
ready = 0,
Expand Down Expand Up @@ -268,6 +270,7 @@ g.test_metrics_storage = function()
'release',
'bury',
'kick',
'truncate',
}

-- Some of them will fail, some of them not - it does not metter. We just
Expand Down Expand Up @@ -301,6 +304,7 @@ g.test_metrics_storage = function()
touch = 0,
ack = 0,
release = 0,
truncate = 1,
})
assert_metric(metrics, "tnt_sharded_queue_storage_statistics_tasks", "state", {
ready = 0,
Expand Down
44 changes: 44 additions & 0 deletions test/simple_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,47 @@ function g.test_bury_kick()
t.assert_equals(cur_stat.calls.kick, bury_task_count)
t.assert_equals(cur_stat.tasks.ready, task_count)
end

function g.test_truncate()
local tube_name = 'truncate_test'
helper.create_tube(tube_name)

-- task data for putting
local task_count = 20
local tasks_data = {}

for i = 1, task_count do
table.insert(tasks_data, {
name = 'task_' .. i,
raw = '*'
})
end

local task_ids = {}

-- returned tasks
for _, data in pairs(tasks_data) do
local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { data })
local peek_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'peek'), {
task[utils.index.task_id]
})
t.assert_equals(peek_task[utils.index.status], utils.state.READY)
table.insert(task_ids, task[utils.index.task_id])
end

-- truncate the tube
g.queue_conn:call(utils.shape_cmd(tube_name, 'truncate'))

-- check that created tasks don't exist
for _, taskId in pairs(task_ids) do
local peek_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'peek'), {
taskId
})
t.assert_equals(peek_task, nil)
end

-- check stats
local stat = g.queue_conn:call('queue.statistics', { tube_name })
t.assert_equals(stat.calls.put, task_count)
t.assert_equals(stat.calls.truncate, 2)
end
Loading