View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker and Sean Charles
    4    E-mail:        jan@swi-prolog.org and <sean at objitsu dot com>
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2013-2020, Sean Charles
    7                              SWI-Prolog Solutions b.v.
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34
   35    NOTE
   36
   37    The original code was subject to the MIT licence and written by
   38    Sean Charles.  Re-licenced to standard SWI-Prolog BSD-2 with
   39    permission from Sean Charles.
   40*/
   41
   42:- module(redis,
   43          [ redis_server/3,             % +Alias, +Address, +Options
   44            redis_connect/1,            % -Connection
   45            redis_connect/3,            % -Connection, +Host, +Port
   46            redis_disconnect/1,         % +Connection
   47            redis_disconnect/2,         % +Connection, +Options
   48                                        % Queries
   49            redis/1,                    % +Request
   50            redis/2,                    % +Connection, +Request
   51            redis/3,                    % +Connection, +Request, -Reply
   52                                        % High level queries
   53            redis_get_list/3,           % +Redis, +Key, -List
   54            redis_get_list/4,           % +Redis, +Key, +ChunkSize, -List
   55            redis_set_list/3,           % +Redis, +Key, +List
   56            redis_get_hash/3,           % +Redis, +Key, -Data:dict
   57            redis_set_hash/3,           % +Redis, +Key, +Data:dict
   58            redis_scan/3,               % +Redis, -LazyList, +Options
   59            redis_sscan/4,              % +Redis, +Set, -LazyList, +Options
   60            redis_hscan/4,              % +Redis, +Hash, -LazyList, +Options
   61            redis_zscan/4,              % +Redis, +Set, -LazyList, +Options
   62                                        % Publish/Subscribe
   63            redis_subscribe/4,          % +Redis, +Channels, -Id, +Options
   64            redis_subscribe/2,          % +Id, +Channels
   65            redis_unsubscribe/2,        % +Id, +Channels
   66            redis_current_subscription/2, % ?Id,?Channels
   67            redis_write/2,              % +Redis, +Command
   68            redis_read/2,               % +Redis, -Reply
   69                                        % Building blocks
   70            redis_array_dict/3,         % ?Array, ?Tag, ?Dict
   71                                        % Admin stuff
   72            redis_property/2,           % +Reply, ?Property
   73            redis_current_command/2,    % +Redis,?Command
   74            redis_current_command/3     % +Redis, +Command, -Properties
   75          ]).   76:- autoload(library(socket), [tcp_connect/3]).   77:- autoload(library(apply), [maplist/2, convlist/3, maplist/3, maplist/5]).   78:- autoload(library(broadcast), [broadcast/1]).   79:- autoload(library(error),
   80            [ must_be/2,
   81              instantiation_error/1,
   82              uninstantiation_error/1,
   83              existence_error/2
   84            ]).   85:- autoload(library(lazy_lists), [lazy_list/2]).   86:- autoload(library(lists), [append/3, member/2]).   87:- autoload(library(option), [merge_options/3, option/2, option/3]).   88:- autoload(library(pairs), [group_pairs_by_key/2]).   89:- use_module(library(debug), [debug/3, assertion/1]).   90:- use_module(library(settings), [setting/4, setting/2]).   91
   92:- use_foreign_library(foreign(redis4pl)).   93
   94:- setting(max_retry_count, nonneg, 8640, % one day
   95           "Max number of retries").   96:- setting(max_retry_wait, number, 10,
   97           "Max time to wait between recovery attempts").   98
   99:- predicate_options(redis_server/3, 3,
  100                     [ pass_to(redis:redis_connect/3, 3)
  101                     ]).  102:- predicate_options(redis_connect/3, 3,
  103                     [ reconnect(boolean),
  104                       user(atom),
  105                       password(atomic),
  106                       version(between(2,3))
  107                     ]).  108:- predicate_options(redis_disconnect/2, 2,
  109                     [ force(boolean)
  110                     ]).  111:- predicate_options(redis_scan/3, 3,
  112                     [ match(atomic),
  113                       count(nonneg),
  114                       type(atom)
  115                     ]).  116% Actually not passing, but the same
  117:- predicate_options(redis_sscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  118:- predicate_options(redis_hscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  119:- predicate_options(redis_zscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).

Redis client

This library is a client to Redis, a popular key value store to deal with caching and communication between micro services.

In the typical use case we register the details of one or more Redis servers using redis_server/3. Subsequently, redis/2-3 is used to issue commands on the server. For example:

?- redis_server(default, redis:6379, [password("secret")]).
?- redis(default, set(user, "Bob")).
?- redis(default, get(user), User).
User = "Bob"

*/

  140:- dynamic server/3.  141
  142:- dynamic ( connection/2               % ServerName, Stream
  143           ) as volatile.
 redis_server(+ServerName, +Address, +Options) is det
Register a redis server without connecting to it. The ServerName acts as a lazy connection alias. Initially the ServerName default points at localhost:6379 with no connect options. The default server is used for redis/1 and redis/2 and may be changed using this predicate. Options are described with redis_connect/3.

Connections established this way are by default automatically reconnected if the connection is lost for some reason unless a reconnect(false) option is specified.

  %!  redis_server(+Alias, +Address, +Options) is det.
  %
  %   Record Address and Options under Alias in server/3, dropping any
  %   earlier registration for the same alias.  Nothing is connected
  %   here; the facts are only looked up when a connection is needed.
  %
  %   @error instantiation_error if Alias is not ground.

  redis_server(Name, Addr, Opts) :-
      must_be(ground, Name),
      retractall(server(Name, _, _)),
      asserta(server(Name, Addr, Opts)).

  % Initial registration for the alias `default`.
  server(default, localhost:6379, []).
 redis_connect(-Connection) is det
 redis_connect(+Address, -Connection, +Options) is det
redis_connect(-Connection, +Host, +Port) is det
Connect to a redis server. The main mode is redis_connect(+Address, -Connection, +Options). redis_connect/1 is equivalent to redis_connect(default, Connection, []), i.e., it connects to the server registered as default, which initially is localhost:6379. Options:
reconnect(+Boolean)
If true, try to reconnect to the service when the connection seems lost. Default is true for connections specified using redis_server/3 and false for explicitly opened connections.
user(+User)
If version(3) and password(Password) are specified, these are used to authenticate using the HELLO command.
password(+Password)
Authenticate using Password
version(+Version)
Specify the connection protocol version. Initially this is version 2. Redis 6 also supports version 3. When specified as 3, the HELLO command is used to upgrade the protocol.

Instead of using these predicates, redis/2 and redis/3 are normally used with a server name argument registered using redis_server/3. These predicates are meant for creating a temporary parallel connection or using a connection with a blocking call.

Arguments:
Address- is a term Host:Port, unix(File) or the name of a server registered using redis_server/3. The latter realises a new connection that is typically used for blocking redis commands such as listening for published messages, waiting on a list or stream.
Compatibility
- redis_connect(-Connection, +Host, +Port) provides compatibility to the original GNU-Prolog interface and is equivalent to redis_connect(Host:Port, Connection, []).
  %!  redis_connect(-Connection) is det.
  %!  redis_connect(+Address, -Connection, +Options) is det.
  %!  redis_connect(-Connection, +Host, +Port) is det.
  %
  %   Open a new connection.  redis_connect/1 uses the server registered
  %   as `default`.  For redis_connect/3 the clauses are tried in order:
  %
  %     1. Unbound first argument with ground second and third argument:
  %        the GNU-Prolog compatible (Connection, Host, Port) form.
  %     2. Atom first argument: an alias registered in server/3.  The
  %        given Options are merged over the registered ones.
  %     3. Anything else is passed on as the address itself.
  %
  %   @error existence_error(redis_server, Alias) if the atom is not a
  %          registered server.

  redis_connect(Connection) :-
      redis_connect(default, Connection, []).

  redis_connect(Connection, Host, Port) :-
      var(Connection),
      ground(Host), ground(Port),
      !,                                  % GNU-Prolog compatibility
      redis_connect(Host:Port, Connection, []).
  redis_connect(Alias, Connection, Options) :-
      atom(Alias),
      !,
      (   server(Alias, Address, Registered)
      ->  merge_options(Options, Registered, Merged),
          do_connect(Alias, Address, Connection, [address(Address)|Merged])
      ;   existence_error(redis_server, Alias)
      ).
  redis_connect(Address, Connection, Options) :-
      % The address doubles as the connection id.
      do_connect(Address, Address, Connection, [address(Address)|Options]).
 do_connect(+Id, +Address, -Conn, +Options)
Open the connection. A connection is a compound term of the shape
redis_connection(Id, Stream, Failures, Options)
  %!  do_connect(+Id, +Address, -Conn, +Options)
  %
  %   Open a socket to Address and wrap it as
  %   redis_connection(Id, Stream, 0, Options), where 0 is the initial
  %   failure count.  After connecting, hello/2 performs the optional
  %   protocol upgrade and authentication.
  %
  %   If hello/2 raises an exception (for example because the server
  %   rejects the credentials) the connection is force-closed before the
  %   exception is re-thrown, so the socket stream is not leaked.

  do_connect(Id, Address0, Conn, Options) :-
      tcp_address(Address0, Address),
      tcp_connect(Address, Stream, Options),
      Conn = redis_connection(Id, Stream, 0, Options),
      catch(hello(Conn, Options), Error,
            (   % force(true) tolerates an already closed stream slot
                redis_disconnect(Conn, [force(true)]),
                throw(Error)
            )).
  230
  %!  tcp_address(+Address0, -Address)
  %
  %   Strip the unix(File) wrapper from an address, leaving the bare
  %   File.  Any other address is returned unchanged.

  tcp_address(unix(SocketFile), SocketFile) :-
      !.                                  % Using an atom is ambiguous
  tcp_address(Other, Other).
 hello(+Connection, +Option)
Initialize the connection. This is used to upgrade to the RESP3 protocol and/or to authenticate.
  %!  hello(+Connection, +Options)
  %
  %   Initialise a fresh connection according to Options:
  %
  %     - With version(V), V >= 3, send HELLO 3 to upgrade the protocol.
  %       If password(Password) is given, authentication is part of the
  %       HELLO command, using user(User) when present and the Redis
  %       user `default` otherwise.  Previously a password without an
  %       explicit user was silently ignored in this mode.
  %     - Otherwise, with password(Password), send AUTH Password.
  %     - Otherwise do nothing.

  hello(Con, Options) :-
      option(version(V), Options),
      V >= 3,
      !,
      (   option(password(Password), Options)
      ->  option(user(User), Options, default),
          redis(Con, hello(3, auth, User, Password))
      ;   redis(Con, hello(3))
      ).
  hello(Con, Options) :-
      option(password(Password), Options),
      !,
      redis(Con, auth(Password)).
  hello(_, _).
 redis_stream(+Spec, --Stream, +DoConnect) is det
Get the stream to a Redis server from Spec. Spec is either the name of a registered server or a term redis_connection(Id,Stream,Failures,Options). If the stream is disconnected it will be reconnected.
  % redis_stream(+Spec, --Stream, +DoConnect)
  %
  % Clause 1 only validates the arguments: it raises an instantiation
  % error if Spec is unbound and an uninstantiation error if Stream is
  % already bound.  If both checks pass the if-then-else fails, so the
  % clauses below are tried.
  263redis_stream(Var, S, _) :-
  264    (   var(Var)
  265    ->  !, instantiation_error(Var)
  266    ;   nonvar(S)
  267    ->  !, uninstantiation_error(S)
  268    ).
  % Spec is an atom (server alias).  Reuse the stream recorded in
  % connection/2 if there is one.  Otherwise, only when DoConnect is
  % `true` and the alias is registered in server/3, open a connection,
  % fetch its stream and record it in connection/2.  In all remaining
  % cases an existence error is raised.
  269redis_stream(ServerName, S, Connect) :-
  270    atom(ServerName),
  271    !,
  272    (   connection(ServerName, S0)
  273    ->  S = S0
  274    ;   Connect == true,
  275        server(ServerName, Address, Options)
  276    ->  redis_connect(Address, Connection, Options),
  277        redis_stream(Connection, S, false),
  278        asserta(connection(ServerName, S))
  279    ;   existence_error(redis_server, ServerName)
  280    ).
  % Spec is a connection term whose stream slot is not `-`: return it.
  281redis_stream(redis_connection(_,S0,_,_), S, _) :-
  282    S0 \== (-),
  283    !,
  284    S = S0.
  % Spec is a connection term whose stream slot is `-`: reconnect to the
  % address(Address) stored in its options and destructively store the
  % new stream in the existing term using nb_setarg/3.
  285redis_stream(Redis, S, _) :-
  286    Redis = redis_connection(Id,-,_,Options),
  287    option(address(Address), Options),
  288    do_connect(Id,Address,Redis2,Options),
  289    arg(2, Redis2, S0),
  290    nb_setarg(2, Redis, S0),
  291    S = S0.
  292
  %!  has_redis_stream(+Spec, -Stream) is semidet.
  %
  %   True when Spec currently has a stream, without connecting.  For
  %   an atom this consults connection/2; for a connection term the
  %   stream slot must differ from `-`.
  %
  %   @error instantiation_error if Spec is unbound.

  has_redis_stream(Spec, Stream) :-
      (   var(Spec)
      ->  instantiation_error(Spec)
      ;   atom(Spec)
      ->  connection(Spec, Stream)
      ;   Spec = redis_connection(_,Stream,_,_),
          Stream \== (-)
      ).
 redis_disconnect(+Connection) is det
 redis_disconnect(+Connection, +Options) is det
Disconnect from a redis server. The second form takes one option, similar to close/2:
force(Force)
When true (default false), do not raise any errors if Connection does not exist or closing the connection raises a network or I/O related exception. This version is used internally if a connection is in a broken state, either due to a protocol error or a network issue.
  % redis_disconnect(+Connection): disconnect with default options.
  318redis_disconnect(Redis) :-
  319    redis_disconnect(Redis, []).
  320
  % redis_disconnect(+Connection, +Options)
  %
  % Forced variant, selected by force(true).  For a connection term the
  % stream is closed with force(true) and the stream slot is reset to
  % `-`; a slot that is already `-` is left alone.  For any other
  % Connection that currently has a stream (see has_redis_stream/2) the
  % stream is closed and its connection/2 facts are removed.  If there
  % is nothing to close the call simply succeeds.
  321redis_disconnect(Redis, Options) :-
  322    option(force(true), Options),
  323    !,
  324    (   Redis = redis_connection(_Id, S, _, _Opts)
  325    ->  (   S == (-)
  326        ->  true
  327        ;   close(S, [force(true)]),
  328            nb_setarg(2, Redis, -)
  329        )
  330    ;   has_redis_stream(Redis, S)
  331    ->  close(S, [force(true)]),
  332        retractall(connection(_,S))
  333    ;   true
  334    ).
  % Plain variant: obtain the stream through redis_stream/3 with
  % DoConnect set to `false`, close it and remove the connection/2
  % facts that refer to it.  Errors are not suppressed here.
  335redis_disconnect(Redis, _Options) :-
  336    redis_stream(Redis, S, false),
  337    close(S),
  338    retractall(connection(_,S)).
 redis(+Connection, +Request) is semidet
This predicate is overloaded to handle two types of requests. First, it is a shorthand for redis(Connection, Command, _) and second, it can be used to exploit Redis pipelines and transactions. The second form is activated if Request is a list. In that case, each element of the list is either a term Command -> Reply or a simple Command. Semantically this represents a sequence of redis/3 and redis/2 calls. It differs in the following aspects:

Procedurally, the process takes the following steps:

  1. Send all commands
  2. Read all replies and push messages
  3. Handle all callbacks from push messages
  4. Check whether one of the replies is an error. If so, raise this error (subsequent errors are lost)
  5. Bind all replies for the Command -> Reply terms.

Examples

?- redis(default,
         [ lpush(li,1),
           lpush(li,2),
           lrange(li,0,-1) -> List
         ]).
List = ["2", "1"].
  %!  redis(+Connection, +Request) is semidet.
  %
  %   If Request is a proper list it is run as a pipeline through
  %   redis_pipeline/2.  Any other Request is run as a single command
  %   whose reply is discarded.

  redis(Redis, Request) :-
      (   is_list(Request)
      ->  redis_pipeline(Redis, Request)
      ;   redis(Redis, Request, _)
      ).
 redis(+Connection, +Command, -Reply) is semidet
Execute a redis Command on Connection. Next, bind Reply to the returned result. Command is a callable term whose functor is the name of the Redis command and whose arguments are translated to Redis arguments according to the rules below. Note that all text is always represented using UTF-8 encoding.

Reply is either a plain term (often a variable) or a term Value as Type. In the latter form, Type dictates how the Redis bulk reply is translated to Prolog. The default equals to auto, i.e., as a number if the content satisfies the Prolog number syntax and as an atom otherwise.

Redis bulk replies are translated depending on the as Type as explained above.

string
string(Encoding)
Create a SWI-Prolog string object interpreting the blob as following Encoding. Encoding is a restricted set of SWI-Prolog's encodings: bytes (iso_latin_1), utf8 and text (the current locale translation).
atom
atom(Encoding)
As above, producing an atom.
codes
codes(Encoding)
As above, producing a list of integers (Unicode code points)
chars
chars(Encoding)
As above, producing a list of one-character atoms.
integer
float
rational
number
Interpret the bytes as a string representing a number. If the string does not represent a number of the requested type a type_error(Type, String) is raised.
tagged_integer
Same as integer, but demands the value to be between the Prolog flags min_tagged_integer and max_tagged_integer, allowing the value to be used as a dict key.
auto
Same as auto(atom, number)
auto(AsText, AsNumber)
If the bulk string conforms to the syntax of AsNumber, convert the value to the requested numerical type. Else convert the value to text according to AsText. This is similar to the Prolog predicate name/2.
dict_key
Alias for auto(atom,tagged_integer). This allows the value to be used as a key for a SWI-Prolog dict.
pairs(AsKey, AsValue)
Convert a map or array of even length into pairs for which the key satisfies AsKey and the value AsValue. The pairs type can also be applied to a Redis array. In this case the array length must be even. This notably allows fetching a Redis hash as pairs using HGETALL using version 2 of the Redis protocol.
dict(AsKey, AsValue)
Similar to pairs(AsKey, AsValue), but convert the resulting pair list into a SWI-Prolog dict. AsKey must convert to a valid dict key, i.e., an atom or tagged integer. See dict_key.
dict(AsValue)
Shorthand for dict(dict_key, AsValue).

Here are some simple examples

?- redis(default, set(a, 42), X).
X = status("OK").
?- redis(default, get(a), X).
X = "42".
?- redis(default, get(a), X as integer).
X = 42.
?- redis(default, get(a), X as float).
X = 42.0.
?- redis(default, set(swipl:version, 8)).
true.
?- redis(default, incr(swipl:version), X).
X = 9.
Errors
- redis_error(Code, String)
  %!  redis(+Connection, +Command, -Reply) is semidet.
  %
  %   Run Command and unify Reply with the result.  The call fails when
  %   the value part of the reply is the atom `nil`.

  redis(Redis, Command, Reply) :-
      out_val(Reply, Value),
      redis1(Redis, Command, Reply),
      Value \== nil.

  %!  out_val(+Out, -Val)
  %
  %   Val is the value part of the reply specification Out: the left
  %   side of `Val as Type` if Out has that shape, and Out itself
  %   otherwise (including when Out is unbound).

  out_val(Out, Val) :-
      nonvar(Out),
      Out = (Val as _),
      !.
  out_val(Out, Out).
  518
  %!  redis1(+Redis, +Req, -Out)
  %
  %   Run redis2/3, catching exceptions of the shape error(_,_).  A
  %   caught error is handed to recover/3 together with a goal that
  %   repeats this call.

  redis1(Redis, Req, Out) :-
      Caught = error(Formal, _),
      catch(redis2(Redis, Req, Out), Caught, true),
      (   nonvar(Formal)
      ->  recover(Caught, Redis, redis1(Redis, Req, Out))
      ;   true
      ).

  %!  redis2(+Redis, +Req, -Out)
  %
  %   Obtain the stream (connecting if needed), write Req and read the
  %   reply.  When Redis is an atom, the write/read pair runs inside
  %   with_mutex/2 on a mutex named after that atom.

  redis2(Redis, Req, Out) :-
      redis_stream(Redis, Stream, true),
      (   atom(Redis)
      ->  with_mutex(Redis,
                     ( redis_write_msg(Stream, Req),
                       redis_read_stream(Redis, Stream, Out)
                     ))
      ;   redis_write_msg(Stream, Req),
          redis_read_stream(Redis, Stream, Out)
      ).
 redis_pipeline(+Redis, +PipeLine)
  %!  redis_pipeline(+Redis, +PipeLine)
  %
  %   Run PipeLine through redis_pipeline2/2.  Exceptions of the shape
  %   error(_,_) are handed to recover/3 together with a goal that
  %   repeats the whole pipeline.

  redis_pipeline(Redis, PipeLine) :-
      Caught = error(Formal, _),
      catch(redis_pipeline2(Redis, PipeLine), Caught, true),
      (   nonvar(Formal)
      ->  recover(Caught, Redis, redis_pipeline(Redis, PipeLine))
      ;   true
      ).

  % Obtain the stream and run the pipeline.  When Redis is an atom the
  % pipeline runs inside with_mutex/2 on a mutex named after that atom.
  redis_pipeline2(Redis, PipeLine) :-
      redis_stream(Redis, Stream, true),
      (   atom(Redis)
      ->  with_mutex(Redis,
                     redis_pipeline3(Redis, Stream, PipeLine))
      ;   redis_pipeline3(Redis, Stream, PipeLine)
      ).

  % Write every command without flushing, flush once, then read the
  % replies.
  redis_pipeline3(Redis, Stream, PipeLine) :-
      maplist(write_pipeline(Stream), PipeLine),
      flush_output(Stream),
      read_pipeline(Redis, Stream, PipeLine).
  564
  %!  write_pipeline(+Stream, +Element)
  %
  %   Write one pipeline element without flushing.  Element is either
  %   `Command -> Reply`, of which only Command is written, or a plain
  %   Command.

  write_pipeline(Stream, Element) :-
      (   Element = (Command -> _Reply)
      ->  true
      ;   Command = Element
      ),
      redis_write_msg_no_flush(Stream, Command).
  570
  % read_pipeline(+Redis, +Stream, +PipeLine)
  %
  % Read the replies for PipeLine using read_pipeline2/3.  If that
  % raises an exception of the shape error(_,_), the connection is
  % first cleaned up and the exception is then re-thrown: an error
  % accepted by reconnect_error/1 leads to a forced disconnect, any
  % other error to a call to resync/1.
  571read_pipeline(Redis, S, PipeLine) :-
  572    E = error(Formal,_),
  573    catch(read_pipeline2(Redis, S, PipeLine), E, true),
  574    (   var(Formal)
  575    ->  true
  576    ;   reconnect_error(E)
  577    ->  redis_disconnect(Redis, [force(true)]),
  578        throw(E)
  579    ;   resync(Redis),
  580        throw(E)
  581    ).
  582
  % Read one message per pipeline element, collecting the reply, a
  % possible error and the pushed messages for each.  Only after all
  % messages are read are the pushed messages handled, the first bound
  % error thrown, and the replies unified with the `Command -> Reply`
  % elements.
  583read_pipeline2(Redis, S, PipeLine) :-
  584    maplist(redis_read_msg3(S), PipeLine, Replies, Errors, Pushed),
  585    maplist(handle_push(Redis), Pushed),
  586    maplist(handle_error, Errors),
  587    maplist(bind_reply, PipeLine, Replies).
  588
  %!  redis_read_msg3(+Stream, +Element, -Reply, -Error, -Push)
  %
  %   Read the message for one pipeline element.  For an element
  %   `Command -> ReplyIn`, ReplyIn is passed to redis_read_msg/5 as its
  %   second argument; for a plain element the element itself is passed.

  redis_read_msg3(Stream, Element, Reply, Error, Push) :-
      (   Element = (_Command -> ReplyIn)
      ->  true
      ;   ReplyIn = Element
      ),
      redis_read_msg(Stream, ReplyIn, Reply, Error, Push).
  594
  % handle_push(+Redis, +Pushed): argument-swapping adapter that lets
  % handle_push_messages/2 be used with maplist/2.
  handle_push(Redis, Pushed) :-
      handle_push_messages(Pushed, Redis).

  % handle_error(?Error): throw Error if it is bound, succeed otherwise.
  handle_error(Error) :-
      (   nonvar(Error)
      ->  throw(Error)
      ;   true
      ).

  % bind_reply(+Element, +Reply): for `Command -> Reply0` unify Reply0
  % with Reply; plain commands ignore the reply.
  bind_reply(Element, Reply) :-
      (   Element = (_Command -> Reply0)
      ->  Reply0 = Reply
      ;   true
      ).
 recover(+Error, +Redis, :Goal)
Error happened while running Goal on Redis. If this is a recoverable error (i.e., a network or disconnected peer), wait a little and try running Goal again.
  614:- meta_predicate recover(+, +, 0).  615
  %!  recover(+Error, +Redis, :Goal)
  %
  %   Error was raised while running Goal on Redis.  If Error is
  %   accepted by reconnect_error/1 and auto_reconnect/1 holds for
  %   Redis, force-disconnect, wait using wait_to_retry/2 and run Goal
  %   again.  If wait_to_retry/2 fails, or the error does not qualify,
  %   Error is re-thrown.
  %
  %   After Goal succeeded the failure count is reset.  For server
  %   aliases this removes the failure/2 fact; for connection terms the
  %   count lives in the third argument of the term, which was
  %   previously never reset.

  recover(Error, Redis, Goal) :-
      reconnect_error(Error),
      auto_reconnect(Redis),
      !,
      debug(redis(recover), '~p: got error ~p; trying to reconnect',
            [Redis, Error]),
      redis_disconnect(Redis, [force(true)]),
      (   wait_to_retry(Redis, Error)
      ->  call(Goal),
          redis_reset_failures(Redis)
      ;   throw(Error)
      ).
  recover(Error, _, _) :-
      throw(Error).

  % Reset the failure count, mirroring where redis_set_failures/2
  % stores it: in the connection term or in failure/2.
  redis_reset_failures(Redis) :-
      compound(Redis),
      !,
      nb_setarg(3, Redis, 0).
  redis_reset_failures(Redis) :-
      retractall(failure(Redis, _)).
  630
  %!  auto_reconnect(+Redis) is semidet.
  %
  %   True if Redis should be reconnected after a connection error.  A
  %   connection term needs an explicit reconnect(true) in its options.
  %   For a registered server, reconnect defaults to `true`.

  auto_reconnect(Redis) :-
      (   Redis = redis_connection(_,_,_,ConnOptions)
      ->  option(reconnect(true), ConnOptions)
      ;   ground(Redis),
          server(Redis, _, ServerOptions),
          option(reconnect(true), ServerOptions, true)
      ).

  %!  reconnect_error(+Error) is semidet.
  %
  %   True for the errors that trigger reconnection: socket errors and
  %   an unexpected end of file reported as a syntax error.

  reconnect_error(error(socket_error(_Code, _Message), _Context)).
  reconnect_error(error(syntax_error(unexpected_eof), _Context)).
 wait_to_retry(+Redis, +Error)
Wait for some time after a failure. First we wait for 10ms. This is doubled on each failure up to the setting max_retry_wait. If the setting max_retry_count is exceeded we fail and the caller signals an exception.
  649:- dynamic failure/2 as volatile.  650
  %!  wait_to_retry(+Redis, +Error) is semidet.
  %
  %   Fails, without sleeping, if the number of failures recorded for
  %   Redis has reached the setting `max_retry_count`.  Otherwise the
  %   failure count is incremented and the call sleeps for
  %   min(max_retry_wait*100, 2^Failures)/100 seconds.  A
  %   redis(retry(...)) message is printed before sleeping, at the level
  %   given by retry_message_level/2.

  wait_to_retry(Redis, Error) :-
      redis_failures(Redis, Failed),
      setting(max_retry_count, MaxCount),
      Failed < MaxCount,
      Next is Failed+1,
      redis_set_failures(Redis, Next),
      setting(max_retry_wait, MaxWait),
      Centis is min(MaxWait*100, 1<<Failed),   % in units of 10ms
      Delay is Centis/100.0,
      debug(redis(recover), '  Sleeping ~p seconds', [Delay]),
      retry_message_level(Failed, Level),
      print_message(Level, redis(retry(Redis, Failed, Delay, Error))),
      sleep(Delay).
  663
  %!  redis_failures(+Redis, -Failures) is semidet.
  %
  %   Failures is the current failure count for Redis.  For a
  %   connection term it is the third argument; for an atom it is taken
  %   from failure/2, defaulting to 0.

  redis_failures(Redis, Failures) :-
      (   Redis = redis_connection(_,_,Count,_)
      ->  Failures = Count
      ;   atom(Redis),
          (   failure(Redis, Failures)
          ->  true
          ;   Failures = 0
          )
      ).

  %!  redis_set_failures(+Redis, +Count) is semidet.
  %
  %   Store Count as the failure count for Redis: destructively in the
  %   third argument of a compound connection term, or as the single
  %   failure/2 fact for an atom.

  redis_set_failures(Redis, Count) :-
      (   compound(Redis)
      ->  nb_setarg(3, Redis, Count)
      ;   atom(Redis),
          retractall(failure(Redis, _)),
          asserta(failure(Redis, Count))
      ).
  682
  %!  retry_message_level(+Failures, -Level)
  %
  %   Message level for the retry notification: `warning` when there
  %   were no earlier failures and `silent` for all later retries.

  retry_message_level(0, warning) :-
      !.
  retry_message_level(_Earlier, silent).
 redis(+Request)
Connect to the default redis server, call redis/3 using Request, disconnect and print the result. This predicate is intended for interactive usage.
  %!  redis(+Request)
  %
  %   Open a temporary connection to the `default` server, run Request
  %   on it, close the connection again and print the reply using
  %   print/1.  The connection is closed even if the request raises an
  %   exception.

  redis(Request) :-
      setup_call_cleanup(
          redis_connect(default, Conn, []),
          redis1(Conn, Request, Reply),
          redis_disconnect(Conn)),
      print(Reply).
 redis_write(+Redis, +Command) is det
 redis_read(+Redis, -Reply) is det
Write command and read replies from a Redis server. These are building blocks for subscribing to event streams.
  %!  redis_write(+Redis, +Command) is det.
  %
  %   Write Command to the stream of Redis, connecting first if needed.
  %   No reply is read.

  redis_write(Redis, Command) :-
      redis_stream(Redis, Out, true),
      redis_write_msg(Out, Command).

  %!  redis_read(+Redis, -Reply) is det.
  %
  %   Read the next Reply from the stream of Redis, connecting first if
  %   needed.

  redis_read(Redis, Reply) :-
      redis_stream(Redis, In, true),
      redis_read_stream(Redis, In, Reply).
  713
  714
  715		 /*******************************
  716		 *      HIGH LEVEL ACCESS	*
  717		 *******************************/
 redis_get_list(+Redis, +Key, -List) is det
 redis_get_list(+Redis, +Key, +ChunkSize, -List) is det
Get the content of a Redis list in List. If ChunkSize is given and smaller than the list length, List is returned as a lazy list. The actual values are requested using redis LRANGE requests. Note that this results in O(N^2) complexity. Using a lazy list is most useful for relatively short lists holding possibly large items.

Note that values retrieved are strings, unless the value was added using Term as prolog.

See also
- lazy_list/2 for a discussion on the difference between lazy lists and normal lists.
  %!  redis_get_list(+Redis, +Key, -List) is det.
  %!  redis_get_list(+Redis, +Key, +ChunkSize, -List) is det.
  %
  %   Fetch the Redis list stored at Key.  The length is obtained using
  %   LLEN.  The list is fetched in one go when ChunkSize is at least
  %   that length, or when ChunkSize is not positive; redis_get_list/3
  %   passes -1 for this purpose.  Otherwise List is a lazy list that
  %   fetches ChunkSize elements at a time through rlist_next/3.
  %
  %   A ChunkSize of 0 or below -1 used to build a lazy list with an
  %   empty or negative slice.  That never reached the end of the list
  %   or returned wrong ranges; such values now mean "no chunking".

  redis_get_list(Redis, Key, List) :-
      redis_get_list(Redis, Key, -1, List).

  redis_get_list(Redis, Key, Chunk, List) :-
      redis(Redis, llen(Key), Len),
      (   (   Chunk =< 0                  % -1, 0, ...: no chunking
          ;   Chunk >= Len
          )
      ->  (   Len == 0
          ->  List = []
          ;   End is Len-1,
              list_range(Redis, Key, 0, End, List)
          )
      ;   lazy_list(rlist_next(s(Redis,Key,0,Chunk,Len)), List)
      ).
  749
  %!  rlist_next(+State, -List, -Tail)
  %
  %   Producer for the lazy list created by redis_get_list/4.  State is
  %   s(Redis,Key,Offset,Slice,Len).  Each call fetches the elements
  %   from Offset up to Offset+Slice-1, clipped to the last index.  At
  %   the last index the list is closed with Tail = []; otherwise the
  %   offset in State is advanced destructively for the next call.

  rlist_next(State, List, Tail) :-
      State = s(Redis,Key,Offset,Slice,Len),
      Last is Len-1,
      End is min(Last, Offset+Slice-1),
      list_range(Redis, Key, Offset, End, Elems),
      (   End =:= Last
      ->  List = Elems,
          Tail = []
      ;   NextOffset is Offset+Slice,
          nb_setarg(3, State, NextOffset),
          append(Elems, Tail, List)
      ).
  761
  762% Redis LRANGE demands End > Start and returns inclusive.
  763
  %!  list_range(+DB, +Key, +Start, +End, -List)
  %
  %   List holds the elements Start..End (inclusive) of the Redis list
  %   Key.  A single-element range is fetched with LINDEX and wrapped
  %   in a list; all other ranges are fetched with LRANGE.

  list_range(DB, Key, Start, End, List) :-
      (   Start == End,
          List = [Elem]
      ->  redis(DB, lindex(Key, Start), Elem)
      ;   redis(DB, lrange(Key, Start, End), List)
      ).
 redis_set_list(+Redis, +Key, +List) is det
Associate a Redis key with a list. As Redis has no concept of an empty list, if List is [], Key is deleted. Note that key values are always strings in Redis. The same conversion rules as for redis/1-3 apply.
  %!  redis_set_list(+Redis, +Key, +List) is det.
  %
  %   Replace the Redis list at Key by List.  Key is always deleted
  %   first.  If List is not [], its elements are then appended with a
  %   single RPUSH command.

  redis_set_list(Redis, Key, List) :-
      redis(Redis, del(Key), _),
      (   List \== []
      ->  Push =.. [rpush,Key|List],
          redis(Redis, Push, _Count)
      ;   true
      ).
 redis_get_hash(+Redis, +Key, -Data:dict) is det
 redis_set_hash(+Redis, +Key, +Data:dict) is det
Put/get a Redis hash as a Prolog dict. Putting a dict first deletes Key. Note that in many cases applications will manage Redis hashes by key. redis_get_hash/3 is notably a user friendly alternative to the Redis HGETALL command. If the Redis hash is not used by other (non-Prolog) applications one may also consider using the Term as prolog syntax to store the Prolog dict as-is.
  %!  redis_get_hash(+Redis, +Key, -Data:dict) is det.
  %
  %   Fetch the Redis hash Key with HGETALL and convert the reply using
  %   the reply type dict(auto).

  redis_get_hash(Redis, Key, Dict) :-
      redis(Redis, hgetall(Key), Dict as dict(auto)).

  %!  redis_set_hash(+Redis, +Key, +Data:dict) is det.
  %
  %   Replace the Redis hash at Key by the content of Data.  Key is
  %   deleted first and then filled with a single HSET command.  For an
  %   empty dict only the delete is performed, analogous to
  %   redis_set_list/3 with an empty list.  This used to send HSET with
  %   no field/value pairs, which Redis rejects.

  redis_set_hash(Redis, Key, Dict) :-
      redis_array_dict(Array, _, Dict),
      redis(Redis, del(Key), _),
      (   Array == []
      ->  true
      ;   Term =.. [hset,Key|Array],
          redis(Redis, Term, _Count)
      ).
 redis_array_dict(?Array, ?Tag, ?Dict) is det
Translate a Redis reply representing hash data into a SWI-Prolog dict. Array is either a list of alternating keys and values or a list of pairs. When translating to an array, this is always a list of alternating keys and values.
Arguments:
Tag- is the SWI-Prolog dict tag.
  %!  redis_array_dict(?Array, ?Tag, ?Dict) is det.
  %
  %   Relate a Redis hash reply to a dict.  If Array is bound it may be
  %   a flat list of alternating keys and values or a list of
  %   Key-Value pairs, and Dict is created from it.  Otherwise Dict is
  %   decomposed and Array becomes a flat list of alternating keys (as
  %   strings) and values.

  redis_array_dict(Array, Tag, Dict) :-
      (   nonvar(Array)
      ->  array_to_pairs(Array, Pairs),
          dict_pairs(Dict, Tag, Pairs)
      ;   dict_pairs(Dict, Tag, Pairs),
          pairs_to_array(Pairs, Array)
      ).

  % array_to_pairs(+Array, -Pairs): keys are converted to atoms using
  % atom_string/2.  An element that is already a pair is taken as-is.
  array_to_pairs([], []) :-
      !.
  array_to_pairs([KeyS-Val|Rest], [Key-Val|Pairs]) :-
      !,                                  % RESP3 returns a map as pairs.
      atom_string(Key, KeyS),
      array_to_pairs(Rest, Pairs).
  array_to_pairs([KeyS,Val|Rest], [Key-Val|Pairs]) :-
      atom_string(Key, KeyS),
      array_to_pairs(Rest, Pairs).

  % pairs_to_array(+Pairs, -Array): flatten Key-Value pairs into
  % alternating key strings and values.
  pairs_to_array([], []) :-
      !.
  pairs_to_array([Key-Val|Pairs], [KeyS,Val|Rest]) :-
      atom_string(Key, KeyS),
      pairs_to_array(Pairs, Rest).
 redis_scan(+Redis, -LazyList, +Options) is det
 redis_sscan(+Redis, +Set, -LazyList, +Options) is det
 redis_hscan(+Redis, +Hash, -LazyList, +Options) is det
 redis_zscan(+Redis, +Set, -LazyList, +Options) is det
Map the Redis SCAN, SSCAN, HSCAN and ZSCAN commands into a lazy list. For redis_scan/3 and redis_sscan/4 the result is a list of strings. For redis_hscan/4 and redis_zscan/4, the result is a list of pairs. Options processed:
match(Pattern)
Adds the MATCH subcommand, only returning matches for Pattern.
count(Count)
Adds the COUNT subcommand, giving a hint to the size of the chunks fetched.
type(Type)
Adds the TYPE subcommand, only returning answers of the indicated type.
See also
- lazy_list/2.
  %!  redis_scan(+Redis, -LazyList, +Options) is det.
  %!  redis_sscan(+Redis, +Set, -LazyList, +Options) is det.
  %!  redis_hscan(+Redis, +Hash, -LazyList, +Options) is det.
  %!  redis_zscan(+Redis, +Set, -LazyList, +Options) is det.
  %
  %   Create a lazy list over the results of the Redis SCAN family.
  %   All four share scan_lazy_list/4 and differ only in the command
  %   term handed to scan_next/3.

  redis_scan(Redis, LazyList, Options) :-
      scan_lazy_list(scan, Redis, Options, LazyList).

  redis_sscan(Redis, Set, LazyList, Options) :-
      scan_lazy_list(sscan(Set), Redis, Options, LazyList).

  redis_hscan(Redis, Hash, LazyList, Options) :-
      scan_lazy_list(hscan(Hash), Redis, Options, LazyList).

  redis_zscan(Redis, Set, LazyList, Options) :-
      scan_lazy_list(zscan(Set), Redis, Options, LazyList).

  % Translate the match/count/type options into command parameters and
  % start the scan with cursor 0.
  scan_lazy_list(Command, Redis, Options, LazyList) :-
      scan_options([match,count,type], Options, Parms),
      lazy_list(scan_next(s(Command,Redis,0,Parms)), LazyList).
  879
  %!  scan_options(+Names, +Options, -Parms)
  %
  %   For each name in Names that appears as Name(Value) in Options, add
  %   the two elements Name and Value to Parms, keeping the order of
  %   Names.  Names without a matching option are skipped.

  scan_options([], _, []).
  scan_options([Name|Names], Options, Parms) :-
      Opt =.. [Name,Value],
      (   option(Opt, Options)
      ->  Parms = [Name,Value|Parms1]
      ;   Parms = Parms1
      ),
      scan_options(Names, Options, Parms1).
  888
  889
  %!  scan_next(+State, -List, -Tail)
  %
  %   Producer for the lazy lists of the SCAN family.  State is
  %   s(Command,Redis,Cursor,Params).  The Redis command is built from
  %   Command's name and arguments, followed by Cursor and Params.  The
  %   reply must be a two-element list [NewCursor,Elems].  A NewCursor
  %   of 0 closes the list with Tail = []; otherwise the cursor in State
  %   is updated destructively for the next call.

  scan_next(State, List, Tail) :-
      State = s(Command,Redis,Cursor,Params),
      Command =.. [Name|Args],
      append(Args, [Cursor|Params], AllArgs),
      Request =.. [Name|AllArgs],
      redis(Redis, Request, [NewCursor,Raw]),
      scan_pairs(Command, Raw, Elems),
      (   NewCursor == 0
      ->  List = Elems,
          Tail = []
      ;   nb_setarg(3, State, NewCursor),
          append(Elems, Tail, List)
      ).
  903
  %!  scan_pairs(+Command, +Elems0, -Elems)
  %
  %   Post-process the elements returned by one scan step.  For
  %   hscan(_) and zscan(_) the elements are turned into Key-Value
  %   pairs; for all other commands they are returned unchanged.

  scan_pairs(hscan(_), List, Pairs) :-
      !,
      scan_pairs(List, Pairs).
  scan_pairs(zscan(_), List, Pairs) :-
      !,
      scan_pairs(List, Pairs).
  scan_pairs(_, List, List).

  %!  scan_pairs(+List, -Pairs)
  %
  %   Convert List into Key-Value pairs.  An element that already is a
  %   Key-Value pair is kept; otherwise two consecutive elements are
  %   combined.  The pair clause must come first, as array_to_pairs/2
  %   does it.  With the flat clause first, an even-length list of pairs
  %   such as [a-1,b-2] was wrongly combined into [(a-1)-(b-2)].

  scan_pairs([], []).
  scan_pairs([Key-Value|T0], [Key-Value|T]) :-
      !,
      scan_pairs(T0, T).
  scan_pairs([Key,Value|T0], [Key-Value|T]) :-
      scan_pairs(T0, T).
  918
  919
  920		 /*******************************
  921		 *              ABOUT		*
  922		 *******************************/
 redis_current_command(+Redis, ?Command) is nondet
 redis_current_command(+Redis, ?Command, -Properties) is nondet
True when Command has Properties. Fails if Command is not defined. The redis_current_command/3 version returns the command argument specification. See Redis documentation for an explanation.
  %!  redis_current_command(+Redis, ?Command) is nondet.
  %!  redis_current_command(+Redis, ?Command, -Properties) is nondet.
  %
  %   True when Command is known to the server.  With Command bound,
  %   `COMMAND INFO Command` is used and Properties is the tail of the
  %   single reply entry.  With Command unbound, the full `COMMAND`
  %   reply is enumerated on backtracking and Command is the command
  %   name as an atom.

  redis_current_command(Redis, Command) :-
      redis_current_command(Redis, Command, _).

  redis_current_command(Redis, Command, Properties) :-
      (   nonvar(Command)
      ->  redis(Redis, command(info, Command), [[_Name|Properties]])
      ;   redis(Redis, command, Entries),
          member([NameS|Properties], Entries),
          atom_string(Command, NameS)
      ).
 redis_property(+Redis, ?Property) is nondet
True if Property is a property of the Redis server. Currently uses redis(info, String) and parses the result. As this is for machine usage, properties names *_human are skipped.
  949redis_property(Redis, Property) :-
  950    redis(Redis, info, String),
  951    info_terms(String, Terms),
  952    member(Property, Terms).
  953
  954info_terms(Info, Pairs) :-
  955    split_string(Info, "\n", "\r\n ", Lines),
  956    convlist(info_line_term, Lines, Pairs).
  957
  958info_line_term(Line, Term) :-
  959    sub_string(Line, B, _, A, :),
  960    !,
  961    sub_atom(Line, 0, B, _, Name),
  962    \+ sub_atom(Name, _, _, 0, '_human'),
  963    sub_string(Line, _, A, 0, ValueS),
  964    (   number_string(Value, ValueS)
  965    ->  true
  966    ;   Value = ValueS
  967    ),
  968    Term =.. [Name,Value].
  969
  970
  971		 /*******************************
  972		 *            SUBSCRIBE		*
  973		 *******************************/
 redis_subscribe(+Redis, +Channels, -Id, +Options) is det
 Subscribe to one or more Redis PUB/SUB channels. This predicate creates a thread using thread_create/3 with the given Options. Once running, the thread listens for messages. The message content is a string or Prolog term as described in redis/3. On receiving a message, the following message is broadcast:
redis(Id, Channel, Data)

If redis_unsubscribe/2 removes the last subscription, the thread terminates.

 To simply print the incoming messages use e.g.

?- listen(redis(_, Channel, Data),
          format('Channel ~p got ~p~n', [Channel,Data])).
true.
?- redis_subscribe(default, test, Id, []).
Id = redis_pubsub_3,
?- redis(publish(test, "Hello world")).
Channel test got "Hello world"
1
true.
Arguments:
Id- is the thread identifier of the listening thread. Note that the Options alias(Name) can be used to get a system wide name.
 1003:- dynamic ( subscription/2,            % Id, Channel
 1004             listening/3                % Id, Connection, Thread
 1005           ) as volatile. 1006
 1007redis_subscribe(Redis, Spec, Id, Options) :-
 1008    atom(Redis),
 1009    !,
 1010    channels(Spec, Channels),
 1011    pubsub_thread_options(ThreadOptions, Options),
 1012    thread_create(setup_call_cleanup(
 1013                      redis_connect(Redis, Conn, [reconnect(true)]),
 1014                      redis_subscribe1(Redis, Conn, Channels),
 1015                      redis_disconnect(Conn)),
 1016                  Thread,
 1017                  ThreadOptions),
 1018    pubsub_id(Thread, Id).
 1019redis_subscribe(Redis, Spec, Id, Options) :-
 1020    channels(Spec, Channels),
 1021    pubsub_thread_options(ThreadOptions, Options),
 1022    thread_create(redis_subscribe1(Redis, Redis, Channels),
 1023                  Thread,
 1024                  ThreadOptions),
 1025    pubsub_id(Thread, Id).
 1026
 1027pubsub_thread_options(ThreadOptions, Options) :-
 1028    merge_options(Options, [detached(true)], ThreadOptions).
 1029
 1030pubsub_id(Thread, Thread).
 1031%pubsub_id(Thread, Id) :-
 1032%    thread_property(Thread, id(TID)),
 1033%    atom_concat('redis_pubsub_', TID, Id).
 1034
 1035redis_subscribe1(Redis, Conn, Channels) :-
 1036    Error = error(Formal, _),
 1037    catch(redis_subscribe2(Redis, Conn, Channels), Error, true),
 1038    (   var(Formal)
 1039    ->  true
 1040    ;   recover(Error, Conn, redis1(Conn, echo("reconnect"), _)),
 1041        thread_self(Me),
 1042        pubsub_id(Me, Id),
 1043        findall(Channel, subscription(Id, Channel), CurrentChannels),
 1044        redis_subscribe1(Redis, Conn, CurrentChannels)
 1045    ).
 1046
 1047redis_subscribe2(Redis, Conn, Channels) :-
 1048    redis_subscribe3(Conn, Channels),
 1049    redis_listen(Redis, Conn).
 1050
 1051redis_subscribe3(Conn, Channels) :-
 1052    thread_self(Me),
 1053    pubsub_id(Me, Id),
 1054    prolog_listen(this_thread_exit, pubsub_clean(Id)),
 1055    maplist(register_subscription(Id), Channels),
 1056    redis_stream(Conn, S, true),
 1057    Req =.. [subscribe|Channels],
 1058    redis_write_msg(S, Req).
 1059
 1060pubsub_clean(Id) :-
 1061    retractall(listening(Id, _Connection, _Thread)),
 1062    retractall(subscription(Id, _Channel)).
 redis_subscribe(+Id, +Channels) is det
 redis_unsubscribe(+Id, +Channels) is det
 Add channels to or remove channels from the subscription. If no subscriptions remain, the listening thread terminates.
Arguments:
Channels- is either a single channel or a list thereof. Each channel specification is either an atom or a term `A:B:...`, where all parts are atoms.
 1074redis_subscribe(Id, Spec) :-
 1075    channels(Spec, Channels),
 1076    (   listening(Id, Connection, _Thread)
 1077    ->  true
 1078    ;   existence_error(redis_pubsub, Id)
 1079    ),
 1080    maplist(register_subscription(Id), Channels),
 1081    redis_stream(Connection, S, true),
 1082    Req =.. [subscribe|Channels],
 1083    redis_write_msg(S, Req).
 1084
 1085redis_unsubscribe(Id, Spec) :-
 1086    channels(Spec, Channels),
 1087    (   listening(Id, Connection, _Thread)
 1088    ->  true
 1089    ;   existence_error(redis_pubsub, Id)
 1090    ),
 1091    maplist(unregister_subscription(Id), Channels),
 1092    redis_stream(Connection, S, true),
 1093    Req =.. [unsubscribe|Channels],
 1094    redis_write_msg(S, Req).
 redis_current_subscription(?Id, ?Channels)
True when a PUB/SUB subscription with Id is listening on Channels.
 1100redis_current_subscription(Id, Channels) :-
 1101    findall(Id-Channel, subscription(Id, Channel), Pairs),
 1102    keysort(Pairs, Sorted),
 1103    group_pairs_by_key(Sorted, Grouped),
 1104    member(Id-Channels, Grouped).
 1105
 1106channels(Spec, List) :-
 1107    is_list(Spec),
 1108    !,
 1109    maplist(channel_name, Spec, List).
 1110channels(Ch, [Key]) :-
 1111    channel_name(Ch, Key).
 1112
 1113channel_name(Atom, Atom) :-
 1114    atom(Atom),
 1115    !.
 1116channel_name(Key, Atom) :-
 1117    phrase(key_parts(Key), Parts),
 1118    !,
 1119    atomic_list_concat(Parts, :, Atom).
 1120channel_name(Key, _) :-
 1121    type_error(redis_key, Key).
 1122
 1123key_parts(Var) -->
 1124    { var(Var), !, fail }.
 1125key_parts(Atom) -->
 1126    { atom(Atom) },
 1127    !,
 1128    [Atom].
 1129key_parts(A:B) -->
 1130    key_parts(A),
 1131    key_parts(B).
 1132
 1133
 1134
 1135
 1136register_subscription(Id, Channel) :-
 1137    (   subscription(Id, Channel)
 1138    ->  true
 1139    ;   assertz(subscription(Id, Channel))
 1140    ).
 1141
 1142unregister_subscription(Id, Channel) :-
 1143    retractall(subscription(Id, Channel)).
 1144
 1145redis_listen(Redis, Conn) :-
 1146    thread_self(Me),
 1147    pubsub_id(Me, Id),
 1148    setup_call_cleanup(
 1149        assertz(listening(Id, Conn, Me), Ref),
 1150        redis_listen_loop(Redis, Id, Conn),
 1151        erase(Ref)).
 1152
 1153redis_listen_loop(Redis, Id, Conn) :-
 1154    redis_stream(Conn, S, true),
 1155    (   subscription(Id, _)
 1156    ->  redis_read_stream(Redis, S, Reply),
 1157        redis_broadcast(Redis, Reply),
 1158        redis_listen_loop(Redis, Id, Conn)
 1159    ;   true
 1160    ).
 1161
 1162redis_broadcast(_, [subscribe, _Channel, _N]) :-
 1163    !.
 1164redis_broadcast(Redis, [message, Channel, Data]) :-
 1165    !,
 1166    catch(broadcast(redis(Redis, Channel, Data)),
 1167          Error,
 1168          print_message(error, Error)).
 1169redis_broadcast(Redis, Message) :-
 1170    assertion((Message = [Type, Channel, _Data],
 1171               atom(Type),
 1172               atom(Channel))),
 1173    debug(redis(warning), '~p: Unknown message while listening: ~p',
 1174          [Redis,Message]).
 1175
 1176
 1177		 /*******************************
 1178		 *          READ/WRITE		*
 1179		 *******************************/
 redis_read_stream(+Redis, +Stream, -Term) is det
Read a message from a Redis stream. Term is one of

If something goes wrong, the connection is closed and an exception is raised.

 1196redis_read_stream(Redis, SI, Out) :-
 1197    E = error(Formal,_),
 1198    catch(redis_read_msg(SI, Out, Out0, Error, Push), E, true),
 1199    (   var(Formal)
 1200    ->  handle_push_messages(Push, Redis),
 1201        (   var(Error)
 1202        ->  Out = Out0
 1203        ;   resync(Redis),
 1204            throw(Error)
 1205        )
 1206    ;   redis_disconnect(Redis, [force(true)]),
 1207        throw(E)
 1208    ).
 1209
 1210handle_push_messages([], _).
 1211handle_push_messages([H|T], Redis) :-
 1212    (   catch(handle_push_message(H, Redis), E,
 1213              print_message(warning, E))
 1214    ->  true
 1215    ;   true
 1216    ),
 1217    handle_push_messages(T, Redis).
 1218
 1219handle_push_message(["pubsub"|List], Redis) :-
 1220    redis_broadcast(Redis, List).
 1221% some protocol version 3 push messages (such as
 1222% __keyspace@* events) seem to come directly
 1223% without a pubsub header
 1224handle_push_message([message|List], Redis) :-
 1225    redis_broadcast(Redis, [message|List]).
 resync(+Redis) is det
Re-synchronize after an error. This may happen if some type conversion fails and we have read a partial reply. It is hard to figure out what to read from where we are, so we echo a random magic sequence and read until we find the reply.
 1235resync(Redis) :-
 1236    E = error(Formal,_),
 1237    catch(do_resync(Redis), E, true),
 1238    (   var(Formal)
 1239    ->  true
 1240    ;   redis_disconnect(Redis, [force(true)]),
 1241        throw(E)
 1242    ).
 1243
 1244do_resync(Redis) :-
 1245    A is random(1_000_000_000),
 1246    redis_stream(Redis, S, true),
 1247    redis_write_msg(S, echo(A)),
 1248    '$redis_resync'(S, A).
 redis_read_msg(+Stream, -Message, -Error, -PushMessages) is det
 redis_write_msg(+Stream, +Message) is det
Read/write a Redis message. Both these predicates are in the foreign module redis4pl.
Arguments:
PushMessages- is a list of push messages that may be non-[] if protocol version 3 (see redis_connect/3) is selected. Using protocol version 2 this list is always empty.
 1263		 /*******************************
 1264		 *            MESSAGES		*
 1265		 *******************************/
 1266
 1267:- multifile
 1268    prolog:error_message//1,
 1269    prolog:message//1. 1270
 1271prolog:error_message(redis_error(Code, String)) -->
 1272    [ 'REDIS: ~w: ~s'-[Code, String] ].
 1273
 1274prolog:message(redis(retry(_Redis, _Failures, Wait, Error))) -->
 1275    [ 'REDIS: connection error.  Retrying in ~2f seconds'-[Wait], nl ],
 1276    [ '    '-[] ], '$messages':translate_message(Error)