35
36:- module(rdf_persistency,
37 [ rdf_attach_db/2, 38 rdf_detach_db/0, 39 rdf_current_db/1, 40 rdf_persistency/2, 41 rdf_flush_journals/1, 42 rdf_persistency_property/1, 43 rdf_journal_file/2, 44 rdf_snapshot_file/2, 45 rdf_db_to_file/2 46 ]). 47:- use_module(library(semweb/rdf_db),
48 [ rdf_graph/1, rdf_unload_graph/1, rdf_statistics/1,
49 rdf_load_db/1, rdf_retractall/4, rdf_create_graph/1,
50 rdf_assert/4, rdf_update/5, rdf_monitor/2, rdf/4,
51 rdf_save_db/2, rdf_atom_md5/3, rdf_current_ns/2,
52 rdf_register_ns/3
53 ]). 54
55:- autoload(library(apply),[maplist/2,maplist/3,partition/4,exclude/3]). 56:- autoload(library(debug),[debug/3]). 57:- autoload(library(error),
58 [permission_error/3,must_be/2,domain_error/2]). 59:- autoload(library(filesex),
60 [directory_file_path/3,make_directory_path/1]). 61:- autoload(library(lists),[select/3,append/3]). 62:- autoload(library(option),[option/2,option/3]). 63:- autoload(library(readutil),[read_file_to_terms/3]). 64:- autoload(library(socket),[gethostname/1]). 65:- autoload(library(thread),[concurrent/3]). 66:- autoload(library(uri),[uri_encoded/3]). 67
99
%   Runtime state of the attached persistent store.  Each predicate
%   below is declared both volatile and dynamic.
%
%     - rdf_directory(Dir)
%       Directory of the store that is currently attached.
%     - rdf_lock(Dir, lock(Stream, File))
%       Lock file held for Dir, as recorded by lock_db/1.
%     - rdf_option(Option)
%       Option accepted by assert_options/1.
%     - source_journal_fd(Graph, Stream)
%       Journal stream currently open for Graph.
%     - file_base_db(Base, Graph)
%       Cache used by rdf_db_to_file/2.

:- volatile
    rdf_directory/1,
    rdf_lock/2,
    rdf_option/1,
    source_journal_fd/2,
    file_base_db/2.
:- dynamic
    rdf_directory/1,
    rdf_lock/2,
    rdf_option/1,
    source_journal_fd/2,
    file_base_db/2.

:- meta_predicate
    no_agc(0).

:- predicate_options(rdf_attach_db/2, 2,
                     [ access(oneof([read_write,read_only])),
                       concurrency(positive_integer),
                       max_open_journals(positive_integer),
                       silent(oneof([true,false,brief])),
                       log_nested_transactions(boolean)
                     ]).
171
%!  rdf_attach_db(+DirSpec, +Options) is det.
%
%   Attach the persistent store found at DirSpec.  The clauses are
%   tried in this order:
%
%     1. `access(read_only)` is requested: resolve DirSpec as a readable
%        directory and attach it read-only.
%     2. `access(read_write)` is requested: attach read-write; errors
%        are propagated.
%     3. No access mode given and the directory exists: attach
%        read-write if it is writable.  If the store is locked or the
%        directory is not writable, print a warning and fall back to
%        read-only mode.
%     4. Otherwise try read-write; on any error print it and retry in
%        read-only mode.

rdf_attach_db(DirSpec, Options) :-
    option(access(read_only), Options),
    !,
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(read),
                         file_type(directory)
                       ]),
    rdf_attach_db_ro(Directory, Options).
rdf_attach_db(DirSpec, Options) :-
    option(access(read_write), Options),
    !,
    rdf_attach_db_rw(DirSpec, Options).
rdf_attach_db(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(exist),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   access_file(Directory, write)
    ->  catch(rdf_attach_db_rw(Directory, Options), E, true),
        (   var(E)
        ->  true
        ;   E = error(permission_error(lock, rdf_db, _), _)
        ->  print_message(warning, E),
            print_message(warning, rdf(read_only)),
            rdf_attach_db(DirSpec, [access(read_only)|Options])
        ;   throw(E)
        )
    ;   % Use the standard error(Formal, Context) shape so that the
        % message system recognises the term.
        print_message(warning,
                      error(permission_error(write, directory, Directory),
                            _)),
        print_message(warning, rdf(read_only)),
        rdf_attach_db_ro(Directory, Options)
    ).
rdf_attach_db(DirSpec, Options) :-
    catch(rdf_attach_db_rw(DirSpec, Options), E, true),
    (   var(E)
    ->  true
    ;   print_message(warning, E),
        print_message(warning, rdf(read_only)),
        rdf_attach_db(DirSpec, [access(read_only)|Options])
    ).
216
217
%!  rdf_attach_db_rw(+DirSpec, +Options) is det.
%
%   Attach DirSpec as a writable store.
%
%     1. DirSpec resolves to a writable directory: unless it is the
%        store already attached, detach the current store, lock the
%        directory, record it and the options and load its content.
%     2. DirSpec resolves to a directory that is writable or can be
%        created: hand over to rdf_attach_db/2 with the resolved path.
%     3. Otherwise raise a permission error on the existing directory.

rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec, Dir,
                       [ access(write),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   rdf_directory(Dir)
    ->  true                            % already attached to Dir
    ;   rdf_detach_db,
        mkdir(Dir),
        lock_db(Dir),
        assert(rdf_directory(Dir)),
        assert_options(Options),
        stop_monitor,                   % monitor is off while loading
        no_agc(load_db),
        at_halt(rdf_detach_db),
        start_monitor
    ).
rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec, Dir,
                       [ solutions(all)
                       ]),
    (   exists_directory(Dir)
    ->  access_file(Dir, write)
    ;   catch(make_directory(Dir), _, fail)
    ),
    !,
    rdf_attach_db(Dir, Options).
rdf_attach_db_rw(DirSpec, _) :-
    absolute_file_name(DirSpec, Dir,
                       [ access(exist),
                         file_type(directory)
                       ]),
    permission_error(write, directory, Dir).
256
260
%!  rdf_attach_db_ro(+Directory, +Options) is det.
%
%   Attach Directory without locking it: detach the current store,
%   record Directory and Options, disable the monitor and load the
%   store content.  The monitor is not started afterwards.

rdf_attach_db_ro(Dir, Options) :-
    rdf_detach_db,
    assert(rdf_directory(Dir)),
    assert_options(Options),
    stop_monitor,
    no_agc(load_db).
267
268
%!  assert_options(+Options:list) is det.
%
%   Store every option known to option_type/2 as an rdf_option/1 fact
%   after running its type check.  Unknown options are skipped.

assert_options([]).
assert_options([Option|Rest]) :-
    assert_option(Option),
    assert_options(Rest).

%   assert_option(+Option)
%
%   Validate and store a single option; succeed silently when Option
%   is not listed by option_type/2.

assert_option(Option) :-
    option_type(Option, TypeCheck),
    !,
    call(TypeCheck),
    assert(rdf_option(Option)).
assert_option(_).

%!  option_type(?Option, -Check:callable) is semidet.
%
%   Check is the goal that validates the argument of Option.

option_type(concurrency(N),             must_be(positive_integer, N)).
option_type(max_open_journals(N),       must_be(positive_integer, N)).
option_type(directory_levels(N),        must_be(positive_integer, N)).
option_type(silent(V),                  must_be(oneof([true,false,brief]), V)).
option_type(log_nested_transactions(B), must_be(boolean, B)).
option_type(access(A),                  must_be(oneof([read_write,
                                                       read_only]), A)).
