$meta
Definition
The $meta expression returns an object containing all streaming metadata for a document. You can expose this data for the entire stream or for one part of it: the source, the window, or the HTTPS request.
A $meta expression has the following prototype form:
{ "$meta": <string> }
The full metadata object has the following shape:
{
  source: {
    type: string,
    ts: date,
    topic: string,
    partition: int,
    offset: int,
    key: string|int|long|double|object|binData,
    headers: array[obj]
  },
  window: {
    start: date,
    end: date
  },
  https: {
    url: string,
    method: string,
    httpStatusCode: int,
    responseTimeMs: int
  }
}
Syntax
The $meta expression takes a single string input corresponding to the fully qualified, dot-syntax path of a source of metadata. The root of this path must be "stream". You can query the following paths:
Path | Type | Description |
---|---|---|
stream | object | All streaming metadata for the document. |
stream.source | object | All metadata for the stream's source. |
stream.source.type | string | Type of connection used as a source. |
stream.source.ts | ISODate | Date and time of the record at the point of ingestion. |
stream.source.topic | string | Kafka topic from which the stream ingests records. Available only from a Kafka source. |
stream.source.partition | integer | Partition of the Kafka topic from which the stream ingests records. Available only from a Kafka source. |
stream.source.offset | integer | Offset tracking message order and queue position within a Kafka source partition. Available only from a Kafka source. |
stream.source.key | string|int|long|double|object|binData | Key assigned to Kafka messages for partitioning and load distribution. Available only from a Kafka source. |
stream.source.headers | array | Set of key-value pairs describing Kafka message metadata. |
stream.window | object | All metadata for the current window. |
stream.window.start | ISODate | Start time of the current window. |
stream.window.end | ISODate | End time of the current window. |
stream.https | object | All metadata for the HTTPS request. |
stream.https.url | string | URL to which the request was sent. |
stream.https.method | string | HTTPS request method sent to the URL. |
stream.https.httpStatusCode | int | HTTP response code for the request sent to the URL. |
stream.https.responseTimeMs | int | Time, in milliseconds, it took to receive the response from the URL. |
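To make the dot-syntax paths concrete, the following sketch resolves a path against a sample metadata object in plain JavaScript. It only models how a path maps onto the nested object. It is not how Atlas Stream Processing evaluates $meta internally. The sample values, the shape of each header entry, and the helper name resolveMetaPath are all assumptions made for illustration.

```javascript
// Hypothetical metadata for one record read from a Kafka source inside a window.
// Every value here is made up; the header entry shape ({ k, v }) is an assumption.
const sampleMeta = {
  stream: {
    source: {
      type: "kafka",
      ts: new Date("2024-01-01T00:00:05Z"),
      topic: "t1",
      partition: 0,
      offset: 42,
      key: "sensor-7",
      headers: [{ k: "traceId", v: "abc" }],
    },
    window: {
      start: new Date("2024-01-01T00:00:00Z"),
      end: new Date("2024-01-01T00:01:00Z"),
    },
  },
};

// Resolve a dot-syntax path such as "stream.source.topic" against a metadata object.
// Throws if the path is not rooted at "stream"; returns undefined in this sketch
// when any later segment of the path is missing.
function resolveMetaPath(meta, path) {
  const segments = path.split(".");
  if (segments[0] !== "stream") {
    throw new Error('path must be rooted at "stream"');
  }
  let current = meta;
  for (const segment of segments) {
    const isObject = current !== null && typeof current === "object";
    if (!isObject || !(segment in current)) {
      return undefined;
    }
    current = current[segment];
  }
  return current;
}

console.log(resolveMetaPath(sampleMeta, "stream.source.topic"));
console.log(resolveMetaPath(sampleMeta, "stream.window.start"));
```

Because the sample record has no https object, resolveMetaPath(sampleMeta, "stream.https.url") returns undefined in this sketch.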
Behavior
The Atlas Stream Processing $meta expression provides all of the functionality of the existing MongoDB $meta aggregation expression. However, you can't use the functionality specific to the Atlas Stream Processing version of $meta in a standard MongoDB aggregation query.
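As a rough illustration of the distinction, the sketch below contrasts a standard $meta keyword with a stream-specific path. "textScore" is an existing keyword of the standard MongoDB $meta expression. The prefix check in isStreamMetaPath is a hypothetical helper based only on the rule that stream paths are rooted at "stream", and the field names score and topic are arbitrary.

```javascript
// Returns true when a $meta argument targets Atlas Stream Processing metadata,
// that is, when the path is exactly "stream" or begins with "stream.".
// This prefix test is an assumption for illustration, not an official API.
function isStreamMetaPath(arg) {
  return arg === "stream" || arg.startsWith("stream.");
}

// Standard MongoDB keyword.
const standardProjection = { score: { $meta: "textScore" } };

// Stream-specific path: valid only in a stream processor pipeline.
const streamProjection = { topic: { $meta: "stream.source.topic" } };

console.log(isStreamMetaPath(standardProjection.score.$meta));
console.log(isStreamMetaPath(streamProjection.topic.$meta));
```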
Examples
The following example writes each record to a Kafka topic whose name is the record's source topic with "out" appended:
{
  $source: {
    connectionName: "kafka",
    topic: ["t1", "t2", "t3"]
  }
},
{
  $emit: {
    connectionName: "kafka",
    topic: { $concat: [ { $meta: "stream.source.topic" }, "out" ] }
  }
}
The following example adds a field to the stream reporting the start time of each window:
{
  $source: {
    connectionName: "kafka",
    topic: "t1"
  }
},
{ $hoppingWindow: . . . },
{
  $addFields: {
    start: { $meta: "stream.window.start" }
  }
}
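The same pattern can record where each message came from. The following sketch builds the source and enrichment stages of a pipeline as a plain JavaScript array, copying the Kafka topic, partition, and offset into each document. The output field names (kafkaTopic, kafkaPartition, kafkaOffset) are arbitrary choices, the connection name "kafka" is reused from the examples above, and a sink stage is deliberately left out.

```javascript
// Source and enrichment stages only; a complete stream processor also needs a sink stage.
const pipeline = [
  // Read from topic "t1" on a connection named "kafka" (name taken from the examples above).
  { $source: { connectionName: "kafka", topic: "t1" } },
  // Copy Kafka provenance metadata into each document. Field names are illustrative.
  {
    $addFields: {
      kafkaTopic: { $meta: "stream.source.topic" },
      kafkaPartition: { $meta: "stream.source.partition" },
      kafkaOffset: { $meta: "stream.source.offset" },
    },
  },
];

console.log(JSON.stringify(pipeline, null, 2));
```

These three paths are available only from a Kafka source, so this sketch assumes the connection is a Kafka connection.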