Ecto(Repo) Streams

deepak sharma
Apr 6, 2024

Repo.stream/2 offers a compelling way to process large datasets without loading every row into memory at once. It can seem almost too good to be true, especially compared with similar functionality in other frameworks. Like most powerful tools, though, it has limitations worth understanding before you dive in.

Under the hood, Repo.stream/2 relies on PostgreSQL features. Here’s a breakdown:

  • Internal Workhorse: Cursors (https://www.postgresql.org/docs/current/plpgsql-cursors.html). Cursors are a built-in PostgreSQL feature that let a client read a query’s result incrementally instead of all at once. Repo.stream/2 fetches rows in chunks (500 by default, configurable with the :max_rows option), which is what makes it practical for datasets that wouldn’t fit comfortably in memory.
  • Transactional Necessity: A cursor only lives inside the transaction that opened it. This is why a stream returned by Repo.stream/2 must be enumerated within a transaction.

In essence, Repo.stream/2 utilizes cursors to manage large datasets efficiently, but keep in mind the transactional requirement.
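As a rough illustration of the transactional requirement, here is a minimal sketch. The module name StreamSketch and the function each_row/4 are hypothetical, and the repo is passed in as an argument only so the sketch does not depend on a particular application. It calls only Repo.transaction/2 and Repo.stream/2, with the :timeout and :max_rows options. The :infinity default for the timeout is an assumption made for this sketch; Ecto's own default is 15 seconds, which a long stream can easily exceed.

```elixir
defmodule StreamSketch do
  # Illustrative only: `repo` is expected to be an Ecto repo module
  # (for example a hypothetical MyApp.Repo) and `queryable` an Ecto
  # query or schema module.

  # Streams `queryable` inside a single transaction and calls `fun`
  # once per row. Returns whatever repo.transaction/2 returns.
  def each_row(repo, queryable, fun, opts \\ []) do
    # Number of rows fetched from the database per round trip.
    max_rows = Keyword.get(opts, :max_rows, 500)
    # Assumption for this sketch: no transaction timeout unless one is given.
    timeout = Keyword.get(opts, :timeout, :infinity)

    repo.transaction(
      fn ->
        queryable
        |> repo.stream(max_rows: max_rows)
        |> Stream.each(fun)
        # The stream is lazy; Stream.run/1 forces it while the
        # transaction is still open.
        |> Stream.run()
      end,
      timeout: timeout
    )
  end
end
```

With a real repo, a call might look like `StreamSketch.each_row(MyApp.Repo, Consumers, &IO.inspect/1)`, where MyApp.Repo is a placeholder name. Note that the stream has to be consumed inside the function passed to the transaction; returning the lazy stream and enumerating it afterwards would fail because the transaction has already ended.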

Repo.stream/2 also comes with limitations you should be aware of:

Data Consistency:

  • Snapshot in Time: Repo.stream/2 sees the data as it was at the moment the stream started.
  • Rows added, updated, or deleted after that point won’t be reflected in the stream.
  • This can be an issue for long-running operations over data that keeps changing while the stream is being consumed.

Transactional Constraints:

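  • Long Transactions: Repo.stream/2 runs inside a single transaction that stays open until the stream has been fully consumed. A long-lived transaction can hurt database performance; in particular, it prevents autovacuum (PostgreSQL’s housekeeping process) from cleaning up dead rows that the open transaction might still need to see. The work also can’t be split across several smaller transactions.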

What to do if you can’t use Repo.stream/2

If Repo.stream/2 is not suitable for your needs, you can fall back to the standard approach of fetching rows in batches, using the last id of each batch as the starting point for the next query.

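# Assumes `import Ecto.Query` and a @batch_size attribute in the enclosing module.

# An empty batch means there are no more rows to process.
def process_batch([]), do: :ok

def process_batch(batch) do
  do_some_work(batch)

  # Highest id processed so far; the next query resumes just after it.
  last_id =
    batch
    |> Enum.map(& &1.id)
    |> Enum.max()

  next_batch =
    Repo.all(
      from c in Consumers,
        where: c.id > ^last_id,
        order_by: c.id,
        limit: @batch_size
    )

  process_batch(next_batch)
end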
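The same batching idea can be sketched without a database, which makes the control flow easier to see and to try out. Everything in the block below is hypothetical: the BatchWalker module, the fetch_after and work callbacks, and the in-memory rows list that stands in for a table. The starting value of 0 for last_id assumes positive integer ids.

```elixir
defmodule BatchWalker do
  # `fetch_after` takes the last seen id and returns the next batch,
  # ordered by id. `work` is called once per non-empty batch.
  def run(fetch_after, work, last_id \\ 0) do
    case fetch_after.(last_id) do
      [] ->
        :ok

      batch ->
        work.(batch)
        # The batch is ordered by id, so its last row carries the largest id.
        run(fetch_after, work, List.last(batch).id)
    end
  end
end

# In-memory stand-in for the database so the sketch runs without Ecto.
rows = for id <- 1..7, do: %{id: id}
batch_size = 3

fetch_after = fn last_id ->
  rows
  |> Enum.filter(fn row -> row.id > last_id end)
  |> Enum.sort_by(fn row -> row.id end)
  |> Enum.take(batch_size)
end

BatchWalker.run(fetch_after, fn batch -> IO.inspect(batch) end)
```

In a real application, fetch_after would wrap a Repo.all/1 query with `where: c.id > ^last_id`, `order_by: c.id`, and a limit. Because each batch is an independent query, no transaction stays open between batches, but the batches no longer share a single snapshot of the data.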

In Conclusion:

Repo.stream/2 is a powerful tool for handling large datasets efficiently, but its transactional nature and data consistency limitations require careful consideration for your specific workflow. For some use cases, these caveats might be acceptable, while others might require a different approach.

