akkoma/lib/pleroma/web/federator/retry_queue.ex

75 lines
1.8 KiB
Elixir
Raw Normal View History

2018-08-26 12:17:13 -06:00
defmodule Pleroma.Web.Federator.RetryQueue do
  @moduledoc """
  GenServer that re-attempts failed federation deliveries with a growing delay.

  A failed delivery is handed to `enqueue/3`. The server then schedules a
  `{:send, data, transport, retries}` message to itself after a delay of
  `@initial_timeout * retries^3` milliseconds. When that message arrives,
  `transport.publish_one(data)` is called; on `{:error, _}` the delivery is
  enqueued again with an incremented retry counter. Once the counter exceeds
  `@max_retries` the delivery is dropped.

  The server state is a map with two counters, `:delivered` and `:dropped`.
  """

  use GenServer

  require Logger

  # Base delay before the first retry: 30 seconds (value is in milliseconds).
  @initial_timeout 30_000
  # Highest retry counter that is still scheduled; anything above is dropped.
  @max_retries 5

  ## Client API

  @doc """
  Starts the retry queue, registered under the module name.

  Reads `[:retry_queue, :enabled]` from `Pleroma.Config` (default `false`).
  Returns `:ignore` without starting a process when the queue is disabled.
  """
  @spec start_link() :: GenServer.on_start()
  def start_link do
    if Pleroma.Config.get([:retry_queue, :enabled], false) do
      Logger.info("Starting retry queue")
      GenServer.start_link(__MODULE__, %{delivered: 0, dropped: 0}, name: __MODULE__)
    else
      Logger.info("Retry queue disabled")
      :ignore
    end
  end

  @doc """
  Asynchronously asks the queue to retry delivering `data` via `transport`.

  `retries` is the number of retries already performed; it is incremented by
  one before being handed to the server, so a first call with the default `0`
  is treated as retry number 1.
  """
  @spec enqueue(term(), module(), non_neg_integer()) :: :ok
  def enqueue(data, transport, retries \\ 0) do
    GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
  end

  @doc """
  Decides what to do with a delivery that has reached retry number `retries`.

  Returns `{:drop, message}` when `retries` exceeds the maximum, otherwise
  `{:retry, delay_in_ms}`.
  """
  @spec get_retry_params(number()) :: {:drop, String.t()} | {:retry, integer()}
  def get_retry_params(retries) when retries > @max_retries do
    {:drop, "Max retries reached"}
  end

  def get_retry_params(retries) do
    {:retry, growth_function(retries)}
  end

  ## Server callbacks

  @impl true
  def init(args) do
    {:ok, args}
  end

  @impl true
  def handle_cast({:maybe_enqueue, data, transport, retries}, %{dropped: drop_count} = state) do
    case get_retry_params(retries) do
      {:retry, timeout} ->
        # Deliver the :send message to the registered queue process once the
        # back-off delay has elapsed; it is handled in handle_info/2 below.
        Process.send_after(__MODULE__, {:send, data, transport, retries}, timeout)
        {:noreply, state}

      {:drop, message} ->
        Logger.debug(message)
        {:noreply, %{state | dropped: drop_count + 1}}
    end
  end

  @impl true
  def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
    case transport.publish_one(data) do
      {:ok, _} ->
        {:noreply, %{state | delivered: delivery_count + 1}}

      {:error, _reason} ->
        # enqueue/3 increments the counter, so each failure moves the delivery
        # one step closer to being dropped.
        enqueue(data, transport, retries)
        {:noreply, state}
    end
  end

  def handle_info(unknown, state) do
    Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
    {:noreply, state}
  end

  ## Helpers

  # Cubic back-off: delay in milliseconds for the given retry number.
  defp growth_function(retries) do
    round(@initial_timeout * :math.pow(retries, 3))
  end
end