View source with raw comments or as raw
    1/*  Prolog Machine Query Interface
    2    Author:        Eric Zinda
    3    E-mail:        ericz@inductorsoftware.com
    4    WWW:           http://www.inductorsoftware.com
    5    Copyright (c)  2021, Eric Zinda
    6    All rights reserved.
    7
    8        Redistribution and use in source and binary forms, with or without
    9    modification, are permitted provided that the following conditions
   10    are met:
   11
   12    1. Redistributions of source code must retain the above copyright
   13       notice, this list of conditions and the following disclaimer.
   14
   15    2. Redistributions in binary form must reproduce the above copyright
   16       notice, this list of conditions and the following disclaimer in
   17       the documentation and/or other materials provided with the
   18       distribution.
   19
   20    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   21    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   22    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   23    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   24    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   25    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   26    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   27    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   28    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   29    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   30    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   31    POSSIBILITY OF SUCH DAMAGE.
   32*/
   33
   34:- module(mqi,
   35          [ mqi_start/0,
   36            mqi_start/1,                % +Options
   37            mqi_stop/1,                 % ?Thread
   38            mqi_version/2               % ?Major_Version, ?Minor_Version
   39          ]).
 mqi_start(+Options:list) is semidet
Starts a Prolog Machine Query Interface ('MQI') using Options. The MQI is normally started automatically by a library built for a particular programming language such as the swiplserver Python library, but starting manually can be useful when debugging Prolog code in some scenarios. See the documentation on "Standalone Mode" for more information.

Once started, the MQI listens for TCP/IP or Unix Domain Socket connections and authenticates them using the password provided (or created depending on options) before processing any messages. The messages processed by the MQI are described below.

For debugging, the server outputs traces using the debug/3 predicate so that the server operation can be observed by using the debug/1 predicate. Run the following commands to see them:

  110:- use_module(library(socket)).  111:- use_module(library(http/json)).  112:- use_module(library(http/json_convert)).  113:- use_module(library(http/http_stream)).  114:- use_module(library(option)).  115:- use_module(library(term_to_json)).  116:- use_module(library(debug)).  117:- use_module(library(filesex)).  118:- use_module(library(gensym)).  119:- use_module(library(lists)).  120:- use_module(library(main)).  121:- use_module(library(make)).  122:- use_module(library(prolog_source)).  123:- use_module(library(time)).  124:- use_module(library(uuid)).  125
  126% One for every Machine Query Interface running
  127:- dynamic(mqi_thread/3).  128
  129% One for every active connection
  130:- dynamic(mqi_worker_threads/3).  131:- dynamic(mqi_socket/5).  132
  133% Indicates that a query is in progress on the goal thread or hasn't had its results drained
  134% Deleted once the last result from the queue has been drained
  135% Only deleted by the communication thread to avoid race conditions
  136:- dynamic(query_in_progress/1).  137
  138% Indicates to the communication thread that we are in a place
  139% that can be cancelled
  140:- dynamic(safe_to_cancel/1).
 mqi_version(?Major_Version, ?Minor_Version) is det
Provides the major and minor version number of the protocol used by the MQI. The protocol includes the message format and the messages that can be sent and received from the MQI.

Note that the initial version of the MQI did not have a version predicate so The proper way for callers to check the version is:

use_module(library(mqi)), ( current_predicate(mqi_version/2) -> mqi_version(Major_Version, Minor_Version) ; Major_Version = 0, Minor_Version = 0 )

Major versions are increased when there is a change to the protocol that will likely break clients written to the previous version. Minor versions are increased when there is new functionality that will not break clients written to the old version

This allows a client written to MQI version 'Client_Major_Version.Client_Minor_Version' to check for non-breaking compatibility like this:

Client_Major_Version = MQI_Major_Version and Client_Minor_Version <= MQI_Minor_Version

Breaking changes (i.e. Major version increments) should be very rare as the goal is to have the broadest adoption possible.

Protocol Version History:

  175mqi_version(1, 0).
  176
  177
  178% Password is carefully constructed to be a string (not an atom) so that it is not
  179% globally visible
  180% Add ".\n" to the password since it will be added by the message when received
  181mqi_start(Options) :-
  182    Encoding = utf8,
  183    option(pending_connections(Connection_Count), Options, 5),
  184    option(query_timeout(Query_Timeout), Options, -1),
  185    option(port(Port), Options, _),
  186    option(run_server_on_thread(Run_Server_On_Thread), Options, true),
  187    option(exit_main_on_failure(Exit_Main_On_Failure), Options, false),
  188    option(write_connection_values(Write_Connection_Values), Options, false),
  189    option(unix_domain_socket(Unix_Domain_Socket_Path_And_File), Options, _),
  190    (   (   memberchk(unix_domain_socket(_), Options),
  191            var(Unix_Domain_Socket_Path_And_File)
  192        )
  193    ->  unix_domain_socket_path(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File)
  194    ;   true
  195    ),
  196    option(server_thread(Server_Thread_ID), Options, _),
  197    (   var(Server_Thread_ID)
  198    ->  gensym(mqi, Server_Thread_ID)
  199    ;   true
  200    ),
  201    option(password(Password), Options, _),
  202    (   var(Password)
  203    ->  (   current_prolog_flag(bounded, false)
  204        ->  uuid(UUID, [format(integer)])
  205        ;   UUID is random(1<<62)
  206        ),
  207        format(string(Password), '~d', [UUID])
  208    ;   true
  209    ),
  210    string_concat(Password, '.\n', Final_Password),
  211    bind_socket(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Port, Socket, Client_Address),
  212    send_client_startup_data(Write_Connection_Values, user_output, Unix_Domain_Socket_Path_And_File, Client_Address, Password),
  213    option(write_output_to_file(File), Options, _),
  214    (   var(File)
  215    ->  true
  216    ;   write_output_to_file(File)
  217    ),
  218    Server_Goal = (
  219                    catch(server_thread(Server_Thread_ID, Socket, Client_Address, Final_Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure), error(E1, E2), true),
  220                    debug(mqi(protocol), "Stopped MQI on thread: ~w due to exception: ~w", [Server_Thread_ID, error(E1, E2)])
  221                 ),
  222    start_server_thread(Run_Server_On_Thread, Server_Thread_ID, Server_Goal, Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File).
  223
  224opt_type(port,                      port,                      natural).
  225opt_type(create_unix_domain_socket, create_unix_domain_socket, boolean).
  226opt_type(unix_domain_socket,        unix_domain_socket,        file(write)).
  227opt_type(password,                  password,                  string).
  228opt_type(pending_connections,       pending_connections,       nonneg).
  229opt_type(query_timeout,             query_timeout,             float).
  230opt_type(run_server_on_thread,      run_server_on_thread,      boolean).
  231opt_type(exit_main_on_failure,      exit_main_on_failure,      boolean).
  232opt_type(write_connection_values,   write_connection_values,   boolean).
  233opt_type(write_output_to_file,      write_output_to_file,      file(write)).
  234
  235opt_help(port,                      "TCP/IP port for clients to connect to").
  236opt_help(create_unix_domain_socket, "Create a Unix domain socket for clients to connect to").
  237opt_help(unix_domain_socket,        "File path for the Unix domain socket").
  238opt_help(password,                  "Connection password").
  239opt_help(pending_connections,       "Max number of queued connections (5)").
  240opt_help(query_timeout,             "Max query runtime in seconds (default infinite)").
  241opt_help(run_server_on_thread,      "Run server in a background thread (true)").
  242opt_help(exit_main_on_failure,      "Exit the process on a failure").
  243opt_help(write_connection_values,   "Print info for clients to connect").
  244opt_help(write_output_to_file,      "Write stdout and stderr to file").
 mqi_start is semidet
