diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex index 272c9e3e5..a217a717d 100644 --- a/lib/mix/tasks/pleroma/database.ex +++ b/lib/mix/tasks/pleroma/database.ex @@ -178,7 +178,7 @@ defmodule Mix.Tasks.Pleroma.Database do |> DateTime.from_naive!("Etc/UTC") |> Timex.shift(days: days) - Pleroma.Workers.PurgeExpiredActivity.enqueue(%{ + Pleroma.Workers.PurgeExpiredActivity.schedule(%{ activity_id: activity.id, expires_at: expires_at }) diff --git a/lib/pleroma/broadway.ex b/lib/pleroma/broadway.ex index c86ae6836..a9a8bf3a1 100644 --- a/lib/pleroma/broadway.ex +++ b/lib/pleroma/broadway.ex @@ -14,7 +14,11 @@ defmodule Pleroma.Broadway do queue: @queue, declare: [ durable: true, - auto_delete: false + auto_delete: false, + exclusive: true, + arguments: [ + {"x-delayed-type", "direct"} + ] ], on_failure: :reject_and_requeue } @@ -83,11 +87,35 @@ defmodule Pleroma.Broadway do [Pleroma.Broadway] end - def produce(topic, args) do + defp maybe_add_headers([headers: headers] = opts, key, value) when is_list(headers) do + Keyword.put(opts, :headers, [{key, value} | headers]) + end + defp maybe_add_headers(opts, key, value) do + Keyword.put(opts, :headers, [{key, value}]) + end + + defp maybe_with_priority(opts, [priority: priority]) when is_integer(priority) do + Keyword.put(opts, :priority, priority) + end + defp maybe_with_priority(opts, _), do: opts + + defp maybe_with_delay(opts, [scheduled_at: scheduled_at]) do + time_in_ms = DateTime.diff(DateTime.utc_now(), scheduled_at) + opts + |> maybe_add_headers("x-delay", to_string(time_in_ms)) + end + defp maybe_with_delay(opts, _), do: opts + + def produce(topic, args, opts \\ []) do IO.puts("Producing message on #{topic}: #{inspect(args)}") {:ok, connection} = AMQP.Connection.open() {:ok, channel} = AMQP.Channel.open(connection) - AMQP.Basic.publish(channel, "", @queue, args) + publish_options = + [] + |> maybe_with_priority(opts) + |> maybe_with_delay(opts) + + AMQP.Basic.publish(channel, "", @queue, args, publish_options) AMQP.Connection.close(connection) end end diff --git a/lib/pleroma/filter.ex b/lib/pleroma/filter.ex index 82b9caf9b..8653043f0 100644 --- a/lib/pleroma/filter.ex +++ b/lib/pleroma/filter.ex @@ -133,7 +133,7 @@ defmodule Pleroma.Filter do defp maybe_add_expires_at(changeset, _), do: changeset defp maybe_add_expiration_job(%{expires_at: %NaiveDateTime{} = expires_at} = filter) do - Pleroma.Workers.PurgeExpiredFilter.enqueue(%{ + Pleroma.Workers.PurgeExpiredFilter.schedule(%{ filter_id: filter.id, expires_at: DateTime.from_naive!(expires_at, "Etc/UTC") }) diff --git a/lib/pleroma/mfa/token.ex b/lib/pleroma/mfa/token.ex index 76573182a..83e22eb9c 100644 --- a/lib/pleroma/mfa/token.ex +++ b/lib/pleroma/mfa/token.ex @@ -52,7 +52,7 @@ defmodule Pleroma.MFA.Token do @spec create(User.t(), Authorization.t() | nil) :: {:ok, t()} | {:error, Ecto.Changeset.t()} def create(user, authorization \\ nil) do with {:ok, token} <- do_create(user, authorization) do - Pleroma.Workers.PurgeExpiredToken.enqueue(%{ + Pleroma.Workers.PurgeExpiredToken.schedule(%{ token_id: token.id, valid_until: DateTime.from_naive!(token.valid_until, "Etc/UTC"), mod: __MODULE__ diff --git a/lib/pleroma/user/backup.ex b/lib/pleroma/user/backup.ex index 63709a484..e759c8496 100644 --- a/lib/pleroma/user/backup.ex +++ b/lib/pleroma/user/backup.ex @@ -34,7 +34,7 @@ defmodule Pleroma.User.Backup do def create(user, admin_id \\ nil) do with :ok <- validate_limit(user, admin_id), {:ok, backup} <- user |> new() |> Repo.insert() do - BackupWorker.process(backup, admin_id) + BackupWorker.enqueue("process", %{"backup_id" => backup.id, "admin_user_id" => admin_id}) end end diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 8e55df0d8..1238aeeb0 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -230,7 +230,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity ) do with {:ok, _job} <- - Pleroma.Workers.PurgeExpiredActivity.enqueue(%{ + Pleroma.Workers.PurgeExpiredActivity.schedule(%{ activity_id: activity.id, expires_at: expires_at }) do diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex index 34617a218..b90422ba8 100644 --- a/lib/pleroma/web/activity_pub/side_effects.ex +++ b/lib/pleroma/web/activity_pub/side_effects.ex @@ -384,7 +384,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do {:ok, expires_at} = Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at]) - Pleroma.Workers.PurgeExpiredActivity.enqueue(%{ + Pleroma.Workers.PurgeExpiredActivity.schedule(%{ activity_id: meta[:activity_id], expires_at: expires_at }) diff --git a/lib/pleroma/web/o_auth/token.ex b/lib/pleroma/web/o_auth/token.ex index c9398aeaa..7aeb86461 100644 --- a/lib/pleroma/web/o_auth/token.ex +++ b/lib/pleroma/web/o_auth/token.ex @@ -117,7 +117,7 @@ defmodule Pleroma.Web.OAuth.Token do def create(%App{} = app, %User{} = user, attrs \\ %{}) do with {:ok, token} <- do_create(app, user, attrs) do if Pleroma.Config.get([:oauth2, :clean_expired_tokens]) do - Pleroma.Workers.PurgeExpiredToken.enqueue(%{ + Pleroma.Workers.PurgeExpiredToken.schedule(%{ token_id: token.id, valid_until: DateTime.from_naive!(token.valid_until, "Etc/UTC"), mod: __MODULE__ diff --git a/lib/pleroma/workers/backup_worker.ex b/lib/pleroma/workers/backup_worker.ex index cf78f1cb9..95a6acd42 100644 --- a/lib/pleroma/workers/backup_worker.ex +++ b/lib/pleroma/workers/backup_worker.ex @@ -3,36 +3,27 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Workers.BackupWorker do - use Oban.Worker, queue: :backup, max_attempts: 1 - + use Pleroma.Workers.WorkerHelper, queue: "backup", max_attempts: 1 alias Oban.Job alias Pleroma.User.Backup - def process(backup, admin_user_id \\ nil) do - %{"op" => "process", "backup_id" => backup.id, "admin_user_id" => admin_user_id} - |> new() - |> Oban.insert() - end - @impl Oban.Worker def timeout(_job) do Pleroma.Config.get([:workers, :timeout, :backup], :timer.minutes(1)) end + @spec schedule_deletion(Backup.t()) :: + {:error, any} | {:ok, Oban.Job.t()} def schedule_deletion(backup) do days = Pleroma.Config.get([Backup, :purge_after_days]) time = 60 * 60 * 24 * days scheduled_at = Calendar.NaiveDateTime.add!(backup.inserted_at, time) - %{"op" => "delete", "backup_id" => backup.id} - |> new(scheduled_at: scheduled_at) - |> Oban.insert() + enqueue("delete", %{"backup_id" => backup.id}, scheduled_at: scheduled_at) end def delete(backup) do - %{"op" => "delete", "backup_id" => backup.id} - |> new() - |> Oban.insert() + enqueue("delete", %{"backup_id" => backup.id}) end @impl true diff --git a/lib/pleroma/workers/purge_expired_activity.ex b/lib/pleroma/workers/purge_expired_activity.ex index 652e1d6b1..e326727ba 100644 --- a/lib/pleroma/workers/purge_expired_activity.ex +++ b/lib/pleroma/workers/purge_expired_activity.ex @@ -7,23 +7,17 @@ defmodule Pleroma.Workers.PurgeExpiredActivity do Worker which purges expired activity. """ - use Oban.Worker, queue: :activity_expiration, max_attempts: 1, unique: [period: :infinity] + use Pleroma.Workers.WorkerHelper, queue: "activity_expiration", max_attempts: 1, unique: [period: :infinity] import Ecto.Query alias Pleroma.Activity - @spec enqueue(map()) :: - {:ok, Oban.Job.t()} - | {:error, :expired_activities_disabled} - | {:error, :expiration_too_close} - def enqueue(args) do + def schedule(args) do with true <- enabled?() do {scheduled_at, args} = Map.pop(args, :expires_at) - args - |> new(scheduled_at: scheduled_at) - |> Oban.insert() + enqueue("delete", args, scheduled_at: scheduled_at) end end diff --git a/lib/pleroma/workers/purge_expired_filter.ex b/lib/pleroma/workers/purge_expired_filter.ex index 593380d13..13f8ec681 100644 --- a/lib/pleroma/workers/purge_expired_filter.ex +++ b/lib/pleroma/workers/purge_expired_filter.ex @@ -7,21 +7,17 @@ defmodule Pleroma.Workers.PurgeExpiredFilter do Worker which purges expired filters """ - use Oban.Worker, queue: :filter_expiration, max_attempts: 1, unique: [period: :infinity] + use Pleroma.Workers.WorkerHelper, queue: "filter_expiration", max_attempts: 1, unique: [period: :infinity] import Ecto.Query alias Oban.Job alias Pleroma.Repo - @spec enqueue(%{filter_id: integer(), expires_at: DateTime.t()}) :: - {:ok, Job.t()} | {:error, Ecto.Changeset.t()} - def enqueue(args) do + def schedule(args) do {scheduled_at, args} = Map.pop(args, :expires_at) - args - |> new(scheduled_at: scheduled_at) - |> Oban.insert() + enqueue("delete", args, scheduled_at: scheduled_at) end @impl Oban.Worker diff --git a/lib/pleroma/workers/purge_expired_token.ex b/lib/pleroma/workers/purge_expired_token.ex index b4db84f4e..a3303f310 100644 --- a/lib/pleroma/workers/purge_expired_token.ex +++ b/lib/pleroma/workers/purge_expired_token.ex @@ -7,16 +7,12 @@ defmodule Pleroma.Workers.PurgeExpiredToken do Worker which purges expired OAuth tokens """ - use Oban.Worker, queue: :token_expiration, max_attempts: 1 + use Pleroma.Workers.WorkerHelper, queue: "token_expiration", max_attempts: 1 - @spec enqueue(%{token_id: integer(), valid_until: DateTime.t(), mod: module()}) :: - {:ok, Oban.Job.t()} | {:error, Ecto.Changeset.t()} - def enqueue(args) do + def schedule(args) do {scheduled_at, args} = Map.pop(args, :valid_until) - args - |> __MODULE__.new(scheduled_at: scheduled_at) - |> Oban.insert() + enqueue("delete", args, scheduled_at: scheduled_at) end @impl Oban.Worker