
$meta

The $meta expression returns an object containing all streaming metadata for a document. You can expose this data either for the entire stream or for one of the following Atlas Stream Processing aggregation stages:

  • $source

  • $hoppingWindow

  • $tumblingWindow

  • $https

A $meta expression has the following prototype form:

{ "$meta": <string> }

The streaming metadata object has the following structure:

{
  source: {
    type: string,
    ts: date,
    topic: string,
    partition: int,
    offset: int,
    key: string|int|long|double|object|binData,
    headers: array[obj]
  },
  window: {
    start: date,
    end: date
  },
  https: {
    url: string,
    method: string,
    httpStatusCode: int,
    responseTimeMs: int
  }
}

The $meta expression takes a single string input corresponding to the fully qualified, dot-syntax path of a source of metadata. The root of this path must be "stream". You can query the following paths:

| Path | Type | Description |
| --- | --- | --- |
| stream | object | All metadata for the $source stage and any window stage or $https stage configured in the pipeline. |
| stream.source | object | All metadata for the $source stage. |
| stream.source.type | string | Type of connection used as a source. |
| stream.source.ts | ISODate | Date and time of the record at the point of ingestion. |
| stream.source.topic | string | Kafka topic from which the stream ingests records. Available only from a Kafka source. |
| stream.source.partition | integer | Partition of the Kafka topic from which the stream ingests records. Available only from a Kafka source. |
| stream.source.offset | integer | Offset tracking message order and queue position within a Kafka source partition. Available only from a Kafka source. |
| stream.source.key | string, int, long, double, object, or binData | Key assigned to Kafka messages for partitioning and load distribution. Available only from a Kafka source. |
| stream.source.headers | array | Set of key-value pairs describing Kafka message metadata. |
| stream.window | object | All metadata for the $hoppingWindow or $tumblingWindow stage. This object is set only when a document enters a $hoppingWindow or $tumblingWindow stage. |
| stream.window.start | ISODate | Start time of the current window. |
| stream.window.end | ISODate | End time of the current window. |
| stream.https | object | All metadata for the $https stage. This object is set only when an $https stage outputs a document. |
| stream.https.url | string | URL from which the $https stage sources data. |
| stream.https.method | string | HTTPS request method sent to the URL. |
| stream.https.httpStatusCode | int | HTTP response code for the request sent to the URL. |
| stream.https.responseTimeMs | int | Time, in milliseconds, it took to receive the response from the URL. |
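
To illustrate how the dot-syntax paths listed above map onto the metadata object, the following JavaScript sketch resolves a path against a hand-written sample object. The helper resolveMetaPath and every sample value are hypothetical and run locally. They aren't part of Atlas Stream Processing, and the sketch only mimics the lookup for a Kafka-sourced document inside a window. How Atlas itself reports a path whose metadata isn't set isn't described here.

```javascript
// Hypothetical metadata for one Kafka-sourced document inside a window.
// All values are invented for illustration.
const sampleMeta = {
  stream: {
    source: {
      type: "kafka",
      ts: new Date("2024-01-01T00:00:05Z"),
      topic: "t1",
      partition: 0,
      offset: 42,
      key: "sensor-7",
      headers: [],
    },
    window: {
      start: new Date("2024-01-01T00:00:00Z"),
      end: new Date("2024-01-01T00:01:00Z"),
    },
  },
};

// Local stand-in for { $meta: <path> }: walks the dot-separated path from
// the root, which must be "stream". This sketch returns undefined when the
// requested metadata isn't present in the sample object.
function resolveMetaPath(meta, path) {
  const parts = path.split(".");
  if (parts[0] !== "stream") {
    throw new Error('path must start with "stream"');
  }
  let current = meta;
  for (const part of parts) {
    if (current === null || typeof current !== "object" || !(part in current)) {
      return undefined;
    }
    current = current[part];
  }
  return current;
}

console.log(resolveMetaPath(sampleMeta, "stream.source.topic")); // t1
console.log(resolveMetaPath(sampleMeta, "stream.https")); // undefined (no $https stage in this sample)
```

In this sample, stream.https resolves to nothing because the sample document never passed through an $https stage, which mirrors the note in the table that the object is set only when an $https stage outputs a document.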

The Atlas Stream Processing $meta expression provides all of the functionality of the existing MongoDB $meta aggregation expression. However, you can't use the functionality specific to the Atlas Stream Processing version of $meta in a standard MongoDB aggregation query.

The following example ingests records from three Kafka topics and emits each record to an output topic whose name is the record's source topic followed by "out":

{
  $source: {
    connectionName: "kafka",
    topic: ["t1", "t2", "t3"]
  }
},
{
  $emit: {
    connectionName: "kafka",
    topic: {
      $concat: [
        {
          $meta: "stream.source.topic"
        },
        "out"
      ]
    }
  }
}
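
As a rough check on what the $concat expression in this example produces, the following JavaScript sketch emulates it locally for each source topic. The function outputTopicFor is a hypothetical stand-in for the expression, not an Atlas Stream Processing API.

```javascript
// Local emulation of { $concat: [{ $meta: "stream.source.topic" }, "out"] }.
// outputTopicFor is a hypothetical helper used only for illustration.
function outputTopicFor(sourceTopic) {
  return sourceTopic + "out";
}

// Source topics taken from the $source stage in the example.
const sourceTopics = ["t1", "t2", "t3"];

// Pair each source topic with the topic name the $emit stage would compute.
const routing = sourceTopics.map((topic) => [topic, outputTopicFor(topic)]);

console.log(routing); // [ [ 't1', 't1out' ], [ 't2', 't2out' ], [ 't3', 't3out' ] ]
```

Because $concat joins its arguments with no separator, a record read from t1 is written to t1out. To get a name such as t1-out, the literal string would need to include the separator.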

The following example adds a field named start to each document in the stream, reporting the start time of the window that contains it:

{
  $source: {
    connectionName: "kafka",
    topic: "t1"
  }
},
{
  $hoppingWindow: . . .
},
{
  $addFields: {
    start: { $meta: "stream.window.start" }
  }
}
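
The same pattern extends to the other window path. The following sketch builds an $addFields stage, as a plain JavaScript object, that records both bounds of the window. The field names windowStart and windowEnd are hypothetical, and the stage is assumed to come after a window stage, since stream.window is set only when a document enters one.

```javascript
// $addFields stage that copies both window bounds into each document.
// The field names windowStart and windowEnd are hypothetical.
const addWindowBounds = {
  $addFields: {
    windowStart: { $meta: "stream.window.start" },
    windowEnd: { $meta: "stream.window.end" },
  },
};

// Print the stage so it can be pasted into a pipeline definition.
console.log(JSON.stringify(addWindowBounds, null, 2));
```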