Main entry point for running the Machine Query Interface in "Embedded Mode" and designed to be called from the command line. Embedded Mode is used when launching the Machine Query Interface as an embedded part of another language (e.g. Python). Calling mqi_start/0 from Prolog interactively is not recommended as it depends on Prolog exiting to stop the MQI, instead use mqi_start/1 for interactive use.

To launch embedded mode:

swipl --quiet -g mqi_start -t halt -- --write_connection_values=true

This will start SWI Prolog and invoke the mqi_start/0 predicate and exit the process when that predicate stops. Any command line arguments after the standalone -- will be passed as Options. These are the same Options that mqi_start/1 accepts and are passed to it directly. Some options are expressed differently due to command line limitations, see mqi_start/1 Options for more information.

Any Option values that cause issues during command line parsing (such as spaces) should be passed with "" like this:

swipl --quiet -g mqi_start -t halt -- --write_connection_values=true \
                                      --password="HGJ SOWLWW WNDSJD"

For help on commandline options run

swipl -g mqi_start -- --help
  284% Turn off int signal when running in embedded mode so the client language
  285% debugger signal doesn't put Prolog into debug mode
  286% run_server_on_thread must be missing or true (the default) so we can exit
  287% properly
  288% create_unix_domain_socket=true/false is only used as a command line argument
  289% since it doesn't seem possible to pass create_unix_domain_socket=_ on the command line
  290% and have it interpreted as a variable.
  291mqi_start :-
  292    current_prolog_flag(argv, Argv),
  293    argv_options(Argv, _Args, Options),
  294    merge_options(Options, [exit_main_on_failure(true)], Options1),
  295    select_option(create_unix_domain_socket(Create_Unix_Domain_Socket), Options1, Options2, false),
  296    (   Create_Unix_Domain_Socket == true
  297    ->  merge_options(Options2, [unix_domain_socket(_)], FinalOptions)
  298    ;   FinalOptions = Options2
  299    ),
  300    option(run_server_on_thread(Run_Server_On_Thread), FinalOptions, true),
  301    (   Run_Server_On_Thread == true
  302    ->  true
  303    ;   throw(domain_error(cannot_be_set_in_embedded_mode, run_server_on_thread))
  304    ),
  305    mqi_start(FinalOptions),
  306    on_signal(int, _, quit),
  307    thread_get_message(quit_mqi).
  308
  309
  310quit(_) :-
  311    thread_send_message(main, quit_mqi).
 mqi_stop(?Server_Thread_ID:atom) is det
If Server_Thread_ID is a variable, stops all Machine Query Interfaces and associated threads. If Server_Thread_ID is an atom, then only the MQI with that Server_Thread_ID is stopped. Server_Thread_ID can be provided or retrieved using Options in mqi_start/1.

