enhancement(kafka source): add support for consuming records with null bodies #25625

Open
nviliunov-evolution-throwaway wants to merge 3 commits into vectordotdev:master from nviliunov-evolution-throwaway:null-kafka-bodies

nviliunov-evolution-throwaway commented Jun 15, 2026

Summary

Add support for the Kafka source to emit events for records with null bodies (tombstones).

Vector configuration

[sources.kafka_source]
type = "kafka"
bootstrap_servers = "kafka:9094"
group_id = "group-id"
topics = ["topic"]
auto_offset_reset = "latest"
decoding.codec = "bytes"
commit_interval_ms = 10000
ignore_tombstones = false

[transforms.extract]
type = "remap"
inputs = ["kafka_source"]
drop_on_error = true
drop_on_abort = true
source = '''
    header_raw, err = string(.headers."header.name")
    if err != null { abort }

    header_json, err = parse_json(header_raw)
    if err != null { abort }

    header_id, err = string(header_json.id)
    if err != null { abort }

    topic        = string!(.topic)
    key          = string!(.message_key)

    .message      = header_id
    .nats_subject = "transformed." + topic + "." + key
'''

[sinks.nats_publish]
type = "nats"
inputs = ["extract"]
url = "nats://nats:4222"
subject = "{{ nats_subject }}"
encoding.codec = "raw_message"

How did you test this PR?

I have a local Vector setup that consumes records from Kafka, extracts a value from the headers, and publishes a message containing that value to NATS. I tested records with both null and non-null values, with ignore_tombstones set to false and with the option omitted.

Change Type

  • Bug fix
  • New feature
  • Dependencies
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

@nviliunov-evolution-throwaway nviliunov-evolution-throwaway requested a review from a team as a code owner June 15, 2026 14:45
github-actions Bot added the "domain: sources" label (Anything related to the Vector's sources) Jun 15, 2026
github-actions Bot commented Jun 15, 2026

All contributors have signed the CLA ✍️ ✅
Posted by the CLA Assistant Lite bot.

@nviliunov-evolution-throwaway

I have read the CLA Document and I hereby sign the CLA

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: af9347567f

Comment thread src/sources/kafka.rs Outdated
Comment on lines +1010 to +1014
let payload = match msg.payload() {
    Some(payload) => payload,
    // Tombstone (null-payload) message: either skip or process as empty body.
    None if ignore_tombstones => return None,
    None => &[],

[P2] Bypass decoding when emitting tombstones

When ignore_tombstones = false on a topic decoded as JSON or any codec/framing that treats empty input as no event, this substitutes Kafka's null payload with [] and sends it through the normal decoder. JsonDeserializer::parse returns zero events for empty frames, and parse_message still adds the offset finalizer after send_event_stream succeeds, so the tombstone is committed without any event being emitted. This defeats the new option for common compacted JSON topics; construct a metadata-only/null-body event for None payloads instead of decoding an empty byte slice.
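
The suggestion above can be sketched in isolation. This is a minimal illustration, not Vector's code: `Event`, `decode`, and `handle_record` are hypothetical stand-ins, and `decode` only mimics the behavior described in the comment (an empty frame yields zero events). The point is that the `None` arm builds a metadata-only event directly instead of passing an empty slice to the decoder.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a log event: a flat map of string fields.
type Event = BTreeMap<String, String>;

/// Hypothetical decoder that, like the JSON case described in the review,
/// produces no events for an empty frame.
fn decode(frame: &[u8]) -> Vec<Event> {
    if frame.is_empty() {
        return Vec::new();
    }
    let mut event = Event::new();
    event.insert(
        "message".to_string(),
        String::from_utf8_lossy(frame).into_owned(),
    );
    vec![event]
}

/// Converts one record into events. A `None` payload represents a tombstone.
fn handle_record(key: &str, payload: Option<&[u8]>, ignore_tombstones: bool) -> Vec<Event> {
    let mut events = match payload {
        Some(bytes) => decode(bytes),
        // Tombstones are skipped entirely when the option is enabled.
        None if ignore_tombstones => return Vec::new(),
        // Bypass the decoder: emit one event with no body, so the tombstone
        // is not silently lost when the codec ignores empty input.
        None => vec![Event::new()],
    };
    // Attach record metadata to every event, including the tombstone event.
    for event in &mut events {
        event.insert("message_key".to_string(), key.to_string());
    }
    events
}
```

With this shape, `handle_record("user-1", None, false)` returns a single event that carries `message_key` but no `message` field, while `decode(&[])` on its own returns nothing.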

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Reviewed commit: a376132058

Comment thread src/sources/kafka.rs
    Some(payload) => payload,
    None if ignore_tombstones => return None,
    None => {
        let mut event = Event::Log(LogEvent::default());

[P2] Include log outputs when emitting tombstones

When ignore_tombstones = false is combined with a metric-only decoder such as decoding.codec = "influxdb", KafkaSourceConfig::outputs() still advertises only self.decoding.output_type() (Metric), but this branch emits an Event::Log. The topology will therefore accept metric-only downstream components, which filter out log arrays, and reject log-only consumers as a data-type mismatch, so tombstone records produced here cannot be consumed in those metric pipelines. Include DataType::Log in the source output type when this option can emit tombstones, or reject metric-only decoders for this mode.
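
The first option mentioned above (widening the advertised output type) can be sketched as follows. This is a hedged illustration only: the `DataType` struct and the `advertised_outputs` function here are hypothetical stand-ins modeled as simple bit flags, not Vector's actual types or the real `outputs()` signature.

```rust
/// Hypothetical stand-in for a source's advertised output types, as bit flags.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct DataType(u8);

impl DataType {
    const LOG: DataType = DataType(0b01);
    const METRIC: DataType = DataType(0b10);

    /// Returns a type set containing everything in `self` and `other`.
    fn union(self, other: DataType) -> DataType {
        DataType(self.0 | other.0)
    }

    /// Returns true if every type in `other` is also in `self`.
    fn contains(self, other: DataType) -> bool {
        self.0 & other.0 == other.0
    }
}

/// Computes the output types a source should advertise. When tombstones can
/// be emitted as log events, the log type is added to whatever the decoder
/// produces, so downstream log consumers pass topology validation.
fn advertised_outputs(decoder_output: DataType, ignore_tombstones: bool) -> DataType {
    if ignore_tombstones {
        decoder_output
    } else {
        decoder_output.union(DataType::LOG)
    }
}
```

Under these assumptions, a metric-only decoder with `ignore_tombstones = false` would advertise both metric and log outputs, and the advertised type is unchanged when tombstones are ignored. The alternative raised in the review, rejecting metric-only decoders for this mode at configuration time, avoids mixed outputs at the cost of supporting fewer setups.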