285
286
298
%!  rdf_persistency_property(?Property) is nondet.
%
%   True when Property is a stored option or `directory(Dir)` for the
%   attached directory.  With an unbound Property all properties are
%   enumerated; otherwise at most one solution is given.

rdf_persistency_property(Property) :-
    (   var(Property)
    ->  rdf_persistency_property_(Property)
    ;   once(rdf_persistency_property_(Property))
    ).

rdf_persistency_property_(Option) :-
    rdf_option(Option).
rdf_persistency_property_(directory(Directory)) :-
    rdf_directory(Directory).
311
317
%!  no_agc(:Goal)
%
%   Run Goal with the Prolog flag `agc_margin` set to 0 and restore the
%   previous value when Goal is finished.

no_agc(Goal) :-
    current_prolog_flag(agc_margin, SavedMargin),
    setup_call_cleanup(
        set_prolog_flag(agc_margin, 0),
        Goal,
        set_prolog_flag(agc_margin, SavedMargin)).
324
325
331
%!  rdf_detach_db is det.
%
%   Stop the monitor and close all journals.  If a directory is
%   attached, also save the prefixes there, forget the stored options
%   and journal streams and release the lock.

rdf_detach_db :-
    debug(halt, 'Detaching RDF database', []),
    stop_monitor,
    close_journals,
    (   retract(rdf_directory(Directory))
    ->  debug(halt, 'DB Directory: ~w', [Directory]),
        save_prefixes(Directory),
        retractall(rdf_option(_)),
        retractall(source_journal_fd(_,_)),
        unlock_db(Directory)
    ;   true
    ).
344
345
349
%!  rdf_current_db(?Directory) is semidet.
%
%   True when Directory is the first recorded rdf_directory/1 fact.

rdf_current_db(Directory) :-
    once(rdf_directory(Attached)),
    Directory = Attached.
354
355
366
%!  rdf_flush_journals(+Options) is det.
%
%   Flush the journal of every graph, or only of the graph named by the
%   option `graph(Graph)`.  Options is passed on to rdf_flush_journal/2.

rdf_flush_journals(Options) :-
    option(graph(Graph), Options, _),
    forall(rdf_graph(Graph),
           rdf_flush_journal(Graph, Options)).

%   rdf_flush_journal(+Graph, +Options)
%
%   Call create_db/1 for Graph if its journal file exists and is not
%   smaller than the `min_size(KB)` option (when given).

rdf_flush_journal(Graph, Options) :-
    db_files(Graph, _Snapshot, JournalBase),
    db_file(JournalBase, JournalPath),
    (   exists_file(JournalPath),
        \+ ( memberchk(min_size(KB), Options),
             size_file(JournalPath, Bytes),
             Bytes / 1024 < KB
           )
    ->  create_db(Graph)
    ;   true
    ).
383
384 387
393
%!  load_db is det.
%
%   Load the content of the attached directory.  Prefixes are loaded
%   first, then all graphs found on disk are unloaded from memory and
%   reloaded: snapshots first, journals next.  A summary with the number
%   of graphs, the number of triples added and CPU/wall time is printed
%   at a level derived from the `silent` setting.

load_db :-
    rdf_directory(Dir),
    concurrency(Jobs),
    cpu_stat_key(Jobs, StatKey),
    get_time(Wall0),
    statistics(StatKey, T0),
    load_prefixes(Dir),
    verbosity(Silent),
    find_dbs(Dir, Graphs, SnapShots, Journals),
    length(Graphs, GraphCount),
    maplist(rdf_unload_graph, Graphs),
    rdf_statistics(triples(Triples0)),
    load_sources(snapshots, SnapShots, Silent, Jobs),
    load_sources(journals,  Journals,  Silent, Jobs),
    rdf_statistics(triples(Triples1)),
    statistics(StatKey, T1),
    get_time(Wall1),
    T is T1 - T0,
    Wall is Wall1 - Wall0,
    % Must be evaluated: the message formats Triples with ~D, which
    % requires an integer rather than the term Triples1-Triples0.
    Triples is Triples1 - Triples0,
    message_level(Silent, Level),
    print_message(Level, rdf(restore(attached(GraphCount, Triples, T/Wall)))).
416
%!  load_sources(+Type, +Sources, +Silent, +Jobs) is det.
%
%   Load all Sources through load_source/4 goals that are executed by
%   concurrent/3, using at most Jobs workers and never more workers
%   than there are sources.  Nothing happens for an empty list.

load_sources(_, [], _, _) :- !.
load_sources(Type, Sources, Silent, Jobs) :-
    length(Sources, Total),
    Workers is min(Total, Jobs),
    print_message(informational, rdf(restoring(Type, Total, Workers))),
    make_goals(Sources, Silent, 1, Total, Goals),
    concurrent(Workers, Goals, []).
424
425
427
%!  make_goals(+Sources, +Silent, +Index, +Total, -Goals) is det.
%
%   Goals is a list holding load_source(Source, Silent, I, Total) for
%   each element of Sources, where I counts up from Index.

make_goals([], _, _, _, []).
make_goals([Source|Sources], Silent, Index, Total, [Goal|Goals]) :-
    Goal = load_source(Source, Silent, Index, Total),
    Next is Index + 1,
    make_goals(Sources, Silent, Next, Total, Goals).
433
%!  verbosity(-Silent) is det.
%
%   Silent is the value of the stored `silent` option.  Without such an
%   option it is `true` if the Prolog flag `verbose` is `silent` and
%   `brief` otherwise.

verbosity(Silent) :-
    (   rdf_option(silent(Silent))
    ->  true
    ;   current_prolog_flag(verbose, silent)
    ->  Silent = true
    ;   Silent = brief
    ).
442
443
447
%!  concurrency(-Jobs) is det.
%
%   Jobs is the stored `concurrency` option, else the value of the
%   Prolog flag `cpu_count` if that is positive, else 1.

concurrency(Jobs) :-
    (   rdf_option(concurrency(Jobs))
    ->  true
    ;   current_prolog_flag(cpu_count, Jobs),
        Jobs > 0
    ->  true
    ;   Jobs = 1
    ).

%!  cpu_stat_key(+Jobs, -StatKey) is det.
%
%   StatKey is the statistics/2 key used for timing: `cputime` for a
%   single job and `process_cputime` otherwise.

cpu_stat_key(1, cputime) :- !.
cpu_stat_key(_, process_cputime).
459
460
469
%!  find_dbs(+Dir, -Graphs, -SnapBySize, -JournalBySize) is det.
%
%   Scan Dir for database files.  Graphs is the sorted list of graph
%   names found.  SnapBySize and JournalBySize are the sorted db/5 terms
%   for snapshots and journals; as the first argument of db/5 is the
%   file size, standard order sorts them by size.  If
%   consider_reindex_db/3 succeeds the directory is scanned again.

find_dbs(Dir, Graphs, SnapBySize, JournalBySize) :-
    directory_files(Dir, Entries),
    phrase(scan_db_files(Entries, Dir, '.', 0), DBs),
    maplist(db_graph, DBs, GraphNames),
    sort(GraphNames, Graphs),
    (   consider_reindex_db(Dir, Graphs, DBs)
    ->  find_dbs(Dir, Graphs, SnapBySize, JournalBySize)
    ;   partition(db_is_snapshot, DBs, Snapshots, Journals),
        sort(Snapshots, SnapBySize),
        sort(Journals, JournalBySize)
    ).

