Skip to content

Commit 85fd684

Browse files
committed
changefeedccl: validate avro_schema_prefix follows Avro naming rules
An invalid avro_schema_prefix caused changefeeds to hang indefinitely. This validates that avro_schema_prefix follows Avro naming rules at statement time. Release note: Invalid avro_schema_prefix is now caught during statement time. The prefix must start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_], as specified in the avro specification https://avro.apache.org/docs/1.8.1/spec.html Part of: #3513
1 parent 0e89695 commit 85fd684

File tree

2 files changed

+67
-0
lines changed

2 files changed

+67
-0
lines changed

pkg/ccl/changefeedccl/changefeedbase/options.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"fmt"
1111
"net/url"
12+
"regexp"
1213
"strings"
1314
"time"
1415

@@ -490,6 +491,22 @@ var redactSimple = func(string) (string, error) {
490491
return "redacted", nil
491492
}
492493

494+
// Regex from https://avro.apache.org/docs/1.8.1/spec.html.
495+
var avroNameRegexp = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)
496+
497+
func validateAvroSchemaPrefix(prefix string) error {
498+
if prefix == "" {
499+
return nil
500+
}
501+
if !avroNameRegexp.MatchString(prefix) {
502+
return errors.Errorf(
503+
`%s must start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_]`,
504+
OptAvroSchemaPrefix,
505+
)
506+
}
507+
return nil
508+
}
509+
493510
// RedactUserFromURI takes a URI string and removes the user from it.
494511
// If there is no user, the original URI is returned.
495512
func RedactUserFromURI(uri string) (string, error) {

pkg/ccl/changefeedccl/changefeedbase/options_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,53 @@ func TestEncodingOptionsValidations(t *testing.T) {
7979
}
8080

8181
}
82+
83+
func TestAvroSchemaPrefixValidation(t *testing.T) {
84+
defer leaktest.AfterTest(t)()
85+
defer log.Scope(t).Close(t)
86+
87+
cases := []struct {
88+
prefix string
89+
expectErr string
90+
}{
91+
// Valid prefixes
92+
{"", ""},
93+
{"a", ""},
94+
{"A", ""},
95+
{"_", ""},
96+
{"abc", ""},
97+
{"ABC", ""},
98+
{"_abc", ""},
99+
{"a1", ""},
100+
{"A1_b2_C3", ""},
101+
{"crdb_cdc_", ""},
102+
{"super", ""},
103+
{"____", ""},
104+
105+
// Invalid prefixes - starts with invalid character
106+
{"1abc", "must start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_]"},
107+
{"123", "must start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_]"},
108+
{"-abc", "must start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_]"},
109+
110+
// Invalid prefixes - contains invalid characters
111+
{"abc-def", "must start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_]"},
112+
{"abc.def", "must start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_]"},
113+
{"abc def", "must start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_]"},
114+
{"abc!", "must start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_]"},
115+
}
116+
117+
for _, c := range cases {
118+
t.Run(fmt.Sprintf("prefix=%q", c.prefix), func(t *testing.T) {
119+
opts := MakeStatementOptions(map[string]string{
120+
OptAvroSchemaPrefix: c.prefix,
121+
})
122+
err := opts.ValidateForCreateChangefeed(false)
123+
if c.expectErr == "" {
124+
require.NoError(t, err)
125+
} else {
126+
require.Error(t, err)
127+
require.Contains(t, err.Error(), c.expectErr)
128+
}
129+
})
130+
}
131+
}

0 commit comments

Comments
 (0)