Lazy Combinations in Elixir

4 days ago 1

When building Guesswork, a logic programming library for Elixir, I ran into an interesting problem: supporting both logical And as well as infinite streams of possibilities. To give an example, the statement ‘Let A be some positive number, and let B be some factorial of A’, has both an infinite stream of values (A could be 1, 2, 3, etc.) and a logical conjunction; it cannot be represented without both.

Background

At a basic level, Guesswork is pretty simple. Each logical statement produces a stream of possible answers (which may be lazy and could be infinite), and statements that take other statements as inputs (such as And, Or, and Not) don’t know anything about their inputs, aside from the fact that they are Enumerable. Furthermore, the calculations behind that Enumerable can be pretty expensive. For Or and Not, this isn’t much of an issue. Not is very simple, it uses Stream.map/2 to turn each answer into it’s logical negation, and Or uses Stream.concat/11.

And, however, is more complicated. For And to produce a single answer it takes a single answer from each of its inputs, and uses Guesswork.Answer.union/2 to combine them. But to build the next answer, only a single answer is pulled from a single input, and then combined with the already pulled answers from the other streams. So not only must And hold all the previously pulled answers in some form of state, but it must then produce the new answers in a lazy fashion.

An Eager Attempt

Before doing the hard work of building a lazy version, we’ll build a simpler, eager version2. The answer sets are just maps of terms3, and two answer sets can be combined with union/2, provided that they have the same terms for all values.

defmodule LazyCombinations.Answer do @type t() :: %{atom() => term()} @spec new() :: t() def new() do %{} end @spec union(t(), t()) :: t() | nil def union(a1, a2) do new = Map.merge(a1, a2, fn _, t, t -> t _, _, _ -> nil end) if Enum.any?(new, fn {_, t} -> t == nil end) do nil else new end end end

Building on the answer set, anding sets together is surprisingly4 straightforward. Enum.reduce/2 allows us to walk through the args, unioning all possible pairs together, and removing all invalid combinations.

defmodule LazyCombinations.EagerAnd do defstruct [:args] alias LazyCombinations.Answer @type t() :: %__MODULE__{args: Enumerable.t(Answer.t())} @spec new([Enumerable.t(Answer.t())]) :: t() def new(args) do %__MODULE__{args: args} end @spec resolve(t()) :: [Answer.t()] def resolve(%__MODULE__{args: args}) do Enum.reduce(args, fn val, acc -> Enum.flat_map(val, fn val_item -> acc |> Enum.map(&Answer.union(&1, val_item)) |> Enum.filter(&Kernel.!=(&1, nil)) end) end) end end

Finally, we can confirm that everything works.

arg1 = [%{a: 1}, %{a: 2}] arg2 = [%{b: 1}, %{b: 2}] arg3 = [%{c: 1}, %{c: 2}] LazyCombinations.EagerAnd.new([arg1, arg2, arg3]) |> LazyCombinations.EagerAnd.resolve()
[ %{c: 1, a: 1, b: 1}, %{c: 1, a: 2, b: 1}, %{c: 1, a: 1, b: 2}, %{c: 1, a: 2, b: 2}, %{c: 2, a: 1, b: 1}, %{c: 2, a: 2, b: 1}, %{c: 2, a: 1, b: 2}, %{c: 2, a: 2, b: 2} ]

Laziness

The above example is a good starting point, but it is obviously not exactly what we want, because if we tried to represent ‘Let A be some positive number’5 and passed that stream of answer sets to the above And, the resolve/1 function would never return.

Iterating over Streams

Before getting into the meat of this problem, we have to resolve the issue of pulling items from our Enumerable inputs one at a time6. As there is no way to do this directly in the standard library, we’ll have to use Enumerable.reduce/3. This is actually how many of the functions in the standard library are implemented, and it provides very fine-grained control over how we iterate over the data.

The stream here is another infinite list of numbers, built with Stream.iterate/2. To pull items one at a time we need to use a reducer/0 function that always returns :suspend, which in turn instructs reduce/3 to pause the stream and return the next value and a continuation.

stream = Stream.iterate(1, &Kernel.+(&1, 1)) reduce_fun = fn item, _acc -> {:suspend, item} end {:suspended, nil, continuation} = Enumerable.reduce(stream, {:suspend, nil}, reduce_fun) {:suspended, val, continuation} = continuation.({:cont, nil}) val