%   consider_reindex_db(+Dir, +Graphs, +Scanned)
%
%   Succeeds after calling reindex_db/2 when the directory must be
%   restructured.  The required depth is floor(log256(#Graphs)).  If all
%   scanned files share one depth that is already sufficient, that depth
%   is stored as the `directory_levels` option and the predicate fails.

consider_reindex_db(Dir, Graphs, Scanned) :-
    length(Graphs, GraphCount),
    GraphCount > 0,
    Needed is floor(log(GraphCount)/log(256)),
    (   maplist(depth_db(Current), Scanned)
    ->  (   Needed > Current
        ->  true
        ;   retractall(rdf_option(directory_levels(_))),
            assertz(rdf_option(directory_levels(Current))),
            fail
        )
    ;   true                            % files at mixed depths
    ),
    reindex_db(Dir, Needed).
496
%   Accessors for the db(Size, Ext, Graph, File, Depth) terms produced
%   by scan_db_files//4.

%   db_is_snapshot(+DB): the extension (argument 2) is `trp`.
db_is_snapshot(DB) :-
    arg(2, DB, trp).

%   db_graph(+DB, -Graph): Graph is argument 3.
db_graph(DB, Graph) :-
    arg(3, DB, Graph).

%   db_file_name(+DB, -File): File is argument 4.
db_file_name(DB, File) :-
    arg(4, DB, File).

%   depth_db(?Depth, +DB): Depth is argument 5.  The argument order
%   allows use as maplist(depth_db(Depth), DBs).
depth_db(Depth, DB) :-
    arg(5, DB, Depth).
508
513
%!  scan_db_files(+Files, +Dir, +Prefix, +Depth)// is det.
%
%   Produce a db(Size, Ext, Graph, AbsFile, Depth) term for each file in
%   Files that has a database extension.  Files are directory entries of
%   Dir/Prefix.  Sub directories that are not symbolic links are scanned
%   recursively with Depth+1; entries accepted by nofollow/1 and all
%   other entries are ignored.

scan_db_files([], _, _, _) -->
    [].
scan_db_files([Entry|Entries], Dir, Prefix, Depth) -->
    { nofollow(Entry) },
    !,
    scan_db_files(Entries, Dir, Prefix, Depth).
scan_db_files([Entry|Entries], Dir, Prefix, Depth) -->
    { file_name_extension(Base, Ext, Entry),
      db_extension(Ext),
      !,
      rdf_db_to_file(Graph, Base),
      directory_file_path(Prefix, Entry, RelFile),
      directory_file_path(Dir, RelFile, AbsFile),
      size_file(AbsFile, Size)
    },
    [ db(Size, Ext, Graph, AbsFile, Depth) ],
    scan_db_files(Entries, Dir, Prefix, Depth).
scan_db_files([Entry|Entries], Dir, Prefix, Depth) -->
    { directory_file_path(Prefix, Entry, RelDir),
      directory_file_path(Dir, RelDir, AbsDir),
      exists_directory(AbsDir),
      \+ read_link(AbsDir, _, _),       % do not descend into links
      !,
      directory_files(AbsDir, SubEntries),
      SubDepth is Depth + 1
    },
    scan_db_files(SubEntries, Dir, RelDir, SubDepth),
    scan_db_files(Entries, Dir, Prefix, Depth).
scan_db_files([_|Entries], Dir, Prefix, Depth) -->
    scan_db_files(Entries, Dir, Prefix, Depth).
544
%   nofollow(?Entry)
%
%   Directory entries that are never scanned: the directory itself and
%   its parent.

nofollow('.').
nofollow('..').

%   db_extension(?Ext)
%
%   File name extensions of database files: `trp` and `jrn`.

db_extension(trp).
db_extension(jrn).
550
:- public load_source/4.                % called through concurrent/3

%!  load_source(+DB, +Silent, +Nth, +Total) is det.
%
%   Load the file described by the db/5 term DB: a snapshot through
%   rdf_load_db/1, a journal through load_journal/2.  Messages are
%   printed before and after loading; the closing message reports the
%   CPU time used and the change in the triple count of the graph.

load_source(DB, Silent, Nth, Total) :-
    db_file_name(DB, File),
    db_graph(DB, Graph),
    message_level(Silent, Level),
    graph_triple_count(Graph, Before),
    statistics(cputime, Start),
    (   db_is_snapshot(DB)
    ->  print_message(Level, rdf(restore(Silent, snapshot(Graph, File)))),
        rdf_load_db(File)
    ;   print_message(Level, rdf(restore(Silent, journal(Graph, File)))),
        load_journal(File, Graph)
    ),
    statistics(cputime, End),
    T is End - Start,
    graph_triple_count(Graph, After),
    Count is After - Before,
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, T, Count, Nth, Total)))).
571
572
%   graph_triple_count(+Graph, -Count)
%
%   Count is the number of triples rdf_statistics/1 reports for Graph,
%   or 0 if it reports nothing for that graph.

graph_triple_count(Graph, Count) :-
    (   rdf_statistics(triples_by_graph(Graph, Count))
    ->  true
    ;   Count = 0
    ).
577
578
583
%!  attach_graph(+Graph, +Options) is det.
%
%   Reload Graph from its snapshot and journal, if they exist, after
%   removing all its triples from memory.  The option `silent(Silent)`
%   (default `false`) selects both the message level and the message
%   style.
%
%   Silent used to be left unbound, which made every message unify with
%   the `restore(true, _)` clause of message//1 and thus print nothing
%   useful.  It is now taken from Options.
%
%   NOTE(review): load_journal/2 opens its first argument directly,
%   while JournalFile is the name that exists_db/1 resolves through
%   db_file/2 -- confirm whether the resolved path should be passed.

attach_graph(Graph, Options) :-
    option(silent(Silent), Options, false),
    (   Silent == true
    ->  Level = silent
    ;   Level = informational
    ),
    db_files(Graph, SnapshotFile, JournalFile),
    rdf_retractall(_,_,_,Graph),
    statistics(cputime, T0),
    print_message(Level, rdf(restore(Silent, Graph))),
    db_file(SnapshotFile, AbsSnapShot),
    (   exists_file(AbsSnapShot)
    ->  print_message(Level, rdf(restore(Silent, snapshot(SnapshotFile)))),
        rdf_load_db(AbsSnapShot)
    ;   true
    ),
    (   exists_db(JournalFile)
    ->  print_message(Level, rdf(restore(Silent, journal(JournalFile)))),
        load_journal(JournalFile, Graph)
    ;   true
    ),
    statistics(cputime, T1),
    T is T1 - T0,
    (   rdf_statistics(triples_by_graph(Graph, Count))
    ->  true
    ;   Count = 0
    ),
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, T, Count)))).
612
%   message_level(+Silent, -Level)
%
%   Level is the print_message/2 level for a `silent` setting: `silent`
%   for `true` and `informational` for anything else.

message_level(true, silent) :- !.
message_level(_Other, informational).
615
616
617 620
625
%!  load_journal(+File:atom, +DB:atom) is det.
%
%   Create graph DB and replay all terms read from the UTF-8 encoded
%   journal File into it.  The file is closed when done.

load_journal(File, DB) :-
    rdf_create_graph(DB),
    setup_call_cleanup(
        open(File, read, Stream, [encoding(utf8)]),
        ( read(Stream, First),
          process_journal(First, Stream, DB)
        ),
        close(Stream)).

%   process_journal(+Term, +Stream, +DB)
%
%   Handle Term and all terms that follow on Stream until end of file.
%
%   @error type_error(journal_term, Term) if process_journal_term/2
%          does not accept Term.

