The Radio extension by Query.Farm enables DuckDB to interact with real-time event systems such as WebSocket and Redis Publish/Subscribe servers. It allows DuckDB to both receive and send events: incoming messages are buffered and queryable with SQL, while outgoing events are buffered and support delivery tracking.
The extension is named Radio because it effectively equips DuckDB with a two-way radio—allowing it to listen for and broadcast messages across event-driven systems.
DuckDB extensions are modular plugins that enhance the core engine with powerful new capabilities like this.
Getting Started
Radio will be a DuckDB community extension maintained and supported by Query.Farm.
Install Radio in DuckDB by running:
Then load it with:
Functionality
The Radio extension introduces a few core concepts.
At its core is a single radio object that manages multiple subscriptions to event sources. Think of the radio as being a radio that can listen to multiple broadcasts at the same time. The event sources that the radio extension can interact with include WebSocket servers and Redis publish/subscribe. Support for additional event systems will be added over time.
Additional event sources planned are: Google PubSub, Azure ServiceBus, MQTT / Mosquito, Subprocesses / Pipes.
If you’re interested in these additional event sources, contact us.
You can create multiple subscriptions to event sources, each subscription is indepent of other subscriptions. Each subscription has an independent connection to the event source and has a independent send and receive thread. Multiple subscriptions can exist simultaneously in the same DuckDB process (even to the same event source).
Every subscription maintains two internal queues:
- Received messages
- Messages to transmit
The received messages queue is size limited, as it fills up older messages are discarded based on the configured queue size. The size of the receive queue is configurable when the subcription is created.
Received messages have a field called seen_count which tracks the number of times the message has been seen by DuckDB. Newly received messages will have a seen_count of 1.
Architecture Diagram
This diagram shows the internal architecture of the extension and how it integrated into DuckDB.
Integration Flowchart
This flowchart shows which functions to use for each type of activity with the extension.
When Should I Use This?
Use the Radio extension when you want to receive or send events from a Websocket server or Redis Publish/Subscribe service within DuckDB—without needing external event-handling infrastructure or orchestration logic.
Here’s an example of connecting to a WebSocket server and receiving messages:
API
The extension adds these functions:
- Subscriptions
- Blocking / Waiting
- Messages
- Messages from subscriptions
- Messages for a single subscription
- Transmit Messages
- Versioning
radio_subscribe
This function creates a new subscription.
Signature:
Positional Arguments
url | VARCHAR | The URL of the event source, for websockets use wss:// |
Named Parameters
All named parameters are not required to be passed, if the parameter is not passed the default value will be used.
receive_message_capacity | INTEGER | 1000 | Maximum number of messages that can be queued for receipt. |
transmit_retry_max_delay_ms | INTEGER | 10000 | Maximum delay (in milliseconds) between retry attempts when transmitting. |
transmit_retry_initial_delay_ms | INTEGER | 100.0 | Initial delay (in milliseconds) before the first retry attempt when transmitting. |
transmit_retry_multiplier | DOUBLE | 1.5 | Multiplier for exponential backoff between retry attempts during transmission. |
Return Value
The unique subscription identifier is returned for the new subscription.
Example
radio_unsubscribe
This function creates a deletes a previously created subscription.
Signature: radio_unsubscribe(VARCHAR)
Positional Arguments
url | VARCHAR | The URL of the event source, for websockets use wss:// |
Return Value
The unique identifier of the subscription that was removed.
Example
radio_subscriptions
This function returns a table that contains information about all current subscriptions.
Signature: radio_subscriptions()
Return Value
Returns a table with this schema:
subscription_id | UBIGINT | Unique identifier for the subscription. |
url | VARCHAR | The event source URL associated with the subscription. |
creation_time | TIMESTAMP_MS | Timestamp when the subscription was created. |
activation_time | TIMESTAMP_MS | Timestamp when the subscription was activated (i.e., connected to the URL). |
disabled | BOOLEAN | Indicates whether the subscription is currently disabled. |
received_last_message_time | TIMESTAMP_MS | Timestamp of the most recent received message. |
received_messages_processed | UBIGINT | Total number of messages successfully received. |
transmit_messages_processed | UBIGINT | Total number of transmit messages queued. |
transmit_last_queue_time | TIMESTAMP_MS | Timestamp when the last transmit message was queued. |
transmit_last_success_time | TIMESTAMP_MS | Timestamp of the last successful message transmission. |
transmit_last_failure_time | TIMESTAMP_MS | Timestamp of the last failed message transmission attempt. |
transmit_successes | UBIGINT | Total number of transmit messages sent successfully. |
transmit_failures | UBIGINT | Total number of failed transmit attempts. |
Example
radio_listen
radio_listen checks for newly arrived messages across all subscriptions. A message is considered newly arrived if its never been seen before as indicated by the seen_count on the message record. radio_listen can optionally block execution for a specified time interval, waiting for any new message to arrive on any subscription. If no wait time is provided, radio_listen returns immediately with information about any currently subscriptions with unread messages. Use this function to pause execution until at least one message is ready to be processed.
Signature: radio_listen(BOOLEAN, INTERVAL)
Positional Arguments
wait_for_messages | BOOLEAN | Wait for messages to arrive before returning result. |
duration | INTERVAL | The amount of time to wait for messages to arrive. |
Return Value
Returns a table with this schema:
subscription_id | UBIGINT | The unique identifier of the subscription |
subscription_url | VARCHAR | The URL of the subscription that has newly messages. |
Example
radio_flush
radio_flush wants for any messages that are pending to be sent to be sent. A time interval is supplied after which radio_flush returns a row indicating if all messages have been sent.
Signature: radio_flush(INTERVAL)
Positional Arguments
duration | INTERVAL | The amount of time to wait for messages to be sent. |
Return Value
Returns a table with this schema:
all_messages_flushed | BOOLEAN | Indicate if all messages from all subscriptions have been sent. |
Example
radio_received_messages
This function returns messages received from all subscriptions.
Messages remain in the queue until they are automatically evicted due to limited queue capacity. The queue follows a first-in, first-out (FIFO) policy, so newer messages will overwrite the oldest ones when the queue is full. You can filter for new messages by adding seen_count = 0 to the WHERE clause if desired.
Each message has a unique identifier that is unique to the subscription where it was received.
Signature: radio_received_messages()
Return Value
Returns a table with this schema:
subscription_id | VARCHAR | The unique id of the subscription. |
subscription_url | VARCHAR | The URL of the subscription where the message was received. |
message_type | ENUM | The type of message on of message, error, connection, disconnection. |
message_id | UBIGINT | The unique id of the message in the subscription. |
receive_time | TIMESTAMP_NS | The timestamp that the message was received. |
seen_count | UBIGINT | The number of times the message has been seen or retrieved. |
channel | VARCHAR | The name of the channel where the message was received. |
message | BLOB | The message from the event bus. |
Example
radio_subscription_received_messages
Retrieves messages received from a given subscription.
Errors are stored separately from regular messages to ensure clarity, as they may not be easily distinguished in the message stream. Received messages are held in a queue until they are automatically evicted due to limited capacity. The queue operates in a first-in, first-out (FIFO) manner—newer messages will replace the oldest when the queue is full.
Signature: radio_subscription_received_messages(VARCHAR, VARCHAR)
Positional Arguments
url | VARCHAR | The URL of the subscription. |
Return Value
Returns a table with this schema:
message_id | UBIGINT | The unique id of the message in the subscription. |
receive_time | TIMESTAMP_NS | The timestamp that the message was received. |
seen_count | UBIGINT | The number of times the message has been retrieved. |
channel | VARCHAR | The name of the channel where the message was received. |
message | BLOB | The message from the event bus. |
Example
radio_subscription_transmit_messages
Retrieves transmit messages that will be sent via the specified subscription. Messages that have been sent will continue to be returned until evicted from the send queue. This is useful for delivery tracking and determining if a message was actually sent.
Signature: radio_subscription_transmit_messages(VARCHAR)
Positional Arguments
url | VARCHAR | The URL of the subscription. |
Return Value
Returns a table with this schema:
message_id | VARCHAR | The unique identifier of the transmit message. |
creation_time | TIMESTAMP_MS | The timestamp when the transmit message was crated. |
last_attempt_time | TIMESTAMP_MS | The timestamp when the message was last attempted to be transmitted. |
sent | BOOLEAN | Indicate if the message has been successfully sent. |
expire_time | TIMESTAMP_MS | The time at which attempts to transmit the message will end. |
max_attempts | UINTEGER | The number of attempts to try to send the message. |
try_count | UINTEGER | The number of attempts that have been performed to transmit the message. |
channel | VARCHAR | The channel where the message should be sent. |
message | BLOB | The message to transmit. |
Example
radio_transmit_message
Adds a new message to be transmitted via a subscription. Returns the unique identifier of the newly created message.
If the message is not able to be transmitted it will be retrieved using exponential backoff until the either the maximum number of attempts is exhausted or the expiration time is reached.
Signature: radio_transmit_message(VARCHAR, VARCHAR, BLOB, INTEGER, INTERVAL)
Positional Arguments
url | VARCHAR | The URL of the subscription where the message should be sent. |
channel | VARCHAR | The channel where the message should be sent |
message | BLOB | The message to send |
max_attempts | INTEGER | The maximum number of attempts to transmit this message. Must be greater than zero. |
expire_interval | INTERVAL | The maximum amount of time to spend attempting to transmit this message. If the message is not sent after this interval, it is marked as not be able to be sent. |
Example
radio_transmit_messages_delete_finished
Remove all transmit messags that have reached a terminal state from the transmit queue.
Signature: radio_transmit_messages_delete_finished(VARCHAR)
Positional Arguments
url | VARCHAR | The URL of the subscription where the message should be sent. |
Example
radio_subscription_transmit_message_delete
Remove a transmit message from the transmit queue by supplying its message identifier.
Signature: radio_subscription_transmit_message_delete(VARCHAR, UBIGINT)
Positional Arguments
url | VARCHAR | The URL of the subscription where the message should be sent. |
message_id | UBIGINT | The unique identifier of the transmit message to delete. |
Example
radio_version
Return the current version of the radio extension.
Signature: radio_version()