Once we have the continuation we can just keep calling it with {:cont, nil}, which instructs the closure to move the stream forward once, before the reducer/0 function suspends it again.

{:suspended, val, continuation} = continuation.({:cont, nil}) val

Lazy And

To actually build a stream of values we are going to use Stream.unfold/2. This function works for our purposes because it allows us to store state in an accumulator. We can then generate our sequence from that state, updating it as we go. The exact details of that ‘state’ are abstracted away here using a behavior7 where the important callback is next_unions/1. next_unions/1 just takes a list of converted streams and either indicates that there is nothing left to calculate (:empty or :done) or returns a list of ‘unioned’ answer sets and the next version of the streams to keep in the accumulator.

defmodule LazyCombinations.LazyAnd do defstruct [:partial_module, :args] alias LazyCombinations.Answer @callback new(Enumerable.t(Answer.t())) :: term() @doc """ Takes a list of partial evaluations and attempts to calculate the next set of unioned answer sets, based on the next evaluated answer set unioned with the previously evaluated answer sets. """ @callback next_unions([term()]) :: :empty | :done | {:ok, [term()], [Answer.t()]} @type t() :: %__MODULE__{args: Enumerable.t(Answer.t())} @spec new(module(), [Enumerable.t(Answer.t())]) :: t() def new(partial_module, args) do %__MODULE__{partial_module: partial_module, args: args} end @spec resolve(t()) :: Enumerable.t(Answer.t()) def resolve(%__MODULE__{args: args, partial_module: partial_module}) do streams = Enum.map(args, &partial_module.new/1) Stream.unfold({streams, []}, &unfold_streams(&1, partial_module)) end def unfold_streams({streams, []}, partial_module) do case partial_module.next_unions(streams) do :empty -> nil :done -> nil {:ok, new_streams, unions} -> unfold_streams({new_streams, unions}, partial_module) end end def unfold_streams({streams, [head | rest]}, _), do: {head, {streams, rest}} end

There is a lot of code in the state below, but that is mostly handling all the edge cases. The first bit, new/1 and head/1 are just modified versions of the iterated stream we saw above. The only noticeable difference is that a defstruct is used to hold the continuation (stream) and the previously evaluated answer sets.

The rest of the module is spent implementing next_unions/1, which simply works through the streams, pulling an answer set if nothing has been evaluated, and ‘unioning’ everything together as it works.

defmodule LazyCombinations.LazyAnd.SimplePartiallyEvaluated do @behaviour LazyCombinations.LazyAnd defstruct [:continuation, evaluated: []] alias LazyCombinations.Answer @type t() :: %__MODULE__{ continuation: Enumerable.continuation() | nil, evaluated: [Answer.t()] } @impl true @spec new(Enumerable.t(Answer.t())) :: t() def new(stream) do reduce_fun = fn item, _acc -> {:suspend, item} end {:suspended, nil, continuation} = Enumerable.reduce(stream, {:suspend, nil}, reduce_fun) %__MODULE__{continuation: continuation} end @spec head(t()) :: t() def head(%__MODULE__{continuation: nil} = stream), do: stream def head(%__MODULE__{continuation: continuation, evaluated: evaluated}) do case continuation.({:cont, nil}) do {:suspended, item, continuation} -> %__MODULE__{continuation: continuation, evaluated: [item | evaluated]} {:halted, item} -> %__MODULE__{continuation: nil, evaluated: [item | evaluated]} {:done, nil} -> %__MODULE__{continuation: nil, evaluated: evaluated} end end @spec done?(t()) :: boolean() def done?(%__MODULE__{continuation: continuation}) do continuation == nil end @impl true @spec next_unions([t()]) :: :empty | :done | {:ok, [t()], [Answer.t()]} def next_unions(streams) do case Enum.reduce_while(streams, {false, [], [Answer.new()]}, &collect_next_head/2) do :empty -> :empty {false, _, _} -> :done {true, new_streams, unions} -> {:ok, new_streams, unions} end end defp collect_next_head(%__MODULE__{continuation: nil, evaluated: []}, _), do: {:halt, :empty} defp collect_next_head(%__MODULE__{evaluated: []} = stream, {_, streams, acc}) do next = head(stream) if done?(next) do {:halt, :empty} else {:cont, {true, [next | streams], union_pulled_value(next, acc)}} end end defp collect_next_head( %__MODULE__{continuation: continuation} = stream, {false, streams, acc} ) when not is_nil(continuation) do next = head(stream) if done?(next) do {:cont, {false, [next | streams], union_unpulled_value(next, acc)}} else {:cont, {true, [next | streams], union_pulled_value(next, acc)}} end end defp collect_next_head(%__MODULE__{} = stream, {pulled, streams, acc}), do: {:cont, {pulled, [stream | streams], union_unpulled_value(stream, acc)}} defp union_pulled_value(%__MODULE__{evaluated: [head | _]}, acc) do acc |> Enum.map(&Answer.union(&1, head)) |> Enum.filter(&Kernel.!=(&1, nil)) end defp union_unpulled_value(%__MODULE__{evaluated: evaluated}, acc) do Enum.flat_map(evaluated, fn eval_item -> acc |> Enum.map(&Answer.union(&1, eval_item)) |> Enum.filter(&Kernel.!=(&1, nil)) end) end end

