stomp.pl -- STOMP client.
This module provides a STOMP (Simple (or Streaming) Text Orientated Messaging Protocol) client. This client is based on work by Hongxin Liang. The current version is a major rewrite, both changing the API and the low-level STOMP frame (de)serialization.
The predicate stomp_connection/5 is used to register a connection. The connection is established by stomp_connect/1, which is lazily called from any of the predicates that send a STOMP frame. After establishing the connection two threads are created. One receives STOMP frames and the other manages and watches the heart beat.
Threading
Upon receiving a frame the callback registered with stomp_connection/5 is called in the context of the receiving thread. More demanding applications may decide to send incomming frames to a SWI-Prolog message queue and have one or more worker threads processing the input. Alternatively, frames may be inspected by the receiving thread and either processed immediately or be dispatched to either new or running threads. The best scenario depends on the message rate, processing time and concurrency of the Prolog application.
All message sending predicates of this library are thread safe. If two threads send a frame to the same connection the library ensures that both frames are properly serialized.
Reconnecting
By default this library tries to establish the connection and the user
gets notified by means of a disconnected
pseudo frame that the
connection is lost. Using the Options argument of stomp_connection/6 the
system can be configured to try and keep connecting if the server is not
available and reconnect if the connection is lost. See the pong.pl
example.
- stomp_connection(+Address, +Host, +Headers, :Callback, -Connection) is det
- stomp_connection(+Address, +Host, +Headers, :Callback, -Connection, +Options) is det
- Create a connection reference. The connection is not set up yet by
this predicate. Callback is called on any received frame except for
heart beat frames as below.
call(Callback, Command, Connection, Header, Body)
Where command is one of the commands below. Header is a dict holding the STOMP frame header, where all values are strings except for the
'content-length'
key value which is passed as an integer.Body is a string or, if the data is of the type
application/json
, a dict.- connected
- A connection was established. Connection and Header are valid.
- disconnected
- The connection was lost. Only Connection is valid.
- message
- A message arrived. All three arguments are valid. Body is
a dict if the
content-type
of the message isapplication/json
and a string otherwise. - heartbeat
- A heartbeat was received. Only Connection is valid. This callback is also called for each newline that follows a frame. These newlines can be a heartbeat, but can also be additional newlines that follow a frame.
- heartbeat_timeout
- No heartbeat was received. Only Connection is valid.
- error
- An error happened. All three arguments are valid and handled
as
message
.
Note that stomp_teardown/1 causes the receiving and heartbeat thread to be signalled with abort/0.
Options processed:
- reconnect(+Bool)
- Try to reestablish the connection to the server if the
connection is lost. Default is
false
- connect_timeout(+Seconds)
- Maximum time to try and reestablish a connection. The
default is
600
(10 minutes). - json_options(+Options)
- Options passed to json_read_dict/3 to translate
application/json
content to Prolog. Default is[]
.
- stomp_connection_property(?Connection, ?Property) is nondet
- True when Property, is a property of Connection. Defined properties
are:
- address(Address)
- callback(Callback)
- host(Host)
- headers(Headers)
- reconnect(Bool)
- connect_timeout(Seconds)
- All the above properties result from the stomp_connection/6 registration.
- receiver_thread_id(Thread)
- stream(Stream)
- heartbeat_thread_id(Thread)
- received_heartbeat(TimeStamp)
- These describe an active STOMP connection.
- stomp_destroy_connection(+Connection)
- Destroy a connection. If it is active, first use stomp_teardown/1 to disconnect.
- stomp_setup(+Connection, +Options) is det
- Set up the actual socket connection and start receiving thread. This
is a no-op if the connection has already been created. The Options
processed are below. Other options are passed to tcp_connect/3.
- timeout(+Seconds)
- If tcp_connect/3 fails, retry until the timeout is reached.
If Seconds is
inf
orinfinite
, keep retrying forever.
- connect(+Connection, +Address, -Stream, +Options) is det[private]
- Connect to Address. If the option
timeout(Sec)
is present, retry the connection until the timeout is reached. - stomp_teardown(+Connection) is semidet
- Tear down the socket connection, stop receiving thread and heartbeat thread (if applicable). The registration of the connection as created by stomp_connection/5 is preserved and the connection may be reconnected using stomp_connect/1.
- stomp_reconnect(+Connection) is det
- Teardown the connection and try to reconnect.
- stomp_connect(+Connection) is det
- stomp_connect(+Connection, +Options) is det
- Setup the connection. First ensures a socket connection and if this
is new send a
CONNECT
frame. Protocol version and heartbeat negotiation will be handled.STOMP
frame is not used for backward compatibility.This predicate waits for the connection handshake to have completed. Currently it waits for a maximum of 10 seconds after establishing the socket for the server reply.
Calling this on an established connection has no effect.
- stomp_deadline(+Connection, -Deadline, +Options) is det[private]
- True when there is a connection timeout and Deadline is the deadline
for establishing a connection. Deadline is one of
- Number
- The deadline as a time stamp
- infinite
- Keep trying
- once
- Try to connect once.
- stomp_send(+Connection, +Destination, +Headers, +Body) is det
- Send a
SEND
frame. Ifcontent-type
is not provided,text/plain
will be used.content-length
will be filled in automatically. - stomp_send_json(+Connection, +Destination, +Headers, +JSON) is det
- Send a
SEND
frame.JSON
can be either a JSON term or a dict.content-type
is filled in automatically asapplication/json
andcontent-length
will be filled in automatically as well. - stomp_subscribe(+Connection, +Destination, +Id, +Headers) is det
- Send a
SUBSCRIBE
frame. - stomp_unsubscribe(+Connection, +Id) is det
- Send an
UNSUBSCRIBE
frame. - stomp_ack(+Connection, +MessageId, +Headers) is det
- Send an
ACK
frame. See stomp_ack/2 for simply passing the header received with the message we acknowledge. - stomp_nack(+Connection, +MessageId, +Headers) is det
- Send a
NACK
frame. See stomp_nack/2 for simply passing the header received with the message we acknowledge. - stomp_ack(+Connection, +MsgHeader) is det
- stomp_nack(+Connection, +MsgHeader) is det
- Reply with an ACK or NACK based on the received message header. On a
STOMP 1.1 request we get an
ack
field in the header and reply with anid
. For STOMP 1.2 we reply with themessage-id
andsubscription
that we received with the message. - stomp_begin(+Connection, +Transaction) is det
- Send a
BEGIN
frame. @see http://stomp.github.io/stomp-specification-1.2.html#BEGIN - stomp_commit(+Connection, +Transaction) is det
- Send a
COMMIT
frame. - stomp_abort(+Connection, +Transaction) is det
- Send a
ABORT
frame. - stomp_transaction(+Connection, :Goal) is semidet
- Run Goal as once/1, tagging all
SEND
messages inside the transaction with the transaction id. If Goal fails or raises an exception the transaction is aborted. Failure or exceptions cause the transaction to be aborted using stomp_abort/2, after which the result is forwarded. - stomp_disconnect(+Connection, +Headers) is det
- Send a
DISCONNECT
frame. If the connection has thereconnect
property set totrue
, this will be reset todisconnected
to avoid reconnecting. A subsequent stomp_connect/2 resets the reconnect status. - send_frame(+Connection, +Command, +Headers) is det[private]
- send_frame(+Connection, +Command, +Headers, +Body) is det[private]
- Send a frame. If no connection is established connect first. If the
sending results in an I/O error, disconnect, reconnect and try again
if the
reconnect
propertys is set. - connection_stream(+Connection, -Stream)[private]
- read_frame(+Connection, +Stream, -Frame) is det[private]
- Read a frame from a STOMP stream. On end-of-file, Frame is unified
with the atom
end_of_file
. Otherwise it is a dict holding thecmd
,headers
(another dict) andbody
(a string) - read_content(+Connection, +Stream, +Header, -Content) is det[private]
- Read the body. Note that the body may be followed by an arbitrary number of newlines. We leave them in place to avoid blocking.
- receive(+Connection, +Stream) is det[private]
- Read and process incoming messages from Stream.
- receive_done(+Connection, +Why)[private]
- The receiver thread needs to close the connection due to reading end-of-file, an I/O error or failure to parse a frame. If connection is configured to be restarted this thread will try to reestablish the connection. After reestablishing the connection this receiver thread terminates.
- process_frame(+Connection, +Frame) is det[private]
- Process an incoming frame.
- notify(+Connection, +FrameType) is det[private]
- notify(+Connection, +FrameType, +Header) is det[private]
- notify(+Connection, +FrameType, +Header, +Body) is det[private]
- Call the callback using FrameType.
Undocumented predicates
The following predicates are exported, but not or incorrectly documented.