1/* Part of SWI-Prolog 2 3 Author: Hongxin Liang and Jan Wielemaker 4 E-mail: jan@swi-prolog.org 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2021, SWI-Prolog Solutions b.v. 7 All rights reserved. 8 9 Redistribution and use in source and binary forms, with or without 10 modification, are permitted provided that the following conditions 11 are met: 12 13 1. Redistributions of source code must retain the above copyright 14 notice, this list of conditions and the following disclaimer. 15 16 2. Redistributions in binary form must reproduce the above copyright 17 notice, this list of conditions and the following disclaimer in 18 the documentation and/or other materials provided with the 19 distribution. 20 21 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 29 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 31 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 32 POSSIBILITY OF SUCH DAMAGE. 33*/ 34 35:- module(stomp, 36 [ stomp_connection/5, % +Address, +Host, +Headers, 37 % :Callback, -Connection 38 stomp_connection/6, % +Address, +Host, +Headers, 39 % :Callback, -Connection, +Options 40 stomp_connection_property/2, % ?Connection, ?Property 41 stomp_destroy_connection/1, % +Connection 42 stomp_connect/1, % +Connection 43 stomp_connect/2, % +Connection, +Options 44 stomp_teardown/1, % +Connection 45 stomp_reconnect/1, % +Connection 46 stomp_send/4, % +Connection, +Destination, +Headers, +Body 47 stomp_send_json/4, % +Connection, +Destination, +Headers, +JSON 48 stomp_subscribe/4, % +Connection, +Destination, +Id, +Headers 49 stomp_unsubscribe/2, % +Connection, +Id 50 stomp_ack/2, % +Connection, +MsgHeaders 51 stomp_nack/2, % +Connection, +MsgHeaders 52 stomp_ack/3, % +Connection, +MessageId, +Headers 53 stomp_nack/3, % +Connection, +MessageId, +Headers 54 stomp_transaction/2, % +Connection, :Goal 55 stomp_disconnect/2, % +Connection, +Headers 56 % Low level predicates 57 stomp_begin/2, % +Connection, +Transaction 58 stomp_commit/2, % +Connection, +Transaction 59 stomp_abort/2, % +Connection, +Transaction 60 stomp_setup/2 % +Connection, +Options 61 ]).
107:- meta_predicate 108 stomp_connection( , , , , ), 109 stomp_connection( , , , , , ), 110 stomp_transaction( , ). 111 112:- use_module(library(apply)). 113:- use_module(library(debug)). 114:- use_module(library(error)). 115:- use_module(library(gensym)). 116:- use_module(library(http/http_stream)). 117:- use_module(library(http/json)). 118:- use_module(library(readutil)). 119:- use_module(library(socket)). 120:- use_module(library(uuid)). 121:- use_module(library(lists)). 122:- use_module(library(option)). 123:- use_module(library(time)). 124 125:- dynamic 126 connection_property/3.
call(Callback, Command, Connection, Header, Body)
Where command is one of the commands below. Header is a dict
holding the STOMP frame header, where all values are strings except
for the 'content-length'
key value which is passed as an integer.
Body is a string or, if the data is of the type
application/json
, a dict.
content-type
of the message is
application/json
and a string otherwise.message
.Note that stomp_teardown/1 causes the receiving and heartbeat thread to be signalled with abort/0.
Options processed:
false
600
(10 minutes).application/json
content to Prolog. Default is []
.190stomp_connection(Address, Host, Headers, Callback, Connection) :- 191 stomp_connection(Address, Host, Headers, Callback, Connection, []). 192 193stomp_connection(Address, Host, Headers, Callback, Connection, Options) :- 194 option(reconnect(Reconnect), Options, false), 195 option(connect_timeout(Timeout), Options, 600), 196 option(json_options(JSONOptions), Options, []), 197 valid_address(Address), 198 must_be(atom, Host), 199 must_be(dict, Headers), 200 must_be(callable, Callback), 201 uuid(Connection), 202 retractall(connection_property(Connection, _, _)), 203 update_connection_mapping( 204 Connection, 205 _{ address: Address, 206 callback: Callback, 207 host: Host, 208 headers: Headers, 209 reconnect: Reconnect, 210 connect_timeout: Timeout, 211 json_options: JSONOptions 212 }). 213 214valid_address(Host:Port) :- 215 !, 216 must_be(atom, Host), 217 must_be(integer, Port). 218valid_address(Address) :- 219 type_error(stom_address, Address). 220 221connection_property(address). 222connection_property(callback). 223connection_property(host). 224connection_property(headers). 225connection_property(reconnect). 226connection_property(connect_timeout).
247stomp_connection_property(Connection, Property) :- 248 var(Property), 249 !, 250 connection_property(Connection, Name, Value), 251 Property =.. [Name,Value]. 252stomp_connection_property(Connection, Property) :- 253 must_be(compound, Property), 254 Property =.. [Name,Value], 255 query_connection_property(Connection, Name, Value).
262stomp_destroy_connection(Connection) :-
263 must_be(ground, Connection),
264 ( query_connection_property(Connection, address, _)
265 -> stomp_teardown(Connection),
266 retractall(connection_property(Connection, _, _))
267 ; existence_error(stomp_connection, Connection)
268 ).
inf
or infinite
, keep retrying forever.280stomp_setup(Connection, Options) :- 281 stomp_setup(Connection, _New, Options). 282 283stomp_setup(Connection, false, _) :- 284 query_connection_property(Connection, stream, _Stream), 285 !. 286stomp_setup(Connection, New, Options) :- 287 with_mutex(stomp, stomp_setup_guarded(Connection, New, Options)). 288 289stomp_setup_guarded(Connection, false, _) :- 290 query_connection_property(Connection, stream, _Stream), 291 !. 292stomp_setup_guarded(Connection, true, Options) :- 293 query_connection_property(Connection, address, Address), 294 connect(Connection, Address, Stream, Options), 295 set_stream(Stream, encoding(utf8)), 296 gensym(stomp_receive, Alias), 297 thread_create(receive(Connection, Stream), ReceiverThreadId, [alias(Alias)]), 298 debug(stomp(connection), 'Handling input on thread ~p', [ReceiverThreadId]), 299 update_connection_mapping(Connection, 300 _{ receiver_thread_id: ReceiverThreadId, 301 stream:Stream 302 }).
timeout(Sec)
is present, retry the
connection until the timeout is reached.309connect(Connection, Address, Stream, Options) :- 310 stomp_deadline(Connection, Deadline, Options), 311 connect_with_deadline(Connection, Address, Stream, Deadline, Options). 312 313connect_with_deadline(_Connection, Address, Stream, once, Options) :- 314 !, 315 tcp_connect(Address, Stream, Options). 316connect_with_deadline(Connection, Address, Stream, Deadline, Options) :- 317 number(Deadline), 318 !, 319 between(0, infinite, Retry), 320 get_time(Now), 321 Timeout is Deadline-Now, 322 ( Now > 0 323 -> ( catch(call_with_time_limit( 324 Timeout, 325 tcp_connect(Address, Stream, Options)), 326 Error, 327 true) 328 -> ( var(Error) 329 -> ! 330 ; ( debugging(stomp(connection)) 331 -> print_message(warning, Error) 332 ; true 333 ), 334 wait_retry(Connection, Error, Retry, Deadline) 335 ) 336 ; wait_retry(Connection, failed, Retry, Deadline) 337 ) 338 ; throw(stomp_error(connect, Connection, timeout)) 339 ). 340connect_with_deadline(Connection, Address, Stream, Deadline, Options) :- 341 between(0, infinite, Retry), 342 Error = error(Formal, _), 343 ( catch(tcp_connect(Address, Stream, Options), 344 Error, 345 true) 346 -> ( var(Formal) 347 -> ! 348 ; ( debugging(stomp(connection)) 349 -> print_message(warning, Error) 350 ; true 351 ), 352 wait_retry(Connection, Formal, Retry, Deadline) 353 ) 354 ; wait_retry(Connection, failed, Retry, Deadline) 355 ). 356 357wait_retry(Connection, Why, _Retry, _Deadline) :- 358 Why = error(stomp_error(connect, Connection, error(_)), _), 359 !, 360 throw(Why). 361wait_retry(Connection, _Why, Retry, Deadline) :- 362 Wait0 is min(10, 0.1 * (1<<Retry)), 363 ( number(Deadline) 364 -> get_time(Now), 365 Wait is min(Deadline-Now, Wait0) 366 ; Wait = Wait0 367 ), 368 ( Wait > 0 369 -> sleep(Wait), 370 fail 371 ; throw(error(stomp_error(connect, Connection, timeout), _)) 372 ).
382stomp_teardown(Connection) :- 383 terminate_helper(Connection, receiver_thread_id), 384 terminate_helper(Connection, heartbeat_thread_id), 385 forall(query_connection_property(Connection, stream, Stream), 386 close(Stream, [force(true)])), 387 debug(stomp(connection), 'retract connection mapping for ~p', [Connection]), 388 reset_connection_properties(Connection). 389 390terminate_helper(Connection, Helper) :- 391 retract(connection_property(Connection, Helper, Thread)), 392 \+ thread_self(Thread), 393 catch(thread_signal(Thread, abort), error(_,_), fail), 394 !, 395 thread_join(Thread, _Status). 396terminate_helper(_, _). 397 398reset_connection_properties(Connection) :- 399 findall(P, 400 ( query_connection_property(Connection, P, _), 401 \+ connection_property(P) 402 ), Ps), 403 forall(member(P, Ps), 404 retractall(connection_property(Connection, P, _))).
410stomp_reconnect(Connection) :-
411 stomp_teardown(Connection),
412 stomp_connect(Connection).
CONNECT
frame. Protocol version and heartbeat
negotiation will be handled. STOMP
frame is not used for
backward compatibility.
This predicate waits for the connection handshake to have completed. Currently it waits for a maximum of 10 seconds after establishing the socket for the server reply.
Calling this on an established connection has no effect.
432stomp_connect(Connection) :- 433 stomp_connect(Connection, []). 434 435stomp_connect(Connection, Options) :- 436 update_reconnect_property(Connection), 437 stomp_deadline(Connection, Deadline, Options), 438 stomp_deadline_connect(Connection, Deadline, Options). 439 440update_reconnect_property(Connection) :- 441 query_connection_property(Connection, reconnect, disconnected), 442 !, 443 update_connection_property(Connection, reconnect, true). 444update_reconnect_property(_). 445 446stomp_deadline_connect(Connection, Deadline, Options) :- 447 between(0, infinite, Retry), 448 stomp_setup(Connection, New, [deadline(Deadline)|Options]), 449 ( New == true 450 -> Error = error(Formal, _), 451 catch(connect_handshake(Connection), Error, true), 452 ( var(Formal) 453 -> ! 454 ; stomp_teardown(Connection), 455 wait_retry(Connection, Error, Retry, Deadline) 456 ) 457 ; query_connection_property(Connection, connected, _) 458 -> true 459 ; wait_connected(Connection) 460 -> true 461 ; stomp_teardown(Connection), 462 wait_retry(Connection, failed, Retry, Deadline) 463 ). 464 465connect_handshake(Connection) :- 466 query_connection_property(Connection, headers, Headers), 467 query_connection_property(Connection, host, Host), 468 send_frame(Connection, 469 connect, 470 Headers.put(_{ 'accept-version':'1.0,1.1,1.2', 471 host:Host 472 })), 473 ( Heartbeat = Headers.get('heart-beat') 474 -> update_connection_property(Connection, 'heart-beat', Heartbeat) 475 ; true 476 ), 477 thread_self(Self), 478 update_connection_property(Connection, waiting_thread, Self), 479 ( thread_get_message(Self, stomp(connected(Connection, Status)), 480 [timeout(10)]) 481 -> ( Status == true 482 -> get_time(Now), 483 update_connection_property(Connection, connected, Now) 484 ; throw(error(stomp_error(connect, Connection, Status), _)) 485 ) 486 ; throw(error(stomp_error(connect, Connection, timeout), _)) 487 ). 488 489wait_connected(Connection) :- 490 thread_wait(query_connection_property(Connection, connected, _), 491 [ timeout(10), 492 wait_preds([connection_property/3]) 493 ]), 494 !. 495wait_connected(Connection) :- 496 throw(error(stomp_error(connect, Connection, timeout), _)).
510stomp_deadline(_Connection, Deadline, Options) :- 511 option(deadline(Deadline), Options), 512 !. 513stomp_deadline(Connection, Deadline, Options) :- 514 ( option(timeout(Time), Options) 515 ; query_connection_property(Connection, connect_timeout, Time) 516 ), 517 !, 518 ( number(Time) 519 -> get_time(Now), 520 Deadline is Now+Time 521 ; must_be(oneof([inf,infinite]), Time), 522 Deadline = infinite 523 ). 524stomp_deadline(_, once, _).
SEND
frame. If content-type
is not provided,
text/plain
will be used. content-length
will be filled in
automatically.
535stomp_send(Connection, Destination, Headers, Body) :-
536 add_transaction(Headers, Headers1),
537 send_frame(Connection, send, Headers1.put(destination, Destination), Body).
SEND
frame. JSON
can be either a JSON term or a dict.
content-type
is filled in automatically as application/json
and content-length
will be filled in automatically as well.
547stomp_send_json(Connection, Destination, Headers, JSON) :-
548 add_transaction(Headers, Headers1),
549 atom_json_term(Body, JSON,
550 [ as(string),
551 width(0) % No layout for speed
552 ]),
553 send_frame(Connection, send,
554 Headers1.put(_{ destination:Destination,
555 'content-type':'application/json'
556 }),
557 Body).
SUBSCRIBE
frame.
565stomp_subscribe(Connection, Destination, Id, Headers) :-
566 send_frame(Connection, subscribe,
567 Headers.put(_{destination:Destination, id:Id})).
UNSUBSCRIBE
frame.
575stomp_unsubscribe(Connection, Id) :-
576 send_frame(Connection, unsubscribe, _{id:Id}).
ACK
frame. See stomp_ack/2 for simply passing the header
received with the message we acknowledge.
585stomp_ack(Connection, MessageId, Headers) :-
586 send_frame(Connection, ack, Headers.put('message-id', MessageId)).
NACK
frame. See stomp_nack/2 for simply passing the
header received with the message we acknowledge.
595stomp_nack(Connection, MessageId, Headers) :-
596 send_frame(Connection, nack, Headers.put('message-id', MessageId)).
ack
field in the header and reply with
an id
. For STOMP 1.2 we reply with the message-id
and
subscription
that we received with the message.606stomp_ack(Connection, Header) :- 607 stomp_ack_nack(Connection, ack, Header). 608 609stomp_nack(Connection, Header) :- 610 stomp_ack_nack(Connection, nack, Header). 611 612stomp_ack_nack(Connection, Type, Header) :- 613 ( Id = Header.get(ack) 614 -> send_frame(Connection, Type, _{id: Id}) 615 ; Pass = _{'message-id':_, subscription:_}, 616 Pass :< Header 617 -> send_frame(Connection, Type, Pass) 618 ).
BEGIN
frame.
@see http://stomp.github.io/stomp-specification-1.2.html#BEGIN
626stomp_begin(Connection, Transaction) :-
627 send_frame(Connection, begin, _{transaction:Transaction}).
COMMIT
frame.
635stomp_commit(Connection, Transaction) :-
636 send_frame(Connection, commit, _{transaction:Transaction}).
ABORT
frame.
644stomp_abort(Connection, Transaction) :-
645 send_frame(Connection, abort, _{transaction:Transaction}).
SEND
messages inside the
transaction with the transaction id. If Goal fails or raises an
exception the transaction is aborted. Failure or exceptions cause
the transaction to be aborted using stomp_abort/2, after which the
result is forwarded.655stomp_transaction(Connection, Goal) :- 656 uuid(TransactionID), 657 stomp_transaction(Connection, Goal, TransactionID). 658 659stomp_transaction(Connection, Goal, TransactionID) :- 660 stomp_begin(Connection, TransactionID), 661 ( catch(once(Goal), Error, true) 662 -> ( var(Error) 663 -> stomp_commit(Connection, TransactionID) 664 ; stomp_abort(Connection, TransactionID), 665 throw(Error) 666 ) 667 ; stomp_abort(Connection, TransactionID), 668 fail 669 ). 670 671in_transaction(TransactionID) :- 672 prolog_current_frame(Frame), 673 prolog_frame_attribute( 674 Frame, parent_goal, 675 stomp_transaction(_Connection, _Goal, TransactionID)). 676 677add_transaction(Headers, Headers1) :- 678 in_transaction(TransactionID), 679 !, 680 Headers1 = Headers.put(transaction, TransactionID). 681add_transaction(Headers, Headers).
DISCONNECT
frame. If the connection has the reconnect
property set to true
, this will be reset to disconnected
to
avoid reconnecting. A subsequent stomp_connect/2 resets the
reconnect status.
693stomp_disconnect(Connection, Headers) :-
694 ( query_connection_property(Connection, reconnect, true)
695 -> update_connection_property(Connection, reconnect, disconnected)
696 ; true
697 ),
698 send_frame(Connection, disconnect, Headers).
reconnect
propertys is set.707send_frame(Connection, Command, Headers) :- 708 send_frame(Connection, Command, Headers, ""). 709 710send_frame(Connection, Command, Headers, Body) :- 711 Error = error(Formal,_), 712 catch(send_frame_guarded(Connection, Command, Headers, Body), 713 Error, 714 true), 715 ( var(Formal) 716 -> true 717 ; query_connection_property(Connection, reconnect, true) 718 -> notify(Connection, disconnected), 719 stomp_teardown(Connection), 720 debug(stomp(connection), 'Sending thread reconnects', []), 721 send_frame(Connection, Command, Headers, Body) 722 ; notify(Connection, disconnected), 723 throw(Error) 724 ). 725 726send_frame_guarded(Connection, Command, Headers, Body) :- 727 has_body(Command), 728 !, 729 connection_stream(Connection, Stream), 730 default_content_type('text/plain', Headers, Headers1), 731 body_bytes(Body, ContentLength), 732 Headers2 = Headers1.put('content-length', ContentLength), 733 with_output_to(Stream, 734 ( send_command(Stream, Command), 735 send_header(Stream, Headers2), 736 format(Stream, '~w\u0000\n', [Body]), 737 flush_output(Stream))). 738send_frame_guarded(Connection, heartbeat, _Headers, _Body) :- 739 !, 740 connection_stream(Connection, Stream), 741 nl(Stream), 742 flush_output(Stream). 743send_frame_guarded(Connection, Command, Headers, _Body) :- 744 connection_stream(Connection, Stream), 745 with_output_to(Stream, 746 ( send_command(Stream, Command), 747 send_header(Stream, Headers), 748 format(Stream, '\u0000\n', []), 749 flush_output(Stream))).
753connection_stream(Connection, Stream) :- 754 query_connection_property(Connection, stream, Stream), 755 !. 756connection_stream(Connection, Stream) :- 757 stomp_connect(Connection), 758 query_connection_property(Connection, stream, Stream). 759 760send_command(Stream, Command) :- 761 string_upper(Command, Upper), 762 format(Stream, '~w\n', [Upper]). 763 764send_header(Stream, Headers) :- 765 dict_pairs(Headers, _, Pairs), 766 maplist(send_header_line(Stream), Pairs), 767 nl(Stream). 768 769send_header_line(Stream, Name-Value) :- 770 ( number(Value) 771 -> format(Stream, '~w:~w\n', [Name,Value]) 772 ; escape_value(Value, String), 773 format(Stream, '~w:~w\n', [Name,String]) 774 ). 775 776escape_value(Value, String) :- 777 split_string(Value, "\n:\\", "", [_]), 778 !, 779 String = Value. 780escape_value(Value, String) :- 781 string_codes(Value, Codes), 782 phrase(escape(Codes), Encoded), 783 string_codes(String, Encoded). 784 785escape([]) --> []. 786escape([H|T]) --> esc(H), escape(T). 787 788esc(0'\r) --> !, "\\r". 789esc(0'\n) --> !, "\\n". 790esc(0':) --> !, "\\c". 791esc(0'\\) --> !, "\\\\". 792esc(C) --> [C]. 793 794default_content_type(ContentType, Header0, Header) :- 795 ( get_dict('content-type', Header0, _) 796 -> Header = Header0 797 ; put_dict('content-type', Header0, ContentType, Header) 798 ). 799 800body_bytes(String, Bytes) :- 801 setup_call_cleanup( 802 open_null_stream(Out), 803 ( write(Out, String), 804 byte_count(Out, Bytes) 805 ), 806 close(Out)). 807 808 809 /******************************* 810 * INCOMING DATA * 811 *******************************/
end_of_file
. Otherwise it is a dict holding the
cmd
, headers
(another dict) and body
(a string)819read_frame(Connection, Stream, Frame) :- 820 read_command(Stream, Command), 821 ( Command == end_of_file 822 -> Frame = end_of_file 823 ; Command == heartbeat 824 -> Frame = stomp_frame{cmd:heartbeat} 825 ; read_header(Stream, Header), 826 ( has_body(Command) 827 -> read_content(Connection, Stream, Header, Content), 828 Frame = stomp_frame{cmd:Command, headers:Header, body:Content} 829 ; Frame = stomp_frame{cmd:Command, headers:Header} 830 ) 831 ). 832 833has_body(send). 834has_body(message). 835has_body(error). 836 837read_command(Stream, Command) :- 838 read_line_to_string(Stream, String), 839 debug(stomp(command), 'Got line ~p', [String]), 840 ( String == end_of_file 841 -> Command = end_of_file 842 ; String == "" 843 -> Command = heartbeat 844 ; string_lower(String, Lwr), 845 atom_string(Command, Lwr) 846 ). 847 848read_header(Stream, Header) :- 849 read_header_lines(Stream, Pairs, []), 850 dict_pairs(Header, stomp_header, Pairs). 851 852read_header_lines(Stream, Header, Seen) :- 853 read_line_to_string(Stream, Line), 854 ( Line == "" 855 -> Header = [] 856 ; sub_string(Line, Before, _, After, ":") 857 -> sub_atom(Line, 0, Before, _, Name), 858 ( memberchk(Name, Seen) 859 -> read_header_lines(Stream, Header, Seen) 860 ; sub_string(Line, _, After, 0, Value0), 861 convert_value(Name, Value0, Value), 862 Header = [Name-Value|More], 863 read_header_lines(Stream, More, [Name|Seen]) 864 ) 865 ). 866 867convert_value('content-length', String, Bytes) :- 868 !, 869 number_string(Bytes, String). 870convert_value(_, String, Value) :- 871 unescape_header_value(String, Value). 872 873unescape_header_value(String, Value) :- 874 sub_atom(String, _, _, _, "\\"), 875 !, 876 string_codes(String, Codes), 877 phrase(unescape(Plain), Codes), 878 string_codes(Value, Plain). 879unescape_header_value(String, String). 880 881unescape([H|T]) --> "\\", !, unesc(H), unescape(T). 882unescape([H|T]) --> [H], !, unescape(T). 883unescape([]) --> []. 884 885unesc(0'\r) --> "r", !. 886unesc(0'\n) --> "n", !. 887unesc(0':) --> "c", !. 888unesc(0'\\) --> "\\", !. 889unesc(_) --> [C], { syntax_error(invalid_stomp_escape(C)) }.
896read_content(Connection, Stream, Header, Content) :- 897 _{ 'content-length':Bytes, 898 'content-type':Type 899 } :< Header, 900 setup_call_cleanup( 901 stream_range_open(Stream, DataStream, [size(Bytes)]), 902 read_content(Connection, Type, DataStream, Header, Content), 903 close(DataStream)). 904 905read_content(Connection, "application/json", Stream, _Header, Content) :- 906 !, 907 query_connection_property(Connection, json_options, Options), 908 json_read_dict(Stream, Content, Options). 909read_content(_Connection, _Type, Stream, _Header, Content) :- 910 read_string(Stream, _, Content).
917receive(Connection, Stream) :- 918 E = error(Formal, _), 919 catch(read_frame(Connection, Stream, Frame), E, true), 920 !, 921 ( var(Formal) 922 -> debug(stomp(receive), 'received frame ~p', [Frame]), 923 ( Frame == end_of_file 924 -> receive_done(Connection, end_of_file) 925 ; process_frame(Connection, Frame), 926 receive(Connection, Stream) 927 ) 928 ; receive_done(Connection, E) 929 ). 930receive(Connection, _Stream) :- 931 receive_done(Connection, "failed to read frame").
941receive_done(Connection, Why) :-
942 ( Why = error(_,_)
943 -> print_message(warning, Why)
944 ; true
945 ),
946 notify(Connection, disconnected),
947 stomp_teardown(Connection),
948 ( query_connection_property(Connection, reconnect, true)
949 -> debug(stomp(connection), 'Receiver thread reconnects (~p)', [Why]),
950 stomp_connect(Connection)
951 ; debug(stomp(connection), 'Receiver thread terminates (~p)', [Why])
952 ),
953 thread_self(Me),
954 thread_detach(Me).
960process_frame(Connection, Frame) :- 961 Frame.cmd = heartbeat, !, 962 get_time(Now), 963 debug(stomp(heartbeat), 'received heartbeat at ~w', [Now]), 964 update_connection_property(Connection, received_heartbeat, Now), 965 notify(Connection, heartbeat, _{}, ""). 966process_frame(Connection, Frame) :- 967 _{cmd:FrameType, headers:Headers, body:Body} :< Frame, 968 !, 969 process_connect(FrameType, Connection, Frame), 970 notify(Connection, FrameType, Headers, Body). 971process_frame(Connection, Frame) :- 972 _{cmd:FrameType, headers:Headers} :< Frame, 973 process_connect(FrameType, Connection, Frame), 974 notify(Connection, FrameType, Headers). 975 976process_connect(connected, Connection, Frame) :- 977 retract(connection_property(Connection, waiting_thread, Waiting)), 978 !, 979 thread_send_message(Waiting, stomp(connected(Connection, true))), 980 start_heartbeat_if_required(Connection, Frame.headers). 981process_connect(error, Connection, Frame) :- 982 retract(connection_property(Connection, waiting_thread, Waiting)), 983 !, 984 thread_send_message( 985 Waiting, 986 stomp(connected(Connection, error(Frame.body)))). 987process_connect(_, _, _). 988 989start_heartbeat_if_required(Connection, Headers) :- 990 ( query_connection_property(Connection, 'heart-beat', CHB), 991 SHB = Headers.get('heart-beat') 992 -> start_heartbeat(Connection, CHB, SHB) 993 ; true 994 ). 995 996start_heartbeat(Connection, CHB, SHB) :- 997 extract_heartbeats(CHB, CX, CY), 998 extract_heartbeats(SHB, SX, SY), 999 calculate_heartbeats(CX-CY, SX-SY, X-Y), 1000 \+ (X =:= 0, Y =:= 0), 1001 !, 1002 debug(stomp(heartbeat), 'calculated heartbeats are ~p,~p', [X, Y]), 1003 SendSleep is X / 1000, 1004 ReceiveSleep is Y / 1000 + 2, 1005 ( X =:= 0 1006 -> SleepTime = ReceiveSleep 1007 ; ( Y =:= 0 1008 -> SleepTime = SendSleep 1009 ; SleepTime is gcd(X, Y) / 2000 1010 ) 1011 ), 1012 get_time(Now), 1013 gensym(stomp_heartbeat, Alias), 1014 debug(stomp(heartbeat), 'Creating heartbeat thread (~p ~p ~p)', 1015 [SendSleep, ReceiveSleep, SleepTime]), 1016 thread_create(heartbeat_loop(Connection, SendSleep, ReceiveSleep, 1017 SleepTime, Now, Now), 1018 HeartbeatThreadId, [alias(Alias)]), 1019 update_connection_mapping(Connection, 1020 _{ heartbeat_thread_id:HeartbeatThreadId, 1021 received_heartbeat:Now 1022 }). 1023start_heartbeat(_, _, _). 1024 1025extract_heartbeats(Heartbeat, X, Y) :- 1026 split_string(Heartbeat, ",", " ", [XS, YS]), 1027 number_string(X, XS), 1028 number_string(Y, YS). 1029 1030calculate_heartbeats(CX-CY, SX-SY, X-Y) :- 1031 ( CX =\= 0, SY =\= 0 1032 -> X is max(CX, floor(SY)) 1033 ; X = 0 1034 ), 1035 ( CY =\= 0, SX =\= 0 1036 -> Y is max(CY, floor(SX)) 1037 ; Y = 0 1038 ). 1039 1040heartbeat_loop(Connection, SendSleep, ReceiveSleep, SleepTime, 1041 SendTime, ReceiveTime) :- 1042 sleep(SleepTime), 1043 get_time(Now), 1044 ( Now - SendTime > SendSleep 1045 -> SendTime1 = Now, 1046 debug(stomp(heartbeat), 'sending a heartbeat message at ~p', [Now]), 1047 send_frame(Connection, heartbeat, _{}) 1048 ; SendTime1 = SendTime 1049 ), 1050 ( Now - ReceiveTime > ReceiveSleep 1051 -> ReceiveTime1 = Now, 1052 check_receive_heartbeat(Connection, Now, ReceiveSleep) 1053 ; ReceiveTime1 = ReceiveTime 1054 ), 1055 heartbeat_loop(Connection, SendSleep, ReceiveSleep, SleepTime, 1056 SendTime1, ReceiveTime1). 1057 1058check_receive_heartbeat(Connection, Now, ReceiveSleep) :- 1059 query_connection_property(Connection, received_heartbeat, ReceivedHeartbeat), 1060 DiffReceive is Now - ReceivedHeartbeat, 1061 ( DiffReceive > ReceiveSleep 1062 -> debug(stomp(heartbeat), 1063 'Heartbeat timeout: diff_receive=~p, time=~p, lastrec=~p', 1064 [DiffReceive, Now, ReceivedHeartbeat]), 1065 notify(Connection, heartbeat_timeout) 1066 ; true 1067 ).
1075notify(Connection, FrameType) :- 1076 notify(Connection, FrameType, stomp_header{cmd:FrameType}, ""). 1077 1078notify(Connection, FrameType, Header) :- 1079 notify(Connection, FrameType, Header, ""). 1080 1081notify(Connection, FrameType, Header, Body) :- 1082 query_connection_property(Connection, callback, Callback), 1083 Error = error(Formal,_), 1084 ( catch_with_backtrace( 1085 call(Callback, FrameType, Connection, Header, Body), 1086 Error, true) 1087 -> ( var(Formal) 1088 -> true 1089 ; print_message(warning, Error) 1090 ) 1091 ; true 1092 ). 1093 1094update_connection_mapping(Connection, Dict) :- 1095 dict_pairs(Dict, _, Pairs), 1096 maplist(update_connection_property(Connection), Pairs). 1097 1098update_connection_property(Connection, Name-Value) :- 1099 update_connection_property(Connection, Name, Value). 1100 1101update_connection_property(Connection, Name, Value) :- 1102 transaction(update_connection_property_(Connection, Name, Value)). 1103 1104update_connection_property_(Connection, Name, Value) :- 1105 retractall(connection_property(Connection, Name, _)), 1106 asserta(connection_property(Connection, Name, Value)). 1107 1108query_connection_property(Connection, Name, Value) :- 1109 ( nonvar(Name), 1110 nonvar(Connection) 1111 -> connection_property(Connection, Name, Value), 1112 ! 1113 ; connection_property(Connection, Name, Value) 1114 ). 1115 1116 1117 /******************************* 1118 * MESSAGES * 1119 *******************************/ 1120 1121:- multifile prolog:error_message//1. 1122 1123prologerror_message(stomp_error(connect, Connection, error(Message))) --> 1124 { connection_property(Connection, address, Address) }, 1125 [ 'STOMPL: Failed to connect to ~p: ~p'-[Address, Message] ]. 1126prologerror_message(stomp_error(connect, Connection, Detail)) --> 1127 { connection_property(Connection, address, Address) }, 1128 [ 'STOMPL: Failed to connect to ~p: ~p'-[Address, Detail] ]
STOMP client.
This module provides a STOMP (Simple (or Streaming) Text Orientated Messaging Protocol) client. This client is based on work by Hongxin Liang. The current version is a major rewrite, both changing the API and the low-level STOMP frame (de)serialization.
The predicate stomp_connection/5 is used to register a connection. The connection is established by stomp_connect/1, which is lazily called from any of the predicates that send a STOMP frame. After establishing the connection two threads are created. One receives STOMP frames and the other manages and watches the heart beat.
Threading
Upon receiving a frame the callback registered with stomp_connection/5 is called in the context of the receiving thread. More demanding applications may decide to send incomming frames to a SWI-Prolog message queue and have one or more worker threads processing the input. Alternatively, frames may be inspected by the receiving thread and either processed immediately or be dispatched to either new or running threads. The best scenario depends on the message rate, processing time and concurrency of the Prolog application.
All message sending predicates of this library are thread safe. If two threads send a frame to the same connection the library ensures that both frames are properly serialized.
Reconnecting
By default this library tries to establish the connection and the user gets notified by means of a
disconnected
pseudo frame that the connection is lost. Using the Options argument of stomp_connection/6 the system can be configured to try and keep connecting if the server is not available and reconnect if the connection is lost. See thepong.pl
example.