Now we can confirm that everything works! The only difference is that, because our new and is lazy, we need to use Enum.to_list/0 to force evaluation.

arg1 = [%{a: 1}, %{a: 2}] arg2 = [%{b: 1}, %{b: 2}] arg3 = [%{c: 1}, %{c: 2}] LazyCombinations.LazyAnd.new(LazyCombinations.LazyAnd.SimplePartiallyEvaluated, [arg1, arg2, arg3]) |> LazyCombinations.LazyAnd.resolve() |> Enum.to_list()
[ %{c: 1, a: 1, b: 1}, %{c: 2, a: 1, b: 1}, %{c: 2, a: 2, b: 1}, %{c: 1, a: 2, b: 1}, %{c: 2, a: 2, b: 2}, %{c: 1, a: 2, b: 2}, %{c: 2, a: 1, b: 2}, %{c: 1, a: 1, b: 2} ]

Ensuring Equal Evaluation

Now we’ll test some infinite streams with a helper function that, again, builds a stream of positive numbers.

defmodule Test do def build_answer_stream(var) do Stream.iterate(1, &Kernel.+(&1, 1)) |> Stream.map(&%{var => &1}) end end

But, sadly, there is a subtle issue with this implementation. If you take a look at the result below, you’ll see that our :b stream never gets past 1. This can be a bigger problem than it might first appear. Consider the Pythagorean triples example and imagine our current setup. If you request at least one answer, the function will never return because there is no valid answer where B is 1.

arg1 = Test.build_answer_stream(:a) arg2 = [%{b: 1}, %{b: 2}] arg3 = Test.build_answer_stream(:c) LazyCombinations.LazyAnd.new(LazyCombinations.LazyAnd.SimplePartiallyEvaluated, [arg1, arg2, arg3]) |> LazyCombinations.LazyAnd.resolve() |> Enum.take(10)
[ %{c: 1, a: 1, b: 1}, %{c: 2, a: 1, b: 1}, %{c: 2, a: 2, b: 1}, %{c: 1, a: 2, b: 1}, %{c: 3, a: 2, b: 1}, %{c: 3, a: 1, b: 1}, %{c: 3, a: 3, b: 1}, %{c: 2, a: 3, b: 1}, %{c: 1, a: 3, b: 1}, %{c: 4, a: 3, b: 1} ]

The above bug is a result of the order in which the streams are evaluated. The first incomplete stream seen in the list will have a new value calculated. In the above example, that first stream is always one of the infinite streams. An answer is to ensure that the smaller streams are always seen first.

The module below achieves this by tracking the count (so we do not have to count through every item of the evaluated list each time we order the streams) and then sorting the streams on each call of next_unions/18 in lines 51-53.

