Data streams
Type of a stream holding values of type 'a
Naming convention: in this module, all functions that take a function applied to every element of the stream are suffixed by:
_s when the function returns a thread and calls are serialised
_p when the function returns a thread and calls are parallelised
Exception raised by the push function of a push-stream when pushing an element after the end of stream (= None) has been pushed.
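The suffix convention and the end-of-stream exception can be sketched together. This is a minimal illustration, assuming the lwt and lwt.unix libraries are available and that the exception is exposed as Lwt_stream.Closed; double_s is a hypothetical helper, not part of the module.

```ocaml
(* Sketch only: assumes the lwt and lwt.unix libraries are linked. *)

(* Hypothetical helper: it returns a thread, so it is passed to the
   [_s] (serialised) variant of [map]. *)
let double_s x = Lwt.return (2 * x)

let doubled, push_after_close_failed =
  let stream, push = Lwt_stream.create () in
  List.iter (fun x -> push (Some x)) [1; 2; 3];
  (* Pushing [None] marks the end of the stream. *)
  push None;
  (* Any further push is expected to raise [Lwt_stream.Closed]. *)
  let failed =
    try push (Some 4); false
    with Lwt_stream.Closed -> true
  in
  (* [map_s] applies [double_s] to the elements one after another. *)
  let doubled =
    Lwt_main.run (Lwt_stream.to_list (Lwt_stream.map_s double_s stream))
  in
  (doubled, failed)
```

Here the stream is closed before it is read, which is fine because the three pushed elements stay queued until consumed.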
create_with_reference () returns a new stream, a push function, and a third function that sets a reference to an external source. Holding this reference prevents the external source from being garbage collected.
For example, to convert a reactive event to a stream:
let stream, push, set_ref = Lwt_stream.create_with_reference () in
set_ref (map_event push event)
Exception raised by the push function of a bounded push-stream when the stream queue is full and a thread is already waiting to push an element.
Size of the stream.
Change the size of the stream queue. Note that the new size can be smaller than the current stream queue size.
It raises Invalid_argument if size < 0.
Number of elements in the stream queue.
Is the stream closed?
Set the reference to an external source.
Type of sources for bounded push-streams.
create_bounded size returns a new stream and a bounded push source. The stream can hold a maximum of size elements. When this limit is reached, pushing a new element will block until one is consumed.
Note that you cannot clone or parse (with parse) a bounded stream. These functions will raise Invalid_argument if you try to do so.
It raises Invalid_argument if size < 0.
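The blocking behaviour of a bounded push source can be sketched as follows. This is an illustration under assumptions: it presumes the lwt and lwt.unix libraries are available and that the source object exposes push and close methods, as in the bounded_push class type of Lwt_stream.

```ocaml
(* Sketch only: assumes the lwt and lwt.unix libraries are linked. *)
open Lwt.Infix

let first, was_blocked, rest =
  (* A stream whose queue holds at most 2 elements. *)
  let stream, source = Lwt_stream.create_bounded 2 in
  Lwt_main.run begin
    source#push 1 >>= fun () ->
    source#push 2 >>= fun () ->
    (* The queue is full, so this push stays pending. *)
    let pending = source#push 3 in
    let was_blocked = Lwt.is_sleeping pending in
    (* Consuming one element frees a slot for the pending push. *)
    Lwt_stream.next stream >>= fun first ->
    pending >>= fun () ->
    source#close;
    Lwt_stream.to_list stream >>= fun rest ->
    Lwt.return (first, was_blocked, rest)
  end
```

Only one push may be pending at a time; a second push attempted while the first is still blocked would be rejected with the "queue is full" exception described earlier.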
clone st clones the given stream. Operations on each stream will not affect the other.
For example:
# let st1 = Lwt_stream.of_list [1; 2; 3];;
val st1 : int Lwt_stream.t = <abstr>
# let st2 = Lwt_stream.clone st1;;
val st2 : int Lwt_stream.t = <abstr>
# lwt x = Lwt_stream.next st1;;
val x : int = 1
# lwt y = Lwt_stream.next st2;;
val y : int = 1
It raises Invalid_argument if st is a bounded push-stream.
Exception raised when trying to retrieve data from an empty stream.
get_available st returns all available elements of st without blocking.
get_available_up_to n st returns up to n elements of st without blocking.
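A short sketch of the two non-blocking accessors, assuming the lwt library is available. Push-streams are used so that the set of already available elements is unambiguous; the values pushed are arbitrary.

```ocaml
(* Sketch only: assumes the lwt library is linked. *)
let all, up_to_two, remaining =
  let st1, push1 = Lwt_stream.create () in
  List.iter (fun x -> push1 (Some x)) [1; 2; 3];
  (* Everything pushed so far is returned at once, without blocking. *)
  let all = Lwt_stream.get_available st1 in
  let st2, push2 = Lwt_stream.create () in
  List.iter (fun x -> push2 (Some x)) [10; 20; 30];
  (* At most 2 elements are taken; the rest stays in the stream. *)
  let up_to_two = Lwt_stream.get_available_up_to 2 st2 in
  let remaining = Lwt_stream.get_available st2 in
  (all, up_to_two, remaining)
```

Both functions remove the returned elements from the stream, so a second call only sees what was left or pushed afterwards.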
Note: all the following functions are destructive.
For example:
# let st1 = Lwt_stream.of_list [1; 2; 3];;
val st1 : int Lwt_stream.t = <abstr>
# let st2 = Lwt_stream.map string_of_int st1;;
val st2 : string Lwt_stream.t = <abstr>
# lwt x = Lwt_stream.next st1;;
val x : int = 1
# lwt y = Lwt_stream.next st2;;
val y : string = "2"
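When the original elements are still needed after a destructive operation, one option is to clone the stream first. The following is a sketch of that idea rather than a prescribed pattern, assuming the lwt and lwt.unix libraries are available; the name copy is illustrative.

```ocaml
(* Sketch only: assumes the lwt and lwt.unix libraries are linked. *)
let st1 = Lwt_stream.of_list [1; 2; 3]

(* Take an independent view of st1 before it is consumed. *)
let copy = Lwt_stream.clone st1

(* Reading from st2 consumes the elements of st1. *)
let st2 = Lwt_stream.map string_of_int st1
let strings = Lwt_main.run (Lwt_stream.to_list st2)

(* The clone still yields every element. *)
let ints = Lwt_main.run (Lwt_stream.to_list copy)
```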