defmodule Mobilizon.Storage.Page do
  @moduledoc """
  Module for pagination of queries.
  """

  import Ecto.Query

  alias Mobilizon.Storage.Repo

  defstruct [
    :total,
    :elements
  ]

  @type t(structure) :: %__MODULE__{
          total: integer,
          elements: list(structure)
        }

  @doc """
  Returns a Page struct for a query.

  `field` defines the field used for the count aggregate; it should be the
  same field as the one used for `order_by`.

  See https://stackoverflow.com/q/12693089/10204399
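
  ## Example

  A minimal usage sketch; the `Event` schema and `:begins_on` field are
  illustrative, and `Page` is assumed to be an alias for this module:

      import Ecto.Query

      %Page{total: total, elements: events} =
        Event
        |> order_by(asc: :begins_on)
        |> Page.build_page(1, 10, :begins_on)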
  """
  @spec build_page(Ecto.Queryable.t(), integer | nil, integer | nil, atom()) :: t(any)
  def build_page(query, page, limit, field \\ :id) do
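    # The count and the page query are independent, so run them concurrently;
    # each task gets up to 30 seconds before Task.await/2 times out.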
    [total, elements] =
      [
        fn -> Repo.aggregate(query, :count, field) end,
        fn -> Repo.all(paginate(query, page, limit)) end
      ]
      |> Enum.map(&Task.async/1)
      |> Enum.map(&Task.await(&1, 30_000))

    %__MODULE__{total: total, elements: elements}
  end

  @doc """
  Adds limit and offset to the query.
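
  `page` defaults to 1 and `size` to 10. The offset is computed as
  `(page - 1) * size`, so page 3 with size 20 yields `limit: 20, offset: 40`.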
  """
  @spec paginate(Ecto.Queryable.t() | struct, integer | nil, integer | nil) :: Ecto.Query.t()
  def paginate(query, page \\ 1, size \\ 10)
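
  # A nil page or size falls back to its default while keeping the other argument.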
  def paginate(query, page, size) when is_nil(page), do: paginate(query, 1, size)
  def paginate(query, page, size) when is_nil(size), do: paginate(query, page)

  def paginate(query, page, size) do
    from(query, limit: ^size, offset: ^((page - 1) * size))
  end

  @doc """
  Streams chunks of results from the given queryable.

  Unlike `Repo.stream/2`, this function does not keep a long-running transaction
  open. Hence, consistency is not guaranteed if rows are deleted or the sort
  criteria change while the stream is being consumed.

  ## Example

      Ecto.Query.from(u in Users, order_by: [asc: :created_at])
      |> Page.chunk(100)
      |> Stream.map(&process_batch_of_users/1)
      |> Stream.run()

  ## Source

  https://elixirforum.com/t/what-is-the-best-approach-for-fetching-large-amount-of-records-from-postgresql-with-ecto/3766/8
  """
  @spec chunk(Ecto.Queryable.t(), integer) :: Enumerable.t()
  def chunk(queryable, chunk_size) do
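    # Lazily emit successive pages starting at page 1; each unfold step runs
    # one paginated query only when the stream is consumed.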
    chunk_stream =
      Stream.unfold(1, fn page_number ->
        page = queryable |> paginate(page_number, chunk_size) |> Repo.all()
        {page, page_number + 1}
      end)
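
    # Stop at the first empty page: with stable data, every subsequent page
    # would be empty as well, so there is nothing left to fetch.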
    Stream.take_while(chunk_stream, fn
      [] -> false
      _ -> true
    end)
  end
end