Merge branch 'tusooa/3018-unified-stream' into 'develop'

Unified streaming endpoint

Closes #3018

See merge request pleroma/pleroma!3864
This commit is contained in:
tusooa 2023-10-15 21:35:30 +00:00
commit 340c881296
11 changed files with 1187 additions and 62 deletions

View file

@ -0,0 +1 @@
Add unified streaming endpoint

View file

@ -357,6 +357,122 @@ The message payload consist of:
- `follower_count`: follower count - `follower_count`: follower count
- `following_count`: following count - `following_count`: following count
### Authenticating via `sec-websocket-protocol` header
Pleroma allows to authenticate via the `sec-websocket-protocol` header, for example, if your access token is `your-access-token`, you can authenticate using the following:
```
sec-websocket-protocol: your-access-token
```
### Authenticating after connection via `pleroma:authenticate` event
Pleroma allows to authenticate after connection is established, via the `pleroma:authenticate` event. For example, if your access token is `your-access-token`, you can send the following after the connection is established:
```
{"type": "pleroma:authenticate", "token": "your-access-token"}
```
### Response to client-sent events
Pleroma will respond to client-sent events that it recognizes. Supported event types are:
- `subscribe`
- `unsubscribe`
- `pleroma:authenticate`
The reply will be in the following format:
```
{
"event": "pleroma:respond",
"payload": "{\"type\": \"<type of the client-sent event>\", \"result\": \"<result of the action>\", \"error\": \"<error code>\"}"
}
```
Result of the action can be either `success`, `ignored` or `error`. If it is `error`, the `error` property will contain the error code. Otherwise, the `error` property will not be present. Below are some examples:
```
{
"event": "pleroma:respond",
"payload": "{\"type\": \"pleroma:authenticate\", \"result\": \"success\"}"
}
{
"event": "pleroma:respond",
"payload": "{\"type\": \"subscribe\", \"result\": \"ignored\"}"
}
{
"event": "pleroma:respond",
"payload": "{\"type\": \"unsubscribe\", \"result\": \"error\", \"error\": \"bad_topic\"}"
}
```
If the sent event is not of a type that Pleroma supports, it will not reply.
### The `stream` attribute of a server-sent event
Technically, this is in Mastodon, but its documentation does nothing to specify its format.
This attribute appears on every event type except `pleroma:respond` and `delete`. It helps clients determine where they should display the new statuses.
The value of the attribute is an array containing one or two elements. The first element is the type of the stream. The second is the identifier related to that specific stream, if applicable.
For the following stream types, there is a second element in the array:
- `list`: The second element is the id of the list, as a string.
- `hashtag`: The second element is the name of the hashtag.
- `public:remote:media` and `public:remote`: The second element is the domain of the corresponding instance.
For all other stream types, there is no second element.
Some examples of valid `stream` values:
- `["list", "1"]`: List of id 1.
- `["hashtag", "mew"]`: The hashtag #mew.
- `["user:notifications"]`: Notifications for the current user.
- `["user"]`: Home timeline.
- `["public:remote", "mew.moe"]`: Public posts from the instance mew.moe .
### The unified streaming endpoint
If you do not specify a stream to connect to when requesting `/api/v1/streaming`, you will enter a connection that subscribes to no streams. After the connection is established, you can authenticate and then subscribe to different streams.
### List of supported streams
Below is a list of supported streams by Pleroma. To make a single-stream WebSocket connection, append the string specified in "Query style" to the streaming endpoint url.
To subscribe to a stream after the connection is established, merge the JSON object specified in "Subscribe style" with `{"type": "subscribe"}`. To unsubscribe, merge it with `{"type": "unsubscribe"}`.
For example, to receive updates on the list 1, you can connect to `/api/v1/streaming/?stream=list&list=1`, or send
```
{"type": "subscribe", "stream": "list", "list": "1"}
```
upon establishing the websocket connection.
To unsubscribe to list 1, send
```
{"type": "unsubscribe", "stream": "list", "list": "1"}
```
Note that if you specify a stream that requires a logged-in user in the query string (for example, `user` or `list`), you have to specify the access token when you are trying to establish the connection, i.e. in the query string or via the `sec-websocket-protocol` header.
- `list`
- Query style: `?stream=list&list=<id>`
- Subscribe style: `{"stream": "list", "list": "<id>"}`
- `public`, `public:local`, `public:media`, `public:local:media`, `user`, `user:pleroma_chat`, `user:notifications`, `direct`
- Query style: `?stream=<stream name>`
- Subscribe style: `{"stream": "<stream name>"}`
- `hashtag`
- Query style: `?stream=hashtag&tag=<name>`
- Subscribe style: `{"stream": "hashtag", "tag": "<name>"}`
- `public:remote`, `public:remote:media`
- Query style: `?stream=<stream name>&instance=<instance domain>`
- Subscribe style: `{"stream": "<stream name>", "instance": "<instance domain>"}`
## User muting and thread muting ## User muting and thread muting
Both user muting and thread muting can be done for only a certain time by adding an `expires_in` parameter to the API calls and giving the expiration time in seconds. Both user muting and thread muting can be done for only a certain time by adding an `expires_in` parameter to the API calls and giving the expiration time in seconds.

View file

@ -94,4 +94,8 @@ defmodule Pleroma.Constants do
"application/activity+json" "application/activity+json"
] ]
) )
const(public_streams,
do: ["public", "public:local", "public:media", "public:local:media"]
)
end end

View file

