View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2002-2022, University of Amsterdam
    7                              VU University Amsterdam
    8                              CWI, Amsterdam
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(thread_httpd,
   38          [ http_current_server/2,      % ?:Goal, ?Port
   39            http_server_property/2,     % ?Port, ?Property
   40            http_server/2,              % :Goal, +Options
   41            http_workers/2,             % +Port, ?WorkerCount
   42            http_add_worker/2,          % +Port, +Options
   43            http_current_worker/2,      % ?Port, ?ThreadID
   44            http_stop_server/2,         % +Port, +Options
   45            http_spawn/2,               % :Goal, +Options
   46
   47            http_requeue/1,             % +Request
   48            http_close_connection/1,    % +Request
   49            http_enough_workers/3       % +Queue, +Why, +Peer
   50          ]).   51:- use_module(library(debug)).   52:- use_module(library(error)).   53:- use_module(library(option)).   54:- use_module(library(socket)).   55:- use_module(library(thread_pool)).   56:- use_module(library(gensym)).   57:- use_module(http_wrapper).   58:- use_module(http_path).   59
   60:- autoload(library(uri), [uri_resolve/3]).   61
   62:- predicate_options(http_server/2, 2,
   63                     [ port(any),
   64                       unix_socket(atom),
   65                       entry_page(atom),
   66                       tcp_socket(any),
   67                       workers(positive_integer),
   68                       timeout(number),
   69                       keep_alive_timeout(number),
   70                       silent(boolean),
   71                       ssl(list(any)),  % if http/http_ssl_plugin is loaded
   72                       pass_to(system:thread_create/3, 3)
   73                     ]).   74:- predicate_options(http_spawn/2, 2,
   75                     [ pool(atom),
   76                       pass_to(system:thread_create/3, 3),
   77                       pass_to(thread_pool:thread_create_in_pool/4, 4)
   78                     ]).   79:- predicate_options(http_add_worker/2, 2,
   80                     [ timeout(number),
   81                       keep_alive_timeout(number),
   82                       max_idle_time(number),
   83                       pass_to(system:thread_create/3, 3)
   84                     ]).   85
   86/** <module> Threaded HTTP server
   87
   88Most   code   doesn't   need  to   use  this   directly;  instead   use
   89library(http/http_server),  which  combines   this  library  with   the
   90typical HTTP libraries that most servers need.
   91
   92This library defines the HTTP server  frontend of choice for SWI-Prolog.
   93It is based on the multi-threading   capabilities of SWI-Prolog and thus
   94exploits multiple cores  to  serve   requests  concurrently.  The server
   95scales well and can cooperate with   library(thread_pool) to control the
   96number of concurrent requests of a given   type.  For example, it can be
   97configured to handle 200 file download requests concurrently, 2 requests
    98that potentially use a lot of memory and   8 requests that use a lot of
   99CPU resources.
  100
  101On   Unix   systems,    this    library     can    be    combined   with
  102library(http/http_unix_daemon) to realise a proper  Unix service process
  103that creates a web server at  port   80,  runs under a specific account,
  104optionally detaches from the controlling terminal, etc.
  105
  106Combined with library(http/http_ssl_plugin) from the   SSL package, this
  107library   can   be   used   to    create     an    HTTPS   server.   See
  108<plbase>/doc/packages/examples/ssl/https for an example   server using a
  109self-signed SSL certificate.
  110*/
  111
  112:- meta_predicate
  113    http_server(1, :),
  114    http_current_server(1, ?),
  115    http_spawn(0, +).  116
  117:- dynamic
  118    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
  119    queue_worker/2,         % Queue, ThreadID
  120    queue_options/2.        % Queue, Options
  121
  122:- multifile
  123    make_socket_hook/3,
  124    accept_hook/2,
  125    close_hook/1,
  126    open_client_hook/6,
  127    discard_client_hook/1,
  128    http:create_pool/1,
  129    http:schedule_workers/1.  130
  131:- meta_predicate
  132    thread_repeat_wait(0).  133
  134%!  http_server(:Goal, :Options) is det.
  135%
  136%   Create a server at Port that calls Goal for each parsed request.
  137%   Options provide a list of options. Defined options are
  138%
  139%     * port(?Address)
  140%     Port to bind to.  Address is either a port or a term
  141%     Host:Port. The port may be a variable, causing the system
  142%     to select a free port.  See tcp_bind/2.
  143%
  144%     * unix_socket(+Path)
  145%     Instead of binding to a TCP port, bind to a _Unix Domain
  146%     Socket_ at Path.
  147%
  148%     * entry_page(+URI)
  149%     Affects the message printed while the server is started.
  150%     Interpreted as a URI relative to the server root.
  151%
  152%     * tcp_socket(+Socket)
   153%     If provided, use this socket instead of creating one and
  154%     binding it to an address.  The socket must be bound to an
  155%     address.
  156%
  157%     * workers(+Count)
  158%     Determine the number of worker threads.  Default is 5.  This
  159%     is fine for small scale usage.  Public servers typically need
  160%     a higher number.
  161%
  162%     * timeout(+Seconds)
  163%     Maximum time of inactivity trying to read the request after a
  164%     connection has been opened.  Default is 60 seconds.  See
   165%     set_stream/2 using the _timeout_ option.
  166%
  167%     * keep_alive_timeout(+Seconds)
  168%     Time to keep `Keep alive' connections alive.  Default is
  169%     2 seconds.
  170%
  171%     * stack_limit(+Bytes)
  172%     Stack limit to use for the workers.  The default is inherited
  173%     from the `main` thread.
  174%     If you need to control resource usage you may consider the
  175%     `spawn` option of http_handler/3 and library(thread_pool).
  176%
  177%     * silent(Bool)
  178%     If `true` (default `false`), do not print an informational
  179%     message that the server was started.
  180%
  181%   A  typical  initialization  for  an    HTTP   server  that  uses
  182%   http_dispatch/1 to relay requests to predicates is:
  183%
  184%     ==
  185%     :- use_module(library(http/thread_httpd)).
  186%     :- use_module(library(http/http_dispatch)).
  187%
  188%     start_server(Port) :-
  189%         http_server(http_dispatch, [port(Port)]).
  190%     ==
  191%
  192%   Note that multiple servers  can  coexist   in  the  same  Prolog
  193%   process. A notable application of this is   to have both an HTTP
  194%   and HTTPS server, where the HTTP   server redirects to the HTTPS
  195%   server for handling sensitive requests.
  196
  197http_server(Goal, M:Options0) :-
  198    server_address(Address, Options0),
  199    !,
  200    make_socket(Address, M:Options0, Options),
  201    create_workers(Options),
  202    create_server(Goal, Address, Options),
  203    (   option(silent(true), Options0)
  204    ->  true
  205    ;   print_message(informational,
  206                      httpd_started_server(Address, Options0))
  207    ).
  208http_server(_Goal, _:Options0) :-
  209    existence_error(server_address, Options0).
  210
  211server_address(Address, Options) :-
  212    (   option(port(Port), Options)
  213    ->  Address = Port
  214    ;   option(unix_socket(Path), Options)
  215    ->  Address = unix_socket(Path)
  216    ).
  217
   %!  address_port(+Address, -Port) is det.
   %
   %   Port is the part of Address under which the server is
   %   registered: the port of an `Interface:Port` pair, the path of a
   %   unix_socket(Path) term, or Address itself in all other cases.
   218address_port(_IFace:Port, Port) :- !.
   219address_port(unix_socket(Path), Path) :- !.
   220address_port(Address, Address) :- !.
  221
  222tcp_address(Port) :-
  223    var(Port),
  224    !.
  225tcp_address(Port) :-
  226    integer(Port),
  227    !.
  228tcp_address(_Iface:_Port).
  229
  230%!  make_socket(+Address, :OptionsIn, -OptionsOut) is det.
  231%
  232%   Create the HTTP server socket and  worker pool queue. OptionsOut
  233%   is quaranteed to hold the option queue(QueueId).
  234%
  235%   @arg   OptionsIn   is   qualified   to     allow   passing   the
  236%   module-sensitive ssl option argument.
  237
  238make_socket(Address, M:Options0, Options) :-
  239    tcp_address(Address),
  240    make_socket_hook(Address, M:Options0, Options),
  241    !.
  242make_socket(Address, _:Options0, Options) :-
  243    option(tcp_socket(_), Options0),
  244    !,
  245    make_addr_atom('httpd', Address, Queue),
  246    Options = [ queue(Queue)
  247              | Options0
  248              ].
  249make_socket(Address, _:Options0, Options) :-
  250    tcp_address(Address),
  251    !,
  252    tcp_socket(Socket),
  253    tcp_setopt(Socket, reuseaddr),
  254    tcp_bind(Socket, Address),
  255    tcp_listen(Socket, 64),
  256    make_addr_atom('httpd', Address, Queue),
  257    Options = [ queue(Queue),
  258                tcp_socket(Socket)
  259              | Options0
  260              ].
  261:- if(current_predicate(unix_domain_socket/1)).  262make_socket(Address, _:Options0, Options) :-
  263    Address = unix_socket(Path),
  264    !,
  265    unix_domain_socket(Socket),
  266    tcp_bind(Socket, Path),
  267    tcp_listen(Socket, 64),
  268    make_addr_atom('httpd', Address, Queue),
  269    Options = [ queue(Queue),
  270                tcp_socket(Socket)
  271              | Options0
  272              ].
  273:- endif.  274
  275%!  make_addr_atom(+Scheme, +Address, -Atom) is det.
  276%
  277%   Create an atom that identifies  the   server's  queue and thread
  278%   resources.
  279
  280make_addr_atom(Scheme, Address, Atom) :-
  281    phrase(address_parts(Address), Parts),
  282    atomic_list_concat([Scheme,@|Parts], Atom).
  283
  284address_parts(Var) -->
  285    { var(Var),
  286      !,
  287      instantiation_error(Var)
  288    }.
  289address_parts(Atomic) -->
  290    { atomic(Atomic) },
  291    !,
  292    [Atomic].
  293address_parts(Host:Port) -->
  294    !,
  295    address_parts(Host), [:], address_parts(Port).
  296address_parts(ip(A,B,C,D)) -->
  297    !,
  298    [ A, '.', B, '.', C, '.', D ].
  299address_parts(unix_socket(Path)) -->
  300    [Path].
  301address_parts(Address) -->
  302    { domain_error(http_server_address, Address) }.
  303
  304
  305%!  create_server(:Goal, +Address, +Options) is det.
  306%
  307%   Create the main server thread that runs accept_server/2 to
  308%   listen to new requests.
  309
  310create_server(Goal, Address, Options) :-
  311    get_time(StartTime),
  312    memberchk(queue(Queue), Options),
  313    scheme(Scheme, Options),
  314    autoload_https(Scheme),
  315    address_port(Address, Port),
  316    make_addr_atom(Scheme, Port, Alias),
  317    thread_self(Initiator),
  318    thread_create(accept_server(Goal, Initiator, Options), _,
  319                  [ alias(Alias)
  320                  ]),
  321    thread_get_message(server_started),
  322    assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)).
  323
  324scheme(Scheme, Options) :-
  325    option(scheme(Scheme), Options),
  326    !.
  327scheme(Scheme, Options) :-
  328    (   option(ssl(_), Options)
  329    ;   option(ssl_instance(_), Options)
  330    ),
  331    !,
  332    Scheme = https.
  333scheme(http, _).
  334
  335autoload_https(https) :-
  336    \+ clause(accept_hook(_Goal, _Options), _),
  337    exists_source(library(http/http_ssl_plugin)),
  338    !,
  339    use_module(library(http/http_ssl_plugin)).
  340autoload_https(_).
  341
   342%!  http_current_server(:Goal, ?Port) is nondet.
   343%
   344%   True if Goal is the goal of a server at Port, as registered in
   %   current_server/6.  Enumerates the registered servers on
   %   backtracking.
   345%
   346%   @deprecated Use http_server_property(Port, goal(Goal))
   347
   348http_current_server(Goal, Port) :-
   349    current_server(Port, Goal, _, _, _, _).
  350
  351
  352%!  http_server_property(?Port, ?Property) is nondet.
  353%
  354%   True if Property is a property of the HTTP server running at
  355%   Port.  Defined properties are:
  356%
  357%       * goal(:Goal)
  358%       Goal used to start the server. This is often
  359%       http_dispatch/1.
  360%       * scheme(-Scheme)
  361%       Scheme is one of `http` or `https`.
  362%       * start_time(?Time)
  363%       Time-stamp when the server was created.
  364
  365http_server_property(_:Port, Property) :-
  366    integer(Port),
  367    !,
  368    server_property(Property, Port).
  369http_server_property(Port, Property) :-
  370    server_property(Property, Port).
  371
   %!  server_property(?Property, ?Port) is nondet.
   %
   %   Map each property term onto the matching argument of
   %   current_server/6 for the server registered under Port.
   372server_property(goal(Goal), Port) :-
   373    current_server(Port, Goal, _, _, _, _).
   374server_property(scheme(Scheme), Port) :-
   375    current_server(Port, _, _, _, Scheme, _).
   376server_property(start_time(Time), Port) :-
   377    current_server(Port, _, _, _, _, Time).
  378
  379
  380%!  http_workers(+Port, -Workers) is det.
  381%!  http_workers(+Port, +Workers:int) is det.
  382%
  383%   Query or set the number of workers  for the server at this port.
  384%   The number of workers is dynamically   modified. Setting it to 1
  385%   (one) can be used to profile the worker using tprofile/1.
  386
  387http_workers(Port, Workers) :-
  388    must_be(ground, Port),
  389    current_server(Port, _, _, Queue, _, _),
  390    !,
  391    (   integer(Workers)
  392    ->  resize_pool(Queue, Workers)
  393    ;   findall(W, queue_worker(Queue, W), WorkerIDs),
  394        length(WorkerIDs, Workers)
  395    ).
  396http_workers(Port, _) :-
  397    existence_error(http_server, Port).
  398
  399
  400%!  http_add_worker(+Port, +Options) is det.
  401%
  402%   Add a new worker to  the  HTTP   server  for  port Port. Options
  403%   overrule the default queue  options.   The  following additional
  404%   options are processed:
  405%
  406%     - max_idle_time(+Seconds)
  407%     The created worker will automatically terminate if there is
  408%     no new work within Seconds.
  409
  410http_add_worker(Port, Options) :-
  411    must_be(ground, Port),
  412    current_server(Port, _, _, Queue, _, _),
  413    !,
  414    queue_options(Queue, QueueOptions),
  415    merge_options(Options, QueueOptions, WorkerOptions),
  416    atom_concat(Queue, '_', AliasBase),
  417    create_workers(1, 1, Queue, AliasBase, WorkerOptions).
  418http_add_worker(Port, _) :-
  419    existence_error(http_server, Port).
  420
  421
   422%!  http_current_worker(?Port, ?ThreadID) is nondet.
   423%
   424%   True if ThreadID is the identifier   of  a Prolog thread serving
   425%   Port, i.e. a thread registered in queue_worker/2 for the queue
   426%   of the server at Port.  This predicate exists to allow arbitrary
   427%   interaction with the worker threads for development and statistics.
   428
   429http_current_worker(Port, ThreadID) :-
   430    current_server(Port, _, _, Queue, _, _),
   431    queue_worker(Queue, ThreadID).
  432
  433
   434%!  accept_server(:Goal, +Initiator, +Options)
   435%
   436%   The goal of a small server-thread accepting new requests and
   437%   posting them to the queue of workers.
   %
   %   The accept loop ends when the exception `http_stop` is raised.
   %   The server is then removed from current_server/6, connections
   %   still waiting in the queue are closed and the server socket is
   %   closed.
   438
   439accept_server(Goal, Initiator, Options) :-
   440    catch(accept_server2(Goal, Initiator, Options), http_stop, true),
   441    thread_self(Thread),
   442    debug(http(stop), '[~p]: accept server received http_stop', [Thread]),
             % unregister first; this also yields the Queue to drain
   443    retract(current_server(_Port, _, Thread, Queue, _Scheme, _StartTime)),
   444    close_pending_accepts(Queue),
   445    close_server_socket(Options).
  446
   %!  accept_server2(:Goal, +Initiator, +Options)
   %
   %   Tell Initiator that the server has started and then run
   %   accept_server3/2 in a failure-driven repeat/0 loop.  Exceptions
   %   accepted by accept_rethrow_error/1 are re-thrown and thus end
   %   the loop.  Other exceptions, and failure of accept_server3/2,
   %   are printed as errors after which the loop continues.
   447accept_server2(Goal, Initiator, Options) :-
   448    thread_send_message(Initiator, server_started),
   449    repeat,
   450      (   catch(accept_server3(Goal, Options), E, true)
   451      ->  (   var(E)
   452          ->  fail
   453          ;   accept_rethrow_error(E)
   454          ->  throw(E)
   455          ;   print_message(error, E),
   456              fail
   457          )
   458      ;   print_message(error,      % internal error
   459                        goal_failed(accept_server3(Goal, Options))),
   460          fail
   461      ).
  462
  463accept_server3(Goal, Options) :-
  464    accept_hook(Goal, Options),
  465    !.
  466accept_server3(Goal, Options) :-
  467    memberchk(tcp_socket(Socket), Options),
  468    memberchk(queue(Queue), Options),
  469    debug(http(connection), 'Waiting for connection', []),
  470    tcp_accept(Socket, Client, Peer),
  471    sig_atomic(send_to_worker(Queue, Client, Goal, Peer)),
  472    http_enough_workers(Queue, accept, Peer).
  473
   %!  send_to_worker(+Queue, +Client, :Goal, +Peer)
   %
   %   Post the accepted Client socket to Queue as the message
   %   tcp_client(Client, Goal, Peer).  Called through sig_atomic/1 by
   %   accept_server3/2.
   474send_to_worker(Queue, Client, Goal, Peer) :-
   475    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
   476    thread_send_message(Queue, tcp_client(Client, Goal, Peer)).
  477
   %!  accept_rethrow_error(?Exception)
   %
   %   Exceptions that accept_server2/3 re-throws rather than prints,
   %   which terminates the accept loop.
   478accept_rethrow_error(http_stop).
   479accept_rethrow_error('$aborted').
  480
  481
  482%!  close_server_socket(+Options)
  483%
  484%   Close the server socket.
  485
  486close_server_socket(Options) :-
  487    close_hook(Options),
  488    !.
  489close_server_socket(Options) :-
  490    memberchk(tcp_socket(Socket), Options),
  491    !,
  492    tcp_close_socket(Socket).
  493
  494%!  close_pending_accepts(+Queue)
  495
  496close_pending_accepts(Queue) :-
  497    (   thread_get_message(Queue, Msg, [timeout(0)])
  498    ->  close_client(Msg),
  499        close_pending_accepts(Queue)
  500    ;   true
  501    ).
  502
   %!  close_client(+Message)
   %
   %   Dispose of a queue message that was not processed by a worker.
   %   A tcp_client/3 message has its socket closed.  Any other message
   %   is offered to the hook discard_client_hook/1; a warning is
   %   printed if the hook does not accept it.
   503close_client(tcp_client(Client, _Goal, _0Peer)) =>
   504    debug(http(stop), 'Closing connection from ~p during shut-down', [_0Peer]),
   505    tcp_close_socket(Client).
   506close_client(Msg) =>
   507    (   discard_client_hook(Msg)
   508    ->  true
   509    ;   print_message(warning, http_close_client(Msg))
   510    ).
  511
  512
   513%!  http_stop_server(+Port, +Options)
   514%
   515%   Stop the indicated  HTTP  server   gracefully.  First  stops all
   516%   workers, then stops the server.  Port may also be given as
   %   `Host:Port` with a ground Host, in which case Host is ignored.
   %   Options is currently not used.
   517%
   518%   @tbd    Realise non-graceful stop
   519
   520http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
   521    ground(Host),
   522    !,
   523    http_stop_server(Port, Options).
   524http_stop_server(Port, _Options) :-
   525    http_workers(Port, 0),                  % checks Port is ground
   526    current_server(Port, _, Thread, Queue, _Scheme, _Start),
   527    retractall(queue_options(Queue, _)),
   528    debug(http(stop), 'Signalling HTTP server thread ~p to stop', [Thread]),
   529    thread_signal(Thread, throw(http_stop)),
             % NOTE(review): presumably this connection only serves to wake
             % up the accept thread if it is blocked waiting for a client;
             % any error from connecting is ignored.
   530    catch(connect(localhost:Port), _, true),
   531    thread_join(Thread, _0Status),
   532    debug(http(stop), 'Joined HTTP server thread ~p (~p)', [Thread, _0Status]),
   533    message_queue_destroy(Queue).
  534
   %!  connect(+Address)
   %
   %   Open a TCP connection to Address and close the socket again.
   %   The socket is closed by the cleanup handler, also if connecting
   %   fails or raises an exception.
   535connect(Address) :-
   536    setup_call_cleanup(
   537        tcp_socket(Socket),
   538        tcp_connect(Socket, Address),
   539        tcp_close_socket(Socket)).
  540
  541%!  http_enough_workers(+Queue, +Why, +Peer) is det.
  542%
  543%   Check that we have enough workers in our queue. If not, call the
  544%   hook http:schedule_workers/1 to extend  the   worker  pool. This
  545%   predicate can be used by accept_hook/2.
  546
  547http_enough_workers(Queue, _Why, _Peer) :-
  548    message_queue_property(Queue, waiting(_0)),
  549    !,
  550    debug(http(scheduler), '~D waiting for work; ok', [_0]).
  551http_enough_workers(Queue, Why, Peer) :-
  552    message_queue_property(Queue, size(Size)),
  553    (   enough(Size, Why)
  554    ->  debug(http(scheduler), '~D in queue; ok', [Size])
  555    ;   current_server(Port, _, _, Queue, _, _),
  556        Data = _{ port:Port,
  557                  reason:Why,
  558                  peer:Peer,
  559                  waiting:Size,
  560                  queue:Queue
  561                },
  562        debug(http(scheduler), 'Asking to reschedule: ~p', [Data]),
  563        catch(http:schedule_workers(Data),
  564              Error,
  565              print_message(error, Error))
  566    ->  true
  567    ;   true
  568    ).
  569
   %!  enough(+QueueSize, +Why) is semidet.
   %
   %   True if QueueSize pending messages require no extra workers for
   %   a request with reason Why.
   570enough(0, _).
   571enough(1, keep_alive).                  % I will be ready myself
  572
  573
  574%!  http:schedule_workers(+Data:dict) is semidet.
  575%
  576%   Hook called if a  new  connection   or  a  keep-alive connection
  577%   cannot be scheduled _immediately_ to a worker. Dict contains the
  578%   following keys:
  579%
  580%     - port:Port
  581%     Port number that identifies the server.
  582%     - reason:Reason
  583%     One of =accept= for a new connection or =keep_alive= if a
  584%     worker tries to reschedule itself.
  585%     - peer:Peer
  586%     Identify the other end of the connection
  587%     - waiting:Size
  588%     Number of messages waiting in the queue.
  589%     - queue:Queue
  590%     Message queue used to dispatch accepted messages.
  591%
  592%   Note that, when called with `reason:accept`,   we  are called in
  593%   the time critical main accept loop.   An  implementation of this
   594%   hook shall typically send  the  event   to  a thread  dedicated to
  595%   dynamic worker-pool management.
  596%
  597%   @see    http_add_worker/2 may be used to create (temporary) extra
  598%           workers.
  599
  600
  601                 /*******************************
  602                 *    WORKER QUEUE OPERATIONS   *
  603                 *******************************/
  604
  605%!  create_workers(+Options)
  606%
  607%   Create the pool of HTTP worker-threads. Each worker has the
  608%   alias http_worker_N.
  609
  610create_workers(Options) :-
  611    option(workers(N), Options, 5),
  612    option(queue(Queue), Options),
  613    catch(message_queue_create(Queue), _, true),
  614    atom_concat(Queue, '_', AliasBase),
  615    create_workers(1, N, Queue, AliasBase, Options),
  616    assert(queue_options(Queue, Options)).
  617
  618create_workers(I, N, _, _, _) :-
  619    I > N,
  620    !.
  621create_workers(I, N, Queue, AliasBase, Options) :-
  622    gensym(AliasBase, Alias),
  623    thread_create(http_worker(Options), Id,
  624                  [ alias(Alias)
  625                  | Options
  626                  ]),
  627    assertz(queue_worker(Queue, Id)),
  628    I2 is I + 1,
  629    create_workers(I2, N, Queue, AliasBase, Options).
  630
  631
  632%!  resize_pool(+Queue, +Workers) is det.
  633%
  634%   Create or destroy workers. If workers   are  destroyed, the call
  635%   waits until the desired number of waiters is reached.
  636
  637resize_pool(Queue, Size) :-
  638    findall(W, queue_worker(Queue, W), Workers),
  639    length(Workers, Now),
  640    (   Now < Size
  641    ->  queue_options(Queue, Options),
  642        atom_concat(Queue, '_', AliasBase),
  643        I0 is Now+1,
  644        create_workers(I0, Size, Queue, AliasBase, Options)
  645    ;   Now == Size
  646    ->  true
  647    ;   Now > Size
  648    ->  Excess is Now - Size,
  649        thread_self(Me),
  650        forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
  651        forall(between(1, Excess, _), thread_get_message(quitted(_)))
  652    ).
  653
  654
   655%!  http_worker(+Options)
   656%
   657%   Run HTTP worker main loop. Workers   simply  wait until they are
   658%   passed an accepted socket to process  a client.
   659%
   660%   If the message quit(Sender) is read   from the queue, the worker
   661%   stops.  get_work/3 produces quit(idle) if no work arrived
   %   within the max_idle_time(_) option.  For any other Sender the
   %   worker unregisters itself from queue_worker/2 and replies with
   %   quitted(Self) to Sender.
   %
   %   All other messages are opened with open_client/7 and processed
   %   by http_process/4.  Every path through that branch ends in
   %   failure, which re-enters the loop for the next message.
   662
   663http_worker(Options) :-
   664    debug(http(scheduler), 'New worker', []),
             % run done_worker/0 when this thread exits
   665    prolog_listen(this_thread_exit, done_worker),
   666    option(queue(Queue), Options),
   667    option(max_idle_time(MaxIdle), Options, infinite),
             % NOTE(review): thread_repeat_wait/1 is not defined in the
             % visible part of this file; presumably it re-runs its goal
             % each time the loop body below fails.
   668    thread_repeat_wait(get_work(Queue, Message, MaxIdle)),
               % note: both debug messages are emitted after Message was read
   669      debug(http(worker), 'Waiting for a job ...', []),
   670      debug(http(worker), 'Got job ~p', [Message]),
   671      (   Message = quit(Sender)
   672      ->  !,
   673          thread_self(Self),
   674          thread_detach(Self),
   675          (   Sender == idle
   676          ->  true
   677          ;   retract(queue_worker(Queue, Self)),
   678              thread_send_message(Sender, quitted(Self))
   679          )
   680      ;   open_client(Message, Queue, Goal, In, Out,
   681                      Options, ClientOptions),
   682          (   catch(http_process(Goal, In, Out, ClientOptions),
   683                    Error, true)
   684          ->  true
   685          ;   Error = goal_failed(http_process/4)
   686          ),
   687          (   var(Error)
   688          ->  fail
   689          ;   current_message_level(Error, Level),
   690              print_message(Level, Error),
   691              memberchk(peer(Peer), ClientOptions),
   692              close_connection(Peer, In, Out),
   693              fail
   694          )
   695      ).
  696
  697get_work(Queue, Message, infinite) :-
  698    !,
  699    thread_get_message(Queue, Message).
  700get_work(Queue, Message, MaxIdle) :-
  701    (   thread_get_message(Queue, Message, [timeout(MaxIdle)])
  702    ->  true
  703    ;   Message = quit(idle)
  704    ).
  705
  706
   707%!  open_client(+Message, +Queue, -Goal, -In, -Out,
   708%!              +Options, -ClientOptions) is semidet.
   709%
   710%   Opens the connection to the client in a worker from the message
   711%   sent to the queue by accept_server/3 or http_requeue/1.
   %
   %   A requeue/4 message carries already opened streams; it succeeds
   %   only if the client sends new data within the
   %   keep_alive_timeout(_) option (default 2).  For other messages
   %   the streams are opened by open_client/6 and ClientOptions is
   %   extended with pool(_) and timeout(_) (default 60).  Errors while
   %   opening are printed, after which the predicate fails.
   712
   713open_client(requeue(In, Out, Goal, ClOpts),
   714            _, Goal, In, Out, Opts, ClOpts) :-
   715    !,
   716    memberchk(peer(Peer), ClOpts),
   717    option(keep_alive_timeout(KeepAliveTMO), Opts, 2),
   718    check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out).
   719open_client(Message, Queue, Goal, In, Out, Opts,
   720            [ pool(client(Queue, Goal, In, Out)),
   721              timeout(Timeout)
   722            | Options
   723            ]) :-
   724    catch(open_client(Message, Goal, In, Out, Options, Opts),
   725          E, report_error(E)),
   726    option(timeout(Timeout), Opts, 60),
   727    (   debugging(http(connection))
   728    ->  memberchk(peer(Peer), Options),
   729        debug(http(connection), 'Opened connection from ~p', [Peer])
   730    ;   true
   731    ).
  732
  733
  734%!  open_client(+Message, +Goal, -In, -Out,
  735%!              -ClientOptions, +Options) is det.
  736
  737open_client(Message, Goal, In, Out, ClientOptions, Options) :-
  738    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
  739    !.
  740open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
  741            [ peer(Peer),
  742              protocol(http)
  743            ], _) :-
  744    tcp_open_socket(Socket, In, Out).
  745
   %!  report_error(+Exception) is failure.
   %
   %   Print Exception as an error message and fail.  Used as the
   %   recovery goal of catch/3 in open_client/7, so that an error
   %   while opening the client makes open_client/7 fail.
   746report_error(E) :-
   747    print_message(error, E),
   748    fail.
  749
  750
  751%!  check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet.
  752%
  753%   Wait for the client for at most  TimeOut seconds. Succeed if the
  754%   client starts a new request within   this  time. Otherwise close
  755%   the connection and fail.
  756
  757check_keep_alive_connection(In, TMO, Peer, In, Out) :-
  758    stream_property(In, timeout(Old)),
  759    set_stream(In, timeout(TMO)),
  760    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
  761    catch(peek_code(In, Code), E, true),
  762    (   var(E),                     % no exception
  763        Code \== -1                 % no end-of-file
  764    ->  set_stream(In, timeout(Old)),
  765        debug(http(keep_alive), '\tre-using keep-alive connection', [])
  766    ;   (   Code == -1
  767        ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
  768        ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
  769        ),
  770        close_connection(Peer, In, Out),
  771        fail
  772    ).
  773
  774
   775%!  done_worker
   776%
   777%   Called when worker is terminated  due   to  http_workers/2  or a
   778%   (debugging) exception. In  the   latter  case, recreate_worker/2
   779%   creates a new worker.
   %
   %   The first clause applies if the thread is still registered in
   %   queue_worker/2.  It unregisters the thread and tries to start
   %   a replacement worker; if that fails or raises an exception, the
   %   stop is reported instead.  The second clause applies if the
   %   worker already unregistered itself and only reports the stop.
   780
   781done_worker :-
   782    thread_self(Self),
   783    thread_detach(Self),
   784    retract(queue_worker(Queue, Self)),
   785    thread_property(Self, status(Status)),
   786    !,
   787    (   catch(recreate_worker(Status, Queue), _, fail)
   788    ->  print_message(informational,
   789                      httpd_restarted_worker(Self))
   790    ;   done_status_message_level(Status, Level),
   791        print_message(Level,
   792                      httpd_stopped_worker(Self, Status))
   793    ).
   794done_worker :-                                  % received quit(Sender)
   795    thread_self(Self),
   796    thread_property(Self, status(Status)),
   797    done_status_message_level(Status, Level),
   798    print_message(Level,
   799                  httpd_stopped_worker(Self, Status)).
  800
   %!  done_status_message_level(+Status, -Level) is det.
   %
   %   Level is the print_message/2 level used by done_worker/0 to
   %   report a worker that stopped with thread Status: normal
   %   completion and '$aborted' are silent, anything else is
   %   informational.
   801done_status_message_level(true, silent) :- !.
   802done_status_message_level(exception('$aborted'), silent) :- !.
   803done_status_message_level(_, informational).
  804
  805
   806%!  recreate_worker(+Status, +Queue) is semidet.
   807%
   808%   Deal with the possibility  that   threads  are,  during development,
   809%   killed with abort/0. We recreate the worker to avoid that eventually
   810%   we run out of workers. If  we  are   aborted  due  to a halt/0 call,
   811%   thread_create/3 will raise a permission error.
   812%
   813%   The first clause deals with the possibility  that we cannot write to
   814%   `user_error`. This is possible when Prolog   is started as a service
   815%   using some service managers. Would be  nice   if  we  could write an
   816%   error, but where?
   %
   %   Fails, creating no worker, unless Status is an exception
   %   accepted by recreate_on_error/1.
   817
   818recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
   819    halt(2).
   820recreate_worker(exception(Error), Queue) :-
   821    recreate_on_error(Error),
             % start one replacement worker using the options of the queue
   822    queue_options(Queue, Options),
   823    atom_concat(Queue, '_', AliasBase),
   824    create_workers(1, 1, Queue, AliasBase, Options).
  825
   %!  recreate_on_error(?Exception)
   %
   %   Exceptions for which recreate_worker/2 starts a replacement
   %   worker after a worker terminated with that exception.
   826recreate_on_error('$aborted').
   827recreate_on_error(time_limit_exceeded).
  828
   829%!  thread_httpd:message_level(+Exception, -Level)
   830%
   831%   Determine the message level used  for   exceptions  that  may occur
   832%   during server_loop/5. Being multifile, clauses can   be added by the
   833%   application to refine error handling.   See  also message_hook/3 for
   834%   further programming error handling.
   %
   %   Exceptions not covered by this predicate are printed at level
   %   `error` by current_message_level/2.
   %
   %   NOTE(review): server_loop/5 is not defined in the visible part
   %   of this file; in this file the level is used by http_worker/1
   %   for exceptions raised by http_process/4.
   835
   836:- multifile
   837    message_level/2.  838
   839message_level(error(io_error(read, _), _),               silent).
   840message_level(error(socket_error(epipe,_), _),           silent).
   841message_level(error(http_write_short(_Obj,_Written), _), silent).
   842message_level(error(timeout_error(read, _), _),          informational).
   843message_level(keep_alive_timeout,                        silent).
  844
  845current_message_level(Term, Level) :-
  846    (   message_level(Term, Level)
  847    ->  true
  848    ;   Level = error
  849    ).
  850
  851
  852%!  http_requeue(+Header)
  853%
  854%   Re-queue a connection to  the  worker   pool.  This  deals  with
  855%   processing additional requests on keep-alive connections.
  856
  857http_requeue(Header) :-
  858    requeue_header(Header, ClientOptions),
  859    memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
  860    memberchk(peer(Peer), ClientOptions),
  861    http_enough_workers(Queue, keep_alive, Peer),
  862    thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
  863    !.
  864http_requeue(Header) :-
  865    debug(http(error), 'Re-queue failed: ~p', [Header]),
  866    fail.
  867
  868requeue_header([], []).
  869requeue_header([H|T0], [H|T]) :-
  870    requeue_keep(H),
  871    !,
  872    requeue_header(T0, T).
  873requeue_header([_|T0], T) :-
  874    requeue_header(T0, T).
  875
  876requeue_keep(pool(_)).
  877requeue_keep(peer(_)).
  878requeue_keep(protocol(_)).
  879
  880
  881%!  http_process(Message, Queue, +Options)
  882%
  883%   Handle a single client message on the given stream.
  884
  885http_process(Goal, In, Out, Options) :-
  886    debug(http(server), 'Running server goal ~p on ~p -> ~p',
  887          [Goal, In, Out]),
  888    option(timeout(TMO), Options, 60),
  889    set_stream(In, timeout(TMO)),
  890    set_stream(Out, timeout(TMO)),
  891    http_wrapper(Goal, In, Out, Connection,
  892                 [ request(Request)
  893                 | Options
  894                 ]),
  895    next(Connection, Request).
  896
  897next(Connection, Request) :-
  898    next_(Connection, Request), !.
  899next(Connection, Request) :-
  900    print_message(warning, goal_failed(next(Connection,Request))).
  901
  902next_(switch_protocol(SwitchGoal, _SwitchOptions), Request) :-
  903    !,
  904    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
  905    (   catch(call(SwitchGoal, In, Out), E,
  906              (   print_message(error, E),
  907                  fail))
  908    ->  true
  909    ;   http_close_connection(Request)
  910    ).
  911next_(spawned(ThreadId), _) :-
  912    !,
  913    debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
  914next_(Connection, Request) :-
  915    downcase_atom(Connection, 'keep-alive'),
  916    http_requeue(Request),
  917    !.
  918next_(_, Request) :-
  919    http_close_connection(Request).
  920
  921
  922%!  http_close_connection(+Request)
  923%
  924%   Close connection associated to Request.  See also http_requeue/1.
  925
  926http_close_connection(Request) :-
  927    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
  928    memberchk(peer(Peer), Request),
  929    close_connection(Peer, In, Out).
  930
  931%!  close_connection(+Peer, +In, +Out)
  932%
  933%   Closes the connection from the server to the client.  Errors are
  934%   currently silently ignored.
  935
  936close_connection(Peer, In, Out) :-
  937    debug(http(connection), 'Closing connection from ~p', [Peer]),
  938    catch(close(In, [force(true)]), _, true),
  939    catch(close(Out, [force(true)]), _, true).
  940
  941%!  http_spawn(:Goal, +Options) is det.
  942%
  943%   Continue this connection on a  new   thread.  A handler may call
  944%   http_spawn/2 to start a new thread that continues processing the
  945%   current request using Goal. The original   thread returns to the
  946%   worker pool for processing new requests.   Options are passed to
  947%   thread_create/3, except for:
  948%
  949%       * pool(+Pool)
  950%       Interfaces to library(thread_pool), starting the thread
  951%       on the given pool.
  952%
  953%   If a pool does not exist, this predicate calls the multifile
  954%   hook http:create_pool/1 to create it. If this predicate succeeds
  955%   the operation is retried.
  956
  957http_spawn(Goal, Options) :-
  958    select_option(pool(Pool), Options, ThreadOptions),
  959    !,
  960    current_output(CGI),
  961    catch(thread_create_in_pool(Pool,
  962                                wrap_spawned(CGI, Goal), Id,
  963                                [ detached(true)
  964                                | ThreadOptions
  965                                ]),
  966          Error,
  967          true),
  968    (   var(Error)
  969    ->  http_spawned(Id)
  970    ;   Error = error(resource_error(threads_in_pool(_)), _)
  971    ->  throw(http_reply(busy))
  972    ;   Error = error(existence_error(thread_pool, Pool), _),
  973        create_pool(Pool)
  974    ->  http_spawn(Goal, Options)
  975    ;   throw(Error)
  976    ).
  977http_spawn(Goal, Options) :-
  978    current_output(CGI),
  979    thread_create(wrap_spawned(CGI, Goal), Id,
  980                  [ detached(true)
  981                  | Options
  982                  ]),
  983    http_spawned(Id).
  984
  985wrap_spawned(CGI, Goal) :-
  986    set_output(CGI),
  987    http_wrap_spawned(Goal, Request, Connection),
  988    next(Connection, Request).
  989
  990%!  create_pool(+Pool)
  991%
  992%   Lazy  creation  of  worker-pools  for   the  HTTP  server.  This
  993%   predicate calls the hook http:create_pool/1.   If the hook fails
  994%   it creates a default pool of size   10. This should suffice most
  995%   typical usecases. Note that we  get   a  permission error if the
  996%   pool is already created.  We can ignore this.
  997
  998create_pool(Pool) :-
  999    E = error(permission_error(create, thread_pool, Pool), _),
 1000    catch(http:create_pool(Pool), E, true).
 1001create_pool(Pool) :-
 1002    print_message(informational, httpd(created_pool(Pool))),
 1003    thread_pool_create(Pool, 10, []).
 1004
 1005
 1006		 /*******************************
 1007		 *         WAIT POLICIES	*
 1008		 *******************************/
 1009
 1010:- meta_predicate
 1011    thread_repeat_wait(0). 1012
 1013%!  thread_repeat_wait(:Goal) is multi.
 1014%
 1015%   Acts as `repeat,  thread_idle(Goal)`,  choosing   whether  to  use a
 1016%   `long` or `short` idle time based on the average firing rate.
 1017
 1018thread_repeat_wait(Goal) :-
 1019    new_rate_mma(5, 1000, State),
 1020    repeat,
 1021      update_rate_mma(State, MMA),
 1022      long(MMA, IsLong),
 1023      (   IsLong == brief
 1024      ->  call(Goal)
 1025      ;   thread_idle(Goal, IsLong)
 1026      ).
 1027
 1028long(MMA, brief) :-
 1029    MMA < 0.05,
 1030    !.
 1031long(MMA, short) :-
 1032    MMA < 1,
 1033    !.
 1034long(_, long).
 1035
 1036%!  new_rate_mma(+N, +Resolution, -State) is det.
 1037%!  update_rate_mma(!State, -MMA) is det.
 1038%
 1039%   Implement _Modified Moving  Average_  computing   the  average  time
 1040%   between requests as an exponential moving averate with alpha=1/N.
 1041%
 1042%   @arg Resolution is the time resolution  in 1/Resolution seconds. All
 1043%   storage is done in integers to avoid  the need for stack freezing in
 1044%   nb_setarg/3.
 1045%
 1046%   @see https://en.wikipedia.org/wiki/Moving_average
 1047
 1048new_rate_mma(N, Resolution, rstate(Base, 0, MaxI, Resolution, N, 0)) :-
 1049    current_prolog_flag(max_tagged_integer, MaxI),
 1050    get_time(Base).
 1051
 1052update_rate_mma(State, MMAr) :-
 1053    State = rstate(Base, Last, MaxI, Resolution, N, MMA0),
 1054    get_time(Now),
 1055    Stamp is round((Now-Base)*Resolution),
 1056    (   Stamp > MaxI
 1057    ->  nb_setarg(1, State, Now),
 1058        nb_setarg(2, State, 0)
 1059    ;   true
 1060    ),
 1061    Diff is Stamp-Last,
 1062    nb_setarg(2, State, Stamp),
 1063    MMA is round(((N-1)*MMA0+Diff)/N),
 1064    nb_setarg(6, State, MMA),
 1065    MMAr is MMA/float(Resolution).
 1066
 1067
 1068                 /*******************************
 1069                 *            MESSAGES          *
 1070                 *******************************/
 1071
 1072:- multifile
 1073    prolog:message/3. 1074
 1075prolog:message(httpd_started_server(Port, Options)) -->
 1076    [ 'Started server at '-[] ],
 1077    http_root(Port, Options).
 1078prolog:message(httpd_stopped_worker(Self, Status)) -->
 1079    [ 'Stopped worker ~p: ~p'-[Self, Status] ].
 1080prolog:message(httpd_restarted_worker(Self)) -->
 1081    [ 'Replaced aborted worker ~p'-[Self] ].
 1082prolog:message(httpd(created_pool(Pool))) -->
 1083    [ 'Created thread-pool ~p of size 10'-[Pool], nl,
 1084      'Create this pool at startup-time or define the hook ', nl,
 1085      'http:create_pool/1 to avoid this message and create a ', nl,
 1086      'pool that fits the usage-profile.'
 1087    ].
 1088
 1089http_root(Address, Options) -->
 1090    { landing_page(Address, URI, Options) },
 1091    [ '~w'-[URI] ].
 1092
 1093landing_page(Host:Port, URI, Options) :-
 1094    !,
 1095    must_be(atom, Host),
 1096    must_be(integer, Port),
 1097    http_server_property(Port, scheme(Scheme)),
 1098    (   default_port(Scheme, Port)
 1099    ->  format(atom(Base), '~w://~w', [Scheme, Host])
 1100    ;   format(atom(Base), '~w://~w:~w', [Scheme, Host, Port])
 1101    ),
 1102    entry_page(Base, URI, Options).
 1103landing_page(unix_socket(Path), URI, _Options) :-
 1104    !,
 1105    format(string(URI), 'Unix domain socket "~w"', [Path]).
 1106landing_page(Port, URI, Options) :-
 1107    landing_page(localhost:Port, URI, Options).
 1108
 1109default_port(http, 80).
 1110default_port(https, 443).
 1111
 1112entry_page(Base, URI, Options) :-
 1113    option(entry_page(Entry), Options),
 1114    !,
 1115    uri_resolve(Entry, Base, URI).
 1116entry_page(Base, URI, _) :-
 1117    http_absolute_location(root(.), Entry, []),
 1118    uri_resolve(Entry, Base, URI)