Radio for DuckDB – DuckDB Now Talks to WebSockets and Redis Pub/Sub

A duck with a radio

The Radio extension by Query.Farm enables DuckDB to interact with real-time event systems such as WebSocket and Redis Publish/Subscribe servers. It allows DuckDB to both receive and send events: incoming messages are buffered and queryable with SQL, while outgoing events are buffered and support delivery tracking.

The extension is named Radio because it effectively equips DuckDB with a two-way radio—allowing it to listen for and broadcast messages across event-driven systems.

DuckDB extensions are modular plugins that enhance the core engine with powerful new capabilities like this.

Getting Started

Radio will be a DuckDB community extension maintained and supported by Query.Farm.

Install Radio in DuckDB by running:

INSTALL radio FROM community;

Then load it with:

LOAD radio;

Functionality

The Radio extension introduces a few core concepts.

At its core is a single radio object that manages multiple subscriptions to event sources. Think of it as a receiver that can tune in to several broadcasts at the same time. The event sources that the Radio extension can currently interact with are WebSocket servers and Redis Publish/Subscribe. Support for additional event systems will be added over time.

Additional event sources planned are: Google Pub/Sub, Azure Service Bus, MQTT / Mosquitto, Subprocesses / Pipes.

If you’re interested in these additional event sources, contact us.

You can create multiple subscriptions to event sources, and each subscription is independent of the others. Each subscription has its own connection to the event source and an independent send and receive thread. Multiple subscriptions can exist simultaneously in the same DuckDB process (even to the same event source).

Every subscription maintains two internal queues:

  • Received messages
  • Messages to transmit

The received messages queue is size-limited: as it fills up, older messages are discarded based on the configured queue size. The size of the receive queue is configurable when the subscription is created.
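
The eviction behaviour described above can be pictured with a small Python model. This is only a sketch of a bounded first-in, first-out queue, not the extension's implementation: the class name ReceiveQueue and its methods are hypothetical, and only the default capacity of 1000 is taken from the receive_message_capacity parameter documented below.

```python
from collections import deque


class ReceiveQueue:
    """Hypothetical model of a size-limited receive queue (not the extension's code)."""

    def __init__(self, capacity=1000):
        # A deque with maxlen drops the oldest entry when a new one is appended to a full queue.
        self._messages = deque(maxlen=capacity)
        self.dropped = 0

    def push(self, message):
        # Count the eviction before the append silently discards the oldest message.
        if len(self._messages) == self._messages.maxlen:
            self.dropped += 1
        self._messages.append(message)

    def snapshot(self):
        # Oldest message first, newest last.
        return list(self._messages)


queue = ReceiveQueue(capacity=3)
for n in range(1, 6):
    queue.push(f"message {n}")

print(queue.snapshot())  # only the three newest messages remain
print(queue.dropped)     # number of older messages that were discarded
```

With a capacity of 3 and five arrivals, the two oldest messages are gone by the time the queue is read, which is why a slow reader should size the queue generously.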

Received messages have a field called seen_count which tracks the number of times the message has been seen by DuckDB. Newly received messages will have a seen_count of 1.

Architecture Diagram

This diagram shows the internal architecture of the extension and how it integrates into DuckDB.

Integration Flowchart

This flowchart shows which functions to use for each type of activity with the extension.

When Should I Use This?

Use the Radio extension when you want to receive or send events from a WebSocket server or Redis Publish/Subscribe service within DuckDB, without needing external event-handling infrastructure or orchestration logic.

Here’s an example of connecting to a WebSocket server and receiving messages:

-- Connect to the server
CALL radio_subscribe('ws://127.0.0.1:20400');
┌─────────────────┐
│ subscription_id │
│     uint64      │
├─────────────────┤
│               0 │
└─────────────────┘

-- Block until a message is received, or timeout after 10 seconds
CALL radio_listen(true, interval '10 seconds');
┌─────────────────┬──────────────────────┐
│ subscription_id │   subscription_url   │
│     uint64      │       varchar        │
├─────────────────┼──────────────────────┤
│               0 │ ws://127.0.0.1:20400 │
└─────────────────┴──────────────────────┘

-- Return all received messages in the received message queue
--
-- In this case it shows that the connection was made to the websocket server.
-- The second message is just a normal message from the server.
SELECT * FROM radio_received_messages();
subscription_id = 0
subscription_url = ws://127.0.0.1:20400
message_id = 1000
message_type = connection
receive_time = 2025-06-02 15:11:26.676
seen_count = 1
channel = NULL
message =

subscription_id = 0
subscription_url = ws://127.0.0.1:20400
message_id = 1001
message_type = message
receive_time = 2025-06-02 15:11:26.676
seen_count = 1
channel = NULL
message = Message counter: 263 Server time: 2025-06-02T15:11:26.675966Z

API

The extension adds these functions:


radio_subscribe

This function creates a new subscription.

Signature:

radio_subscribe(VARCHAR, receive_message_capacity : INTEGER, transmit_retry_max_delay_ms : INTEGER, transmit_retry_initial_delay_ms : DOUBLE, transmit_retry_multiplier : DOUBLE)

Positional Arguments

Name  Type     Description
url   VARCHAR  The URL of the event source. For WebSockets, use ws:// or wss://.

Named Parameters

All named parameters are optional. If a parameter is not passed, its default value is used.

Name                             Type     Default  Description
receive_message_capacity         INTEGER  1000     Maximum number of messages that can be queued for receipt.
transmit_retry_max_delay_ms      INTEGER  10000    Maximum delay (in milliseconds) between retry attempts when transmitting.
transmit_retry_initial_delay_ms  DOUBLE   100.0    Initial delay (in milliseconds) before the first retry attempt when transmitting.
transmit_retry_multiplier        DOUBLE   1.5      Multiplier for exponential backoff between retry attempts during transmission.
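
To get a feel for how the three retry parameters interact, the following Python sketch prints a retry schedule using the default values listed above. The formula is an assumption (the usual exponential backoff, where each delay is the previous one times the multiplier, capped at the maximum); the article does not spell out the exact calculation, and the function name retry_delays_ms is hypothetical.

```python
def retry_delays_ms(attempts, initial_delay_ms=100.0, multiplier=1.5, max_delay_ms=10000):
    """Yield the assumed wait (in ms) before each retry, capped at max_delay_ms."""
    delay = float(initial_delay_ms)
    for _ in range(attempts):
        # Assumption: the cap is applied to each delay, not to the running total.
        yield min(delay, float(max_delay_ms))
        delay *= multiplier


schedule = list(retry_delays_ms(13))
print(schedule[:4])   # the first few delays grow by a factor of 1.5
print(schedule[-1])   # later delays are held at the configured maximum
```

Under this assumption the delays start at 100 ms and reach the 10-second cap on the thirteenth retry, so a lower multiplier or a lower maximum keeps retries more frequent.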

Return Value

The unique subscription identifier is returned for the new subscription.

Example

CALL radio_subscribe('ws://127.0.0.1:20400');
┌─────────────────┐
│ subscription_id │
│     uint64      │
├─────────────────┤
│               0 │
└─────────────────┘

radio_unsubscribe

This function deletes a previously created subscription.

Signature: radio_unsubscribe(VARCHAR)

Positional Arguments

Name  Type     Description
url   VARCHAR  The URL of the event source whose subscription should be removed. For WebSockets, use ws:// or wss://.

Return Value

The unique identifier of the subscription that was removed.

Example

CALL radio_unsubscribe('ws://127.0.0.1:20400');
┌─────────────────┐
│ subscription_id │
│     uint64      │
├─────────────────┤
│               0 │
└─────────────────┘

radio_subscriptions

This function returns a table that contains information about all current subscriptions.

Signature: radio_subscriptions()

Return Value

Returns a table with this schema:

Name                         Type          Description
subscription_id              UBIGINT       Unique identifier for the subscription.
url                          VARCHAR       The event source URL associated with the subscription.
creation_time                TIMESTAMP_MS  Timestamp when the subscription was created.
activation_time              TIMESTAMP_MS  Timestamp when the subscription was activated (i.e., connected to the URL).
disabled                     BOOLEAN       Indicates whether the subscription is currently disabled.
received_last_message_time   TIMESTAMP_MS  Timestamp of the most recent received message.
received_messages_processed  UBIGINT       Total number of messages successfully received.
transmit_messages_processed  UBIGINT       Total number of transmit messages queued.
transmit_last_queue_time     TIMESTAMP_MS  Timestamp when the last transmit message was queued.
transmit_last_success_time   TIMESTAMP_MS  Timestamp of the last successful message transmission.
transmit_last_failure_time   TIMESTAMP_MS  Timestamp of the last failed message transmission attempt.
transmit_successes           UBIGINT       Total number of transmit messages sent successfully.
transmit_failures            UBIGINT       Total number of failed transmit attempts.

Example

SELECT * FROM radio_subscriptions();
subscription_id = 0
url = ws://127.0.0.1:20400
creation_time = 2025-05-28 23:11:33.749
activation_time = 2025-05-28 23:11:33.755
disabled = false
received_last_message_time = 2025-05-28 23:11:33.755
received_messages_processed = 1
transmit_messages_processed = 0
transmit_messages_dropped = 0
transmit_last_queue_time = NULL
transmit_last_success_time = NULL
transmit_last_failure_time = NULL
transmit_successes = 0
transmit_failures = 0

radio_listen

radio_listen checks for newly arrived messages across all subscriptions. A message is considered newly arrived if it has never been seen before, as indicated by the seen_count on the message record. radio_listen can optionally block execution for a specified time interval, waiting for any new message to arrive on any subscription. If no wait time is provided, radio_listen returns immediately with information about any subscriptions that currently have unread messages. Use this function to pause execution until at least one message is ready to be processed.

Signature: radio_listen(BOOLEAN, INTERVAL)

Positional Arguments

Name               Type      Description
wait_for_messages  BOOLEAN   Wait for messages to arrive before returning result.
duration           INTERVAL  The amount of time to wait for messages to arrive.

Return Value

Returns a table with this schema:

Name              Type     Description
subscription_id   UBIGINT  The unique identifier of the subscription.
subscription_url  VARCHAR  The URL of the subscription that has new messages.

Example

CALL radio_listen(true, interval '10 seconds');
┌─────────────────┬──────────────────────┐
│ subscription_id │   subscription_url   │
│     uint64      │       varchar        │
├─────────────────┼──────────────────────┤
│               0 │ ws://127.0.0.1:20400 │
└─────────────────┴──────────────────────┘
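
The difference between the blocking and non-blocking modes of radio_listen can be illustrated with a small Python model built on a condition variable. This is a conceptual sketch only: the class ListenModel and its methods are hypothetical, and how the extension actually coordinates its receive threads is not described in this article.

```python
import threading


class ListenModel:
    """Hypothetical model of 'return now, or wait up to a timeout for a new message'."""

    def __init__(self):
        self._cond = threading.Condition()
        self._unread = {}  # subscription_id -> number of unread messages

    def deliver(self, subscription_id):
        # Called from a receive thread when a message arrives.
        with self._cond:
            self._unread[subscription_id] = self._unread.get(subscription_id, 0) + 1
            self._cond.notify_all()

    def listen(self, wait_for_messages, timeout_seconds):
        with self._cond:
            if wait_for_messages:
                # Returns as soon as something is unread, or when the timeout elapses.
                self._cond.wait_for(lambda: bool(self._unread), timeout=timeout_seconds)
            return sorted(self._unread)


radio = ListenModel()
print(radio.listen(False, 0.0))  # non-blocking: nothing has arrived yet

# Simulate a message arriving on subscription 0 shortly after we start waiting.
threading.Timer(0.05, radio.deliver, args=(0,)).start()
print(radio.listen(True, 2.0))   # blocking: returns once the message arrives
```

The blocking call returns well before its two-second timeout because the waiter is woken as soon as a message is delivered, which mirrors the "block until a message is received, or timeout" comment in the example above.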

radio_flush

radio_flush waits for all pending messages to be sent. A time interval is supplied, after which radio_flush returns a row indicating whether all messages have been sent.

Signature: radio_flush(INTERVAL)

Positional Arguments

Name      Type      Description
duration  INTERVAL  The amount of time to wait for messages to be sent.

Return Value

Returns a table with this schema:

Name                  Type     Description
all_messages_flushed  BOOLEAN  Indicates whether all messages from all subscriptions have been sent.

Example

CALL radio_flush(interval '10 seconds');
┌──────────────────────┐
│ all_messages_flushed │
│       boolean        │
├──────────────────────┤
│ true                 │
└──────────────────────┘

radio_received_messages

This function returns messages received from all subscriptions.

Messages remain in the queue until they are automatically evicted due to limited queue capacity. The queue follows a first-in, first-out (FIFO) policy, so newer messages will overwrite the oldest ones when the queue is full. You can filter for new messages by adding seen_count = 0 to the WHERE clause if desired.

Each message has an identifier that is unique within the subscription where it was received.

Signature: radio_received_messages()

Return Value

Returns a table with this schema:

Name              Type          Description
subscription_id   UBIGINT       The unique id of the subscription.
subscription_url  VARCHAR       The URL of the subscription where the message was received.
message_type      ENUM          The type of message: one of message, error, connection, disconnection.
message_id        UBIGINT       The unique id of the message in the subscription.
receive_time      TIMESTAMP_NS  The timestamp that the message was received.
seen_count        UBIGINT       The number of times the message has been seen or retrieved.
channel           VARCHAR       The name of the channel where the message was received.
message           BLOB          The message from the event bus.

Example

SELECT * FROM radio_received_messages();
subscription_id = 0
subscription_url = ws://127.0.0.1:20400
message_id = 1000
channel = NULL
message_type = message
receive_time = 2025-05-30 00:04:40.289
seen_count = 2
message = Message counter: 74 Server time: 2025-05-30T00:04:40.289464Z

radio_subscription_received_messages

Retrieves messages received from a given subscription.

Errors are stored separately from regular messages to ensure clarity, as they may not be easily distinguished in the message stream. Received messages are held in a queue until they are automatically evicted due to limited capacity. The queue operates in a first-in, first-out (FIFO) manner—newer messages will replace the oldest when the queue is full.

Signature: radio_subscription_received_messages(VARCHAR, VARCHAR)

Positional Arguments

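Name          Type     Description
url           VARCHAR  The URL of the subscription.
message_type  VARCHAR  The type of messages to return (for example, 'message').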

Return Value

Returns a table with this schema:

Name          Type          Description
message_id    UBIGINT       The unique id of the message in the subscription.
receive_time  TIMESTAMP_NS  The timestamp that the message was received.
seen_count    UBIGINT       The number of times the message has been retrieved.
channel       VARCHAR       The name of the channel where the message was received.
message       BLOB          The message from the event bus.

Example

SELECT * FROM radio_subscription_received_messages('ws://127.0.0.1:20400', 'message');
message_id = 1000
receive_time = 2025-05-29 03:55:59.109
channel = NULL
seen_count = 4
message = Message counter: 70 Server time: 2025-05-29T03:55:59.109567Z

radio_subscription_transmit_messages

Retrieves transmit messages that will be sent via the specified subscription. Messages that have been sent will continue to be returned until evicted from the send queue. This is useful for delivery tracking and determining if a message was actually sent.

Signature: radio_subscription_transmit_messages(VARCHAR)

Positional Arguments

Name  Type     Description
url   VARCHAR  The URL of the subscription.

Return Value

Returns a table with this schema:

Name               Type          Description
message_id         UBIGINT       The unique identifier of the transmit message.
creation_time      TIMESTAMP_MS  The timestamp when the transmit message was created.
last_attempt_time  TIMESTAMP_MS  The timestamp when the message was last attempted to be transmitted.
sent               BOOLEAN       Indicates whether the message has been successfully sent.
expire_time        TIMESTAMP_MS  The time at which attempts to transmit the message will end.
max_attempts       UINTEGER      The maximum number of attempts to send the message.
try_count          UINTEGER      The number of attempts that have been performed to transmit the message.
channel            VARCHAR       The channel where the message should be sent.
message            BLOB          The message to transmit.

Example

SELECT * FROM radio_subscription_transmit_messages('ws://127.0.0.1:20400');
message_id = 1001
creation_time = 2025-05-29 03:56:00.206
last_attempt_time = NULL
sent = false
expire_time = 2025-05-29 03:57:00.202
max_attempts = 10
try_count = 0
channel = radio_test
message = test_message

radio_transmit_message

Adds a new message to be transmitted via a subscription. Returns the unique identifier of the newly created message.

If the message cannot be transmitted, it will be retried using exponential backoff until either the maximum number of attempts is exhausted or the expiration time is reached.

Signature: radio_transmit_message(VARCHAR, VARCHAR, BLOB, INTEGER, INTERVAL)

Positional Arguments

Name             Type      Description
url              VARCHAR   The URL of the subscription where the message should be sent.
channel          VARCHAR   The channel where the message should be sent.
message          BLOB      The message to send.
max_attempts     INTEGER   The maximum number of attempts to transmit this message. Must be greater than zero.
expire_interval  INTERVAL  The maximum amount of time to spend attempting to transmit this message. If the message is not sent after this interval, it is marked as unable to be sent.

Example

CALL radio_transmit_message(
  'ws://127.0.0.1:20400',
  NULL,
  'test message'::blob,
  10,
  interval '1 minute');
┌────────────┐
│ message_id │
│   uint64   │
├────────────┤
│       1002 │
└────────────┘
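
The two stopping conditions for a transmit message (max_attempts and expire_interval) can be explored with a simulated clock. The Python sketch below is a model under stated assumptions, not the extension's logic: the function simulate_transmit, the state names "sent", "attempts_exhausted" and "expired", and the exact order of the checks are all hypothetical, and the backoff defaults are the ones listed for radio_subscribe.

```python
def simulate_transmit(send, max_attempts, expire_ms,
                      initial_delay_ms=100.0, multiplier=1.5, max_delay_ms=10000.0):
    """Return (state, try_count) for one message, using a simulated clock in ms."""
    if max_attempts <= 0:
        raise ValueError("max_attempts must be greater than zero")
    now_ms = 0.0
    delay = initial_delay_ms
    try_count = 0
    while True:
        try_count += 1
        if send():
            return "sent", try_count
        if try_count >= max_attempts:
            return "attempts_exhausted", try_count
        # Assumption: wait for the capped backoff delay before the next attempt.
        now_ms += min(delay, max_delay_ms)
        if now_ms >= expire_ms:
            return "expired", try_count
        delay *= multiplier


def always_fail():
    return False


# Same settings as the example above: 10 attempts, 1 minute expiry.
print(simulate_transmit(always_fail, max_attempts=10, expire_ms=60_000))
# A much shorter expiry stops the retries before the attempts run out.
print(simulate_transmit(always_fail, max_attempts=10, expire_ms=1_000))
```

Under these assumptions, ten failed attempts take only about 7.5 seconds with the default backoff, so with a one-minute expiry the attempt limit is reached first; the expiry only becomes the deciding factor when it is short relative to the backoff schedule.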

radio_transmit_messages_delete_finished

Removes all transmit messages that have reached a terminal state from the transmit queue.

Signature: radio_transmit_messages_delete_finished(VARCHAR)

Positional Arguments

Name  Type     Description
url   VARCHAR  The URL of the subscription whose finished transmit messages should be removed.

Example

CALL radio_transmit_messages_delete_finished('ws://127.0.0.1:20400');
┌─────────┐
│   ok    │
│ boolean │
├─────────┤
│ true    │
└─────────┘

radio_subscription_transmit_message_delete

Remove a transmit message from the transmit queue by supplying its message identifier.

Signature: radio_subscription_transmit_message_delete(VARCHAR, UBIGINT)

Positional Arguments

Name        Type     Description
url         VARCHAR  The URL of the subscription whose transmit queue holds the message.
message_id  UBIGINT  The unique identifier of the transmit message to delete.

Example

CALL radio_subscription_transmit_message_delete('ws://127.0.0.1:20400', 3022::UBIGINT);
┌─────────┐
│   ok    │
│ boolean │
├─────────┤
│ true    │
└─────────┘

radio_version

Return the current version of the radio extension.

Signature: radio_version()

Example

SELECT radio_version();
┌─────────────────┐
│ radio_version() │
│     varchar     │
├─────────────────┤
│ 20250601.01     │
└─────────────────┘