Stream
public struct Stream<T>
A lazy, asynchronous, concurrent, and reusable stream of operations on elements of data.
let sum = (1..<50000).stream()
.filter { $0 % 2 == 0 }
.reduce(0, combine: +)
.wait()
This example makes a stream from the collection created by (1..<50000).
Then it filters out from that stream all elements that aren’t even.
Then it reduces the stream;
it starts at 0, then uses + to combine all elements.
Streams work asynchronously, so wait()
is called to wait on the result.
There are three stages to using streams.
Create the stream
Most often, a stream will be created using CollectionType.stream() or Stream.of(T...). But it is possible to create custom streams.
A stream uses the Continuation monad to pass elements to handlers. A continuation is specialized to the type Continuation<Future<()>, T>. It returns a Future<()> because the handlers are asynchronous. The goal is to create a continuation that will call the handler once for each element. Calling the handler concurrently is encouraged. A stream shouldn’t complete its future until all the futures returned by the handler are completed.
The future’s type is Future<()> because it is isomorphic to (). That is to say, there is no actual return value. The only thing that matters is the time of completion.
Manipulate the stream
There are several intermediary operations to manipulate the stream. The simplest is Stream<T>.map(T -> U). This returns a stream that maps elements of the original stream to elements of a different type.
Streams are immutable, lazy, and reusable, so intermediary operations aren’t changing the stream or its elements. Instead, they construct a new stream that will get its elements from the old stream, and modify them accordingly before passing the element to a handler.
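As a rough illustration of the continuation model and of map, the sketch below uses a hypothetical, synchronous MiniStream type written in current Swift syntax. It is not the library's implementation: Void stands in for Future<()>, and the names MiniStream and from are assumptions made only for this example.

```swift
// Hypothetical stand-in for Stream<T>. The real type wraps
// Continuation<Future<()>, T>; here the handler simply returns Void.
struct MiniStream<T> {
    // Calls the given handler once for each element.
    let run: (@escaping (T) -> Void) -> Void

    // A custom source: pass every array element to the handler.
    static func from(_ elements: [T]) -> MiniStream<T> {
        return MiniStream<T> { handler in elements.forEach(handler) }
    }

    // Builds a new stream; the original stream is left untouched.
    func map<U>(_ mapper: @escaping (T) -> U) -> MiniStream<U> {
        let upstream = run
        return MiniStream<U> { handler in
            upstream { element in handler(mapper(element)) }
        }
    }
}

let numbers = MiniStream<Int>.from([1, 2, 3])
let labels = numbers.map { "#\($0)" }   // nothing has run yet

var seen: [String] = []
labels.run { seen.append($0) }          // running pulls elements through the mapper
print(seen)  // ["#1", "#2", "#3"]
```

Because map only wraps the upstream run function, numbers can still be run on its own afterwards and will produce the unmapped elements.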
Run the stream
Terminal operations on a stream will start running the stream. Most often, the stream will run asynchronously and concurrently, but this depends on how the stream was created.
Running a stream means to start accepting elements of the stream with a handler. The simplest example of this is Stream.forEach, which calls a handler for each element.
Most streams are concurrent. This can improve performance considerably in many scenarios. There are, however, situations where the overhead of concurrency outweighs the performance gains. For example, there are two different methods of reducing a stream. One is pseudo-serial, in that the reduction is performed serially, while the elements are computed concurrently. The other is fully concurrent, where both the computation of elements and the reduction are concurrent. For very fast reduction functions, the pseudo-serial method is usually faster, since the reduction adds no concurrency overhead. For slower reduction functions, the concurrent method is usually faster, since more reductions can be occurring at a time.
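The two reduction strategies can be contrasted with a small sketch built on Dispatch rather than on this library. The function names, the chunking scheme, and the use of NSLock are assumptions made for illustration; the library's own scheduling may differ.

```swift
import Foundation

let inputs = Array(1...1000)

// Stands in for whatever per-element work the stream performs.
func expensive(_ x: Int) -> Int { return x * x }

// Pseudo-serial: elements are computed concurrently, but every result
// is folded into a single accumulator, one at a time, under a lock.
func pseudoSerialSum() -> Int {
    let lock = NSLock()
    var total = 0
    DispatchQueue.concurrentPerform(iterations: inputs.count) { i in
        let value = expensive(inputs[i])
        lock.lock()
        total += value
        lock.unlock()
    }
    return total
}

// Fully concurrent: each chunk computes and reduces its own elements,
// and the partial results are merged at the end.
func concurrentSum(chunks: Int) -> Int {
    let lock = NSLock()
    var partials: [Int] = []
    let size = (inputs.count + chunks - 1) / chunks
    DispatchQueue.concurrentPerform(iterations: chunks) { c in
        let start = c * size
        let end = min(start + size, inputs.count)
        guard start < end else { return }
        let partial = inputs[start..<end].reduce(0) { $0 + expensive($1) }
        lock.lock()
        partials.append(partial)
        lock.unlock()
    }
    return partials.reduce(0, +)
}

let serialResult = pseudoSerialSum()
let concurrentResult = concurrentSum(chunks: 8)
```

Both functions return the same sum; which one is faster depends on how costly the reduction step is relative to the coordination it requires.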
Streams and collections differ in several ways.
Besides being concurrent and asynchronous, streams are lazy and unordered.
Streams do not store elements, and instead rely on abstract data sources.
They can’t have their count calculated.
Most importantly, Stream is not a data structure.
It is a pipeline for manipulating elements, no matter how many there are.
This is the inspiration for using the Continuation monad.
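The sketch below illustrates what lazy and reusable mean for a pipeline that stores no elements. It uses a hypothetical, synchronous MiniStream type (an assumption for this example, not the library's Stream) whose source counts how many elements it has produced.

```swift
// Hypothetical stand-in for Stream<T>; Void replaces Future<()>.
struct MiniStream<T> {
    let run: (@escaping (T) -> Void) -> Void

    func map<U>(_ mapper: @escaping (T) -> U) -> MiniStream<U> {
        let upstream = run
        return MiniStream<U> { handler in
            upstream { element in handler(mapper(element)) }
        }
    }
}

var produced = 0
// The source holds no elements; it produces them each time it is run.
let source = MiniStream<Int> { handler in
    for i in 1...3 {
        produced += 1
        handler(i)
    }
}

let doubled = source.map { $0 * 2 }
let producedBeforeRun = produced   // still 0: building the pipeline runs nothing

var total = 0
doubled.run { total += $0 }        // first run: 2 + 4 + 6
doubled.run { total += $0 }        // the same pipeline can be run again
```

After the two runs, the source has produced six elements in total, three per run, and none of them were retained by the pipeline.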
-
The continuation that defines this Stream. cont.run is the function to call to run the stream on some handler.
Declaration
Swift
public let cont: Continuation<Future<()>, T>
-
Initializes the stream with a continuation.
Declaration
Swift
public init(_ cont: Continuation<Future<()>, T>)
-
Initializes the stream with an unwrapped continuation.
Declaration
Swift
public init(_ cont: (T -> Future<()>) -> Future<()>)
-
Functor fmap.
Maps this stream to a stream of another type.
Declaration
Swift
public func map<U>(mapper: T -> U) -> Stream<U>
Parameters
mapper
The function to apply to elements of this stream.
Return Value
A stream whose elements are the results of applying the mapper to the elements of this stream.
-
Applicative <*>
Applies the functions in another stream to elements of this stream.
Declaration
Swift
public func apply<U>(mappers: Stream<T -> U>) -> Stream<U>
Parameters
mappers
The stream of functions to apply to elements of this stream.
Return Value
A stream whose elements are the results of applying all functions in another stream to all elements of this stream.
-
Monad return.
Declaration
Swift
public static func of<T>(element: T) -> Stream<T>
Parameters
element
The element to make a stream around.
Return Value
A stream with one element.
-
Monad bind.
Maps elements of this stream to streams, and concatenates the results.
Declaration
Swift
public func flatMap<U>(mapper: T -> Stream<U>) -> Stream<U>
Parameters
mapper
The function to map elements with.
Return Value
A stream whose elements are the elements of all streams returned by applying the mapper to each element of this stream.
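A minimal sketch of the bind operation, assuming a hypothetical synchronous MiniStream type in place of the library's Stream (Void replaces Future<()>, and the helper from is invented for the example):

```swift
// Hypothetical stand-in for Stream<T>.
struct MiniStream<T> {
    let run: (@escaping (T) -> Void) -> Void

    static func from(_ elements: [T]) -> MiniStream<T> {
        return MiniStream<T> { handler in elements.forEach(handler) }
    }

    // Runs the stream returned for each element on the downstream handler.
    func flatMap<U>(_ mapper: @escaping (T) -> MiniStream<U>) -> MiniStream<U> {
        let upstream = run
        return MiniStream<U> { handler in
            upstream { element in mapper(element).run(handler) }
        }
    }
}

let words = MiniStream<String>.from(["ab", "cde"])
// Each word becomes a stream of its characters.
let letters = words.flatMap { MiniStream<Character>.from(Array($0)) }

var collected: [Character] = []
letters.run { collected.append($0) }
```

In this synchronous model the characters arrive in source order. A concurrent stream makes no such promise, so callers should not rely on ordering.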
-
Monad join
The join function is the conventional monad join operator. It is used to remove one level of monadic structure, projecting its bound argument into the outer level.
Declaration
Swift
public static func join<T>(streams: Stream<Stream<T>>) -> Stream<T>
Parameters
streams
The streams to join.
Return Value
A stream whose elements are all the elements of the streams in the parameter.
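Removing one level of structure can be sketched as running every inner stream on the outer handler. The MiniStream type, the from helper, and the free function join below are hypothetical and synchronous; they only model the idea.

```swift
// Hypothetical stand-in for Stream<T>; Void replaces Future<()>.
struct MiniStream<T> {
    let run: (@escaping (T) -> Void) -> Void

    static func from(_ elements: [T]) -> MiniStream<T> {
        return MiniStream<T> { handler in elements.forEach(handler) }
    }
}

// Flattens a stream of streams by forwarding every inner element
// to the handler given to the outer stream.
func join<T>(_ streams: MiniStream<MiniStream<T>>) -> MiniStream<T> {
    return MiniStream<T> { handler in
        streams.run { inner in inner.run(handler) }
    }
}

let first = MiniStream<Int>.from([1, 2])
let second = MiniStream<Int>.from([3])
let nested = MiniStream<MiniStream<Int>>.from([first, second])

var flat: [Int] = []
join(nested).run { flat.append($0) }
```

Under this model, flatMap is equivalent to mapping each element to a stream and then joining the result.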
-
Monoid mempty
Declaration
Swift
public static func empty<T>() -> Stream<T>
Return Value
An empty stream of any type.
-
Monoid mappend
Declaration
Swift
public func appended(other: Stream<T>) -> Stream<T>
Return Value
A stream whose elements are all the elements of both this, and another stream.
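The monoid behaviour of empty and appended can be checked on a small model. This sketch assumes a hypothetical synchronous MiniStream type; the collect helper exists only to inspect results and is not part of the documented API.

```swift
// Hypothetical stand-in for Stream<T>; Void replaces Future<()>.
struct MiniStream<T> {
    let run: (@escaping (T) -> Void) -> Void

    static func from(_ elements: [T]) -> MiniStream<T> {
        return MiniStream<T> { handler in elements.forEach(handler) }
    }

    // A stream that never calls its handler.
    static func empty() -> MiniStream<T> {
        return MiniStream<T> { _ in }
    }

    // Runs both streams on the same handler.
    func appended(_ other: MiniStream<T>) -> MiniStream<T> {
        let first = run
        return MiniStream<T> { handler in
            first(handler)
            other.run(handler)
        }
    }

    // Test helper: gathers all elements into an array.
    func collect() -> [T] {
        var out: [T] = []
        run { out.append($0) }
        return out
    }
}

let a = MiniStream<Int>.from([1, 2])
let b = MiniStream<Int>.from([3])

let both = a.appended(b).collect()
let rightIdentity = a.appended(MiniStream<Int>.empty()).collect()
let leftIdentity = MiniStream<Int>.empty().appended(a).collect()
```

Appending the empty stream on either side leaves the elements of a unchanged, which is the identity law expected of mempty.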
-
Synonym for join, but takes an array.
Declaration
Swift
public static func concat<T>(streams: [Stream<T>]) -> Stream<T>
-
Synonym for join, but takes varargs.
Declaration
Swift
public static func concat<T>(streams: Stream<T>...) -> Stream<T>
-
Runs the stream on a given handler. The handler is expected to be synchronous, so the returned Future<()> represents the time that all calls to the handler have exited. You can use Stream.cont.run() to control the time the future completes.
Declaration
Swift
public func forEach(handler: T -> ()) -> Future<()>
Parameters
handler
The closure to call with each element.
Return Value
A future representing when all the calls to the handler have exited.
-
Declaration
Swift
public func filter(predicate: T -> Bool) -> Stream<T>
Parameters
predicate
A test for elements of this stream.
Return Value
A new stream whose elements are the elements of this stream that passed the predicate test.
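One way to picture filter is as a wrapper that forwards an element to the handler only when the predicate accepts it. The sketch assumes a hypothetical synchronous MiniStream type rather than the library's Stream.

```swift
// Hypothetical stand-in for Stream<T>; Void replaces Future<()>.
struct MiniStream<T> {
    let run: (@escaping (T) -> Void) -> Void

    static func from(_ elements: [T]) -> MiniStream<T> {
        return MiniStream<T> { handler in elements.forEach(handler) }
    }

    // Forwards only the elements that pass the predicate.
    func filter(_ predicate: @escaping (T) -> Bool) -> MiniStream<T> {
        let upstream = run
        return MiniStream<T> { handler in
            upstream { element in
                if predicate(element) {
                    handler(element)
                }
            }
        }
    }
}

let evens = MiniStream<Int>.from(Array(1...10)).filter { $0 % 2 == 0 }

var kept: [Int] = []
evens.run { kept.append($0) }
```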
-
Asynchronously and concurrently reduces elements of this stream to a single value.
let futureSum = arrayOfStrings.stream()
    .reduce(identity: 0, accumulate: +) { i, s in
        return i + s.characters.count
    }
Which can be simplified to:
let futureSum = arrayOfStrings.stream()
    .map { $0.characters.count }
    .reduce(identity: 0, accumulate: +, combine: +)
The following laws must hold.
Identity:
accumulate(identity, a) == a
Commutative:
accumulate(a, b) == accumulate(b, a)
combine(combine(reduced, a), b) == combine(combine(reduced, b), a)
Associative:
combine(accumulate(a, b), element) == accumulate(a, combine(b, element))
accumulate(accumulate(a, b), c) == accumulate(a, accumulate(b, c))
These laws are necessary largely because Streams are concurrent and unordered.
accumulate is necessary because several reductions may be occurring at a time, and their partial results must then be merged into one.
The initial value is expected to be an identity so that it can start more than one reduction. If it weren’t an identity, the final accumulation of reductions would produce unexpected results.
Note
If the underlying stream is not concurrent, this does not perform concurrently.
Declaration
Swift
public func reduce<Reduced>(
    identity identity: Reduced,
    accumulate: (Reduced, Reduced) -> Reduced,
    combine: (Reduced, T) -> Reduced
) -> Future<Reduced>
Parameters
identity
A value that will have no effect when accumulated.
accumulate
A function for combining two reduced values.
combine
A function for combining an element and a reduction into a new reduction.
Return Value
A future that will complete with the result of the reduction.
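To see why the seed must be an identity, the following sketch simulates several partial reductions sequentially. The chunkedReduce function and its chunking scheme are assumptions made for illustration; they do not describe how the library schedules work.

```swift
// Simulates a reduction split into partial reductions, each seeded
// with `identity`, whose results are then merged with `accumulate`.
func chunkedReduce<T, R>(
    _ elements: [T],
    chunks: Int,
    identity: R,
    accumulate: (R, R) -> R,
    combine: (R, T) -> R
) -> R {
    let size = max(1, (elements.count + chunks - 1) / chunks)
    var partials: [R] = []
    var start = 0
    while start < elements.count {
        let end = min(start + size, elements.count)
        // Every partial reduction starts from the same seed.
        partials.append(elements[start..<end].reduce(identity, combine))
        start = end
    }
    return partials.reduce(identity, accumulate)
}

let words = ["a", "bb", "ccc", "dddd"]

// With a true identity (0 for +), the number of chunks does not matter.
let oneChunk = chunkedReduce(words, chunks: 1, identity: 0,
                             accumulate: +, combine: { $0 + $1.count })
let twoChunks = chunkedReduce(words, chunks: 2, identity: 0,
                              accumulate: +, combine: { $0 + $1.count })

// With a seed that is not an identity, the result depends on the split.
let badOne = chunkedReduce(words, chunks: 1, identity: 10,
                           accumulate: +, combine: { $0 + $1.count })
let badTwo = chunkedReduce(words, chunks: 2, identity: 10,
                           accumulate: +, combine: { $0 + $1.count })
```

Here oneChunk and twoChunks agree, while badOne and badTwo differ because the seed is counted once per partial reduction and once more in the final merge.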
-
Asynchronously reduces elements of this stream to a single value.
let futureSum = arrayOfStrings.stream()
    .reduce(0) { i, s in
        return i + s.characters.count
    }
Which can be simplified to:
let futureSum = arrayOfStrings.stream()
    .map { $0.characters.count }
    .reduce(0, combine: +)
The following law must hold.
Commutative:
combine(combine(reduced, a), b) == combine(combine(reduced, b), a)
This law is necessary largely because Streams are unordered.
Declaration
Swift
public func reduce<Reduced>(
    initial: Reduced,
    combine: (Reduced, T) -> Reduced
) -> Future<Reduced>
Parameters
initial
The value to start reduction with.
combine
A function for combining an element and a reduction into a new reduction.
Return Value
A future that will complete with the result of the reduction.
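The commutativity law matters because elements may reach combine in any order. The sketch below uses plain arrays to stand in for two possible arrival orders of the same elements; it does not use the stream API.

```swift
// The same elements, as they might arrive on two different runs.
let arrivalA = [1, 2, 3, 4]
let arrivalB = [3, 1, 4, 2]

// Addition satisfies the law, so both orders give the same result.
let sumA = arrivalA.reduce(0, +)
let sumB = arrivalB.reduce(0, +)

// String concatenation does not satisfy the law, so the result
// depends on the arrival order.
let textA = arrivalA.reduce("") { $0 + String($1) }
let textB = arrivalB.reduce("") { $0 + String($1) }

print(sumA == sumB)    // true
print(textA == textB)  // false
```

A combine function like the second one would make the reduction's result vary from run to run on an unordered stream.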