defmodule Transform do
  @moduledoc """
  `Transform` is a set of utilities for transforming `Stream`s in some way.

  Available transforms:

    * `gzip/1`, `gunzip/1` - transform a stream of byte chunks (`t:iodata/0`)
      by compressing or decompressing it using `:zlib`.
    * `split/2` - re-chunks a stream of binaries / strings into the elements
      found between occurrences of the provided pattern.
    * `lines/1` - shorthand for `split/2` with a pattern of `"\\n"`.

  Every transform is lazy: nothing is read from the source stream until the
  returned stream is consumed.
  """

  # 15 is zlib's maximum window size exponent. Adding 16 selects the gzip
  # container (RFC 1952) instead of the zlib one (RFC 1950); adding 32 makes
  # inflate detect either container from the stream header.
  @gzip_window_bits 16 + 15
  @auto_detect_window_bits 32 + 15

  @doc """
  Gunzips (un-gzips, inflates) a stream of binary chunks.

  Produces a stream of uncompressed binary chunks. The container format is
  detected from the stream header, so both gzip and zlib wrapped data are
  accepted. Chunk boundaries of the input do not need to line up with anything
  in the compressed data.

  ## Examples

      iex> ["hello ", "world"] |> Transform.gzip() |> Transform.gunzip() |> Enum.join()
      "hello world"

  """
  @spec gunzip(Enumerable.t()) :: Enumerable.t()
  def gunzip(stream) do
    Stream.transform(
      stream,
      fn ->
        zstream = :zlib.open()
        :ok = :zlib.inflateInit(zstream, @auto_detect_window_bits)
        zstream
      end,
      fn chunk, zstream -> {inflate_lazily(zstream, chunk), zstream} end,
      # Runs on completion, early halt and errors alike, so the zlib handle is
      # always released.
      &:zlib.close/1
    )
  end

  @doc """
  Gzips a stream of binary chunks.

  Produces a stream of compressed binary chunks in the gzip format, i.e. the
  concatenated output can be read by `:zlib.gunzip/1`, `gunzip/1` or the
  `gunzip` command line tool. The gzip trailer is only emitted once the source
  stream has been consumed completely.

  ## Examples

      iex> ["hello ", "world"] |> Transform.gzip() |> Enum.to_list() |> IO.iodata_to_binary() |> :zlib.gunzip()
      "hello world"

  """
  @spec gzip(Enumerable.t()) :: Enumerable.t()
  def gzip(stream) do
    Stream.transform(
      stream,
      fn ->
        zstream = :zlib.open()
        # Level, method, mem level (8) and strategy are zlib's defaults; only
        # the window bits differ so that a gzip header and trailer are written.
        :ok = :zlib.deflateInit(zstream, :default, :deflated, @gzip_window_bits, 8, :default)
        zstream
      end,
      fn chunk, zstream ->
        {List.wrap(:zlib.deflate(zstream, chunk)), zstream}
      end,
      # Flush whatever zlib is still buffering and write the trailer.
      fn zstream ->
        {List.wrap(:zlib.deflate(zstream, [], :finish)), zstream}
      end,
      &:zlib.close/1
    )
  end

  @doc """
  Re-chunks a stream of binary data (/ strings) into a stream of chunks where
  each output chunk is the result of splitting the entire unchunked source
  stream on the provided pattern.

  `pattern` is handed to `String.split/2`. Occurrences of the pattern are
  found no matter how the source is chunked, including occurrences that
  straddle two chunks. As with `String.split/2`, a source that ends with the
  pattern produces a trailing `""` and an empty source produces `[""]`.

  ## Examples

      iex> ["a,b", ",c,", "d"] |> Transform.split(",") |> Enum.to_list()
      ["a", "b", "c", "d"]

  """
  @spec split(Enumerable.t(), String.pattern() | Regex.t()) :: Enumerable.t()
  def split(stream, pattern) do
    Stream.transform(
      stream,
      fn -> "" end,
      fn chunk, pending ->
        # `pending` never contains a complete match, so re-splitting it together
        # with the new chunk is what catches a match spanning the boundary.
        # The last piece may still be continued by the next chunk, so it is
        # held back instead of being emitted.
        {complete, [rest]} =
          (pending <> chunk)
          |> String.split(pattern)
          |> Enum.split(-1)

        {complete, rest}
      end,
      # Source exhausted: whatever is left is the final element.
      fn pending -> {[pending], pending} end,
      # Also invoked with the pending binary when the consumer halts early, so
      # it must accept any accumulator.
      fn _pending -> :ok end
    )
  end

  @doc """
  Re-chunks a stream into lines.

  This is an alias for `Transform.split(stream, "\\n")`.

  ## Examples

      iex> ["one\\ntw", "o\\nthree"] |> Transform.lines() |> Enum.to_list()
      ["one", "two", "three"]

  """
  @spec lines(Enumerable.t()) :: Enumerable.t()
  def lines(stream), do: split(stream, "\n")

  # Feeds `chunk` to `zstream` and lazily yields the inflated pieces.
  # `:zlib.safeInflate/2` returns bounded slices of output, so a small chunk
  # that expands enormously is handed downstream piece by piece.
  defp inflate_lazily(zstream, chunk) do
    Stream.resource(
      fn -> :zlib.safeInflate(zstream, chunk) end,
      fn
        {:continue, inflated} -> {List.wrap(inflated), :zlib.safeInflate(zstream, [])}
        {:finished, inflated} -> {List.wrap(inflated), :done}
        :done -> {:halt, :done}
      end,
      fn _state -> :ok end
    )
  end
end