enhancement(kafka source): add support for consuming records with null bodies #25625
All contributors have signed the CLA ✍️ ✅
I have read the CLA Document and I hereby sign the CLA
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: af9347567f
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
    let payload = match msg.payload() {
        Some(payload) => payload,
        // Tombstone (null-payload) message: either skip or process as empty body.
        None if ignore_tombstones => return None,
        None => &[],
Bypass decoding when emitting tombstones
When ignore_tombstones = false on a topic decoded as JSON or any codec/framing that treats empty input as no event, this substitutes Kafka's null payload with [] and sends it through the normal decoder. JsonDeserializer::parse returns zero events for empty frames, and parse_message still adds the offset finalizer after send_event_stream succeeds, so the tombstone is committed without any event being emitted. This defeats the new option for common compacted JSON topics; construct a metadata-only/null-body event for None payloads instead of decoding an empty byte slice.
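A minimal, std-only sketch of the suggested fix, under stated assumptions: `SketchEvent`, `decode`, and `handle_record` are hypothetical names invented for illustration and are not Vector's types or functions. The toy `decode` only mimics a codec that yields zero events for empty input; the point is that a `None` payload builds a tombstone event directly rather than passing an empty slice through the decoder.

```rust
/// Hypothetical stand-in for an emitted event; not Vector's `Event` type.
#[derive(Debug, PartialEq)]
enum SketchEvent {
    /// A record whose payload went through the decoder.
    Decoded(String),
    /// A tombstone: record key only, no body.
    Tombstone { key: Option<String> },
}

/// Toy decoder (assumption): like the behaviour described in the review,
/// it produces no events at all for empty input.
fn decode(bytes: &[u8]) -> Vec<SketchEvent> {
    if bytes.is_empty() {
        return Vec::new();
    }
    vec![SketchEvent::Decoded(String::from_utf8_lossy(bytes).into_owned())]
}

/// Handles one record. A null payload never reaches `decode`:
/// it is either skipped or turned into a body-less tombstone event.
fn handle_record(
    payload: Option<&[u8]>,
    key: Option<&str>,
    ignore_tombstones: bool,
) -> Vec<SketchEvent> {
    match payload {
        Some(bytes) => decode(bytes),
        None if ignore_tombstones => Vec::new(),
        None => vec![SketchEvent::Tombstone {
            key: key.map(str::to_owned),
        }],
    }
}
```

With this shape, a tombstone always yields exactly one event when `ignore_tombstones` is false, regardless of how the codec treats empty input.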
2721495 to a376132
💡 Codex Review
Reviewed commit: a376132058
        Some(payload) => payload,
        None if ignore_tombstones => return None,
        None => {
            let mut event = Event::Log(LogEvent::default());
Include log outputs when emitting tombstones
When ignore_tombstones = false is combined with a metric-only decoder such as decoding.codec = "influxdb", KafkaSourceConfig::outputs() still advertises only self.decoding.output_type() (Metric), but this branch emits an Event::Log. The topology will therefore accept metric-only downstream components, which filter out log arrays, and reject log-only consumers as a data-type mismatch, so tombstone records produced here cannot be consumed in those metric pipelines. Include DataType::Log in the source output type when this option can emit tombstones, or reject metric-only decoders for this mode.
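The first option in this comment could look roughly like the sketch below. It is an illustration under assumptions, not Vector's implementation: `DataTypes` is a hypothetical bit-set standing in for the real `DataType`, and `source_output_types` is an invented helper. The idea is that the advertised output type is the decoder's output type, widened to include logs whenever tombstones can be emitted.

```rust
/// Hypothetical bit-set of event kinds; a stand-in for Vector's `DataType`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct DataTypes(u8);

impl DataTypes {
    const LOG: DataTypes = DataTypes(0b01);
    const METRIC: DataTypes = DataTypes(0b10);

    /// Returns the set containing every kind in `self` or `other`.
    fn union(self, other: DataTypes) -> DataTypes {
        DataTypes(self.0 | other.0)
    }

    /// Returns true when every kind in `other` is also in `self`.
    fn contains(self, other: DataTypes) -> bool {
        self.0 & other.0 == other.0
    }
}

/// Computes the output types a source should advertise (hypothetical helper).
/// Tombstones are emitted as log events, so logs are added to the decoder's
/// output type whenever tombstones are not ignored.
fn source_output_types(decoder_output: DataTypes, ignore_tombstones: bool) -> DataTypes {
    if ignore_tombstones {
        decoder_output
    } else {
        decoder_output.union(DataTypes::LOG)
    }
}
```

The alternative named in the comment, rejecting metric-only decoders when `ignore_tombstones = false`, would instead be a validation step at configuration time; which is preferable depends on whether mixed log and metric output from one source is acceptable.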
Summary
Add support for Kafka source to emit events with null bodies (tombstones).
Vector configuration
How did you test this PR?
I have a local Vector setup that consumes records from Kafka, extracts a value from the headers, and produces a message to NATS containing the extracted value. I tested with both null and non-null values, and with ignore_tombstones set to false or not present.
Change Type
Is this a breaking change?
Does this PR include user facing changes?
no-changelog label to this PR.
References
Notes
- … @vectordotdev/vector to reach out to us regarding this PR.
- … pre-push hook, please see this template.
- make fmt
- make check-clippy (if there are failures it's possible some of them can be fixed with make clippy-fix)
- make test
- … git merge origin master and git push.
- … Cargo.lock), please run make build-licenses to regenerate the license inventory and commit the changes (if any). More details on the dd-rust-license-tool.