process_journal(end_of_file, _, _) :- !.
process_journal(Term, Stream, DB) :-
    (   process_journal_term(Term, DB)
    ->  true
    ;   throw(error(type_error(journal_term, Term), _))
    ),
    read(Stream, Next),
    process_journal(Next, Stream, DB).
643
%   process_journal_term(+Term, +DB)
%
%   Replay a single journal term on graph DB.  Assert, retract and
%   update terms change the graph; a failing update only prints a
%   warning.  The bookkeeping terms start/1, end/1, begin/1, end/0,
%   begin/4 and end/3 are accepted and ignored.

process_journal_term(assert(S,P,O), DB) :-
    rdf_assert(S, P, O, DB).
process_journal_term(assert(S,P,O,Line), DB) :-
    rdf_assert(S, P, O, DB:Line).
process_journal_term(retract(S,P,O), DB) :-
    rdf_retractall(S, P, O, DB).
process_journal_term(retract(S,P,O,Line), DB) :-
    rdf_retractall(S, P, O, DB:Line).
process_journal_term(update(S,P,O,Action), DB) :-
    (   rdf_update(S, P, O, DB, Action)
    ->  true
    ;   print_message(warning, rdf(update_failed(S,P,O,Action)))
    ).
process_journal_term(start(_), _).              % journal opened
process_journal_term(end(_), _).                % journal closed
process_journal_term(begin(_), _).
process_journal_term(end, _).
process_journal_term(begin(_,_,_,_), _).        % transaction start
process_journal_term(end(_,_,_), _).            % transaction end
663
664
665 668
:- dynamic
    blocked_db/2,                   % blocked_db(Graph, Reason)
    transaction_message/3,          % transaction_message(Nesting, Time, Msg)
    transaction_db/3.               % transaction_db(Nesting, Graph, Id)

%!  rdf_persistency(+DB, +Bool) is det.
%
%   Switch persistency of graph DB.  Setting it to `false` records
%   blocked_db(DB, persistency) and deletes the files of DB, unless it
%   was blocked already.  Setting it to `true` removes the block and,
%   if there was one, saves DB through create_db/1.
%
%   @error type_error or instantiation_error if DB is not an atom or
%          Bool is not a boolean.

rdf_persistency(DB, Bool) :-
    must_be(atom, DB),
    must_be(boolean, Bool),
    (   Bool == false
    ->  (   blocked_db(DB, persistency)
        ->  true
        ;   assert(blocked_db(DB, persistency)),
            delete_db(DB)
        )
    ;   (   retract(blocked_db(DB, persistency))
        ->  create_db(DB)
        ;   true
        )
    ).
695
699
%   Extend rdf_db:property_of_graph/2 with persistent(State).  State is
%   `false` if blocked_db(Graph, persistency) holds and `true`
%   otherwise.

:- multifile
    rdf_db:property_of_graph/2.

rdf_db:property_of_graph(persistent(State), Graph) :-
    (   blocked_db(Graph, persistency)
    ->  State = false
    ;   State = true
    ).
708
709
715
%!  start_monitor is det.
%!  stop_monitor is det.
%
%   Register monitor/1 with rdf_monitor/2.  start_monitor passes the
%   mask `[-assert(load)]`; stop_monitor passes `[-all]`.

start_monitor :-
    rdf_monitor(monitor, [-assert(load)]).

stop_monitor :-
    rdf_monitor(monitor, [-all]).
724
731
%!  monitor(+Term) is semidet.
%
%   Handle an event passed on by rdf_monitor/2.  Assert, retract and
%   update events on graphs that are not blocked are appended to the
%   journal of the graph.  Each of these first calls open_transaction/2,
%   which writes a begin/4 record when a logged transaction is active
%   and the graph has not yet joined it.  An update with action
%   `graph(NewDB)` is journalled as an assert on NewDB plus a retract on
%   the old graph.
%
%   The clause for update events with a DB:Line graph now calls
%   open_transaction/2 as well; it was the only journal-writing clause
%   that did not.

monitor(Msg) :-
    debug(monitor, 'Monitor: ~p~n', [Msg]),
    fail.
monitor(assert(S,P,O,DB:Line)) :-
    !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(assert(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB:Line)) :-
    !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O)]),
    sync_journal(DB, Fd).
monitor(update(S,P,O,DB:Line,Action)) :-
    !,
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB:Line))
    ;   journal_fd(DB, Fd),
        open_transaction(DB, Fd),       % consistent with the other clauses
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(update(S,P,O,DB,Action)) :-
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB))
    ;   journal_fd(DB, Fd),
        open_transaction(DB, Fd),
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(load(BE, _DumpFileURI)) :-
    (   BE = end(Graphs)
    ->  sync_loaded_graphs(Graphs)
    ;   true
    ).
monitor(create_graph(Graph)) :-
    \+ blocked_db(Graph, _),
    journal_fd(Graph, Fd),
    open_transaction(Graph, Fd),
    sync_journal(Graph, Fd).
monitor(reset) :-
    forall(rdf_graph(Graph), delete_db(Graph)).

monitor(transaction(BE, Id)) :-
    monitor_transaction(Id, BE).
797
%   monitor_transaction(+Id, +BeginOrEnd)
%
%   Handle the begin(Nesting) and end(Nesting) events of transaction
%   Id.
%
%     - load_journal(DB): block DB with reason `journal` while its
%       journal is replayed.
%     - parse(URI): block URI with reason `parse` while parsing, unless
%       persistency is disabled for it, and save it afterwards.
%     - unload(DB): block DB with reason `unload` while unloading,
%       unless persistency is disabled for it, and delete its files
%       afterwards.
%     - log(Msg) and log(Msg, DB): record the transaction message at
%       begin; at end, close the transaction in all graphs that joined
%       it.  log(Msg, DB) also opens the journal of DB right away.
%       Both require check_nested/1 to accept the nesting level.

monitor_transaction(load_journal(DB), begin(_)) :-
    !,
    assert(blocked_db(DB, journal)).
monitor_transaction(load_journal(DB), end(_)) :-
    !,
    retractall(blocked_db(DB, journal)).

monitor_transaction(parse(URI), begin(_)) :-
    !,
    (   blocked_db(URI, persistency)
    ->  true
    ;   assert(blocked_db(URI, parse))
    ).
monitor_transaction(parse(URI), end(_)) :-
    !,
    (   retract(blocked_db(URI, parse))
    ->  create_db(URI)
    ;   true
    ).
monitor_transaction(unload(DB), begin(_)) :-
    !,
    (   blocked_db(DB, persistency)
    ->  true
    ;   assert(blocked_db(DB, unload))
    ).
monitor_transaction(unload(DB), end(_)) :-
    !,
    (   retract(blocked_db(DB, unload))
    ->  delete_db(DB)
    ;   true
    ).
monitor_transaction(log(Msg), begin(Nesting)) :-
    !,
    check_nested(Nesting),
    get_time(Now),
    asserta(transaction_message(Nesting, Now, Msg)).
monitor_transaction(log(_), end(Nesting)) :-
    check_nested(Nesting),
    retract(transaction_message(Nesting, _, _)),
    !,
    findall(DB:Id, retract(transaction_db(Nesting, DB, Id)), Joined),
    end_transactions(Joined, Nesting).
monitor_transaction(log(Msg, DB), begin(Nesting)) :-
    !,
    check_nested(Nesting),
    get_time(Now),
    asserta(transaction_message(Nesting, Now, Msg)),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd).
monitor_transaction(log(Msg, _DB), end(Nesting)) :-
    monitor_transaction(log(Msg), end(Nesting)).
