Up

module Uq_engines

: sig

An engine performs a certain task in an autonomous way. Engines are attached to a Unixqueue.event_system, and do their task by generating events for resources of the operating system, and by handling such events. Engines are in one of four states: they may still be working, they may be done, they may be aborted, or they may be in an error state. The latter three states are called final states, because they indicate that the engine has stopped operation.

It is possible to ask an engine to notify another object when it changes its state. For simplicity, notification is done by invoking a callback function, and not by issuing notification events.

Effectively, engines provide a calculus for cooperative microthreading. This calculus includes combinators for sequential execution and synchronization. Moreover, it is easy to connect it with callback-style microthreading - one can arrange callbacks when an engine is done, and one can catch callbacks and turn them into engines.
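
For example, the typical top-level pattern looks like this (a minimal sketch: copy_file_e is a hypothetical function returning a unit engine; the event system calls are the usual Unixqueue entry points):

      let esys = Unixqueue.create_unix_event_system () in
      let e = copy_file_e "input.txt" "output.txt" esys in   (* hypothetical engine *)
      Unixqueue.run esys;      (* process events until no attached engine has work left *)
      (match e # state with
         | `Done () -> print_endline "copy finished"
         | `Error exn -> prerr_endline (Printexc.to_string exn)
         | `Aborted -> prerr_endline "aborted"
         | `Working _ -> prerr_endline "still working (not expected here)")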

Exceptions

#
exception Closed_channel

Raised when a method of a closed channel object is called (only channel methods count).

This exception should be regarded as equivalent to Netchannels.Closed_channel, but need not be the same exception.

#
exception Broken_communication

Some engines indicate this error when they cannot continue because the other endpoint of communication signals an error.

This exception is not raised, but used as argument of the `Error state.

#
exception Watchdog_timeout

Used by the watchdog engine to indicate a timeout.

This exception is not raised, but used as argument of the `Error state.

#
exception Timeout

Used by input_engine and output_engine to indicate timeouts

#
exception Addressing_method_not_supported

Raised by client_socket_connector and server_socket_acceptor to indicate that the passed address is not supported by the class.

#
exception Cancelled

The callback function of a multiplex_controller is invoked with this exception if the operation is cancelled.

Engine definition

#
type 't engine_state = [
| `Working of int
| `Done of 't
| `Error of exn
| `Aborted
]

The type of states with result values of type 't:

  • `Working n: The engine is working. The number n counts the number of events that have been processed.
  • `Done arg: The engine has completed its task without errors. The argument arg is the result value of the engine
  • `Error exn: The engine has aborted because of an error. The argument exn describes the error as an exception.
  • `Aborted: The engine has aborted because the abort method was called
#
type 't final_state = [
| `Done of 't
| `Error of exn
| `Aborted
]

Same as engine_state without `Working. These are only the final states.

#
val string_of_state : 'a engine_state -> string

For debug purposes: Returns a string describing the state

#
class type ['t] engine =

Requirements for engines

#
method state : 't engine_state

Returns the state of the engine

#
method abort : unit -> unit

Forces the engine to abort operation. If the state is already `Done, `Aborted, or `Error, this method must do nothing (an engine that has already finished cannot be aborted).

#
method request_notification : (unit -> bool) -> unit

Requests notification about state changes.

After the notification has been requested, the passed function must be called whenever state changes its value (or might change its value; it is allowed to call the notification function more frequently than necessary). The function returns true if there is still interest in notification, and false if notification must be disabled; the function must not be called any longer in this case.

There can be any number of parallel active notifications. It is allowed that a notification callback function requests further notifications.

If the callback raises an exception, this exception is propagated to the caller of Unixqueue.run.

#
method event_system : Unixqueue.event_system

Returns the event system the engine is attached to

This class type defines the interface an engine must support. The class parameter 't is the type of the result values (when the engine goes to state `Done).

#
class ['t] delegate_engine : 't #engine -> ['t] engine

Turns an engine value into a class

Engines and callbacks

#
val when_state : ?is_done:('a -> unit) -> ?is_error:(exn -> unit) -> ?is_aborted:(unit -> unit) -> ?is_progressing:(int -> unit) -> 'a #engine -> unit

Watches the state of the argument engine, and arranges that one of the functions is called when the corresponding state change is done. Once a final state is reached, the engine is no longer watched. Note that when_state only observes future state changes.

If one of the functions raises an exception, this exception is propagated to the caller of Unixqueue.run.

is_done The state transitions to `Done. The argument of is_done is the argument of the `Done state.
is_error The state transitions to `Error. The argument of is_error is the argument of the `Error state.
is_aborted The state transitions to `Aborted.
is_progressing This function is called when the `Working state changes. The int argument is the new `Working arg.
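
For illustration, a small sketch (assuming e : string engine and esys already exist; the printed messages are arbitrary):

      Uq_engines.when_state
        ~is_done:(fun s -> print_endline ("done: " ^ s))
        ~is_error:(fun exn -> prerr_endline ("error: " ^ Printexc.to_string exn))
        ~is_aborted:(fun () -> prerr_endline "aborted")
        e;
      Unixqueue.run esys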
#
class ['a] signal_engine : Unixqueue.event_system ->
inherit ['a] engine
#
method signal : 'a final_state -> unit

let se = new signal_engine esys: The engine se remains in `Working 0 until the method se # signal x is called. At this point se transitions to x. Any further call of signal does not have any effect.

Also, if se is aborted, signal does not have any effect.

The function signal may be called from a different thread. The signalling event is forwarded to the thread running the event system.

#
val signal_engine : Unixqueue.event_system -> 'a engine * ('a final_state -> unit)

let (se, signal) = signal_engine esys: Same as the signal_engine class, as a function; calling signal x has the same effect as se # signal x
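
For example, a signal engine can turn a one-shot callback interface into an engine (sketch; esys is assumed to exist, and register_callback is a hypothetical function that invokes its argument exactly once with a string result):

      let (e, signal) = Uq_engines.signal_engine esys in
      register_callback (fun result -> signal (`Done result));
      (* e transitions to `Done result as soon as the callback fires *)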

Combinators

The following combinators serve as the control structures to connect primitive engines with each other.

#
class ['a, 'b] map_engine : map_done:('a -> 'b engine_state) -> ?map_error:(exn -> 'b engine_state) option -> ?map_aborted:(unit -> 'b engine_state) option -> ?propagate_working:bool option -> 'a #engine -> ['b] engine

The map_engine observes the argument engine, and when the state changes to `Done, `Error, or `Aborted, the corresponding mapping function is called, and the resulting state becomes the state of the mapped engine. If the engine is already in one of the mentioned states, the map functions are also called (unlike when_state).

After the state change to `Done, `Error, or `Aborted has been observed, the map engine detaches from the argument engine, and no further state changes are recognized.

The state `Working cannot be mapped to another state. It is an error to map final states to `Working. The result type of the map_* functions is engine_state and not final_state for historical reasons.

If the mapped engine is aborted, this request will be forwarded to the argument engine.

If one of the mapping functions raises an exception, this causes a transition to `Error.

map_done Maps the `Done state of the argument engine to another state. The argument of map_done is the argument of the `Done state. Note that map_done is non-optional only because of typing. If it were optional, the type checker would infer 'a = 'b.
map_error Maps the `Error state of the argument engine to another state. The argument of map_error is the argument of the `Error state.
map_aborted Maps the `Aborted state of the argument engine to another state.
propagate_working Specifies whether changes of the `Working state in the argument engine are propagated. Defaults to true. If set to false, the mapped engine remains in `Working 0 until it transitions to a final state.
#
val map_engine : map_done:('a -> 'b engine_state) -> ?map_error:(exn -> 'b engine_state) -> ?map_aborted:(unit -> 'b engine_state) -> ?propagate_working:bool -> 'a #engine -> 'b engine

Same as the map_engine class, as a function
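
A small sketch of typical mapping functions (assuming e : int engine; treating Not_found as a regular result is just an example):

      let e' =
        Uq_engines.map_engine
          ~map_done:(fun n -> `Done (string_of_int n))
          ~map_error:(function
                        | Not_found -> `Done "unknown"
                        | err -> `Error err)
          e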

#
class ['a, 'b] fmap_engine : 'a #engine -> ('a final_state -> 'b final_state) -> ['b] engine

Similar to map_engine but different calling conventions: The mapping function is called when the argument engine reaches a final state, and this state can be mapped to another final state.

#
val fmap_engine : 'a #engine -> ('a final_state -> 'b final_state) -> 'b engine

Same as the fmap_engine class, as a function

After opening Uq_engines.Operators, this is also available as operator >>, e.g.

         e >>
           (function
             | `Done r -> ...
             | `Error error -> ...
             | `Aborted -> ...
           )
#
class ['a] meta_engine : 'a #engine -> ['a final_state] engine

maps the final state s to `Done s

#
val meta_engine : 'a #engine -> 'a final_state engine

Same as the meta_engine class, as a function

#
class ['t] epsilon_engine : 't engine_state -> Unixqueue.event_system -> ['t] engine

This engine transitions from its initial state `Working 0 in one step ("epsilon time") to the passed constant state. During this time event processing will continue, so concurrently running engines can make progress. For performance reasons, however, external resources like file descriptors are not watched for new events.

In previous versions of this library the class was called const_engine. However, the target state is not really constant: in particular, it is possible that this engine is aborted, so the passed state is never reached. To avoid programming errors caused by the misnomer, this class has been renamed.

#
val epsilon_engine : 't engine_state -> Unixqueue.event_system -> 't engine

Same as the epsilon_engine class, as a function

#
class ['a, 'b] seq_engine : 'a #engine -> ('a -> 'b #engine) -> ['b] engine

This engine runs two engines in sequential order. It is called

let eng_s = new seq_engine eng_a f

When eng_a goes to the state `Done arg, the function f is called to obtain

let eng_b = f arg

eng_b runs until it is also in state `Done.

If eng_a or eng_b go to states `Aborted or `Error, the sequential engine eng_s does so, too. If eng_s is aborted, this request will be forwarded to the currently active engine, eng_a or eng_b.

If calling f results in an exception, this is handled as if eng_a signaled an exception.

#
val seq_engine : 'a #engine -> ('a -> 'b #engine) -> 'b engine

Same as the seq_engine class, as a function.

After opening Uq_engines.Operators, this is also available as operator ++, e.g.

e1 ++ (fun r1 -> e2)

(when e1 and e2 are engines, and r1 is the result of e1).

#
class ['a, 'b] qseq_engine : 'a #engine -> ('a -> 'b #engine) -> ['b] engine
#
val qseq_engine : 'a #engine -> ('a -> 'b #engine) -> 'b engine

Almost the same as seq_engine, but this version does not propagate working state (i.e. no progress reporting).

qseq_engine should be preferred for recursive chains of engines.

#
class ['a] stream_seq_engine : 'a -> ('a -> 'a #engine) Stream.t -> Unixqueue.event_system -> ['a] engine

let se = new stream_seq_engine x0 s esys: The constructed engine se fetches functions f : 'a -> 'a #engine from the stream s, and runs the engines obtained by calling these functions e = f x one after the other. Each function call gets the result of the previous engine as argument. The first call gets x0 as argument.

If one of the engines e transitions into an error or aborted state, se will also do that. If se is aborted, this is passed down to the currently running engine e.

#
val stream_seq_engine : 'a -> ('a -> 'a #engine) Stream.t -> Unixqueue.event_system -> 'a engine

Same as the stream_seq_engine class, as a function

#
class ['a, 'b] sync_engine : 'a #engine -> 'b #engine -> ['a * 'b] engine

This engine runs two engines in parallel, and waits until both are `Done (synchronization). The product of the two `Done arguments is taken as the combined result.

If one of the engines goes to the states `Aborted or `Error, the combined engine will follow this transition. The other, non-aborted and non-erroneous engine is aborted in this case. `Error has higher precedence than `Aborted.

If the combined engine is aborted, this request is forwarded to both member engines.

#
val sync_engine : 'a #engine -> 'b #engine -> ('a * 'b) engine

Same as the sync_engine class, as a function

#
class ['a, 'b] msync_engine : 'a #engine list -> ('a -> 'b -> 'b) -> 'b -> Unixqueue.event_system -> ['b] engine

Multiple synchronization: let me = new msync_engine el f x0 esys - Runs the engines in el in parallel, and waits until all are `Done. The result of me is then computed by folding the results of the part engines using f, with an initial accumulator x0.

If one of the engines goes to the states `Aborted or `Error, the combined engine will follow this transition. The other, non-aborted and non-erroneous engines are aborted in this case. `Error has higher precedence than `Aborted.

If calling f results in an exception, this is handled as if the part engine signals an error.

If the combined engine is aborted, this request is forwarded to all member engines.

#
val msync_engine : 'a #engine list -> ('a -> 'b -> 'b) -> 'b -> Unixqueue.event_system -> 'b engine

Same as the msync_engine class, as a function
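
For example, the results of several engines can be folded into a sum (a sketch assuming e1, e2, e3 : int engine and esys exist):

      let sum_e =
        Uq_engines.msync_engine [ e1; e2; e3 ] (fun n acc -> n + acc) 0 esys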

#
class ['a] delay_engine : float -> (unit -> 'a #engine) -> Unixqueue.event_system -> ['a] engine

let de = delay_engine d f esys: The engine e = f() is created after d seconds, and the result of e becomes the result of de.

#
val delay_engine : float -> (unit -> 'a #engine) -> Unixqueue.event_system -> 'a engine

Same as the delay_engine class, as a function

#
class ['a] timeout_engine : float -> exn -> 'a engine -> ['a] engine

timeout_engine d x e: If the engine e finishes within d seconds, the result remains unchanged. If the engine takes longer, though, it is aborted, and the state transitions to `Error x

#
val timeout_engine : float -> exn -> 'a engine -> 'a engine

Same as the timeout_engine class, as a function
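
Sketch (assuming e and esys exist): give e at most 10 seconds; if it takes longer, it is aborted and the result becomes `Error Timeout:

      let e_tmo = Uq_engines.timeout_engine 10.0 Uq_engines.Timeout e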

#
class watchdog : float -> 'a #engine -> [unit] engine

A watchdog engine checks whether the argument engine makes progress, and if there is no progress for the passed number of seconds, the engine is aborted, and the watchdog state changes to `Error Watchdog_timeout.

The current implementation is not very exact, and it may take a little longer than the passed period of inactivity until the watchdog recognizes inactivity.

If the argument engine terminates, the watchdog changes its state to `Done ()

Important note: The watchdog assumes that the `Working state of the target engine really counts events that indicate progress. This does not work for:

  • poll_process_engine: there is no way to check whether a subprocess makes progress
  • connector: It is usually not possible to reflect the progress on packet level
  • listener: It is usually not possible to reflect the progress on packet level
#
val watchdog : float -> 'a #engine -> unit engine

Same as the watchdog class, as a function

#
class type ['a] serializer_t =
#
method serialized : (Unixqueue.event_system -> 'a engine) -> 'a engine

let se = serialized f: Waits until all the previous engines reach a final state, and then runs e = f esys.

se enters a final state when e does.

A serializer queues up engines, and starts the next engine when the previous one finishes.

#
class ['a] serializer : Unixqueue.event_system -> ['a] serializer_t

Creates a serializer

#
val serializer : Unixqueue.event_system -> 'a serializer_t

Same as the serializer class, as a function
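
Sketch (job_e is a hypothetical function of type Unixqueue.event_system -> unit engine): with a serializer, the second job only starts after the first one has reached a final state:

      let ser = Uq_engines.serializer esys in
      let e1 = ser # serialized job_e in
      let e2 = ser # serialized job_e in   (* queued; starts when e1 is final *)
      ignore (e1, e2)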

#
class type ['a] prioritizer_t =
#
method prioritized : (Unixqueue.event_system -> 'a engine) -> int -> 'a engine

let pe = prioritized f p: Queues up f on priority level p. The engine e = f esys can start when there is no waiting engine on a higher priority level (i.e. with a number less than p), and all running engines on lower priority levels are done.

pe enters a final state when e does.

A prioritizer allows one to prioritize the execution of engines: At any time, only engines of a certain priority p can be executed. If an engine with a higher priority ph wants to start, it prevents further engines with priority level p from being started until the higher-prioritized engines with level ph are done. On the same priority level, there is no limit on the number of engines that may run concurrently.

Here, higher priorities have lower numbers.

#
class ['a] prioritizer : Unixqueue.event_system -> ['a] prioritizer_t

Creates a prioritizer

#
val prioritizer : Unixqueue.event_system -> 'a prioritizer_t

Same as the prioritizer class, as a function

#
class type ['a] cache_t =
#
method get_engine : unit -> 'a engine

Requests the value. If it is not already in the cache, the engine for obtaining the value is started, and the returned engine waits until the value is available.

#
method get_opt : unit -> 'a option

Returns the cached value if available

#
method put : 'a -> unit

Puts a value immediately into the cache, replacing an existing value. If an engine is currently running to obtain a new value, this engine is kept running, and get_engine will return its result. Only future calls of get_engine return the value just put into the cache.

#
method invalidate : unit -> unit

Invalidates the cache: if a value exists in the cache, it is removed. If the cache value is later requested via get_engine, the engine is started anew to obtain the value.

Note that (as for put) any already running get_engine is not interrupted.

#
method abort : unit -> unit

Any engine running to get the cache value is aborted, and the contents of the cache are invalidated. Note that the engines returned by get_engine are also aborted.

A cache contains a mutable value that is obtained by running an engine.

#
class ['a] cache : (Unixqueue.event_system -> 'a engine) -> Unixqueue.event_system -> ['a] cache_t

new cache f esys: A cache that runs f esys to obtain values

#
val cache : (Unixqueue.event_system -> 'a engine) -> Unixqueue.event_system -> 'a cache_t

Same as the cache class, as a function
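
Sketch (fetch_config_e is a hypothetical function of type Unixqueue.event_system -> string engine): the engine is only run for the first get_engine call; later calls return the cached value:

      let c = Uq_engines.cache fetch_config_e esys in
      let e = c # get_engine () in
      Uq_engines.when_state ~is_done:(fun v -> print_endline v) e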

#
class ['t] engine_mixin : 't engine_state -> Unixqueue.event_system ->
#
method state : 't engine_state
#
method private set_state : 't engine_state -> unit
#
method request_notification : (unit -> bool) -> unit
#
method private notify : unit -> unit
#
method event_system : Unixqueue.event_system

A useful class fragment that implements state and request_notification.

#
module Operators : sig

Handy operators: ++, >>, and eps_e

The most important operators. This module should be opened.

#
val (++) : 'a #engine -> ('a -> 'b #engine) -> 'b engine

Another name for qseq_engine. Use this operator to run engines in sequence:

	    e1 ++ (fun r1 -> e2) ++ (fun r2 -> e3) ++ ...

Here rK is the result of engine eK.

Change in OCamlnet-3.6.4: ++ is now qseq_engine, and no longer seq_engine, and hence it does not support progress reporting anymore. Redefine ++ as seq_engine in your own code if you need the old behavior.

#
val (>>) : 'a #engine -> ('a final_state -> 'b final_state) -> 'b engine

Another name for fmap_engine. Use this operator to map the final value of an engine:

	    e >> (function `Done x -> ... | `Error e -> ... | `Aborted -> ...)
#
val eps_e : 't engine_state -> Unixqueue.event_system -> 't engine

Same as epsilon_engine

end

Basic I/O engines

#
class poll_engine : ?extra_match:(exn -> bool) option -> (Unixqueue.operation * float) list -> Unixqueue.event_system ->
inherit [Unixqueue.event] engine

Additional methods

#
method restart : unit -> unit

Activates the engine again when it is already in a final state. This method violates the engine protocol (which does not allow leaving a final state) and should be used with care.

The notification lists are kept, but note that observers often detach when final states are reached. This may cause problems.

#
method group : Unixqueue.group

Returns the group the engine is member of

This engine waits until one of the passed operations can be carried out, or until one of the operations times out. In these cases, the state of the engine changes to `Done ev, where ev is the corresponding event.

The argument list enumerates the operations to watch for. For every operation there may be a positive timeout value, or a negative number to indicate that no timeout is specified.

After one event has been caught, the engine terminates operation. The method restart can be called to activate it again (with the same event condition, and the same notification list). See the description of restart for possible problems.

extra_match This function is called when an Extra event is found. If the function returns true for the argument exception of Extra, the event is caught; otherwise it is rejected.
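
For example, one can wait until a descriptor becomes readable, with a 5 second timeout (a sketch; fd and esys are assumed to exist, and Input_arrived/Timeout are the corresponding Unixqueue.event constructors):

      let pe = new Uq_engines.poll_engine [ Unixqueue.Wait_in fd, 5.0 ] esys in
      Uq_engines.when_state
        ~is_done:(function
                    | Unixqueue.Input_arrived(_,_) -> print_endline "readable"
                    | Unixqueue.Timeout(_,_) -> print_endline "timed out"
                    | _ -> ())
        pe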
#
class ['a] input_engine : (Unix.file_descr -> 'a) -> Unix.file_descr -> float -> Unixqueue.event_system -> ['a] engine

Generic input engine for reading from a file descriptor: let e = new input_engine f fd tmo esys - Waits until the file descriptor becomes readable, and then calls let x = f fd to read from the descriptor. The result x is the result of the engine.

If the file descriptor does not become readable within tmo seconds, the resulting engine transitions to `Error Timeout.

Use this class to construct engines reading via Unix.read or comparable I/O functions:

      let read_engine fd tmo esys =
        new input_engine (fun fd ->
                            let buf = String.create 4096 in
                            let n = Unix.read fd buf 0 (String.length buf) in
                            String.sub buf 0 n
                         )
                         fd tmo esys

This engine returns the read data as string.

See also Uq_io.input_e for a more generic way of reading with engines.

#
class ['a] output_engine : (Unix.file_descr -> 'a) -> Unix.file_descr -> float -> Unixqueue.event_system -> ['a] engine

Generic output engine for writing to a file descriptor: let e = new output_engine f fd tmo esys - Waits until the file descriptor becomes writable, and then calls let x = f fd to write to the descriptor. The result x is the result of the engine.

If the file descriptor does not become writable within tmo seconds, the resulting engine transitions to `Error Timeout.

Use this class to construct engines writing via Unix.single_write or comparable I/O functions:

      let write_engine fd s tmo esys =
        new output_engine (fun fd ->
                             Unix.single_write fd s 0 (String.length s)
                          )
                          fd tmo esys

This engine returns the number of written bytes.

See also Uq_io.output_e for a more generic way of writing with engines.

#
class poll_process_engine : ?period:float option -> pid:int -> Unixqueue.event_system -> [Unix.process_status] engine

This class is deprecated! Use the classes in [root:Shell_uq] instead.

This engine waits until the process with the ID pid terminates. When this happens, the state of the engine changes to `Done, and the argument of `Done is the process status.

The engine does not catch stopped processes.

The engine checks the process status every period seconds, and whenever there is a Signal event on the queue. The idea of the latter is that the user of this engine can increase the responsiveness by defining a signal handler for SIGCHLD signals (the handler need not perform any special action; it just has to be defined). When the sub process terminates, a SIGCHLD signal is sent to the current process. If the event loop happens to wait for new conditions (which is usually very likely), a Signal event will be generated, and the engine will check the process status very soon. Note that it is not guaranteed that a terminating process triggers a Signal event, although it is very likely.

You can define an empty SIGCHLD handler with:

Sys.set_signal Sys.sigchld (Sys.Signal_handle (fun _ -> ()))
period Every period seconds the process status is checked. Defaults to 0.1 seconds.

More I/O

The module [root:Uq_io] provides a bunch of functions to read and write data via various "devices". All these functions return engines, and are easy to use. Devices can be file descriptors, but also other data structures. In particular, there is also support for buffered I/O and for reading line-by-line from an input device.

Transfer engines

Transfer engines copy data between file descriptors. This kind of engine is likely to be declared as deprecated in the future. If possible, one should use multiplex controllers (see below), and for copying streams the generic copier Uq_io.copy_e is a better choice.

The pure types async_in_channel and async_out_channel have proven to be useful for bridging with [root:Netchannels].

#
class type async_out_channel =

Methods from raw_out_channel

#
method output : string -> int -> int -> int

output s k n: Writes the substring of s beginning at index k with length n into the channel. The channel is free to accept only a portion of the string (or even nothing), and returns the number of bytes it accepts.

#
method close_out : unit -> unit

Closes the channel

#
method pos_out : int

Returns the number of characters output into the channel

#
method flush : unit -> unit

Flushes the channel. Asynchronous channels usually ignore flush requests. A potential meaning of flushing could be that no more data are accepted until the current buffer is completely processed. Implementing this is optional.

Additional control methods

#
method can_output : bool

Whether output is possible, i.e. the output method accepts at least one byte

#
method request_notification : (unit -> bool) -> unit

After the notification has been requested, the passed function is called whenever can_output changes its value (or might change its value). The function returns true if there is still interest in notification, and false if notification must be disabled.

There can be any number of parallel active notifications. It is allowed that a notification callback requests further notifications.

An asynchronous output channel provides methods to output data to a stream descriptor. It is based on raw_out_channel, which is defined by the Ocamlnet module Netchannels (see there for an introduction into the idea of using objects as I/O channels). An asynchronous channel can indicate that there is no space in the output buffer. Furthermore, one can request notification when the buffer runs out of space, or when space becomes available again.

#
class type async_in_channel =

Methods from raw_in_channel

#
method input : string -> int -> int -> int

input s k n: Reads channel data into the substring of s beginning at index k with length n. The channel is free to fill only a portion of the string (or even nothing). The method returns the number of bytes actually read.

The exception End_of_file indicates that the end of the channel is reached. The return value 0, however, means that no data could be read.

#
method close_in : unit -> unit

Closes the channel

#
method pos_in : int

Returns the number of characters read from the channel

Additional control methods

#
method can_input : bool

Whether input is possible, i.e. the input method gets at least one byte, or can signal End_of_file.

#
method request_notification : (unit -> bool) -> unit

After the notification has been requested, the passed function is called whenever can_input changes its value (or might change its value). The function returns true if there is still interest in notification, and false if notification must be disabled.

There can be any number of parallel active notifications. It is allowed that a notification callback requests further notifications.

An asynchronous input channel provides methods to input data from a stream descriptor. It is based on raw_in_channel, which is defined by the Ocamlnet module Netchannels (see there for an introduction into the idea of using objects as I/O channels). An asynchronous channel can indicate that there is no data in the input buffer. Furthermore, one can request notification when the buffer runs out of data, or when data becomes available again.

#
class pseudo_async_out_channel : #Netchannels.raw_out_channel -> async_out_channel

Takes a Netchannels.raw_out_channel as an asynchronous channel. It is always possible to output to this channel.

#
class pseudo_async_in_channel : #Netchannels.raw_in_channel -> async_in_channel

Takes a Netchannels.raw_in_channel as an asynchronous channel. It is always possible to input from this channel.

#
class receiver : src:Unix.file_descr -> dst:async_out_channel -> ?close_src:bool option -> ?close_dst:bool option -> Unixqueue.event_system -> [unit] engine

This engine copies all data from the src file descriptor to the dst output channel. The engine attaches immediately to the event system, and detaches automatically.

By default, both the file descriptor and the output channel are closed when the engine stops operation, either successfully or because of an error.

The semantics of the engine is undefined if src is not a stream-oriented descriptor.

The engine goes to `Error state when either reading from src or writing to dst raises an unexpected exception.

For every file descriptor event, the state is advanced from `Working n to `Working (n+1).

TODO: This class cannot yet cope with Win32 named pipes.

close_src Whether to close src when the engine stops (default: true)
close_dst Whether to close dst when the engine stops (default: true)
#
class sender : src:async_in_channel -> dst:Unix.file_descr -> ?close_src:bool option -> ?close_dst:bool option -> Unixqueue.event_system -> [unit] engine

This engine copies all data from the src input channel to the dst file descriptor. The engine attaches immediately to the event system, and detaches automatically.

By default, both the file descriptor and the output channel are closed when the engine stops operation, either successfully or because of an error.

The semantics of the engine is undefined if dst is not a stream-oriented descriptor.

The engine goes to `Error state when either reading from src or writing to dst raises an unexpected exception.

For every file descriptor event, the state is advanced from `Working n to `Working (n+1).

TODO: This class cannot yet cope with Win32 named pipes.

close_src Whether to close src when the engine stops (default: true)
close_dst Whether to close dst when the engine stops (default: true)
#
class type async_out_channel_engine =
inherit [unit] engine

Combination of engine + async_out_channel

#
class type async_in_channel_engine =
inherit [unit] engine

Combination of engine + async_in_channel

#
class output_async_descr : dst:Unix.file_descr -> ?buffer_size:int option -> ?close_dst:bool option -> Unixqueue.event_system -> async_out_channel_engine

This engine implements an async_out_channel for the output descriptor dst. The engine provides an internal buffer to reduce the number of blocked output operations; by default there is no limit on the growth of the buffer, and because of this the channel never blocks (can_output is always true).

The engine attaches immediately to the event system, and detaches automatically. By default, the file descriptor is closed when the engine stops operation, either successfully or because of an error.

If the buffer is full, the class accepts no more data until there is again free space in the buffer. This means that writers must be prepared that can_output returns false, and that the output method returns 0. The buffer can only get "full" if the buffer_size argument is passed.

The notification mechanism is shared by the "engine nature" and by the "channel nature" of this class: If either the state or can_output change their values, the notification callbacks are invoked.

The semantics of the engine is undefined if dst is not a stream-oriented descriptor.

TODO: This class cannot yet cope with Win32 named pipes.

buffer_size Limits the size of the buffer
close_dst Whether to close dst when the engine stops (default: true)
#
class input_async_descr : src:Unix.file_descr -> ?buffer_size:int option -> ?close_src:bool option -> Unixqueue.event_system -> async_in_channel_engine

The corresponding class for asynchronous input channels.

TODO: This class cannot yet cope with Win32 named pipes.

#
type copy_task = [
| `Unidirectional of Unix.file_descr * Unix.file_descr
| `Uni_socket of Unix.file_descr * Unix.file_descr
| `Bidirectional of Unix.file_descr * Unix.file_descr
| `Tridirectional of Unix.file_descr * Unix.file_descr * Unix.file_descr
]

Specifies the task the copier class has to do:

  • `Unidirectional(src,dst): Data from src are copied to dst. EOF of src causes both descriptors to be closed.
  • `Uni_socket(src,dst): Data from src are copied to dst. EOF of src causes dst to be shut down for sending; all descriptors remain open. It is required that dst is a socket.
  • `Bidirectional(bi1,bi2): Data from bi1 are copied to bi2, and data from bi2 are copied to bi1. EOF of one descriptor causes the other descriptor to be shut down for sending. When both descriptors are at EOF, both are closed. It is required that bi1 and bi2 are sockets.
  • `Tridirectional(bi,dst,src): Data from bi are copied to dst, and data from src are copied to bi (i.e. a bidirectional descriptor is split up into two unidirectional descriptors). EOF of bi causes dst to be closed. EOF of src causes bi to be shut down for sending. EOF in both directions causes all descriptors to be closed. It is required that bi is a socket.
#
class copier : copy_task -> Unixqueue.event_system -> [unit] engine

This engine copies data between file descriptors as specified by the copy_task argument.

The task is done when all input descriptors are at EOF. See the description of copy_task for details, especially whether the descriptors are closed or not.

On error or abort, the descriptors are only closed if they had been closed on regular EOF.

The semantics of the engine is undefined if one of the descriptors is not stream-oriented.

TODO: This class cannot yet cope with Win32 named pipes.

Socket engines

Note that Win32 named pipes are also supported by the following API's, although they are not sockets. These pipes have a feature set comparable to Unix domain sockets.

#
type inetspec = [
| `Sock_inet of Unix.socket_type * Unix.inet_addr * int
| `Sock_inet_byname of Unix.socket_type * string * int
]
#
type sockspec = [
| inetspec
| `Sock_unix of Unix.socket_type * string
]

Extended names for socket addresses. Currently, these naming schemes are supported:

  • `Sock_unix(stype,path): Names the Unix domain socket at path. The socket type stype is an auxiliary piece of information, but not a distinguishing part of the name. path = "" refers to anonymous sockets. Otherwise, the path must be an absolute path name.
  • `Sock_inet(stype,addr,port): Names the Internet socket of type stype bound to the IP address addr and the port. If stype = Unix.SOCK_STREAM, a TCP socket is meant, and if stype = Unix.SOCK_DGRAM, a UDP socket is meant. It is allowed that addr = Unix.inet_addr_any. If port = 0, the name is to be considered as incomplete.
  • `Sock_inet_byname(stype,name,port): Names the Internet socket of type stype bound to the IP address corresponding to the name, and bound to the port. It is unspecified which naming service is used to resolve name to an IP address, and how it is used. If the name cannot be resolved, no socket is meant; this is usually an error. stype is interpreted as for `Sock_inet. If port = 0, the name is to be considered as incomplete.

It is currently not possible to name IP sockets that are bound to several IP addresses but not all IP addresses of the host.

#
val sockspec_of_sockaddr : Unix.socket_type -> Unix.sockaddr -> sockspec

Converts a normal socket address to the extended form

#
val sockspec_of_socksymbol : Unix.socket_type -> Netsockaddr.socksymbol -> sockspec

Converts a Netsockaddr.socksymbol to this form

Client sockets

#
type connect_address = [
| `Socket of sockspec * connect_options
| `Command of string * (int -> Unixqueue.event_system -> unit)
| `W32_pipe of Netsys_win32.pipe_mode * string
]

Specifies the service to connect to:

  • `Socket(addr,opts): Connect to the passed socket address
  • `Command(cmd,handler): The cmd is started with the shell, and stdin and stdout are used to transfer data to the process and from the process, respectively. Only SOCK_STREAM type is supported. Note that the passed file descriptors are normal pipes, not sockets (so the descriptors can be individually closed).
    There is no error detection of any kind, so the command should be failsafe. stderr of the command is connected with stderr of the caller process.
    No provisions are taken to wait for the process; this is the task of the caller. After the process has been started, the handler is invoked with the process ID and the event system to give the caller a chance to arrange that the process will be waited for.
  • `W32_pipe(mode,name): A Win32 named pipe
#
type connect_options = {
  conn_bind : sockspec option;
    (* Bind the connecting socket to this address (same family as the connected socket required). None: Use an anonymous port. *)
}
#
val default_connect_options : connect_options

Returns the default options

#
type connect_status = [
| `Socket of Unix.file_descr * sockspec
| `Command of Unix.file_descr * int
| `W32_pipe of Unix.file_descr
]

This type corresponds with Uq_engines.connect_address: An engine connecting with an address `X will return a status of `X.

  • `Socket(fd,addr): fd is the client socket connected with the service. addr is the socket address of the client that must be used by the server to reach the client.
  • `Command(fd, pid): fd is the Unix domain socket connected with the running command. pid is the process ID.
  • `W32_pipe fd: fd is the proxy descriptor of the connected Win32 named pipe endpoint. See [root:Netsys_win32] how to get the w32_pipe object to access the pipe. The proxy descriptor cannot be used for I/O.
#
val client_endpoint : connect_status -> Unix.file_descr

Returns the client endpoint contained in the connect_status

#
val client_socket : connect_status -> Unix.file_descr

For backward compatibility. Deprecated name for client_endpoint

#
class type client_endpoint_connector =
#
method connect : connect_address -> Unixqueue.event_system -> connect_status engine

Instantiates an engine that connects to the endpoint given by the connect_address argument. If successful, the state of the engine changes to `Done(status) where status contains the socket details. The connection is established in the background.

The type of status will correspond to the type of connect address (e.g. a `Socket address will return a `Socket status).

The close-on-exec flag of the created socket descriptor is always set. The socket descriptor is always in non-blocking mode.

This class type provides engines to connect to a service. In order to get and activate such an engine, call connect.

#
class type client_socket_connector = client_endpoint_connector

For backward compatibility. Deprecated name for client_endpoint_connector

#
val connector : ?proxy:client_socket_connector -> connect_address -> Unixqueue.event_system -> connect_status engine

This engine connects to a socket as specified by the connect_address, optionally using the proxy, and changes to the state `Done(status) when the connection is established.

If the proxy does not support the connect_address, the class will raise Addressing_method_not_supported.

The descriptor fd (part of the connect_status) is in non-blocking mode, and the close-on-exec flag is set. It is the task of the caller to close this descriptor.

The engine attaches automatically to the event system, and detaches when it is possible to do so. This depends on the type of the connection method. For direct socket connections, the engine can often detach immediately when the connection is established. For proxy connections it is required that the engine copies data to and from the file descriptor. In this case, the engine detaches when the file descriptor is closed.

It is possible that name service queries block execution.

If name resolution fails, the engine will enter `Error(Uq_resolver.Host_not_found name). This is new since Ocamlnet-3.3 - before this version, the exception was simply Not_found.

Example of using connector: This engine e connects to the "echo" service as provided by inetd, sends a line of data to it, and awaits the response.

	let e =
	  Uq_engines.connector
	    (`Socket(`Sock_inet_byname(Unix.SOCK_STREAM, "localhost", 7),
		     Uq_engines.default_connect_options))
	    esys
	  ++ (fun cs ->
		match cs with
		  | `Socket(fd,_) ->
		      let mplex =
			Uq_engines.create_multiplex_controller_for_connected_socket
			  ~supports_half_open_connection:true
			  fd esys in
		      let d_unbuf = `Multiplex mplex in
		      let d = `Buffer_in(Uq_io.create_in_buffer d_unbuf) in
		      Uq_io.output_string_e d_unbuf "This is line1\n"
		      ++ (fun () ->
			    Uq_io.input_line_e d 
			    ++ (fun s ->
				  print_endline s;
				  eps_e (`Done()) esys
			       )
			 )
		  | _ -> assert false
	     )

Server sockets

#
type listen_address = [
| `Socket of sockspec * listen_options
| `W32_pipe of Netsys_win32.pipe_mode * string * listen_options
]

Specifies the resource to listen on:

  • `Socket(addr,opts): Listen on a socket with address addr
  • `W32_pipe(mode,name,opts): Listen on a Win32 pipe server with name name that accepts pipe connections in mode mode
#
type listen_options = {
  lstn_backlog : int;
    (* The length of the queue of not yet accepted connections. *)
  lstn_reuseaddr : bool;
    (* Whether to allow that the address can be immediately reused after the previous listener has its socket shut down. (Only for Internet sockets.) *)
}
#
val default_listen_options : listen_options

Returns the default options

#
class type server_endpoint_acceptor =
#
method server_address : connect_address

The contact address under which the clients can establish new connections with this server.

#
method multiple_connections : bool

Whether it is possible to accept multiple connections

#
method accept : unit -> (Unix.file_descr * inetspec option) engine

Instantiates an engine that accepts connections on the listening endpoint.

If the connection is successfully established, the state of the engine changes to `Done(fd,addr) where fd is the connected file descriptor, and where addr (if not None) is the endpoint address of the connecting client (from the server's perspective). Such addresses are only supported for Internet endpoints. If a proxy is used to accept the connections, the returned address is that from the proxy's view, and usually different from what Unix.getpeername returns.

The close-on-exec flag of the created endpoint descriptor is always set. The endpoint descriptor is always in non-blocking mode.

It is allowed to shut down fd for sending, and it is required to close fd after all data transfers have been performed.

A call of accept only allows one connection to be established at a time. However, it is allowed to call accept several times to accept several connections, provided the acceptor supports this (as returned by multiple_connections). It is only allowed to call accept again when the previous engine was successful.

#
method shut_down : unit -> unit

The server endpoint is shut down such that no further connections are possible. It is required to call this method even for acceptors that do not support multiple connections. It is also required to call this method when an accept was not successful.

If there is an engine waiting for connections, it is aborted.

This class type is for service providers that listen for connections. By calling accept, one gets an engine that waits for the next connection, and establishes it.

There are services that can only accept one connection for a certain contact address. In this case accept must only be called once. Normally, services can accept any number of connections (multiplexing), and it is allowed to call accept again after the previous accept engine was successful.

#
class type server_socket_acceptor = server_endpoint_acceptor

For backward compatibility. Deprecated name for server_endpoint_acceptor

#
class direct_acceptor : ?close_on_shutdown:bool option -> ?preclose:(unit -> unit) option -> Unix.file_descr -> Unixqueue.event_system -> server_endpoint_acceptor

An implementation of server_endpoint_acceptor for sockets and Win32 named pipes. For sockets, the passed descriptor must be the master socket. For Win32 named pipes, the passed descriptor must be the proxy descriptor of the pipe server.

#
class direct_socket_acceptor : Unix.file_descr -> Unixqueue.event_system -> server_endpoint_acceptor

For backward compatibility. Deprecated name for direct_acceptor

#
class type server_endpoint_listener =
#
method listen : listen_address -> Unixqueue.event_system -> server_endpoint_acceptor engine

Instantiates an engine that listens for connections on the socket given by the listen_address argument. If successful, the state of the engine changes to `Done(acc) where acc is the acceptor object guiding you through further operation of the socket (see above).

This class type represents factories for service providers

#
class type server_socket_listener = server_endpoint_listener

For backward compatibility. Deprecated name for server_endpoint_listener

#
val listener : ?proxy:server_socket_listener -> listen_address -> Unixqueue.event_system -> server_socket_acceptor engine

This engine creates a server socket listening on the listen_address. If passed, the proxy is used to create the server socket.

On success, the engine goes to state `Done acc, where acc is the acceptor object (see above). The acceptor object can be used to accept incoming connections.
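
Sketch of a minimal TCP server built on listener (assumptions: esys exists, Uq_engines.Operators is opened for ++ and eps_e, and handle_client_e is a hypothetical function returning a unit engine that talks to the client):

      let e =
        Uq_engines.listener
          (`Socket(`Sock_inet(Unix.SOCK_STREAM, Unix.inet_addr_any, 6666),
                   Uq_engines.default_listen_options))
          esys
        ++ (fun acc ->
              acc # accept ()
              ++ (fun (fd, _peer) ->
                    handle_client_e fd esys
                    ++ (fun () ->
                          Unix.close fd;
                          acc # shut_down ();
                          eps_e (`Done ()) esys)))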

Datagrams

#
type datagram_type = [
| `Unix_dgram
| `Inet_udp
| `Inet6_udp
]

  • `Unix_dgram: Datagrams over Unix domain sockets
  • `Inet_udp: Internet v4 UDP protocol
  • `Inet6_udp: Internet v6 UDP protocol
#
class type wrapped_datagram_socket =
#
method descriptor : Unix.file_descr

The underlying file descriptor. This descriptor must not be used to transfer data (Unix.send(to), Unix.recv(from), etc.), because the descriptor may be connected with a proxy, and the socket addresses may be wrong that are used by the low-level socket functions. The right way is to use the methods below to transfer data. It is allowed, however, to pass the descriptor to Unix.select, and to check whether transfers are possible. It is also allowed to set or clear non-blocking mode, and the close-on-exec flag, and to modify the socket options.

#
method sendto : string -> int -> int -> Unix.msg_flag list -> sockspec -> int

Send data over the (unconnected) socket

#
method recvfrom : string -> int -> int -> Unix.msg_flag list -> int * sockspec

Receive data from the (unconnected) socket. The method will raise EAGAIN if the message cannot be processed for some reason, even if the socket is in blocking mode. In this case, the received message is discarded.

#
method shut_down : unit -> unit

Closes the descriptor, and shuts down any further resources that were needed for this socket

#
method datagram_type : datagram_type
#
method socket_domain : Unix.socket_domain
#
method socket_type : Unix.socket_type
#
method socket_protocol : int

A wrapped_datagram_socket allows datagrams to be sent via proxies. It provides versions of the sendto and recvfrom functions that use extended socket names (which are proxy-friendly).

#
class type datagram_socket_provider =
#
method create_datagram_socket : datagram_type -> Unixqueue.event_system -> wrapped_datagram_socket engine

Creates an engine that creates a wrapped_datagram_socket object and that sets up any further resources the object needs.

This is a factory for wrapped_datagram_socket objects.

#
val datagram_provider : ?proxy:datagram_socket_provider -> datagram_type -> Unixqueue.event_system -> wrapped_datagram_socket engine

This engine creates a datagram socket as demanded by the datagram_type, optionally using proxy for sending and receiving datagrams.

The socket is unconnected.

The socket is in non-blocking mode, and the close-on-exec flag is set.
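
Sketch (assuming esys exists and Uq_engines.Operators is opened): create an unconnected UDP socket and send a single datagram to a local port:

      let e =
        Uq_engines.datagram_provider `Inet_udp esys
        ++ (fun wsock ->
              let msg = "ping" in
              (* the socket is non-blocking; a real program would handle EAGAIN *)
              let _n =
                wsock # sendto msg 0 (String.length msg) []
                  (`Sock_inet(Unix.SOCK_DGRAM, Unix.inet_addr_loopback, 4444)) in
              wsock # shut_down ();
              eps_e (`Done ()) esys)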

Multiplex Controllers

#
class type multiplex_controller =
#
method alive : bool

If the controller is alive, the socket is not yet completely down.

#
method mem_supported : bool

Whether start_mem_reading and start_mem_writing are possible

#
method event_system : Unixqueue.event_system

Returns the event system

#
method reading : bool

True iff there is a reader

#
method start_reading : ?peek:(unit -> unit) -> when_done:(exn option -> int -> unit) -> string -> int -> int -> unit

Start reading from the connection. When data is available, the when_done callback is invoked. The int is the number of read bytes. It is 0 if an error occurred which is indicated by the exception. The exception End_of_file is used when the end of the data stream is reached. The exception Cancelled indicates that reading has been cancelled in the meantime.

This starts a one-time read job only, i.e. it is not restarted after when_done has been invoked.

It is an error to start reading several times.

The function peek is called immediately before data is read in from the underlying communication channel.

For getting an engine-based version of start_reading, use a signal_engine:

           let (e, signal) = signal_engine esys in
           mplex # start_reading ~when_done:(fun xo n -> signal (`Done(xo,n))) ...

Now e will transition to `Done(xo,n) when the read is done.

#
method start_mem_reading : ?peek:(unit -> unit) -> when_done:(exn option -> int -> unit) -> Netsys_mem.memory -> int -> int -> unit

Same as start_reading, but puts the data into a memory buffer. There is an optimization for the case that the descriptor is a connected socket, or supports Unix.read. If this is not possible the method raises Mem_not_supported.

#
method cancel_reading : unit -> unit

Cancels the read job. The when_done callback is invoked with the number of bytes read so far (which may be 0) and the exception Cancelled.

It is no error if there is no reader.

#
method writing : bool

True iff there is a writer

#
method start_writing : when_done:(exn option -> int -> unit) -> string -> int -> int -> unit

Start writing to the connection. When data is written, the when_done callback is invoked. The int is the number of written bytes. It is 0 if an error occurred which is indicated by the exception. The exception Cancelled indicates that writing has been cancelled in the meantime.

This starts a one-time write job only, i.e. it is not restarted after when_done has been invoked.

It is an error to start writing several times.

See the comment for start_reading for how to get an engine-based version of this method.

#
method start_mem_writing : when_done:(exn option -> int -> unit) -> Netsys_mem.memory -> int -> int -> unit

Same as start_writing, but takes the data from a memory buffer. There is an optimization for the case that the descriptor is a connected socket, or supports Unix.write. If this is not possible the method raises Mem_not_supported.

#
method supports_half_open_connection : bool

Whether the underlying transport mechanism can close the write side of the connection only (half-open connection).

#
method start_writing_eof : when_done:(exn option -> unit) -> unit -> unit

Start writing the EOF marker to the connection. When it is written, the when_done callback is invoked. The exception Cancelled indicates that writing has been cancelled in the meantime.

This starts a one-time write job only, i.e. it is not restarted after when_done has been invoked.

It is an error to start writing several times. It is an error to write EOF when the socket does not support half-open connections.

See the comment for start_reading for how to get an engine-based version of this method.

#
method cancel_writing : unit -> unit

Cancels the write job. The when_done callback is invoked with the number of bytes written so far (which may be 0) and the exception Cancelled.

It is no error if there is no writer.

#
method read_eof : bool

Whether the EOF marker has been read

#
method wrote_eof : bool

Whether the EOF marker has been written

#
method shutting_down : bool

True iff the shutdown is in progress

#
method start_shutting_down : ?linger:float -> when_done:(exn option -> unit) -> unit -> unit

Start shutting down the connection. After going through the shutdown procedure, the when_done callback is invoked. The exception indicates whether an error happened. Cancelled means that the shutdown operation has been cancelled in the meantime.

The underlying file descriptor (if any) is not closed. A shutdown is only a protocol handshake. After a shutdown, both read_eof and wrote_eof are true. Call inactivate to close the descriptor.

Optionally, one can linger for a certain period of time. Lingering only happens when the EOF has already been written but has not yet been seen on input. Defaults to a linger period of 60 seconds. Set to 0 to turn lingering off.

See the comment for start_reading for how to get an engine-based version of this method.

#
method cancel_shutting_down : unit -> unit

Cancels the shutdown procedure. After that, the state of the connection is undefined. The when_done callback is invoked with the exception Cancelled.

It is no error if no shutdown is in progress.

#
method inactivate : unit -> unit

Inactivates the connection immediately, and releases any resources the controller is responsible for (e.g. closes file descriptors). Note that this is more than cancelling all pending operations and shutting the connection down. However, the details of this method are implementation-defined. Callbacks are not invoked.

A multiplex_controller is a quite low-level device to abstract bidirectional socket connections. It is independent of any real device.

There can be a reader, a writer (or both), or alternatively, the shutdown process may be in progress. One cannot have more than one reader or more than one writer at the same time.

#
exception Mem_not_supported

May be raised by multiplex controller methods start_mem_reading and start_mem_writing if these methods are not supported for the kind of file descriptor

#
val create_multiplex_controller_for_connected_socket : ?close_inactive_descr:bool -> ?preclose:(unit -> unit) -> ?supports_half_open_connection:bool -> ?timeout:float * exn -> Unix.file_descr -> Unixqueue.unix_event_system -> multiplex_controller

Creates a multiplex controller for a bidirectional socket (e.g. a TCP socket). It is essential that the socket is in connected state. This function also supports Win32 named pipes.

Note that the file descriptor is not closed when the attached engines are terminated. One can call inactivate manually to do that.

close_inactive_descr: Whether inactivate closes the descriptor. True by default.

preclose: This function is called just before the descriptor is closed.

supports_half_open_connection: This implementation does not know how to find out whether the socket supports half-open connections, so you have to set this boolean yourself. Defaults to false. You can set it to true for TCP connections and for Unix-domain connections with stream semantics.

timeout: If set to (t, x), a general timeout of t is set. When an operation has been started, and there is no I/O activity within t seconds, neither by the started operation nor by another operation, the connection times out. In this case, the operation returns the exception x.

#
class type datagram_multiplex_controller =
#
method received_from : Unix.sockaddr

Returns the socket address of the last received datagram. This value is updated just before the when_done callback of the reader is invoked.

#
method send_to : Unix.sockaddr -> unit

Sets the socket address of the next datagram to send.

Additional methods for unconnected datagram handling

#
val create_multiplex_controller_for_datagram_socket : ?close_inactive_descr:bool -> ?preclose:(unit -> unit) -> ?timeout:float * exn -> Unix.file_descr -> Unixqueue.unix_event_system -> datagram_multiplex_controller

Creates a multiplex controller for datagram sockets (e.g. UDP socket).

Note that the file descriptor is not closed when the attached engines are terminated. One can call inactivate manually to do that.

close_inactive_descr: Whether inactivate closes the descriptor. True by default.

preclose: This function is called just before the descriptor is closed.

timeout: If set to (t, x), a general timeout of t is set. When an operation has been started, and there is no I/O activity within t seconds, neither by the started operation nor by another operation, the connection times out. In this case, the operation returns the exception x.

#
type onshutdown_out_spec = [
| `Ignore
| `Initiate_shutdown
| `Action of async_out_channel_engine -> multiplex_controller -> unit engine_state -> unit
]

See class output_async_mplex for explanations

#
type onshutdown_in_spec = [
| `Ignore
| `Initiate_shutdown
| `Action of async_in_channel_engine -> multiplex_controller -> unit engine_state -> unit
]

See class input_async_mplex for explanations

#
class output_async_mplex : ?onclose:[ `Write_eof | `Ignore ] option -> ?onshutdown:onshutdown_out_spec option -> ?buffer_size:int option -> multiplex_controller -> async_out_channel_engine

Creates an asynchronous output channel writing to the multiplex controller (see also output_async_descr for the corresponding class writing to a single descriptor).

onclose: What to do when the close_out method is invoked. Defaults to `Ignore. `Write_eof means to write the EOF marker. In any case, after the close action has been carried out, the multiplex controller is shut down.

onshutdown: What to do when all data (and optionally, the EOF marker) have been written. It is also invoked in case of I/O errors. The default is `Ignore. The value `Initiate_shutdown means that it is started to shutdown the socket. The success of this action is not waited upon, however. One can also pass `Action f in which case the function f is called with this object, the multiplex controller, and the proposed next state as arguments. By checking the proposed next state the function can see why the shutdown function was called.

buffer_size: The size of the internal buffer. By default unlimited.

Note that the engine is done when the output channel is closed. The socket is not shut down, and the underlying file descriptor is not closed! You can define the shutdown callback to do something in this case.

#
class input_async_mplex : ?onshutdown:onshutdown_in_spec option -> ?buffer_size:int option -> multiplex_controller -> async_in_channel_engine

Creates an asynchronous input channel reading from the multiplex controller.

onshutdown: See output_async_mplex.

buffer_size: The size of the internal buffer. By default unlimited.

Note that the engine is done when the input channel is closed. The socket is not shut down, and the underlying file descriptor is not closed! You can define the shutdown callback to do something in this case.

Recursion

When programming with engines, it is normal to use recursion for any kind of loops. For example, to read the lines from a file:

      open Uq_engines.Operators  (* for ">>" and "++" *)

      let fd = 
        Unix.openfile filename [Unix.O_RDONLY] 0 in
      let d = 
        `Buffer_in(Uq_io.create_in_buffer(`Polldescr(`Read_write,fd,esys))) in

      let rec read_lines acc =
        Uq_io.input_line_e d >>
          (function                       (* catch exception End_of_file *)
            | `Done line -> `Done(Some line)
            | `Error End_of_file -> `Done None
            | `Error error -> `Error error
            | `Aborted -> `Aborted
          ) ++
          (function
            | Some line ->
                read_lines (line :: acc)
            | None ->
                eps_e (`Done (List.rev acc)) esys
          ) in

      let e = read_lines []

There is generally the question whether this style leads to stack overflows. This depends on the mechanisms that come into play:

  • The engine mechanism passing control from one engine to the next is not tail-recursive, and thus the stack can overflow when the recursion becomes too deep
  • The event queue mechanism, however, does not have this problem. Control falls automatically back to the event queue whenever I/O needs to be done.

In this example, this means that only the engine mechanism is used as long as the data is read from the buffer. When the buffer needs to be refilled, however, control is passed back to the event queue (so the stack is cleaned), and the continuation of the execution is only managed via closures (which only allocate memory on the heap, not on the stack). Usually, this is a good compromise: The engine mechanism is a lot faster, but I/O is a good indicator for switching to the better but slower technique.

Also note another difference: The event queue mechanism allows other asynchronous code attached to the same event queue to run (control may be yielded to unrelated execution contexts). The pure engine mechanism does not allow that. This may be handy when exclusive access to variables is needed. (But be careful here - this is very sensitive to minimal changes of the implementation.)

Certain engines enforce using the event queue mechanism although they are unrelated to I/O. Especially Uq_engines.delay_engine is useful here: a "delay" of 0 seconds is already sufficient to go back to the event queue. If recursions sometimes lead to stack overflows, the solution is to include such a zero delay before doing the self call.
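
A sketch of this pattern (a counting loop, purely for illustration): every iteration goes through a zero-second delay_engine, so control returns to the event queue and the stack stays flat:

      let rec count_e n esys =
        if n >= 1_000_000 then
          Uq_engines.epsilon_engine (`Done n) esys
        else
          Uq_engines.delay_engine 0.0 (fun () -> count_e (n+1) esys) esys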

More Engines

Pointers to other modules related to engines:

  • RPC clients: The function Rpc_proxy.ManagedClient.rpc_engine allows calling an RPC via an engine. When the call is done, the engine transitions to `Done r, and r is the result of the remote call.
  • Subprograms: The class Shell_uq.call_engine allows starting an external program and monitoring it via an engine.

Debugging

#
module Debug : sig
#
val enable : bool Pervasives.ref

Enables [root:Netlog]-style debugging

end
end