Monday, January 18, 2010

Introducing Streams

Streams are a special type of Iterable/Traversable whose elements are not evaluated until they are requested. Streams are normally constructed from functions that compute the remaining elements on demand.

A Scala List basically consists of the head element and the rest of the list. A Scala Stream consists of the head of the stream and a function that can construct the rest of the Stream. It is a bit more than this, because each element is evaluated only once and then stored in memory for future accesses. That should become clearer after some examples.
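To make the "head plus a function for the rest" idea concrete, here is a rough sketch of such a cell. The names LazyCellSketch, MyStream, MyCons, MyEmpty and demo are made up for illustration; this is not how the standard library implements Stream, only a minimal model of the same idea, using a by-name parameter for the tail and a lazy val to remember it.

```scala
object LazyCellSketch {
  // Hypothetical, simplified model of a stream; not the library's implementation.
  sealed trait MyStream[+A] {
    def head: A
    def tail: MyStream[A]
    def isEmpty: Boolean
    // Walks the cells up to index n; only the cells visited get evaluated.
    def apply(n: Int): A = if (n == 0) head else tail(n - 1)
  }

  case object MyEmpty extends MyStream[Nothing] {
    def head: Nothing = throw new NoSuchElementException("head of empty stream")
    def tail: MyStream[Nothing] = throw new UnsupportedOperationException("tail of empty stream")
    def isEmpty: Boolean = true
  }

  // The tail is passed by name (=> ...), so it is not computed when the cell is built.
  final class MyCons[A](val head: A, rest: => MyStream[A]) extends MyStream[A] {
    // lazy val: computed on first access, then cached for every later access.
    lazy val tail: MyStream[A] = rest
    def isEmpty: Boolean = false
  }

  // Returns how often the tail expression ran: before any access,
  // after the first read of element 1, and after a second read.
  def demo(): (Int, Int, Int) = {
    var tailEvaluations = 0
    val s = new MyCons[Int](0, { tailEvaluations += 1; new MyCons[Int](1, MyEmpty) })
    val before = tailEvaluations
    s(1)
    val afterFirstRead = tailEvaluations
    s(1)
    val afterSecondRead = tailEvaluations
    (before, afterFirstRead, afterSecondRead)
  }
}
```

Calling LazyCellSketch.demo() returns (0, 1, 1): the tail is untouched until it is asked for, and it is computed only once.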

As usual I created these examples with the Scala 2.8 REPL but I think most if not all should work in 2.7.

    scala> import Stream.cons
    import Stream.cons
    /*
    Because streams are very functional in nature, it is recommended that the methods
    of the Stream object be used to create them.
    This is a really boring example of creating a Stream. Anything this simple should be a List.
    The important part is that cons takes the value at that point and a function
    (a by-name argument) that returns the rest of the stream, NOT another stream.
    */
    scala> val stream1 = cons(0,cons(1,Stream.empty))
    stream1: Stream.Cons[Int] = Stream(0, ?)
    scala> stream1 foreach {print _}
    01
    /*
    This illustrates the similarity in design between Stream and List. Again, the difference
    is that the entire List is created up front, whereas in a Stream the second argument
    of cons is not evaluated until it is requested.
    */
    scala> new ::(0, new ::(1,List.empty))
    res35: scala.collection.immutable.::[Int] = List(0, 1)
    /*
    To drive home the point of the similarities, here is an alternative declaration of a
    stream that is most similar to declaring a list.
    */
    scala> val stream2 = 0 #:: 1 #:: Stream.empty
    stream2: scala.collection.immutable.Stream[Int] = Stream(0, ?)
    scala> stream2 foreach {print _}
    01
    scala> 0 :: 1 :: Nil
    res36: List[Int] = List(0, 1)
    /*
    A little more interesting now. Accessing the second element will run the function.
    Notice that it is not evaluated until it is requested.
    */
    scala> val stream3 = cons (0, {
         | println("getting second element")
         | cons(1,Stream.empty)
         | })
    stream3: Stream.Cons[Int] = Stream(0, ?)
    scala> stream3(0)
    res56: Int = 0
    // the function is evaluated on the first access to the second element
    scala> stream3(1)
    getting second element
    res57: Int = 1
    /*
    The function is evaluated only once.
    Important! This means that every element that has been evaluated stays in memory
    for as long as the stream is referenced, so a large stream can cause an OutOfMemoryError.
    */
    scala> stream3(1)
    res58: Int = 1
    scala> stream3(1)
    res59: Int = 1
    /*
    This creates an infinite stream and then forces the evaluation of all of its elements.
    */
    scala> Stream.from(100).force
    java.lang.OutOfMemoryError: Java heap space
    // Alternative demonstration of laziness
    scala> val stream4 = 0 #:: {println("hi"); 1} #:: Stream.empty
    stream4: scala.collection.immutable.Stream[Int] = Stream(0, ?)
    scala> stream4(1)
    hi
    res2: Int = 1
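The two behaviours shown in the session above (nothing is evaluated until requested, and nothing is evaluated twice) can also be checked with a counter instead of println. The following is only a sketch: StreamLazinessSketch, traced, evaluationCounts and firstThreeFrom100 are names invented for this example. It uses the same Stream.cons as above; on Scala 2.13 and later Stream is deprecated in favor of LazyList, so expect deprecation warnings there.

```scala
object StreamLazinessSketch {
  // Returns the number of element evaluations after construction, after the
  // first read of element 1, and after a second read of element 1.
  def evaluationCounts(): (Int, Int, Int) = {
    var evaluations = 0
    // Hypothetical helper: counts each time an element expression actually runs.
    def traced(n: Int): Int = { evaluations += 1; n }

    // The head argument of cons is evaluated immediately; the tail is by-name.
    val s = Stream.cons(traced(0), Stream.cons(traced(1), Stream.empty))
    val afterConstruction = evaluations
    s(1) // first read: runs the tail expression, which evaluates traced(1)
    val afterFirstRead = evaluations
    s(1) // second read: served from the memoized cell
    val afterSecondRead = evaluations
    (afterConstruction, afterFirstRead, afterSecondRead)
  }

  // take bounds the work, so only the requested cells of the infinite stream are forced.
  def firstThreeFrom100: List[Int] = Stream.from(100).take(3).toList
}
```

Here evaluationCounts() returns (1, 2, 2), and firstThreeFrom100 returns List(100, 101, 102) without running out of memory, because take limits how much of the infinite stream is ever evaluated, unlike force.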

A very common way to construct a Stream is to define a recursive method. Each recursive call constructs a new element in the stream. The method may or may not have a guard condition that terminates the stream.
    // construct a stream of random elements
    scala> def make : Stream[Int] = Stream.cons(util.Random.nextInt(10), make)
    make: Stream[Int]
    scala> val infinate = make
    infinate: Stream[Int] = Stream(3, ?)
    scala> infinate(5)
    res10: Int = 6
    scala> infinate(0)
    res11: Int = 3
    // Once evaluated, an element does not change
    scala> infinate(5)
    res13: Int = 6
    // this function makes a stream that does terminate
    scala> def make(i:Int) : Stream[String] = {
         | if(i==0) Stream.empty
         | else Stream.cons((i + 5).toString, make(i-1))
         | }
    make: (i: Int)Stream[String]
    scala> val finite = make(5)
    finite: Stream[String] = Stream(10, ?)
    scala> finite foreach print _
    109876
    // One last demonstration of making a stream object
    scala> Stream.cons("10", make(2))
    res18: Stream.Cons[String] = Stream(10, ?)
    /*
    size is dangerous because it forces the entire stream to be evaluated
    (and it never returns on an infinite stream)
    */
    scala> res18.size
    res19: Int = 3
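The recursive pattern above, with and without a guard, can be summarized in a small sketch. All names here (RecursiveStreamSketch, naturalsFrom, countdown, randoms, firstEvens, sameOnReread) are invented for illustration. It also shows the point that a def builds a fresh stream on every call, so the stream has to be bound to a val before its memoized elements can be reread.

```scala
object RecursiveStreamSketch {
  // No guard: the recursion continues for as long as elements are requested.
  def naturalsFrom(n: Int): Stream[Int] = Stream.cons(n, naturalsFrom(n + 1))

  // Guard: returning Stream.empty terminates the stream.
  def countdown(n: Int): Stream[Int] =
    if (n < 0) Stream.empty else Stream.cons(n, countdown(n - 1))

  // Each call to randoms creates a new stream with new random values.
  def randoms: Stream[Int] = Stream.cons(scala.util.Random.nextInt(10), randoms)

  // Safe on an infinite stream: take limits how many cells get evaluated.
  def firstEvens: List[Int] = naturalsFrom(0).filter(_ % 2 == 0).take(3).toList

  // Binding the stream to a val keeps its cells, so rereading gives the same value.
  def sameOnReread: Boolean = {
    val r = randoms
    r(5) == r(5)
  }
}
```

firstEvens gives List(0, 2, 4), countdown(3).toList gives List(3, 2, 1, 0), and sameOnReread is always true. Calling size or force on naturalsFrom(0) would never finish, so bound the stream with take (or a similar operation) first.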


This is only an introduction. I hope to add a few more topics that focus on Streams, because they can be very powerful, but it is also harder to recognize where they should be used instead of a standard collection.

8 comments:

  1. Is there some way to use streams without keeping the previous values forever? Something like getting a reference to the 'tail' equivalent and losing the stream reference?

  2. I believe you can do exactly that. I will test a bit more but I think you might be able to do:

    val theRest = stream.drop(100)

    theRest should have dropped the first part of the stream so that it can be garbage collected (once all references to stream are gone).

    I have to test to verify but that is my understanding

  3. Thanks for this post, it taught me some things. In particular, the new #:: notation seems very convenient...

  4. From the example, def make : Stream[Int] = Stream.cons(util.Random.nextInt(10), make), doesn't actually work as recorded for me (on 2.8 beta). Each call to make(0) returns a different value, as the function is evaluated each time. In order to have the value evaluated just once, I needed to write the call to nextInt in a closure. Odd that you got the results you did...

    Replies
    1. make is a function that returns a Stream of random numbers. Calling make(0) instantiates a stream and takes the first value of that stream. The stream is then left to be garbage collected, so each make(0) starts all over again. You expected the result of make to be preserved. For that you need to assign the Stream to a val, like 'val r = make'. Then each call to r(0) will return the same value.

  5. I get different results from the example: def make : Stream[Int] = Stream.cons(util.Random.nextInt(10), make)
    For me, the nextInt call is evaluated each time f(0) (for instance) is called, returning a different random int. In order to get a single result, evaluated only once, I had to wrap the call in a closure. Odd you got the result you did...

  6. It looks like the implementation changed since I wrote this. Life on the bleeding edge :P

  7. What is "infinate"? Maybe it should be "infinite"?
