defmodule Transform do @moduledoc """ `Transform` is a set of utilities for transforming `Stream`s in some way. """ @doc """ Gunzips (un-gzips, inflates) a stream of binary chunks. Produces a stream of uncompressed binary chunks. """ def gunzip(stream) do Stream.transform( stream, fn -> zstream = :zlib.open() :zlib.inflateInit(zstream) zstream end, fn chunk, zstream -> enumerable = Stream.resource( fn -> :zlib.safeInflate(zstream, chunk) end, fn {:continue, inflated} -> {List.wrap(inflated), :zlib.safeInflate(zstream, [])} {:finished, inflated} -> {List.wrap(inflated), :halt} :halt -> {:halt, nil} end, & &1 ) {enumerable, zstream} end, &:zlib.close/1 ) end @doc """ Gzips a stream of binary chunks. Produces a stream of compressed binary chunks. """ def gzip(stream) do Stream.transform( stream, fn -> zstream = :zlib.open() :zlib.deflateInit(zstream) zstream end, fn chunk, zstream -> {List.wrap(:zlib.deflate(zstream, chunk)), zstream} end, fn zstream -> {List.wrap(:zlib.deflate(zstream, [], :finish)), zstream} end, &:zlib.close/1 ) end @doc """ Re-chunks a stream of binary data (/ stings) into a stream of chunks where each output chunk is the the result of spliting the entire unchunked source stream on the provided binary pattern """ def split(stream, pattern) do Stream.transform( stream, fn -> "" end, fn chunk, acc -> case String.split(chunk, pattern, parts: 2) do # Could not split [^chunk] -> # Prefix the chunk with the accumulator {[], acc <> chunk} # Could split. # - The last item is our next accumulator # - Prepend the first part with the accumulator parts -> {last, [first | parts]} = List.pop_at(parts, -1) {[acc <> first | parts], last} end end, fn acc -> {List.wrap(acc), nil} end, fn nil -> :ok end ) end @doc """ Re-chunks a stream into lines This is an alias for `Transform.split(stream, "\n")`. """ def lines(stream), do: split(stream, "\n") end