Ecto(Repo) Streams

deepak sharma
2 min read · Apr 6, 2024

Repo.stream/2 offers a compelling way to handle large datasets efficiently. It seems almost too good to be true, especially when compared to similar functionality in other frameworks. However, like most powerful tools, there are some limitations to consider before diving in.

Repo.stream/2 can handle large datasets so efficiently because it leverages PostgreSQL’s cursor support under the hood. Here’s a breakdown:

  • Internal Workhorse: Cursors (https://www.postgresql.org/docs/current/plpgsql-cursors.html) — Cursors are a built-in PostgreSQL feature that enables processing data one row at a time. This is particularly useful for massive datasets that wouldn’t fit comfortably in memory all at once.
  • Transactional Necessity — Cursors require a transaction to function. This explains why Repo.stream/2 needs to be executed within a transaction itself.

In essence, Repo.stream/2 utilizes cursors to manage large datasets efficiently, but keep in mind the transactional requirement.
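
For example, a minimal sketch of streaming a large table inside a transaction could look like the following. MyApp.Repo, the Consumers schema, and do_some_work/1 are placeholder names, and :max_rows just controls how many rows the cursor fetches per round trip (500 is Ecto’s default):

import Ecto.Query

# Repo.stream/2 must be called inside Repo.transaction/2 because it opens a cursor.
# The timeout is raised because processing may outlive the default (15 seconds).
MyApp.Repo.transaction(
  fn ->
    from(c in Consumers, order_by: c.id)
    |> MyApp.Repo.stream(max_rows: 500)
    |> Stream.each(&do_some_work/1)
    |> Stream.run()
  end,
  timeout: :infinity
)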

As useful as Repo.stream/2 is, it’s important to be aware of its limitations:

Data Consistency:

  • Snapshot in Time: Repo.stream/2 works against a snapshot of the data taken at the moment the stream starts. Any additions, updates, or deletions made after that point won’t be reflected in the stream, which can be a problem for long-running operations where the data changes dynamically.

Transactional Constraints:

  • Long Transactions: Repo.stream/2 runs the entire stream within a single transaction. A long-lived transaction can hurt database performance; in particular, it keeps autovacuum (PostgreSQL’s housekeeping process) from cleaning up dead rows. The work also can’t be split into smaller transactions.

What to do if you can’t use Repo.stream/2

If Repo.stream/2 is not suitable for your needs, you can fall back to the standard mechanism of fetching rows in batches, as in the sketch below.

# Assumes `import Ecto.Query` and a `@batch_size` module attribute in the enclosing module.
def process_batch([]), do: :ok

def process_batch(batch) do
  do_some_work(batch)

  # The highest id in this batch becomes the lower bound for the next query.
  last_id =
    batch
    |> Enum.map(& &1.id)
    |> Enum.max()

  next_batch =
    Repo.all(
      from c in Consumers,
        where: c.id > ^last_id,
        order_by: c.id,
        limit: @batch_size
    )

  process_batch(next_batch)
end
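
To start the recursion, fetch the first page with the same ordered, limited query. This kick-off is a sketch that assumes @batch_size, the Consumers schema, and do_some_work/1 are defined in the surrounding module:

# Hypothetical kick-off for the batching loop above.
from(c in Consumers, order_by: c.id, limit: @batch_size)
|> Repo.all()
|> process_batch()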

In Conclusion:

Repo.stream/2 is a powerful tool for handling large datasets efficiently, but its transactional nature and data consistency limitations require careful consideration for your specific workflow. For some use cases, these caveats might be acceptable, while others might require a different approach.

