Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
elixir 1.16.0-otp-26
erlang 26.2.1
erlang 26.2.1
8 changes: 8 additions & 0 deletions lib/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ defmodule EventStore do
@type transient_subscribe_options :: [transient_subscribe_option]
@type persistent_subscription_option ::
transient_subscribe_option
| {:buffer_flush_after, non_neg_integer()}
| {:buffer_size, pos_integer()}
| {:checkpoint_after, non_neg_integer()}
| {:checkpoint_threshold, pos_integer()}
Expand Down Expand Up @@ -1146,6 +1147,13 @@ defmodule EventStore do
message queue from getting filled with events. Defaults to one in-flight
event.

- `buffer_flush_after` (milliseconds) used to ensure events are flushed
to the subscriber after a period of time even if the buffer size has not
been reached. This ensures events are delivered with bounded latency
during less busy periods. When set to 0 (default), no time-based
flushing is performed and events are only sent when the buffer_size is
reached. Each partition has its own independent timer.

- `checkpoint_threshold` determines how frequently a checkpoint is written
to the database for the subscription after events are acknowledged.
Increasing the threshold will reduce the number of database writes for
Expand Down
12 changes: 12 additions & 0 deletions lib/event_store/subscriptions/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,18 @@ defmodule EventStore.Subscriptions.Subscription do
{:noreply, state}
end

@impl GenServer
def handle_info({:flush_buffer, partition_key}, %Subscription{} = state) do
%Subscription{subscription: subscription} = state

state =
subscription
|> SubscriptionFsm.flush_buffer(partition_key)
|> apply_subscription_to_state(state)

{:noreply, state}
end

