Stream

public struct Stream<T>

A lazy, asynchronous, concurrent, and reusable stream of operations on elements of data.

let sum = (1..<50000).stream()
    .filter { $0 % 2 == 0 }
    .reduce(0, combine: +)
    .wait()

This example creates a stream from the range (1..<50000), keeps only the even elements, and then reduces the stream to a single value: it starts at 0 and uses + to combine all elements. Streams work asynchronously, so wait() is called to wait for the result.
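As a sanity check on the expected value, the same computation can be written eagerly with the Swift standard library alone. This sketch does not use Stream at all; it only shows the number the pipeline above should produce.

```swift
// Eager, single-threaded reference computation using only the standard
// library. This is not part of the Stream API.
let eagerSum = (1..<50000)
    .filter { $0 % 2 == 0 }   // keep the even numbers 2, 4, ..., 49998
    .reduce(0, +)             // start at 0 and add every element

print(eagerSum) // 624975000
```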

There are three stages to using streams.

  • Create the stream

    Most often, a stream is created using CollectionType.stream() or Stream.of(T...), but it is also possible to create custom streams.

    A stream uses the Continuation monad to pass elements to handlers. The continuation is specialized to the type Continuation<Future<()>, T>. It returns a Future<()> because the handlers are asynchronous. A custom stream should provide a continuation that calls the handler once for each element, preferably concurrently. A stream shouldn’t complete its future until all the futures returned by the handler have completed.

    The future’s type is Future<()> because its value carries no information; () is the only possible result. That is to say, there is no actual return value. The only thing that matters is the time of completion.

  • Manipulate the stream

    There are several intermediary operations to manipulate the stream. The simplest is Stream<T>.map(T -> U). This returns a stream that maps elements of the original stream to elements of a different type.

    Streams are immutable, lazy, and reusable, so intermediary operations don’t change the stream or its elements. Instead, they construct a new stream that gets its elements from the old stream and modifies each one accordingly before passing it to a handler.

  • Run the stream

    Terminal operations on a stream start running the stream. Most often, the stream will run asynchronously and concurrently, but this depends on how the stream was created. Running a stream means starting to accept its elements with a handler. The simplest example of this is Stream.forEach, which calls a handler for each element.

    Most streams are concurrent. This leads to dramatic performance improvements in many scenarios. There are, however, situations where the overhead of concurrency outweighs the performance gains. For example, there are two different methods of reducing a stream. One is pseudo-serial: the reduction is performed serially, while the elements are computed concurrently. The other is fully concurrent: both the computation of elements and the reduction are concurrent. For very fast reduction functions, the pseudo-serial method is usually faster, since there’s no concurrency overhead. For slower reduction functions, the concurrent method is usually faster, since more reductions can occur at a time.
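The three stages can be sketched with a deliberately simplified model. MiniStream below is a hypothetical, synchronous stand-in and not the library's Stream: its handler returns Void rather than Future<()>, so there is no concurrency and nothing to wait on. It is only meant to show how a continuation-style stream is created, wrapped by an intermediary operation, and then run.

```swift
// Hypothetical, synchronous stand-in for Stream<T>. The real type wraps a
// Continuation<Future<()>, T>; here the handler returns Void.
struct MiniStream<T> {
    // Calls the given handler once for each element.
    let run: (@escaping (T) -> Void) -> Void

    // Intermediary operation: builds a new stream, leaves this one untouched.
    func map<U>(_ mapper: @escaping (T) -> U) -> MiniStream<U> {
        let source = run
        return MiniStream<U>(run: { handler in
            source { element in handler(mapper(element)) }
        })
    }

    // Terminal operation: starts feeding elements to the handler.
    func forEach(_ handler: @escaping (T) -> Void) {
        run(handler)
    }
}

// 1. Create: a custom stream that emits 1, 2, 3.
let numbers = MiniStream<Int>(run: { handler in
    for n in 1...3 { handler(n) }
})

// 2. Manipulate: nothing runs yet, because the stream is lazy.
let squares = numbers.map { $0 * $0 }

// 3. Run: the stream is reusable, so it can be run more than once.
var firstRun: [Int] = []
squares.forEach { firstRun.append($0) }
var secondRun: [Int] = []
squares.forEach { secondRun.append($0) }

print(firstRun, secondRun) // [1, 4, 9] [1, 4, 9]
```

Because this model is synchronous, the elements arrive in order; a concurrent stream makes no such guarantee.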

Streams and collections differ in several ways. Besides being concurrent and asynchronous, streams are lazy and unordered. Streams do not store elements, and instead rely on abstract data sources. They can’t have their count calculated. Most importantly, Stream is not a data structure. It is a pipeline for manipulating elements, no matter how many there are. This is the inspiration for using the Continuation monad.

  • The continuation that defines this Stream. cont.run is the function to call to run the stream on some handler.

    Declaration

    Swift

    public let cont: Continuation<Future<()>, T>
  • Initializes the stream with a continuation.

    Declaration

    Swift

    public init(_ cont: Continuation<Future<()>, T>)
  • Initializes the stream with an unwrapped continuation.

    Declaration

    Swift

    public init(_ cont: (T -> Future<()>) -> Future<()>)
  • Functor fmap.

    Maps this stream to a stream of another type.

    Declaration

    Swift

    public func map<U>(mapper: T -> U) -> Stream<U>

    Parameters

    mapper

    The function to apply to elements of this stream.

    Return Value

    A stream whose elements are the results of applying the mapper to the elements of this stream.

  • Applicative <*>

    Applies the functions in another stream to elements of this stream.

    Declaration

    Swift

    public func apply<U>(mappers: Stream<T -> U>) -> Stream<U>

    Parameters

    mappers

    The stream of functions to apply to elements of this stream.

    Return Value

    A stream whose elements are the results of applying all functions in another stream to all elements of this stream.
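As an eager analogy, the same cross-application can be written with plain arrays. This sketch uses only the standard library and not Stream; the names elements and mappers are illustrative. Arrays are ordered, whereas a stream makes no ordering guarantee.

```swift
// Array analogy for the applicative operation: every function is applied
// to every element. Not part of the Stream API.
let elements = [1, 2, 3]
let mappers: [(Int) -> Int] = [{ $0 + 10 }, { $0 * 2 }]

let applied = mappers.flatMap { mapper in elements.map(mapper) }

print(applied) // [11, 12, 13, 2, 4, 6]
```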

  • Monad return.

    Declaration

    Swift

    public static func of<T>(element: T) -> Stream<T>

    Parameters

    element

    The element to make a stream around.

    Return Value

    A stream with one element.

  • Monad bind.

    Maps elements of this stream to streams, and concatenates the results.

    Declaration

    Swift

    public func flatMap<U>(mapper: T -> Stream<U>) -> Stream<U>

    Parameters

    mapper

    The function to map elements with.

    Return Value

    A stream whose elements are the elements of all streams returned by applying the mapper to each element of this stream.

  • Monad join

    The join function is the conventional monad join operator. It is used to remove one level of monadic structure, projecting its bound argument into the outer level.

    Declaration

    Swift

    public static func join<T>(streams: Stream<Stream<T>>) -> Stream<T>

    Parameters

    streams

    The streams to join.

    Return Value

    A stream whose elements are all the elements of the streams in the parameter.
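The relationship between return, bind, and join can be sketched with a simplified model. MiniStream is a hypothetical, synchronous stand-in for Stream (its handler returns Void instead of Future<()>), so this shows the shape of the operations rather than the library's implementation.

```swift
// Hypothetical, synchronous stand-in for Stream<T>; nothing here is concurrent.
struct MiniStream<T> {
    let run: (@escaping (T) -> Void) -> Void

    // Monad return: a stream with exactly one element.
    static func of(_ element: T) -> MiniStream<T> {
        return MiniStream(run: { handler in handler(element) })
    }

    // Monad bind: run the stream produced for each element and forward
    // its elements to the same handler.
    func flatMap<U>(_ mapper: @escaping (T) -> MiniStream<U>) -> MiniStream<U> {
        let source = run
        return MiniStream<U>(run: { handler in
            source { element in mapper(element).run(handler) }
        })
    }

    // Monad join, expressed as bind with the identity function.
    static func join(_ streams: MiniStream<MiniStream<T>>) -> MiniStream<T> {
        return streams.flatMap { $0 }
    }
}

// A stream of two inner streams, flattened into a single stream.
let nested = MiniStream<MiniStream<Int>>(run: { handler in
    handler(MiniStream<Int>.of(1))
    handler(MiniStream<Int>.of(2))
})

var flattened: [Int] = []
MiniStream<Int>.join(nested).run { flattened.append($0) }

print(flattened) // [1, 2]
```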

  • Monoid mempty

    Declaration

    Swift

    public static func empty<T>() -> Stream<T>

    Return Value

    An empty stream of any type.

  • Monoid mappend

    Declaration

    Swift

    public func appended(other: Stream<T>) -> Stream<T>

    Return Value

    A stream whose elements are all the elements of both this stream and the other stream.
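The monoid operations can be sketched the same way. MiniStream and its collect() helper are hypothetical and synchronous; they only illustrate that an empty stream never calls its handler and that appending runs both streams on the same handler.

```swift
// Hypothetical, synchronous stand-in for Stream<T>.
struct MiniStream<T> {
    let run: (@escaping (T) -> Void) -> Void

    // Monoid mempty: never calls the handler.
    static func empty() -> MiniStream<T> {
        return MiniStream(run: { _ in })
    }

    // Monoid mappend: runs this stream, then the other, on the same handler.
    func appended(_ other: MiniStream<T>) -> MiniStream<T> {
        let first = run
        return MiniStream(run: { handler in
            first(handler)
            other.run(handler)
        })
    }

    // Helper for this example only: gathers the elements into an array.
    func collect() -> [T] {
        var elements: [T] = []
        run { elements.append($0) }
        return elements
    }
}

let left = MiniStream<Int>(run: { handler in handler(1); handler(2) })
let right = MiniStream<Int>(run: { handler in handler(3) })

let both = left.appended(right).collect()
let emptyFirst = MiniStream<Int>.empty().appended(left).collect()
let emptyLast = left.appended(MiniStream<Int>.empty()).collect()

print(both, emptyFirst, emptyLast) // [1, 2, 3] [1, 2] [1, 2]
```

Appending the empty stream on either side leaves the elements unchanged, which is the identity property expected of mempty.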

  • Synonym for join, but takes an array.

    Declaration

    Swift

    public static func concat<T>(streams: [Stream<T>]) -> Stream<T>
  • Synonym for join, but takes varargs.

    Declaration

    Swift

    public static func concat<T>(streams: Stream<T>...) -> Stream<T>
  • Runs the stream on a given handler. The handler is expected to be synchronous, so the returned Future<()> completes once all calls to the handler have exited. To control when the future completes, use Stream.cont.run() instead.

    Declaration

    Swift

    public func forEach(handler: T -> ()) -> Future<()>

    Parameters

    handler

    The closure to call with each element.

    Return Value

    A future representing when all the calls to the handler have exited.

  • Filters this stream, keeping only the elements that pass a predicate test.

    Declaration

    Swift

    public func filter(predicate: T -> Bool) -> Stream<T>

    Parameters

    predicate

    A test for elements of this stream.

    Return Value

    A new stream whose elements are the elements of this stream that passed the predicate test.
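A minimal sketch of how filtering fits the continuation style, assuming a hypothetical synchronous MiniStream rather than the library's Stream: the filtered stream wraps the original and forwards an element to the handler only when the predicate accepts it.

```swift
// Hypothetical, synchronous stand-in for Stream<T>.
struct MiniStream<T> {
    let run: (@escaping (T) -> Void) -> Void

    // Builds a new stream that forwards only the elements passing the test.
    func filter(_ predicate: @escaping (T) -> Bool) -> MiniStream<T> {
        let source = run
        return MiniStream(run: { handler in
            source { element in
                if predicate(element) { handler(element) }
            }
        })
    }
}

let oneToTen = MiniStream<Int>(run: { handler in
    for n in 1...10 { handler(n) }
})
let evens = oneToTen.filter { $0 % 2 == 0 }

var kept: [Int] = []
evens.run { kept.append($0) }

print(kept) // [2, 4, 6, 8, 10]
```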

  • Asynchronously and concurrently reduces elements of this stream to a single value.

    let futureSum = arrayOfStrings.stream()
        .reduce(identity: 0, accumulate: +) { i, s in
            return i + s.characters.count
        }
    

    Which can be simplified to:

    let futureSum = arrayOfStrings.stream()
        .map { $0.characters.count }
        .reduce(identity: 0, accumulate: +, combine: +)
    

    The following laws must hold.

    • Identity:

      accumulate(identity, a) == a
      
    • Commutative:

      accumulate(a, b) == accumulate(b, a)
      combine(combine(reduced, a), b) ==
                                       combine(combine(reduced, b), a)
      
    • Associative:

      combine(accumulate(a, b), element) ==
                                    accumulate(a, combine(b, element))
      accumulate(accumulate(a, b), c) ==
                                       accumulate(a, accumulate(b, c))
      

    These laws are necessary largely because Streams are concurrent and unordered.

    accumulate is necessary because several reductions may be in progress at the same time, and their partial results must eventually be merged into a single value.

    The initial value is expected to be an identity so that it can start more than one reduction. If it weren’t an identity, the final accumulation of reductions would produce unexpected results.

    Note

    If the underlying stream is not concurrent, this does not perform concurrently.

    Declaration

    Swift

    public func reduce<Reduced>(
            identity identity: Reduced,
            accumulate: (Reduced, Reduced) -> Reduced,
            combine: (Reduced, T) -> Reduced
        ) -> Future<Reduced>

    Parameters

    identity

    A value that will have no effect when accumulated.

    accumulate

    A function for combining two reduced values.

    combine

    A function for combining an element and a reduction into a new reduction.

    Return Value

    A future that will complete with the result of the reduction.
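The role of the identity and of accumulate can be illustrated without any concurrency. chunkedReduce below is a hypothetical helper, not part of the Stream API: it simulates several independent reductions by reducing fixed-size chunks of an array one after another and then merging the partial results. It is a sketch of the idea under that assumption, not of how the library schedules work.

```swift
// Hypothetical helper: simulates a concurrent reduction sequentially by
// reducing each chunk on its own and then merging the partial results.
func chunkedReduce<T, Reduced>(
    _ elements: [T],
    chunkSize: Int,
    identity: Reduced,
    accumulate: (Reduced, Reduced) -> Reduced,
    combine: (Reduced, T) -> Reduced
) -> Reduced {
    var partials: [Reduced] = []
    var start = 0
    while start < elements.count {
        let end = min(start + chunkSize, elements.count)
        // Every chunk starts from the identity, as each separate reduction would.
        partials.append(elements[start..<end].reduce(identity, combine))
        start = end
    }
    // Merge the partial results into one value.
    return partials.reduce(identity, accumulate)
}

let strings = ["stream", "of", "strings", "to", "measure"]
let lengthSum: (Int, String) -> Int = { $0 + $1.count }

let serial = strings.reduce(0, lengthSum)
let chunked = chunkedReduce(strings, chunkSize: 2, identity: 0,
                            accumulate: +, combine: lengthSum)
print(serial, chunked) // 24 24

// A starting value that is not an identity is counted once per chunk
// and once more in the final merge.
let skewed = chunkedReduce(strings, chunkSize: 2, identity: 100,
                           accumulate: +, combine: lengthSum)
print(skewed) // 424
```

With three chunks, the non-identity value 100 is added four times, which is the kind of unexpected result the identity law rules out.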

  • Asynchronously reduces elements of this stream to a single value.

    let futureSum = arrayOfStrings.stream()
        .reduce(0) { i, s in
            return i + s.characters.count
        }
    

    Which can be simplified to:

    let futureSum = arrayOfStrings.stream()
        .map { $0.characters.count }
        .reduce(0, combine: +)
    

    The following law must hold.

    • Commutative:

      combine(combine(reduced, a), b) ==
                                       combine(combine(reduced, b), a)
      

    This law is necessary largely because Streams are unordered.

    Declaration

    Swift

    public func reduce<Reduced>(
            initial: Reduced,
            combine: (Reduced, T) -> Reduced
        ) -> Future<Reduced>

    Parameters

    initial

    The value to start reduction with.

    combine

    A function for combining an element and a reduction into a new reduction.

    Return Value

    A future that will complete with the result of the reduction.
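The commutative law can be checked on plain arrays by reducing the same elements in two different arrival orders. This sketch uses only the standard library; the two arrays stand in for the unordered delivery of a stream, and all names are illustrative.

```swift
// Same elements, two arrival orders. Not part of the Stream API.
let inOrder = ["a", "bcd", "ef"]
let reordered = ["ef", "a", "bcd"]

// Satisfies the law: adding lengths gives the same total in any order.
let countLength: (Int, String) -> Int = { $0 + $1.count }
let total1 = inOrder.reduce(0, countLength)
let total2 = reordered.reduce(0, countLength)
print(total1, total2) // 6 6

// Violates the law: concatenation depends on arrival order.
let concatenate: (String, String) -> String = { $0 + $1 }
let joined1 = inOrder.reduce("", concatenate)
let joined2 = reordered.reduce("", concatenate)
print(joined1, joined2) // abcdef efabcd
```

A combine function like concatenate would make the result of reducing an unordered stream depend on scheduling.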