849
850
856
%!  check_nested(+Level) is semidet.
%
%   True if a transaction at nesting Level must be logged: always for
%   level 0, for other levels only if the option
%   `log_nested_transactions(true)` is stored.

check_nested(Level) :-
    (   Level = 0
    ->  true
    ;   rdf_option(log_nested_transactions(true))
    ).
860
861
869
%!  open_transaction(+DB, +Fd) is det.
%
%   If a logged transaction is active and DB has not joined it yet,
%   allocate a transaction id for DB, record it and write a
%   begin(Id, Nesting, Time, Message) term to Fd.  Time is rounded to
%   1/100 second.  Does nothing without an active logged transaction.

open_transaction(DB, Fd) :-
    (   transaction_message(Nesting, Time, Msg)
    ->  (   transaction_db(Nesting, DB, _)
        ->  true
        ;   next_transaction_id(DB, Id),
            assert(transaction_db(Nesting, DB, Id)),
            RoundedTime is round(Time*100)/100,
            format(Fd, '~q.~n', [begin(Id, Nesting, RoundedTime, Msg)])
        )
    ;   true
    ).
881
882
890
:- dynamic
    current_transaction_id/2.       % current_transaction_id(Graph, LastId)

%!  next_transaction_id(+DB, -Id) is det.
%
%   Id is the next transaction identifier for graph DB.  The last
%   identifier is cached in current_transaction_id/2.  If nothing is
%   cached, the journal file of DB is searched for the last end/3 term;
%   if there is no journal file numbering starts at 1.
%
%   The journal name returned by db_files/3 is a base name; open_db/4
%   resolves it through db_file/2.  Existence and size are now checked
%   on that same resolved path instead of on the bare base name.

next_transaction_id(DB, Id) :-
    retract(current_transaction_id(DB, Last)),
    !,
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, Id) :-
    db_files(DB, _, Journal),
    db_file(Journal, JournalPath),
    exists_file(JournalPath),
    !,
    size_file(JournalPath, Size),
    open_db(Journal, read, In, []),
    call_cleanup(iterative_expand(In, Size, Last), close(In)),
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, 1) :-
    assert(current_transaction_id(DB, 1)).
910
%   iterative_expand(+In, +Size, -Last)
%
%   Last is the identifier of the last end/3 term in the journal
%   stream In of Size bytes, or 0 if there is none.  The tail of the
%   file is read first: 2^Step bytes before the end for Step = 10, 11,
%   ... up to floor(log2(Size)), skipping the remainder of the line at
%   the seek position.  If no tail yields a positive identifier the
%   whole file is read.

iterative_expand(_, 0, 0) :- !.
iterative_expand(In, Size, Last) :-
    MaxStep is floor(log(Size)/log(2)),
    between(10, MaxStep, Step),
    FromEnd is -(1<<Step),
    seek(In, FromEnd, eof, _),
    skip(In, 10),                       % resume after the next newline
    read(In, First),
    last_transaction_id(First, In, 0, Last),
    Last > 0,
    !.
iterative_expand(In, _, Last) :-
    seek(In, 0, bof, _),                % fall back to a full scan
    read(In, First),
    last_transaction_id(First, In, 0, Last).
926
%   last_transaction_id(+Term, +In, +Id0, -Last)
%
%   Last is the identifier of the last end(Id, _, _) term among Term
%   and the terms that follow it on stream In, or Id0 if there is none.
%
%   The clause for end/3 is committed with a cut.  Without it, a caller
%   that backtracks into this predicate (iterative_expand/3 does so when
%   `Last > 0` fails) would retry the term through the catch-all clause
%   and read from a stream that is already exhausted.

last_transaction_id(end_of_file, _, Last, Last) :- !.
last_transaction_id(end(Id, _, _), In, _, Last) :-
    !,
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).
last_transaction_id(_, In, Id, Last) :-
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).
934
935
947
%!  end_transactions(+DBs:list(DB:Id), +Nesting) is det.
%
%   Write an end(Id, Nesting, Others) term to the journal of each graph
%   in DBs and sync that journal.  Others is DBs without the first
%   occurrence of the DB:Id pair being closed.

end_transactions(DBs, Nesting) :-
    end_transactions(DBs, DBs, Nesting).

end_transactions([], _, _).
end_transactions([DB:Id|Rest], All, Nesting) :-
    journal_fd(DB, Fd),
    once(select(DB:Id, All, Others)),
    format(Fd, 'end(~q, ~q, ~q).~n', [Id, Nesting, Others]),
    sync_journal(DB, Fd),
    end_transactions(Rest, All, Nesting).
958
959
964
%!  sync_loaded_graphs(+Graphs:list) is det.
%
%   Call create_db/1 on every graph in Graphs.

sync_loaded_graphs(Graphs) :-
    maplist(create_db, Graphs).
967
968
969 972
980
%!  journal_fd(+DB, -Stream) is det.
%
%   Stream is the journal stream of graph DB.  An already open stream
%   is returned directly; otherwise the journal is opened while holding
%   the mutex `rdf_journal_file`.

journal_fd(DB, Fd) :-
    (   source_journal_fd(DB, Fd)
    ->  true
    ;   with_mutex(rdf_journal_file,
                   journal_fd_(DB, Out)),
        Fd = Out
    ).

%   journal_fd_(+DB, -Stream)
%
%   As journal_fd/2, called with the mutex held.  The check for an open
%   stream is repeated here.  A new journal is opened in append mode
%   after limit_fd_pool/0, gets a start([time(Now)]) term and is
%   recorded in source_journal_fd/2.

journal_fd_(DB, Fd) :-
    (   source_journal_fd(DB, Fd)
    ->  true
    ;   limit_fd_pool,
        db_files(DB, _Snapshot, Journal),
        open_db(Journal, append, Fd,
                [ close_on_abort(false)
                ]),
        time_stamp(Now),
        format(Fd, '~q.~n', [start([time(Now)])]),
        assert(source_journal_fd(DB, Fd))
    ).
1008
%!  limit_fd_pool is det.
%
%   Close journals while the number of source_journal_fd/2 facts
%   exceeds the `max_open_journals` option (default 10).

limit_fd_pool :-
    (   predicate_property(source_journal_fd(_, _), number_of_clauses(Open))
    ->  (   rdf_option(max_open_journals(Max))
        ->  true
        ;   Max = 10
        ),
        Excess is Open - Max,
        forall(between(1, Excess, _),
               close_oldest_journal)
    ;   true
    ).

%   close_oldest_journal
%
%   Close the journal of the first source_journal_fd/2 fact, if any.

close_oldest_journal :-
    (   source_journal_fd(DB, _Fd)
    ->  debug(rdf_persistency, 'Closing old journal for ~q', [DB]),
        close_journal(DB)
    ;   true
    ).
1027
1028
1034
%!  sync_journal(+DB, +Fd) is det.
%
%   Flush the journal stream Fd, unless DB takes part in a logged
%   transaction (a transaction_db/3 fact exists for it).

sync_journal(DB, Fd) :-
    (   transaction_db(_, DB, _)
    ->  true
    ;   flush_output(Fd)
    ).
1040
1044
%!  close_journal(+DB) is det.
%
%   Close the journal of graph DB, if open, while holding the mutex
%   `rdf_journal_file`.

close_journal(DB) :-
    with_mutex(rdf_journal_file,
               close_journal_(DB)).

%   close_journal_(+DB)
%
%   Forget the journal stream of DB, write the closing
%   end([time(Now)]) term and close the stream with force(true).

