Consumers

Consumers are communication actions that receive data from the system. Every consumer creates an Output in SessionData with its own name.

At runtime, a consumer connects to the selected reader, reads data into the session Output, applies optional deserialization and data filtering, and exports the running output data for other actions in the same session. Single-item readers keep reading until a read returns no data within the configured timeout or a policy stops the action. Chunk readers wait for their inactivity window to elapse and then return the chunk once.
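The difference between the two reader styles can be sketched in a few lines of Python. This is only an illustration of the behavior described above, not the product's implementation: `read_single_items`, `read_chunk`, `max_items` (standing in for a stop policy), and the callbacks are hypothetical names, and a standard-library queue stands in for the real source.

```python
import queue
import time
from typing import Callable, List, Optional


def read_single_items(source: "queue.Queue[bytes]",
                      timeout_s: float,
                      max_items: Optional[int] = None) -> List[bytes]:
    """Single-item style: read one item at a time until a read times out
    or a (hypothetical) policy, here a simple item limit, stops the action."""
    output: List[bytes] = []
    while max_items is None or len(output) < max_items:
        try:
            output.append(source.get(timeout=timeout_s))
        except queue.Empty:
            break  # no data arrived within the timeout: the consumer is done
    return output


def read_chunk(seconds_since_last_change: Callable[[], float],
               fetch_chunk: Callable[[], List[bytes]],
               inactivity_s: float,
               poll_s: float = 0.01) -> List[bytes]:
    """Chunk style: wait until the source has been quiet for the whole
    inactivity window, then fetch everything exactly once."""
    while seconds_since_last_change() < inactivity_s:
        time.sleep(poll_s)
    return fetch_chunk()
```

In this sketch a single-item reader may return after any number of reads, while a chunk reader always performs exactly one fetch, which is why its inactivity window matters more than a per-read timeout.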

Use this page for behavior and YAML shape. The same action can be built in C# with the ConsumerBuilder API; that page is the function reference for the code-first surface.

Table Property Path - Sessions[].Consumers[]

RabbitMq

Consumes byte payloads from RabbitMQ with BasicGet until no message arrives before the timeout. If no queue is configured, the reader creates a temporary queue, binds it to the configured exchange and routing key, and deletes it on disconnect. Consumed output includes the payload plus RabbitMQ routing key, headers, expiration, content type, and message type metadata.

Table Property Path - Sessions[].Consumers[].RabbitMq

RabbitMq: {}

KafkaTopic

Consumes byte payloads from a Kafka topic with the configured consumer group and consumer options. Each successful consume is committed, and the output metadata includes the Kafka message key and headers.

Table Property Path - Sessions[].Consumers[].KafkaTopic

KafkaTopic: {}

Using Kafka consumer to empty a topic

Because Kafka consumers commit what they consume, one run can advance the starting offset for later runs. To clear a topic before later consumers read it, run an earlier consumer with AutoOffsetReset set to earliest, then keep the later consumer on latest.
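The offset behavior behind this tip can be modeled with a small in-memory simulation. It is a simplified sketch, not Kafka client code: `FakeTopic` and `consume_all` are hypothetical, the topic has a single partition, and both consumers are assumed to share one consumer group. The one Kafka rule it relies on is that the offset-reset setting only applies when the group has no committed offset yet.

```python
from typing import Dict, List


class FakeTopic:
    """In-memory stand-in for a single-partition Kafka topic."""

    def __init__(self) -> None:
        self.log: List[bytes] = []
        self.committed: Dict[str, int] = {}  # consumer group -> next offset

    def produce(self, payload: bytes) -> None:
        self.log.append(payload)

    def consume_all(self, group: str, auto_offset_reset: str) -> List[bytes]:
        if group in self.committed:
            start = self.committed[group]   # a committed offset always wins
        elif auto_offset_reset == "earliest":
            start = 0                       # no commit yet: start of the log
        else:
            start = len(self.log)           # "latest": skip what is already there
        messages = self.log[start:]
        if messages:
            # Mirrors "each successful consume is committed".
            self.committed[group] = len(self.log)
        return messages


topic = FakeTopic()
topic.produce(b"stale-1")
topic.produce(b"stale-2")

# Earlier consumer drains the leftovers and commits past them.
drained = topic.consume_all("session-group", "earliest")

topic.produce(b"fresh")

# Later consumer in the same group only sees what arrived after the drain.
later = topic.consume_all("session-group", "latest")
```

Here `drained` holds the two stale messages and `later` holds only the fresh one. A consumer in a different group set to latest would skip the stale messages without removing them for anyone else.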

OracleSqlTable

Reads rows from an Oracle SQL table. The reader waits until the configured inactivity timeout has passed since the latest table change, then queries rows from the configured table, optional WhereStatement, and optional insertion-time filter, returning each row as JSON output.

Table Property Path - Sessions[].Consumers[].OracleSqlTable

OracleSqlTable: {}

MsSqlTable

Reads rows from a Microsoft SQL Server table. The reader waits for the table-change inactivity window, executes the configured query shape, omits configured columns, and returns each row as JSON output.

Table Property Path - Sessions[].Consumers[].MsSqlTable

MsSqlTable: {}

PostgreSqlTable

Reads rows from a PostgreSQL table. The reader uses the same SQL chunk-reader flow as the other SQL consumers: wait for inactivity, query rows with the configured filters, and return each row as JSON output.
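The query-and-serialize step shared by the SQL consumers might look roughly like the sketch below. It uses Python's built-in `sqlite3` as a stand-in database, and `read_rows_as_json`, `where_statement`, and `omit_columns` are illustrative names rather than the product's API. The inactivity wait is left out so that only the query shape and the row-to-JSON mapping are shown.

```python
import json
import sqlite3
from typing import Iterable, List, Optional


def read_rows_as_json(conn: sqlite3.Connection,
                      table: str,
                      where_statement: Optional[str] = None,
                      omit_columns: Iterable[str] = ()) -> List[str]:
    """Query a table with an optional WHERE clause and return one JSON
    string per row, dropping any omitted columns."""
    # The table name and WHERE text come from trusted configuration in this
    # sketch; they are not safe to build from untrusted input.
    sql = f"SELECT * FROM {table}"
    if where_statement:
        sql += f" WHERE {where_statement}"
    cursor = conn.execute(sql)
    columns = [description[0] for description in cursor.description]
    omitted = set(omit_columns)
    return [
        json.dumps({c: v for c, v in zip(columns, row) if c not in omitted})
        for row in cursor
    ]


# Usage against an in-memory table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, status TEXT, secret TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)",
                 [(1, "new", "a"), (2, "done", "b")])
rows = read_rows_as_json(conn, "orders", "status = 'done'", ["secret"])
```

Each element of `rows` is a separate JSON document, matching the "each row as JSON output" behavior described for the SQL readers.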

Table Property Path - Sessions[].Consumers[].PostgreSqlTable

PostgreSqlTable: {}

TrinoSqlTable

Reads rows through a Trino coordinator. The reader builds a Trino connection from the configured host, catalog, schema, tag, and authentication values, waits for the inactivity window, and returns matching rows as JSON output.

Table Property Path - Sessions[].Consumers[].TrinoSqlTable

TrinoSqlTable: {}

S3Bucket

Consumes objects from an S3-compatible bucket. The reader waits until the bucket has been inactive for the configured timeout, lists objects by prefix and delimiter, optionally skips empty objects, and returns object bytes plus MetaData.Storage.Key. If the data filter excludes bodies, object contents are not downloaded.
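How prefix, delimiter, empty-object skipping, and body exclusion interact can be sketched with a dictionary in place of a bucket. `list_objects` and its parameters are hypothetical names chosen for this illustration. The delimiter handling follows the usual S3 listing rule that keys containing the delimiter after the prefix are rolled up into common prefixes instead of being returned as objects.

```python
from typing import Dict, List, Optional, Tuple


def list_objects(bucket: Dict[str, bytes],
                 prefix: str = "",
                 delimiter: Optional[str] = None,
                 skip_empty: bool = False,
                 include_body: bool = True) -> List[Tuple[str, Optional[bytes]]]:
    """Return (key, body) pairs for the objects a listing would yield."""
    results: List[Tuple[str, Optional[bytes]]] = []
    for key in sorted(bucket):
        if not key.startswith(prefix):
            continue
        remainder = key[len(prefix):]
        if delimiter and delimiter in remainder:
            continue  # grouped under a common prefix, not listed as an object
        body = bucket[key]
        if skip_empty and len(body) == 0:
            continue
        # When bodies are filtered out, only the key is returned; a real
        # reader would skip the download entirely.
        results.append((key, body if include_body else None))
    return results


bucket = {
    "in/a.json": b"{}",
    "in/empty.json": b"",
    "in/sub/b.json": b"x",
    "out/c.json": b"y",
}
top_level = list_objects(bucket, prefix="in/", delimiter="/", skip_empty=True)
```

With these inputs `top_level` contains only `in/a.json`: the empty object is skipped and `in/sub/b.json` sits below the delimiter.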

Table Property Path - Sessions[].Consumers[].S3Bucket

S3Bucket: {}

ElasticIndices

Consumes documents from Elasticsearch indices using an index pattern. The reader waits until no matching document has been inserted within the timeout window, then uses the Elasticsearch scroll API with the configured query string, timestamp field, batch size, and scroll expiration. Returned outputs contain serialized JSON documents and timestamps derived from the configured timestamp field.

Table Property Path - Sessions[].Consumers[].ElasticIndices

ElasticIndices: {}

Elasticsearch server timeout

When sending requests to the Elasticsearch server, remember that it can enforce its own request timeout. If increasing RequestTimeoutMilliseconds does not help, the server-side timeout may still be the limiting factor.

Elastic keywords

When using keywords such as AND, OR, and NOT in Elasticsearch queries, write them in uppercase. Lowercase keywords can be rejected or misread by the consumer.
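For orientation, the initial request of a scroll-based read could be assembled as in the sketch below. The function name and parameters are illustrative, and sorting by the timestamp field is an assumption about how the reader orders results. The request shape itself (the `scroll` query parameter, `size`, and a `query_string` query) follows the public Elasticsearch search API.

```python
from typing import Any, Dict, Tuple


def build_scroll_search(index_pattern: str,
                        query_string: str,
                        timestamp_field: str,
                        batch_size: int,
                        scroll_expiration: str) -> Tuple[str, Dict[str, Any]]:
    """Build the path and JSON body for the first request of a scroll."""
    path = f"/{index_pattern}/_search?scroll={scroll_expiration}"
    body: Dict[str, Any] = {
        "size": batch_size,  # documents per scroll page
        "query": {"query_string": {"query": query_string}},
        # Assumption: results are ordered by the configured timestamp field.
        "sort": [{timestamp_field: {"order": "asc"}}],
    }
    return path, body


# Boolean operators are written in uppercase, as the note above recommends.
path, body = build_scroll_search(
    index_pattern="logs-*",
    query_string="level:ERROR AND NOT service:healthcheck",
    timestamp_field="@timestamp",
    batch_size=500,
    scroll_expiration="1m",
)
```

Later pages would be fetched with the scroll ID returned by this first request until a page comes back empty or the scroll expires.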

Socket

Consumes byte payloads from a socket. The reader opens the configured socket, waits until bytes are available or the timeout expires, and returns the received buffer as output.
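A single read with a timeout can be sketched with Python's standard `socket` module. `read_once` and `buffer_size` are hypothetical names, and connection setup is left to the caller. The sketch only shows the "bytes available or timeout" decision, not the product's reader.

```python
import socket
from typing import Optional


def read_once(sock: socket.socket,
              timeout_s: float,
              buffer_size: int = 4096) -> Optional[bytes]:
    """Return the received buffer, or None if nothing arrives in time."""
    sock.settimeout(timeout_s)
    try:
        data = sock.recv(buffer_size)
    except socket.timeout:
        return None  # timeout expired with no bytes available
    # An empty result means the peer closed the connection; treat it as no output.
    return data or None
```

A connected pair from `socket.socketpair()` is enough to try this locally: send a few bytes on one end and call `read_once` on the other.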

Table Property Path - Sessions[].Consumers[].Socket

Socket: {}

IbmMqQueue

Consumes byte payloads from an IBM MQ queue. The reader opens the configured queue manager and queue, waits with the configured timeout, returns the message body when one is available, and treats MQRC_NO_MSG_AVAILABLE as a timeout with no output.

Table Property Path - Sessions[].Consumers[].IbmMqQueue

IbmMqQueue: {}

See also