Always succeeds.

  320% tcp_close_socket(Socket) will shut down the server thread cleanly so the socket is released and can be used again in the same session
  321% Closes down any pending connections using abort even if there were no matching server threads since the server thread could have died.
  322% At this point only threads associated with live connections (or potentially a goal thread that hasn't detected its missing communication thread)
  323% should be left so seeing abort warning messages in the console seems OK
  324mqi_stop(Server_Thread_ID) :-
  325    % First shut down any matching servers to stop new connections
  326    forall(retract(mqi_thread(Server_Thread_ID, _, Socket)),
  327        (
  328            debug(mqi(protocol), "Found server: ~w", [Server_Thread_ID]),
  329            catch(tcp_close_socket(Socket), Socket_Exception, true),
  330            abortSilentExit(Server_Thread_ID, Server_Thread_Exception),
  331            debug(mqi(protocol), "Stopped server thread: ~w, socket_close_exception(~w), stop_thread_exception(~w)", [Server_Thread_ID, Socket_Exception, Server_Thread_Exception])
  332        )),
  333    forall(retract(mqi_worker_threads(Server_Thread_ID, Communication_Thread_ID, Goal_Thread_ID)),
  334        (
  335            abortSilentExit(Communication_Thread_ID, CommunicationException),
  336            debug(mqi(protocol), "Stopped server: ~w communication thread: ~w, exception(~w)", [Server_Thread_ID, Communication_Thread_ID, CommunicationException]),
  337            abortSilentExit(Goal_Thread_ID, Goal_Exception),
  338            debug(mqi(protocol), "Stopped server: ~w goal thread: ~w, exception(~w)", [Server_Thread_ID, Goal_Thread_ID, Goal_Exception])
  339        )).
  340
  341
  342start_server_thread(Run_Server_On_Thread, Server_Thread_ID, Server_Goal, Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File) :-
  343    (   Run_Server_On_Thread
  344    ->  (   thread_create(Server_Goal, _, [ alias(Server_Thread_ID),
  345                                            at_exit((delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File),
  346                                                     detach_if_expected(Server_Thread_ID)
  347                                                    ))
  348                                          ]),
  349            debug(mqi(protocol), "Started server on thread: ~w", [Server_Thread_ID])
  350        )
  351    ;   (   Server_Goal,
  352            delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File),
  353            debug(mqi(protocol), "Halting.", [])
  354        )
  355    ).
  356
  357
  358% Unix domain sockets create a file that needs to be cleaned up
  359% If mqi generated it, there is also a directory that needs to be cleaned up
  360%   that will only contain that file
  361delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File) :-
  362    (   nonvar(Unix_Domain_Socket_Path)
  363    ->  catch(delete_directory_and_contents(Unix_Domain_Socket_Path), error(_, _), true)
  364    ;   (   nonvar(Unix_Domain_Socket_Path_And_File)
  365        ->  catch(delete_file(Unix_Domain_Socket_Path_And_File), error(_, _), true)
  366        ;   true
  367        )
  368    ).
  369
  370:- if(current_predicate(unix_domain_socket/1)).  371    optional_unix_domain_socket(Socket) :-
  372        unix_domain_socket(Socket).
  373:- else.  374    optional_unix_domain_socket(_).
  375:- endif.  376
  377% Always bind only to localhost for security reasons
  378% Delete the socket file in case it is already around so that the same name can be reused
  379bind_socket(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Port, Socket, Client_Address) :-
  380    (   nonvar(Unix_Domain_Socket_Path_And_File)
  381    ->  debug(mqi(protocol), "Using Unix domain socket name: ~w", [Unix_Domain_Socket_Path_And_File]),
  382        optional_unix_domain_socket(Socket),
  383        catch(delete_file(Unix_Domain_Socket_Path_And_File), error(_, _), true),
  384        tcp_bind(Socket, Unix_Domain_Socket_Path_And_File),
  385        Client_Address = Unix_Domain_Socket_Path_And_File
  386    ;   (   tcp_socket(Socket),
  387            tcp_setopt(Socket, reuseaddr),
  388            tcp_bind(Socket, '127.0.0.1':Port),
  389            debug(mqi(protocol), "Using TCP/IP port: ~w", ['127.0.0.1':Port]),
  390            Client_Address = Port
  391        )
  392    ),
  393    assert(mqi_thread(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Socket)).
  394
  395% Communicates the used port and password to the client via STDOUT so the client
  396% language library can use them to connect
  397send_client_startup_data(Write_Connection_Values, Stream, Unix_Domain_Socket_Path_And_File, Port, Password) :-
  398    (   Write_Connection_Values
  399    ->  (   (  var(Unix_Domain_Socket_Path_And_File)
  400            ->  format(Stream, "~d\n", [Port])
  401            ;   format(Stream, "~w\n", [Unix_Domain_Socket_Path_And_File])
  402            ),
  403            format(Stream, "~w\n", [Password]),
  404            flush_output(Stream)
  405        )
  406    ;   true
  407    ).
  408
  409
  410% Server thread worker predicate
  411% Listen for connections and create a connection for each in its own communication thread
  412% Uses tail recursion to ensure the stack doesn't grow
  413server_thread(Server_Thread_ID, Socket, Address, Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
  414    debug(mqi(protocol), "Listening on address: ~w", [Address]),
  415    tcp_listen(Socket, Connection_Count),
  416    tcp_open_socket(Socket, AcceptFd, _),
  417    create_connection(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure),
  418    server_thread(Server_Thread_ID, Socket, Address, Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure).
  419
  420
  421% Wait for the next connection and create communication and goal threads to support it
  422% Create known IDs for the threads so we can pass them along before the threads are created
  423% First create the goal thread to avoid a race condition where the communication
  424% thread tries to queue a goal before it is created
  425create_connection(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
  426    debug(mqi(protocol), "Waiting for client connection...", []),
  427    tcp_accept(AcceptFd, Socket, _Peer),
  428    debug(mqi(protocol), "Client connected", []),
  429    gensym('conn', Connection_Base),
  430    atomic_list_concat([Server_Thread_ID, "_", Connection_Base, '_comm'], Thread_Alias),
  431    atomic_list_concat([Server_Thread_ID, "_", Connection_Base, '_goal'], Goal_Alias),
  432    mutex_create(Goal_Alias, [alias(Goal_Alias)]),
  433    assert(mqi_worker_threads(Server_Thread_ID, Thread_Alias, Goal_Alias)),
  434    thread_create(goal_thread(Thread_Alias),
  435        _,
  436        [alias(Goal_Alias), at_exit(detach_if_expected(Goal_Alias))]),
  437    thread_create(communication_thread(Password, Socket, Encoding, Server_Thread_ID, Goal_Alias, Query_Timeout, Exit_Main_On_Failure),
  438        _,
  439        [alias(Thread_Alias), at_exit(detach_if_expected(Thread_Alias))]).
  440
  441
  442% The worker predicate for the Goal thread.
  443% Looks for a message from the connection thread, processes it, then recurses.
  444%
  445% Goals always run in the same thread in case the user is setting thread local information.
  446% For each answer to the user's query (including an exception), the goal thread will queue a message
  447% to the communication thread of the form result(Answer, Find_All), where Find_All == true if the user wants all answers at once
  448% Tail recurse to avoid growing the stack
  449goal_thread(Respond_To_Thread_ID) :-
  450    thread_self(Self_ID),
  451    throw_if_testing(Self_ID),
  452    thread_get_message(Self_ID, goal(Unexpanded_Goal, Binding_List, Query_Timeout, Find_All)),
  453    expand_goal(Unexpanded_Goal, Goal),
  454    debug(mqi(query), "Received Findall = ~w, Query_Timeout = ~w, binding list: ~w, unexpanded: ~w, goal: ~w", [Find_All, Query_Timeout, Binding_List, Unexpanded_Goal, Goal]),
  455    (   Find_All
  456    ->  One_Answer_Goal = findall(Binding_List, @(user:Goal, user), Answers)
  457    ;   One_Answer_Goal = ( findall(    One_Answer,
  458                                        (   @(user:Goal, user),
  459                                            One_Answer = [Binding_List],
  460                                            send_next_result(Respond_To_Thread_ID, One_Answer, _, Find_All)
  461                                        ),
  462                                        Answers
  463                            ),
  464                            (   Answers == []
  465                            ->  send_next_result(Respond_To_Thread_ID, [], _, Find_All)
  466                            ;   true
  467                            )
  468                          )
  469    ),
  470    Cancellable_Goal = run_cancellable_goal(Self_ID, One_Answer_Goal),
  471    (   Query_Timeout == -1
  472    ->  catch(Cancellable_Goal, Top_Exception, true)
  473    ;   catch(call_with_time_limit(Query_Timeout, Cancellable_Goal), Top_Exception, true)
  474    ),
  475    (   var(Top_Exception)
  476    ->  (   Find_All
  477        ->  send_next_result(Respond_To_Thread_ID, Answers, _, Find_All)
  478        ;   send_next_result(Respond_To_Thread_ID, [], no_more_results, Find_All)
  479        )
  480    ;   send_next_result(Respond_To_Thread_ID, [], Top_Exception, true)
  481    ),
  482    goal_thread(Respond_To_Thread_ID).
  483
  484
  485% Used only for testing unhandled exceptions outside of the "safe zone"
  486throw_if_testing(Self_ID) :-
  487    (   thread_peek_message(Self_ID, testThrow(Test_Exception))
  488    ->  (   debug(mqi(query), "TESTING: Throwing test exception: ~w", [Test_Exception]),
  489            throw(Test_Exception)
  490        )
  491    ;   true
  492    ).
  493
  494
  495% run_cancellable_goal handles the communication
  496% to ensure the cancel exception from the communication thread
  497% is injected at a place we are prepared to handle in the goal_thread
  498% Before the goal is run, sets a fact to indicate we are in the "safe to cancel"
  499% zone for the communication thread.
  500% Then it doesn't exit this "safe to cancel" zone if the
  501% communication thread is about to cancel
  502run_cancellable_goal(Mutex_ID, Goal) :-
  503    thread_self(Self_ID),
  504    setup_call_cleanup(
  505        assert(safe_to_cancel(Self_ID), Assertion),
  506        Goal,
  507        with_mutex(Mutex_ID, erase(Assertion))
  508    ).
  509
  510
  511% Worker predicate for the communication thread.
  512% Processes messages and sends goals to the goal thread.
  513% Continues processing messages until communication_thread_listen() throws or ends with true/false
  514%
  515% Catches all exceptions from communication_thread_listen so that it can do an orderly shutdown of the goal
  516%   thread if there is a communication failure.
  517%
  518% True means user explicitly called close or there was an exception
  519%   only exit the main thread if there was an exception and we are supposed to Exit_Main_On_Failure
  520%   otherwise just exit the session
  521communication_thread(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout, Exit_Main_On_Failure) :-
  522    thread_self(Self_ID),
  523    (   (
  524            catch(communication_thread_listen(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout), error(Serve_Exception1, Serve_Exception2), true),
  525            debug(mqi(protocol), "Session finished. Communication thread exception: ~w", [error(Serve_Exception1, Serve_Exception2)]),
  526            abortSilentExit(Goal_Thread_ID, _),
  527            retractall(mqi_worker_threads(Server_Thread_ID, Self_ID, Goal_Thread_ID))
  528        )
  529    ->  Halt = (nonvar(Serve_Exception1), Exit_Main_On_Failure)
  530    ;   Halt = true
  531    ),
  532    (   Halt
  533    ->  (   debug(mqi(protocol), "Ending session and halting Prolog server due to thread ~w: exception(~w)", [Self_ID, error(Serve_Exception1, Serve_Exception2)]),
  534            quit(_)
  535        )
  536    ;   (   debug(mqi(protocol), "Ending session ~w", [Self_ID]),
  537            catch(tcp_close_socket(Socket), error(_, _), true)
  538        )
  539    ).
  540
  541
  542% Open socket and begin processing the streams for a connection using the Encoding if the password matches
  543% true: session ended
  544% exception: communication failure or an internal failure (like a thread threw or shutdown unexpectedly)
  545% false: halt
  546communication_thread_listen(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout) :-
  547    tcp_open_socket(Socket, Read_Stream, Write_Stream),
  548    thread_self(Communication_Thread_ID),
  549    assert(mqi_socket(Server_Thread_ID, Communication_Thread_ID, Socket, Read_Stream, Write_Stream)),
  550    set_stream(Read_Stream, encoding(Encoding)),
  551    set_stream(Write_Stream, encoding(Encoding)),
  552    read_message(Read_Stream, Sent_Password),
  553    (   Password == Sent_Password
  554    ->  (   debug(mqi(protocol), "Password matched.", []),
  555            thread_self(Self_ID),
  556            mqi_version(Major, Minor),
  557            reply(Write_Stream, true([[threads(Self_ID, Goal_Thread_ID), version(Major, Minor)]]))
  558        )
  559    ;   (   debug(mqi(protocol), "Password mismatch, failing. ~w", [Sent_Password]),
  560            reply_error(Write_Stream, password_mismatch),
  561            throw(password_mismatch)
  562        )
  563    ),
  564    process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout),
  565    debug(mqi(protocol), "Session finished.", []).
  566
  567
  568% process_mqi_messages implements the main interface to the Machine Query Interface.
  569% Continuously reads a Machine Query Interface message from Read_Stream and writes a response to Write_Stream,
  570% until the connection fails or a `quit` or `close` message is sent.
  571%
  572% Read_Stream and Write_Stream can be any valid stream using any encoding.
  573%
  574% Goal_Thread_ID must be the threadID of a thread started on the goal_thread predicate
  575%
  576% uses tail recursion to ensure the stack doesn't grow
  577%
  578% true: indicates we should terminate the session (clean termination)
  579% false: indicates we should exit the process if running in embedded mode
  580% exception: indicates we should terminate the session (communication failure termination) or
  581%    thread was asked to halt
  582process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout) :-
  583    process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command),
  584    (   Command == close
  585    ->  (   debug(mqi(protocol), "Command: close. Client closed the connection cleanly.", []),
  586            true
  587        )
  588    ;   (   Command == quit
  589        ->  (   debug(mqi(protocol), "Command: quit.", []),
  590                false
  591            )
  592        ;
  593            process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout)
  594        )
  595    ).
  596
  597% process_mqi_message manages the protocol for the connection: receive message, parse it, process it.
  598% - Reads a single message from Read_Stream.
  599% - Processes it and issues a response on Write_Stream.
  600% - The message will be unified with Command to allow the caller to handle it.
  601%
  602% Read_Stream and Write_Stream can be any valid stream using any encoding.
  603%
  604% True if the message understood. A response will always be sent.
  605% False if the message was malformed.
  606% Exceptions will be thrown by the underlying stream if there are communication failures writing to Write_Stream or the thread was asked to exit.
  607%
  608% state_* predicates manage the state transitions of the protocol
  609% They only bubble up exceptions if there is a communication failure
  610%
  611% state_process_command will never return false
  612% since errors should be sent to the client
  613% It can throw if there are communication failures, though.
  614process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command) :-
  615    debug(mqi(protocol), "Waiting for next message ...", []),
  616    (   state_receive_raw_message(Read_Stream, Message_String)
  617    ->  (   state_parse_command(Write_Stream, Message_String, Command, Binding_List)
  618        ->  state_process_command(Write_Stream, Goal_Thread_ID, Query_Timeout, Command, Binding_List)
  619        ;   true
  620        )
  621    ;   false
  622    ).
  623
  624
  625% state_receive_raw_message: receive a raw message, which is simply a string
  626%   true: valid message received
  627%   false: invalid message format
  628%   exception: communication failure OR thread asked to exit
  629state_receive_raw_message(Read, Command_String) :-
  630    read_message(Read, Command_String),
  631    debug(mqi(protocol), "Valid message: ~w", [Command_String]).
  632
  633
  634% state_parse_command: attempt to parse the message string into a valid command
  635%
  636% Use read_term_from_atom instead of read_term(stream) so that we don't hang
  637% indefinitely if the caller didn't properly finish the term
  638% parse in the context of module 'user' to properly bind operators, do term expansion, etc
  639%
  640%   true: command could be parsed
  641%   false: command cannot be parsed.  An error is sent to the client in this case
  642%   exception: communication failure on sending a reply
  643state_parse_command(Write_Stream, Command_String, Parsed_Command, Binding_List) :-
  644    (   catch(read_term_from_atom(Command_String, Parsed_Command, [variable_names(Binding_List), module(user)]), Parse_Exception, true)
  645    ->  (   var(Parse_Exception)
  646        ->  debug(mqi(protocol), "Parse Success: ~w", [Parsed_Command])
  647        ;   (   reply_error(Write_Stream, Parse_Exception),
  648                fail
  649            )
  650        )
  651    ;   (   reply_error(Write_Stream, error(couldNotParseCommand, _)),
  652            fail
  653        )
  654    ).
  655
  656
  657% state_process_command(): execute the requested Command
  658%
  659% First wait until we have removed all results from any previous query.
  660% If query_in_progress(Goal_Thread_ID) exists then there is at least one
  661% more result to drain, by definition. Because the predicate is
  662% deleted by get_next_result in the communication thread when the last result is drained
  663%
  664%   true: if the command itself succeeded, failed or threw an exception.
  665%         In that case, the outcome is sent to the client
  666%   exception: only communication or thread failures are allowed to bubble up
  667% See mqi(Options) documentation
  668state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run(Goal, Timeout), Binding_List) :-
  669    !,
  670    debug(mqi(protocol), "Command: run/1. Timeout: ~w", [Timeout]),
  671    repeat_until_false((
  672            query_in_progress(Goal_Thread_ID),
  673            debug(mqi(protocol), "Draining unretrieved result for ~w", [Goal_Thread_ID]),
  674            heartbeat_until_result(Goal_Thread_ID, Stream, Unused_Answer),
  675            debug(mqi(protocol), "Drained result for ~w", [Goal_Thread_ID]),
  676            debug(mqi(query), "    Discarded answer: ~w", [Unused_Answer])
  677        )),
  678    debug(mqi(protocol), "All previous results drained", []),
  679    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, true),
  680    heartbeat_until_result(Goal_Thread_ID, Stream, Answers),
  681    reply_with_result(Goal_Thread_ID, Stream, Answers).
  682
  683
  684% See mqi(Options) documentation for documentation
  685% See notes in run(Goal, Timeout) re: draining previous query
  686state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run_async(Goal, Timeout, Find_All), Binding_List) :-
  687    !,
  688    debug(mqi(protocol), "Command: run_async/1.", []),
  689    debug(mqi(query),  "   Goal: ~w", [Goal]),
  690    repeat_until_false((
  691            query_in_progress(Goal_Thread_ID),
  692            debug(mqi(protocol), "Draining unretrieved result for ~w", [Goal_Thread_ID]),
  693            heartbeat_until_result(Goal_Thread_ID, Stream, Unused_Answer),
  694            debug(mqi(protocol), "Drained result for ~w", [Goal_Thread_ID]),
  695            debug(mqi(query), "    Discarded answer: ~w", [Unused_Answer])
  696            )),
  697    debug(mqi(protocol), "All previous results drained", []),
  698    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, Find_All),
  699    reply(Stream, true([[]])).
  700
  701
  702% See mqi(Options) documentation for documentation
  703state_process_command(Stream, Goal_Thread_ID, _, async_result(Timeout), _) :-
  704    !,
  705    debug(mqi(protocol), "Command: async_result, timeout: ~w.", [Timeout]),
  706    (   once((var(Timeout) ; Timeout == -1))
  707    ->  Options = []
  708    ;   Options = [timeout(Timeout)]
  709    ),
  710    (   query_in_progress(Goal_Thread_ID)
  711    ->  (   (   debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
  712                get_next_result(Goal_Thread_ID, Stream, Options, Result)
  713            )
  714        ->  reply_with_result(Goal_Thread_ID, Stream, Result)
  715        ;   reply_error(Stream, result_not_available)
  716        )
  717   ;    (   debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
  718            reply_error(Stream, no_query)
  719        )
  720   ).
  721
  722
  723% See mqi(Options) documentation for documentation
  724% To ensure the goal thread is in a place it is safe to cancel,
  725% we lock a mutex first that the goal thread checks before exiting
  726% the "safe to cancel" zone.
  727% It is not in the safe zone: it either finished
  728% or was never running.
  729state_process_command(Stream, Goal_Thread_ID, _, cancel_async, _) :-
  730    !,
  731    debug(mqi(protocol), "Command: cancel_async/0.", []),
  732    with_mutex(Goal_Thread_ID, (
  733        (   safe_to_cancel(Goal_Thread_ID)
  734        ->  (   thread_signal(Goal_Thread_ID, throw(cancel_goal)),
  735                reply(Stream, true([[]]))
  736            )
  737        ;   (   query_in_progress(Goal_Thread_ID)
  738            ->  (   debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
  739                    reply(Stream, true([[]]))
  740                )
  741            ;   (   debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
  742                    reply_error(Stream, no_query)
  743                )
  744            )
  745        )
  746    )).
  747
  748
  749% Used for testing how the system behaves when the goal thread is killed unexpectedly
  750% Needs to run a bogus command `run(true, -1)` to
  751% get the goal thread to process the exception
  752state_process_command(Stream, Goal_Thread_ID, Query_Timeout, testThrowGoalThread(Test_Exception), Binding_List) :-
  753    !,
  754    debug(mqi(protocol), "TESTING: requested goal thread unhandled exception", []),
  755    thread_send_message(Goal_Thread_ID, testThrow(Test_Exception)),
  756    state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run(true, -1), Binding_List).
  757
  758
  759state_process_command(Stream, _, _, close, _) :-
  760    !,
  761    reply(Stream, true([[]])).
  762
  763
  764state_process_command(Stream, _, _, quit, _) :-
  765    !,
  766    reply(Stream, true([[]])).
  767
  768
  769%  Send an exception if the command is not known
  770state_process_command(Stream, _, _, Command, _) :-
  771    debug(mqi(protocol), "Unknown command ~w", [Command]),
  772    reply_error(Stream, unknownCommand).
  773
  774
  775% Wait for a result (and put in Answers) from the goal thread, but send a heartbeat message
  776% every so often until it arrives to detect if the socket is broken.
  777% Throws if If the heartbeat failed which will
  778% and then shutdown the communication thread
  779% Tail recurse to not grow the stack
  780heartbeat_until_result(Goal_Thread_ID, Stream, Answers) :-
  781    (   get_next_result(Goal_Thread_ID, Stream, [timeout(2)], Answers)
  782    ->  debug(mqi(query), "Received answer from goal thread: ~w", [Answers])
  783    ;   (   debug(mqi(protocol), "heartbeat...", []),
  784            write_heartbeat(Stream),
  785            heartbeat_until_result(Goal_Thread_ID, Stream, Answers)
  786        )
  787    ).
  788
  789
  790% True if write succeeded, otherwise throws as that
  791% indicates that heartbeat failed because the other
  792% end of the pipe terminated
  793write_heartbeat(Stream) :-
  794    put_char(Stream, '.'),
  795    flush_output(Stream).
  796
  797
  798% Send a goal to the goal thread in its queue
  799%
  800% Remember that we are now running a query using assert.
  801%   This will be retracted once all the answers have been drained.
  802%
  803% If Goal_Thread_ID died, thread_send_message throws and, if we don't respond,
  804%   the client could hang so catch and give them a good message before propagating
  805%   the exception
  806send_goal_to_thread(Stream, Goal_Thread_ID, Default_Timeout, Timeout, Goal, Binding_List, Find_All) :-
  807    (   var(Timeout)
  808    ->  Timeout = Default_Timeout
  809    ;   true
  810    ),
  811    (   var(Binding_List)
  812    ->  Binding_List = []
  813    ;   true
  814    ),
  815    debug(mqi(query),  "Sending to goal thread with timeout = ~w: ~w", [Timeout, Goal]),
  816    assert(query_in_progress(Goal_Thread_ID)),
  817    catch(thread_send_message(Goal_Thread_ID, goal(Goal, Binding_List, Timeout, Find_All)), Send_Message_Exception, true),
  818    (   var(Send_Message_Exception)
  819    ->  true
  820    ;   (   reply_error(Stream, connection_failed),
  821            throw(Send_Message_Exception)
  822        )
  823    ).
  824
  825
  826% Send a result from the goal thread to the communication thread in its queue
  827send_next_result(Respond_To_Thread_ID, Answer, Exception_In_Goal, Find_All) :-
  828    (   var(Exception_In_Goal)
  829    ->  (   (   debug(mqi(query), "Sending result of goal to communication thread, Result: ~w", [Answer]),
  830                Answer == []
  831            )
  832        ->  thread_send_message(Respond_To_Thread_ID, result(false, Find_All))
  833        ;   handle_constraints(Answer, Final_Answer),
  834            thread_send_message(Respond_To_Thread_ID, result(true(Final_Answer), Find_All))
  835        )
  836    ;   (   debug(mqi(query), "Sending result of goal to communication thread, Exception: ~w", [Exception_In_Goal]),
  837            thread_send_message(Respond_To_Thread_ID, result(error(Exception_In_Goal), Find_All))
  838        )
  839    ).
  840
  841
  842handle_constraints(Answer, Final_Answer) :-
  843    (   term_attvars(Answer, [])
  844    ->  Final_Answer = Answer
  845    ;   findall(    Single_Answer_With_Attributes,
  846                    (   member(Single_Answer, Answer),
  847                        copy_term(Single_Answer, Single_Answer_Copy, Attributes),
  848                        append(['$residuals' = Attributes], Single_Answer_Copy, Single_Answer_With_Attributes)
  849                    ),
  850                    Final_Answer
  851        ),
  852        debug(mqi(query), "Constraints detected, converted: ~w to ~w", [Answer, Final_Answer])
  853    ).
  854
  855
  856% Gets the next result from the goal thread in the communication thread queue,
  857% and retracts query_in_progress/1 when the last result has been sent.
  858% Find_All == true only returns one message, so delete query_in_progress
  859% no matter what it is
  860% \+ Find_All: There may be more than one result. The first one we hit with any exception
  861% (note that no_more_results is also returned as an exception) means we are done
  862get_next_result(Goal_Thread_ID, Stream, Options, Answers) :-
  863    (   thread_property(Goal_Thread_ID, status(running))
  864    ->  true
  865    ;   (   reply_error(Stream, connection_failed),
  866            throw(connection_failed)
  867        )
  868    ),
  869    thread_self(Self_ID),
  870    thread_get_message(Self_ID, result(Answers, Find_All), Options),
  871    (   Find_All
  872    ->  (   debug(mqi(protocol), "Query completed and answers drained for findall ~w", [Goal_Thread_ID]),
  873            retractall(query_in_progress(Goal_Thread_ID))
  874        )
  875    ;   (   Answers = error(_)
  876        ->  (   debug(mqi(protocol), "Query completed and answers drained for non-findall ~w", [Goal_Thread_ID]),
  877                retractall(query_in_progress(Goal_Thread_ID))
  878            )
  879        ;   true
  880        )
  881    ).
  882
  883
  884% reply_with_result predicates are used to consistently return
  885% answers for a query from either run() or run_async()
  886reply_with_result(_, Stream, error(Error)) :-
  887    !,
  888    reply_error(Stream, Error).
  889
  890% Gracefully handle exceptions that can occur during conversion to JSON
  891reply_with_result(_, Stream, Result) :-
  892    !,
  893    catch(reply(Stream, Result), error(Exception, _), reply_with_result(_, Stream, error(Exception))).
  894
  895
  896% Reply with a normal term
  897% Convert term to an actual JSON string
  898reply(Stream, Term) :-
  899    debug(mqi(query), "Responding with Term: ~w", [Term]),
  900    term_to_json_string(Term, Json_String),
  901    write_message(Stream, Json_String).
  902
  903
  904% Special handling for exceptions since they can have parts that are not
  905% "serializable". Ensures they they are always returned in an exception/1 term
  906reply_error(Stream, Error_Term) :-
  907    debug(mqi(query), "Responding with exception: ~w", [Error_Term]),
  908    (   error(Error_Value, _) = Error_Term
  909    ->  Response = exception(Error_Value)
  910    ;   (   atom(Error_Term)
  911        ->
  912            Response = exception(Error_Term)
  913        ;   (   compound_name_arity(Error_Term, Name, _),
  914                Response = exception(Name)
  915            )
  916        )
  917    ),
  918    reply(Stream, Response).
  919
  920
  921% Send and receive messages are simply strings preceded by their length + ".\n"
  922% i.e. "<stringlength>.\n<string>"
  923% The desired encoding must be set on the Stream before calling this predicate
  924
  925
  926% Writes the next message.
  927% Throws if there is an unexpected exception
  928write_message(Stream, String) :-
  929    write_string_length(Stream, String),
  930    write(Stream, String),
  931    flush_output(Stream).
  932
  933
  934% Reads the next message.
  935% Throws if there is an unexpected exception or thread has been requested to quit
  936% the length passed must match the actual number of bytes in the stream
  937% in whatever encoding is being used
  938read_message(Stream, String) :-
  939    read_string_length(Stream, Length),
  940    stream_property(Stream, encoding(Encoding)),
  941    setup_call_cleanup(
  942         stream_range_open(Stream, Tmp, [size(Length)]),
  943         ( set_stream(Tmp, encoding(Encoding)),
  944           read_string(Tmp, _, String)
  945         ),
  946         close(Tmp)).
  947
  948
  949% Terminate with '.\n' so we know that's the end of the count
  950write_string_length(Stream, String) :-
  951    stream_property(Stream, encoding(Encoding)),
  952    string_encoding_length(String, Encoding, Length),
  953    format(Stream, "~d.\n", [Length]).
  954
  955
  956% Note: read_term requires ".\n" after the length
  957% ... but does not consume the "\n"
  958read_string_length(Stream, Length) :-
  959    read_term(Stream, Length, []),
  960    get_char(Stream, _).
  961
  962
  963% converts a string to Codes using Encoding
  964string_encoding_length(String, Encoding, Length) :-
  965    setup_call_cleanup(
  966        open_null_stream(Out),
  967        (   set_stream(Out, encoding(Encoding)),
  968            write(Out, String),
  969            byte_count(Out, Length)
  970        ),
  971        close(Out)).
  972
  973
  974% Convert Prolog Term to a Prolog JSON term
  975% Add a final \n so that using netcat to debug works well
  976term_to_json_string(Term, Json_String) :-
  977    term_to_json(Term, Json),
  978    with_output_to(string(Json_String),
  979        (   current_output(Stream),
  980            json_write(Stream, Json),
  981            put(Stream, '\n')
  982        )).
  983
  984
  985% Execute the goal as once() without binding any variables
  986% and keep executing it until it returns false (or throws)
  987repeat_until_false(Goal) :-
  988    (\+ (\+ Goal)), !, repeat_until_false(Goal).
  989repeat_until_false(_).
  990
  991
  992% Used to kill a thread in an "expected" way so it doesn't leave around traces in thread_property/2 afterwards
  993%
  994% If the thread is alive OR it was already aborted (expected cases) then attempt to join
  995%   the thread so that no warnings are sent to the console. Other cases leave the thread for debugging.
  996% There are some fringe cases (like calling external code)
  997%   where the call might not return for a long time.  Do a timeout for those cases.
  998abortSilentExit(Thread_ID, Exception) :-
  999    catch(thread_signal(Thread_ID, abort), error(Exception, _), true),
 1000    debug(mqi(protocol), "Attempting to abort thread: ~w. thread_signal_exception: ~w", [Thread_ID, Exception]).
 1001% Workaround SWI Prolog bug: https://github.com/SWI-Prolog/swipl-devel/issues/852 by not joining
 1002% The workaround just stops joining the aborted thread, so an inert record will be left if thread_property/2 is called.
 1003%    ,
 1004%    (   once((var(Exception) ; catch(thread_property(Thread_ID, status(exception('$aborted'))), error(_, _), true)))
 1005%    ->  (   catch(call_with_time_limit(4, thread_join(Thread_ID)), error(JoinException1, JoinException2), true),
 1006%            debug(mqi(protocol), "thread_join attempted because thread: ~w exit was expected, exception: ~w", [Thread_ID, error(JoinException1, JoinException2)])
 1007%        )
 1008%    ;   true
 1009%    ).
 1010
 1011
 1012% Detach a thread that exits with true or false so that it doesn't leave around a record in thread_property/2 afterwards
 1013% Don't detach a thread if it exits because of an exception so we can debug using thread_property/2 afterwards
 1014%
 1015% However, `abort` is an expected exception but detaching a thread that aborts will leave an unwanted
 1016% thread_property/2 record *and* print a message to the console. To work around this,
 1017% the goal thread is always aborted by the communication thread using abortSilentExit.
 1018detach_if_expected(Thread_ID) :-
 1019    thread_property(Thread_ID, status(Status)),
 1020    debug(mqi(protocol), "Thread ~w exited with status ~w", [Thread_ID, Status]),
 1021    (   once((Status = true ; Status = false))
 1022    ->  (   debug(mqi(protocol), "Expected thread status, detaching thread ~w", [Thread_ID]),
 1023            thread_detach(Thread_ID)
 1024        )
 1025    ;   true
 1026    ).
 1027
 1028
 1029write_output_to_file(File) :-
 1030    debug(mqi(protocol), "Writing all STDOUT and STDERR to file:~w", [File]),
 1031    open(File, write, Stream, [buffer(false)]),
 1032    set_prolog_IO(user_input, Stream, Stream).
 1033
 1034
 1035% Creates a Unix Domain Socket file in a secured directory.
 1036% Throws if the directory or file cannot be created in /tmp for any reason
 1037% Requirements for this file are:
 1038%    - The Prolog process will attempt to create and, if Prolog exits cleanly,
 1039%           delete this file when the server closes.  This means the directory
 1040%           must have the appropriate permissions to allow the Prolog process
 1041%           to do so.
 1042%    - For security reasons, the filename should not be predictable and the
 1043%           directory it is contained in should have permissions set so that files
 1044%           created are only accessible to the current user.
 1045%    - The path must be below 92 *bytes* long (including null terminator) to
 1046%           be portable according to the Linux documentation
 1047%
 1048% tmp_file finds the right /tmp directory, even on Mac OS, so the path is small
 1049% Set 700 (rwx------)  permission so it is only accessible by current user
 1050% Create a secure tmp file in the new directory
 1051% {set,current}_prolog_flag is copied to a thread, so no need to use a mutex.
 1052% Close the stream so sockets can use it
 1053unix_domain_socket_path(Created_Directory, File_Path) :-
 1054    tmp_file(udsock, Created_Directory),
 1055    make_directory(Created_Directory),
 1056    catch(  chmod(Created_Directory, urwx),
 1057            Exception,
 1058            (   catch(delete_directory(Created_Directory), error(_, _), true),
 1059                throw(Exception)
 1060            )
 1061    ),
 1062    setup_call_cleanup( (   current_prolog_flag(tmp_dir, Save_Tmp_Dir),
 1063                            set_prolog_flag(tmp_dir, Created_Directory)
 1064                        ),
 1065                        tmp_file_stream(File_Path, Stream, []),
 1066                        set_prolog_flag(tmp_dir, Save_Tmp_Dir)
 1067                      ),
 1068    close(Stream).
 1069
 1070
 1071% Helper for installing the mqi.pl file to the right
 1072% library directory.
 1073% Call using swipl -s mqi.pl -g "mqi:install_to_library('mqi.pl')" -t halt
 1074install_to_library(File) :-
 1075    once(find_library(Path)),
 1076    copy_file(File, Path),
 1077    make.
 1078
 1079
 1080% Find the base library path, i.e. the one that ends in
 1081% "library/"
 1082find_library(Path) :-
 1083    file_alias_path(library, Path),
 1084    atomic_list_concat(Parts, '/', Path),
 1085    reverse(Parts, Parts_Reverse),
 1086    nth0(0, Parts_Reverse, ''),
 1087    nth0(1, Parts_Reverse, Library),
 1088    string_lower(Library, 'library')