close_journal_(DB) :-
    (   retract(source_journal_fd(DB, Stream))
    ->  time_stamp(Now),
        format(Stream, '~q.~n', [end([time(Now)])]),
        close(Stream, [force(true)])
    ;   true
    ).

%!  close_journals is det.
%
%   Close all open journals.  Errors are printed and do not stop the
%   remaining journals from being closed.

close_journals :-
    forall(source_journal_fd(DB, _),
           catch(close_journal(DB), Error,
                 print_message(error, Error))).
1065
1070
%!  create_db(+Graph) is det.
%
%   Save Graph as a snapshot.  The files of an empty graph are deleted
%   instead.  Otherwise the journal is closed and the graph is written
%   to `<snapshot>.new`.  On success the journal file is deleted and
%   the new file is renamed to the snapshot.  If saving raises an error
%   it is printed as a warning and the `.new` file is removed.

create_db(Graph) :-
    \+ rdf(_,_,_,Graph),
    !,
    debug(rdf_persistency, 'Deleting empty Graph ~w', [Graph]),
    delete_db(Graph).
create_db(Graph) :-
    debug(rdf_persistency, 'Saving Graph ~w', [Graph]),
    close_journal(Graph),
    db_abs_files(Graph, Snapshot, Journal),
    atom_concat(Snapshot, '.new', TmpSnapshot),
    (   catch(( create_directory_levels(Snapshot),
                rdf_save_db(TmpSnapshot, Graph)
              ),
              SaveError,
              ( print_message(warning, SaveError),
                fail
              ))
    ->  (   exists_file(Journal)
        ->  delete_file(Journal)
        ;   true
        ),
        rename_file(TmpSnapshot, Snapshot),
        debug(rdf_persistency, 'Saved Graph ~w', [Graph])
    ;   catch(delete_file(TmpSnapshot), _, true)
    ).
1095
1096
1100
%!  delete_db(+DB) is det.
%
%   Close the journal of graph DB and delete its journal and snapshot
%   files, if they exist.  Runs while holding the mutex
%   `rdf_journal_file`.

delete_db(DB) :-
    with_mutex(rdf_journal_file,
               delete_db_(DB)).

%   delete_db_(+DB)
%
%   Does the work for delete_db/1.  Succeeds without action if the
%   journal cannot be closed or the file names cannot be determined.

delete_db_(DB) :-
    (   close_journal_(DB),
        db_abs_files(DB, Snapshot, Journal)
    ->  (   exists_file(Journal)
        ->  delete_file(Journal)
        ;   true
        ),
        (   exists_file(Snapshot)
        ->  delete_file(Snapshot)
        ;   true
        )
    ;   true
    ).
1118
1119 1122
1126
%!  lock_db(+Dir) is det.
%
%   Lock the store in Dir.  The lock file is opened with a
%   non-blocking write lock and filled with a comment line and a
%   locked([time(T), pid(PID), host(Host)]) term; PID is 0 if the `pid`
%   flag is not available.  The stream stays open and is recorded in
%   rdf_lock/2.  unlock_db/1 is registered to run at halt.
%
%   @error permission_error(lock, rdf_db, Dir) if the lock file cannot
%          be opened or locked.

lock_db(Dir) :-
    lockfile(Dir, LockFile),
    catch(open(LockFile, update, Out, [lock(write), wait(false)]),
          error(permission_error(Access, _, _), _),
          locked_error(Access, Dir)),
    (   current_prolog_flag(pid, PID)
    ->  true
    ;   PID = 0
    ),
    time_stamp(Now),
    gethostname(Host),
    format(Out, '/* RDF Database is in use */~n~n', []),
    format(Out, '~q.~n', [ locked([ time(Now),
                                    pid(PID),
                                    host(Host)
                                  ])
                         ]),
    flush_output(Out),
    set_end_of_stream(Out),             % truncate older, longer content
    assert(rdf_lock(Dir, lock(Out, LockFile))),
    at_halt(unlock_db(Dir)).

%   locked_error(+Access, +Dir)
%
%   Throw the permission error for a failed lock.  For Access `lock`
%   the context is rdf_locked(Args) if the lock file holds exactly one
%   locked(Args) term; for Access `open` a fixed context message is
%   used.

locked_error(lock, Dir) :-
    lockfile(Dir, LockFile),
    (   catch(read_file_to_terms(LockFile, Terms, []), _, fail),
        Terms = [locked(Args)]
    ->  Context = rdf_locked(Args)
    ;   Context = context(_, 'Database is in use')
    ),
    throw(error(permission_error(lock, rdf_db, Dir), Context)).
locked_error(open, Dir) :-
    throw(error(permission_error(lock, rdf_db, Dir),
                context(_, 'Lock file cannot be opened'))).
1160
1163
%!  unlock_db(+Dir) is det.
%
%   Release the lock recorded for Dir in rdf_lock/2.  Succeeds without
%   action if no lock is recorded.

unlock_db(Dir) :-
    (   retract(rdf_lock(Dir, lock(Stream, LockFile)))
    ->  unlock_db(Stream, LockFile)
    ;   true
    ).

%   unlock_db(+Stream, +File)
%
%   Close the lock stream and delete the lock file.

unlock_db(Stream, LockFile) :-
    close(Stream),
    delete_file(LockFile).
1173
1174 1177
%   lockfile(+Dir, -LockFile)
%
%   LockFile is the file `lock` inside Dir.

lockfile(Dir, LockFile) :-
    atomic_list_concat([Dir, /, lock], LockFile).

%   directory_levels(-Levels)
%
%   Levels is the stored `directory_levels` option, or 2 if none is
%   stored.

directory_levels(Levels) :-
    (   rdf_option(directory_levels(Levels))
    ->  true
    ;   Levels = 2
    ).
1185
%   db_file(+Base, -File)
%
%   File is the path of file Base in the attached directory, using the
%   current number of directory levels.

db_file(Base, File) :-
    rdf_directory(Dir),
    directory_levels(Levels),
    db_file(Dir, Base, Levels, File).

%   db_file(+Dir, +Base, +Levels, -File)
%
%   File is Dir, followed by Levels hash-derived directory segments and
%   Base, joined with `/`.

db_file(Dir, Base, Levels, File) :-
    dir_levels(Base, Levels, Segments, [Base]),
    atomic_list_concat([Dir|Segments], /, File).

%   open_db(+Base, +Mode, -Stream, +Options)
%
%   Open the store file Base as a UTF-8 stream, creating its directory
%   if needed.  Options is passed to open/4.

open_db(Base, Mode, Stream, Options) :-
    db_file(Base, File),
    create_directory_levels(File),
    open(File, Mode, Stream, [encoding(utf8)|Options]).

%   create_directory_levels(+File)
%
%   Make sure the directory of File exists.  Skipped when the stored
%   option is directory_levels(0).

create_directory_levels(_File) :-
    rdf_option(directory_levels(0)),
    !.
create_directory_levels(File) :-
    file_directory_name(File, Dir),
    make_directory_path(Dir).

%   exists_db(+Base)
%
%   True if the store file Base exists.

exists_db(Base) :-
    db_file(Base, File),
    exists_file(File).
1210
1215
%!  dir_levels(+File, +Levels, ?Segments, ?Tail) is det.
%
%   Segments-Tail is a difference list of Levels directory names for
%   File.  The names are consecutive two-character slices of the hash
%   rdf_atom_md5/3 computes for File.  For 0 levels the list is empty.

dir_levels(_, 0, Segments, Segments) :- !.
dir_levels(File, Levels, Segments, Tail) :-
    rdf_atom_md5(File, 1, Hash),
    create_dir_levels(Levels, 0, Hash, Segments, Tail).

