# Composable SFTP streams in Elixir

```elixir
Mix.install(
  [
    {:req, "~> 0.5"},
    {:kino, "~> 0.19.0"}
  ],
  # Keep protocols unconsolidated so the `Collectable` implementations
  # defined later in this notebook are picked up at runtime.
  consolidate_protocols: false
)

Logger.configure(level: :info)
```

## In-process SFTP server

```elixir
:ssh.start()
```

OTP also ships with an SFTP daemon, which we can point at a temporary directory to simulate our remote host.

```elixir
defmodule SSHServer do
  @moduledoc """
  A throwaway in-process SSH daemon serving the OTP SFTP subsystem out of a
  fresh temporary directory. Demo only: fixed credentials and unverified
  host keys.
  """

  defstruct [:daemon_ref, :system_dir, :sftp_root, :port]

  @host ~c"localhost"
  @username ~c"livebook"
  @password ~c"livebook"

  @doc """
  Starts the daemon on `port` (`0`, the default, lets the OS pick a free port).

  Returns an `%SSHServer{}` holding the port that was actually bound and the
  temporary directories that `stop/1` removes. Raises if the daemon cannot be
  started, after removing the directories it created.
  """
  def start(port \\ 0) do
    unique_id = System.unique_integer([:positive])
    system_dir = Path.join(System.tmp_dir!(), "ssh_system_#{unique_id}")
    sftp_root = Path.join(System.tmp_dir!(), "sftp_root_#{unique_id}")

    File.mkdir_p!(system_dir)
    File.mkdir_p!(sftp_root)
    write_host_key!(system_dir)

    daemon_opts = [
      system_dir: String.to_charlist(system_dir),
      auth_methods: ~c"password",
      user_passwords: [{@username, @password}],
      # `cwd` makes relative client paths (e.g. "naive.mov") land in sftp_root.
      subsystems: [:ssh_sftpd.subsystem_spec(cwd: String.to_charlist(sftp_root))]
    ]

    case :ssh.daemon(port, daemon_opts) do
      {:ok, daemon_ref} ->
        {:ok, daemon_info} = :ssh.daemon_info(daemon_ref)

        %__MODULE__{
          daemon_ref: daemon_ref,
          port: Keyword.fetch!(daemon_info, :port),
          system_dir: system_dir,
          sftp_root: sftp_root
        }

      {:error, reason} ->
        # Don't leave the temp dirs (including the private host key) behind.
        remove_dirs(system_dir, sftp_root)
        raise "could not start the SSH daemon: #{inspect(reason)}"
    end
  end

  @doc """
  Opens an SFTP channel to `server`, calls `lambda` with the channel pid and
  returns its result. The SSH connection is closed afterwards, even if
  `lambda` raises.
  """
  def with_channel(server, lambda) do
    {:ok, channel, conn} =
      :ssh_sftp.start_channel(@host, server.port,
        user: @username,
        password: @password,
        # Freshly generated demo key: accept it without prompting, and don't
        # record the throwaway key in the user's known_hosts file.
        silently_accept_hosts: true,
        save_accepted_host: false,
        user_interaction: false
      )

    try do
      lambda.(channel)
    after
      :ssh.close(conn)
    end
  end

  @doc "Stops the daemon and deletes its temporary directories."
  def stop(server) do
    :ssh.stop_daemon(server.daemon_ref)
    remove_dirs(server.system_dir, server.sftp_root)
  end

  # Generates an RSA host key and writes it as PEM into system_dir, where the
  # default :ssh_file handler picks up "ssh_host_rsa_key".
  defp write_host_key!(system_dir) do
    host_key = :public_key.generate_key({:rsa, 2048, 65537})
    pem_entry = :public_key.pem_entry_encode(:RSAPrivateKey, host_key)
    key_path = Path.join(system_dir, "ssh_host_rsa_key")

    File.write!(key_path, :public_key.pem_encode([pem_entry]))
    # It's a private key in a shared temp dir: make it owner-only.
    File.chmod!(key_path, 0o600)
  end

  # Removes both temporary directories; returns File.rm_rf!/1's result for sftp_root.
  defp remove_dirs(system_dir, sftp_root) do
    File.rm_rf!(system_dir)
    File.rm_rf!(sftp_root)
  end
end
```

## Helper

This `Helper` module defines a single function so that we can later inspect chunks from the
enumerable the same way everywhere.

```elixir
defmodule Helper do
  @doc "Prints `chunk` labelled with its size in bytes and returns it unchanged."
  def inspect_chunk(chunk) do
    IO.inspect(chunk,
      label: "chunk with #{byte_size(chunk)} bytes",
      limit: 5
    )
  end
end
```

## Setup

```elixir
server = SSHServer.start()
```

We're not creating a big video file here. For demonstration purposes, a small file is enough: it still gets split into a few chunks.

```elixir
source_path = System.tmp_dir!() |> Path.join("my_big_video.mov")
File.write!(source_path, :crypto.strong_rand_bytes(10_000))
File.stat!(source_path).size
```

## Naive version

As explained in the article, this loads the whole file into memory and can cause OOM errors.

```elixir
SSHServer.with_channel(server, fn channel ->
  video_file = File.read!(source_path)
  destination_path = "naive.mov"

  # IO.inspect/2 appends ": " after the label itself.
  IO.inspect(byte_size(video_file), label: "Video File Size")

  :ok = :ssh_sftp.write_file(channel, destination_path, video_file)

  server.sftp_root
  |> Path.join(destination_path)
  |> File.exists?()
  |> IO.inspect(label: "File exists?")
end)
```

## For comprehension

Works, but: no cleanup on failure, leaks the Erlang API to every callsite, and the destination isn't a value you can pass around.

