When building Guesswork, a logic programming library for Elixir, I ran into an
interesting problem: supporting both logical And as well as infinite streams
of possibilities.
To give an example, the statement ‘Let A be some positive number, and let B be
the factorial of A’ has both an infinite stream of values (A could be 1, 2,
3, etc.) and a logical conjunction; it cannot be represented without both.
Background
At a basic level, Guesswork is pretty simple.
Each logical statement produces a stream of possible answers (which may be lazy
and could be infinite), and statements that take other statements as inputs (such
as And, Or, and Not) don’t know anything about their inputs, aside from
the fact that they are Enumerable.
Furthermore, the calculations behind that Enumerable can be pretty expensive.
For Or and Not, this isn’t much of an issue.
Not is very simple: it uses Stream.map/2 to turn each answer into its logical
negation, and Or uses Stream.concat/1.
And, however, is more complicated.
For And to produce a single answer it takes a single answer from each of its
inputs, and uses Guesswork.Answer.union/2 to combine them.
But to build the next answer, only a single answer is pulled from a single input,
and then combined with the already pulled answers from the other streams.
So not only must And hold all the previously pulled answers in some form of
state, but it must then produce the new answers in a lazy fashion.
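The description of Not and Or above can be sketched with plain streams. This is a minimal illustration, not Guesswork's actual implementation: bare maps stand in for answer sets, and the `{:not, answer}` tag is an invented placeholder for whatever negation Guesswork really applies.

```elixir
# Hypothetical stand-ins: plain maps play the role of answer sets.
naturals = Stream.iterate(1, &(&1 + 1)) |> Stream.map(&%{a: &1})
finite = [%{a: -1}, %{a: -2}]

# "Or" as concatenation: the finite input is exhausted first, then the
# infinite one takes over. Nothing is computed until a consumer asks.
either = Stream.concat([finite, naturals])

# A "Not"-style transformation touches each answer lazily. The {:not, answer}
# tag is an assumption made for this sketch only.
negated = Stream.map(either, &{:not, &1})

Enum.take(either, 4)
# [%{a: -1}, %{a: -2}, %{a: 1}, %{a: 2}]
```

Note that concatenation only reaches a later input once the earlier ones are exhausted, so an infinite input placed first would hide everything after it.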
An Eager Attempt
Before doing the hard work of building a lazy version, we’ll build a simpler,
eager version.
The answer sets are just maps of terms, and two answer sets can be combined with
union/2, provided that they bind every key they share to the same term.
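defmodule LazyCombinations.Answer do
  @type t() :: %{atom() => term()}

  @spec new() :: t()
  def new() do
    %{}
  end

  # Merges two answer sets; returns nil when they bind a shared key to
  # different terms.
  @spec union(t(), t()) :: t() | nil
  def union(a1, a2) do
    new =
      Map.merge(a1, a2, fn
        # Both sets agree on this key.
        _, t, t -> t
        # Conflict: mark the key with nil so it can be detected below.
        _, _, _ -> nil
      end)

    if Enum.any?(new, fn {_, t} -> t == nil end) do
      nil
    else
      new
    end
  end
end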
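To make the behaviour of union/2 concrete, here is a small sketch. The module name `UnionSketch` is made up for this example; it copies the merge-then-check logic from above so the snippet runs on its own.

```elixir
defmodule UnionSketch do
  # Same approach as Answer.union/2 above, duplicated here only so the
  # example is self-contained. The module name is hypothetical.
  def union(a1, a2) do
    merged =
      Map.merge(a1, a2, fn
        _key, t, t -> t
        _key, _t1, _t2 -> nil
      end)

    if Enum.any?(merged, fn {_key, t} -> t == nil end), do: nil, else: merged
  end
end

# Disjoint keys simply combine.
UnionSketch.union(%{a: 1}, %{b: 2})
# %{a: 1, b: 2}

# Shared keys that agree are kept once.
UnionSketch.union(%{a: 1, b: 2}, %{b: 2, c: 3})
# %{a: 1, b: 2, c: 3}

# Shared keys that disagree make the union invalid.
UnionSketch.union(%{a: 1}, %{a: 2})
# nil
```

One consequence of using nil as the conflict marker is that a binding whose value is itself nil looks like a conflict, so `UnionSketch.union(%{a: nil}, %{b: 1})` also returns nil.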
Building on the answer set, anding sets together is surprisingly
straightforward.
Enum.reduce/2 allows us to walk through the args, unioning all possible pairs
together, and removing all invalid combinations.
defmodule LazyCombinations.EagerAnd do
  defstruct [:args]

  alias LazyCombinations.Answer

  @type t() :: %__MODULE__{args: [Enumerable.t(Answer.t())]}

  @spec new([Enumerable.t(Answer.t())]) :: t()
  def new(args) do
    %__MODULE__{args: args}
  end

  @spec resolve(t()) :: [Answer.t()]
  def resolve(%__MODULE__{args: args}) do
    # The first arg seeds the accumulator; each later arg is crossed with it.
    Enum.reduce(args, fn val, acc ->
      Enum.flat_map(val, fn val_item ->
        acc
        |> Enum.map(&Answer.union(&1, val_item))
        |> Enum.filter(&Kernel.!=(&1, nil))
      end)
    end)
  end
end
Finally, we can confirm that everything works.
arg1 = [%{a: 1}, %{a: 2}]
arg2 = [%{b: 1}, %{b: 2}]
arg3 = [%{c: 1}, %{c: 2}]
LazyCombinations.EagerAnd.new([arg1, arg2, arg3])
|> LazyCombinations.EagerAnd.resolve()
[
%{c: 1, a: 1, b: 1},
%{c: 1, a: 2, b: 1},
%{c: 1, a: 1, b: 2},
%{c: 1, a: 2, b: 2},
%{c: 2, a: 1, b: 1},
%{c: 2, a: 2, b: 1},
%{c: 2, a: 1, b: 2},
%{c: 2, a: 2, b: 2}
]
Laziness
The above example is a good starting point, but it is not what we want:
if we tried to represent ‘Let A be some positive number’
and passed that stream of answer sets to the above And, the resolve/1 function
would never return.
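It is tempting to think that swapping Enum for Stream would be enough. The sketch below, which uses a hypothetical `naturals` helper and plain Map.merge/2 in place of union/2, suggests why it is not: the result is lazy and does return, but the inner infinite stream is never exhausted, so the outer one never advances.

```elixir
# Hypothetical helper: an infinite stream of single-key answer sets.
naturals = fn var ->
  Stream.iterate(1, &(&1 + 1)) |> Stream.map(&%{var => &1})
end

# A naive lazy product: for every :a answer, walk all :b answers.
naive =
  Stream.flat_map(naturals.(:a), fn a ->
    Stream.map(naturals.(:b), fn b -> Map.merge(a, b) end)
  end)

Enum.take(naive, 3)
# [%{a: 1, b: 1}, %{a: 1, b: 2}, %{a: 1, b: 3}]
```

However many answers are taken, :a stays at 1, so any combination that needs a larger :a is never produced.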
Iterating over Streams
Before getting into the meat of this problem, we have to resolve the issue of
pulling items from our Enumerable inputs one at a time.
As there is no way to do this directly in the standard library, we’ll have to
use Enumerable.reduce/3.
This is actually how many of the functions in the standard library are
implemented, and it provides very fine-grained control over how we iterate over
the data.
The stream here is another infinite list of numbers, built with
Stream.iterate/2.
To pull items one at a time we need to use a reducer/0 function that always
returns {:suspend, item}, which in turn instructs reduce/3 to pause the stream and
return the next value and a continuation.
stream = Stream.iterate(1, &Kernel.+(&1, 1))
reduce_fun = fn item, _acc -> {:suspend, item} end
{:suspended, nil, continuation} = Enumerable.reduce(stream, {:suspend, nil}, reduce_fun)
{:suspended, val, continuation} = continuation.({:cont, nil})
val
Once we have the continuation we can just keep calling it with {:cont, nil},
which instructs the closure to move the stream forward once, before the reducer/0
function suspends it again.
{:suspended, val, continuation} = continuation.({:cont, nil})
val
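The two steps above (suspend immediately, then resume once per item) can be wrapped in a small helper. This is only a sketch: `PullSketch`, `start/1`, and `next/1` are names invented here, not part of Guesswork or the standard library.

```elixir
defmodule PullSketch do
  # Hypothetical helper for pulling one item at a time from any enumerable.

  # Wraps the enumerable in a suspended continuation without consuming anything.
  def start(enumerable) do
    reducer = fn item, _acc -> {:suspend, item} end
    {:suspended, nil, continuation} = Enumerable.reduce(enumerable, {:suspend, nil}, reducer)
    continuation
  end

  # Advances the enumerable by one item, or reports that it is exhausted.
  def next(continuation) do
    case continuation.({:cont, nil}) do
      {:suspended, item, continuation} -> {:ok, item, continuation}
      {:done, _acc} -> :done
      {:halted, _acc} -> :done
    end
  end
end

cont = PullSketch.start([:x, :y])
{:ok, :x, cont} = PullSketch.next(cont)
{:ok, :y, cont} = PullSketch.next(cont)
:done = PullSketch.next(cont)
```

The final call shows the case the infinite example never reaches: once a finite enumerable runs out, the continuation returns {:done, acc} instead of suspending again.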
Lazy And
To actually build a stream of values we are going to use Stream.unfold/2.
This function works for our purposes because it allows us to store state in an
accumulator.
We can then generate our sequence from that state, updating it as we go.
The exact details of that ‘state’ are abstracted away here using a behavior,
where the important callback is next_unions/1.
next_unions/1 just takes a list of converted streams and either indicates that
there is nothing left to calculate (:empty or :done) or returns a list of
‘unioned’ answer sets and the next version of the streams to keep in the accumulator.
defmodule LazyCombinations.LazyAnd do
  defstruct [:partial_module, :args]

  alias LazyCombinations.Answer

  @callback new(Enumerable.t(Answer.t())) :: term()

  @doc """
  Takes a list of partial evaluations and attempts to calculate the next set of
  unioned answer sets, based on the next evaluated answer set unioned with the
  previously evaluated answer sets.
  """
  @callback next_unions([term()]) :: :empty | :done | {:ok, [term()], [Answer.t()]}

  @type t() :: %__MODULE__{partial_module: module(), args: [Enumerable.t(Answer.t())]}

  @spec new(module(), [Enumerable.t(Answer.t())]) :: t()
  def new(partial_module, args) do
    %__MODULE__{partial_module: partial_module, args: args}
  end

  @spec resolve(t()) :: Enumerable.t(Answer.t())
  def resolve(%__MODULE__{args: args, partial_module: partial_module}) do
    streams = Enum.map(args, &partial_module.new/1)
    Stream.unfold({streams, []}, &unfold_streams(&1, partial_module))
  end

  # Buffer is empty: ask the partial module for the next batch of unions.
  def unfold_streams({streams, []}, partial_module) do
    case partial_module.next_unions(streams) do
      :empty -> nil
      :done -> nil
      {:ok, new_streams, unions} -> unfold_streams({new_streams, unions}, partial_module)
    end
  end

  # Buffer has unions: emit one and keep the rest for the next step.
  def unfold_streams({streams, [head | rest]}, _), do: {head, {streams, rest}}
end
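The accumulator pattern used by unfold_streams/2 (emit from a buffer, refill it when it runs dry) is easier to see in isolation. The sketch below is a stripped-down illustration with an invented module name, where a list of pre-made batches plays the role of next_unions/1.

```elixir
defmodule BufferedUnfoldSketch do
  # Hypothetical example: the state is {remaining_batches, buffer}, and
  # Stream.unfold/2 emits one buffered item per step.
  def stream(batches) do
    Stream.unfold({batches, []}, &step/1)
  end

  # Buffer is empty and no batches remain: returning nil ends the stream.
  defp step({[], []}), do: nil

  # Buffer is empty: refill it from the next batch and try again. An empty
  # batch simply falls through to another refill.
  defp step({[batch | rest], []}), do: step({rest, batch})

  # Buffer has items: emit the first one and keep the rest in the state.
  defp step({batches, [head | tail]}), do: {head, {batches, tail}}
end

BufferedUnfoldSketch.stream([[1, 2], [], [3]]) |> Enum.to_list()
# [1, 2, 3]
```

Because the refill only happens when a consumer asks for an item and the buffer is empty, an expensive refill step is never run earlier than necessary.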
There is a lot of code in the state below, but that is mostly handling all the
edge cases.
The first functions, new/1 and head/1, are just modified versions of the iterated
stream we saw above.
The only notable difference is that a struct is used to hold the
continuation (stream) and the previously evaluated answer sets.
The rest of the module is spent implementing next_unions/1, which simply
works through the streams, pulling an answer set if nothing has been evaluated,
and ‘unioning’ everything together as it works.
defmodule LazyCombinations.LazyAnd.SimplePartiallyEvaluated do
  @behaviour LazyCombinations.LazyAnd

  defstruct [:continuation, evaluated: []]

  alias LazyCombinations.Answer

  @type t() :: %__MODULE__{
          continuation: Enumerable.continuation() | nil,
          evaluated: [Answer.t()]
        }

  @impl true
  @spec new(Enumerable.t(Answer.t())) :: t()
  def new(stream) do
    reduce_fun = fn item, _acc -> {:suspend, item} end
    {:suspended, nil, continuation} = Enumerable.reduce(stream, {:suspend, nil}, reduce_fun)
    %__MODULE__{continuation: continuation}
  end

  @spec head(t()) :: t()
  def head(%__MODULE__{continuation: nil} = stream), do: stream

  def head(%__MODULE__{continuation: continuation, evaluated: evaluated}) do
    case continuation.({:cont, nil}) do
      {:suspended, item, continuation} ->
        %__MODULE__{continuation: continuation, evaluated: [item | evaluated]}

      {:halted, item} ->
        %__MODULE__{continuation: nil, evaluated: [item | evaluated]}

      {:done, nil} ->
        %__MODULE__{continuation: nil, evaluated: evaluated}
    end
  end

  @spec done?(t()) :: boolean()
  def done?(%__MODULE__{continuation: continuation}) do
    continuation == nil
  end

  @impl true
  @spec next_unions([t()]) :: :empty | :done | {:ok, [t()], [Answer.t()]}
  def next_unions(streams) do
    case Enum.reduce_while(streams, {false, [], [Answer.new()]}, &collect_next_head/2) do
      :empty -> :empty
      {false, _, _} -> :done
      {true, new_streams, unions} -> {:ok, new_streams, unions}
    end
  end

  defp collect_next_head(%__MODULE__{continuation: nil, evaluated: []}, _), do: {:halt, :empty}

  defp collect_next_head(%__MODULE__{evaluated: []} = stream, {_, streams, acc}) do
    next = head(stream)

    if done?(next) do
      {:halt, :empty}
    else
      {:cont, {true, [next | streams], union_pulled_value(next, acc)}}
    end
  end

  defp collect_next_head(
         %__MODULE__{continuation: continuation} = stream,
         {false, streams, acc}
       )
       when not is_nil(continuation) do
    next = head(stream)

    if done?(next) do
      {:cont, {false, [next | streams], union_unpulled_value(next, acc)}}
    else
      {:cont, {true, [next | streams], union_pulled_value(next, acc)}}
    end
  end

  defp collect_next_head(%__MODULE__{} = stream, {pulled, streams, acc}),
    do: {:cont, {pulled, [stream | streams], union_unpulled_value(stream, acc)}}

  defp union_pulled_value(%__MODULE__{evaluated: [head | _]}, acc) do
    acc
    |> Enum.map(&Answer.union(&1, head))
    |> Enum.filter(&Kernel.!=(&1, nil))
  end

  defp union_unpulled_value(%__MODULE__{evaluated: evaluated}, acc) do
    Enum.flat_map(evaluated, fn eval_item ->
      acc
      |> Enum.map(&Answer.union(&1, eval_item))
      |> Enum.filter(&Kernel.!=(&1, nil))
    end)
  end
end
Now we can confirm that everything works!
The only difference is that, because our new And is lazy, we need to use
Enum.to_list/1 to force evaluation.
arg1 = [%{a: 1}, %{a: 2}]
arg2 = [%{b: 1}, %{b: 2}]
arg3 = [%{c: 1}, %{c: 2}]
LazyCombinations.LazyAnd.new(LazyCombinations.LazyAnd.SimplePartiallyEvaluated,
[arg1, arg2, arg3])
|> LazyCombinations.LazyAnd.resolve()
|> Enum.to_list()
[
%{c: 1, a: 1, b: 1},
%{c: 2, a: 1, b: 1},
%{c: 2, a: 2, b: 1},
%{c: 1, a: 2, b: 1},
%{c: 2, a: 2, b: 2},
%{c: 1, a: 2, b: 2},
%{c: 2, a: 1, b: 2},
%{c: 1, a: 1, b: 2}
]
Ensuring Equal Evaluation
Now we’ll test some infinite streams with a helper function that, again, builds
a stream of positive numbers.
defmodule Test do
  def build_answer_stream(var) do
    Stream.iterate(1, &Kernel.+(&1, 1))
    |> Stream.map(&%{var => &1})
  end
end
But, sadly, there is a subtle issue with this implementation.
If you take a look at the result below, you’ll see that our :b stream never
gets past 1.
This can be a bigger problem than it might first appear.
Consider the Pythagorean triples example and imagine our current setup.
If you request at least one answer, the function will never return because there
is no valid answer where B is 1.
arg1 = Test.build_answer_stream(:a)
arg2 = [%{b: 1}, %{b: 2}]
arg3 = Test.build_answer_stream(:c)
LazyCombinations.LazyAnd.new(LazyCombinations.LazyAnd.SimplePartiallyEvaluated,
[arg1, arg2, arg3])
|> LazyCombinations.LazyAnd.resolve()
|> Enum.take(10)
[
%{c: 1, a: 1, b: 1},
%{c: 2, a: 1, b: 1},
%{c: 2, a: 2, b: 1},
%{c: 1, a: 2, b: 1},
%{c: 3, a: 2, b: 1},
%{c: 3, a: 1, b: 1},
%{c: 3, a: 3, b: 1},
%{c: 2, a: 3, b: 1},
%{c: 1, a: 3, b: 1},
%{c: 4, a: 3, b: 1}
]
The above bug is a result of the order in which the streams are evaluated.
The first incomplete stream seen in the list will have a new value calculated.
In the above example, that first stream is always one of the infinite streams.
One fix is to ensure that the streams with the fewest evaluated answers are always seen first.
The module below achieves this by tracking the count (so we do not have to count
through every item of the evaluated list each time we order the streams) and
then sorting the streams on each call of next_unions/1, using the Enum.sort_by/2 call at the top of that function.
defmodule LazyCombinations.LazyAnd.OrderedPartiallyEvaluated do
  @behaviour LazyCombinations.LazyAnd

  defstruct [:continuation, evaluated: [], count: 0]

  alias LazyCombinations.Answer

  @type t() :: %__MODULE__{
          continuation: Enumerable.continuation() | nil,
          evaluated: [Answer.t()],
          count: integer()
        }

  @impl true
  @spec new(Enumerable.t(Answer.t())) :: t()
  def new(stream) do
    reduce_fun = fn item, _acc -> {:suspend, item} end
    {:suspended, nil, continuation} = Enumerable.reduce(stream, {:suspend, nil}, reduce_fun)
    %__MODULE__{continuation: continuation}
  end

  @spec head(t()) :: t()
  def head(%__MODULE__{continuation: nil} = stream), do: stream

  def head(%__MODULE__{continuation: continuation, evaluated: evaluated, count: count} = partial) do
    case continuation.({:cont, nil}) do
      {:suspended, item, continuation} ->
        %__MODULE__{continuation: continuation, evaluated: [item | evaluated], count: count + 1}

      {:halted, item} ->
        %__MODULE__{partial | continuation: nil, evaluated: [item | evaluated]}

      {:done, nil} ->
        %__MODULE__{partial | continuation: nil, evaluated: evaluated}
    end
  end

  @spec done?(t()) :: boolean()
  def done?(%__MODULE__{continuation: continuation}) do
    continuation == nil
  end

  @spec count(t()) :: integer()
  def count(%__MODULE__{count: count}) do
    count
  end

  @impl true
  @spec next_unions([t()]) :: :empty | :done | {:ok, [t()], [Answer.t()]}
  def next_unions(streams) do
    case streams
         |> Enum.sort_by(&count/1)
         |> Enum.reduce_while({false, [], [Answer.new()]}, &collect_next_head/2) do
      :empty -> :empty
      {false, _, _} -> :done
      {true, new_streams, unions} -> {:ok, new_streams, unions}
    end
  end

  defp collect_next_head(%__MODULE__{continuation: nil, evaluated: []}, _), do: {:halt, :empty}

  defp collect_next_head(%__MODULE__{evaluated: []} = stream, {_, streams, acc}) do
    next = head(stream)

    if done?(next) do
      {:halt, :empty}
    else
      {:cont, {true, [next | streams], union_pulled_value(next, acc)}}
    end
  end

  defp collect_next_head(%__MODULE__{continuation: continuation} = stream, {false, streams, acc})
       when not is_nil(continuation) do
    next = head(stream)

    if done?(next) do
      {:cont, {false, [next | streams], union_unpulled_value(next, acc)}}
    else
      {:cont, {true, [next | streams], union_pulled_value(next, acc)}}
    end
  end

  defp collect_next_head(%__MODULE__{} = stream, {pulled, streams, acc}),
    do: {:cont, {pulled, [stream | streams], union_unpulled_value(stream, acc)}}

  defp union_pulled_value(%__MODULE__{evaluated: [head | _]}, acc) do
    acc
    |> Enum.map(&Answer.union(&1, head))
    |> Enum.filter(&Kernel.!=(&1, nil))
  end

  defp union_unpulled_value(%__MODULE__{evaluated: evaluated}, acc) do
    Enum.flat_map(evaluated, fn eval_item ->
      acc
      |> Enum.map(&Answer.union(&1, eval_item))
      |> Enum.filter(&Kernel.!=(&1, nil))
    end)
  end
end
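The effect of sorting by count can be illustrated without any streams at all. The sketch below is a deliberate simplification, not the module's real control flow: it assumes every input can always be pulled, models each input only by a hypothetical pull counter, and pulls from whichever input sorts first.

```elixir
# Hypothetical model: each key is an input, each value is how many answers
# have been pulled from it so far.
counts = %{a: 0, b: 0, c: 0}

final =
  Enum.reduce(1..6, counts, fn _step, counts ->
    # Same idea as Enum.sort_by(&count/1): the least-pulled input comes first.
    [{name, _count} | _rest] = Enum.sort_by(counts, fn {_name, count} -> count end)
    # "Pull" one answer from it by bumping its counter.
    Map.update!(counts, name, &(&1 + 1))
  end)

final
# %{a: 2, b: 2, c: 2}
```

Always advancing the least-pulled input keeps the counters within one of each other, which is the property that stops a single input from being starved.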
Finally, we’ll check that this change works.
And, lo and behold, :b makes it to 2!
Everything is being evaluated, and while we might have to check a lot of
possibilities, we will never get stuck waiting for an answer set that is never created.
arg1 = Test.build_answer_stream(:a)
arg2 = [%{b: 1}, %{b: 2}]
arg3 = Test.build_answer_stream(:c)
LazyCombinations.LazyAnd.new(LazyCombinations.LazyAnd.OrderedPartiallyEvaluated,
[arg1, arg2, arg3])
|> LazyCombinations.LazyAnd.resolve()
|> Enum.take(10)
[
%{c: 1, a: 1, b: 1},
%{c: 2, a: 1, b: 1},
%{c: 2, a: 2, b: 1},
%{c: 1, a: 2, b: 1},
%{c: 2, a: 2, b: 2},
%{c: 1, a: 2, b: 2},
%{c: 2, a: 1, b: 2},
%{c: 1, a: 1, b: 2},
%{c: 2, a: 3, b: 2},
%{c: 1, a: 3, b: 2}
]
Conclusion
Elixir’s standard library is very comprehensive (especially the Enum and
Stream modules), but not every use case is, or should be, implemented in it.
However, the abstractions that those modules are built on, like the Enumerable
protocol, provide a lot of leverage and let one build pretty complex behavior
with surprisingly little work.