@ -10,6 +10,14 @@ defmodule Pleroma.Web.ApiSpec do
@behaviour OpenApi @behaviour OpenApi
defp streaming_paths do
%{
"/api/v1/streaming" => %OpenApiSpex.PathItem{
get: Pleroma.Web.ApiSpec.StreamingOperation.streaming_operation()
}
}
end
@impl OpenApi @impl OpenApi
def spec(opts \\ []) do def spec(opts \\ []) do
%OpenApi{ %OpenApi{
@ -45,7 +53,7 @@ defmodule Pleroma.Web.ApiSpec do
} }
}, },
# populate the paths from a phoenix router # populate the paths from a phoenix router
paths: OpenApiSpex.Paths.from_router(Router), paths: Map.merge(streaming_paths(), OpenApiSpex.Paths.from_router(Router)),
components: %OpenApiSpex.Components{ components: %OpenApiSpex.Components{
parameters: %{ parameters: %{
"accountIdOrNickname" => "accountIdOrNickname" =>

View file

@ -0,0 +1,464 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.ApiSpec.StreamingOperation do
alias OpenApiSpex.Operation
alias OpenApiSpex.Response
alias OpenApiSpex.Schema
alias Pleroma.Web.ApiSpec.NotificationOperation
alias Pleroma.Web.ApiSpec.Schemas.Chat
alias Pleroma.Web.ApiSpec.Schemas.Conversation
alias Pleroma.Web.ApiSpec.Schemas.FlakeID
alias Pleroma.Web.ApiSpec.Schemas.Status
require Pleroma.Constants
@spec open_api_operation(atom) :: Operation.t()
def open_api_operation(action) do
operation = String.to_existing_atom("#{action}_operation")
apply(__MODULE__, operation, [])
end
@spec streaming_operation() :: Operation.t()
def streaming_operation do
%Operation{
tags: ["Timelines"],
summary: "Establish streaming connection",
description: """
Receive statuses in real-time via WebSocket.
You can specify the access token on the query string or through the `sec-websocket-protocol` header. Using
the query string to authenticate is considered unsafe and should not be used unless you have to (e.g. to maintain
your client's compatibility with Mastodon).
You may specify a stream on the query string. If you do so and you are connecting to a stream that requires logged-in users,
you must specify the access token at the time of the connection (i.e. via query string or header).
Otherwise, you have the option to authenticate after you have established the connection through client-sent events.
The "Request body" section below describes what events clients can send through WebSocket, and the "Responses" section
describes what events server will send through WebSocket.
""",
security: [%{"oAuth" => ["read:statuses", "read:notifications"]}],
operationId: "WebsocketHandler.streaming",
parameters:
[
Operation.parameter(:connection, :header, %Schema{type: :string}, "connection header",
required: true
),
Operation.parameter(:upgrade, :header, %Schema{type: :string}, "upgrade header",
required: true
),
Operation.parameter(
:"sec-websocket-key",
:header,
%Schema{type: :string},
"sec-websocket-key header",
required: true
),
Operation.parameter(
:"sec-websocket-version",
:header,
%Schema{type: :string},
"sec-websocket-version header",
required: true
)
] ++ stream_params() ++ access_token_params(),
requestBody: request_body("Client-sent events", client_sent_events()),
responses: %{
101 => switching_protocols_response(),
200 =>
Operation.response(
"Server-sent events",
"application/json",
server_sent_events()
)
}
}
end
defp stream_params do
stream_specifier()
|> Enum.map(fn {name, schema} ->
Operation.parameter(name, :query, schema, get_schema(schema).description)
end)
end
defp access_token_params do
[
Operation.parameter(:access_token, :query, token(), token().description),
Operation.parameter(:"sec-websocket-protocol", :header, token(), token().description)
]
end
defp switching_protocols_response do
%Response{
description: "Switching protocols",
headers: %{
"connection" => %OpenApiSpex.Header{required: true},
"upgrade" => %OpenApiSpex.Header{required: true},
"sec-websocket-accept" => %OpenApiSpex.Header{required: true}
}
}
end
defp server_sent_events do
%Schema{
oneOf: [
update_event(),
status_update_event(),
notification_event(),
chat_update_event(),
follow_relationships_update_event(),
conversation_event(),
delete_event(),
pleroma_respond_event()
]
}
end
defp stream do
%Schema{
type: :array,
title: "Stream",
description: """
The stream identifier.
The first item is the name of the stream. If the stream needs a differentiator, the second item will be the corresponding identifier.
Currently, for the following stream types, there is a second element in the array:
- `list`: The second element is the id of the list, as a string.
- `hashtag`: The second element is the name of the hashtag.
- `public:remote:media` and `public:remote`: The second element is the domain of the corresponding instance.
""",
maxItems: 2,
minItems: 1,
items: %Schema{type: :string},
example: ["hashtag", "mew"]
}
end
defp get_schema(%Schema{} = schema), do: schema
defp get_schema(schema), do: schema.schema
defp server_sent_event_helper(name, description, type, payload, opts \\ []) do
payload_type = Keyword.get(opts, :payload_type, :json)
has_stream = Keyword.get(opts, :has_stream, true)
stream_properties =
if has_stream do
%{stream: stream()}
else
%{}
end
stream_example = if has_stream, do: %{"stream" => get_schema(stream()).example}, else: %{}
stream_required = if has_stream, do: [:stream], else: []
payload_schema =
if payload_type == :json do
%Schema{
title: "Event payload",
description: "JSON-encoded string of #{get_schema(payload).title}",
allOf: [payload]
}
else
payload
end
payload_example =
if payload_type == :json do
get_schema(payload).example |> Jason.encode!()
else
get_schema(payload).example
end
%Schema{
type: :object,
title: name,
description: description,
required: [:event, :payload] ++ stream_required,
properties:
%{
event: %Schema{
title: "Event type",
description: "Type of the event.",
type: :string,
required: true,
enum: [type]
},
payload: payload_schema
}
|> Map.merge(stream_properties),
example:
%{
"event" => type,
"payload" => payload_example
}
|> Map.merge(stream_example)
}
end
defp update_event do
server_sent_event_helper("New status", "A newly-posted status.", "update", Status)
end
defp status_update_event do
server_sent_event_helper("Edit", "A status that was just edited", "status.update", Status)
end
defp notification_event do
server_sent_event_helper(
"Notification",
"A new notification.",
"notification",
NotificationOperation.notification()
)
end
defp follow_relationships_update_event do
server_sent_event_helper(
"Follow relationships update",
"An update to follow relationships.",
"pleroma:follow_relationships_update",
%Schema{
type: :object,
title: "Follow relationships update",
required: [:state, :follower, :following],
properties: %{
state: %Schema{
type: :string,
description: "Follow state of the relationship.",
enum: ["follow_pending", "follow_accept", "follow_reject", "unfollow"]
},
follower: %Schema{
type: :object,
description: "Information about the follower.",
required: [:id, :follower_count, :following_count],
properties: %{
id: FlakeID,
follower_count: %Schema{type: :integer},
following_count: %Schema{type: :integer}
}
},
following: %Schema{
type: :object,
description: "Information about the following person.",
required: [:id, :follower_count, :following_count],
properties: %{
id: FlakeID,
follower_count: %Schema{type: :integer},
following_count: %Schema{type: :integer}
}
}
},
example: %{
"state" => "follow_pending",
"follower" => %{
"id" => "someUser1",
"follower_count" => 1,
"following_count" => 1
},
"following" => %{
"id" => "someUser2",
"follower_count" => 1,
"following_count" => 1
}
}
}
)
end
defp chat_update_event do
server_sent_event_helper(
"Chat update",
"A new chat message.",
"pleroma:chat_update",
Chat
)
end
defp conversation_event do
server_sent_event_helper(
"Conversation update",
"An update about a conversation",
"conversation",
Conversation
)
end
defp delete_event do
server_sent_event_helper(
"Delete",
"A status that was just deleted.",
"delete",
%Schema{
type: :string,
title: "Status id",
description: "Id of the deleted status",
allOf: [FlakeID],
example: "some-opaque-id"
},
payload_type: :string,
has_stream: false
)
end
defp pleroma_respond_event do
server_sent_event_helper(
"Server response",
"A response to a client-sent event.",
"pleroma:respond",
%Schema{
type: :object,
title: "Results",
required: [:result, :type],
properties: %{
result: %Schema{
type: :string,
title: "Result of the request",
enum: ["success", "error", "ignored"]
},
error: %Schema{
type: :string,
title: "Error code",
description: "An error identifier. Only appears if `result` is `error`."
},
type: %Schema{
type: :string,
description: "Type of the request."
}
},
example: %{"result" => "success", "type" => "pleroma:authenticate"}
},
has_stream: false
)
end
defp client_sent_events do
%Schema{
oneOf: [
subscribe_event(),
unsubscribe_event(),
authenticate_event()
]
}
end
defp request_body(description, schema, opts \\ []) do
%OpenApiSpex.RequestBody{
description: description,
content: %{
"application/json" => %OpenApiSpex.MediaType{
schema: schema,
example: opts[:example],
examples: opts[:examples]
}
}
}
end
defp client_sent_event_helper(name, description, type, properties, opts) do
required = opts[:required] || []
%Schema{
type: :object,
title: name,
required: [:type] ++ required,
description: description,
properties:
%{
type: %Schema{type: :string, enum: [type], description: "Type of the event."}
}
|> Map.merge(properties),
example: opts[:example]
}
end
defp subscribe_event do
client_sent_event_helper(
"Subscribe",
"Subscribe to a stream.",
"subscribe",
stream_specifier(),
required: [:stream],
example: %{"type" => "subscribe", "stream" => "list", "list" => "1"}
)
end
defp unsubscribe_event do
client_sent_event_helper(
"Unsubscribe",
"Unsubscribe from a stream.",
"unsubscribe",
stream_specifier(),
required: [:stream],
example: %{
"type" => "unsubscribe",
"stream" => "public:remote:media",
"instance" => "example.org"
}
)
end
defp authenticate_event do
client_sent_event_helper(
"Authenticate",
"Authenticate via an access token.",
"pleroma:authenticate",
%{
token: token()
},
required: [:token]
)
end
defp token do
%Schema{
type: :string,
description: "An OAuth access token with corresponding permissions.",
example: "some token"
}
end
defp stream_specifier do
%{
stream: %Schema{
type: :string,
description: "The name of the stream.",
enum:
Pleroma.Constants.public_streams() ++
[
"public:remote",
"public:remote:media",
"user",
"user:pleroma_chat",
"user:notification",
"direct",
"list",
"hashtag"
]
},
list: %Schema{
type: :string,
title: "List id",
description: "The id of the list. Required when `stream` is `list`.",
example: "some-id"
},
tag: %Schema{
type: :string,
title: "Hashtag name",
description: "The name of the hashtag. Required when `stream` is `hashtag`.",
example: "mew"
},
instance: %Schema{
type: :string,
title: "Domain name",
description:
"Domain name of the instance. Required when `stream` is `public:remote` or `public:remote:media`.",
example: "example.org"
}
}
end
end

View file

@ -9,6 +9,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.OAuth.Token alias Pleroma.Web.OAuth.Token
alias Pleroma.Web.Streamer alias Pleroma.Web.Streamer
alias Pleroma.Web.StreamerView
@behaviour :cowboy_websocket @behaviour :cowboy_websocket
@ -32,8 +33,15 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
req req
end end
topics =
if topic do
[topic]
else
[]
end
{:cowboy_websocket, req, {:cowboy_websocket, req,
%{user: user, topic: topic, oauth_token: oauth_token, count: 0, timer: nil}, %{user: user, topics: topics, oauth_token: oauth_token, count: 0, timer: nil},
%{idle_timeout: @timeout}} %{idle_timeout: @timeout}}
else else
{:error, :bad_topic} -> {:error, :bad_topic} ->
@ -50,10 +58,10 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
def websocket_init(state) do def websocket_init(state) do
Logger.debug( Logger.debug(
"#{__MODULE__} accepted websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic}" "#{__MODULE__} accepted websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topics #{state.topics}"
) )
Streamer.add_socket(state.topic, state.oauth_token) Enum.each(state.topics, fn topic -> Streamer.add_socket(topic, state.oauth_token) end)
{:ok, %{state | timer: timer()}} {:ok, %{state | timer: timer()}}
end end
@ -66,16 +74,26 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
# We only receive pings for now # We only receive pings for now
def websocket_handle(:ping, state), do: {:ok, state} def websocket_handle(:ping, state), do: {:ok, state}
def websocket_handle({:text, text}, state) do
with {:ok, %{} = event} <- Jason.decode(text) do
handle_client_event(event, state)
else
_ ->
Logger.error("#{__MODULE__} received non-JSON event: #{inspect(text)}")
{:ok, state}
end
end
def websocket_handle(frame, state) do def websocket_handle(frame, state) do
Logger.error("#{__MODULE__} received frame: #{inspect(frame)}") Logger.error("#{__MODULE__} received frame: #{inspect(frame)}")
{:ok, state} {:ok, state}
end end
def websocket_info({:render_with_user, view, template, item}, state) do def websocket_info({:render_with_user, view, template, item, topic}, state) do
user = %User{} = User.get_cached_by_ap_id(state.user.ap_id) user = %User{} = User.get_cached_by_ap_id(state.user.ap_id)
unless Streamer.filtered_by_user?(user, item) do unless Streamer.filtered_by_user?(user, item) do
websocket_info({:text, view.render(template, item, user)}, %{state | user: user}) websocket_info({:text, view.render(template, item, user, topic)}, %{state | user: user})
else else
{:ok, state} {:ok, state}
end end
@ -109,10 +127,10 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
def terminate(reason, _req, state) do def terminate(reason, _req, state) do
Logger.debug( Logger.debug(
"#{__MODULE__} terminating websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic || "?"}: #{inspect(reason)}" "#{__MODULE__} terminating websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topics #{state.topics || "?"}: #{inspect(reason)}"
) )
Streamer.remove_socket(state.topic) Enum.each(state.topics, fn topic -> Streamer.remove_socket(topic) end)
:ok :ok
end end
@ -137,4 +155,103 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
defp timer do defp timer do
Process.send_after(self(), :tick, @tick) Process.send_after(self(), :tick, @tick)
end end
defp handle_client_event(%{"type" => "subscribe", "stream" => _topic} = params, state) do
with {_, {:ok, topic}} <-
{:topic, Streamer.get_topic(params["stream"], state.user, state.oauth_token, params)},
{_, false} <- {:subscribed, topic in state.topics} do
Streamer.add_socket(topic, state.oauth_token)
{[
{:text,
StreamerView.render("pleroma_respond.json", %{type: "subscribe", result: "success"})}
], %{state | topics: [topic | state.topics]}}
else
{:subscribed, true} ->
{[
{:text,
StreamerView.render("pleroma_respond.json", %{type: "subscribe", result: "ignored"})}
], state}
{:topic, {:error, error}} ->
{[
{:text,
StreamerView.render("pleroma_respond.json", %{
type: "subscribe",
result: "error",
error: error
})}
], state}
end
end
defp handle_client_event(%{"type" => "unsubscribe", "stream" => _topic} = params, state) do
with {_, {:ok, topic}} <-
{:topic, Streamer.get_topic(params["stream"], state.user, state.oauth_token, params)},
{_, true} <- {:subscribed, topic in state.topics} do
Streamer.remove_socket(topic)
{[
{:text,
StreamerView.render("pleroma_respond.json", %{type: "unsubscribe", result: "success"})}
], %{state | topics: List.delete(state.topics, topic)}}
else
{:subscribed, false} ->
{[
{:text,
StreamerView.render("pleroma_respond.json", %{type: "unsubscribe", result: "ignored"})}
], state}
{:topic, {:error, error}} ->
{[
{:text,
StreamerView.render("pleroma_respond.json", %{
type: "unsubscribe",
result: "error",
error: error
})}
], state}
end
end
defp handle_client_event(
%{"type" => "pleroma:authenticate", "token" => access_token} = _params,
state
) do
with {:auth, nil, nil} <- {:auth, state.user, state.oauth_token},
{:ok, user, oauth_token} <- authenticate_request(access_token, nil) do
{[
{:text,
StreamerView.render("pleroma_respond.json", %{
type: "pleroma:authenticate",
result: "success"
})}
], %{state | user: user, oauth_token: oauth_token}}
else
{:auth, _, _} ->
{[
{:text,
StreamerView.render("pleroma_respond.json", %{
type: "pleroma:authenticate",
result: "error",
error: :already_authenticated
})}
], state}
_ ->
{[
{:text,
StreamerView.render("pleroma_respond.json", %{
type: "pleroma:authenticate",
result: "error",
error: :unauthorized
})}
], state}
end
end
defp handle_client_event(params, state) do
Logger.error("#{__MODULE__} received unknown event: #{inspect(params)}")
{[], state}
end
end end

View file

@ -4,6 +4,7 @@
defmodule Pleroma.Web.Streamer do defmodule Pleroma.Web.Streamer do
require Logger require Logger
require Pleroma.Constants
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Chat.MessageReference alias Pleroma.Chat.MessageReference
@ -24,7 +25,7 @@ defmodule Pleroma.Web.Streamer do
def registry, do: @registry def registry, do: @registry
@public_streams ["public", "public:local", "public:media", "public:local:media"] @public_streams Pleroma.Constants.public_streams()
@local_streams ["public:local", "public:local:media"] @local_streams ["public:local", "public:local:media"]
@user_streams ["user", "user:notification", "direct", "user:pleroma_chat"] @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
@ -59,10 +60,14 @@ defmodule Pleroma.Web.Streamer do
end end
@doc "Expand and authorizes a stream" @doc "Expand and authorizes a stream"
@spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) :: @spec get_topic(stream :: String.t() | nil, User.t() | nil, Token.t() | nil, Map.t()) ::
{:ok, topic :: String.t()} | {:error, :bad_topic} {:ok, topic :: String.t() | nil} | {:error, :bad_topic}
def get_topic(stream, user, oauth_token, params \\ %{}) def get_topic(stream, user, oauth_token, params \\ %{})
def get_topic(nil = _stream, _user, _oauth_token, _params) do
{:ok, nil}
end
# Allow all public steams if the instance allows unauthenticated access. # Allow all public steams if the instance allows unauthenticated access.
# Otherwise, only allow users with valid oauth tokens. # Otherwise, only allow users with valid oauth tokens.
def get_topic(stream, user, oauth_token, _params) when stream in @public_streams do def get_topic(stream, user, oauth_token, _params) when stream in @public_streams do
@ -219,8 +224,8 @@ defmodule Pleroma.Web.Streamer do
end end
defp do_stream("follow_relationship", item) do defp do_stream("follow_relationship", item) do
text = StreamerView.render("follow_relationships_update.json", item)
user_topic = "user:#{item.follower.id}" user_topic = "user:#{item.follower.id}"
text = StreamerView.render("follow_relationships_update.json", item, user_topic)
Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n") Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")
@ -266,9 +271,11 @@ defmodule Pleroma.Web.Streamer do
defp do_stream(topic, %Notification{} = item) defp do_stream(topic, %Notification{} = item)
when topic in ["user", "user:notification"] do when topic in ["user", "user:notification"] do
Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list -> user_topic = "#{topic}:#{item.user_id}"
Registry.dispatch(@registry, user_topic, fn list ->
Enum.each(list, fn {pid, _auth} -> Enum.each(list, fn {pid, _auth} ->
send(pid, {:render_with_user, StreamerView, "notification.json", item}) send(pid, {:render_with_user, StreamerView, "notification.json", item, user_topic})
end) end)
end) end)
end end
@ -277,7 +284,7 @@ defmodule Pleroma.Web.Streamer do
when topic in ["user", "user:pleroma_chat"] do when topic in ["user", "user:pleroma_chat"] do
topic = "#{topic}:#{user.id}" topic = "#{topic}:#{user.id}"
text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}, topic)
Registry.dispatch(@registry, topic, fn list -> Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, _auth} -> Enum.each(list, fn {pid, _auth} ->
@ -305,7 +312,7 @@ defmodule Pleroma.Web.Streamer do
end end
defp push_to_socket(topic, %Participation{} = participation) do defp push_to_socket(topic, %Participation{} = participation) do
rendered = StreamerView.render("conversation.json", participation) rendered = StreamerView.render("conversation.json", participation, topic)
Registry.dispatch(@registry, topic, fn list -> Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, _} -> Enum.each(list, fn {pid, _} ->
@ -333,12 +340,15 @@ defmodule Pleroma.Web.Streamer do
Pleroma.Activity.get_create_by_object_ap_id(item.object.data["id"]) Pleroma.Activity.get_create_by_object_ap_id(item.object.data["id"])
|> Map.put(:object, item.object) |> Map.put(:object, item.object)
anon_render = StreamerView.render("status_update.json", create_activity) anon_render = StreamerView.render("status_update.json", create_activity, topic)
Registry.dispatch(@registry, topic, fn list -> Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, auth?} -> Enum.each(list, fn {pid, auth?} ->
if auth? do if auth? do
send(pid, {:render_with_user, StreamerView, "status_update.json", create_activity}) send(
pid,
{:render_with_user, StreamerView, "status_update.json", create_activity, topic}
)
else else
send(pid, {:text, anon_render}) send(pid, {:text, anon_render})
end end
@ -347,12 +357,12 @@ defmodule Pleroma.Web.Streamer do
end end
defp push_to_socket(topic, item) do defp push_to_socket(topic, item) do
anon_render = StreamerView.render("update.json", item) anon_render = StreamerView.render("update.json", item, topic)
Registry.dispatch(@registry, topic, fn list -> Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, auth?} -> Enum.each(list, fn {pid, auth?} ->
if auth? do if auth? do
send(pid, {:render_with_user, StreamerView, "update.json", item}) send(pid, {:render_with_user, StreamerView, "update.json", item, topic})
else else
send(pid, {:text, anon_render}) send(pid, {:text, anon_render})
end end

View file

@ -11,8 +11,11 @@ defmodule Pleroma.Web.StreamerView do
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.MastodonAPI.NotificationView alias Pleroma.Web.MastodonAPI.NotificationView
def render("update.json", %Activity{} = activity, %User{} = user) do require Pleroma.Constants
def render("update.json", %Activity{} = activity, %User{} = user, topic) do
%{ %{
stream: render("stream.json", %{topic: topic}),
event: "update", event: "update",
payload: payload:
Pleroma.Web.MastodonAPI.StatusView.render( Pleroma.Web.MastodonAPI.StatusView.render(
@ -25,8 +28,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!() |> Jason.encode!()
end end
def render("status_update.json", %Activity{} = activity, %User{} = user) do def render("status_update.json", %Activity{} = activity, %User{} = user, topic) do
%{ %{
stream: render("stream.json", %{topic: topic}),
event: "status.update", event: "status.update",
payload: payload:
Pleroma.Web.MastodonAPI.StatusView.render( Pleroma.Web.MastodonAPI.StatusView.render(
@ -39,8 +43,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!() |> Jason.encode!()
end end
def render("notification.json", %Notification{} = notify, %User{} = user) do def render("notification.json", %Notification{} = notify, %User{} = user, topic) do
%{ %{
stream: render("stream.json", %{topic: topic}),
event: "notification", event: "notification",
payload: payload:
NotificationView.render( NotificationView.render(
@ -52,8 +57,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!() |> Jason.encode!()
end end
def render("update.json", %Activity{} = activity) do def render("update.json", %Activity{} = activity, topic) do
%{ %{
stream: render("stream.json", %{topic: topic}),
event: "update", event: "update",
payload: payload:
Pleroma.Web.MastodonAPI.StatusView.render( Pleroma.Web.MastodonAPI.StatusView.render(
@ -65,8 +71,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!() |> Jason.encode!()
end end
def render("status_update.json", %Activity{} = activity) do def render("status_update.json", %Activity{} = activity, topic) do
%{ %{
stream: render("stream.json", %{topic: topic}),
event: "status.update", event: "status.update",
payload: payload:
Pleroma.Web.MastodonAPI.StatusView.render( Pleroma.Web.MastodonAPI.StatusView.render(
@ -78,7 +85,7 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!() |> Jason.encode!()
end end
def render("chat_update.json", %{chat_message_reference: cm_ref}) do def render("chat_update.json", %{chat_message_reference: cm_ref}, topic) do
# Explicitly giving the cmr for the object here, so we don't accidentally # Explicitly giving the cmr for the object here, so we don't accidentally
# send a later 'last_message' that was inserted between inserting this and # send a later 'last_message' that was inserted between inserting this and
# streaming it out # streaming it out
@ -93,6 +100,7 @@ defmodule Pleroma.Web.StreamerView do
) )
%{ %{
stream: render("stream.json", %{topic: topic}),
event: "pleroma:chat_update", event: "pleroma:chat_update",
payload: payload:
representation representation
@ -101,8 +109,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!() |> Jason.encode!()
end end
def render("follow_relationships_update.json", item) do def render("follow_relationships_update.json", item, topic) do
%{ %{
stream: render("stream.json", %{topic: topic}),
event: "pleroma:follow_relationships_update", event: "pleroma:follow_relationships_update",
payload: payload:
%{ %{
@ -123,8 +132,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!() |> Jason.encode!()
end end
def render("conversation.json", %Participation{} = participation) do def render("conversation.json", %Participation{} = participation, topic) do
%{ %{
stream: render("stream.json", %{topic: topic}),
event: "conversation", event: "conversation",
payload: payload:
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{ Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
@ -135,4 +145,39 @@ defmodule Pleroma.Web.StreamerView do
} }
|> Jason.encode!() |> Jason.encode!()
end end
def render("pleroma_respond.json", %{type: type, result: result} = params) do
%{
event: "pleroma:respond",
payload:
%{
result: result,
type: type
}
|> Map.merge(maybe_error(params))
|> Jason.encode!()
}
|> Jason.encode!()
end
def render("stream.json", %{topic: "user:pleroma_chat:" <> _}), do: ["user:pleroma_chat"]
def render("stream.json", %{topic: "user:notification:" <> _}), do: ["user:notification"]
def render("stream.json", %{topic: "user:" <> _}), do: ["user"]
def render("stream.json", %{topic: "direct:" <> _}), do: ["direct"]
def render("stream.json", %{topic: "list:" <> id}), do: ["list", id]
def render("stream.json", %{topic: "hashtag:" <> tag}), do: ["hashtag", tag]
def render("stream.json", %{topic: "public:remote:media:" <> instance}),
do: ["public:remote:media", instance]
def render("stream.json", %{topic: "public:remote:" <> instance}),
do: ["public:remote", instance]
def render("stream.json", %{topic: stream}) when stream in Pleroma.Constants.public_streams(),
do: [stream]
defp maybe_error(%{error: :bad_topic}), do: %{error: "bad_topic"}
defp maybe_error(%{error: :unauthorized}), do: %{error: "unauthorized"}
defp maybe_error(%{error: :already_authenticated}), do: %{error: "already_authenticated"}
defp maybe_error(_), do: %{}
end end

View file

@ -31,9 +31,22 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
WebsocketClient.start_link(self(), path, headers) WebsocketClient.start_link(self(), path, headers)
end end
defp decode_json(json) do
with {:ok, %{"event" => event, "payload" => payload_text}} <- Jason.decode(json),
{:ok, payload} <- Jason.decode(payload_text) do
{:ok, %{"event" => event, "payload" => payload}}
end
end
# Turns atom keys to strings
defp atom_key_to_string(json) do
json
|> Jason.encode!()
|> Jason.decode!()
end
test "refuses invalid requests" do test "refuses invalid requests" do
capture_log(fn -> capture_log(fn ->
assert {:error, %WebSockex.RequestError{code: 404}} = start_socket()
assert {:error, %WebSockex.RequestError{code: 404}} = start_socket("?stream=ncjdk") assert {:error, %WebSockex.RequestError{code: 404}} = start_socket("?stream=ncjdk")
Process.sleep(30) Process.sleep(30)
end) end)
@ -49,6 +62,10 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
end) end)
end end
test "allows unified stream" do
assert {:ok, _} = start_socket()
end
test "allows public streams without authentication" do test "allows public streams without authentication" do
assert {:ok, _} = start_socket("?stream=public") assert {:ok, _} = start_socket("?stream=public")
assert {:ok, _} = start_socket("?stream=public:local") assert {:ok, _} = start_socket("?stream=public:local")
@ -68,6 +85,35 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
assert json["payload"] assert json["payload"]
assert {:ok, json} = Jason.decode(json["payload"]) assert {:ok, json} = Jason.decode(json["payload"])
view_json =
Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil)
|> atom_key_to_string()
assert json == view_json
end
describe "subscribing via WebSocket" do
test "can subscribe" do
user = insert(:user)
{:ok, pid} = start_socket()
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
assert_receive {:text, raw_json}, 1_000
assert {:ok,
%{
"event" => "pleroma:respond",
"payload" => %{"type" => "subscribe", "result" => "success"}
}} = decode_json(raw_json)
{:ok, activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
assert_receive {:text, raw_json}, 1_000
assert {:ok, json} = Jason.decode(raw_json)
assert "update" == json["event"]
assert json["payload"]
assert {:ok, json} = Jason.decode(json["payload"])
view_json = view_json =
Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil) Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil)
|> Jason.encode!() |> Jason.encode!()
@ -76,6 +122,108 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
assert json == view_json assert json == view_json
end end
test "can subscribe to multiple streams" do
user = insert(:user)
{:ok, pid} = start_socket()
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
assert_receive {:text, raw_json}, 1_000
assert {:ok,
%{
"event" => "pleroma:respond",
"payload" => %{"type" => "subscribe", "result" => "success"}
}} = decode_json(raw_json)
WebsocketClient.send_text(
pid,
%{type: "subscribe", stream: "hashtag", tag: "mew"} |> Jason.encode!()
)
assert_receive {:text, raw_json}, 1_000
assert {:ok,
%{
"event" => "pleroma:respond",
"payload" => %{"type" => "subscribe", "result" => "success"}
}} = decode_json(raw_json)
{:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber #mew"})
assert_receive {:text, raw_json}, 1_000
assert {:ok, %{"stream" => stream1}} = Jason.decode(raw_json)
assert_receive {:text, raw_json}, 1_000
assert {:ok, %{"stream" => stream2}} = Jason.decode(raw_json)
streams = [stream1, stream2]
assert ["hashtag", "mew"] in streams
assert ["public"] in streams
end
test "won't double subscribe" do
user = insert(:user)
{:ok, pid} = start_socket()
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
assert_receive {:text, raw_json}, 1_000
assert {:ok,
%{
"event" => "pleroma:respond",
"payload" => %{"type" => "subscribe", "result" => "success"}
}} = decode_json(raw_json)
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
assert_receive {:text, raw_json}, 1_000
assert {:ok,
%{
"event" => "pleroma:respond",
"payload" => %{"type" => "subscribe", "result" => "ignored"}
}} = decode_json(raw_json)
{:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
assert_receive {:text, _}, 1_000
refute_receive {:text, _}, 1_000
end
test "rejects invalid streams" do
{:ok, pid} = start_socket()
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "nonsense"} |> Jason.encode!())
assert_receive {:text, raw_json}, 1_000
assert {:ok,
%{
"event" => "pleroma:respond",
"payload" => %{"type" => "subscribe", "result" => "error", "error" => "bad_topic"}
}} = decode_json(raw_json)
end
test "can unsubscribe" do
user = insert(:user)
{:ok, pid} = start_socket()
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
assert_receive {:text, raw_json}, 1_000
assert {:ok,
%{
"event" => "pleroma:respond",
"payload" => %{"type" => "subscribe", "result" => "success"}
}} = decode_json(raw_json)
WebsocketClient.send_text(pid, %{type: "unsubscribe", stream: "public"} |> Jason.encode!())
assert_receive {:text, raw_json}, 1_000
assert {:ok,
%{
"event" => "pleroma:respond",
"payload" => %{"type" => "unsubscribe", "result" => "success"}
}} = decode_json(raw_json)
{:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
refute_receive {:text, _}, 1_000
end
end
describe "with a valid user token" do describe "with a valid user token" do
setup do setup do
{:ok, app} = {:ok, app} =
@ -131,6 +279,124 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
end) end)
end end
test "accepts valid token on client-sent event", %{token: token} do
assert {:ok, pid} = start_socket()
WebsocketClient.send_text(
pid,
%{type: "pleroma:authenticate", token: token.token} |> Jason.encode!()
)
assert_receive {:text, raw_json}, 1_000
assert {:ok,
%{
"event" => "pleroma:respond",
"payload" => %{"type" => "pleroma:authenticate", "result" => "success"}
}} = decode_json(raw_json)
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "user"} |> Jason.encode!())
assert_receive {:text, raw_json}, 1_000
assert {:ok,
%{
"event" => "pleroma:respond",
"payload" => %{"type" => "subscribe", "result" => "success"}
}} = decode_json(raw_json)
end
test "rejects invalid token on client-sent event" do
assert {:ok, pid} = start_socket()
WebsocketClient.send_text(
pid,
%{type: "pleroma:authenticate", token: "Something else"} |> Jason.encode!()
)
assert_receive {:text, raw_json}, 1_000
assert {:ok,
%{
"event" => "pleroma:respond",
"payload" => %{
"type" => "pleroma:authenticate",
"result" => "error",
"error" => "unauthorized"
}
}} = decode_json(raw_json)
end
test "rejects new authenticate request if already logged-in", %{token: token} do
assert {:ok, pid} = start_socket()
WebsocketClient.send_text(
pid,
%{type: "pleroma:authenticate", token: token.token} |> Jason.encode!()
)
assert_receive {:text, raw_json}, 1_000
assert {:ok,
%{
"event" => "pleroma:respond",
"payload" => %{"type" => "pleroma:authenticate", "result" => "success"}
}} = decode_json(raw_json)
WebsocketClient.send_text(
pid,
%{type: "pleroma:authenticate", token: "Something else"} |> Jason.encode!()
)
assert_receive {:text, raw_json}, 1_000
assert {:ok,
%{
"event" => "pleroma:respond",
"payload" => %{
"type" => "pleroma:authenticate",
"result" => "error",
"error" => "already_authenticated"
}
}} = decode_json(raw_json)
end
test "accepts the 'list' stream", %{token: token, user: user} do
posting_user = insert(:user)
{:ok, list} = Pleroma.List.create("test", user)
Pleroma.List.follow(list, posting_user)
assert {:ok, _} = start_socket("?stream=list&access_token=#{token.token}&list=#{list.id}")
assert {:ok, pid} = start_socket("?access_token=#{token.token}")
WebsocketClient.send_text(
pid,
%{type: "subscribe", stream: "list", list: list.id} |> Jason.encode!()
)
assert_receive {:text, raw_json}, 1_000
assert {:ok,
%{
"event" => "pleroma:respond",
"payload" => %{"type" => "subscribe", "result" => "success"}
}} = decode_json(raw_json)
WebsocketClient.send_text(
pid,
%{type: "subscribe", stream: "list", list: to_string(list.id)} |> Jason.encode!()
)
assert_receive {:text, raw_json}, 1_000
assert {:ok,
%{
"event" => "pleroma:respond",
"payload" => %{"type" => "subscribe", "result" => "ignored"}
}} = decode_json(raw_json)
end
test "disconnect when token is revoked", %{app: app, user: user, token: token} do test "disconnect when token is revoked", %{app: app, user: user, token: token} do
assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}") assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}") assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
@ -146,5 +412,85 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
assert_receive {:close, _} assert_receive {:close, _}
refute_receive {:close, _} refute_receive {:close, _}
end end
test "receives private statuses", %{user: reading_user, token: token} do
user = insert(:user)
CommonAPI.follow(reading_user, user)
{:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
{:ok, activity} =
CommonAPI.post(user, %{status: "nice echo chamber", visibility: "private"})
assert_receive {:text, raw_json}, 1_000
assert {:ok, json} = Jason.decode(raw_json)
assert "update" == json["event"]
assert json["payload"]
assert {:ok, json} = Jason.decode(json["payload"])
view_json =
Pleroma.Web.MastodonAPI.StatusView.render("show.json",
activity: activity,
for: reading_user
)
|> Jason.encode!()
|> Jason.decode!()
assert json == view_json
end
test "receives edits", %{user: reading_user, token: token} do
user = insert(:user)
CommonAPI.follow(reading_user, user)
{:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
{:ok, activity} =
CommonAPI.post(user, %{status: "nice echo chamber", visibility: "private"})
assert_receive {:text, _raw_json}, 1_000
{:ok, _} = CommonAPI.update(user, activity, %{status: "mew mew", visibility: "private"})
assert_receive {:text, raw_json}, 1_000
activity = Pleroma.Activity.normalize(activity)
view_json =
Pleroma.Web.MastodonAPI.StatusView.render("show.json",
activity: activity,
for: reading_user
)
|> Jason.encode!()
|> Jason.decode!()
assert {:ok, %{"event" => "status.update", "payload" => ^view_json}} = decode_json(raw_json)
end
test "receives notifications", %{user: reading_user, token: token} do
user = insert(:user)
CommonAPI.follow(reading_user, user)
{:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
{:ok, %Pleroma.Activity{id: activity_id} = _activity} =
CommonAPI.post(user, %{
status: "nice echo chamber @#{reading_user.nickname}",
visibility: "private"
})
assert_receive {:text, raw_json}, 1_000
assert {:ok,
%{
"event" => "notification",
"payload" => %{
"status" => %{
"id" => ^activity_id
}
}
}} = decode_json(raw_json)
end
end end
end end

View file

@ -252,7 +252,7 @@ defmodule Pleroma.NotificationTest do
task = task =
Task.async(fn -> Task.async(fn ->
{:ok, _topic} = Streamer.get_topic_and_add_socket("user", user, oauth_token) {:ok, _topic} = Streamer.get_topic_and_add_socket("user", user, oauth_token)
assert_receive {:render_with_user, _, _, _}, 4_000 assert_receive {:render_with_user, _, _, _, _}, 4_000
end) end)
task_user_notification = task_user_notification =
@ -260,7 +260,7 @@ defmodule Pleroma.NotificationTest do
{:ok, _topic} = {:ok, _topic} =
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
assert_receive {:render_with_user, _, _, _}, 4_000 assert_receive {:render_with_user, _, _, _, _}, 4_000
end) end)
activity = insert(:note_activity) activity = insert(:note_activity)

View file

@ -22,6 +22,10 @@ defmodule Pleroma.Web.StreamerTest do
setup do: clear_config([:instance, :skip_thread_containment]) setup do: clear_config([:instance, :skip_thread_containment])
describe "get_topic/_ (unauthenticated)" do describe "get_topic/_ (unauthenticated)" do
test "allows no stream" do
assert {:ok, nil} = Streamer.get_topic(nil, nil, nil)
end
test "allows public" do test "allows public" do
assert {:ok, "public"} = Streamer.get_topic("public", nil, nil) assert {:ok, "public"} = Streamer.get_topic("public", nil, nil)
assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil, nil) assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil, nil)
@ -242,7 +246,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user", user, oauth_token) Streamer.get_topic_and_add_socket("user", user, oauth_token)
{:ok, activity} = CommonAPI.post(user, %{status: "hey"}) {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity, _}
refute Streamer.filtered_by_user?(user, activity) refute Streamer.filtered_by_user?(user, activity)
end end
@ -253,7 +257,7 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, activity} = CommonAPI.post(other_user, %{status: "hey"}) {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
{:ok, announce} = CommonAPI.repeat(activity.id, user) {:ok, announce} = CommonAPI.repeat(activity.id, user)
assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce} assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce, _}
refute Streamer.filtered_by_user?(user, announce) refute Streamer.filtered_by_user?(user, announce)
end end
@ -306,7 +310,7 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, %Pleroma.Activity{data: _data, local: false} = announce} = {:ok, %Pleroma.Activity{data: _data, local: false} = announce} =
Pleroma.Web.ActivityPub.Transmogrifier.handle_incoming(data) Pleroma.Web.ActivityPub.Transmogrifier.handle_incoming(data)
assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce} assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce, _}
refute Streamer.filtered_by_user?(user, announce) refute Streamer.filtered_by_user?(user, announce)
end end
@ -318,7 +322,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user", user, oauth_token) Streamer.get_topic_and_add_socket("user", user, oauth_token)
Streamer.stream("user", notify) Streamer.stream("user", notify)
assert_receive {:render_with_user, _, _, ^notify} assert_receive {:render_with_user, _, _, ^notify, _}
refute Streamer.filtered_by_user?(user, notify) refute Streamer.filtered_by_user?(user, notify)
end end
@ -330,7 +334,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
Streamer.stream("user:notification", notify) Streamer.stream("user:notification", notify)
assert_receive {:render_with_user, _, _, ^notify} assert_receive {:render_with_user, _, _, ^notify, _}
refute Streamer.filtered_by_user?(user, notify) refute Streamer.filtered_by_user?(user, notify)
end end
@ -351,7 +355,12 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user:pleroma_chat", user, oauth_token) Streamer.get_topic_and_add_socket("user:pleroma_chat", user, oauth_token)
Streamer.stream("user:pleroma_chat", {user, cm_ref}) Streamer.stream("user:pleroma_chat", {user, cm_ref})
text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) text =
StreamerView.render(
"chat_update.json",
%{chat_message_reference: cm_ref},
"user:pleroma_chat:#{user.id}"
)
assert text =~ "hey cirno" assert text =~ "hey cirno"
assert_receive {:text, ^text} assert_receive {:text, ^text}
@ -369,7 +378,12 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user", user, oauth_token) Streamer.get_topic_and_add_socket("user", user, oauth_token)
Streamer.stream("user", {user, cm_ref}) Streamer.stream("user", {user, cm_ref})
text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) text =
StreamerView.render(
"chat_update.json",
%{chat_message_reference: cm_ref},
"user:#{user.id}"
)
assert text =~ "hey cirno" assert text =~ "hey cirno"
assert_receive {:text, ^text} assert_receive {:text, ^text}
@ -390,7 +404,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
Streamer.stream("user:notification", notify) Streamer.stream("user:notification", notify)
assert_receive {:render_with_user, _, _, ^notify} assert_receive {:render_with_user, _, _, ^notify, _}
refute Streamer.filtered_by_user?(user, notify) refute Streamer.filtered_by_user?(user, notify)
end end
@ -436,7 +450,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
{:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
assert_receive {:render_with_user, _, "notification.json", notif} assert_receive {:render_with_user, _, "notification.json", notif, _}
assert notif.activity.id == favorite_activity.id assert notif.activity.id == favorite_activity.id
refute Streamer.filtered_by_user?(user, notif) refute Streamer.filtered_by_user?(user, notif)
end end
@ -465,7 +479,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
{:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user) {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user)
assert_receive {:render_with_user, _, "notification.json", notif} assert_receive {:render_with_user, _, "notification.json", notif, _}
assert notif.activity.id == follow_activity.id assert notif.activity.id == follow_activity.id
refute Streamer.filtered_by_user?(user, notif) refute Streamer.filtered_by_user?(user, notif)
end end
@ -530,7 +544,7 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"}) {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"})
create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"]) create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"])
assert_receive {:render_with_user, _, "status_update.json", ^create} assert_receive {:render_with_user, _, "status_update.json", ^create, _}
refute Streamer.filtered_by_user?(user, edited) refute Streamer.filtered_by_user?(user, edited)
end end
@ -541,7 +555,7 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, edited} = CommonAPI.update(user, activity, %{status: "mew mew"}) {:ok, edited} = CommonAPI.update(user, activity, %{status: "mew mew"})
create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"]) create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"])
assert_receive {:render_with_user, _, "status_update.json", ^create} assert_receive {:render_with_user, _, "status_update.json", ^create, _}
refute Streamer.filtered_by_user?(user, edited) refute Streamer.filtered_by_user?(user, edited)
end end
end end
@ -554,7 +568,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("public", user, oauth_token) Streamer.get_topic_and_add_socket("public", user, oauth_token)
{:ok, activity} = CommonAPI.post(other_user, %{status: "Test"}) {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity, _}
refute Streamer.filtered_by_user?(other_user, activity) refute Streamer.filtered_by_user?(other_user, activity)
end end
@ -654,7 +668,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("public", user, oauth_token) Streamer.get_topic_and_add_socket("public", user, oauth_token)
Streamer.stream("public", activity) Streamer.stream("public", activity)
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity, _}
assert Streamer.filtered_by_user?(user, activity) assert Streamer.filtered_by_user?(user, activity)
end end
@ -676,7 +690,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("public", user, oauth_token) Streamer.get_topic_and_add_socket("public", user, oauth_token)
Streamer.stream("public", activity) Streamer.stream("public", activity)
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity, _}
refute Streamer.filtered_by_user?(user, activity) refute Streamer.filtered_by_user?(user, activity)
end end
@ -699,7 +713,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("public", user, oauth_token) Streamer.get_topic_and_add_socket("public", user, oauth_token)
Streamer.stream("public", activity) Streamer.stream("public", activity)
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity, _}
refute Streamer.filtered_by_user?(user, activity) refute Streamer.filtered_by_user?(user, activity)
end end
end end
@ -713,7 +727,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("public", user, oauth_token) Streamer.get_topic_and_add_socket("public", user, oauth_token)
{:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"}) {:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"})
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity, _}
assert Streamer.filtered_by_user?(user, activity) assert Streamer.filtered_by_user?(user, activity)
end end
@ -730,17 +744,17 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, activity_one} = CommonAPI.post(friend, %{status: "hey! @#{blockee.nickname}"}) {:ok, activity_one} = CommonAPI.post(friend, %{status: "hey! @#{blockee.nickname}"})
assert_receive {:render_with_user, _, _, ^activity_one} assert_receive {:render_with_user, _, _, ^activity_one, _}
assert Streamer.filtered_by_user?(blocker, activity_one) assert Streamer.filtered_by_user?(blocker, activity_one)
{:ok, activity_two} = CommonAPI.post(blockee, %{status: "hey! @#{friend.nickname}"}) {:ok, activity_two} = CommonAPI.post(blockee, %{status: "hey! @#{friend.nickname}"})
assert_receive {:render_with_user, _, _, ^activity_two} assert_receive {:render_with_user, _, _, ^activity_two, _}
assert Streamer.filtered_by_user?(blocker, activity_two) assert Streamer.filtered_by_user?(blocker, activity_two)
{:ok, activity_three} = CommonAPI.post(blockee, %{status: "hey! @#{blocker.nickname}"}) {:ok, activity_three} = CommonAPI.post(blockee, %{status: "hey! @#{blocker.nickname}"})
assert_receive {:render_with_user, _, _, ^activity_three} assert_receive {:render_with_user, _, _, ^activity_three, _}
assert Streamer.filtered_by_user?(blocker, activity_three) assert Streamer.filtered_by_user?(blocker, activity_three)
end end
end end
@ -801,7 +815,7 @@ defmodule Pleroma.Web.StreamerTest do
visibility: "private" visibility: "private"
}) })
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity, _}
refute Streamer.filtered_by_user?(user_a, activity) refute Streamer.filtered_by_user?(user_a, activity)
end end
end end
@ -819,7 +833,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user", user1, user1_token) Streamer.get_topic_and_add_socket("user", user1, user1_token)
{:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2) {:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2)
assert_receive {:render_with_user, _, _, ^announce_activity} assert_receive {:render_with_user, _, _, ^announce_activity, _}
assert Streamer.filtered_by_user?(user1, announce_activity) assert Streamer.filtered_by_user?(user1, announce_activity)
end end
@ -835,7 +849,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user", user1, user1_token) Streamer.get_topic_and_add_socket("user", user1, user1_token)
{:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2) {:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2)
assert_receive {:render_with_user, _, "notification.json", notif} assert_receive {:render_with_user, _, "notification.json", notif, _}
assert Streamer.filtered_by_user?(user1, notif) assert Streamer.filtered_by_user?(user1, notif)
end end
@ -851,7 +865,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user", user1, user1_token) Streamer.get_topic_and_add_socket("user", user1, user1_token)
{:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id) {:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
assert_receive {:render_with_user, _, "notification.json", notif} assert_receive {:render_with_user, _, "notification.json", notif, _}
refute Streamer.filtered_by_user?(user1, notif) refute Streamer.filtered_by_user?(user1, notif)
end end
end end
@ -866,7 +880,7 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
{:ok, _} = CommonAPI.add_mute(user2, activity) {:ok, _} = CommonAPI.add_mute(user2, activity)
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity, _}
assert Streamer.filtered_by_user?(user2, activity) assert Streamer.filtered_by_user?(user2, activity)
end end
end end
@ -908,7 +922,7 @@ defmodule Pleroma.Web.StreamerTest do
}) })
create_activity_id = create_activity.id create_activity_id = create_activity.id
assert_receive {:render_with_user, _, _, ^create_activity} assert_receive {:render_with_user, _, _, ^create_activity, _}
assert_receive {:text, received_conversation1} assert_receive {:text, received_conversation1}
assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1) assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
@ -943,8 +957,8 @@ defmodule Pleroma.Web.StreamerTest do
visibility: "direct" visibility: "direct"
}) })
assert_receive {:render_with_user, _, _, ^create_activity} assert_receive {:render_with_user, _, _, ^create_activity, _}
assert_receive {:render_with_user, _, _, ^create_activity2} assert_receive {:render_with_user, _, _, ^create_activity2, _}
assert_receive {:text, received_conversation1} assert_receive {:text, received_conversation1}
assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1) assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
assert_receive {:text, received_conversation1} assert_receive {:text, received_conversation1}
@ -973,7 +987,7 @@ defmodule Pleroma.Web.StreamerTest do
receive do receive do
{StreamerTest, :ready} -> {StreamerTest, :ready} ->
assert_receive {:render_with_user, _, "update.json", _} assert_receive {:render_with_user, _, "update.json", _, _}
receive do receive do
{StreamerTest, :revoked} -> finalize.() {StreamerTest, :revoked} -> finalize.()