```elixir
# 4 KB chunks so we see the streaming happen
chunk_size = 4 * 1024
file_stream = File.stream!(source_path, chunk_size)
destination_path = "for_comprehension.mov"

SSHServer.with_channel(server, fn channel ->
  {:ok, handle} = :ssh_sftp.open(channel, destination_path, [:write, :binary])

  for chunk <- file_stream do
    Helper.inspect_chunk(chunk)
    # `for` collects each iteration's value into a list; ending on `:ok`
    # (instead of the chunk) keeps the whole file from piling up in memory.
    :ok = :ssh_sftp.write(channel, handle, chunk)
  end

  :ok = :ssh_sftp.close(channel, handle)
end)

File.read!(source_path) ==
  server.sftp_root |> Path.join(destination_path) |> File.read!()
```

## Understand the Collectable protocol

Before we implement the `Collectable` protocol for `SFTP.Stream`, let's look at the protocol itself with a simple example.
```elixir
defmodule ChunkInspector do
  @moduledoc "A `Collectable` that prints every chunk it receives and stores nothing."
  defstruct []

  defimpl Collectable do
    def into(struct) do
      collector_fun = fn
        acc, {:cont, chunk} ->
          Helper.inspect_chunk(chunk)
          # The accumulator is what `:done` hands back as the result, so keep
          # the struct instead of replacing it with the latest chunk.
          acc

        acc, :done ->
          acc

        acc, :halt ->
          acc
      end

      {struct, collector_fun}
    end
  end
end
```

```elixir
source_path
|> File.stream!(chunk_size)
|> Stream.take(3)
|> Enum.into(%ChunkInspector{})

:ok
```

## Implement the Collectable protocol

Now that we know how `Collectable` works and how to write a file over SFTP, let's combine the two and implement our final version.

```elixir
defmodule SFTP.Stream do
  @moduledoc """
  A `Collectable` sink that writes chunks to `path` over an open `:ssh_sftp`
  channel.

  Collecting into it returns `:ok` on success, or `{:error, reason}` for the
  first failed write (or a failed close). The remote handle is closed in
  every case.
  """

  @enforce_keys [:channel, :path]
  defstruct [:channel, :path, :handle, :status]

  @doc """
  Builds a sink for `path` on `channel`. Nothing is opened until something is
  collected into it.
  """
  def build(channel, path) when is_pid(channel) and is_binary(path),
    do: %__MODULE__{channel: channel, path: path}

  defimpl Collectable do
    def into(%{channel: channel, path: path} = stream) do
      {:ok, handle} = :ssh_sftp.open(channel, path, [:write, :binary])
      stream = %{stream | handle: handle, status: :ok}

      collector_fun = fn
        %{status: :ok, handle: handle} = stream, {:cont, chunk} ->
          case :ssh_sftp.write(channel, handle, chunk) do
            :ok -> stream
            # Remember the failure instead of raising; `:done` reports it.
            {:error, reason} -> %{stream | status: {:error, reason}}
          end

        stream, {:cont, _chunk} ->
          # A previous write failed: skip the rest and let `:done` clean up.
          stream

        %{handle: handle, status: status}, :done ->
          # Always close; report the first write error, otherwise the result
          # of closing (which can fail too).
          close_result = :ssh_sftp.close(channel, handle)
          with :ok <- status, do: close_result

        %{handle: handle}, :halt ->
          # Collection was aborted: release the remote handle.
          :ssh_sftp.close(channel, handle)
      end

      {stream, collector_fun}
    end
  end
end
```

## Using SFTP.Stream

```elixir
# Bound outside the closure so the verification cell below checks this file.
destination_path = "collectable.mov"

SSHServer.with_channel(server, fn channel ->
  sink = SFTP.Stream.build(channel, destination_path)

  source_path
  |> File.stream!(chunk_size)
  |> Stream.into(sink)
  |> Stream.each(&Helper.inspect_chunk/1)
  |> Stream.run()
end)
```

`Stream.into/2` discards the sink's final result, so `Stream.run/1` always returns `:ok` here. Use `Enum.into/2` or `for ... into:` when you need to know whether the upload succeeded.

Verify the upload:

```elixir
File.read!(source_path) ==
  server.sftp_root |> Path.join(destination_path) |> File.read!()
```

## Comeback of the for comprehension

The `for` comprehension from earlier also accepts `:into` — meaning our SFTP sink slots in with no
protocol-specific code:

```elixir
SSHServer.with_channel(server, fn channel ->
  sink = SFTP.Stream.build(channel, "for_into.mov")

  for chunk <- File.stream!(source_path, chunk_size), into: sink do
    Helper.inspect_chunk(chunk)
  end
end)

# Compare contents, like the other uploads, rather than only checking existence.
File.read!(source_path) ==
  server.sftp_root |> Path.join("for_into.mov") |> File.read!()
```

## Other use cases

`Req` accepts any `Collectable` as `:into`, so we can stream an HTTP download through our SFTP sink without ever touching the local disk:

```elixir
filename = "downloaded.txt"

SSHServer.with_channel(server, fn channel ->
  sink = SFTP.Stream.build(channel, filename)

  Req.new(url: "https://httpbin.org/uuid", into: sink)
  |> Req.get!()
end)

file = server.sftp_root |> Path.join(filename) |> File.read!()

# Feature-detect the built-in JSON module (Elixir 1.18+); otherwise show raw text.
if Code.ensure_loaded?(JSON) and function_exported?(JSON, :decode!, 1) do
  JSON.decode!(file)
else
  file
end
```

## Cleanup

```elixir
File.rm!(source_path)
SSHServer.stop(server)
```