/*  Prolog Machine Query Interface
    Author:        Eric Zinda
    E-mail:        ericz@inductorsoftware.com
    WWW:           http://www.inductorsoftware.com
    Copyright (c)  2021, Eric Zinda
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(mqi,
          [ mqi_start/0,
            mqi_start/1,                % +Options
            mqi_stop/1,                 % ?Thread
            mqi_version/2               % ?Major_Version, ?Minor_Version
          ]).
swiplserver Python library, but starting manually can be useful when debugging Prolog code in some scenarios. See the documentation on "Standalone Mode" for more information.
Once started, the MQI listens for TCP/IP or Unix Domain Socket connections and authenticates them using the password provided (or created depending on options) before processing any messages. The messages processed by the MQI are described below.
For debugging, the server outputs traces using the debug/3 predicate so that the server operation can be observed by using the debug/1 predicate. Run the following commands to see them:
debug(mqi(protocol))
: Traces protocol messages to show the flow of commands and connections. It is designed to avoid filling the screen with large queries and results to make it easier to read.
debug(mqi(query))
: Traces messages that involve each query and its results. Therefore it can be quite verbose depending on the query.
__Options__
Options is a list containing any combination of the following options. When used in the Prolog top level (i.e. in Standalone Mode), these are specified as normal Prolog options like this:
mqi_start([unix_domain_socket(Socket), password('a password')])
When using "Embedded Mode" they are passed using the same name but as normal command line arguments like this:
swipl --quiet -g mqi_start -t halt -- --write_connection_values=true --password="a password" --create_unix_domain_socket=true
Note the quotes around values containing characters that could confuse command line processing, such as spaces (e.g. "a password"), and that unix_domain_socket(Variable) is written as --create_unix_domain_socket=true on the command line. See below for more information.
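The mapping from the command line form back to the Prolog form can be sketched as below. This is a minimal illustration that assumes only library(option); the helper name translate_socket_option/2 is hypothetical and is not exported by the MQI.

```prolog
:- use_module(library(option)).

% translate_socket_option(+Options0, -Options)
% Hypothetical helper: replaces create_unix_domain_socket(true) with
% unix_domain_socket(_), i.e. the "generate a socket file for me" form
% that can only be written with a variable inside Prolog.
translate_socket_option(Options0, Options) :-
    % Remove the command-line-only option, defaulting to false if absent
    select_option(create_unix_domain_socket(Create), Options0, Options1, false),
    (   Create == true
    ->  merge_options(Options1, [unix_domain_socket(_)], Options)
    ;   Options = Options1
    ).
```

Calling `translate_socket_option([create_unix_domain_socket(true)], Options)` should leave a single `unix_domain_socket(_)` option with an unbound argument, which is the form that asks for a generated socket file.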
If the option write_connection_values(true) is set, the selected port is output to STDOUT followed by \n on startup to allow the client language library to retrieve it in "Embedded Mode".
To have one generated instead (recommended), pass Unix_Domain_Socket_Path_And_File as a variable when calling from the Prolog top level and the variable will be unified with a created filename. If launching in "Embedded Mode", instead pass --create_unix_domain_socket=true since there isn't a way to specify variables from the command line. When generating the file, a temporary directory will be created using tmp_file/2 and a socket file will be created within that directory following the below requirements. If the directory and file cannot be created for some reason, mqi_start/1 fails.
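As a rough sketch of the generation step described above, the following creates a fresh temporary directory with tmp_file/2 and derives a socket file path inside it. The predicate name sketch_socket_path/2 and the file name mqi.sock are assumptions made for illustration; the sketch does not attempt to implement the requirements the text refers to, and it is not the MQI's own implementation.

```prolog
:- use_module(library(filesex)).

% sketch_socket_path(-Directory, -Path_And_File)
% Hypothetical helper: creates a new temporary directory and returns the
% path a socket file could use inside it. Fails (or throws) if the
% directory cannot be created.
sketch_socket_path(Directory, Path_And_File) :-
    tmp_file(mqi, Directory),              % unique name, nothing created yet
    make_directory(Directory),
    directory_file_path(Directory, 'mqi.sock', Path_And_File).
```

A caller would be responsible for removing the directory afterwards, for example with delete_directory_and_contents/1 from library(filesex).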
Regardless of whether the file is specified or generated, if the option write_connection_values(true) is set, the fully qualified path to the generated file is output to STDOUT followed by \n on startup to allow the client language library to retrieve it.
Specifying a file to use should follow the same guidelines as the generated file:
If the option write_connection_values(true) is set, the password is output to STDOUT followed by \n on startup to allow the client language library to retrieve it. This is the recommended way to integrate the MQI with a language as it avoids including the password as source code. This option is only included so that a known password can be supplied for when the MQI is running in Standalone Mode.
query_timeout(+Seconds)
Sets the default time in seconds that a query is allowed to run before it is cancelled. This can be overridden on a query by query basis. If not set, the default is no timeout (-1).
pending_connections(+Count)
Sets the number of pending connections allowed for the MQI as in tcp_listen/2. If not provided, the default is 5.
run_server_on_thread(+Run_Server_On_Thread)
Determines whether mqi_start/1 runs in the background on its own thread or blocks until the MQI shuts down. Must be missing or set to true when running in "Embedded Mode" so that the SWI Prolog process can exit properly. If not set, the default is true.
run_server_on_thread(true). Passing in an atom for Server_Thread will only set the server thread name if run_server_on_thread(true). If Server_Thread is a variable, it is unified with a generated name.
write_connection_values(+Write_Connection_Values)
Determines whether the server writes the port (or generated Unix Domain Socket) and password to STDOUT as it initializes. Used by language libraries to retrieve this information for connecting. If not set, the default is false.
write_output_to_file(+File)
Redirects STDOUT and STDERR to the file path specified. Useful for debugging the MQI when it is being used in "Embedded Mode". If using multiple MQI instances in one SWI Prolog instance, only set this on the first one. Each time it is set the output will be redirected.
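The defaults listed above can be summarized with option/3 from library(option). The sketch below is illustrative only: mqi_option_defaults/2 and the defaults/4 term are hypothetical names, and only the four options whose defaults are stated in the text are covered.

```prolog
:- use_module(library(option)).

% mqi_option_defaults(+Options, -Defaults)
% Hypothetical helper: resolves each option against its documented default.
% Defaults = defaults(Query_Timeout, Pending_Connections,
%                     Run_Server_On_Thread, Write_Connection_Values)
mqi_option_defaults(Options, defaults(Query_Timeout, Pending, On_Thread, Write_Values)) :-
    option(query_timeout(Query_Timeout), Options, -1),        % -1 means no timeout
    option(pending_connections(Pending), Options, 5),
    option(run_server_on_thread(On_Thread), Options, true),
    option(write_connection_values(Write_Values), Options, false).
```

For example, `mqi_option_defaults([query_timeout(5)], D)` binds `D` to `defaults(5, 5, true, false)`: the supplied timeout is kept and the remaining options fall back to their defaults.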
*/
:- use_module(library(socket)).
:- use_module(library(http/json)).
:- use_module(library(http/json_convert)).
:- use_module(library(http/http_stream)).
:- use_module(library(option)).
:- use_module(library(term_to_json)).
:- use_module(library(debug)).
:- use_module(library(filesex)).
:- use_module(library(gensym)).
:- use_module(library(lists)).
:- use_module(library(main)).
:- use_module(library(make)).
:- use_module(library(prolog_source)).
:- use_module(library(time)).
:- use_module(library(uuid)).

% One for every Machine Query Interface running
:- dynamic(mqi_thread/3).

% One for every active connection
:- dynamic(mqi_worker_threads/3).
:- dynamic(mqi_socket/5).

% Indicates that a query is in progress on the goal thread or hasn't had its results drained
% Deleted once the last result from the queue has been drained
% Only deleted by the communication thread to avoid race conditions
:- dynamic(query_in_progress/1).

% Indicates to the communication thread that we are in a place
% that can be cancelled
:- dynamic(safe_to_cancel/1).
Note that the initial version of the MQI did not have a version predicate, so the proper way for callers to check the version is:
use_module(library(mqi)),
( current_predicate(mqi_version/2)
-> mqi_version(Major_Version, Minor_Version)
; Major_Version = 0, Minor_Version = 0
)
Major versions are increased when there is a change to the protocol that will likely break clients written to the previous version. Minor versions are increased when there is new functionality that will not break clients written to the old version.
This allows a client written to MQI version 'Client_Major_Version.Client_Minor_Version' to check for non-breaking compatibility like this:
Client_Major_Version = MQI_Major_Version and Client_Minor_Version <= MQI_Minor_Version
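The compatibility rule above can be written as a small predicate. This is a sketch only; client_compatible/4 is a hypothetical name and is not part of library(mqi).

```prolog
% client_compatible(+Client_Major, +Client_Minor, +MQI_Major, +MQI_Minor)
% Hypothetical helper: succeeds when a client written against
% Client_Major.Client_Minor can use an MQI reporting MQI_Major.MQI_Minor
% without hitting a breaking change.
client_compatible(Client_Major, Client_Minor, MQI_Major, MQI_Minor) :-
    Client_Major =:= MQI_Major,     % major versions must match exactly
    Client_Minor =< MQI_Minor.      % the MQI may be newer within a major version
```

Under this rule a client written to version 1.0 is compatible with an MQI at version 1.2, while a client written to 1.3 or 2.0 is not.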
Breaking changes (i.e. Major version increments) should be very rare as the goal is to have the broadest adoption possible.
Protocol Version History:
mqi_version(1, 0).


% Password is carefully constructed to be a string (not an atom) so that it is not
% globally visible
% Add ".\n" to the password since it will be added by the message when received
mqi_start(Options) :-
    Encoding = utf8,
    option(pending_connections(Connection_Count), Options, 5),
    option(query_timeout(Query_Timeout), Options, -1),
    option(port(Port), Options, _),
    option(run_server_on_thread(Run_Server_On_Thread), Options, true),
    option(exit_main_on_failure(Exit_Main_On_Failure), Options, false),
    option(write_connection_values(Write_Connection_Values), Options, false),
    option(unix_domain_socket(Unix_Domain_Socket_Path_And_File), Options, _),
    (   (   memberchk(unix_domain_socket(_), Options),
            var(Unix_Domain_Socket_Path_And_File)
        )
    ->  unix_domain_socket_path(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File)
    ;   true
    ),
    option(server_thread(Server_Thread_ID), Options, _),
    (   var(Server_Thread_ID)
    ->  gensym(mqi, Server_Thread_ID)
    ;   true
    ),
    option(password(Password), Options, _),
    (   var(Password)
    ->  (   current_prolog_flag(bounded, false)
        ->  uuid(UUID, [format(integer)])
        ;   UUID is random(1<<62)
        ),
        format(string(Password), '~d', [UUID])
    ;   true
    ),
    string_concat(Password, '.\n', Final_Password),
    bind_socket(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Port, Socket, Client_Address),
    send_client_startup_data(Write_Connection_Values, user_output, Unix_Domain_Socket_Path_And_File, Client_Address, Password),
    option(write_output_to_file(File), Options, _),
    (   var(File)
    ->  true
    ;   write_output_to_file(File)
    ),
    Server_Goal = (
        catch(server_thread(Server_Thread_ID, Socket, Client_Address, Final_Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure), error(E1, E2), true),
        debug(mqi(protocol), "Stopped MQI on thread: ~w due to exception: ~w", [Server_Thread_ID, error(E1, E2)])
    ),
    start_server_thread(Run_Server_On_Thread, Server_Thread_ID, Server_Goal, Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File).

opt_type(port,                      port,                      natural).
opt_type(create_unix_domain_socket, create_unix_domain_socket, boolean).
opt_type(unix_domain_socket,        unix_domain_socket,        file(write)).
opt_type(password,                  password,                  string).
opt_type(pending_connections,       pending_connections,       nonneg).
opt_type(query_timeout,             query_timeout,             float).
opt_type(run_server_on_thread,      run_server_on_thread,      boolean).
opt_type(exit_main_on_failure,      exit_main_on_failure,      boolean).
opt_type(write_connection_values,   write_connection_values,   boolean).
opt_type(write_output_to_file,      write_output_to_file,      file(write)).

opt_help(port,                      "TCP/IP port for clients to connect to").
opt_help(create_unix_domain_socket, "Create a Unix domain socket for clients to connect to").
opt_help(unix_domain_socket,        "File path for the Unix domain socket").
opt_help(password,                  "Connection password").
opt_help(pending_connections,       "Max number of queued connections (5)").
opt_help(query_timeout,             "Max query runtime in seconds (default infinite)").
opt_help(run_server_on_thread,      "Run server in a background thread (true)").
opt_help(exit_main_on_failure,      "Exit the process on a failure").
opt_help(write_connection_values,   "Print info for clients to connect").
opt_help(write_output_to_file,      "Write stdout and stderr to file").
To launch embedded mode:
swipl --quiet -g mqi_start -t halt -- --write_connection_values=true
This will start SWI Prolog and invoke the mqi_start/0 predicate and exit the process when that predicate stops. Any command line arguments after the standalone -- will be passed as Options. These are the same Options that mqi_start/1 accepts and are passed to it directly. Some options are expressed differently due to command line limitations, see mqi_start/1 Options for more information.
Any Option values that cause issues during command line parsing (such as spaces) should be passed with "" like this:
swipl --quiet -g mqi_start -t halt -- --write_connection_values=true \
      --password="HGJ SOWLWW WNDSJD"
For help on commandline options run
swipl -g mqi_start -- --help
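When launched with --write_connection_values=true, the process writes two lines to STDOUT: the port (or the Unix domain socket path) and then the password. A client that has captured that output could split it as sketched below. The predicate parse_startup_output/3 and the port/1 and unix_domain_socket/1 wrapper terms are hypothetical names chosen for this illustration; how the output is captured is left to the client.

```prolog
% parse_startup_output(+Output, -Address, -Password)
% Hypothetical helper: Output is the text captured from the MQI's STDOUT
% at startup. The first line is the port or socket path, the second line
% is the password.
parse_startup_output(Output, Address, Password) :-
    split_string(Output, "\n", "", [Address_String, Password | _]),
    (   number_string(Port, Address_String)
    ->  Address = port(Port)                          % TCP/IP on localhost
    ;   Address = unix_domain_socket(Address_String)  % path to the socket file
    ).
```

Keeping the password as a string rather than converting it to an atom matches the note in the source that atoms are globally visible.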
% Turn off int signal when running in embedded mode so the client language
% debugger signal doesn't put Prolog into debug mode
% run_server_on_thread must be missing or true (the default) so we can exit
% properly
% create_unix_domain_socket=true/false is only used as a command line argument
% since it doesn't seem possible to pass create_unix_domain_socket=_ on the command line
% and have it interpreted as a variable.
mqi_start :-
    current_prolog_flag(argv, Argv),
    argv_options(Argv, _Args, Options),
    merge_options(Options, [exit_main_on_failure(true)], Options1),
    select_option(create_unix_domain_socket(Create_Unix_Domain_Socket), Options1, Options2, false),
    (   Create_Unix_Domain_Socket == true
    ->  merge_options(Options2, [unix_domain_socket(_)], FinalOptions)
    ;   FinalOptions = Options2
    ),
    option(run_server_on_thread(Run_Server_On_Thread), FinalOptions, true),
    (   Run_Server_On_Thread == true
    ->  true
    ;   throw(domain_error(cannot_be_set_in_embedded_mode, run_server_on_thread))
    ),
    mqi_start(FinalOptions),
    on_signal(int, _, quit),
    thread_get_message(quit_mqi).


quit(_) :-
    thread_send_message(main, quit_mqi).
Always succeeds.
% tcp_close_socket(Socket) will shut down the server thread cleanly so the socket is released and can be used again in the same session
% Closes down any pending connections using abort even if there were no matching server threads since the server thread could have died.
% At this point only threads associated with live connections (or potentially a goal thread that hasn't detected its missing communication thread)
% should be left so seeing abort warning messages in the console seems OK
mqi_stop(Server_Thread_ID) :-
    % First shut down any matching servers to stop new connections
    forall(retract(mqi_thread(Server_Thread_ID, _, Socket)),
        (
            debug(mqi(protocol), "Found server: ~w", [Server_Thread_ID]),
            catch(tcp_close_socket(Socket), Socket_Exception, true),
            abortSilentExit(Server_Thread_ID, Server_Thread_Exception),
            debug(mqi(protocol), "Stopped server thread: ~w, socket_close_exception(~w), stop_thread_exception(~w)", [Server_Thread_ID, Socket_Exception, Server_Thread_Exception])
        )),
    forall(retract(mqi_worker_threads(Server_Thread_ID, Communication_Thread_ID, Goal_Thread_ID)),
        (
            abortSilentExit(Communication_Thread_ID, CommunicationException),
            debug(mqi(protocol), "Stopped server: ~w communication thread: ~w, exception(~w)", [Server_Thread_ID, Communication_Thread_ID, CommunicationException]),
            abortSilentExit(Goal_Thread_ID, Goal_Exception),
            debug(mqi(protocol), "Stopped server: ~w goal thread: ~w, exception(~w)", [Server_Thread_ID, Goal_Thread_ID, Goal_Exception])
        )).


start_server_thread(Run_Server_On_Thread, Server_Thread_ID, Server_Goal, Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File) :-
    (   Run_Server_On_Thread
    ->  (   thread_create(Server_Goal, _, [ alias(Server_Thread_ID),
                                            at_exit((delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File),
                                                     detach_if_expected(Server_Thread_ID)
                                                    ))
                                          ]),
            debug(mqi(protocol), "Started server on thread: ~w", [Server_Thread_ID])
        )
    ;   (   Server_Goal,
            delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File),
            debug(mqi(protocol), "Halting.", [])
        )
    ).


% Unix domain sockets create a file that needs to be cleaned up
% If mqi generated it, there is also a directory that needs to be cleaned up
%   that will only contain that file
delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File) :-
    (   nonvar(Unix_Domain_Socket_Path)
    ->  catch(delete_directory_and_contents(Unix_Domain_Socket_Path), error(_, _), true)
    ;   (   nonvar(Unix_Domain_Socket_Path_And_File)
        ->  catch(delete_file(Unix_Domain_Socket_Path_And_File), error(_, _), true)
        ;   true
        )
    ).

:- if(current_predicate(unix_domain_socket/1)).
    optional_unix_domain_socket(Socket) :-
        unix_domain_socket(Socket).
:- else.
    optional_unix_domain_socket(_).
:- endif.

% Always bind only to localhost for security reasons
% Delete the socket file in case it is already around so that the same name can be reused
bind_socket(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Port, Socket, Client_Address) :-
    (   nonvar(Unix_Domain_Socket_Path_And_File)
    ->  debug(mqi(protocol), "Using Unix domain socket name: ~w", [Unix_Domain_Socket_Path_And_File]),
        optional_unix_domain_socket(Socket),
        catch(delete_file(Unix_Domain_Socket_Path_And_File), error(_, _), true),
        tcp_bind(Socket, Unix_Domain_Socket_Path_And_File),
        Client_Address = Unix_Domain_Socket_Path_And_File
    ;   (   tcp_socket(Socket),
            tcp_setopt(Socket, reuseaddr),
            tcp_bind(Socket, '127.0.0.1':Port),
            debug(mqi(protocol), "Using TCP/IP port: ~w", ['127.0.0.1':Port]),
            Client_Address = Port
        )
    ),
    assert(mqi_thread(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Socket)).

% Communicates the used port and password to the client via STDOUT so the client
% language library can use them to connect
send_client_startup_data(Write_Connection_Values, Stream, Unix_Domain_Socket_Path_And_File, Port, Password) :-
    (   Write_Connection_Values
    ->  (   (   var(Unix_Domain_Socket_Path_And_File)
            ->  format(Stream, "~d\n", [Port])
            ;   format(Stream, "~w\n", [Unix_Domain_Socket_Path_And_File])
            ),
            format(Stream, "~w\n", [Password]),
            flush_output(Stream)
        )
    ;   true
    ).


% Server thread worker predicate
% Listen for connections and create a connection for each in its own communication thread
% Uses tail recursion to ensure the stack doesn't grow
server_thread(Server_Thread_ID, Socket, Address, Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
    debug(mqi(protocol), "Listening on address: ~w", [Address]),
    tcp_listen(Socket, Connection_Count),
    tcp_open_socket(Socket, AcceptFd, _),
    create_connection(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure),
    server_thread(Server_Thread_ID, Socket, Address, Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure).


% Wait for the next connection and create communication and goal threads to support it
% Create known IDs for the threads so we can pass them along before the threads are created
% First create the goal thread to avoid a race condition where the communication
% thread tries to queue a goal before it is created
create_connection(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
    debug(mqi(protocol), "Waiting for client connection...", []),
    tcp_accept(AcceptFd, Socket, _Peer),
    debug(mqi(protocol), "Client connected", []),
    gensym('conn', Connection_Base),
    atomic_list_concat([Server_Thread_ID, "_", Connection_Base, '_comm'], Thread_Alias),
    atomic_list_concat([Server_Thread_ID, "_", Connection_Base, '_goal'], Goal_Alias),
    mutex_create(Goal_Alias, [alias(Goal_Alias)]),
    assert(mqi_worker_threads(Server_Thread_ID, Thread_Alias, Goal_Alias)),
    thread_create(goal_thread(Thread_Alias),
        _,
        [alias(Goal_Alias), at_exit(detach_if_expected(Goal_Alias))]),
    thread_create(communication_thread(Password, Socket, Encoding, Server_Thread_ID, Goal_Alias, Query_Timeout, Exit_Main_On_Failure),
        _,
        [alias(Thread_Alias), at_exit(detach_if_expected(Thread_Alias))]).


% The worker predicate for the Goal thread.
% Looks for a message from the connection thread, processes it, then recurses.
%
% Goals always run in the same thread in case the user is setting thread local information.
% For each answer to the user's query (including an exception), the goal thread will queue a message
% to the communication thread of the form result(Answer, Find_All), where Find_All == true if the user wants all answers at once
% Tail recurse to avoid growing the stack
goal_thread(Respond_To_Thread_ID) :-
    thread_self(Self_ID),
    throw_if_testing(Self_ID),
    thread_get_message(Self_ID, goal(Unexpanded_Goal, Binding_List, Query_Timeout, Find_All)),
    expand_goal(Unexpanded_Goal, Goal),
    debug(mqi(query), "Received Findall = ~w, Query_Timeout = ~w, binding list: ~w, unexpanded: ~w, goal: ~w", [Find_All, Query_Timeout, Binding_List, Unexpanded_Goal, Goal]),
    (   Find_All
    ->  One_Answer_Goal = findall(Binding_List, @(user:Goal, user), Answers)
    ;   One_Answer_Goal = ( findall(    One_Answer,
                                        (   @(user:Goal, user),
                                            One_Answer = [Binding_List],
                                            send_next_result(Respond_To_Thread_ID, One_Answer, _, Find_All)
                                        ),
                                        Answers
                            ),
                            (   Answers == []
                            ->  send_next_result(Respond_To_Thread_ID, [], _, Find_All)
                            ;   true
                            )
                          )
    ),
    Cancellable_Goal = run_cancellable_goal(Self_ID, One_Answer_Goal),
    (   Query_Timeout == -1
    ->  catch(Cancellable_Goal, Top_Exception, true)
    ;   catch(call_with_time_limit(Query_Timeout, Cancellable_Goal), Top_Exception, true)
    ),
    (   var(Top_Exception)
    ->  (   Find_All
        ->  send_next_result(Respond_To_Thread_ID, Answers, _, Find_All)
        ;   send_next_result(Respond_To_Thread_ID, [], no_more_results, Find_All)
        )
    ;   send_next_result(Respond_To_Thread_ID, [], Top_Exception, true)
    ),
    goal_thread(Respond_To_Thread_ID).


% Used only for testing unhandled exceptions outside of the "safe zone"
throw_if_testing(Self_ID) :-
    (   thread_peek_message(Self_ID, testThrow(Test_Exception))
    ->  (   debug(mqi(query), "TESTING: Throwing test exception: ~w", [Test_Exception]),
            throw(Test_Exception)
        )
    ;   true
    ).


% run_cancellable_goal handles the communication
% to ensure the cancel exception from the communication thread
% is injected at a place we are prepared to handle in the goal_thread
% Before the goal is run, sets a fact to indicate we are in the "safe to cancel"
% zone for the communication thread.
% Then it doesn't exit this "safe to cancel" zone if the
% communication thread is about to cancel
run_cancellable_goal(Mutex_ID, Goal) :-
    thread_self(Self_ID),
    setup_call_cleanup(
        assert(safe_to_cancel(Self_ID), Assertion),
        Goal,
        with_mutex(Mutex_ID, erase(Assertion))
    ).


% Worker predicate for the communication thread.
% Processes messages and sends goals to the goal thread.
% Continues processing messages until communication_thread_listen() throws or ends with true/false
%
% Catches all exceptions from communication_thread_listen so that it can do an orderly shutdown of the goal
%   thread if there is a communication failure.
%
% True means user explicitly called close or there was an exception
%   only exit the main thread if there was an exception and we are supposed to Exit_Main_On_Failure
%   otherwise just exit the session
communication_thread(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout, Exit_Main_On_Failure) :-
    thread_self(Self_ID),
    (   (
            catch(communication_thread_listen(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout), error(Serve_Exception1, Serve_Exception2), true),
            debug(mqi(protocol), "Session finished. Communication thread exception: ~w", [error(Serve_Exception1, Serve_Exception2)]),
            abortSilentExit(Goal_Thread_ID, _),
            retractall(mqi_worker_threads(Server_Thread_ID, Self_ID, Goal_Thread_ID))
        )
    ->  Halt = (nonvar(Serve_Exception1), Exit_Main_On_Failure)
    ;   Halt = true
    ),
    (   Halt
    ->  (   debug(mqi(protocol), "Ending session and halting Prolog server due to thread ~w: exception(~w)", [Self_ID, error(Serve_Exception1, Serve_Exception2)]),
            quit(_)
        )
    ;   (   debug(mqi(protocol), "Ending session ~w", [Self_ID]),
            catch(tcp_close_socket(Socket), error(_, _), true)
        )
    ).


% Open socket and begin processing the streams for a connection using the Encoding if the password matches
% true: session ended
% exception: communication failure or an internal failure (like a thread threw or shutdown unexpectedly)
% false: halt
communication_thread_listen(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout) :-
    tcp_open_socket(Socket, Read_Stream, Write_Stream),
    thread_self(Communication_Thread_ID),
    assert(mqi_socket(Server_Thread_ID, Communication_Thread_ID, Socket, Read_Stream, Write_Stream)),
    set_stream(Read_Stream, encoding(Encoding)),
    set_stream(Write_Stream, encoding(Encoding)),
    read_message(Read_Stream, Sent_Password),
    (   Password == Sent_Password
    ->  (   debug(mqi(protocol), "Password matched.", []),
            thread_self(Self_ID),
            mqi_version(Major, Minor),
            reply(Write_Stream, true([[threads(Self_ID, Goal_Thread_ID), version(Major, Minor)]]))
        )
    ;   (   debug(mqi(protocol), "Password mismatch, failing. ~w", [Sent_Password]),
            reply_error(Write_Stream, password_mismatch),
            throw(password_mismatch)
        )
    ),
    process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout),
    debug(mqi(protocol), "Session finished.", []).


% process_mqi_messages implements the main interface to the Machine Query Interface.
% Continuously reads a Machine Query Interface message from Read_Stream and writes a response to Write_Stream,
% until the connection fails or a `quit` or `close` message is sent.
%
% Read_Stream and Write_Stream can be any valid stream using any encoding.
%
% Goal_Thread_ID must be the threadID of a thread started on the goal_thread predicate
%
% uses tail recursion to ensure the stack doesn't grow
%
% true: indicates we should terminate the session (clean termination)
% false: indicates we should exit the process if running in embedded mode
% exception: indicates we should terminate the session (communication failure termination) or
%    thread was asked to halt
process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout) :-
    process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command),
    (   Command == close
    ->  (   debug(mqi(protocol), "Command: close. Client closed the connection cleanly.", []),
            true
        )
    ;   (   Command == quit
        ->  (   debug(mqi(protocol), "Command: quit.", []),
                false
            )
        ;
            process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout)
        )
    ).

% process_mqi_message manages the protocol for the connection: receive message, parse it, process it.
% - Reads a single message from Read_Stream.
% - Processes it and issues a response on Write_Stream.
% - The message will be unified with Command to allow the caller to handle it.
%
% Read_Stream and Write_Stream can be any valid stream using any encoding.
%
% True if the message was understood. A response will always be sent.
% False if the message was malformed.
% Exceptions will be thrown by the underlying stream if there are communication failures writing to Write_Stream or the thread was asked to exit.
%
% state_* predicates manage the state transitions of the protocol
% They only bubble up exceptions if there is a communication failure
%
% state_process_command will never return false
% since errors should be sent to the client
% It can throw if there are communication failures, though.
process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command) :-
    debug(mqi(protocol), "Waiting for next message ...", []),
    (   state_receive_raw_message(Read_Stream, Message_String)
    ->  (   state_parse_command(Write_Stream, Message_String, Command, Binding_List)
        ->  state_process_command(Write_Stream, Goal_Thread_ID, Query_Timeout, Command, Binding_List)
        ;   true
        )
    ;   false
    ).


% state_receive_raw_message: receive a raw message, which is simply a string
%   true: valid message received
%   false: invalid message format
%   exception: communication failure OR thread asked to exit
state_receive_raw_message(Read, Command_String) :-
    read_message(Read, Command_String),
    debug(mqi(protocol), "Valid message: ~w", [Command_String]).


% state_parse_command: attempt to parse the message string into a valid command
%
% Use read_term_from_atom instead of read_term(stream) so that we don't hang
% indefinitely if the caller didn't properly finish the term
% parse in the context of module 'user' to properly bind operators, do term expansion, etc
%
%   true: command could be parsed
%   false: command cannot be parsed. An error is sent to the client in this case
%   exception: communication failure on sending a reply
state_parse_command(Write_Stream, Command_String, Parsed_Command, Binding_List) :-
    (   catch(read_term_from_atom(Command_String, Parsed_Command, [variable_names(Binding_List), module(user)]), Parse_Exception, true)
    ->  (   var(Parse_Exception)
        ->  debug(mqi(protocol), "Parse Success: ~w", [Parsed_Command])
        ;   (   reply_error(Write_Stream, Parse_Exception),
                fail
            )
        )
    ;   (   reply_error(Write_Stream, error(couldNotParseCommand, _)),
            fail
        )
    ).


% state_process_command(): execute the requested Command
%
% First wait until we have removed all results from any previous query.
% If query_in_progress(Goal_Thread_ID) exists then there is at least one
% more result to drain, by definition. Because the predicate is
% deleted by get_next_result in the communication thread when the last result is drained
%
%   true: if the command itself succeeded, failed or threw an exception.
%         In that case, the outcome is sent to the client
%   exception: only communication or thread failures are allowed to bubble up
% See mqi(Options) documentation
state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run(Goal, Timeout), Binding_List) :-
    !,
    debug(mqi(protocol), "Command: run/1. Timeout: ~w", [Timeout]),
    repeat_until_false((
            query_in_progress(Goal_Thread_ID),
            debug(mqi(protocol), "Draining unretrieved result for ~w", [Goal_Thread_ID]),
            heartbeat_until_result(Goal_Thread_ID, Stream, Unused_Answer),
            debug(mqi(protocol), "Drained result for ~w", [Goal_Thread_ID]),
            debug(mqi(query), "    Discarded answer: ~w", [Unused_Answer])
        )),
    debug(mqi(protocol), "All previous results drained", []),
    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, true),
    heartbeat_until_result(Goal_Thread_ID, Stream, Answers),
    reply_with_result(Goal_Thread_ID, Stream, Answers).


% See mqi(Options) documentation for documentation
% See notes in run(Goal, Timeout) re: draining previous query
state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run_async(Goal, Timeout, Find_All), Binding_List) :-
    !,
    debug(mqi(protocol), "Command: run_async/1.", []),
    debug(mqi(query),  "   Goal: ~w", [Goal]),
    repeat_until_false((
            query_in_progress(Goal_Thread_ID),
            debug(mqi(protocol), "Draining unretrieved result for ~w", [Goal_Thread_ID]),
            heartbeat_until_result(Goal_Thread_ID, Stream, Unused_Answer),
            debug(mqi(protocol), "Drained result for ~w", [Goal_Thread_ID]),
            debug(mqi(query), "    Discarded answer: ~w", [Unused_Answer])
            )),
    debug(mqi(protocol), "All previous results drained", []),
    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, Find_All),
    reply(Stream, true([[]])).


% See mqi(Options) documentation
state_process_command(Stream, Goal_Thread_ID, _, async_result(Timeout), _) :-
    !,
    debug(mqi(protocol), "Command: async_result, timeout: ~w.", [Timeout]),
    (   once((var(Timeout) ; Timeout == -1))
    ->  Options = []
    ;   Options = [timeout(Timeout)]
    ),
    (   query_in_progress(Goal_Thread_ID)
    ->  (   (   debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
                get_next_result(Goal_Thread_ID, Stream, Options, Result)
            )
        ->  reply_with_result(Goal_Thread_ID, Stream, Result)
        ;   reply_error(Stream, result_not_available)
        )
    ;   (   debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
            reply_error(Stream, no_query)
        )
    ).


% See mqi(Options) documentation
% To ensure the goal thread is in a place where it is safe to cancel,
% we first lock a mutex that the goal thread checks before exiting
% the "safe to cancel" zone.
% If it is not in the safe zone, it either finished
% or was never running.
state_process_command(Stream, Goal_Thread_ID, _, cancel_async, _) :-
    !,
    debug(mqi(protocol), "Command: cancel_async/0.", []),
    with_mutex(Goal_Thread_ID, (
        (   safe_to_cancel(Goal_Thread_ID)
        ->  (   thread_signal(Goal_Thread_ID, throw(cancel_goal)),
                reply(Stream, true([[]]))
            )
        ;   (   query_in_progress(Goal_Thread_ID)
            ->  (   debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
                    reply(Stream, true([[]]))
                )
            ;   (   debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
                    reply_error(Stream, no_query)
                )
            )
        )
    )).


% Used for testing how the system behaves when the goal thread is killed unexpectedly
% Needs to run a bogus command `run(true, -1)` to
% get the goal thread to process the exception
state_process_command(Stream, Goal_Thread_ID, Query_Timeout, testThrowGoalThread(Test_Exception), Binding_List) :-
    !,
    debug(mqi(protocol), "TESTING: requested goal thread unhandled exception", []),
    thread_send_message(Goal_Thread_ID, testThrow(Test_Exception)),
    state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run(true, -1), Binding_List).


state_process_command(Stream, _, _, close, _) :-
    !,
    reply(Stream, true([[]])).


state_process_command(Stream, _, _, quit, _) :-
    !,
    reply(Stream, true([[]])).


% Send an exception if the command is not known
state_process_command(Stream, _, _, Command, _) :-
    debug(mqi(protocol), "Unknown command ~w", [Command]),
    reply_error(Stream, unknownCommand).


% Wait for a result (and put in Answers) from the goal thread, but send a heartbeat message
% every so often until it arrives to detect if the socket is broken.
% Throws if the heartbeat fails, which will then shut down the communication thread
% Tail recurse to not grow the stack
heartbeat_until_result(Goal_Thread_ID, Stream, Answers) :-
    (   get_next_result(Goal_Thread_ID, Stream, [timeout(2)], Answers)
    ->  debug(mqi(query), "Received answer from goal thread: ~w", [Answers])
    ;   (   debug(mqi(protocol), "heartbeat...", []),
            write_heartbeat(Stream),
            heartbeat_until_result(Goal_Thread_ID, Stream, Answers)
        )
    ).


% True if write succeeded, otherwise throws as that
% indicates that heartbeat failed because the other
% end of the pipe terminated
write_heartbeat(Stream) :-
    put_char(Stream, '.'),
    flush_output(Stream).
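

% Illustrative sketch only, not part of the MQI: the polling pattern used by
% heartbeat_until_result/3 above, reduced to a plain message queue. The names
% example_wait_with_ticks/3 and example_wait_with_ticks/4 and the 0.05 second
% timeout are hypothetical. It relies on thread_get_message/3 failing when
% its timeout(Seconds) option expires, and counts those expirations where the
% real code writes a heartbeat to the socket.
example_wait_with_ticks(Queue, Message, Ticks) :-
    example_wait_with_ticks(Queue, Message, 0, Ticks).

example_wait_with_ticks(Queue, Message, Ticks_So_Far, Ticks) :-
    (   thread_get_message(Queue, Message, [timeout(0.05)])
    ->  Ticks = Ticks_So_Far
    ;   Next_Ticks is Ticks_So_Far + 1,
        example_wait_with_ticks(Queue, Message, Next_Ticks, Ticks)
    ).
% Example use (the message is already queued, so no timeout expires):
%   ?- message_queue_create(Queue),
%      thread_send_message(Queue, result(done)),
%      example_wait_with_ticks(Queue, result(Answer), Ticks).
%   binds Answer = done and Ticks = 0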


% Send a goal to the goal thread in its queue
%
% Remember that we are now running a query using assert.
% This will be retracted once all the answers have been drained.
%
% If Goal_Thread_ID died, thread_send_message throws and, if we don't respond,
% the client could hang so catch and give them a good message before propagating
% the exception
send_goal_to_thread(Stream, Goal_Thread_ID, Default_Timeout, Timeout, Goal, Binding_List, Find_All) :-
    (   var(Timeout)
    ->  Timeout = Default_Timeout
    ;   true
    ),
    (   var(Binding_List)
    ->  Binding_List = []
    ;   true
    ),
    debug(mqi(query), "Sending to goal thread with timeout = ~w: ~w", [Timeout, Goal]),
    assert(query_in_progress(Goal_Thread_ID)),
    catch(thread_send_message(Goal_Thread_ID, goal(Goal, Binding_List, Timeout, Find_All)), Send_Message_Exception, true),
    (   var(Send_Message_Exception)
    ->  true
    ;   (   reply_error(Stream, connection_failed),
            throw(Send_Message_Exception)
        )
    ).


% Send a result from the goal thread to the communication thread in its queue
send_next_result(Respond_To_Thread_ID, Answer, Exception_In_Goal, Find_All) :-
    (   var(Exception_In_Goal)
    ->  (   (   debug(mqi(query), "Sending result of goal to communication thread, Result: ~w", [Answer]),
                Answer == []
            )
        ->  thread_send_message(Respond_To_Thread_ID, result(false, Find_All))
        ;   handle_constraints(Answer, Final_Answer),
            thread_send_message(Respond_To_Thread_ID, result(true(Final_Answer), Find_All))
        )
    ;   (   debug(mqi(query), "Sending result of goal to communication thread, Exception: ~w", [Exception_In_Goal]),
            thread_send_message(Respond_To_Thread_ID, result(error(Exception_In_Goal), Find_All))
        )
    ).


handle_constraints(Answer, Final_Answer) :-
    (   term_attvars(Answer, [])
    ->  Final_Answer = Answer
    ;   findall(    Single_Answer_With_Attributes,
                    (   member(Single_Answer, Answer),
                        copy_term(Single_Answer, Single_Answer_Copy, Attributes),
                        append(['$residuals' = Attributes], Single_Answer_Copy, Single_Answer_With_Attributes)
                    ),
                    Final_Answer
        ),
        debug(mqi(query), "Constraints detected, converted: ~w to ~w", [Answer, Final_Answer])
    ).


% Gets the next result from the goal thread in the communication thread queue,
% and retracts query_in_progress/1 when the last result has been sent.
% Find_All == true only returns one message, so delete query_in_progress
% no matter what it is
% \+ Find_All: There may be more than one result. The first one we hit with any exception
% (note that no_more_results is also returned as an exception) means we are done
get_next_result(Goal_Thread_ID, Stream, Options, Answers) :-
    (   thread_property(Goal_Thread_ID, status(running))
    ->  true
    ;   (   reply_error(Stream, connection_failed),
            throw(connection_failed)
        )
    ),
    thread_self(Self_ID),
    thread_get_message(Self_ID, result(Answers, Find_All), Options),
    (   Find_All
    ->  (   debug(mqi(protocol), "Query completed and answers drained for findall ~w", [Goal_Thread_ID]),
            retractall(query_in_progress(Goal_Thread_ID))
        )
    ;   (   Answers = error(_)
        ->  (   debug(mqi(protocol), "Query completed and answers drained for non-findall ~w", [Goal_Thread_ID]),
                retractall(query_in_progress(Goal_Thread_ID))
            )
        ;   true
        )
    ).


% reply_with_result predicates are used to consistently return
% answers for a query from either run() or run_async()
reply_with_result(_, Stream, error(Error)) :-
    !,
    reply_error(Stream, Error).

% Gracefully handle exceptions that can occur during conversion to JSON
reply_with_result(_, Stream, Result) :-
    !,
    catch(reply(Stream, Result), error(Exception, _), reply_with_result(_, Stream, error(Exception))).


% Reply with a normal term
% Convert term to an actual JSON string
reply(Stream, Term) :-
    debug(mqi(query), "Responding with Term: ~w", [Term]),
    term_to_json_string(Term, Json_String),
    write_message(Stream, Json_String).


% Special handling for exceptions since they can have parts that are not
% "serializable". Ensures that they are always returned in an exception/1 term
reply_error(Stream, Error_Term) :-
    debug(mqi(query), "Responding with exception: ~w", [Error_Term]),
    (   error(Error_Value, _) = Error_Term
    ->  Response = exception(Error_Value)
    ;   (   atom(Error_Term)
        ->  Response = exception(Error_Term)
        ;   (   compound_name_arity(Error_Term, Name, _),
                Response = exception(Name)
            )
        )
    ),
    reply(Stream, Response).


% Sent and received messages are simply strings preceded by their length + ".\n"
% i.e. "<stringlength>.\n<string>"
% The desired encoding must be set on the Stream before calling this predicate


% Writes the next message.
% Throws if there is an unexpected exception
write_message(Stream, String) :-
    write_string_length(Stream, String),
    write(Stream, String),
    flush_output(Stream).


% Reads the next message.
% Throws if there is an unexpected exception or thread has been requested to quit
% the length passed must match the actual number of bytes in the stream
% in whatever encoding is being used
read_message(Stream, String) :-
    read_string_length(Stream, Length),
    stream_property(Stream, encoding(Encoding)),
    setup_call_cleanup(
        stream_range_open(Stream, Tmp, [size(Length)]),
        (   set_stream(Tmp, encoding(Encoding)),
            read_string(Tmp, _, String)
        ),
        close(Tmp)).
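

% Illustrative sketch only, not part of the MQI: the "<stringlength>.\n<string>"
% framing described above, built in memory. example_frame_message/2 is a
% hypothetical name. It assumes ASCII-only text, where the character count
% from string_length/2 equals the byte count; write_message/2 instead
% measures bytes in the stream's encoding.
example_frame_message(String, Framed) :-
    string_length(String, Length),
    format(string(Framed), "~d.\n~w", [Length, String]).
% Example use:
%   ?- example_frame_message("true", Framed).
%   binds Framed to "4.\ntrue"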


% Terminate with '.\n' so we know that's the end of the count
write_string_length(Stream, String) :-
    stream_property(Stream, encoding(Encoding)),
    string_encoding_length(String, Encoding, Length),
    format(Stream, "~d.\n", [Length]).


% Note: read_term requires ".\n" after the length
% ... but does not consume the "\n"
read_string_length(Stream, Length) :-
    read_term(Stream, Length, []),
    get_char(Stream, _).


% Computes the length in bytes of String when written using Encoding
string_encoding_length(String, Encoding, Length) :-
    setup_call_cleanup(
        open_null_stream(Out),
        (   set_stream(Out, encoding(Encoding)),
            write(Out, String),
            byte_count(Out, Length)
        ),
        close(Out)).


% Convert a Prolog Term to a JSON string
% Add a final \n so that using netcat to debug works well
term_to_json_string(Term, Json_String) :-
    term_to_json(Term, Json),
    with_output_to(string(Json_String),
        (   current_output(Stream),
            json_write(Stream, Json),
            put(Stream, '\n')
        )).


% Execute the goal as once() without binding any variables
% and keep executing it until it returns false (or throws)
repeat_until_false(Goal) :-
    (\+ (\+ Goal)), !, repeat_until_false(Goal).
repeat_until_false(_).


% Used to kill a thread in an "expected" way so it doesn't leave around traces in thread_property/2 afterwards
%
% If the thread is alive OR it was already aborted (expected cases) then attempt to join
% the thread so that no warnings are sent to the console. Other cases leave the thread for debugging.
% There are some fringe cases (like calling external code)
% where the call might not return for a long time. Do a timeout for those cases.
abortSilentExit(Thread_ID, Exception) :-
    catch(thread_signal(Thread_ID, abort), error(Exception, _), true),
    debug(mqi(protocol), "Attempting to abort thread: ~w. thread_signal_exception: ~w", [Thread_ID, Exception]).
% Workaround SWI Prolog bug: https://github.com/SWI-Prolog/swipl-devel/issues/852 by not joining
% The workaround just stops joining the aborted thread, so an inert record will be left if thread_property/2 is called.
%    ,
%    (   once((var(Exception) ; catch(thread_property(Thread_ID, status(exception('$aborted'))), error(_, _), true)))
%    ->  (   catch(call_with_time_limit(4, thread_join(Thread_ID)), error(JoinException1, JoinException2), true),
%            debug(mqi(protocol), "thread_join attempted because thread: ~w exit was expected, exception: ~w", [Thread_ID, error(JoinException1, JoinException2)])
%        )
%    ;   true
%    ).


% Detach a thread that exits with true or false so that it doesn't leave around a record in thread_property/2 afterwards
% Don't detach a thread if it exits because of an exception so we can debug using thread_property/2 afterwards
%
% However, `abort` is an expected exception but detaching a thread that aborts will leave an unwanted
% thread_property/2 record *and* print a message to the console. To work around this,
% the goal thread is always aborted by the communication thread using abortSilentExit.
detach_if_expected(Thread_ID) :-
    thread_property(Thread_ID, status(Status)),
    debug(mqi(protocol), "Thread ~w exited with status ~w", [Thread_ID, Status]),
    (   once((Status = true ; Status = false))
    ->  (   debug(mqi(protocol), "Expected thread status, detaching thread ~w", [Thread_ID]),
            thread_detach(Thread_ID)
        )
    ;   true
    ).


write_output_to_file(File) :-
    debug(mqi(protocol), "Writing all STDOUT and STDERR to file:~w", [File]),
    open(File, write, Stream, [buffer(false)]),
    set_prolog_IO(user_input, Stream, Stream).


% Creates a Unix Domain Socket file in a secured directory.
% Throws if the directory or file cannot be created in /tmp for any reason
% Requirements for this file are:
%    - The Prolog process will attempt to create and, if Prolog exits cleanly,
%      delete this file when the server closes. This means the directory
%      must have the appropriate permissions to allow the Prolog process
%      to do so.
%    - For security reasons, the filename should not be predictable and the
%      directory it is contained in should have permissions set so that files
%      created are only accessible to the current user.
%    - The path must be below 92 *bytes* long (including null terminator) to
%      be portable according to the Linux documentation
%
% tmp_file finds the right /tmp directory, even on Mac OS, so the path is small
% Set 700 (rwx------) permission so it is only accessible by current user
% Create a secure tmp file in the new directory
% {set,current}_prolog_flag is copied to a thread, so no need to use a mutex.
% Close the stream so sockets can use it
unix_domain_socket_path(Created_Directory, File_Path) :-
    tmp_file(udsock, Created_Directory),
    make_directory(Created_Directory),
    catch(  chmod(Created_Directory, urwx),
            Exception,
            (   catch(delete_directory(Created_Directory), error(_, _), true),
                throw(Exception)
            )
    ),
    setup_call_cleanup( (   current_prolog_flag(tmp_dir, Save_Tmp_Dir),
                            set_prolog_flag(tmp_dir, Created_Directory)
                        ),
                        tmp_file_stream(File_Path, Stream, []),
                        set_prolog_flag(tmp_dir, Save_Tmp_Dir)
                      ),
    close(Stream).


% Helper for installing the mqi.pl file to the right
% library directory.
% Call using swipl -s mqi.pl -g "mqi:install_to_library('mqi.pl')" -t halt
install_to_library(File) :-
    once(find_library(Path)),
    copy_file(File, Path),
    make.


% Find the base library path, i.e. the one that ends in
% "library/"
find_library(Path) :-
    file_alias_path(library, Path),
    atomic_list_concat(Parts, '/', Path),
    reverse(Parts, Parts_Reverse),
    nth0(0, Parts_Reverse, ''),
    nth0(1, Parts_Reverse, Library),
    string_lower(Library, 'library')