diff --git a/config/config.exs b/config/config.exs
index cf5f9cf27..727a2b0cb 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -856,8 +856,6 @@ config :pleroma, ConcurrentLimiter, [
{Pleroma.Search, [max_running: 30, max_waiting: 50]}
]
-config :pleroma, :search, provider: Pleroma.Search.Builtin
-
config :pleroma, Pleroma.Search, module: Pleroma.Search.DatabaseSearch
config :pleroma, Pleroma.Search.Meilisearch,
@@ -865,6 +863,22 @@ config :pleroma, Pleroma.Search.Meilisearch,
private_key: nil,
initial_indexing_chunk_size: 100_000
+# Connection settings and index definitions for the Elasticsearch cluster.
+# The url and credentials below are placeholder defaults; the environment
+# specific config imported at the bottom of this file can override them.
+config :pleroma, Pleroma.Search.Elasticsearch.Cluster,
+ url: "http://localhost:9200",
+ username: "elastic",
+ password: "changeme",
+ api: Elasticsearch.API.HTTP,
+ json_library: Jason,
+ indexes: %{
+ activities: %{
+ settings: "priv/es-mappings/activity.json",
+ store: Pleroma.Search.Elasticsearch.Store,
+ sources: [Pleroma.Activity],
+ bulk_page_size: 5000,
+ bulk_wait_interval: 15_000
+ }
+ }
+
# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"
diff --git a/lib/mix/tasks/pleroma/search.ex b/lib/mix/tasks/pleroma/search.ex
deleted file mode 100644
index 1fd880eab..000000000
--- a/lib/mix/tasks/pleroma/search.ex
+++ /dev/null
@@ -1,64 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Mix.Tasks.Pleroma.Search do
- use Mix.Task
- import Mix.Pleroma
- import Ecto.Query
- alias Pleroma.Activity
- alias Pleroma.Pagination
- alias Pleroma.User
- alias Pleroma.Hashtag
-
- @shortdoc "Manages elasticsearch"
-
- def run(["import", "activities" | _rest]) do
- start_pleroma()
-
- from(a in Activity, where: not ilike(a.actor, "%/relay"))
- |> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
- |> Activity.with_preloaded_object()
- |> Activity.with_preloaded_user_actor()
- |> get_all(:activities)
- end
-
- def run(["import", "users" | _rest]) do
- start_pleroma()
-
- from(u in User, where: u.nickname not in ["internal.fetch", "relay"])
- |> get_all(:users)
- end
-
- def run(["import", "hashtags" | _rest]) do
- start_pleroma()
-
- from(h in Hashtag)
- |> Pleroma.Repo.all()
- |> Pleroma.Elasticsearch.bulk_post(:hashtags)
- end
-
- defp get_all(query, index, max_id \\ nil) do
- params = %{limit: 1000}
-
- params =
- if max_id == nil do
- params
- else
- Map.put(params, :max_id, max_id)
- end
-
- res =
- query
- |> Pagination.fetch_paginated(params)
-
- if res == [] do
- :ok
- else
- res
- |> Pleroma.Elasticsearch.bulk_post(index)
-
- get_all(query, index, List.last(res).id)
- end
- end
-end
diff --git a/lib/mix/tasks/pleroma/search/meilisearch.ex b/lib/mix/tasks/pleroma/search/meilisearch.ex
deleted file mode 100644
index d4a83c3cd..000000000
--- a/lib/mix/tasks/pleroma/search/meilisearch.ex
+++ /dev/null
@@ -1,144 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Mix.Tasks.Pleroma.Search.Meilisearch do
- require Pleroma.Constants
-
- import Mix.Pleroma
- import Ecto.Query
-
- import Pleroma.Search.Meilisearch,
- only: [meili_post: 2, meili_put: 2, meili_get: 1, meili_delete!: 1]
-
- def run(["index"]) do
- start_pleroma()
-
- meili_version =
- (
- {:ok, result} = meili_get("/version")
-
- result["pkgVersion"]
- )
-
- # The ranking rule syntax was changed but nothing about that is mentioned in the changelog
- if not Version.match?(meili_version, ">= 0.25.0") do
- raise "Meilisearch <0.24.0 not supported"
- end
-
- {:ok, _} =
- meili_post(
- "/indexes/objects/settings/ranking-rules",
- [
- "published:desc",
- "words",
- "exactness",
- "proximity",
- "typo",
- "attribute",
- "sort"
- ]
- )
-
- {:ok, _} =
- meili_post(
- "/indexes/objects/settings/searchable-attributes",
- [
- "content"
- ]
- )
-
- IO.puts("Created indices. Starting to insert posts.")
-
- chunk_size = Pleroma.Config.get([Pleroma.Search.Meilisearch, :initial_indexing_chunk_size])
-
- Pleroma.Repo.transaction(
- fn ->
- query =
- from(Pleroma.Object,
- # Only index public and unlisted posts which are notes and have some text
- where:
- fragment("data->>'type' = 'Note'") and
- (fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public()) or
- fragment("data->'cc' \\? ?", ^Pleroma.Constants.as_public())),
- order_by: [desc: fragment("data->'published'")]
- )
-
- count = query |> Pleroma.Repo.aggregate(:count, :data)
- IO.puts("Entries to index: #{count}")
-
- Pleroma.Repo.stream(
- query,
- timeout: :infinity
- )
- |> Stream.map(&Pleroma.Search.Meilisearch.object_to_search_data/1)
- |> Stream.filter(fn o -> not is_nil(o) end)
- |> Stream.chunk_every(chunk_size)
- |> Stream.transform(0, fn objects, acc ->
- new_acc = acc + Enum.count(objects)
-
- # Reset to the beginning of the line and rewrite it
- IO.write("\r")
- IO.write("Indexed #{new_acc} entries")
-
- {[objects], new_acc}
- end)
- |> Stream.each(fn objects ->
- result =
- meili_put(
- "/indexes/objects/documents",
- objects
- )
-
- with {:ok, res} <- result do
- if not Map.has_key?(res, "uid") do
- IO.puts("\nFailed to index: #{inspect(result)}")
- end
- else
- e -> IO.puts("\nFailed to index due to network error: #{inspect(e)}")
- end
- end)
- |> Stream.run()
- end,
- timeout: :infinity
- )
-
- IO.write("\n")
- end
-
- def run(["clear"]) do
- start_pleroma()
-
- meili_delete!("/indexes/objects/documents")
- end
-
- def run(["show-keys", master_key]) do
- start_pleroma()
-
- endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url])
-
- {:ok, result} =
- Pleroma.HTTP.get(
- Path.join(endpoint, "/keys"),
- [{"Authorization", "Bearer #{master_key}"}]
- )
-
- decoded = Jason.decode!(result.body)
-
- if decoded["results"] do
- Enum.each(decoded["results"], fn %{"description" => desc, "key" => key} ->
- IO.puts("#{desc}: #{key}")
- end)
- else
- IO.puts("Error fetching the keys, check the master key is correct: #{inspect(decoded)}")
- end
- end
-
- def run(["stats"]) do
- start_pleroma()
-
- {:ok, result} = meili_get("/indexes/objects/stats")
- IO.puts("Number of entries: #{result["numberOfDocuments"]}")
- IO.puts("Indexing? #{result["isIndexing"]}")
- end
-end
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index be03cdffb..b709e737b 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -105,6 +105,7 @@ defmodule Pleroma.Application do
{Oban, Config.get(Oban)},
Pleroma.Web.Endpoint
] ++
+ elasticsearch_children() ++
task_children(@mix_env) ++
dont_run_in_test(@mix_env) ++
shout_child(shout_enabled?())
@@ -303,6 +304,16 @@ defmodule Pleroma.Application do
defp http_children(_, _), do: []
+  # Children for the Elasticsearch cluster process. The cluster is only
+  # started when Pleroma.Search.Elasticsearch is the configured search
+  # module; for any other module no children are added.
+  def elasticsearch_children do
+    case Config.get([Pleroma.Search, :module]) do
+      Pleroma.Search.Elasticsearch -> [Pleroma.Search.Elasticsearch.Cluster]
+      _other -> []
+    end
+  end
+
@spec limiters_setup() :: :ok
def limiters_setup do
config = Config.get(ConcurrentLimiter, [])
diff --git a/lib/pleroma/elasticsearch/document_mappings/activity.ex b/lib/pleroma/elasticsearch/document_mappings/activity.ex
deleted file mode 100644
index a028c6fad..000000000
--- a/lib/pleroma/elasticsearch/document_mappings/activity.ex
+++ /dev/null
@@ -1,19 +0,0 @@
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Elasticsearch.DocumentMappings.Activity do
- alias Pleroma.Object
-
- def id(obj), do: obj.id
-
- def encode(%{object: %{data: %{"type" => "Note"}}} = activity) do
- %{
- _timestamp: activity.inserted_at,
- user: activity.user_actor.nickname,
- content: activity.object.data["content"],
- instance: URI.parse(activity.user_actor.ap_id).host,
- hashtags: Object.hashtags(activity.object)
- }
- end
-end
diff --git a/lib/pleroma/elasticsearch/document_mappings/hashtag.ex b/lib/pleroma/elasticsearch/document_mappings/hashtag.ex
deleted file mode 100644
index 7391983f6..000000000
--- a/lib/pleroma/elasticsearch/document_mappings/hashtag.ex
+++ /dev/null
@@ -1,21 +0,0 @@
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Elasticsearch.DocumentMappings.Hashtag do
- def id(obj), do: obj.id
-
- def encode(%{timestamp: _} = hashtag) do
- %{
- hashtag: hashtag.name,
- timestamp: hashtag.timestamp
- }
- end
-
- def encode(hashtag) do
- %{
- hashtag: hashtag.name,
- timestamp: hashtag.inserted_at
- }
- end
-end
diff --git a/lib/pleroma/elasticsearch/document_mappings/user.ex b/lib/pleroma/elasticsearch/document_mappings/user.ex
deleted file mode 100644
index d5cfca656..000000000
--- a/lib/pleroma/elasticsearch/document_mappings/user.ex
+++ /dev/null
@@ -1,17 +0,0 @@
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Elasticsearch.DocumentMappings.User do
- def id(obj), do: obj.id
-
- def encode(%{actor_type: "Person"} = user) do
- %{
- timestamp: user.inserted_at,
- instance: URI.parse(user.ap_id).host,
- nickname: user.nickname,
- bio: user.bio,
- display_name: user.name
- }
- end
-end
diff --git a/lib/pleroma/elasticsearch/store.ex b/lib/pleroma/elasticsearch/store.ex
deleted file mode 100644
index 98c88a7c7..000000000
--- a/lib/pleroma/elasticsearch/store.ex
+++ /dev/null
@@ -1,256 +0,0 @@
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Elasticsearch do
- alias Pleroma.Activity
- alias Pleroma.User
- alias Pleroma.Object
- alias Pleroma.Elasticsearch.DocumentMappings
- alias Pleroma.Config
- require Logger
-
- defp url do
- Config.get([:elasticsearch, :url])
- end
-
- defp enabled? do
- Config.get([:search, :provider]) == Pleroma.Search.Elasticsearch
- end
-
- def delete_by_id(:activity, id) do
- if enabled?() do
- Elastix.Document.delete(url(), "activities", "activity", id)
- end
- end
-
- def put_by_id(:activity, id) do
- id
- |> Activity.get_by_id_with_object()
- |> maybe_put_into_elasticsearch()
- end
-
- def maybe_put_into_elasticsearch({:ok, item}) do
- maybe_put_into_elasticsearch(item)
- end
-
- def maybe_put_into_elasticsearch(
- %{data: %{"type" => "Create"}, object: %{data: %{"type" => "Note"}}} = activity
- ) do
- if enabled?() do
- actor = Pleroma.Activity.user_actor(activity)
-
- activity
- |> Map.put(:user_actor, actor)
- |> put()
- end
- end
-
- def maybe_put_into_elasticsearch(%User{actor_type: "Person"} = user) do
- if enabled?() do
- put(user)
- end
- end
-
- def maybe_put_into_elasticsearch(_) do
- {:ok, :skipped}
- end
-
- def maybe_bulk_post(data, type) do
- if enabled?() do
- bulk_post(data, type)
- end
- end
-
- def put(%Activity{} = activity) do
- with {:ok, _} <-
- Elastix.Document.index(
- url(),
- "activities",
- "activity",
- DocumentMappings.Activity.id(activity),
- DocumentMappings.Activity.encode(activity)
- ) do
- activity
- |> Map.get(:object)
- |> Object.hashtags()
- |> Enum.map(fn x ->
- %{id: x, name: x, timestamp: DateTime.to_iso8601(DateTime.utc_now())}
- end)
- |> bulk_post(:hashtags)
- else
- {:error, %{reason: err}} ->
- Logger.error("Could not put activity: #{err}")
- :skipped
- end
- end
-
- def put(%User{} = user) do
- with {:ok, _} <-
- Elastix.Document.index(
- url(),
- "users",
- "user",
- DocumentMappings.User.id(user),
- DocumentMappings.User.encode(user)
- ) do
- :ok
- else
- {:error, %{reason: err}} ->
- Logger.error("Could not put user: #{err}")
- :skipped
- end
- end
-
- def bulk_post(data, :activities) do
- d =
- data
- |> Enum.filter(fn x ->
- t =
- x.object
- |> Map.get(:data, %{})
- |> Map.get("type", "")
-
- t == "Note"
- end)
- |> Enum.map(fn d ->
- [
- %{index: %{_id: DocumentMappings.Activity.id(d)}},
- DocumentMappings.Activity.encode(d)
- ]
- end)
- |> List.flatten()
-
- with {:ok, %{body: %{"errors" => false}}} <-
- Elastix.Bulk.post(
- url(),
- d,
- index: "activities",
- type: "activity"
- ) do
- :ok
- else
- {:error, %{reason: err}} ->
- Logger.error("Could not bulk put activity: #{err}")
- :skipped
-
- {:ok, %{body: _}} ->
- :skipped
- end
- end
-
- def bulk_post(data, :users) do
- d =
- data
- |> Enum.filter(fn x -> x.actor_type == "Person" end)
- |> Enum.map(fn d ->
- [
- %{index: %{_id: DocumentMappings.User.id(d)}},
- DocumentMappings.User.encode(d)
- ]
- end)
- |> List.flatten()
-
- with {:ok, %{body: %{"errors" => false}}} <-
- Elastix.Bulk.post(
- url(),
- d,
- index: "users",
- type: "user"
- ) do
- :ok
- else
- {:error, %{reason: err}} ->
- Logger.error("Could not bulk put users: #{err}")
- :skipped
-
- {:ok, %{body: _}} ->
- :skipped
- end
- end
-
- def bulk_post(data, :hashtags) when is_list(data) do
- d =
- data
- |> Enum.map(fn d ->
- [
- %{index: %{_id: DocumentMappings.Hashtag.id(d)}},
- DocumentMappings.Hashtag.encode(d)
- ]
- end)
- |> List.flatten()
-
- with {:ok, %{body: %{"errors" => false}}} <-
- Elastix.Bulk.post(
- url(),
- d,
- index: "hashtags",
- type: "hashtag"
- ) do
- :ok
- else
- {:error, %{reason: err}} ->
- Logger.error("Could not bulk put hashtags: #{err}")
- :skipped
-
- {:ok, %{body: _}} ->
- :skipped
- end
- end
-
- def bulk_post(_, :hashtags), do: {:ok, nil}
-
- def search(_, _, _, :skip), do: []
-
- def search(:raw, index, type, q) do
- with {:ok, raw_results} <- Elastix.Search.search(url(), index, [type], q) do
- results =
- raw_results
- |> Map.get(:body, %{})
- |> Map.get("hits", %{})
- |> Map.get("hits", [])
-
- {:ok, results}
- else
- {:error, e} ->
- Logger.error(e)
- {:error, e}
- end
- end
-
- def search(:activities, q) do
- with {:ok, results} <- search(:raw, "activities", "activity", q) do
- results
- |> Enum.map(fn result -> result["_id"] end)
- |> Pleroma.Activity.all_by_ids_with_object()
- |> Enum.sort(&(&1.inserted_at >= &2.inserted_at))
- else
- e ->
- Logger.error(e)
- []
- end
- end
-
- def search(:users, q) do
- with {:ok, results} <- search(:raw, "users", "user", q) do
- results
- |> Enum.map(fn result -> result["_id"] end)
- |> Pleroma.User.get_all_by_ids()
- else
- e ->
- Logger.error(e)
- []
- end
- end
-
- def search(:hashtags, q) do
- with {:ok, results} <- search(:raw, "hashtags", "hashtag", q) do
- results
- |> Enum.map(fn result -> result["_source"]["hashtag"] end)
- else
- e ->
- Logger.error(e)
- []
- end
- end
-end
diff --git a/lib/pleroma/hashtag.ex b/lib/pleroma/hashtag.ex
index cdbfeab02..53e2e9c89 100644
--- a/lib/pleroma/hashtag.ex
+++ b/lib/pleroma/hashtag.ex
@@ -61,7 +61,6 @@ defmodule Pleroma.Hashtag do
{:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^names))}
end)
|> Repo.transaction() do
- Pleroma.Elasticsearch.maybe_bulk_post(hashtags, :hashtags)
{:ok, hashtags}
else
{:error, _name, value, _changes_so_far} -> {:error, value}
diff --git a/lib/pleroma/search/builtin.ex b/lib/pleroma/search/builtin.ex
deleted file mode 100644
index 3cbe2207a..000000000
--- a/lib/pleroma/search/builtin.ex
+++ /dev/null
@@ -1,138 +0,0 @@
-defmodule Pleroma.Search.Builtin do
- @behaviour Pleroma.Search
-
- alias Pleroma.Repo
- alias Pleroma.User
- alias Pleroma.Activity
- alias Pleroma.Web.MastodonAPI.AccountView
- alias Pleroma.Web.MastodonAPI.StatusView
- alias Pleroma.Web.Endpoint
-
- require Logger
-
- @impl Pleroma.Search
- def search(_conn, %{q: query} = params, options) do
- version = Keyword.get(options, :version)
- timeout = Keyword.get(Repo.config(), :timeout, 15_000)
- query = String.trim(query)
- default_values = %{"statuses" => [], "accounts" => [], "hashtags" => []}
-
- default_values
- |> Enum.map(fn {resource, default_value} ->
- if params[:type] in [nil, resource] do
- {resource, fn -> resource_search(version, resource, query, options) end}
- else
- {resource, fn -> default_value end}
- end
- end)
- |> Task.async_stream(fn {resource, f} -> {resource, with_fallback(f)} end,
- timeout: timeout,
- on_timeout: :kill_task
- )
- |> Enum.reduce(default_values, fn
- {:ok, {resource, result}}, acc ->
- Map.put(acc, resource, result)
-
- _error, acc ->
- acc
- end)
- end
-
- defp resource_search(_, "accounts", query, options) do
- accounts = with_fallback(fn -> User.search(query, options) end)
-
- AccountView.render("index.json",
- users: accounts,
- for: options[:for_user],
- embed_relationships: options[:embed_relationships]
- )
- end
-
- defp resource_search(_, "statuses", query, options) do
- statuses = with_fallback(fn -> Activity.search(options[:for_user], query, options) end)
-
- StatusView.render("index.json",
- activities: statuses,
- for: options[:for_user],
- as: :activity
- )
- end
-
- defp resource_search(:v2, "hashtags", query, options) do
- tags_path = Endpoint.url() <> "/tag/"
-
- query
- |> prepare_tags(options)
- |> Enum.map(fn tag ->
- %{name: tag, url: tags_path <> tag}
- end)
- end
-
- defp resource_search(:v1, "hashtags", query, options) do
- prepare_tags(query, options)
- end
-
- defp prepare_tags(query, options) do
- tags =
- query
- |> preprocess_uri_query()
- |> String.split(~r/[^#\w]+/u, trim: true)
- |> Enum.uniq_by(&String.downcase/1)
-
- explicit_tags = Enum.filter(tags, fn tag -> String.starts_with?(tag, "#") end)
-
- tags =
- if Enum.any?(explicit_tags) do
- explicit_tags
- else
- tags
- end
-
- tags = Enum.map(tags, fn tag -> String.trim_leading(tag, "#") end)
-
- tags =
- if Enum.empty?(explicit_tags) && !options[:skip_joined_tag] do
- add_joined_tag(tags)
- else
- tags
- end
-
- Pleroma.Pagination.paginate(tags, options)
- end
-
- # If `query` is a URI, returns last component of its path, otherwise returns `query`
- defp preprocess_uri_query(query) do
- if query =~ ~r/https?:\/\// do
- query
- |> String.trim_trailing("/")
- |> URI.parse()
- |> Map.get(:path)
- |> String.split("/")
- |> Enum.at(-1)
- else
- query
- end
- end
-
- defp add_joined_tag(tags) do
- tags
- |> Kernel.++([joined_tag(tags)])
- |> Enum.uniq_by(&String.downcase/1)
- end
-
- defp joined_tag(tags) do
- tags
- |> Enum.map(fn tag -> String.capitalize(tag) end)
- |> Enum.join()
- end
-
- defp with_fallback(f, fallback \\ []) do
- try do
- f.()
- rescue
- error ->
- Logger.error("#{__MODULE__} search error: #{inspect(error)}")
- fallback
- end
- end
-end
diff --git a/lib/pleroma/search/elasticsearch.ex b/lib/pleroma/search/elasticsearch.ex
index 76d2c3277..7c7ca82c8 100644
--- a/lib/pleroma/search/elasticsearch.ex
+++ b/lib/pleroma/search/elasticsearch.ex
@@ -3,24 +3,22 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Search.Elasticsearch do
- @behaviour Pleroma.Search
+ @behaviour Pleroma.Search.SearchBackend
alias Pleroma.Activity
alias Pleroma.Object.Fetcher
- alias Pleroma.Web.MastodonAPI.StatusView
- alias Pleroma.Web.MastodonAPI.AccountView
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Search.Elasticsearch.Parsers
- alias Pleroma.Web.Endpoint
- def es_query(:activity, query) do
+ def es_query(:activity, query, offset, limit) do
must = Parsers.Activity.parse(query)
if must == [] do
:skip
else
%{
- size: 50,
+ size: limit,
+ from: offset,
terminate_after: 50,
timeout: "5s",
sort: [
@@ -36,50 +34,6 @@ defmodule Pleroma.Search.Elasticsearch do
end
end
- def es_query(:user, query) do
- must = Parsers.User.parse(query)
-
- if must == [] do
- :skip
- else
- %{
- size: 50,
- terminate_after: 50,
- timeout: "5s",
- sort: [
- "_score"
- ],
- query: %{
- bool: %{
- must: must
- }
- }
- }
- end
- end
-
- def es_query(:hashtag, query) do
- must = Parsers.Hashtag.parse(query)
-
- if must == [] do
- :skip
- else
- %{
- size: 50,
- terminate_after: 50,
- timeout: "5s",
- sort: [
- "_score"
- ],
- query: %{
- bool: %{
- must: Parsers.Hashtag.parse(query)
- }
- }
- }
- end
- end
-
defp maybe_fetch(:activity, search_query) do
with true <- Regex.match?(~r/https?:/, search_query),
{:ok, object} <- Fetcher.fetch_object_from_id(search_query),
@@ -90,8 +44,10 @@ defmodule Pleroma.Search.Elasticsearch do
end
end
- @impl Pleroma.Search
- def search(%{assigns: %{user: user}} = _conn, %{q: query} = _params, _options) do
+ def search(user, query, options) do
+ limit = Enum.min([Keyword.get(options, :limit), 40])
+ offset = Keyword.get(options, :offset, 0)
+
parsed_query =
query
|> String.trim()
@@ -104,30 +60,13 @@ defmodule Pleroma.Search.Elasticsearch do
activity_task =
Task.async(fn ->
- q = es_query(:activity, parsed_query)
+ q = es_query(:activity, parsed_query, offset, limit)
- Pleroma.Elasticsearch.search(:activities, q)
+ Pleroma.Search.Elasticsearch.Store.search(:activities, q)
|> Enum.filter(fn x -> Visibility.visible_for_user?(x, user) end)
end)
- user_task =
- Task.async(fn ->
- q = es_query(:user, parsed_query)
-
- Pleroma.Elasticsearch.search(:users, q)
- |> Enum.filter(fn x -> Pleroma.User.visible_for(x, user) == :visible end)
- end)
-
- hashtag_task =
- Task.async(fn ->
- q = es_query(:hashtag, parsed_query)
-
- Pleroma.Elasticsearch.search(:hashtags, q)
- end)
-
activity_results = Task.await(activity_task)
- user_results = Task.await(user_task)
- hashtag_results = Task.await(hashtag_task)
direct_activity = Task.await(activity_fetch_task)
activity_results =
@@ -137,25 +76,16 @@ defmodule Pleroma.Search.Elasticsearch do
[direct_activity | activity_results]
end
- %{
- "accounts" =>
- AccountView.render("index.json",
- users: user_results,
- for: user
- ),
- "hashtags" =>
- Enum.map(hashtag_results, fn x ->
- %{
- url: Endpoint.url() <> "/tag/" <> x,
- name: x
- }
- end),
- "statuses" =>
- StatusView.render("index.json",
- activities: activity_results,
- for: user,
- as: :activity
- )
- }
+ activity_results
+ end
+
+  # Puts `activity` into the "activities" index of the Elasticsearch cluster.
+  @impl true
+  def add_to_index(activity) do
+    cluster = Pleroma.Search.Elasticsearch.Cluster
+    Elasticsearch.put_document(cluster, activity, "activities")
+  end
+
+  # Deletes the document for `object` from the "activities" index.
+  # NOTE(review): presumably `object` must be a struct that implements
+  # `Elasticsearch.Document` - confirm what callers pass here.
+  @impl true
+  def remove_from_index(object) do
+    cluster = Pleroma.Search.Elasticsearch.Cluster
+    Elasticsearch.delete_document(cluster, object, "activities")
+  end
end
diff --git a/lib/pleroma/search/elasticsearch/cluster.ex b/lib/pleroma/search/elasticsearch/cluster.ex
new file mode 100644
index 000000000..4f76c4ebc
--- /dev/null
+++ b/lib/pleroma/search/elasticsearch/cluster.ex
@@ -0,0 +1,5 @@
+# Elasticsearch cluster module; `use Elasticsearch.Cluster` is given the :pleroma OTP app.
+defmodule Pleroma.Search.Elasticsearch.Cluster do
+ @moduledoc false
+ use Elasticsearch.Cluster, otp_app: :pleroma
+end
diff --git a/lib/pleroma/search/elasticsearch/document_mappings/activity.ex b/lib/pleroma/search/elasticsearch/document_mappings/activity.ex
new file mode 100644
index 000000000..edd8e03c1
--- /dev/null
+++ b/lib/pleroma/search/elasticsearch/document_mappings/activity.ex
@@ -0,0 +1,62 @@
+# Akkoma: A lightweight social networking server
+# Copyright © 2022-2022 Akkoma Authors
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defimpl Elasticsearch.Document, for: Pleroma.Activity do
+  alias Pleroma.Object
+  require Pleroma.Constants
+
+  def id(obj), do: obj.id
+  def routing(_), do: false
+
+  # Builds the search document for `object`, or `%{}` when it should not be
+  # searchable. Only public or unlisted Notes whose content has more than one
+  # character left after stripping HTML and trimming are turned into documents.
+  def object_to_search_data(%{data: %{"type" => "Note", "content" => raw} = data} = object)
+      when not is_nil(raw) do
+    with true <- publicly_addressed?(data),
+         text when is_binary(text) <- content_to_string(raw),
+         {:ok, scrubbed} <- FastSanitize.strip_tags(text),
+         content = String.trim(scrubbed),
+         true <- String.length(content) > 1,
+         actor when is_binary(actor) <- data["actor"],
+         {:ok, published, _offset} <- parse_published(data["published"]),
+         # If the lookup does not return a user, skip the document rather than crash.
+         %{nickname: nickname} <- Pleroma.User.get_cached_by_ap_id(actor) do
+      %{
+        _timestamp: published,
+        content: content,
+        instance: URI.parse(actor).host,
+        hashtags: Object.hashtags(object),
+        user: nickname
+      }
+    else
+      _ -> %{}
+    end
+  end
+
+  def object_to_search_data(_), do: %{}
+
+  # Encodes an activity by normalizing it to its object first.
+  def encode(activity) do
+    activity
+    |> Object.normalize()
+    |> object_to_search_data()
+  end
+
+  # `in` raises on `nil` or a bare string, so wrap both address fields in a
+  # list before the membership test.
+  defp publicly_addressed?(data) do
+    public = Pleroma.Constants.as_public()
+    public in List.wrap(data["to"]) or public in List.wrap(data["cc"])
+  end
+
+  # Content given as a list with a leading `nil` is converted to a string.
+  defp content_to_string([nil | rest]), do: to_string(rest)
+  defp content_to_string(content), do: content
+
+  defp parse_published(published) when is_binary(published),
+    do: DateTime.from_iso8601(published)
+
+  defp parse_published(_), do: :error
+end
diff --git a/lib/pleroma/search/elasticsearch/hashtag_parser.ex b/lib/pleroma/search/elasticsearch/hashtag_parser.ex
deleted file mode 100644
index 911dc651c..000000000
--- a/lib/pleroma/search/elasticsearch/hashtag_parser.ex
+++ /dev/null
@@ -1,34 +0,0 @@
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Search.Elasticsearch.Parsers.Hashtag do
- defp to_es(term) when is_binary(term) do
- %{
- term: %{
- hashtag: %{
- value: String.downcase(term)
- }
- }
- }
- end
-
- defp to_es({:quoted, term}), do: to_es(term)
-
- defp to_es({:filter, ["hashtag", query]}) do
- %{
- term: %{
- hashtag: %{
- value: String.downcase(query)
- }
- }
- }
- end
-
- defp to_es({:filter, _}), do: nil
-
- def parse(q) do
- Enum.map(q, &to_es/1)
- |> Enum.filter(fn x -> x != nil end)
- end
-end
diff --git a/lib/pleroma/search/elasticsearch/store.ex b/lib/pleroma/search/elasticsearch/store.ex
new file mode 100644
index 000000000..895b76d7f
--- /dev/null
+++ b/lib/pleroma/search/elasticsearch/store.ex
@@ -0,0 +1,73 @@
+# Akkoma: A lightweight social networking server
+# Copyright © 2022-2022 Akkoma Authors
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Search.Elasticsearch.Store do
+  @moduledoc """
+  `Elasticsearch.Store` implementation backed by `Pleroma.Repo`, plus the query
+  helpers used to search the "activities" index.
+  """
+
+  @behaviour Elasticsearch.Store
+  alias Pleroma.Search.Elasticsearch.Cluster
+  require Logger
+
+  alias Pleroma.Repo
+
+  # Streams the rows of `schema` from the database.
+  @impl true
+  def stream(schema) do
+    Repo.stream(schema)
+  end
+
+  # Runs `fun` in a transaction with no timeout and returns its result.
+  # Anything other than `{:ok, result}` raises a `MatchError`.
+  @impl true
+  def transaction(fun) do
+    {:ok, result} = Repo.transaction(fun, timeout: :infinity)
+    result
+  end
+
+  # `:skip` stands in for a query that produced no clauses: answer with no
+  # hits instead of posting the atom to Elasticsearch.
+  def search(:raw, _index, :skip), do: {:ok, []}
+
+  # Posts the query body `q` to `/<index>/_search` and returns `{:ok, hits}`,
+  # where `hits` is the list found under "hits" -> "hits" in the response.
+  def search(:raw, index, q) do
+    with {:ok, raw_results} <- Elasticsearch.post(Cluster, "/#{index}/_search", q) do
+      results =
+        raw_results
+        |> Map.get("hits", %{})
+        |> Map.get("hits", [])
+
+      {:ok, results}
+    else
+      {:error, e} ->
+        # The error term is not necessarily chardata, so inspect it for the log.
+        Logger.error(inspect(e))
+        {:error, e}
+    end
+  end
+
+  # Nothing to search for, so skip both Elasticsearch and the database.
+  def search(:activities, :skip), do: []
+
+  # Searches the "activities" index, loads the matching activities by id and
+  # returns them sorted on `inserted_at`, largest first. Any failure is logged
+  # and yields `[]`.
+  # NOTE(review): `>=` compares the timestamps structurally; confirm this
+  # orders them chronologically or switch to an explicit date comparison.
+  def search(:activities, q) do
+    with {:ok, results} <- search(:raw, "activities", q) do
+      results
+      |> Enum.map(fn result -> result["_id"] end)
+      |> Pleroma.Activity.all_by_ids_with_object()
+      |> Enum.sort(&(&1.inserted_at >= &2.inserted_at))
+    else
+      e ->
+        Logger.error(inspect(e))
+        []
+    end
+  end
+end
diff --git a/lib/pleroma/search/elasticsearch/user_paser.ex b/lib/pleroma/search/elasticsearch/user_paser.ex
deleted file mode 100644
index 4176c6141..000000000
--- a/lib/pleroma/search/elasticsearch/user_paser.ex
+++ /dev/null
@@ -1,57 +0,0 @@
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Search.Elasticsearch.Parsers.User do
- defp to_es(term) when is_binary(term) do
- %{
- bool: %{
- minimum_should_match: 1,
- should: [
- %{
- match: %{
- bio: %{
- query: term,
- operator: "AND"
- }
- }
- },
- %{
- term: %{
- nickname: %{
- value: term
- }
- }
- },
- %{
- match: %{
- display_name: %{
- query: term,
- operator: "AND"
- }
- }
- }
- ]
- }
- }
- end
-
- defp to_es({:quoted, term}), do: to_es(term)
-
- defp to_es({:filter, ["user", query]}) do
- %{
- term: %{
- nickname: %{
- value: query
- }
- }
- }
- end
-
- defp to_es({:filter, _}), do: nil
-
- def parse(q) do
- Enum.map(q, &to_es/1)
- |> Enum.filter(fn x -> x != nil end)
- end
-end
diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex
index 9a50ee3ec..dc6c661ea 100644
--- a/lib/pleroma/user.ex
+++ b/lib/pleroma/user.ex
@@ -1095,7 +1095,6 @@ defmodule Pleroma.User do
was_superuser_before_update = User.superuser?(user)
with {:ok, user} <- Repo.update(changeset, stale_error_field: :id) do
- Pleroma.Elasticsearch.maybe_put_into_elasticsearch(user)
set_cache(user)
end
|> maybe_remove_report_notifications(was_superuser_before_update)
diff --git a/lib/pleroma/web/activity_pub/pipeline.ex b/lib/pleroma/web/activity_pub/pipeline.ex
index 214647dbf..d4e507287 100644
--- a/lib/pleroma/web/activity_pub/pipeline.ex
+++ b/lib/pleroma/web/activity_pub/pipeline.ex
@@ -28,7 +28,6 @@ defmodule Pleroma.Web.ActivityPub.Pipeline do
case Repo.transaction(fn -> do_common_pipeline(object, meta) end, Utils.query_timeout()) do
{:ok, {:ok, activity, meta}} ->
side_effects().handle_after_transaction(meta)
- side_effects().handle_after_transaction(activity)
{:ok, activity, meta}
{:ok, value} ->
diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex
index 517dd0a4f..e2371b693 100644
--- a/lib/pleroma/web/activity_pub/side_effects.ex
+++ b/lib/pleroma/web/activity_pub/side_effects.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors
+# Copyright © 2017-2022 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.ActivityPub.SideEffects do
@@ -272,6 +272,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
reacted_object = Object.get_by_ap_id(object.data["object"])
Utils.add_emoji_reaction_to_object(object, reacted_object)
+
Notification.create_notifications(object)
{:ok, object, meta}
@@ -547,24 +548,6 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
end
@impl true
- def handle_after_transaction(%Pleroma.Activity{data: %{"type" => "Create"}} = activity) do
- Pleroma.Elasticsearch.put_by_id(:activity, activity.id)
- end
-
- def handle_after_transaction(%Pleroma.Activity{
- data: %{"type" => "Delete", "deleted_activity_id" => id}
- }) do
- Pleroma.Elasticsearch.delete_by_id(:activity, id)
- end
-
- def handle_after_transaction(%Pleroma.Activity{}) do
- :ok
- end
-
- def handle_after_transaction(%Pleroma.Object{}) do
- :ok
- end
-
def handle_after_transaction(meta) do
meta
|> send_notifications()
diff --git a/lib/pleroma/web/activity_pub/side_effects/handling.ex b/lib/pleroma/web/activity_pub/side_effects/handling.ex
index a82305155..eb012f576 100644
--- a/lib/pleroma/web/activity_pub/side_effects/handling.ex
+++ b/lib/pleroma/web/activity_pub/side_effects/handling.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors
+# Copyright © 2017-2022 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.ActivityPub.SideEffects.Handling do
diff --git a/lib/pleroma/web/common_api.ex b/lib/pleroma/web/common_api.ex
index 92afd5cb6..856fa95b9 100644
--- a/lib/pleroma/web/common_api.ex
+++ b/lib/pleroma/web/common_api.ex
@@ -396,13 +396,7 @@ defmodule Pleroma.Web.CommonAPI do
def post(user, %{status: _} = data) do
with {:ok, draft} <- ActivityDraft.create(user, data) do
- activity = ActivityPub.create(draft.changes, draft.preview?)
-
- unless draft.preview? do
- Pleroma.Elasticsearch.maybe_put_into_elasticsearch(activity)
- end
-
- activity
+ ActivityPub.create(draft.changes, draft.preview?)
end
end
diff --git a/lib/pleroma/web/mastodon_api/controllers/search_controller.ex b/lib/pleroma/web/mastodon_api/controllers/search_controller.ex
index 751d46cdf..e4acba226 100644
--- a/lib/pleroma/web/mastodon_api/controllers/search_controller.ex
+++ b/lib/pleroma/web/mastodon_api/controllers/search_controller.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors
+# Copyright © 2017-2022 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.MastodonAPI.SearchController do
@@ -8,7 +8,9 @@ defmodule Pleroma.Web.MastodonAPI.SearchController do
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.ControllerHelper
+ alias Pleroma.Web.Endpoint
alias Pleroma.Web.MastodonAPI.AccountView
+ alias Pleroma.Web.MastodonAPI.StatusView
alias Pleroma.Web.Plugs.OAuthScopesPlug
alias Pleroma.Web.Plugs.RateLimiter
@@ -42,13 +44,34 @@ defmodule Pleroma.Web.MastodonAPI.SearchController do
def search2(conn, params), do: do_search(:v2, conn, params)
def search(conn, params), do: do_search(:v1, conn, params)
- defp do_search(version, %{assigns: %{user: user}} = conn, params) do
- options =
- search_options(params, user)
- |> Keyword.put(:version, version)
+ # Runs one search per result type ("statuses", "accounts", "hashtags")
+ # concurrently and renders the merged map as JSON.
+ defp do_search(version, %{assigns: %{user: user}} = conn, %{q: query} = params) do
+ query = String.trim(query)
+ options = search_options(params, user)
+ # Per-task timeout: the Repo's configured :timeout, falling back to 15s.
+ timeout = Keyword.get(Repo.config(), :timeout, 15_000)
+ default_values = %{"statuses" => [], "accounts" => [], "hashtags" => []}
- search_provider = Pleroma.Config.get([:search, :provider])
- json(conn, search_provider.search(conn, params, options))
+ result =
+ default_values
+ # A type is searched when params[:type] is nil or names it; any other
+ # type just returns its empty default.
+ |> Enum.map(fn {resource, default_value} ->
+ if params[:type] in [nil, resource] do
+ {resource, fn -> resource_search(version, resource, query, options) end}
+ else
+ {resource, fn -> default_value end}
+ end
+ end)
+ |> Task.async_stream(fn {resource, f} -> {resource, with_fallback(f)} end,
+ timeout: timeout,
+ on_timeout: :kill_task
+ )
+ # Only {:ok, _} results replace a default; anything else emitted by the
+ # stream leaves the default for that type in place.
+ |> Enum.reduce(default_values, fn
+ {:ok, {resource, result}}, acc ->
+ Map.put(acc, resource, result)
+
+ _error, acc ->
+ acc
+ end)
+
+ json(conn, result)
end
defp search_options(params, user) do
diff --git a/mix.exs b/mix.exs
index 564db2d75..558e71262 100644
--- a/mix.exs
+++ b/mix.exs
@@ -203,6 +203,7 @@ defmodule Pleroma.Mixfile do
{:nimble_parsec, "~> 1.0", override: true},
{:phoenix_live_dashboard, "~> 0.6.2"},
{:ecto_psql_extras, "~> 0.6"},
+ {:elasticsearch, "~> 1.0.0"},
# indirect dependency version override
{:plug, "~> 1.10.4", override: true},
diff --git a/priv/es-mappings/activity.json b/priv/es-mappings/activity.json
index e476fd59f..052633496 100644
--- a/priv/es-mappings/activity.json
+++ b/priv/es-mappings/activity.json
@@ -1,20 +1,22 @@
{
- "properties": {
- "_timestamp": {
- "type": "date",
- "index": true
- },
- "instance": {
- "type": "keyword"
- },
- "content": {
- "type": "text"
- },
- "hashtags": {
- "type": "keyword"
- },
- "user": {
- "type": "text"
+ "mappings": {
+ "properties": {
+ "_timestamp": {
+ "type": "date",
+ "index": true
+ },
+ "instance": {
+ "type": "keyword"
+ },
+ "content": {
+ "type": "text"
+ },
+ "hashtags": {
+ "type": "keyword"
+ },
+ "user": {
+ "type": "text"
+ }
}
}
}
diff --git a/test/pleroma/web/activity_pub/pipeline_test.exs b/test/pleroma/web/activity_pub/pipeline_test.exs
index 30fd5651b..e606fa3d1 100644
--- a/test/pleroma/web/activity_pub/pipeline_test.exs
+++ b/test/pleroma/web/activity_pub/pipeline_test.exs
@@ -28,7 +28,6 @@ defmodule Pleroma.Web.ActivityPub.PipelineTest do
SideEffectsMock
|> expect(:handle, fn o, m -> {:ok, o, m} end)
|> expect(:handle_after_transaction, fn m -> m end)
- |> expect(:handle_after_transaction, fn m -> m end)
:ok
end