%   create_dir_levels(+Count, +Offset, +Hash, ?Segments, ?Tail)
%
%   Take Count two-character slices from Hash, starting at Offset.

create_dir_levels(0, _, _, Segments, Segments) :- !.
create_dir_levels(Count, Offset, Hash, [Segment|Segments], Tail) :-
    sub_atom(Hash, Offset, 2, _, Segment),
    NextOffset is Offset+2,
    Remaining is Count-1,
    create_dir_levels(Remaining, NextOffset, Hash, Segments, Tail).
1227
1228
1236
%!  db_files(?DB, ?Snapshot, ?Journal) is semidet.
%
%   Relate graph DB to the names of its snapshot (`Base.trp`) and
%   journal (`Base.jrn`) files, where Base is related to DB by
%   rdf_db_to_file/2.  The first instantiated argument, in the order
%   DB, Snapshot, Journal, determines the others.  Fails if all three
%   are unbound.

db_files(DB, Snapshot, Journal) :-
    nonvar(DB),
    !,
    rdf_db_to_file(DB, Base),
    atom_concat(Base, '.trp', Snapshot),
    atom_concat(Base, '.jrn', Journal).
db_files(DB, Snapshot, Journal) :-
    nonvar(Snapshot),
    !,
    atom_concat(Base, '.trp', Snapshot),
    atom_concat(Base, '.jrn', Journal),
    rdf_db_to_file(DB, Base).
db_files(DB, Snapshot, Journal) :-
    nonvar(Journal),
    !,
    atom_concat(Base, '.jrn', Journal),
    atom_concat(Base, '.trp', Snapshot),
    rdf_db_to_file(DB, Base).

%   db_abs_files(+DB, -Snapshot, -Journal)
%
%   As db_files/3, with both names resolved by db_file/2.

db_abs_files(DB, Snapshot, Journal) :-
    db_files(DB, SnapshotBase, JournalBase),
    db_file(SnapshotBase, Snapshot),
    db_file(JournalBase, Journal).
1260
1261
1266
%!  rdf_journal_file(?Graph, -Journal) is nondet.
%
%   True when Journal is the existing journal file of Graph.  An
%   unbound Graph is enumerated through rdf_graph/1.

rdf_journal_file(Graph, Journal) :-
    (   var(Graph)
    ->  rdf_graph(Graph)
    ;   true
    ),
    db_abs_files(Graph, _Snapshot, Journal),
    exists_file(Journal).

%!  rdf_snapshot_file(?Graph, -Snapshot) is nondet.
%
%   True when Snapshot is the existing snapshot file of Graph.  An
%   unbound Graph is enumerated through rdf_graph/1.

rdf_snapshot_file(Graph, Snapshot) :-
    (   var(Graph)
    ->  rdf_graph(Graph)
    ;   true
    ),
    db_abs_files(Graph, Snapshot, _Journal),
    exists_file(Snapshot).
1288
1289
1298
%!  rdf_db_to_file(?DB, ?FileBase) is semidet.
%
%   Relate graph DB to the base name of its files.  Known pairs are
%   taken from the cache file_base_db/2; new pairs are computed by
%   url_to_filename/2 and added to the cache.

rdf_db_to_file(DB, File) :-
    (   file_base_db(File, DB)
    ->  true
    ;   url_to_filename(DB, File),
        assert(file_base_db(File, DB))
    ).
1305
1316
%!  url_to_filename(+URL, -FileName) is det.
%!  url_to_filename(-URL, +FileName) is det.
%
%   Relate a graph name to a file base name.  An atomic URL is encoded
%   by url_encode//1; an unbound URL is obtained from FileName through
%   uri_encoded/3.

url_to_filename(URL, FileName) :-
    atomic(URL),
    !,
    atom_codes(URL, Codes),
    phrase(url_encode(EncCodes), Codes),
    atom_codes(FileName, EncCodes).
url_to_filename(URL, FileName) :-
    uri_encoded(path, URL, FileName).

%   url_encode(-Encoded)//
%
%   Encoded is the encoding of the parsed codes: a space becomes `+`,
%   ASCII alphanumerics and `_` are copied, a newline (with or without
%   preceding carriage return) becomes `%0D%0A` and any other code
%   becomes `%XX` built from the low 8 bits of the code.

