An engine performs a certain task in an autonomous way. Engines are attached to a Unixqueue.event_system, and do their task by generating events for resources of the operating system, and by handling such events. Engines are in one of four states: They may be still working, they may be done, they may be aborted, or they may be in an error state. The three latter states a called final states, because they indicate that the engine has stopped operation.
It is possible to ask an engine to notify another object when it changes its state. For simplicity, notification is done by invoking a callback function, and not by issuing notification events.
Effectively, engines provide a calculus for cooperative microthreading. This calculus includes combinators for sequential execution and synchronization. Moreover, it is easy to connect it with callback-style microthreading - one can arrange callbacks when an engine is done, and one can catch callbacks and turn them into engines.
Raised when a method of a closed channel object is called (only channel methods count).
This exception should be regarded as equivalent to
Netchannels.Closed_channel
, but need not be the same exception.
Some engines indicate this error when they cannot continue because the other endpoint of communication signals an error.
This exception is not raised, but used as argument of the `Error
state.
Used by the watchdog engine to indicate a timeout.
This exception is not raised, but used as argument of the `Error
state.
Used by input_engine
and output_engine
to indicate timeouts
Raised by client_socket_connector
and server_socket_acceptor
to
indicate that the passed address is not supported by the class.
The callback function of a multiplex_controller
is invoked with this
exception if the operation is cancelled.
The type of states with result values of type 't
:
`Working n
: The engine is working. The number n
counts the number
of events that have been processed.`Done arg
: The engine has completed its task without errors.
The argument arg
is the result value of the engine`Error exn
: The engine has aborted because of an error. The
argument exn
describes the error as an exception.`Aborted
: The engine has aborted because the abort
method
was calledSame as engine_state
without `Working
. These are only the final
states.
For debug purposes: Returns a string describing the state
Requirements for engines
Forces that the engine aborts operation. If the state is already
`Done
, `Aborted
, or `Error
, this method must do nothing (you
cannot abort an already finished engine).
Requests notification about state changes.
After the notification has been requested, the passed function must
be called whenever state
changes its value (or might change
its value; it is allowed to call the notification function more
frequently than necessary). The function returns true
if there
is still interest in notification, and false
if notification must
be disabled; the function must not be called any longer in this
case.
There can be any number of parallel active notifications. It is allowed that a notification callback function requests further notifications.
If the callback raises an exception, this exception is propagated to the caller of Unixqueue.run.
This class type defines the interface an engine must support. The
class parameter 't
is the type of the result values (when the
engine goes to state `Done
).
Watches the state of the argument engine, and arranges that one of
the functions is called when the corresponding state change is done.
Once a final state is reached, the engine is no longer watched.
Note that when_state
only observes future state changes.
If one of the functions raises an exception, this exception is propagated to the caller of Unixqueue.run.
`Done
. The argument of
is_done
is the argument of the `Done
state.
`Error
. The argument of
is_error
is the argument of the `Error
state.
`Aborted
.
`Working
state changes. The int argument is the new `Working
arg.
let se = new signal_engine esys
: The engine se
remains in
`Working 0
until the method se # signal x
is called. At this point
e
transitions to x
. Any further call of signal
does not
have any effect.
Also, if se
is aborted, signal
does not have any effect.
The function signal
may be called from a different thread.
The signalling event is forwarded to the thread running the
event system.
let (se, signal) = signal_engine esys
: Same as function
The following combinators serve as the control structures to connect primitive engines with each other.
The map_engine
observes the argument engine, and when the
state changes to `Done
, `Error
, or `Aborted
, the corresponding
mapping function is called, and the resulting state becomes the state
of the mapped engine. If the engine is already in one of the
mentioned states, the map functions are also called (unlike
when_state
).
After the state change to `Done
, `Error
, or `Aborted
has been
observed, the map engine detaches from the argument engine,
and no further state changes are recognized.
The state `Working
cannot be mapped to another state. It is an
error to map final states to `Working
.
The result type of the map_*
functions is engine_state
and not final_state
because of historic reasons.
If the mapped engine is aborted, this request will be forwarded to the argument engine.
If one of the mapping functions raises an exception, this causes
a transiton to `Error
.
`Done
state of the argument engine to
another state. The argument of map_done
is the argument of the
`Done
state. Note that map_done
is non-optional only because
of typing. If it were optional, the type checker would infer 'a = 'b
.
`Error
state of the argument engine to
another state. The argument of map_error
is the argument of the
`Error
state.
`Aborted
state of the argument engine to
another state.
`Working
state in the argument engine are propagated. Defaults to true
.
If set to false
, the mapped engine remains in `Working 0
until
it transitions to a final state.
Same as function
Similar to map_engine
but different calling conventions: The
mapping function is called when the argument engine reaches a
final state, and this state can be mapped to another final state.
Same as function
After opening Uq_engines.Operators, this is also available
as operator >>
, e.g.
e >>
(function
| `Done r -> ...
| `Error error -> ...
| `Aborted -> ...
)
This engine transitions from its initial state `Working 0
in one
step ("epsilon time") to the passed constant state. During this time
event processing will continue, so concurrently running engines can
make progress. For performance reasons, however, external resources
like file descriptors are not watched for new events.
In previous versions of this library the class was called const_engine
.
However, this is not a constant thing. In particular, it is possible
that this engine is aborted, so the passed state is not reached.
To avoid programming errors because of the misnomer, this class has been
renamed.
This engine runs two engines in sequential order. It is called
let eng_s = new seq_engine eng_a f
When eng_a
goes to the state `Done arg
, the function f
is called to
obtain
let eng_b = f arg
eng_b
runs until it is also in state `Done
.
If eng_a
or eng_b
go to states `Aborted
or `Error
, the
sequential engine eng_s
does so, too. If eng_s
is aborted,
this request will be forwarded to the currently active engine,
eng_a
or eng_b
.
If calling f
results in an exception, this is handled as if eng_a
signaled an exception.
Same as function.
After opening Uq_engines.Operators, this is also available
as operator ++
, e.g.
e1 ++ (fun r1 -> e2)
(when e1
and e2
are engines, and r1
is the result of e1
).
let se = new stream_seq_engine x0 s esys
: The constructed engine se
fetches functions f : 'a -> 'a #engine
from the stream s
, and
runs the engines obtained by calling these functions e = f x
one
after the other. Each function call gets the result of the previous
engine as argument. The first call gets x0
as argument.
If one of the engines e
transitions into an error or aborted state,
se
will also do that. If se
is aborted, this is passed down to
the currently running engine e
.
This engine runs two engines in parallel, and waits until both
are `Done
(synchronization). The product of the two `Done
arguments
is taken as the combined result.
If one of the engines goes to the states `Aborted
or `Error
,
the combined engine will follow this transition. The other,
non-aborted and non-errorneous engine is aborted in this case.
`Error
has higher precedence than `Aborted
.
If the combined engine is aborted, this request is forwarded to both member engines.
Multiple synchronization:
let me = new msync_engine el f x0 esys
- Runs the engines in el
in
parallel, and waits until all are `Done
. The result of me
is
then computed by folding the results of the part engines using
f
, with an initial accumulator x0
.
If one of the engines goes to the states `Aborted
or `Error
,
the combined engine will follow this transition. The other,
non-aborted and non-errorneous engines are aborted in this case.
`Error
has higher precedence than `Aborted
.
If calling f
results in an exception, this is handled as if
the part engine signals an error.
If the combined engine is aborted, this request is forwarded to all member engines.
Same as function
let de = delay_engine d f esys
: The engine e = f()
is created
after d
seconds, and the result of e
becomes the result of de
.
Same as function
A watchdog engine checks whether the argument engine makes
progress, and if there is no progress for the passed number of
seconds, the engine is aborted, and the watchdog state changes
to `Error Watchdog_timeout
.
The current implementation is not very exact, and it may take a little longer than the passed period of inactivity until the watchdog recognizes inactivity.
If the argument engine terminates, the watchdog changes its state to
`Done ()
Important note: The watchdog assumes that the `Working
state
of the target engine really counts events that indicate progress.
This does not work for:
poll_process_engine
: there is no way to check whether a subprocess
makes progressconnector
: It is usually not possible to reflect the progress
on packet levellistener
: It is usually not possible to reflect the progress
on packet levellet se = serialized f
: Waits until all the previous engines reach
a final state, and then runs e = f esys
.
se
enters a final state when e
does.
A serializer queues up engines, and starts the next engine when the previous one finishes.
let pe = prioritized f p
: Queues up f
on priority level p
.
The engine e = f esys
can start when there is no waiting
engine on a higher priority level (i.e. with a number less than
p
), and all running engines on lower priority levels are done.
pe
enters a final state when e
does.
A prioritizer allows to prioritize the execution of engines: At any
time, only engines of a certain priority p
can be executed. If an
engine with a higher priority ph
wants to start, it prevents further
engines with priority level p
from being started until the higher
prioritized engines with level ph
are done. On the same priority level,
there is no limit for the number of executed engines.
Here, higher priorities have lower numbers.
Requests the value. If it is not already in the cache, the engine for getting the value is started, and it is waited until the value is available.
Returns the cached value if available
Puts a value immediately into the cache. It replaces an existing
value. If it is currently tried to obtain a new value by running
an engine, this engine is kept running, and get_engine
will
return its result. Only future calls of get_engine
will return
the value just put into the cache.
Invalidates the cache - if a value exists in the cache, it is removed.
If in the future the cache value is requested via get_engine
the engine will be started anew to get the value.
Note that (as for put
) any already running get_engine
is not
interrupted.
Any engine running to get the cache value is aborted, and the contents
of the cache are invalidated. Note that also the engines returned
by get_engine
are aborted.
A cache contains a mutable value that is obtained by running an engine.
new cache f esys
: A cache that runs f esys
to obtain values
Same as function
A useful class fragment that implements state
and
request_notification
.
Handy operators: ++
, >>
, and eps_e
The most important operators. This module should be opened.
Another name for qseq_engine
. Use this operator to run engines in
sequence:
e1 ++ (fun r1 -> e2) ++ (fun r2 -> e3) ++ ...
Here rK
is the result of engine eK
.
Change in OCamlnet-3.6.4: ++
is now qseq_engine
, and no longer
seq_engine
, and hence it does not support progress reporting anymore.
Redefine ++
as seq_engine
in your own code if you need the old
behavior.
Another name for fmap_engine
. Use this operator to map the
final value of an engine:
e >> (function `Done x -> ... | `Error e -> ... | `Aborted -> ...)
Activate the engine again when it is already in a final state. This method violates the engine protocol, and should be used with care; it is not allowed to leave a final state.
The notification lists are kept, but note that observers often detach when final states are reached. This may cause problems.
This engine waits until one of the passed operations can be
carried out, or until one of the operations times out.
In these cases, the state of the engine changes to `Done ev
, where
ev
is the corresponding event.
The argument list enumerates the operations to watch for. For every operation there may be a positive timeout value, or a negative number to indicate that no timeout is specified.
After one event has been caught, the engine terminates operation.
The method restart
can be called to activate it again (with the
same event condition, and the same notification list). See the
description of restart
for possible problems.
Extra
event is
found. If the function returns true
for the argument exception
of Extra
, the event is caught; otherwise it is rejected.
Generic input engine for reading from a file descriptor:
let e = new input_engine f fd tmo
- Waits until the file descriptor
becomes readable, and calls then let x = f fd
to read from the
descriptor. The result x
is the result of the engine.
If the file descriptor does not become readable within tmo
seconds,
the resulting engine transitions to `Error Timeout
.
Use this class to construct engines reading via Unix.read
or
comparable I/O functions:
let read_engine fd tmo esys =
new input_engine (fun fd ->
let buf = String.create 4096 in
let n = Unix.read fd buf 0 (String.length buf) in
String.sub buf 0 n
)
fd tmo esys
This engine returns the read data as string.
See also Uq_io.input_e for a more generic way of reading with engines.
Generic output engine for writing to a file descriptor:
let e = new output_engine f fd tmo
- Waits until the file descriptor
becomes writable, and calls then let x = f fd
to write to the
descriptor. The result x
is the result of the engine.
If the file descriptor does not become writable within tmo
seconds,
the resulting engine transitions to `Error Timeout
.
Use this class to construct engines writing via Unix.single_write
or
comparable I/O functions:
let write_engine fd s tmo esys =
new output_engine (fun fd ->
Unix.single_write fd s 0 (String.length s)
)
fd tmo esys
This engine returns the number of written bytes.
See also Uq_io.output_e for a more generic way of writing with engines.
This class is deprecated! Use the classes in [root:Shell_uq] instead.
This engine waits until the process with the ID pid
terminates.
When this happens, the state of the engine changes to
`Done
, and the argument of `Done
is the process status.
The engine does not catch stopped processes.
The engine checks the process status every period
seconds, and
whenever there is a Signal
event on the queue. The idea of the
latter is that the user of this engine can increase the responsiveness
by defining a signal handler for SIGCHLD signals (the handler need
not to perform any special action, it must just be defined). When
the sub process terminates, a SIGCHLD signal is sent to the current
process. If the event loop happens to wait for new conditions (which
is usually very likely), a Signal
event will be generated, and
the engine will check the process status very soon. Note that it is
not guaranteed that a terminating process triggers a Signal
event,
although it is very likely.
You can define an empty SIGCHLD handler with:
Sys.set_signal Sys.sigchld (Sys.Signal_handle (fun _ -> ()))
period
seconds the process status is checked.
Defaults to 0.1 seconds.
The module [root:Uq_io] provides a bunch of functions to read and write data via various "devices". All these functions return engines, and are easy to use. Devices can be file descriptors, but also other data structures. In particular, there is also support for buffered I/O and for reading line-by-line from an input device.
Transfer engines copy data between file descriptors. This kind of engine is likely to be declared as deprecated in the future. If possible, one should use multiplex controllers (see below), and for copying streams the generic copier Uq_io.copy_e is a better choice.
The pure types async_in_channel
and async_out_channel
have been
proven to be useful for bridging with [root:Netchannels].
raw_out_channel
output s k n
: Writes the substring of s
beginning at index
k
with length n
into the channel. The channel is free to
accept only a portion of the string (or even nothing), and
returns the number of bytes it accepts.
Closes the channel
Returns the number of characters output into the channel
Flushes the channel. Asynchronous channels usually ignore flush requests. A potential meaning of flushing could be that no more data are accepted until the current buffer is completely processed. Implementing this is optional.
Whether output is possible, i.e. the output method accepts at least one byte
After the notification has been requested, the passed function is
be called whenever can_output
changes its value (or might change
its value). The function returns true
if there is still interest
in notification, and false
if notification must be disabled.
There can be any number of parallel active notifications. It is allowed that a notification callback requests further notifications.
An asynchrounous output channel provides methods to output data to
a stream descriptor. It is based on raw_out_channel
, which is
defined by the Ocamlnet module Netchannels
(see there for an
introduction into the idea of using objects as I/O channels).
An asynchronous channel can indicate that there is no space in the
output buffer. Furthermore, one can request notification in the case
that there is no space or again space in the output buffer.
raw_in_channel
input s k n
: Reads channel data into the substring of s
beginning at index k
with length n
. The channel is free to
fill only a portion of the string (or even nothing). The method
returns the number of bytes actually read.
The exception End_of_file
indicates that the end of the channel
is reached. The return value 0
, however, means that no data
could be read.
Closes the channel
Returns the number of characters read from the channel
Whether input is possible, i.e. the input method gets at least
one byte, or can signal End_of_file
.
After the notification has been requested, the passed function is
be called whenever can_input
changes its value (or might change
its value). The function returns true
if there is still interest
in notification, and false
if notification must be disabled.
There can be any number of parallel active notifications. It is allowed that a notification callback requests further notifications.
An asynchrounous input channel provides methods to input data from
a stream descriptor. It is based on raw_in_channel
, which is
defined by the Ocamlnet module Netchannels
(see there for an
introduction into the idea of using objects as I/O channels).
An asynchronous channel can indicate that there is no data in the
input buffer. Furthermore, one can request notification in the case
that there is no data or again data in the input buffer.
Takes a Netchannels.raw_out_channel as an asynchronous channel. It is always possible to output to this channel.
Takes a Netchannels.raw_in_channel as an asynchronous channel. It is always possible to input from this channel.
This engine copies all data from the src
file descriptor to the
dst
output channel. The engine attaches immediately to the
event system, and detaches automatically.
By default, both the file descriptor and the output channel are closed when the engine stops operation, either successfully or because of an error.
The semantics of the engine is undefined if src
is not a
stream-oriented descriptor.
The engine goes to `Error
state when either reading from src
or writing to dst
raises an unexpected exception.
For every file descriptor event, the state is advanced from
`Working n
to `Working (n+1)
.
TODO: This class cannot yet cope with Win32 named pipes.
src
when the engine stops
(default: true
)
dst
when the engine stops
(default: true
)
This engine copies all data from the src
input channel to the
dst
file descriptor. The engine attaches immediately to the
event system, and detaches automatically.
By default, both the file descriptor and the output channel are closed when the engine stops operation, either successfully or because of an error.
The semantics of the engine is undefined if dst
is not a
stream-oriented descriptor.
The engine goes to `Error
state when either reading from src
or writing to dst
raises an unexpected exception.
For every file descriptor event, the state is advanced from
`Working n
to `Working (n+1)
.
TODO: This class cannot yet cope with Win32 named pipes.
src
when the engine stops
(default: true
)
dst
when the engine stops
(default: true
)
Combination of engine + async_out_channel
Combination of engine + async_in_channel
This engine implements an async_out_channel
for the output
descriptor dst
. The engine provides an internal buffer to
reduce the number of blocked output operations; by default there
is even no limit for the growth of the buffer, and because of this
the channel never blocks (can_output
is always true
).
The engine attaches immediately to the event system, and detaches automatically. By default, the file descriptor is closed when the engine stops operation, either successfully or because of an error.
If the buffer is full, the class accepts no more data until
there is again free space in the buffer. This means that writers
must be prepared that can_output
returns false
, and that
the output
method returns 0. The buffer can only get "full"
if the buffer_size
argument is passed.
The notification mechanism is shared by the "engine nature" and
by the "channel nature" of this class: If either the state
or
can_output
change their values, the notification callbacks
are invoked.
The semantics of the engine is undefined if dst
is not a
stream-oriented descriptor.
TODO: This class cannot yet cope with Win32 named piped.
dst
when the engine stops
(default: true
)
The corresponding class for asynchronous input channels.
TODO: This class cannot yet cope with Win32 named piped.
Specifies the task the copier
class has to do:
`Unidirectional(src,dst)
: Data from src
are copied to dst
.
EOF of src
causes that both descriptors are closed.`Uni_socket(src,dst)
: Data from src
are copied to dst
.
EOF of src
causes that dst
is shut down for sending; all descriptors
remain open. It is required that dst
is a socket.`Bidirectional(bi1,bi2)
: Data from bi1
are copied to bi2
,
and data from bi2
are copied to bi1
. EOF of one descriptor
causes that the other descriptor is shut down for sending.
When both descriptors are at EOF, both are closed.
It is required that bi1
and bi2
are sockets.`Tridirectional(bi,dst,src)
: Data from bi
are copied to dst
,
and data from src
are copied to bi
(i.e. a bidirectional
descriptor is split up into two unidirectional descriptors).
EOF of bi
causes that dst
is closed. EOF of src
causes
that bi
is shut down for sending. EOF in both directions
causes that all descriptors are closed. It is required that
bi
is a socket.This engine copies data between file descriptors as specified by
the copy_task
argument.
The task is done when all input descriptors are at EOF. See
the description of copy_task
for details, especially whether
the descriptors are closed or not.
On error or abort, the descriptors are only closed if they had been closed on regular EOF.
The semantics of the engine is undefined if one of the descriptors is not stream-oriented.
TODO: This class cannot yet cope with Win32 named piped.
Note that Win32 named pipes are also supported by the following API's, although they are not sockets. These pipes have a feature set comparable to Unix domain sockets.
Extended names for socket addresses. Currently, these naming schemes are supported:
`Sock_unix(stype,path)
: Names the Unix domain socket at path
.
The socket type stype
is an auxiliary piece of information, but
not a distinguishing part of the name. path = ""
refers to
anonymous sockets. Otherwise, the path
must be an absolute path name.`Sock_inet(stype,addr,port)
: Names the Internet socket of type
stype
bound to the IP address addr
and the port
.
If stype = Unix.SOCK_STREAM
, a TCP socket is meant, and if
stype = Unix.SOCK_DGRAM
, a UDP socket is meant. It is allowed
that addr = Unix.inet_addr_any
. If port = 0
, the name is to
be considered as incomplete.`Sock_inet_byname(stype,name,port)
: Names the Internet socket of
type stype
bound to the IP address corresponding to the
name
, and bound to the port
. It is unspecified which naming
service is used to resolve name
to an IP address, and how it is
used. If the name
cannot be resolved, no socket is meant; this
is usually an error. stype
is interpreted as for `Sock_inet
.
If port = 0
, the name is to be considered as incomplete.It is currently not possible to name IP sockets that are bound to several IP addresses but not all IP addresses of the host.
Converts a normal socket address to the extended form
Converts a Netsockaddr.socksymbol to this form
Specifies the service to connect to:
`Socket(addr,opts)
: Connect to the passed socket address`Command(cmd,handler)
: The cmd
is started with the shell,
and stdin
and stdout
are used to transfer data to the
process and from the process, respectively. Only SOCK_STREAM
type is supported. Note that the passed file descriptors are
normal pipes, not sockets (so the descriptors can be individually
closed).stderr
of the command is connected with stderr
of
the caller process.handler
is invoked with the process ID and the event system
to give the caller a chance to arrange that the process will be
waited for.`W32_pipe(mode,name)
: A Win32 named pipeThis type corresponds with Uq_engines.connect_address: An engine connecting with an address `X will return a status of `X.
`Socket(fd,addr)
: fd
is the client socket connected with the
service. addr
is the socket address of the client that must be
used by the server to reach the client.`Command(fd, pid)
: fd
is the Unix domain socket connected with
the running command. pid
is the process ID.`W32_pipe fd
: fd
is the proxy descriptor of the connected
Win32 named pipe endpoint. See [root:Netsys_win32] how to get the
w32_pipe
object to access the pipe. The proxy descriptor cannot
be used for I/O.Returns the client endpoint contained in the connect_status
For backward compatibility. Deprecated name for client_endpoint
Instantiates an engine that connects to the endpoint given by the
connect_address
argument. If successful, the state of the engine
changes to `Done(status)
where status
contains the socket
details. The connection is established in the background.
The type of status will correspond to the type of connect address
(e.g. a `Socket
address will return a `Socket
status).
The close-on-exec flag of the created socket descriptor is always set. The socket descriptor is always in non-blocking mode.
This class type provides engines to connect to a service. In order
to get and activate such an engine, call connect
.
For backward compatibility. Deprecated name for
client_endpoint_connector
This engine connects to a socket as specified by the connect_address
,
optionally using the proxy
, and changes to the state
`Done(status)
when the connection is established.
If the proxy
does not support the connect_address
, the class
will raise Addressing_method_not_supported
.
The descriptor fd
(part of the connect_status
) is in non-blocking mode,
and the close-on-exec flag is set.
It is the task of the caller to close this descriptor.
The engine attaches automatically to the event system, and detaches when it is possible to do so. This depends on the type of the connection method. For direct socket connections, the engine can often detach immediately when the conection is established. For proxy connections it is required that the engine copies data to and from the file descriptor. In this case, the engine detaches when the file descriptor is closed.
It is possible that name service queries block execution.
If name resolution fails, the engine will enter
`Error(Uq_resolver.Host_not_found name)
. This is new since
Ocamlnet-3.3 - before this version, the exception was simply
Not_found
.
Example of using connector
: This engine e
connects to the
"echo" service as provided by inetd, sends a line of data to it,
and awaits the response.
let e =
Uq_engines.connector
(`Socket(`Sock_inet_byname(Unix.SOCK_STREAM, "localhost", 7),
Uq_engines.default_connect_options))
esys
++ (fun cs ->
match cs with
| `Socket(fd,_) ->
let mplex =
Uq_engines.create_multiplex_controller_for_connected_socket
~supports_half_open_connection:true
fd esys in
let d_unbuf = `Multiplex mplex in
let d = `Buffer_in(Uq_io.create_in_buffer d_unbuf) in
Uq_io.output_string_e d_unbuf "This is line1\n"
++ (fun () ->
Uq_io.input_line_e d
++ (fun s ->
print_endline s;
eps_e (`Done()) esys
)
)
| _ -> assert false
)
Specifies the resource to listen on:
`Socket(addr,opts)
: It is listened on a socket with address addr
`W32_pipe(mode,name,opts)
: It is listened on a pipe server with
name
which accepts pipe connections in mode
.The contact address under which the clients can establish new connections with this server.
Whether it is possible to accept multiple connections
Instantiates an engine that accepts connections on the listening endpoint.
If the connection is successfully established, the state of the engine
changes to `Done(fd,addr)
where fd
is the connected file descriptor,
and where addr
(if not-None
) is the endpoint address of the
connecting client (from the server's perspective). Such addresses are
only supported for Internet endpoints. If a proxy is used to accept
the connections, the returned address is that from the proxy's
view, and usually different from what Unix.getpeername
returns.
The close-on-exec flag of the created endpoint descriptor is always set. The endpoint descriptor is always in non-blocking mode.
It is allowed to shut down fd
for sending, and it is required to
close fd
after all data transfers have been performed.
A call of accept
allows it only to establish one connection at a time.
However, it is allowed to call accept
several times to accept several
connections, provided the acceptor supports this (returned by
multiple_connections
). It is only allowed to call accept
again
when the previous engine was successful.
The server endpoint is shut down such that no further connections
are possible. It is required to call this method even for acceptors
that do not support multiple connections. It is also required to
call this method when an accept
was not successful.
If there is a engine waiting for connections, it is aborted.
This class type is for service providers that listen for connections.
By calling accept
, one gets an engine that waits for the next
connection, and establishes it.
There are services that can only accept one connection for a
certain contact address. In this case accept
must only be called
once. Normally, services can accept any number of connections
(multiplexing), and it is allowed to call accept
again after
the previous accept engine was successful.
For backward compatibility. Deprecated name for
server_endpoint_acceptor
An implementation of server_endpoint_acceptor
for sockets and Win32
named pipes. For sockets, the passed descriptor must be the master
socket. For Win32 named pipes, the passed descriptor must be the
proxy descriptor of the pipe server..
For backward compatibility. Deprecated name for direct_acceptor
Instantiates an engine that listens for connections on the socket given
by the listen_address
argument. If successful, the state of the engine
changes to `Done(acc)
where acc
is the acceptor object guiding
you through further operation of the socket (see above).
This class type represents factories for service providers
For backward compatibility. Deprecated name for
server_endpoint_listener
This engine creates a server socket listening on the listen_address
.
If passed, the proxy
is used to create the server socket.
On success, the engine goes to state `Done acc
, where acc
is
the acceptor object (see above). The acceptor object can be used
to accept incoming connections.
- `Unix_dgram
: Datagrams over Unix domain sockets
`Inet_udp
: Internet v4 UDP protocol`Inet6_udp
: Internet v6 UDP protocolThe underlying file descriptor. This descriptor must not be used
to transfer data (Unix.send(to)
, Unix.recv(from)
, etc.), because the
descriptor may be connected with a proxy, and the socket addresses
may be wrong that are used by the low-level socket functions.
The right way is to use the methods below to transfer data. It is
allowed, however, to pass the descriptor to Unix.select
, and to check
whether transfers are possible. It is also allowed to set or clear
non-blocking mode, and the close-on-exec flag, and to modify the
socket options.
Close the descriptor, shuts down any further needed resources
A wrapped_datagram_socket
allows datagrams to be sent via proxies.
It provides versions of the sendto
and recvfrom
functions that
use extended socket names (which are proxy-friendly).
Creates an engine that creates a wrapped_datagram_socket
object
and that sets up any further resources the objects needs.
This is a factory for wrapped_datagram_socket
objects.
This engine creates a datagram socket as demanded by the datagram_type
,
optionally using proxy
for sending and receiving datagrams.
The socket is unconnected.
The socket is in non-blocking mode, and the close-on-exec flag is set.
If the controller is alive, the socket is not yet completely down.
Whether start_mem_reading
and start_mem_writing
are possible
True iff there is a reader
Start reading from the connection. When data is available, the
when_done
callback is invoked. The int is the number of read
bytes. It is 0 if an error occurred which is indicated by the
exception. The exception End_of_file
is used when the end of the
data stream is reached. The exception Cancelled
indicates that
reading has been cancelled in the meantime.
This starts one-time read job only, i.e. it is not restarted
after when_done
has been invoked.
It is an error to start reading several times.
The function peek
is called immediately before data is read in
from the underlying communication channel.
For getting an engine-based version of start_reading
, use
a signal_engine
:
let (e, signal) = signal_engine esys in
mplex # start_reading ~when_done:(fun xo n -> signal (xo,n)) ...
Now e
will transition to `Done(x0,n)
when the read is done.
Same as start_reading
, but puts the data into a memory
buffer.
There is an optimization for the case that the descriptor is a
connected socket, or supports Unix.read
. If this is not possible
the method raises Mem_not_supported
.
Cancels the read job. The when_done
callback is invoked with the
number of bytes read so far (which may be 0) and the exception
Cancelled
.
It is no error if there is no reader.
True iff there is a writer
Start writing to the connection. When data is written, the
when_done
callback is invoked. The int is the number of written
bytes. It is 0 if an error occurred which is indicated by the
exception. The exception Cancelled
indicates that
writing has been cancelled in the meantime.
This starts one-time write job only, i.e. it is not restarted
after when_done
has been invoked.
It is an error to start writing several times.
See the comment for start_reading
for how to get an engine-based
version of this method.
Same as start_writing
, but takes the data from a memory
buffer.
There is an optimization for the case that the descriptor is a
connected socket, or supports Unix.write
. If this is not possible
the method raises Mem_not_supported
.
Whether the underlying transport mechanism can close the write side of the connection only (half-open connection).
Start writing the EOF marker to the connection. When it is written,
the when_done
callback is invoked. The exception Cancelled
indicates
that writing has been cancelled in the meantime.
This starts one-time write job only, i.e. it is not restarted
after when_done
has been invoked.
It is an error to start writing several times. It is an error to write EOF when the socket does not support half-open connections.
See the comment for start_reading
for how to get an engine-based
version of this method.
Cancels the write job. The when_done
callback is invoked with the
number of bytes read so far (which may be 0) and the exception
Canelled
.
It is no error if there is no writer.
Whether the EOF marker has been read
Whether the EOF marker has been written
True iff the shutdown is in progress
Start shutting down the connection. After going through the shutdown
procedure, the when_done
callback is invoked. The exception
indicates whether an error happened. Cancelled
means that the
shutdown operation has been cancelled in the meantime.
The underlying file descriptor (if any) is not closed. A shutdown
is only a protocol handshake. After a shutdown, both read_eof
and wrote_eof
are true. Call inactivate
to close the descriptor.
Optionally, one can linger
for a certain period of time.
It is only lingered when the EOF was written before the EOF
is seen on input.
Defaults to linger 60.0
. Set to 0 to turn off.
See the comment for start_reading
for how to get an engine-based
version of this method.
Cancels the shutdown procedure. After that, the state of the
connection is undefined. The when_done
callback is invoked with
the exception Cancelled
.
It is no error if no shutdown is in progress.
Inactivates the connection immediately, and releases any resources the controller is responsible for (e.g. closes file descriptors). Note that this is more than cancelling all pending operations and shutting the connection down. However, the details of this method are implementation-defined. Callbacks are not invoked.
A multiplex_controller
is a quite low-level device to abstract
bidirectional socket connections. It is independent of any real
device.
There can be a reader, a writer (or both), or alternatively, the shutdown process may be in progress. One cannot have more than one reader and more than more writer.
May be raised by multiplex controller methods start_mem_reading
and
start_mem_writing
if these methods are not supported for the kind
of file descriptor
Creates a multiplex controller for a bidirectional socket (e.g. a TCP socket). It is essential that the socket is in connected state. This function also supports Win32 named pipes.
Note that the file descriptor is not closed when the attached engines
are terminated. One can call inactivate
manually to do that.
close_inactive_descr
: Whether inactivate
closes the descriptor.
True by default.
preclose
: This function is called just before the descriptor is
closed.
supports_half_open_connection
: This implementation does not know
how to find out whether the socket supports half-open connections.
You can simply set this boolean because of this. Defaults to false
.
You can set it to true
for TCP connections and for Unix-domain
connections with stream semantics.
timeout
: If set to (t, x)
, a general timeout of t
is set.
When an operation has been started, and there is no I/O activity within
t
seconds, neither by the started operation nor by another operation,
the connection times out. In this case, the operation returns the
exception x
.
Additional methods for unconnected datagram handling
Creates a multiplex controller for datagram sockets (e.g. UDP socket).
Note that the file descriptor is not closed when the attached engines
are terminated. One can call inactivate
manually to do that.
close_inactive_descr
: Whether inactivate
closes the descriptor.
True by default.
preclose
: This function is called just before the descriptor is
closed.
timeout
: If set to (t, x)
, a general timeout of t
is set.
When an operation has been started, and there is no I/O activity within
t
seconds, neither by the started operation nor by another operation,
the connection times out. In this case, the operation returns the
exception x
.
See class output_async_mplex
for explanations
See class input_async_mplex
for explanations
Creates an asynchronous output channel writing to the multiplex
controller (see also output_async_descr
for the corresponding
class writing to a single descriptor).
onclose
: What to do when the close_out
method is invoked.
Defaults to `Ignore
. `Write_eof
means to write the EOF marker.
Anyway, after doing the close action, the multiplex controller
is shutdown.
onshutdown
: What to do when all data (and optionally, the EOF marker)
have been written. It is also invoked in case of I/O errors.
The default is `Ignore
. The value `Initiate_shutdown
means that
it is started to shutdown the socket. The success of this action
is not waited upon, however. One can also pass `Action f
in which
case the function f
is called with this object, the
multiplex controller, and the proposed next state as arguments.
By checking the proposed next state the function can see why the
shutdown function was called.
buffer_size
: The size of the internal buffer. By default unlimited.
Note that the engine is done when the output channel is closed.
The socket is not shut down, and the underlying file descriptor
is not closed! You can define the shutdown
callback to do something
in this case.
Creates an asynchronous input channel reading from the multiplex controller.
onshutdown
: See output_async_mplex
.
buffer_size
: The size of the internal buffer. By default unlimited.
Note that the engine is done when the input channel is closed.
The socket is not shut down, and the underlying file descriptor
is not closed! You can define the shutdown
callback to do something
in this case.
When programming with engines, it is normal to use recursion for any kind of loops. For example, to read the lines from a file:
open Uq_engines.Operators (* for ">>" and "++" *)
let fd =
Unix.openfile filename [Unix.O_RDONLY] 0 in
let d =
`Buffer_in(Uq_io.create_in_buffer(`Polldescr(`Read_write,fd,esys))) in
let rec read_lines acc =
Uq_io.input_line_e d >>
(function (* catch exception End_of_file *)
| `Done line -> `Done(Some line)
| `Error End_of_file -> `Done None
| `Error error -> `Error error
| `Aborted -> `Aborted
) ++
(function
| Some line ->
read_lines (line :: acc)
| None ->
eps_e (`Done (List.rev acc)) esys
) in
let e = read_lines []
There is generally the question whether this style leads to stack overflows. This depends on the mechanisms that come into play:
In this example, this means that only the engine mechanism is used as long as the data is read from the buffer. When the buffer needs to be refilled, however, control is passed back to the event queue (so the stack is cleaned), and the continuation of the execution is only managed via closures (which only allocate memory on the heap, not on the stack). Usually, this is a good compromise: The engine mechnism is a lot faster, but I/O is an indicator for using the better but slower technique.
Also note another difference: The event queue mechanism allows that other asynchronous code attached to the same event queue may run (control maybe yielded to unrelated execution contexts). The pure engine mechanism does not allow that. This may be handy when exclusive access to variables is needed. (But be careful here - this is very sensitive to minimal changes of the implementation.)
Certain engines enforce using the event queue mechanisms although they are unrelated to I/O. Especially Uq_engines.delay_engine is useful here: A "delay" of 0 seconds is already sufficient to go back to the event queue. If recursions sometimes lead to stack overflows the solution is to include such a zero delay before doing the self call.
Pointers to other modules related to engines:
`Done r
, and r
is the result of the remote call.