Skip to content

Commit 85a7f0c

Browse files
committed
chore: fix data corruption bug
The bug happenned when an command with less than required arguments was processed during the squashing. The test: 1. Reproduces the original bug 2. Reproduces the fact that the ERROR in squashing was not handled correctly The fix: removes the ERROR state, adds a verification step that ensures we fail before accessing non-existing index. Fixes #6165 Signed-off-by: Roman Gershman <[email protected]>
1 parent 627805f commit 85a7f0c

File tree

4 files changed

+30
-15
lines changed

4 files changed

+30
-15
lines changed

src/server/main_service.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,6 @@ ABSL_FLAG(uint32_t, memcached_port, 0, "Memcached port");
8686

8787
ABSL_FLAG(uint32_t, num_shards, 0, "Number of database shards, 0 - to choose automatically");
8888

89-
ABSL_RETIRED_FLAG(uint32_t, multi_exec_mode, 2, "DEPRECATED. Sets multi exec atomicity mode");
90-
9189
ABSL_FLAG(bool, multi_exec_squash, true,
9290
"Whether multi exec will squash single shard commands to optimize performance");
9391

src/server/multi_command_squasher.cc

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -120,22 +120,27 @@ MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(Shar
120120
MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(const StoredCmd* cmd) {
121121
DCHECK(cmd->Cid());
122122

123-
if (!cmd->Cid()->IsTransactional() || (cmd->Cid()->opt_mask() & CO::BLOCKING) ||
124-
(cmd->Cid()->opt_mask() & CO::GLOBAL_TRANS))
123+
const CommandId& cid = *cmd->Cid();
124+
if (!cid.IsTransactional() || (cid.opt_mask() & CO::BLOCKING) ||
125+
(cid.opt_mask() & CO::GLOBAL_TRANS))
125126
return SquashResult::NOT_SQUASHED;
126127

127-
if (cmd->Cid()->name() == "CLIENT" || cntx_->conn_state.tracking_info_.IsTrackingOn()) {
128+
if (cid.name() == "CLIENT" || cntx_->conn_state.tracking_info_.IsTrackingOn()) {
128129
return SquashResult::NOT_SQUASHED;
129130
}
130131

131132
auto args = cmd->ArgList(&tmp_keylist_);
132133
if (args.empty())
133134
return SquashResult::NOT_SQUASHED;
134135

135-
auto keys = DetermineKeys(cmd->Cid(), args);
136-
if (!keys.ok())
137-
return SquashResult::ERROR;
138-
if (keys->NumArgs() == 0)
136+
// Instead of returning an error, we treat command as non-squashable, allowing the
137+
// standalone execution path to handle it.
138+
// Validate returns an optional ErrorReply
139+
if (cid.Validate(args).has_value())
140+
return SquashResult::NOT_SQUASHED;
141+
142+
auto keys = DetermineKeys(&cid, args);
143+
if (!keys.ok() || keys->NumArgs() == 0)
139144
return SquashResult::NOT_SQUASHED;
140145

141146
// Check if all command keys belong to one shard
@@ -382,9 +387,6 @@ void MultiCommandSquasher::Run(RedisReplyBuilder* rb) {
382387
for (auto& cmd : cmds_) {
383388
auto res = TrySquash(&cmd);
384389

385-
if (res == SquashResult::ERROR)
386-
break;
387-
388390
if (res == SquashResult::NOT_SQUASHED || res == SquashResult::SQUASHED_FULL) {
389391
if (!ExecuteSquashed(rb))
390392
break;

src/server/multi_command_squasher.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class MultiCommandSquasher {
6666
boost::intrusive_ptr<Transaction> local_tx; // stub-mode tx for use inside shard
6767
};
6868

69-
enum class SquashResult : uint8_t { SQUASHED, SQUASHED_FULL, NOT_SQUASHED, ERROR };
69+
enum class SquashResult : uint8_t { SQUASHED, SQUASHED_FULL, NOT_SQUASHED };
7070

7171
MultiCommandSquasher(absl::Span<StoredCmd> cmds, ConnectionContext* cntx, Service* Service,
7272
const Opts& opts);

tests/dragonfly/connection_test.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1416,7 +1416,6 @@ async def test_client_migrate(df_server: DflyInstance):
14161416
assert resp == 1 # migrated successfully
14171417

14181418

1419-
@dfly_args({})
14201419
async def test_issue_5931_malformed_protocol_crash(df_server: DflyInstance):
14211420
"""
14221421
Regression test for #5931
@@ -1461,7 +1460,6 @@ async def test_issue_5931_malformed_protocol_crash(df_server: DflyInstance):
14611460
assert await client.ping() == True
14621461

14631462

1464-
@dfly_args({})
14651463
async def test_issue_5949_nil_bulk_string_crash(df_server: DflyInstance):
14661464
"""
14671465
Regression test for #5949
@@ -1504,3 +1502,20 @@ async def test_issue_5949_nil_bulk_string_crash(df_server: DflyInstance):
15041502
client = df_server.client()
15051503
await client.ping()
15061504
assert await client.ping() == True
1505+
1506+
1507+
async def test_issue_6165_squash_invalid_syntax(async_client):
1508+
pipe = async_client.pipeline(transaction=False)
1509+
pipe.set("k", "v")
1510+
pipe.execute_command("RENAME bar")
1511+
res = await pipe.execute(raise_on_error=False)
1512+
1513+
assert res[0] == True # SET key1
1514+
assert isinstance(res[1], aioredis.ResponseError) # INVALID SYNTAX COMMAND
1515+
1516+
pip = async_client.pipeline(transaction=False)
1517+
pip.set("k", "v")
1518+
pip.execute_command("ZUNION 2 set1")
1519+
res = await pip.execute(raise_on_error=False)
1520+
assert res[0] == True # SET key1
1521+
assert isinstance(res[1], aioredis.ResponseError) # INVALID SYNTAX

0 commit comments

Comments
 (0)