. (utf8) 2/* Part of SWI-Prolog 3 4 Author: Torbjörn Lager and Jan Wielemaker 5 E-mail: J.Wielemaker@vu.nl 6 WWW: http://www.swi-prolog.org 7 Copyright (C): 2014-2020, Torbjörn Lager, 8 VU University Amsterdam 9 All rights reserved. 10 11 Redistribution and use in source and binary forms, with or without 12 modification, are permitted provided that the following conditions 13 are met: 14 15 1. Redistributions of source code must retain the above copyright 16 notice, this list of conditions and the following disclaimer. 17 18 2. Redistributions in binary form must reproduce the above copyright 19 notice, this list of conditions and the following disclaimer in 20 the documentation and/or other materials provided with the 21 distribution. 22 23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 26 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 27 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 28 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 29 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 30 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 31 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 32 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 33 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 POSSIBILITY OF SUCH DAMAGE. 35*/ 36 37:- module(pengines, 38 [ pengine_create/1, % +Options 39 pengine_ask/3, % +Pengine, :Query, +Options 40 pengine_next/2, % +Pengine. +Options 41 pengine_stop/2, % +Pengine. +Options 42 pengine_event/2, % -Event, +Options 43 pengine_input/2, % +Prompt, -Term 44 pengine_output/1, % +Term 45 pengine_respond/3, % +Pengine, +Input, +Options 46 pengine_debug/2, % +Format, +Args 47 pengine_self/1, % -Pengine 48 pengine_pull_response/2, % +Pengine, +Options 49 pengine_destroy/1, % +Pengine 50 pengine_destroy/2, % +Pengine, +Options 51 pengine_abort/1, % +Pengine 52 pengine_application/1, % +Application 53 current_pengine_application/1, % ?Application 54 pengine_property/2, % ?Pengine, ?Property 55 pengine_user/1, % -User 56 pengine_event_loop/2, % :Closure, +Options 57 pengine_rpc/2, % +Server, :Goal 58 pengine_rpc/3 % +Server, :Goal, +Options 59 ]).
70:- autoload(library(aggregate),[aggregate_all/3]). 71:- autoload(library(apply),[maplist/2,partition/4,exclude/3,maplist/3]). 72:- autoload(library(broadcast),[broadcast/1]). 73:- autoload(library(charsio),[open_chars_stream/2]). 74:- autoload(library(debug),[debug/1,debugging/1,debug/3,assertion/1]). 75:- autoload(library(error), 76 [ must_be/2, 77 existence_error/2, 78 permission_error/3, 79 domain_error/2 80 ]). 81:- autoload(library(filesex),[directory_file_path/3]). 82:- autoload(library(listing),[listing/1]). 83:- autoload(library(lists),[member/2,flatten/2,select/3,append/3]). 84:- autoload(library(modules),[in_temporary_module/3]). 85:- autoload(library(occurs),[sub_term/2]). 86:- autoload(library(option), 87 [select_option/3,option/2,option/3,select_option/4]). 88:- autoload(library(prolog_stack),[print_prolog_backtrace/2]). 89:- autoload(library(sandbox),[safe_goal/1]). 90:- autoload(library(statistics),[thread_statistics/2]). 91:- autoload(library(term_to_json),[term_to_json/2]). 92:- autoload(library(thread_pool), 93 [thread_pool_create/3,thread_create_in_pool/4]). 94:- autoload(library(time),[alarm/4,call_with_time_limit/2]). 95:- autoload(library(uri), 96 [ uri_components/2, 97 uri_query_components/2, 98 uri_data/3, 99 uri_data/4, 100 uri_encoded/3 101 ]). 102:- autoload(library(http/http_client),[http_read_data/3]). 103:- autoload(library(http/http_cors),[cors_enable/0,cors_enable/2]). 104:- autoload(library(http/http_dispatch), 105 [http_handler/3,http_404/2,http_reply_file/3]). 106:- autoload(library(http/http_open),[http_open/3]). 107:- autoload(library(http/http_parameters),[http_parameters/2]). 108:- autoload(library(http/http_stream),[is_cgi_stream/1]). 109:- autoload(library(http/http_wrapper),[http_peer/2]). 110 111:- use_module(library(settings),[setting/2,setting/4]). 112:- use_module(library(http/http_json), 113 [http_read_json_dict/2,reply_json/1]). 114 115:- if(exists_source(library(uuid))). 116:- autoload(library(uuid), [uuid/2]). 117:- endif. 118 119 120:- meta_predicate 121 pengine_create( ), 122 pengine_rpc( , , ), 123 pengine_event_loop( , ). 124 125:- multifile 126 write_result/3, % +Format, +Event, +Dict 127 event_to_json/3, % +Event, -JSON, +Format 128 prepare_module/3, % +Module, +Application, +Options 129 prepare_goal/3, % +GoalIn, -GoalOut, +Options 130 authentication_hook/3, % +Request, +Application, -User 131 not_sandboxed/2. % +User, +App 132 133:- predicate_options(pengine_create/1, 1, 134 [ id(-atom), 135 alias(atom), 136 application(atom), 137 destroy(boolean), 138 server(atom), 139 ask(compound), 140 template(compound), 141 chunk(integer), 142 bindings(list), 143 src_list(list), 144 src_text(any), % text 145 src_url(atom), 146 src_predicates(list) 147 ]). 148:- predicate_options(pengine_ask/3, 3, 149 [ template(any), 150 chunk(integer), 151 bindings(list) 152 ]). 153:- predicate_options(pengine_next/2, 2, 154 [ chunk(integer), 155 pass_to(pengine_send/3, 3) 156 ]). 157:- predicate_options(pengine_stop/2, 2, 158 [ pass_to(pengine_send/3, 3) 159 ]). 160:- predicate_options(pengine_respond/3, 2, 161 [ pass_to(pengine_send/3, 3) 162 ]). 163:- predicate_options(pengine_rpc/3, 3, 164 [ chunk(integer), 165 pass_to(pengine_create/1, 1) 166 ]). 167:- predicate_options(pengine_send/3, 3, 168 [ delay(number) 169 ]). 170:- predicate_options(pengine_event/2, 2, 171 [ listen(atom), 172 pass_to(thread_get_message/3, 3) 173 ]). 174:- predicate_options(pengine_pull_response/2, 2, 175 [ pass_to(http_open/3, 3) 176 ]). 177:- predicate_options(pengine_event_loop/2, 2, 178 []). % not yet implemented 179 180% :- debug(pengine(transition)). 181:- debug(pengine(debug)). % handle pengine_debug in pengine_rpc/3. 182 183goal_expansion(random_delay, Expanded) :- 184 ( debugging(pengine(delay)) 185 -> Expanded = do_random_delay 186 ; Expanded = true 187 ). 188 189do_random_delay :- 190 Delay is random(20)/1000, 191 sleep(Delay). 192 193:- meta_predicate % internal meta predicates 194 solve( , , , ), 195 findnsols_no_empty( , , , ), 196 pengine_event_loop( , , ).
Remaining options are passed to http_open/3 (meaningful only for non-local pengines) and thread_create/3. Note that for thread_create/3 only options changing the stack-sizes can be used. In particular, do not pass the detached or alias options..
Successful creation of a pengine will return an event term of the following form:
An error will be returned if the pengine could not be created:
251pengine_create(M:Options0) :-
252 translate_local_sources(Options0, Options, M),
253 ( select_option(server(BaseURL), Options, RestOptions)
254 -> remote_pengine_create(BaseURL, RestOptions)
255 ; local_pengine_create(Options)
256 ).
src_predicates
and src_list
options into
src_text
. We need to do that anyway for remote pengines. For
local pengines, we could avoid this step, but there is very
little point in transferring source to a local pengine anyway as
local pengines can access any Prolog predicate that you make
visible to the application.
Multiple sources are concatenated to end up with a single src_text option.
270translate_local_sources(OptionsIn, Options, Module) :- 271 translate_local_sources(OptionsIn, Sources, Options2, Module), 272 ( Sources == [] 273 -> Options = Options2 274 ; Sources = [Source] 275 -> Options = [src_text(Source)|Options2] 276 ; atomics_to_string(Sources, Source) 277 -> Options = [src_text(Source)|Options2] 278 ). 279 280translate_local_sources([], [], [], _). 281translate_local_sources([H0|T], [S0|S], Options, M) :- 282 nonvar(H0), 283 translate_local_source(H0, S0, M), 284 !, 285 translate_local_sources(T, S, Options, M). 286translate_local_sources([H|T0], S, [H|T], M) :- 287 translate_local_sources(T0, S, T, M). 288 289translate_local_source(src_predicates(PIs), Source, M) :- 290 must_be(list, PIs), 291 with_output_to(string(Source), 292 maplist(list_in_module(M), PIs)). 293translate_local_source(src_list(Terms), Source, _) :- 294 must_be(list, Terms), 295 with_output_to(string(Source), 296 forall(member(Term, Terms), 297 format('~k .~n', [Term]))). 298translate_local_source(src_text(Source), Source, _). 299 300list_in_module(M, PI) :- 301 listing(M:PI).
pengine_send(NameOrID, Term, [])
.
*/
308pengine_send(Target, Event) :-
309 pengine_send(Target, Event, []).
Any remaining options are passed to http_open/3. */
324pengine_send(Target, Event, Options) :- 325 must_be(atom, Target), 326 pengine_send2(Target, Event, Options). 327 328pengine_send2(self, Event, Options) :- 329 !, 330 thread_self(Queue), 331 delay_message(queue(Queue), Event, Options). 332pengine_send2(Name, Event, Options) :- 333 child(Name, Target), 334 !, 335 delay_message(pengine(Target), Event, Options). 336pengine_send2(Target, Event, Options) :- 337 delay_message(pengine(Target), Event, Options). 338 339delay_message(Target, Event, Options) :- 340 option(delay(Delay), Options), 341 !, 342 alarm(Delay, 343 send_message(Target, Event, Options), 344 _AlarmID, 345 [remove(true)]). 346delay_message(Target, Event, Options) :- 347 random_delay, 348 send_message(Target, Event, Options). 349 350send_message(queue(Queue), Event, _) :- 351 thread_send_message(Queue, pengine_request(Event)). 352send_message(pengine(Pengine), Event, Options) :- 353 ( pengine_remote(Pengine, Server) 354 -> remote_pengine_send(Server, Pengine, Event, Options) 355 ; pengine_thread(Pengine, Thread) 356 -> thread_send_message(Thread, pengine_request(Event)) 357 ; existence_error(pengine, Pengine) 358 ).
idle_limit
setting while using thread_idle/2 to minimis
resources.368pengine_request(Request) :- 369 thread_self(Me), 370 thread_get_message(Me, pengine_request(Request), [timeout(1)]), 371 !. 372pengine_request(Request) :- 373 pengine_self(Self), 374 get_pengine_application(Self, Application), 375 setting(Application:idle_limit, IdleLimit0), 376 IdleLimit is IdleLimit0-1, 377 thread_self(Me), 378 ( thread_idle(thread_get_message(Me, pengine_request(Request), 379 [timeout(IdleLimit)]), 380 long) 381 -> true 382 ; Request = destroy 383 ).
If the message cannot be sent within the idle_limit
setting of
the pengine, abort the pengine.
396pengine_reply(Event) :- 397 pengine_parent(Queue), 398 pengine_reply(Queue, Event). 399 400pengine_reply(_Queue, _Event0) :- 401 nb_current(pengine_idle_limit_exceeded, true), 402 !. 403pengine_reply(Queue, Event0) :- 404 arg(1, Event0, ID), 405 wrap_first_answer(ID, Event0, Event), 406 random_delay, 407 debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]), 408 ( pengine_self(ID), 409 \+ pengine_detached(ID, _) 410 -> get_pengine_application(ID, Application), 411 setting(Application:idle_limit, IdleLimit), 412 debug(pengine(reply), 'Sending ~p, timout: ~q', [Event, IdleLimit]), 413 ( thread_send_message(Queue, pengine_event(ID, Event), 414 [ timeout(IdleLimit) 415 ]) 416 -> true 417 ; thread_self(Me), 418 debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)', 419 [ID, Me]), 420 nb_setval(pengine_idle_limit_exceeded, true), 421 thread_detach(Me), 422 abort 423 ) 424 ; thread_send_message(Queue, pengine_event(ID, Event)) 425 ). 426 427wrap_first_answer(ID, Event0, CreateEvent) :- 428 wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)]), 429 arg(1, CreateEvent, ID), 430 !, 431 retract(wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)])). 432wrap_first_answer(_ID, Event, Event). 433 434 435empty_queue :- 436 pengine_parent(Queue), 437 empty_queue(Queue, 0, Discarded), 438 debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]). 439 440empty_queue(Queue, C0, C) :- 441 thread_get_message(Queue, _Term, [timeout(0)]), 442 !, 443 C1 is C0+1, 444 empty_queue(Queue, C1, C). 445empty_queue(_, C, C).
Options is a list of options:
Name = Var
terms, providing access to the actual variable
names.Any remaining options are passed to pengine_send/3.
Note that the predicate pengine_ask/3 is deterministic, even for queries that have more than one solution. Also, the variables in Query will not be bound. Instead, results will be returned in the form of event terms.
true
or false
, indicating whether we can expect the
pengine to be able to return more solutions or not, would we call
pengine_next/2.Defined in terms of pengine_send/3, like so:
pengine_ask(ID, Query, Options) :- partition(pengine_ask_option, Options, AskOptions, SendOptions), pengine_send(ID, ask(Query, AskOptions), SendOptions).
*/
510pengine_ask(ID, Query, Options) :- 511 partition(pengine_ask_option, Options, AskOptions, SendOptions), 512 pengine_send(ID, ask(Query, AskOptions), SendOptions). 513 514 515pengine_ask_option(template(_)). 516pengine_ask_option(chunk(_)). 517pengine_ask_option(bindings(_)). 518pengine_ask_option(breakpoints(_)).
Remaining options are passed to pengine_send/3. The result of re-executing the current goal is returned to the caller's message queue in the form of event terms.
Defined in terms of pengine_send/3, as follows:
pengine_next(ID, Options) :- pengine_send(ID, next, Options).
*/
562pengine_next(ID, Options) :- 563 select_option(chunk(Count), Options, Options1), 564 !, 565 pengine_send(ID, next(Count), Options1). 566pengine_next(ID, Options) :- 567 pengine_send(ID, next, Options).
Defined in terms of pengine_send/3, like so:
pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
*/
583pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
594pengine_abort(Name) :-
595 ( child(Name, Pengine)
596 -> true
597 ; Pengine = Name
598 ),
599 ( pengine_remote(Pengine, Server)
600 -> remote_pengine_abort(Server, Pengine, [])
601 ; pengine_thread(Pengine, Thread),
602 debug(pengine(abort), 'Signalling thread ~p', [Thread]),
603 catch(thread_signal(Thread, throw(abort_query)), _, true)
604 ).
force(true)
, the pengine
is killed using abort/0 and pengine_destroy/2 succeeds.
*/614pengine_destroy(ID) :- 615 pengine_destroy(ID, []). 616 617pengine_destroy(Name, Options) :- 618 ( child(Name, ID) 619 -> true 620 ; ID = Name 621 ), 622 option(force(true), Options), 623 !, 624 ( pengine_thread(ID, Thread) 625 -> catch(thread_signal(Thread, abort), 626 error(existence_error(thread, _), _), true) 627 ; true 628 ). 629pengine_destroy(ID, _) :- 630 catch(pengine_send(ID, destroy), 631 error(existence_error(pengine, ID), _), 632 retractall(child(_,ID))). 633 634 635/*================= pengines administration ======================= 636*/
thread(ThreadId)
remote(URL)
647:- dynamic 648 current_pengine/6, % Id, ParentId, Thread, URL, App, Destroy 649 pengine_queue/4, % Id, Queue, TimeOut, Time 650 output_queue/3, % Id, Queue, Time 651 pengine_user/2, % Id, User 652 pengine_data/2, % Id, Data 653 pengine_detached/2. % Id, Data 654:- volatile 655 current_pengine/6, 656 pengine_queue/4, 657 output_queue/3, 658 pengine_user/2, 659 pengine_data/2, 660 pengine_detached/2. 661 662:- thread_local 663 child/2. % ?Name, ?Child
669pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :- 670 asserta(current_pengine(Id, Queue, Thread, URL, Application, Destroy)). 671 672pengine_register_remote(Id, URL, Application, Destroy) :- 673 thread_self(Queue), 674 asserta(current_pengine(Id, Queue, 0, URL, Application, Destroy)).
http
and the queue is the
message queue used to send events to the HTTP workers.682pengine_unregister(Id) :- 683 thread_self(Me), 684 ( current_pengine(Id, Queue, Me, http, _, _) 685 -> with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue)) 686 ; true 687 ), 688 retractall(current_pengine(Id, _, Me, _, _, _)), 689 retractall(pengine_user(Id, _)), 690 retractall(pengine_data(Id, _)). 691 692pengine_unregister_remote(Id) :- 693 retractall(current_pengine(Id, _Parent, 0, _, _, _)).
699pengine_self(Id) :- 700 thread_self(Thread), 701 current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy). 702 703pengine_parent(Parent) :- 704 nb_getval(pengine_parent, Parent). 705 706pengine_thread(Pengine, Thread) :- 707 current_pengine(Pengine, _Parent, Thread, _URL, _Application, _Destroy), 708 Thread \== 0, 709 !. 710 711pengine_remote(Pengine, URL) :- 712 current_pengine(Pengine, _Parent, 0, URL, _Application, _Destroy). 713 714get_pengine_application(Pengine, Application) :- 715 current_pengine(Pengine, _Parent, _, _URL, Application, _Destroy), 716 !. 717 718get_pengine_module(Pengine, Pengine). 719 720:- if(current_predicate(uuid/2)). 721pengine_uuid(Id) :- 722 uuid(Id, [version(4)]). % Version 4 is random. 723:- else. 724pengine_uuid(Id) :- 725 ( current_prolog_flag(max_integer, Max1) 726 -> Max is Max1-1 727 ; Max is 1<<128 728 ), 729 random_between(0, Max, Num), 730 atom_number(Id, Num). 731:- endif.
This also runs Goal if the Pengine no longer exists. This deals with Pengines terminated through destroy_or_continue/1.
748:- meta_predicate protect_pengine( , ). 749 750protect_pengine(Id, Goal) :- 751 term_hash(Id, Hash), 752 LockN is Hash mod 64, 753 atom_concat(pengine_done_, LockN, Lock), 754 with_mutex(Lock, 755 ( pengine_thread(Id, _) 756 -> Goal 757 ; Goal 758 )).
pengine_sandbox
. The example below creates a new application
address_book
and imports the API defined in the module file
adress_book_api.pl
into the application.
:- pengine_application(address_book). :- use_module(address_book:adress_book_api).
*/
775pengine_application(Application) :- 776 throw(error(context_error(nodirective, 777 pengine_application(Application)), _)). 778 779:- multifile 780 system:term_expansion/2, 781 current_application/1.
789current_pengine_application(Application) :- 790 current_application(Application). 791 792 793% Default settings for all applications 794 795:- setting(thread_pool_size, integer, 100, 796 'Maximum number of pengines this application can run.'). 797:- setting(thread_pool_stacks, list(compound), [], 798 'Maximum stack sizes for pengines this application can run.'). 799:- setting(slave_limit, integer, 3, 800 'Maximum number of slave pengines a master pengine can create.'). 801:- setting(time_limit, number, 300, 802 'Maximum time to wait for output'). 803:- setting(idle_limit, number, 300, 804 'Pengine auto-destroys when idle for this time'). 805:- setting(safe_goal_limit, number, 10, 806 'Maximum time to try proving safety of the goal'). 807:- setting(program_space, integer, 100_000_000, 808 'Maximum memory used by predicates'). 809:- setting(allow_from, list(atom), [*], 810 'IP addresses from which remotes are allowed to connect'). 811:- setting(deny_from, list(atom), [], 812 'IP addresses from which remotes are NOT allowed to connect'). 813:- setting(debug_info, boolean, false, 814 'Keep information to support source-level debugging'). 815 816 817systemterm_expansion((:- pengine_application(Application)), Expanded) :- 818 must_be(atom, Application), 819 ( module_property(Application, file(_)) 820 -> permission_error(create, pengine_application, Application) 821 ; true 822 ), 823 expand_term((:- setting(Application:thread_pool_size, integer, 824 setting(pengines:thread_pool_size), 825 'Maximum number of pengines this \c 826 application can run.')), 827 ThreadPoolSizeSetting), 828 expand_term((:- setting(Application:thread_pool_stacks, list(compound), 829 setting(pengines:thread_pool_stacks), 830 'Maximum stack sizes for pengines \c 831 this application can run.')), 832 ThreadPoolStacksSetting), 833 expand_term((:- setting(Application:slave_limit, integer, 834 setting(pengines:slave_limit), 835 'Maximum number of local slave pengines \c 836 a master pengine can create.')), 837 SlaveLimitSetting), 838 expand_term((:- setting(Application:time_limit, number, 839 setting(pengines:time_limit), 840 'Maximum time to wait for output')), 841 TimeLimitSetting), 842 expand_term((:- setting(Application:idle_limit, number, 843 setting(pengines:idle_limit), 844 'Pengine auto-destroys when idle for this time')), 845 IdleLimitSetting), 846 expand_term((:- setting(Application:safe_goal_limit, number, 847 setting(pengines:safe_goal_limit), 848 'Maximum time to try proving safety of the goal')), 849 SafeGoalLimitSetting), 850 expand_term((:- setting(Application:program_space, integer, 851 setting(pengines:program_space), 852 'Maximum memory used by predicates')), 853 ProgramSpaceSetting), 854 expand_term((:- setting(Application:allow_from, list(atom), 855 setting(pengines:allow_from), 856 'IP addresses from which remotes are allowed \c 857 to connect')), 858 AllowFromSetting), 859 expand_term((:- setting(Application:deny_from, list(atom), 860 setting(pengines:deny_from), 861 'IP addresses from which remotes are NOT \c 862 allowed to connect')), 863 DenyFromSetting), 864 expand_term((:- setting(Application:debug_info, boolean, 865 setting(pengines:debug_info), 866 'Keep information to support source-level \c 867 debugging')), 868 DebugInfoSetting), 869 flatten([ pengines:current_application(Application), 870 ThreadPoolSizeSetting, 871 ThreadPoolStacksSetting, 872 SlaveLimitSetting, 873 TimeLimitSetting, 874 IdleLimitSetting, 875 SafeGoalLimitSetting, 876 ProgramSpaceSetting, 877 AllowFromSetting, 878 DenyFromSetting, 879 DebugInfoSetting 880 ], Expanded). 881 882% Register default application 883 884:- pengine_application(pengine_sandbox).
alias
option when creating the pengine.true
if the pengines is destroyed automatically
after completing the query.debug_info
is present.921pengine_property(Id, Prop) :- 922 nonvar(Id), nonvar(Prop), 923 pengine_property2(Prop, Id), 924 !. 925pengine_property(Id, Prop) :- 926 pengine_property2(Prop, Id). 927 928pengine_property2(self(Id), Id) :- 929 current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy). 930pengine_property2(module(Id), Id) :- 931 current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy). 932pengine_property2(alias(Alias), Id) :- 933 child(Alias, Id), 934 Alias \== Id. 935pengine_property2(thread(Thread), Id) :- 936 current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy), 937 Thread \== 0. 938pengine_property2(remote(Server), Id) :- 939 current_pengine(Id, _Parent, 0, Server, _Application, _Destroy). 940pengine_property2(application(Application), Id) :- 941 current_pengine(Id, _Parent, _Thread, _Server, Application, _Destroy). 942pengine_property2(destroy(Destroy), Id) :- 943 current_pengine(Id, _Parent, _Thread, _Server, _Application, Destroy). 944pengine_property2(parent(Parent), Id) :- 945 current_pengine(Id, Parent, _Thread, _URL, _Application, _Destroy). 946pengine_property2(source(SourceID, Source), Id) :- 947 pengine_data(Id, source(SourceID, Source)). 948pengine_property2(detached(When), Id) :- 949 pengine_detached(Id, When).
956pengine_output(Term) :-
957 pengine_self(Me),
958 pengine_reply(output(Me, Term)).
console.log(Message)
if there is a console. The predicate
pengine_rpc/3 calls debug(pengine(debug), '~w', [Message])
. The debug
topic pengine(debug)
is enabled by default.
973pengine_debug(Format, Args) :- 974 pengine_parent(Queue), 975 pengine_self(Self), 976 catch(safe_goal(format(atom(_), Format, Args)), E, true), 977 ( var(E) 978 -> format(atom(Message), Format, Args) 979 ; message_to_string(E, Message) 980 ), 981 pengine_reply(Queue, debug(Self, Message)). 982 983 984/*================= Local pengine ======================= 985*/
996local_pengine_create(Options) :-
997 thread_self(Self),
998 option(application(Application), Options, pengine_sandbox),
999 create(Self, Child, Options, local, Application),
1000 option(alias(Name), Options, Child),
1001 assert(child(Name, Child)).
1008:- multifile thread_pool:create_pool/1. 1009 1010thread_poolcreate_pool(Application) :- 1011 current_application(Application), 1012 setting(Application:thread_pool_size, Size), 1013 setting(Application:thread_pool_stacks, Stacks), 1014 thread_pool_create(Application, Size, Stacks).
1024create(Queue, Child, Options, local, Application) :- 1025 !, 1026 pengine_child_id(Child), 1027 create0(Queue, Child, Options, local, Application). 1028create(Queue, Child, Options, URL, Application) :- 1029 pengine_child_id(Child), 1030 catch(create0(Queue, Child, Options, URL, Application), 1031 Error, 1032 create_error(Queue, Child, Error)). 1033 1034pengine_child_id(Child) :- 1035 ( nonvar(Child) 1036 -> true 1037 ; pengine_uuid(Child) 1038 ). 1039 1040create_error(Queue, Child, Error) :- 1041 pengine_reply(Queue, error(Child, Error)). 1042 1043create0(Queue, Child, Options, URL, Application) :- 1044 ( current_application(Application) 1045 -> true 1046 ; existence_error(pengine_application, Application) 1047 ), 1048 ( URL \== http % pengine is _not_ a child of the 1049 % HTTP server thread 1050 -> aggregate_all(count, child(_,_), Count), 1051 setting(Application:slave_limit, Max), 1052 ( Count >= Max 1053 -> throw(error(resource_error(max_pengines), _)) 1054 ; true 1055 ) 1056 ; true 1057 ), 1058 partition(pengine_create_option, Options, PengineOptions, RestOptions), 1059 thread_create_in_pool( 1060 Application, 1061 pengine_main(Queue, PengineOptions, Application), ChildThread, 1062 [ at_exit(pengine_done) 1063 | RestOptions 1064 ]), 1065 option(destroy(Destroy), PengineOptions, true), 1066 pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy), 1067 thread_send_message(ChildThread, pengine_registered(Child)), 1068 ( option(id(Id), Options) 1069 -> Id = Child 1070 ; true 1071 ). 1072 1073pengine_create_option(src_text(_)). 1074pengine_create_option(src_url(_)). 1075pengine_create_option(application(_)). 1076pengine_create_option(destroy(_)). 1077pengine_create_option(ask(_)). 1078pengine_create_option(template(_)). 1079pengine_create_option(bindings(_)). 1080pengine_create_option(chunk(_)). 1081pengine_create_option(alias(_)). 1082pengine_create_option(user(_)).
at_exit
option. Destroys child
pengines using pengine_destroy/1. Cleaning up the Pengine is
synchronised by the pengine_done
mutex. See read_event/6.1091:- public 1092 pengine_done/0. 1093 1094pengine_done :- 1095 thread_self(Me), 1096 ( thread_property(Me, status(exception('$aborted'))), 1097 thread_detach(Me), 1098 pengine_self(Pengine) 1099 -> catch(pengine_reply(destroy(Pengine, abort(Pengine))), 1100 error(_,_), true) 1101 ; true 1102 ), 1103 forall(child(_Name, Child), 1104 pengine_destroy(Child)), 1105 pengine_self(Id), 1106 protect_pengine(Id, pengine_unregister(Id)).
1114:- thread_local wrap_first_answer_in_create_event/2. 1115 1116:- meta_predicate 1117 pengine_prepare_source( , ). 1118 1119pengine_main(Parent, Options, Application) :- 1120 fix_streams, 1121 thread_get_message(pengine_registered(Self)), 1122 nb_setval(pengine_parent, Parent), 1123 pengine_register_user(Options), 1124 set_prolog_flag(mitigate_spectre, true), 1125 catch(in_temporary_module( 1126 Self, 1127 pengine_prepare_source(Application, Options), 1128 pengine_create_and_loop(Self, Application, Options)), 1129 prepare_source_failed, 1130 pengine_terminate(Self)). 1131 1132pengine_create_and_loop(Self, Application, Options) :- 1133 setting(Application:slave_limit, SlaveLimit), 1134 CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]), 1135 ( option(ask(Query0), Options) 1136 -> asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)), 1137 ( string(Query0) % string is not callable 1138 -> ( option(template(TemplateS), Options) 1139 -> Ask2 = Query0-TemplateS 1140 ; Ask2 = Query0 1141 ), 1142 catch(ask_to_term(Ask2, Self, Query, Template, Bindings), 1143 Error, true), 1144 ( var(Error) 1145 -> true 1146 ; send_error(Error), 1147 throw(prepare_source_failed) 1148 ) 1149 ; Query = Query0, 1150 option(template(Template), Options, Query), 1151 option(bindings(Bindings), Options, []) 1152 ), 1153 option(chunk(Chunk), Options, 1), 1154 pengine_ask(Self, Query, 1155 [ template(Template), 1156 chunk(Chunk), 1157 bindings(Bindings) 1158 ]) 1159 ; Extra = [], 1160 pengine_reply(CreateEvent) 1161 ), 1162 pengine_main_loop(Self).
1172ask_to_term(Ask-Template, Module, Ask1, Template1, Bindings) :- 1173 !, 1174 format(string(AskTemplate), 't((~s),(~s))', [Template, Ask]), 1175 term_string(t(Template1,Ask1), AskTemplate, 1176 [ variable_names(Bindings0), 1177 module(Module) 1178 ]), 1179 phrase(template_bindings(Template1, Bindings0), Bindings). 1180ask_to_term(Ask, Module, Ask1, Template, Bindings1) :- 1181 term_string(Ask1, Ask, 1182 [ variable_names(Bindings), 1183 module(Module) 1184 ]), 1185 exclude(anon, Bindings, Bindings1), 1186 dict_create(Template, swish_default_template, Bindings1). 1187 1188template_bindings(Var, Bindings) --> 1189 { var(Var) }, !, 1190 ( { var_binding(Bindings, Var, Binding) 1191 } 1192 -> [Binding] 1193 ; [] 1194 ). 1195template_bindings([H|T], Bindings) --> 1196 !, 1197 template_bindings(H, Bindings), 1198 template_bindings(T, Bindings). 1199template_bindings(Compoound, Bindings) --> 1200 { compound(Compoound), !, 1201 compound_name_arguments(Compoound, _, Args) 1202 }, 1203 template_bindings(Args, Bindings). 1204template_bindings(_, _) --> []. 1205 1206var_binding(Bindings, Var, Binding) :- 1207 member(Binding, Bindings), 1208 arg(2, Binding, V), 1209 V == Var, !.
1216fix_streams :- 1217 fix_stream(current_output). 1218 1219fix_stream(Name) :- 1220 is_cgi_stream(Name), 1221 !, 1222 debug(pengine(stream), '~w is a CGI stream!', [Name]), 1223 set_stream(user_output, alias(Name)). 1224fix_stream(_).
1233pengine_prepare_source(Module:Application, Options) :- 1234 setting(Application:program_space, SpaceLimit), 1235 set_module(Module:program_space(SpaceLimit)), 1236 delete_import_module(Module, user), 1237 add_import_module(Module, Application, start), 1238 catch(prep_module(Module, Application, Options), Error, true), 1239 ( var(Error) 1240 -> true 1241 ; send_error(Error), 1242 throw(prepare_source_failed) 1243 ). 1244 1245prep_module(Module, Application, Options) :- 1246 maplist(copy_flag(Module, Application), [var_prefix]), 1247 forall(prepare_module(Module, Application, Options), true), 1248 setup_call_cleanup( 1249 '$set_source_module'(OldModule, Module), 1250 maplist(process_create_option(Module), Options), 1251 '$set_source_module'(OldModule)). 1252 1253copy_flag(Module, Application, Flag) :- 1254 current_prolog_flag(ApplicationFlag, Value), 1255 !, 1256 set_prolog_flag(ModuleFlag, Value). 1257copy_flag(_, _, _). 1258 1259process_create_option(Application, src_text(Text)) :- 1260 !, 1261 pengine_src_text(Text, Application). 1262process_create_option(Application, src_url(URL)) :- 1263 !, 1264 pengine_src_url(URL, Application). 1265process_create_option(_, _).
src_text
and
src_url
options1288pengine_main_loop(ID) :- 1289 catch(guarded_main_loop(ID), abort_query, pengine_aborted(ID)). 1290 1291pengine_aborted(ID) :- 1292 thread_self(Self), 1293 debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Self]), 1294 empty_queue, 1295 destroy_or_continue(abort(ID)).
1308guarded_main_loop(ID) :- 1309 pengine_request(Request), 1310 ( Request = destroy 1311 -> debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]), 1312 pengine_terminate(ID) 1313 ; Request = ask(Goal, Options) 1314 -> debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]), 1315 ask(ID, Goal, Options) 1316 ; debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]), 1317 pengine_reply(error(ID, error(protocol_error, _))), 1318 guarded_main_loop(ID) 1319 ). 1320 1321 1322pengine_terminate(ID) :- 1323 pengine_reply(destroy(ID)), 1324 thread_self(Me), % Make the thread silently disappear 1325 thread_detach(Me).
1336solve(Chunk, Template, Goal, ID) :- 1337 prolog_current_choice(Choice), 1338 State = count(Chunk), 1339 statistics(cputime, Epoch), 1340 Time = time(Epoch), 1341 nb_current('$variable_names', Bindings), 1342 filter_template(Template, Bindings, Template2), 1343 '$current_typein_module'(CurrTypeIn), 1344 ( '$set_typein_module'(ID), 1345 call_cleanup(catch(findnsols_no_empty(State, Template2, 1346 set_projection(Goal, Bindings), 1347 Result), 1348 Error, true), 1349 query_done(Det, CurrTypeIn)), 1350 arg(1, Time, T0), 1351 statistics(cputime, T1), 1352 CPUTime is T1-T0, 1353 ( var(Error) 1354 -> projection(Projection), 1355 ( var(Det) 1356 -> pengine_reply(success(ID, Result, Projection, 1357 CPUTime, true)), 1358 more_solutions(ID, Choice, State, Time) 1359 ; !, % commit 1360 destroy_or_continue(success(ID, Result, Projection, 1361 CPUTime, false)) 1362 ) 1363 ; !, % commit 1364 ( Error == abort_query 1365 -> throw(Error) 1366 ; destroy_or_continue(error(ID, Error)) 1367 ) 1368 ) 1369 ; !, % commit 1370 arg(1, Time, T0), 1371 statistics(cputime, T1), 1372 CPUTime is T1-T0, 1373 destroy_or_continue(failure(ID, CPUTime)) 1374 ). 1375solve(_, _, _, _). % leave a choice point 1376 1377query_done(true, CurrTypeIn) :- 1378 '$set_typein_module'(CurrTypeIn).
1387set_projection(Goal, Bindings) :- 1388 b_setval('$variable_names', Bindings), 1389 call(Goal). 1390 1391projection(Projection) :- 1392 nb_current('$variable_names', Bindings), 1393 !, 1394 maplist(var_name, Bindings, Projection). 1395projection([]).
1405filter_template(Template0, Bindings, Template) :- 1406 is_dict(Template0, swish_default_template), 1407 !, 1408 dict_create(Template, swish_default_template, Bindings). 1409filter_template(Template, _Bindings, Template). 1410 1411findnsols_no_empty(N, Template, Goal, List) :- 1412 findnsols(N, Template, Goal, List), 1413 List \== []. 1414 1415destroy_or_continue(Event) :- 1416 arg(1, Event, ID), 1417 ( pengine_property(ID, destroy(true)) 1418 -> thread_self(Me), 1419 thread_detach(Me), 1420 pengine_reply(destroy(ID, Event)) 1421 ; pengine_reply(Event), 1422 guarded_main_loop(ID) 1423 ).
chunk
solutions.next
, but sets the new chunk-size to Count.1441more_solutions(ID, Choice, State, Time) :- 1442 pengine_request(Event), 1443 more_solutions(Event, ID, Choice, State, Time). 1444 1445more_solutions(stop, ID, _Choice, _State, _Time) :- 1446 !, 1447 debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]), 1448 destroy_or_continue(stop(ID)). 1449more_solutions(next, ID, _Choice, _State, Time) :- 1450 !, 1451 debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]), 1452 statistics(cputime, T0), 1453 nb_setarg(1, Time, T0), 1454 fail. 1455more_solutions(next(Count), ID, _Choice, State, Time) :- 1456 Count > 0, 1457 !, 1458 debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]), 1459 nb_setarg(1, State, Count), 1460 statistics(cputime, T0), 1461 nb_setarg(1, Time, T0), 1462 fail. 1463more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :- 1464 !, 1465 debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]), 1466 prolog_cut_to(Choice), 1467 ask(ID, Goal, Options). 1468more_solutions(destroy, ID, _Choice, _State, _Time) :- 1469 !, 1470 debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]), 1471 pengine_terminate(ID). 1472more_solutions(Event, ID, Choice, State, Time) :- 1473 debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]), 1474 pengine_reply(error(ID, error(protocol_error, _))), 1475 more_solutions(ID, Choice, State, Time).
chunk(N)
option.
1483ask(ID, Goal, Options) :-
1484 catch(prepare_goal(ID, Goal, Goal1, Options), Error, true),
1485 !,
1486 ( var(Error)
1487 -> option(template(Template), Options, Goal),
1488 option(chunk(N), Options, 1),
1489 solve(N, Template, Goal1, ID)
1490 ; pengine_reply(error(ID, Error)),
1491 guarded_main_loop(ID)
1492 ).
Note that expand_goal(Module:GoalIn, GoalOut)
is what we'd like
to write, but this does not work correctly if the user wishes to
expand X:Y
while interpreting X not as the module in which
to run Y. This happens in the CQL package. Possibly we should
disallow this reinterpretation?
1506prepare_goal(ID, Goal0, Module:Goal, Options) :-
1507 option(bindings(Bindings), Options, []),
1508 b_setval('$variable_names', Bindings),
1509 ( prepare_goal(Goal0, Goal1, Options)
1510 -> true
1511 ; Goal1 = Goal0
1512 ),
1513 get_pengine_module(ID, Module),
1514 setup_call_cleanup(
1515 '$set_source_module'(Old, Module),
1516 expand_goal(Goal1, Goal),
1517 '$set_source_module'(_, Old)),
1518 ( pengine_not_sandboxed(ID)
1519 -> true
1520 ; get_pengine_application(ID, App),
1521 setting(App:safe_goal_limit, Limit),
1522 catch(call_with_time_limit(
1523 Limit,
1524 safe_goal(Module:Goal)), E, true)
1525 -> ( var(E)
1526 -> true
1527 ; E = time_limit_exceeded
1528 -> throw(error(sandbox(time_limit_exceeded, Limit),_))
1529 ; throw(E)
1530 )
1531 ).
not_sandboxed(User, Application)
must succeed.
1551pengine_not_sandboxed(ID) :-
1552 pengine_user(ID, User),
1553 pengine_property(ID, application(App)),
1554 not_sandboxed(User, App),
1555 !.
1577pengine_pull_response(Pengine, Options) :- 1578 pengine_remote(Pengine, Server), 1579 !, 1580 remote_pengine_pull_response(Server, Pengine, Options). 1581pengine_pull_response(_ID, _Options).
1590pengine_input(Prompt, Term) :-
1591 pengine_self(Self),
1592 pengine_parent(Parent),
1593 pengine_reply(Parent, prompt(Self, Prompt)),
1594 pengine_request(Request),
1595 ( Request = input(Input)
1596 -> Term = Input
1597 ; Request == destroy
1598 -> abort
1599 ; throw(error(protocol_error,_))
1600 ).
Defined in terms of pengine_send/3, as follows:
pengine_respond(Pengine, Input, Options) :- pengine_send(Pengine, input(Input), Options).
*/
1617pengine_respond(Pengine, Input, Options) :-
1618 pengine_send(Pengine, input(Input), Options).
1627send_error(error(Formal, context(prolog_stack(Frames), Message))) :- 1628 is_list(Frames), 1629 !, 1630 with_output_to(string(Stack), 1631 print_prolog_backtrace(current_output, Frames)), 1632 pengine_self(Self), 1633 replace_blobs(Formal, Formal1), 1634 replace_blobs(Message, Message1), 1635 pengine_reply(error(Self, error(Formal1, 1636 context(prolog_stack(Stack), Message1)))). 1637send_error(Error) :- 1638 pengine_self(Self), 1639 replace_blobs(Error, Error1), 1640 pengine_reply(error(Self, Error1)).
1648replace_blobs(Blob, Atom) :- 1649 blob(Blob, Type), Type \== text, 1650 !, 1651 format(atom(Atom), '~p', [Blob]). 1652replace_blobs(Term0, Term) :- 1653 compound(Term0), 1654 !, 1655 compound_name_arguments(Term0, Name, Args0), 1656 maplist(replace_blobs, Args0, Args), 1657 compound_name_arguments(Term, Name, Args). 1658replace_blobs(Term, Term). 1659 1660 1661/*================= Remote pengines ======================= 1662*/ 1663 1664 1665remote_pengine_create(BaseURL, Options) :- 1666 partition(pengine_create_option, Options, PengineOptions0, RestOptions), 1667 ( option(ask(Query), PengineOptions0), 1668 \+ option(template(_Template), PengineOptions0) 1669 -> PengineOptions = [template(Query)|PengineOptions0] 1670 ; PengineOptions = PengineOptions0 1671 ), 1672 options_to_dict(PengineOptions, PostData), 1673 remote_post_rec(BaseURL, create, PostData, Reply, RestOptions), 1674 arg(1, Reply, ID), 1675 ( option(id(ID2), Options) 1676 -> ID = ID2 1677 ; true 1678 ), 1679 option(alias(Name), Options, ID), 1680 assert(child(Name, ID)), 1681 ( ( functor(Reply, create, _) % actually created 1682 ; functor(Reply, output, _) % compiler messages 1683 ) 1684 -> option(application(Application), PengineOptions, pengine_sandbox), 1685 option(destroy(Destroy), PengineOptions, true), 1686 pengine_register_remote(ID, BaseURL, Application, Destroy) 1687 ; true 1688 ), 1689 thread_self(Queue), 1690 pengine_reply(Queue, Reply). 1691 1692options_to_dict(Options, Dict) :- 1693 select_option(ask(Ask), Options, Options1), 1694 select_option(template(Template), Options1, Options2), 1695 !, 1696 no_numbered_var_in(Ask+Template), 1697 findall(AskString-TemplateString, 1698 ask_template_to_strings(Ask, Template, AskString, TemplateString), 1699 [ AskString-TemplateString ]), 1700 options_to_dict(Options2, Dict0), 1701 Dict = Dict0.put(_{ask:AskString,template:TemplateString}). 1702options_to_dict(Options, Dict) :- 1703 maplist(prolog_option, Options, Options1), 1704 dict_create(Dict, _, Options1). 1705 1706no_numbered_var_in(Term) :- 1707 sub_term(Sub, Term), 1708 subsumes_term('$VAR'(_), Sub), 1709 !, 1710 domain_error(numbered_vars_free_term, Term). 1711no_numbered_var_in(_). 1712 1713ask_template_to_strings(Ask, Template, AskString, TemplateString) :- 1714 numbervars(Ask+Template, 0, _), 1715 WOpts = [ numbervars(true), ignore_ops(true), quoted(true) ], 1716 format(string(AskTemplate), '~W\n~W', [ Ask, WOpts, 1717 Template, WOpts 1718 ]), 1719 split_string(AskTemplate, "\n", "", [AskString, TemplateString]). 1720 1721prolog_option(Option0, Option) :- 1722 create_option_type(Option0, term), 1723 !, 1724 Option0 =.. [Name,Value], 1725 format(string(String), '~k', [Value]), 1726 Option =.. [Name,String]. 1727prolog_option(Option, Option). 1728 1729create_option_type(ask(_), term). 1730create_option_type(template(_), term). 1731create_option_type(application(_), atom). 1732 1733remote_pengine_send(BaseURL, ID, Event, Options) :- 1734 remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options), 1735 thread_self(Queue), 1736 pengine_reply(Queue, Reply). 1737 1738remote_pengine_pull_response(BaseURL, ID, Options) :- 1739 remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options), 1740 thread_self(Queue), 1741 pengine_reply(Queue, Reply). 1742 1743remote_pengine_abort(BaseURL, ID, Options) :- 1744 remote_send_rec(BaseURL, abort, ID, [], Reply, Options), 1745 thread_self(Queue), 1746 pengine_reply(Queue, Reply).
1753remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :- 1754 !, 1755 server_url(Server, Action, [id=ID], URL), 1756 http_open(URL, Stream, % putting this in setup_call_cleanup/3 1757 [ post(prolog(Event)) % makes it impossible to interrupt. 1758 | Options 1759 ]), 1760 call_cleanup( 1761 read_prolog_reply(Stream, Reply), 1762 close(Stream)). 1763remote_send_rec(Server, Action, ID, Params, Reply, Options) :- 1764 server_url(Server, Action, [id=ID|Params], URL), 1765 http_open(URL, Stream, Options), 1766 call_cleanup( 1767 read_prolog_reply(Stream, Reply), 1768 close(Stream)). 1769 1770remote_post_rec(Server, Action, Data, Reply, Options) :- 1771 server_url(Server, Action, [], URL), 1772 probe(Action, URL), 1773 http_open(URL, Stream, 1774 [ post(json(Data)) 1775 | Options 1776 ]), 1777 call_cleanup( 1778 read_prolog_reply(Stream, Reply), 1779 close(Stream)).
1787probe(create, URL) :- 1788 !, 1789 http_open(URL, Stream, [method(options)]), 1790 close(Stream). 1791probe(_, _). 1792 1793read_prolog_reply(In, Reply) :- 1794 set_stream(In, encoding(utf8)), 1795 read(In, Reply0), 1796 rebind_cycles(Reply0, Reply). 1797 1798rebind_cycles(@(Reply, Bindings), Reply) :- 1799 is_list(Bindings), 1800 !, 1801 maplist(bind, Bindings). 1802rebind_cycles(Reply, Reply). 1803 1804bind(Var = Value) :- 1805 Var = Value. 1806 1807server_url(Server, Action, Params, URL) :- 1808 uri_components(Server, Components0), 1809 uri_query_components(Query, Params), 1810 uri_data(path, Components0, Path0), 1811 atom_concat('pengine/', Action, PAction), 1812 directory_file_path(Path0, PAction, Path), 1813 uri_data(path, Components0, Path, Components), 1814 uri_data(search, Components, Query), 1815 uri_components(URL, Components).
Valid options are:
timeout
.1836pengine_event(Event) :- 1837 pengine_event(Event, []). 1838 1839pengine_event(Event, Options) :- 1840 thread_self(Self), 1841 option(listen(Id), Options, _), 1842 ( thread_get_message(Self, pengine_event(Id, Event), Options) 1843 -> true 1844 ; Event = timeout 1845 ), 1846 update_remote_destroy(Event). 1847 1848update_remote_destroy(Event) :- 1849 destroy_event(Event), 1850 arg(1, Event, Id), 1851 pengine_remote(Id, _Server), 1852 !, 1853 pengine_unregister_remote(Id). 1854update_remote_destroy(_). 1855 1856destroy_event(destroy(_)). 1857destroy_event(destroy(_,_)). 1858destroy_event(create(_,Features)) :- 1859 memberchk(answer(Answer), Features), 1860 !, 1861 nonvar(Answer), 1862 destroy_event(Answer).
ignore(call(Closure, E))
. A
closure thus acts as a handler for the event. Some events are also
treated specially:
Valid options are:
all
,
all_but_sender
or a Prolog list of NameOrIDs. [not yet
implemented]*/
1891pengine_event_loop(Closure, Options) :- 1892 child(_,_), 1893 !, 1894 pengine_event(Event), 1895 ( option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs 1896 -> forall(child(_,ID), pengine_send(ID, Event)) 1897 ; true 1898 ), 1899 pengine_event_loop(Event, Closure, Options). 1900pengine_event_loop(_, _). 1901 1902:- meta_predicate 1903 pengine_process_event( , , , ). 1904 1905pengine_event_loop(Event, Closure, Options) :- 1906 pengine_process_event(Event, Closure, Continue, Options), 1907 ( Continue == true 1908 -> pengine_event_loop(Closure, Options) 1909 ; true 1910 ). 1911 1912pengine_process_event(create(ID, T), Closure, Continue, Options) :- 1913 debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(T)]), 1914 ( select(answer(First), T, T1) 1915 -> ignore(call(Closure, create(ID, T1))), 1916 pengine_process_event(First, Closure, Continue, Options) 1917 ; ignore(call(Closure, create(ID, T))), 1918 Continue = true 1919 ). 1920pengine_process_event(output(ID, Msg), Closure, true, _Options) :- 1921 debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Msg)]), 1922 ignore(call(Closure, output(ID, Msg))), 1923 pengine_pull_response(ID, []). 1924pengine_process_event(debug(ID, Msg), Closure, true, _Options) :- 1925 debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Msg)]), 1926 ignore(call(Closure, debug(ID, Msg))), 1927 pengine_pull_response(ID, []). 1928pengine_process_event(prompt(ID, Term), Closure, true, _Options) :- 1929 debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Term)]), 1930 ignore(call(Closure, prompt(ID, Term))). 1931pengine_process_event(success(ID, Sol, _Proj, _Time, More), Closure, true, _) :- 1932 debug(pengine(transition), '~q: 3 = /~q => 6/2', [ID, success(Sol, More)]), 1933 ignore(call(Closure, success(ID, Sol, More))). 1934pengine_process_event(failure(ID, _Time), Closure, true, _Options) :- 1935 debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]), 1936 ignore(call(Closure, failure(ID))). 1937pengine_process_event(error(ID, Error), Closure, Continue, _Options) :- 1938 debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]), 1939 ( call(Closure, error(ID, Error)) 1940 -> Continue = true 1941 ; forall(child(_,Child), pengine_destroy(Child)), 1942 throw(Error) 1943 ). 1944pengine_process_event(stop(ID), Closure, true, _Options) :- 1945 debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]), 1946 ignore(call(Closure, stop(ID))). 1947pengine_process_event(destroy(ID, Event), Closure, Continue, Options) :- 1948 pengine_process_event(Event, Closure, _, Options), 1949 pengine_process_event(destroy(ID), Closure, Continue, Options). 1950pengine_process_event(destroy(ID), Closure, true, _Options) :- 1951 retractall(child(_,ID)), 1952 debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]), 1953 ignore(call(Closure, destroy(ID))).
copy_term_nat(Query, Copy), % attributes are not copied to the server call(Copy), % executed on server at URL Query = Copy.
Valid options are:
pengines:time_limit
.Remaining options (except the server option) are passed to pengine_create/1. */
1982pengine_rpc(URL, Query) :- 1983 pengine_rpc(URL, Query, []). 1984 1985pengine_rpc(URL, Query, M:Options0) :- 1986 translate_local_sources(Options0, Options1, M), 1987 ( option(timeout(_), Options1) 1988 -> Options = Options1 1989 ; setting(time_limit, Limit), 1990 Options = [timeout(Limit)|Options1] 1991 ), 1992 term_variables(Query, Vars), 1993 Template =.. [v|Vars], 1994 State = destroy(true), % modified by process_event/4 1995 setup_call_catcher_cleanup( 1996 pengine_create([ ask(Query), 1997 template(Template), 1998 server(URL), 1999 id(Id) 2000 | Options 2001 ]), 2002 wait_event(Template, State, [listen(Id)|Options]), 2003 Why, 2004 pengine_destroy_and_wait(State, Id, Why)). 2005 2006pengine_destroy_and_wait(destroy(true), Id, Why) :- 2007 !, 2008 debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]), 2009 pengine_destroy(Id), 2010 wait_destroy(Id, 10). 2011pengine_destroy_and_wait(_, _, Why) :- 2012 debug(pengine(rpc), 'Not destroying RPC (~p)', [Why]). 2013 2014wait_destroy(Id, _) :- 2015 \+ child(_, Id), 2016 !. 2017wait_destroy(Id, N) :- 2018 pengine_event(Event, [listen(Id),timeout(10)]), 2019 !, 2020 ( destroy_event(Event) 2021 -> retractall(child(_,Id)) 2022 ; succ(N1, N) 2023 -> wait_destroy(Id, N1) 2024 ; debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]), 2025 pengine_unregister_remote(Id), 2026 retractall(child(_,Id)) 2027 ). 2028 2029wait_event(Template, State, Options) :- 2030 pengine_event(Event, Options), 2031 debug(pengine(event), 'Received ~p', [Event]), 2032 process_event(Event, Template, State, Options). 2033 2034process_event(create(_ID, Features), Template, State, Options) :- 2035 memberchk(answer(First), Features), 2036 process_event(First, Template, State, Options). 2037process_event(error(_ID, Error), _Template, _, _Options) :- 2038 throw(Error). 2039process_event(failure(_ID, _Time), _Template, _, _Options) :- 2040 fail. 2041process_event(prompt(ID, Prompt), Template, State, Options) :- 2042 pengine_rpc_prompt(ID, Prompt, Reply), 2043 pengine_send(ID, input(Reply)), 2044 wait_event(Template, State, Options). 2045process_event(output(ID, Term), Template, State, Options) :- 2046 pengine_rpc_output(ID, Term), 2047 pengine_pull_response(ID, Options), 2048 wait_event(Template, State, Options). 2049process_event(debug(ID, Message), Template, State, Options) :- 2050 debug(pengine(debug), '~w', [Message]), 2051 pengine_pull_response(ID, Options), 2052 wait_event(Template, State, Options). 2053process_event(success(_ID, Solutions, _Proj, _Time, false), 2054 Template, _, _Options) :- 2055 !, 2056 member(Template, Solutions). 2057process_event(success(ID, Solutions, _Proj, _Time, true), 2058 Template, State, Options) :- 2059 ( member(Template, Solutions) 2060 ; pengine_next(ID, Options), 2061 wait_event(Template, State, Options) 2062 ). 2063process_event(destroy(ID, Event), Template, State, Options) :- 2064 !, 2065 retractall(child(_,ID)), 2066 nb_setarg(1, State, false), 2067 debug(pengine(destroy), 'State: ~p~n', [State]), 2068 process_event(Event, Template, State, Options). 2069% compatibility with older versions of the protocol. 2070process_event(success(ID, Solutions, Time, More), 2071 Template, State, Options) :- 2072 process_event(success(ID, Solutions, _Proj, Time, More), 2073 Template, State, Options). 2074 2075 2076pengine_rpc_prompt(ID, Prompt, Term) :- 2077 prompt(ID, Prompt, Term0), 2078 !, 2079 Term = Term0. 2080pengine_rpc_prompt(_ID, Prompt, Term) :- 2081 setup_call_cleanup( 2082 prompt(Old, Prompt), 2083 read(Term), 2084 prompt(_, Old)). 2085 2086pengine_rpc_output(ID, Term) :- 2087 output(ID, Term), 2088 !. 2089pengine_rpc_output(_ID, Term) :- 2090 print(Term).
2097:- multifile prompt/3.
2104:- multifile output/2. 2105 2106 2107/*================= HTTP handlers ======================= 2108*/ 2109 2110% Declare HTTP locations we serve and how. Note that we use 2111% time_limit(inifinite) because pengines have their own timeout. Also 2112% note that we use spawn. This is needed because we can easily get 2113% many clients waiting for some action on a pengine to complete. 2114% Without spawning, we would quickly exhaust the worker pool of the 2115% HTTP server. 2116% 2117% FIXME: probably we should wait for a short time for the pengine on 2118% the default worker thread. Only if that time has expired, we can 2119% call http_spawn/2 to continue waiting on a new thread. That would 2120% improve the performance and reduce the usage of threads. 2121 2122:- http_handler(root(pengine), http_404([]), 2123 [ id(pengines) ]). 2124:- http_handler(root(pengine/create), http_pengine_create, 2125 [ time_limit(infinite), spawn([]) ]). 2126:- http_handler(root(pengine/send), http_pengine_send, 2127 [ time_limit(infinite), spawn([]) ]). 2128:- http_handler(root(pengine/pull_response), http_pengine_pull_response, 2129 [ time_limit(infinite), spawn([]) ]). 2130:- http_handler(root(pengine/abort), http_pengine_abort, []). 2131:- http_handler(root(pengine/detach), http_pengine_detach, []). 2132:- http_handler(root(pengine/list), http_pengine_list, []). 2133:- http_handler(root(pengine/ping), http_pengine_ping, []). 2134:- http_handler(root(pengine/destroy_all), http_pengine_destroy_all, []). 2135 2136:- http_handler(root(pengine/'pengines.js'), 2137 http_reply_file(library('http/web/js/pengines.js'), []), []). 2138:- http_handler(root(pengine/'plterm.css'), 2139 http_reply_file(library('http/web/css/plterm.css'), []), []).
application/json
and as
www-form-encoded
. Accepted parameters:
Parameter | Default | Comment |
---|---|---|
format | prolog | Output format |
application | pengine_sandbox | Pengine application |
chunk | 1 | Chunk-size for results |
solutions | chunked | If all , emit all results |
ask | - | The query |
template | - | Output template |
src_text | "" | Program |
src_url | - | Program to download |
disposition | - | Download location |
Note that solutions=all internally uses chunking to obtain the results from the pengine, but the results are combined in a single HTTP reply. This is currently only implemented by the CSV backend that is part of SWISH for downloading unbounded result sets with limited memory resources.
2166http_pengine_create(Request) :- 2167 reply_options(Request, [post]), 2168 !. 2169http_pengine_create(Request) :- 2170 memberchk(content_type(CT), Request), 2171 sub_atom(CT, 0, _, _, 'application/json'), 2172 !, 2173 http_read_json_dict(Request, Dict), 2174 dict_atom_option(format, Dict, Format, prolog), 2175 dict_atom_option(application, Dict, Application, pengine_sandbox), 2176 http_pengine_create(Request, Application, Format, Dict). 2177http_pengine_create(Request) :- 2178 Optional = [optional(true)], 2179 OptString = [string|Optional], 2180 Form = [ format(Format, [default(prolog)]), 2181 application(Application, [default(pengine_sandbox)]), 2182 chunk(_, [integer, default(1)]), 2183 solutions(_, [oneof([all,chunked]), default(chunked)]), 2184 ask(_, OptString), 2185 template(_, OptString), 2186 src_text(_, OptString), 2187 disposition(_, OptString), 2188 src_url(_, Optional) 2189 ], 2190 http_parameters(Request, Form), 2191 form_dict(Form, Dict), 2192 http_pengine_create(Request, Application, Format, Dict). 2193 2194dict_atom_option(Key, Dict, Atom, Default) :- 2195 ( get_dict(Key, Dict, String) 2196 -> atom_string(Atom, String) 2197 ; Atom = Default 2198 ). 2199 2200form_dict(Form, Dict) :- 2201 form_values(Form, Pairs), 2202 dict_pairs(Dict, _, Pairs). 2203 2204form_values([], []). 2205form_values([H|T], Pairs) :- 2206 arg(1, H, Value), 2207 nonvar(Value), 2208 !, 2209 functor(H, Name, _), 2210 Pairs = [Name-Value|PairsT], 2211 form_values(T, PairsT). 2212form_values([_|T], Pairs) :- 2213 form_values(T, Pairs).
2218http_pengine_create(Request, Application, Format, Dict) :- 2219 current_application(Application), 2220 !, 2221 allowed(Request, Application), 2222 authenticate(Request, Application, UserOptions), 2223 dict_to_options(Dict, Application, CreateOptions0), 2224 append(UserOptions, CreateOptions0, CreateOptions), 2225 pengine_uuid(Pengine), 2226 message_queue_create(Queue, [max_size(25)]), 2227 setting(Application:time_limit, TimeLimit), 2228 get_time(Now), 2229 asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)), 2230 broadcast(pengine(create(Pengine, Application, CreateOptions))), 2231 create(Queue, Pengine, CreateOptions, http, Application), 2232 create_wait_and_output_result(Pengine, Queue, Format, 2233 TimeLimit, Dict), 2234 gc_abandoned_queues. 2235http_pengine_create(_Request, Application, Format, _Dict) :- 2236 Error = existence_error(pengine_application, Application), 2237 pengine_uuid(ID), 2238 output_result(Format, error(ID, error(Error, _))). 2239 2240 2241dict_to_options(Dict, Application, CreateOptions) :- 2242 dict_pairs(Dict, _, Pairs), 2243 pairs_create_options(Pairs, Application, CreateOptions). 2244 2245pairs_create_options([], _, []) :- !. 2246pairs_create_options([N-V0|T0], App, [Opt|T]) :- 2247 Opt =.. [N,V], 2248 pengine_create_option(Opt), N \== user, 2249 !, 2250 ( create_option_type(Opt, atom) 2251 -> atom_string(V, V0) % term creation must be done if 2252 ; V = V0 % we created the source and know 2253 ), % the operators. 2254 pairs_create_options(T0, App, T). 2255pairs_create_options([_|T0], App, T) :- 2256 pairs_create_options(T0, App, T).
time_limit
,
Pengine is aborted and the result is error(time_limit_exceeded,
_)
.
2267wait_and_output_result(Pengine, Queue, Format, TimeLimit) :-
2268 ( catch(thread_get_message(Queue, pengine_event(_, Event),
2269 [ timeout(TimeLimit)
2270 ]),
2271 Error, true)
2272 -> ( var(Error)
2273 -> debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
2274 ignore(destroy_queue_from_http(Pengine, Event, Queue)),
2275 protect_pengine(Pengine, output_result(Format, Event))
2276 ; output_result(Format, died(Pengine))
2277 )
2278 ; time_limit_exceeded(Pengine, Format)
2279 ).
disposition
key to denote the
download location.2288create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :- 2289 get_dict(solutions, Dict, all), 2290 !, 2291 between(1, infinite, Page), 2292 ( catch(thread_get_message(Queue, pengine_event(_, Event), 2293 [ timeout(TimeLimit) 2294 ]), 2295 Error, true) 2296 -> ( var(Error) 2297 -> debug(pengine(wait), 'Page ~D: got ~q from ~q', [Page, Event, Queue]), 2298 ( destroy_queue_from_http(Pengine, Event, Queue) 2299 -> !, 2300 protect_pengine(Pengine, 2301 output_result(Format, page(Page, Event), Dict)) 2302 ; is_more_event(Event) 2303 -> pengine_thread(Pengine, Thread), 2304 thread_send_message(Thread, pengine_request(next)), 2305 protect_pengine(Pengine, 2306 output_result(Format, page(Page, Event), Dict)), 2307 fail 2308 ; !, 2309 protect_pengine(Pengine, 2310 output_result(Format, page(Page, Event), Dict)) 2311 ) 2312 ; !, output_result(Format, died(Pengine)) 2313 ) 2314 ; !, time_limit_exceeded(Pengine, Format) 2315 ), 2316 !. 2317create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, _Dict) :- 2318 wait_and_output_result(Pengine, Queue, Format, TimeLimit). 2319 2320is_more_event(success(_Id, _Answers, _Projection, _Time, true)). 2321is_more_event(create(_, Options)) :- 2322 memberchk(answer(Event), Options), 2323 is_more_event(Event).
2337time_limit_exceeded(Pengine, Format) :-
2338 call_cleanup(
2339 pengine_destroy(Pengine, [force(true)]),
2340 output_result(Format,
2341 destroy(Pengine,
2342 error(Pengine, time_limit_exceeded)))).
2357destroy_queue_from_http(ID, _, Queue) :- 2358 output_queue(ID, Queue, _), 2359 !, 2360 destroy_queue_if_empty(Queue). 2361destroy_queue_from_http(ID, Event, Queue) :- 2362 debug(pengine(destroy), 'DESTROY? ~p', [Event]), 2363 is_destroy_event(Event), 2364 !, 2365 message_queue_property(Queue, size(Waiting)), 2366 debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Waiting]), 2367 with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)). 2368 2369is_destroy_event(destroy(_)). 2370is_destroy_event(destroy(_,_)). 2371is_destroy_event(create(_, Options)) :- 2372 memberchk(answer(Event), Options), 2373 is_destroy_event(Event). 2374 2375destroy_queue_if_empty(Queue) :- 2376 thread_peek_message(Queue, _), 2377 !. 2378destroy_queue_if_empty(Queue) :- 2379 retractall(output_queue(_, Queue, _)), 2380 message_queue_destroy(Queue).
2388:- dynamic 2389 last_gc/1. 2390 2391gc_abandoned_queues :- 2392 consider_queue_gc, 2393 !, 2394 get_time(Now), 2395 ( output_queue(_, Queue, Time), 2396 Now-Time > 15*60, 2397 retract(output_queue(_, Queue, Time)), 2398 message_queue_destroy(Queue), 2399 fail 2400 ; retractall(last_gc(_)), 2401 asserta(last_gc(Now)) 2402 ). 2403gc_abandoned_queues. 2404 2405consider_queue_gc :- 2406 predicate_property(output_queue(_,_,_), number_of_clauses(N)), 2407 N > 100, 2408 ( last_gc(Time), 2409 get_time(Now), 2410 Now-Time > 5*60 2411 -> true 2412 ; \+ last_gc(_) 2413 ).
2431:- dynamic output_queue_destroyed/1. 2432 2433sync_destroy_queue_from_http(ID, Queue) :- 2434 ( output_queue(ID, Queue, _) 2435 -> destroy_queue_if_empty(Queue) 2436 ; thread_peek_message(Queue, pengine_event(_, output(_,_))) 2437 -> debug(pengine(destroy), 'Delay destruction of ~p because of output', 2438 [Queue]), 2439 get_time(Now), 2440 asserta(output_queue(ID, Queue, Now)) 2441 ; message_queue_destroy(Queue), 2442 asserta(output_queue_destroyed(Queue)) 2443 ).
pengine
held.2450sync_destroy_queue_from_pengine(ID, Queue) :- 2451 ( retract(output_queue_destroyed(Queue)) 2452 -> true 2453 ; get_time(Now), 2454 asserta(output_queue(ID, Queue, Now)) 2455 ), 2456 retractall(pengine_queue(ID, Queue, _, _)). 2457 2458 2459http_pengine_send(Request) :- 2460 reply_options(Request, [get,post]), 2461 !. 2462http_pengine_send(Request) :- 2463 http_parameters(Request, 2464 [ id(ID, [ type(atom) ]), 2465 event(EventString, [optional(true)]), 2466 format(Format, [default(prolog)]) 2467 ]), 2468 catch(read_event(ID, Request, Format, EventString, Event), 2469 Error, 2470 true), 2471 ( var(Error) 2472 -> debug(pengine(event), 'HTTP send: ~p', [Event]), 2473 ( pengine_thread(ID, Thread) 2474 -> pengine_queue(ID, Queue, TimeLimit, _), 2475 random_delay, 2476 broadcast(pengine(send(ID, Event))), 2477 thread_send_message(Thread, pengine_request(Event)), 2478 wait_and_output_result(ID, Queue, Format, TimeLimit) 2479 ; atom(ID) 2480 -> pengine_died(Format, ID) 2481 ; http_404([], Request) 2482 ) 2483 ; Error = error(existence_error(pengine, ID), _) 2484 -> pengine_died(Format, ID) 2485 ; output_result(Format, error(ID, Error)) 2486 ). 2487 2488pengine_died(Format, Pengine) :- 2489 output_result(Format, error(Pengine, 2490 error(existence_error(pengine, Pengine),_))).
pengine_done
mutex.
2501read_event(Pengine, Request, Format, EventString, Event) :- 2502 protect_pengine( 2503 Pengine, 2504 ( get_pengine_module(Pengine, Module), 2505 read_event_2(Request, EventString, Module, Event0, Bindings) 2506 )), 2507 !, 2508 fix_bindings(Format, Event0, Bindings, Event). 2509read_event(Pengine, Request, _Format, _EventString, _Event) :- 2510 debug(pengine(event), 'Pengine ~q vanished', [Pengine]), 2511 discard_post_data(Request), 2512 existence_error(pengine, Pengine).
event
parameter or as a posted document.2520read_event_2(_Request, EventString, Module, Event, Bindings) :- 2521 nonvar(EventString), 2522 !, 2523 term_string(Event, EventString, 2524 [ variable_names(Bindings), 2525 module(Module) 2526 ]). 2527read_event_2(Request, _EventString, Module, Event, Bindings) :- 2528 option(method(post), Request), 2529 http_read_data(Request, Event, 2530 [ content_type('application/x-prolog'), 2531 module(Module), 2532 variable_names(Bindings) 2533 ]).
2539discard_post_data(Request) :- 2540 option(method(post), Request), 2541 !, 2542 setup_call_cleanup( 2543 open_null_stream(NULL), 2544 http_read_data(Request, _, [to(stream(NULL))]), 2545 close(NULL)). 2546discard_post_data(_).
json(-s)
Format from the variables in
the asked Goal. Variables starting with an underscore, followed
by an capital letter are ignored from the template.2554fix_bindings(Format, 2555 ask(Goal, Options0), Bindings, 2556 ask(Goal, NewOptions)) :- 2557 json_lang(Format), 2558 !, 2559 exclude(anon, Bindings, NamedBindings), 2560 template(NamedBindings, Template, Options0, Options1), 2561 select_option(chunk(Paging), Options1, Options2, 1), 2562 NewOptions = [ template(Template), 2563 chunk(Paging), 2564 bindings(NamedBindings) 2565 | Options2 2566 ]. 2567fix_bindings(_, Command, _, Command). 2568 2569template(_, Template, Options0, Options) :- 2570 select_option(template(Template), Options0, Options), 2571 !. 2572template(Bindings, Template, Options, Options) :- 2573 dict_create(Template, swish_default_template, Bindings). 2574 2575anon(Name=_) :- 2576 sub_atom(Name, 0, _, _, '_'), 2577 sub_atom(Name, 1, 1, _, Next), 2578 char_type(Next, prolog_var_start). 2579 2580var_name(Name=_, Name).
2587json_lang(json) :- !. 2588json_lang(Format) :- 2589 sub_atom(Format, 0, _, _, 'json-').
2596http_pengine_pull_response(Request) :- 2597 reply_options(Request, [get]), 2598 !. 2599http_pengine_pull_response(Request) :- 2600 http_parameters(Request, 2601 [ id(ID, []), 2602 format(Format, [default(prolog)]) 2603 ]), 2604 reattach(ID), 2605 ( ( pengine_queue(ID, Queue, TimeLimit, _) 2606 -> true 2607 ; output_queue(ID, Queue, _), 2608 TimeLimit = 0 2609 ) 2610 -> wait_and_output_result(ID, Queue, Format, TimeLimit) 2611 ; http_404([], Request) 2612 ).
2621http_pengine_abort(Request) :- 2622 reply_options(Request, [get,post]), 2623 !. 2624http_pengine_abort(Request) :- 2625 http_parameters(Request, 2626 [ id(ID, []) 2627 ]), 2628 ( pengine_thread(ID, _Thread) 2629 -> broadcast(pengine(abort(ID))), 2630 abort_pending_output(ID), 2631 pengine_abort(ID), 2632 reply_json(true) 2633 ; http_404([], Request) 2634 ).
2646http_pengine_detach(Request) :- 2647 reply_options(Request, [post]), 2648 !. 2649http_pengine_detach(Request) :- 2650 http_parameters(Request, 2651 [ id(ID, []) 2652 ]), 2653 http_read_json_dict(Request, ClientData), 2654 ( pengine_property(ID, application(Application)), 2655 allowed(Request, Application), 2656 authenticate(Request, Application, _UserOptions) 2657 -> broadcast(pengine(detach(ID))), 2658 get_time(Now), 2659 assertz(pengine_detached(ID, ClientData.put(time, Now))), 2660 pengine_queue(ID, Queue, _TimeLimit, _Now), 2661 message_queue_set(Queue, max_size(1000)), 2662 pengine_reply(Queue, detached(ID)), 2663 reply_json(true) 2664 ; http_404([], Request) 2665 ). 2666 2667:- if(\+current_predicate(message_queue_set/2)). 2668message_queue_set(_,_). 2669:- endif. 2670 2671reattach(ID) :- 2672 ( retract(pengine_detached(ID, _Data)), 2673 pengine_queue(ID, Queue, _TimeLimit, _Now) 2674 -> message_queue_set(Queue, max_size(25)) 2675 ; true 2676 ).
2684http_pengine_destroy_all(Request) :- 2685 reply_options(Request, [get,post]), 2686 !. 2687http_pengine_destroy_all(Request) :- 2688 http_parameters(Request, 2689 [ ids(IDsAtom, []) 2690 ]), 2691 atomic_list_concat(IDs, ',', IDsAtom), 2692 forall(( member(ID, IDs), 2693 \+ pengine_detached(ID, _) 2694 ), 2695 pengine_destroy(ID, [force(true)])), 2696 reply_json("ok").
status(Pengine, Stats)
is created, where Stats
is the return of thread_statistics/2.2704http_pengine_ping(Request) :- 2705 reply_options(Request, [get]), 2706 !. 2707http_pengine_ping(Request) :- 2708 http_parameters(Request, 2709 [ id(Pengine, []), 2710 format(Format, [default(prolog)]) 2711 ]), 2712 ( pengine_thread(Pengine, Thread), 2713 Error = error(_,_), 2714 catch(thread_statistics(Thread, Stats), Error, fail) 2715 -> output_result(Format, ping(Pengine, Stats)) 2716 ; output_result(Format, died(Pengine)) 2717 ).
2726http_pengine_list(Request) :- 2727 reply_options(Request, [get]), 2728 !. 2729http_pengine_list(Request) :- 2730 http_parameters(Request, 2731 [ status(Status, [default(detached), oneof([detached])]), 2732 application(Application, [default(pengine_sandbox)]) 2733 ]), 2734 allowed(Request, Application), 2735 authenticate(Request, Application, _UserOptions), 2736 findall(Term, listed_pengine(Application, Status, Term), Terms), 2737 reply_json(json{pengines: Terms}). 2738 2739listed_pengine(Application, detached, State) :- 2740 State = pengine{id:Id, 2741 detached:Time, 2742 queued:Queued, 2743 stats:Stats}, 2744 2745 pengine_property(Id, application(Application)), 2746 pengine_property(Id, detached(Time)), 2747 pengine_queue(Id, Queue, _TimeLimit, _Now), 2748 message_queue_property(Queue, size(Queued)), 2749 ( pengine_thread(Id, Thread), 2750 catch(thread_statistics(Thread, Stats), _, fail) 2751 -> true 2752 ; Stats = thread{status:died} 2753 ).
prolog
, json
or json-s
.2762:- dynamic 2763 pengine_replying/2. % +Pengine, +Thread 2764 2765output_result(Format, Event) :- 2766 arg(1, Event, Pengine), 2767 thread_self(Thread), 2768 cors_enable, % contingent on http:cors setting 2769 disable_client_cache, 2770 setup_call_cleanup( 2771 asserta(pengine_replying(Pengine, Thread), Ref), 2772 catch(output_result(Format, Event, _{}), 2773 pengine_abort_output, 2774 true), 2775 erase(Ref)). 2776 2777output_result(Lang, Event, Dict) :- 2778 write_result(Lang, Event, Dict), 2779 !. 2780output_result(prolog, Event, _) :- 2781 !, 2782 format('Content-type: text/x-prolog; charset=UTF-8~n~n'), 2783 write_term(Event, 2784 [ quoted(true), 2785 ignore_ops(true), 2786 fullstop(true), 2787 blobs(portray), 2788 portray_goal(portray_blob), 2789 nl(true) 2790 ]). 2791output_result(Lang, Event, _) :- 2792 json_lang(Lang), 2793 !, 2794 ( event_term_to_json_data(Event, JSON, Lang) 2795 -> reply_json(JSON) 2796 ; assertion(event_term_to_json_data(Event, _, Lang)) 2797 ). 2798output_result(Lang, _Event, _) :- % FIXME: allow for non-JSON format 2799 domain_error(pengine_format, Lang).
'$BLOB'(Type)
.
Future versions may include more info, depending on Type.2809:- public portray_blob/2. % called from write-term 2810portray_blob(Blob, _Options) :- 2811 blob(Blob, Type), 2812 writeq('$BLOB'(Type)).
2819abort_pending_output(Pengine) :- 2820 forall(pengine_replying(Pengine, Thread), 2821 abort_output_thread(Thread)). 2822 2823abort_output_thread(Thread) :- 2824 catch(thread_signal(Thread, throw(pengine_abort_output)), 2825 error(existence_error(thread, _), _), 2826 true).
prolog
and various JSON dialects. The hook
event_to_json/3 can be used to refine the JSON dialects. This
hook must be used if a completely different output format is
desired.2842disable_client_cache :- 2843 format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c 2844 Pragma: no-cache\r\n\c 2845 Expires: 0\r\n'). 2846 2847event_term_to_json_data(Event, JSON, Lang) :- 2848 event_to_json(Event, JSON, Lang), 2849 !. 2850event_term_to_json_data(success(ID, Bindings0, Projection, Time, More), 2851 json{event:success, id:ID, time:Time, 2852 data:Bindings, more:More, projection:Projection}, 2853 json) :- 2854 !, 2855 term_to_json(Bindings0, Bindings). 2856event_term_to_json_data(destroy(ID, Event), 2857 json{event:destroy, id:ID, data:JSON}, 2858 Style) :- 2859 !, 2860 event_term_to_json_data(Event, JSON, Style). 2861event_term_to_json_data(create(ID, Features0), JSON, Style) :- 2862 !, 2863 ( select(answer(First0), Features0, Features1) 2864 -> event_term_to_json_data(First0, First, Style), 2865 Features = [answer(First)|Features1] 2866 ; Features = Features0 2867 ), 2868 dict_create(JSON, json, [event(create), id(ID)|Features]). 2869event_term_to_json_data(destroy(ID, Event), 2870 json{event:destroy, id:ID, data:JSON}, Style) :- 2871 !, 2872 event_term_to_json_data(Event, JSON, Style). 2873event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :- 2874 !, 2875 Error0 = json{event:error, id:ID, data:Message}, 2876 add_error_details(ErrorTerm, Error0, Error), 2877 message_to_string(ErrorTerm, Message). 2878event_term_to_json_data(failure(ID, Time), 2879 json{event:failure, id:ID, time:Time}, _) :- 2880 !. 2881event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :- 2882 functor(EventTerm, F, 1), 2883 !, 2884 arg(1, EventTerm, ID). 2885event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :- 2886 functor(EventTerm, F, 2), 2887 arg(1, EventTerm, ID), 2888 arg(2, EventTerm, Data), 2889 term_to_json(Data, JSON). 2890 2891:- public add_error_details/3.
pengines_io.pl
.
2898add_error_details(Error, JSON0, JSON) :-
2899 add_error_code(Error, JSON0, JSON1),
2900 add_error_location(Error, JSON1, JSON).
code
field to JSON0 of Error is an ISO error term. The error
code is the functor name of the formal part of the error, e.g.,
syntax_error
, type_error
, etc. Some errors carry more
information:
2913add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :- 2914 atom(Type), 2915 !, 2916 to_atomic(Obj, Value), 2917 Error = Error0.put(_{code:existence_error, arg1:Type, arg2:Value}). 2918add_error_code(error(Formal, _), Error0, Error) :- 2919 callable(Formal), 2920 !, 2921 functor(Formal, Code, _), 2922 Error = Error0.put(code, Code). 2923add_error_code(_, Error, Error). 2924 2925% What to do with large integers? 2926to_atomic(Obj, Atomic) :- atom(Obj), !, Atomic = Obj. 2927to_atomic(Obj, Atomic) :- number(Obj), !, Atomic = Obj. 2928to_atomic(Obj, Atomic) :- string(Obj), !, Atomic = Obj. 2929to_atomic(Obj, Atomic) :- term_string(Obj, Atomic).
location
property if the error can be associated with a
source location. The location is an object with properties file
and line
and, if available, the character location in the line.2938add_error_location(error(_, file(Path, Line, -1, _CharNo)), Term0, Term) :- 2939 atom(Path), integer(Line), 2940 !, 2941 Term = Term0.put(_{location:_{file:Path, line:Line}}). 2942add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Term0, Term) :- 2943 atom(Path), integer(Line), integer(Ch), 2944 !, 2945 Term = Term0.put(_{location:_{file:Path, line:Line, ch:Ch}}). 2946add_error_location(_, Term, Term).
success(ID, Bindings, Projection, Time, More)
and output(ID,
Term)
into a format suitable for processing at the client side.2957%:- multifile pengines:event_to_json/3. 2958 2959 2960 /******************************* 2961 * ACCESS CONTROL * 2962 *******************************/
forbidden
header if contact is not allowed.2969allowed(Request, Application) :- 2970 setting(Application:allow_from, Allow), 2971 match_peer(Request, Allow), 2972 setting(Application:deny_from, Deny), 2973 \+ match_peer(Request, Deny), 2974 !. 2975allowed(Request, _Application) :- 2976 memberchk(request_uri(Here), Request), 2977 throw(http_reply(forbidden(Here))). 2978 2979match_peer(_, Allowed) :- 2980 memberchk(*, Allowed), 2981 !. 2982match_peer(_, []) :- !, fail. 2983match_peer(Request, Allowed) :- 2984 http_peer(Request, Peer), 2985 debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]), 2986 ( memberchk(Peer, Allowed) 2987 -> true 2988 ; member(Pattern, Allowed), 2989 match_peer_pattern(Pattern, Peer) 2990 ). 2991 2992match_peer_pattern(Pattern, Peer) :- 2993 ip_term(Pattern, IP), 2994 ip_term(Peer, IP), 2995 !. 2996 2997ip_term(Peer, Pattern) :- 2998 split_string(Peer, ".", "", PartStrings), 2999 ip_pattern(PartStrings, Pattern). 3000 3001ip_pattern([], []). 3002ip_pattern([*], _) :- !. 3003ip_pattern([S|T0], [N|T]) :- 3004 number_string(N, S), 3005 ip_pattern(T0, T).
[user(User)]
, []
or
an exception.3013authenticate(Request, Application, UserOptions) :- 3014 authentication_hook(Request, Application, User), 3015 !, 3016 must_be(ground, User), 3017 UserOptions = [user(User)]. 3018authenticate(_, _, []).
throw(http_reply(authorise(basic(Realm))))
Start a normal HTTP login challenge (reply 401)throw(http_reply(forbidden(Path)))
)
Reject the request using a 403 repply.3040pengine_register_user(Options) :- 3041 option(user(User), Options), 3042 !, 3043 pengine_self(Me), 3044 asserta(pengine_user(Me, User)). 3045pengine_register_user(_).
3056pengine_user(User) :-
3057 pengine_self(Me),
3058 pengine_user(Me, User).
3064reply_options(Request, Allowed) :- 3065 option(method(options), Request), 3066 !, 3067 cors_enable(Request, 3068 [ methods(Allowed) 3069 ]), 3070 format('Content-type: text/plain\r\n'), 3071 format('~n'). % empty body 3072 3073 3074 /******************************* 3075 * COMPILE SOURCE * 3076 *******************************/
3085pengine_src_text(Src, Module) :- 3086 pengine_self(Self), 3087 format(atom(ID), 'pengine://~w/src', [Self]), 3088 extra_load_options(Self, Options), 3089 setup_call_cleanup( 3090 open_chars_stream(Src, Stream), 3091 load_files(Module:ID, 3092 [ stream(Stream), 3093 module(Module), 3094 silent(true) 3095 | Options 3096 ]), 3097 close(Stream)), 3098 keep_source(Self, ID, Src). 3099 3100system'#file'(File, _Line) :- 3101 prolog_load_context(stream, Stream), 3102 set_stream(Stream, file_name(File)), 3103 set_stream(Stream, record_position(false)), 3104 set_stream(Stream, record_position(true)).
3114pengine_src_url(URL, Module) :- 3115 pengine_self(Self), 3116 uri_encoded(path, URL, Path), 3117 format(atom(ID), 'pengine://~w/url/~w', [Self, Path]), 3118 extra_load_options(Self, Options), 3119 ( get_pengine_application(Self, Application), 3120 setting(Application:debug_info, false) 3121 -> setup_call_cleanup( 3122 http_open(URL, Stream, []), 3123 ( set_stream(Stream, encoding(utf8)), 3124 load_files(Module:ID, 3125 [ stream(Stream), 3126 module(Module) 3127 | Options 3128 ]) 3129 ), 3130 close(Stream)) 3131 ; setup_call_cleanup( 3132 http_open(URL, TempStream, []), 3133 ( set_stream(TempStream, encoding(utf8)), 3134 read_string(TempStream, _, Src) 3135 ), 3136 close(TempStream)), 3137 setup_call_cleanup( 3138 open_chars_stream(Src, Stream), 3139 load_files(Module:ID, 3140 [ stream(Stream), 3141 module(Module) 3142 | Options 3143 ]), 3144 close(Stream)), 3145 keep_source(Self, ID, Src) 3146 ). 3147 3148 3149extra_load_options(Pengine, Options) :- 3150 pengine_not_sandboxed(Pengine), 3151 !, 3152 Options = []. 3153extra_load_options(_, [sandboxed(true)]). 3154 3155 3156keep_source(Pengine, ID, SrcText) :- 3157 get_pengine_application(Pengine, Application), 3158 setting(Application:debug_info, true), 3159 !, 3160 to_string(SrcText, SrcString), 3161 assertz(pengine_data(Pengine, source(ID, SrcString))). 3162keep_source(_, _, _). 3163 3164to_string(String, String) :- 3165 string(String), 3166 !. 3167to_string(Atom, String) :- 3168 atom_string(Atom, String), 3169 !. 3170 3171 /******************************* 3172 * SANDBOX * 3173 *******************************/ 3174 3175:- multifile 3176 sandbox:safe_primitive/1. 3177 3178sandbox:safe_primitive(pengines:pengine_input(_, _)). 3179sandbox:safe_primitive(pengines:pengine_output(_)). 3180sandbox:safe_primitive(pengines:pengine_debug(_,_)). 3181 3182 3183 /******************************* 3184 * MESSAGES * 3185 *******************************/ 3186 3187prologerror_message(sandbox(time_limit_exceeded, Limit)) --> 3188 [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl, 3189 'This is normally caused by an insufficiently instantiated'-[], nl, 3190 'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl, 3191 'find all possible instantations of Var.'-[] 3192 ]
Pengines: Web Logic Programming Made Easy
The library(pengines) provides an infrastructure for creating Prolog engines in a (remote) pengine server and accessing these engines either from Prolog or JavaScript.