url_encode([0'+|T]) -->
    " ",
    !,
    url_encode(T).
url_encode([C|T]) -->
    alphanum(C),
    !,
    url_encode(T).
url_encode([C|T]) -->
    no_enc_extra(C),
    !,
    url_encode(T).
url_encode(Enc) -->
    (   "\r\n"
    ;   "\n"
    ),
    !,
    { string_codes("%0D%0A", Codes),
      append(Codes, T, Enc)
    },
    url_encode(T).
url_encode([]) -->
    eos,
    !.
url_encode([0'%,D1,D2|T]) -->
    [C],
    { Dv1 is (C>>4 /\ 0xf),
      Dv2 is (C /\ 0xf),
      code_type(D1, xdigit(Dv1)),
      code_type(D2, xdigit(Dv2))
    },
    url_encode(T).

%   eos//: true at the end of the input.

eos([], []).

%   alphanum(-C)//: C is the next code if it is ASCII alphanumeric.

alphanum(C) -->
    [C],
    { C < 128,                          % ASCII only
      code_type(C, alnum)
    }.

%   no_enc_extra(-C)//: further codes that are copied unencoded.  The
%   head of this rule was lost in the damaged source, which left
%   url_encode//1 calling an undefined nonterminal.

no_enc_extra(0'_) --> "_".
1368
1369
1370 1373
1377
%!  reindex_db(+Dir, +Levels) is det.
%
%   Move all database files below Dir to the location that db_file/4
%   gives for Levels directory levels and remove directories that
%   became empty.

reindex_db(Dir, Levels) :-
    directory_files(Dir, Entries),
    reindex_files(Entries, Dir, '.', 0, Levels),
    remove_empty_directories(Entries, Dir).

%   reindex_files(+Entries, +Dir, +Prefix, +CurrentLevel, +Levels)
%
%   Entries are the directory entries of Dir/Prefix, found at depth
%   CurrentLevel.  Database files at a depth other than Levels are
%   renamed to their new path; sub directories that are not symbolic
%   links are processed recursively; all other entries are skipped.

reindex_files([], _, _, _, _).
reindex_files([Entry|Entries], Dir, Prefix, CLevel, Levels) :-
    nofollow(Entry),
    !,
    reindex_files(Entries, Dir, Prefix, CLevel, Levels).
reindex_files([Entry|Entries], Dir, Prefix, CLevel, Levels) :-
    CLevel \== Levels,
    file_name_extension(_Base, Ext, Entry),
    db_extension(Ext),
    !,
    directory_file_path(Prefix, Entry, RelFile),
    directory_file_path(Dir, RelFile, OldPath),
    db_file(Dir, Entry, Levels, NewPath),
    debug(rdf_persistency, 'Rename ~q --> ~q', [OldPath, NewPath]),
    file_directory_name(NewPath, NewDir),
    make_directory_path(NewDir),
    rename_file(OldPath, NewPath),
    reindex_files(Entries, Dir, Prefix, CLevel, Levels).
reindex_files([Entry|Entries], Dir, Prefix, CLevel, Levels) :-
    directory_file_path(Prefix, Entry, RelDir),
    directory_file_path(Dir, RelDir, AbsDir),
    exists_directory(AbsDir),
    \+ read_link(AbsDir, _, _),         % do not descend into links
    !,
    directory_files(AbsDir, SubEntries),
    SubLevel is CLevel + 1,
    reindex_files(SubEntries, Dir, RelDir, SubLevel, Levels),
    reindex_files(Entries, Dir, Prefix, CLevel, Levels).
reindex_files([_|Entries], Dir, Prefix, CLevel, Levels) :-
    reindex_files(Entries, Dir, Prefix, CLevel, Levels).
1413
1414
%   remove_empty_directories(+Entries, +Dir)
%
%   For each entry of Dir that is a real directory (not `.`, `..` or a
%   symbolic link): delete it if it is empty, otherwise apply the same
%   procedure to its content.  A directory that becomes empty through
%   the recursive step is not deleted in the same pass.

remove_empty_directories([], _).
remove_empty_directories([Entry|Entries], Dir) :-
    \+ nofollow(Entry),
    directory_file_path(Dir, Entry, Path),
    exists_directory(Path),
    \+ read_link(Path, _, _),
    !,
    directory_files(Path, AllContent),
    exclude(nofollow, AllContent, Content),
    (   Content == []
    ->  debug(rdf_persistency, 'Remove empty dir ~q', [Path]),
        delete_directory(Path)
    ;   remove_empty_directories(Content, Path)
    ),
    remove_empty_directories(Entries, Dir).
remove_empty_directories([_|Entries], Dir) :-
    remove_empty_directories(Entries, Dir).
1432
1433
1434 1437
%!  save_prefixes(+Dir) is det.
%
%   Write all prefixes known to rdf_current_ns/2 to the UTF-8 file
%   `prefixes.db` in Dir.

save_prefixes(Dir) :-
    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
    setup_call_cleanup(
        open(PrefixFile, write, Out, [encoding(utf8)]),
        write_prefixes(Out),
        close(Out)).

%   write_prefixes(+Out)
%
%   Write a comment header followed by one prefix(Alias, URI) term per
%   known prefix.

write_prefixes(Out) :-
    format(Out, '% Snapshot of defined RDF prefixes~n~n', []),
    forall(rdf_current_ns(Alias, URI),
           format(Out, 'prefix(~q, ~q).~n', [Alias, URI])).
1448
1456
%!  load_prefixes(+Dir) is det.
%
%   Register the prefixes stored in `prefixes.db` in Dir, if that file
%   exists.

load_prefixes(Dir) :-
    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
    (   exists_file(PrefixFile)
    ->  setup_call_cleanup(
            open(PrefixFile, read, In, [encoding(utf8)]),
            read_prefixes(In),
            close(In))
    ;   true
    ).

%   read_prefixes(+Stream)
%
%   Read prefix(Alias, URI) terms from Stream until end of file and
%   register each one with rdf_register_ns/3.  Errors from registering
%   are printed as warnings.
%
%   @error domain_error(prefix_term, Term) for any other term.
%   @error type_error(atom, X) if Alias or URI is not an atom.

read_prefixes(Stream) :-
    read_term(Stream, First, []),
    read_prefixes(First, Stream).

read_prefixes(end_of_file, _) :- !.
read_prefixes(prefix(Alias, URI), Stream) :-
    !,
    must_be(atom, Alias),
    must_be(atom, URI),
    catch(rdf_register_ns(Alias, URI, []), Error,
          print_message(warning, Error)),
    read_term(Stream, Next, []),
    read_prefixes(Next, Stream).
read_prefixes(Term, _) :-
    domain_error(prefix_term, Term).
1481
1482
1483 1486
1490
%!  mkdir(+Directory) is det.
%
%   Create Directory unless it exists already.

mkdir(Directory) :-
    (   exists_directory(Directory)
    ->  true
    ;   make_directory(Directory)
    ).
1496
1500
%!  time_stamp(-Integer) is det.
%
%   Integer is the current time stamp of get_time/1, rounded to the
%   nearest integer.

time_stamp(Stamp) :-
    get_time(Now),
    Stamp is round(Now).
1504
1505
1506 1509
:- multifile
    prolog:message/3,
    prolog:message_context/3.

%   Messages of this library are the terms rdf(Term); message//1
%   translates Term.

prolog:message(rdf(Term)) -->
    message(Term).

%   message(+Term)//
%
%   Translate a message term.  For restore(Silent, Action) the style is
%   selected by Silent: `true` prints nothing, `brief` prints a progress
%   line and any other value prints the verbose messages below.
%
%   The verbose clauses for specific actions are placed before the
%   generic restore(_, Graph) clause.  In the previous order the generic
%   clause matched every action first, which made the specific clauses
%   unreachable.  The closing message of load_source/4 is
%   done(Graph, Time, Count, Nth, Total); it now has a matching clause.

message(restoring(Type, Count, Jobs)) -->
    [ 'Restoring ~D ~w using ~D concurrent workers'-[Count, Type, Jobs] ].
message(restore(attached(Graphs, Triples, Time/Wall))) -->
    { catch(Percent is round(100*Time/Wall), _, Percent = 0) },
    [ 'Loaded ~D graphs (~D triples) in ~2f sec. (~d% CPU = ~2f sec.)'-
      [Graphs, Triples, Wall, Percent, Time] ].
message(restore(true, Action)) -->
    !,
    silent_message(Action).
message(restore(brief, Action)) -->
    !,
    brief_message(Action).
                                        % actions of attach_graph/2
message(restore(_, snapshot(_))) -->
    [ at_same_line, '(snapshot) '-[], flush ].
message(restore(_, journal(_))) -->
    [ at_same_line, '(journal) '-[], flush ].
message(restore(_, done(_, Time, Count))) -->
    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
                                        % actions of load_source/4
message(restore(_, snapshot(G, _))) -->
    [ 'Restoring ~p\t(snapshot)'-[G], flush ].
message(restore(_, journal(G, _))) -->
    [ 'Restoring ~p\t(journal)'-[G], flush ].
message(restore(_, done(_, Time, Count, _Nth, _Total))) -->
    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
                                        % generic: start restoring Graph
message(restore(_, Graph)) -->
    [ 'Restoring ~p ... '-[Graph], flush ].
message(update_failed(S,P,O,Action)) -->
    [ 'Failed to update <~p ~p ~p> with ~p'-[S,P,O,Action] ].
message(reindex(Count, Depth)) -->
    [ 'Restructuring database with ~d levels (~D graphs)'-[Depth, Count] ].
message(reindex(Depth)) -->
    [ 'Fixing database directory structure (~d levels)'-[Depth] ].
message(read_only) -->
    [ 'Cannot write persistent store; continuing in read-only mode.', nl,
      'All changes to the RDF store will be lost if this process terminates.'
    ].
1557
%   silent_message(+Action)//
%
%   Style for silent(true): no output for any action.

silent_message(_Action) --> [].

%   brief_message(+Action)//
%
%   Style for silent(brief): only the closing action of load_source/4
%   produces output, a line that starts with a carriage return and
%   shows the base name of the graph and the progress `Nth of Total`.

brief_message(done(Graph, _Time, _Count, Nth, Total)) -->
    { file_base_name(Graph, GraphBase) },
    [ at_same_line,
      '\r~p~`.t ~D of ~D graphs~72|'-[GraphBase, Nth, Total],
      flush
    ].
brief_message(_) --> [].
1567
1568
1569prolog:message_context(rdf_locked(Args)) -->
1570 { memberchk(time(Time), Args),
1571 memberchk(pid(Pid), Args),
1572 format_time(string(S), '%+', Time)
1573 },
1574 [ nl,
1575 'locked at ~s by process id ~w'-[S,Pid]
1576 ]