@impl GenServer
def handle_info(
{EventStore.AdvisoryLocks, :lock_released, lock_ref, reason},
Expand Down
113 changes: 101 additions & 12 deletions lib/event_store/subscriptions/subscription_fsm.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
selector: opts[:selector],
partition_by: opts[:partition_by],
buffer_size: opts[:buffer_size] || 1,
buffer_flush_after: opts[:buffer_flush_after] || 0,
checkpoint_after: opts[:checkpoint_after] || 0,
checkpoint_threshold: opts[:checkpoint_threshold] || 1,
query_timeout: opts[:query_timeout] || 15_000,
Expand Down Expand Up @@ -179,6 +180,15 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
defevent checkpoint(), data: %SubscriptionState{} = data do
next_state(:subscribed, persist_checkpoint(data))
end

defevent flush_buffer(partition_key), data: %SubscriptionState{} = data do
data =
data
|> clear_partition_timer(partition_key)
|> flush_partition_on_timeout(partition_key)

next_state(:subscribed, data)
end
end

defstate max_capacity do
Expand All @@ -199,6 +209,12 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
defevent checkpoint(), data: %SubscriptionState{} = data do
next_state(:subscribed, persist_checkpoint(data))
end

# Handle flush_buffer in max_capacity - just clear the timer, events stay queued
defevent flush_buffer(partition_key), data: %SubscriptionState{} = data do
data = clear_partition_timer(data, partition_key)
next_state(:max_capacity, data)
end
end

defstate disconnected do
Expand Down Expand Up @@ -497,12 +513,22 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do

partition_key = partition_key(data, event)

# Check if this is a new partition (no existing queue)
is_new_partition = not Map.has_key?(partitions, partition_key)

partitions =
partitions
|> Map.put_new(partition_key, :queue.new())
|> Map.update!(partition_key, fn pending_events -> enqueue.(event, pending_events) end)

%SubscriptionState{data | partitions: partitions, queue_size: queue_size + 1}
data = %SubscriptionState{data | partitions: partitions, queue_size: queue_size + 1}

# Start timer when partition gets its first event
if is_new_partition do
maybe_start_partition_timer(data, partition_key)
else
data
end
end

def partition_key(%SubscriptionState{partition_by: nil}, %RecordedEvent{}), do: nil
Expand Down Expand Up @@ -545,20 +571,25 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do

subscriber = Subscriber.track_in_flight(subscriber, event, partition_key)

partitions =
{partitions, partition_emptied} =
case :queue.is_empty(pending_events) do
true -> Map.delete(partitions, partition_key)
false -> Map.put(partitions, partition_key, pending_events)
true -> {Map.delete(partitions, partition_key), true}
false -> {Map.put(partitions, partition_key, pending_events), false}
end

%SubscriptionState{
data
| partitions: partitions,
subscribers: Map.put(subscribers, subscriber_pid, subscriber),
queue_size: max(queue_size - 1, 0)
}
|> track_sent(event_number)
|> notify_partition_subscriber(partition_key, [{subscriber_pid, event} | events_to_send])
data =
%SubscriptionState{
data
| partitions: partitions,
subscribers: Map.put(subscribers, subscriber_pid, subscriber),
queue_size: max(queue_size - 1, 0)
}
|> track_sent(event_number)

# Cancel the timer when the partition becomes empty
data = if partition_emptied, do: cancel_partition_timer(data, partition_key), else: data

notify_partition_subscriber(data, partition_key, [{subscriber_pid, event} | events_to_send])
else
_ ->
# No further queued event or available subscriber, send ready events to
Expand Down Expand Up @@ -755,4 +786,62 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do

defp describe(%SubscriptionState{stream_uuid: stream_uuid, subscription_name: name}),
do: "Subscription #{inspect(name)}@#{inspect(stream_uuid)}"

# Buffer flush timer management

# Start a timer for a partition if buffer_flush_after is configured and no timer exists
defp maybe_start_partition_timer(
%SubscriptionState{buffer_flush_after: 0} = data,
_partition_key
),
do: data

defp maybe_start_partition_timer(%SubscriptionState{} = data, partition_key) do
%SubscriptionState{buffer_flush_after: buffer_flush_after, buffer_timers: buffer_timers} =
data

if Map.has_key?(buffer_timers, partition_key) do
# Timer already exists for this partition
data
else
# Start a new timer for this partition
timer_ref = Process.send_after(self(), {:flush_buffer, partition_key}, buffer_flush_after)
%SubscriptionState{data | buffer_timers: Map.put(buffer_timers, partition_key, timer_ref)}
end
end

# Cancel and clear the timer for a specific partition
defp cancel_partition_timer(%SubscriptionState{} = data, partition_key) do
%SubscriptionState{buffer_timers: buffer_timers} = data

case Map.get(buffer_timers, partition_key) do
nil ->
data

timer_ref ->
Process.cancel_timer(timer_ref)
%SubscriptionState{data | buffer_timers: Map.delete(buffer_timers, partition_key)}
end
end

# Clear the timer reference without cancelling (timer already fired)
defp clear_partition_timer(%SubscriptionState{} = data, partition_key) do
%SubscriptionState{buffer_timers: buffer_timers} = data
%SubscriptionState{data | buffer_timers: Map.delete(buffer_timers, partition_key)}
end

# Flush a partition when the buffer timeout fires
defp flush_partition_on_timeout(%SubscriptionState{} = data, partition_key) do
%SubscriptionState{partitions: partitions} = data

case Map.get(partitions, partition_key) do
nil ->
# Partition is empty, nothing to flush
data

_pending_events ->
# Try to notify subscribers for this partition
notify_partition_subscriber(data, partition_key)
end
end
end
10 changes: 10 additions & 0 deletions lib/event_store/subscriptions/subscription_state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ defmodule EventStore.Subscriptions.SubscriptionState do
last_ack: 0,
queue_size: 0,
buffer_size: 1,
buffer_flush_after: 0,
buffer_timers: %{},
checkpoint_after: 0,
checkpoint_threshold: 1,
checkpoint_timer_ref: nil,
Expand All @@ -36,10 +38,18 @@ defmodule EventStore.Subscriptions.SubscriptionState do
]

def reset_event_tracking(%SubscriptionState{} = state) do
%SubscriptionState{buffer_timers: buffer_timers} = state

# Cancel all buffer flush timers
Enum.each(buffer_timers, fn {_partition_key, timer_ref} ->
Process.cancel_timer(timer_ref)
end)

%SubscriptionState{
state
| queue_size: 0,
partitions: %{},
buffer_timers: %{},
acknowledged_event_numbers: MapSet.new(),
in_flight_event_numbers: [],
checkpoints_pending: 0
Expand Down
Loading
Loading