Composable SFTP streams in Elixir
I'm still new to writing, and I want to disclose that AI helped in creating this post. All words are my own, but AI was used for spelling correction and structural discussions.
Recently I was tasked with adding a media file sync feature via SFTP. OTP has all the necessary tools for this, so I started with:
```elixir
{:ok, channel, connection_ref} = :ssh_sftp.start_channel("example.com", 22)

video_file = File.read!("my_video.mp4")
:ok = :ssh_sftp.write_file(channel, "/var/media/video.mp4", video_file)
```

But there's a big (literally!) problem. By using File.read!/1 the whole file gets loaded into memory. For small files this is fine, but in our case we had intraframe-heavy video files which can be hundreds of GB in size. We don't want those loaded into RAM. But Elixir has a solution for this: lazy enumerables.
Elixir differentiates between two kinds of enumerables: eager (the Enum module) and lazy (the Stream module). Both build on the Enumerable protocol but differ in how they are consumed. Lazy means the enumerable is only computed when it is consumed.
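To make the difference concrete, here is a minimal sketch using plain lists instead of files:

```elixir
# Eager: Enum.map/2 computes the whole result list immediately.
eager = Enum.map([1, 2, 3], &(&1 * 2))
# => [2, 4, 6]

# Lazy: Stream.map/2 only builds a recipe; nothing is computed yet.
lazy = Stream.map([1, 2, 3], &(&1 * 2))

# The work happens only when the stream is consumed.
Enum.to_list(lazy)
# => [2, 4, 6]
```

The same principle applies to files: a file stream describes how to read the file, but reads nothing until enumerated.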
We can enumerate the contents of a file lazily by opening it as a stream with File.stream!/2:
```elixir
chunk_size = 2 ** 16
file_stream = File.stream!("my_big_video.mov", chunk_size)
```

Because this is a Stream, the file will be read in 64 KB chunks, one after another, when we enumerate it. Until then, nothing is loaded into memory. The 64 KB chunk size is a tradeoff between memory consumption and network overhead and can be tuned for specific use cases.
We could end it here and use it like this:
```elixir
{:ok, handle} = :ssh_sftp.open(channel, "/archive/my_big_video.mov", [:write, :binary])

for chunk <- file_stream do
  :ok = :ssh_sftp.write(channel, handle, chunk)
end

:ok = :ssh_sftp.close(channel, handle)
:ok = :ssh.close(connection_ref)
```

The for comprehension enumerates the stream, so the chunks are now read and written one at a time.
From a functional standpoint this works as desired. But from an architectural point of view it has several drawbacks:
- No cleanup on failure. If :ssh_sftp.write/3 returns an error, we raise. If we remove the match, we'd have to inspect each chunk's result instead.
- Leaks the Erlang API. Every place that uploads a file has to know about :ssh_sftp.open/3, :ssh_sftp.write/3 and :ssh_sftp.close/2.
- Not a value. We'd have to wrap everything in a function, but then the caller has to actually call it. An external library would need to know how to use it.
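To see these drawbacks concretely, imagine wrapping the loop in a function. The sketch below (the NaiveUpload module and upload_file/3 name are my own, purely illustrative) adds cleanup, but the raw :ssh_sftp calls still leak into every such wrapper:

```elixir
defmodule NaiveUpload do
  # Hypothetical wrapper: hides the loop, but this module still has to
  # know the whole :ssh_sftp API, and the caller still has to call it.
  def upload_file(channel, remote_path, file_stream) do
    {:ok, handle} = :ssh_sftp.open(channel, remote_path, [:write, :binary])

    try do
      for chunk <- file_stream do
        :ok = :ssh_sftp.write(channel, handle, chunk)
      end

      :ok
    after
      # Close the remote handle even if a write raises mid-stream.
      :ssh_sftp.close(channel, handle)
    end
  end
end
```

This fixes the cleanup problem but not the other two: the upload is still a function call, not a composable value.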
Let's go back to the file streaming example and extend it to a sink, because File.stream!/2 has one more trick up its sleeve! It also implements the Collectable protocol.
```elixir
source_stream = File.stream!("my_big_video.mov", chunk_size)
destination_stream = File.stream!("/archive/my_big_video.mov", chunk_size)

Stream.into(source_stream, destination_stream)
|> Stream.run()
```

Stream.into/2 (or Enum.into/2 for the eager version) is the missing link here: it consumes the enumerable from the first argument and collects it into the second argument.
This is basically what we want to do but with the SFTP stream as the sink.
In Elixir a protocol can be implemented for any data type, including custom structs! File.stream!/2 returns a %File.Stream{} struct which implements both Enumerable and Collectable.
This gives us all we need. Protocols are an Elixir concept, so OTP doesn't ship a streaming collectable SFTP sink. But nothing stops us from implementing one on our own data structure.
Let's write a simple example to understand the structure of the Collectable protocol:
```elixir
defmodule MyInspector do
  defstruct []

  defimpl Collectable do
    def into(struct) do
      collector_fun = fn
        acc, {:cont, chunk} ->
          IO.inspect(chunk)
          acc

        acc, :done ->
          acc

        acc, :halt ->
          acc
      end

      {struct, collector_fun}
    end
  end
end
```

into/1 returns a tuple of {initial accumulator, collector_fun}. The collector is a 2-arity function called for each step of the enumeration:
- acc, {:cont, chunk}: called with each new chunk
- acc, :done: called when the enumerable has no more chunks
- acc, :halt: called when enumeration is aborted
And let's put it into action:
```elixir
source_stream = File.stream!("my_big_video.mov", chunk_size)

Stream.into(source_stream, %MyInspector{})
|> Stream.take(5)
|> Stream.run()
```

Let's combine everything we've learned and implement the Collectable protocol for a custom SFTP stream.
```elixir
defmodule SFTP.Stream do
  defstruct [:channel, :path, :handle, :status]

  def build(channel, path) when is_pid(channel) and is_binary(path),
    do: %__MODULE__{channel: channel, path: path}

  defimpl Collectable do
    def into(%{channel: channel, path: path} = stream) do
      {:ok, handle} = :ssh_sftp.open(channel, path, [:write, :binary])
      stream = %{stream | handle: handle, status: :ok}

      collector_fun = fn
        # Inside a defimpl, @for refers to the target struct (SFTP.Stream);
        # %__MODULE__{} would wrongly name the implementation module here.
        %@for{status: :ok, handle: handle} = stream, {:cont, chunk} ->
          case :ssh_sftp.write(channel, handle, chunk) do
            :ok -> stream
            {:error, err} -> %{stream | status: err}
          end

        stream, {:cont, _chunk} ->
          # status != :ok
          # we don't do anything anymore and wait for the :done call
          stream

        %@for{status: status, handle: handle}, :done ->
          :ssh_sftp.close(channel, handle)
          status

        %@for{handle: handle}, :halt ->
          :ssh_sftp.close(channel, handle)
      end

      {stream, collector_fun}
    end
  end
end
```

Back to our example, but this time with our new implementation:
```elixir
file_stream = File.stream!("my_big_video.mov", chunk_size)

{:ok, channel, connection_ref} = :ssh_sftp.start_channel("example.com", 22)
sftp_stream = SFTP.Stream.build(channel, "my_big_video.mov")

Stream.into(file_stream, sftp_stream)
|> Stream.run()

:ok = :ssh.close(connection_ref)
```

Notice that :ssh_sftp.start_channel/2 lives in the caller's code and not inside the Collectable implementation. That's by design: opening an SSH connection is expensive (network, key exchange, auth), and we may want to transfer many files through one channel. We can always wrap it in a convenience function if needed.
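If we do want a one-shot helper that owns the connection, it could look something like the sketch below. The SFTP.Upload module and upload/4 name are hypothetical; note that Erlang's :ssh_sftp.start_channel/2 documents its host argument as a charlist, so the sketch converts the binary explicitly:

```elixir
defmodule SFTP.Upload do
  # Hypothetical one-shot helper: opens its own channel, streams a single
  # file through SFTP.Stream, and always closes the connection afterwards.
  def upload(host, local_path, remote_path, chunk_size \\ 65_536) do
    {:ok, channel, connection_ref} =
      :ssh_sftp.start_channel(String.to_charlist(host), 22)

    try do
      local_path
      |> File.stream!(chunk_size)
      |> Stream.into(SFTP.Stream.build(channel, remote_path))
      |> Stream.run()
    after
      :ok = :ssh.close(connection_ref)
    end
  end
end
```

For many transfers, though, keeping the channel in the caller's hands and reusing it remains the better default.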
Remember the file transfer example? This is exactly the same, we just swapped the destination file stream for an SFTP file stream. Without changing the code which loads from a file or how the chunks are consumed!
Why bother implementing protocols?
The protocol implementation seems more complicated than the for comprehension. So why should we go through the trouble of implementing it?
Because now it is flexible to work with. The Collectable protocol is defined in the Elixir standard library, so any other project can either implement it or work with it.
For example, the Req library accepts a Collectable via the :into option. This allows us to stream an HTTP response into a file over SFTP:
```elixir
sftp_stream = SFTP.Stream.build(channel, "my_downloaded_file")

Req.new(url: "https://example.com/download", into: sftp_stream)
|> Req.get!()
```

Remember the for comprehension from the beginning? It also has an :into option:
```elixir
for chunk <- file_stream, into: sftp_stream, do: chunk
```

What's important is that the caller never needs to know any of this. From the outside, our SFTP destination looks identical to File.stream!/2. We can swap one for the other, compose them with Stream.into/2, pass the resulting stream to a Task or a worker queue, all without changing the pipeline.
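For instance, because the composed upload is just a value until it runs, deferring the actual transfer to another process is a one-liner. A sketch, reusing the file_stream and sftp_stream bindings from above:

```elixir
# Nothing has been read or written yet; `upload` is just a description
# of the transfer.
upload = Stream.into(file_stream, sftp_stream)

# Run the transfer in a separate process and await its completion.
task = Task.async(fn -> Stream.run(upload) end)
Task.await(task, :infinity)
```

The pipeline that built the stream never needs to know where, or even whether, it is eventually run.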
Turns out, it wasn't really about SFTP. It was about using the right abstractions and creating a building block. Think about it the next time you upload files to S3 or generate a big CSV.
If you want to play around with these concepts, use this Livebook