defmodule LazyCombinations.LazyAnd.OrderedPartiallyEvaluated do @behaviour LazyCombinations.LazyAnd defstruct [:continuation, evaluated: [], count: 0] alias LazyCombinations.Answer @type t() :: %__MODULE__{ continuation: Enumerable.continuation() | nil, evaluated: [Answer.t()], count: integer() } @impl true @spec new(Enumerable.t(Answer.t())) :: t() def new(stream) do reduce_fun = fn item, _acc -> {:suspend, item} end {:suspended, nil, continuation} = Enumerable.reduce(stream, {:suspend, nil}, reduce_fun) %__MODULE__{continuation: continuation} end @spec head(t()) :: t() def head(%__MODULE__{continuation: nil} = stream), do: stream def head(%__MODULE__{continuation: continuation, evaluated: evaluated, count: count} = partial) do case continuation.({:cont, nil}) do {:suspended, item, continuation} -> %__MODULE__{continuation: continuation, evaluated: [item | evaluated], count: count + 1} {:halted, item} -> %__MODULE__{partial | continuation: nil, evaluated: [item | evaluated]} {:done, nil} -> %__MODULE__{partial | continuation: nil, evaluated: evaluated} end end @spec done?(t()) :: boolean() def done?(%__MODULE__{continuation: continuation}) do continuation == nil end @spec count(t()) :: integer() def count(%__MODULE__{count: count}) do count end @impl true @spec next_unions([t()]) :: :empty | :done | {:ok, [t()], [Answer.t()]} def next_unions(streams) do case streams |> Enum.sort_by(&count/1) |> Enum.reduce_while({false, [], [Answer.new()]}, &collect_next_head/2) do :empty -> :empty {false, _, _} -> :done {true, new_streams, unions} -> {:ok, new_streams, unions} end end defp collect_next_head(%__MODULE__{continuation: nil, evaluated: []}, _), do: {:halt, :empty} defp collect_next_head(%__MODULE__{evaluated: []} = stream, {_, streams, acc}) do next = head(stream) if done?(next) do {:halt, :empty} else {:cont, {true, [next | streams], union_pulled_value(next, acc)}} end end defp collect_next_head(%__MODULE__{continuation: continuation} = stream, {false, streams, acc}) when not is_nil(continuation) do next = head(stream) if done?(next) do {:cont, {false, [next | streams], union_unpulled_value(next, acc)}} else {:cont, {true, [next | streams], union_pulled_value(next, acc)}} end end defp collect_next_head(%__MODULE__{} = stream, {pulled, streams, acc}), do: {:cont, {pulled, [stream | streams], union_unpulled_value(stream, acc)}} defp union_pulled_value(%__MODULE__{evaluated: [head | _]}, acc) do acc |> Enum.map(&Answer.union(&1, head)) |> Enum.filter(&Kernel.!=(&1, nil)) end defp union_unpulled_value(%__MODULE__{evaluated: evaluated}, acc) do Enum.flat_map(evaluated, fn eval_item -> acc |> Enum.map(&Answer.union(&1, eval_item)) |> Enum.filter(&Kernel.!=(&1, nil)) end) end end

Finally, we’ll check that this change works. And, low and behold, :b makes it to 2! Everything is being evaluated, and while we might have to check a lot of possibilities, we won’t ever get stuck because an answer set is never created.

arg1 = Test.build_answer_stream(:a) arg2 = [%{b: 1}, %{b: 2}] arg3 = Test.build_answer_stream(:c) LazyCombinations.LazyAnd.new(LazyCombinations.LazyAnd.OrderedPartiallyEvaluated, [arg1, arg2, arg3]) |> LazyCombinations.LazyAnd.resolve() |> Enum.take(10)
[ %{c: 1, a: 1, b: 1}, %{c: 2, a: 1, b: 1}, %{c: 2, a: 2, b: 1}, %{c: 1, a: 2, b: 1}, %{c: 2, a: 2, b: 2}, %{c: 1, a: 2, b: 2}, %{c: 2, a: 1, b: 2}, %{c: 1, a: 1, b: 2}, %{c: 2, a: 3, b: 2}, %{c: 1, a: 3, b: 2} ]

Conclusion

Elixir’s standard library is very comprehensive (especially the Enum and Stream modules), but not every use case is, or should be, implemented in it. However, the abstractions that those modules are built on, like the Enumerable protocol, provide a lot of leverage and let one build pretty complex behavior with surprisingly little work.

Read Entire Article