View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker and Sean Charles
    4    E-mail:        jan@swi-prolog.org and <sean at objitsu dot com>
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2013-2020, Sean Charles
    7                              SWI-Prolog Solutions b.v.
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34
   35    NOTE
   36
   37    The original code was subject to the MIT licence and written by
   38    Sean Charles.  Re-licenced to standard SWI-Prolog BSD-2 with
   39    permission from Sean Charles.
   40*/
   41
   42:- module(redis,
   43          [ redis_server/3,             % +Alias, +Address, +Options
   44            redis_connect/1,            % -Connection
   45            redis_connect/3,            % -Connection, +Host, +Port
   46            redis_disconnect/1,         % +Connection
   47            redis_disconnect/2,         % +Connection, +Options
   48                                        % Queries
   49            redis/1,                    % +Request
   50            redis/2,                    % +Connection, +Request
   51            redis/3,                    % +Connection, +Request, -Reply
   52                                        % High level queries
   53            redis_get_list/3,           % +Redis, +Key, -List
   54            redis_get_list/4,           % +Redis, +Key, +ChunkSize, -List
   55            redis_set_list/3,           % +Redis, +Key, +List
   56            redis_get_hash/3,           % +Redis, +Key, -Data:dict
   57            redis_set_hash/3,           % +Redis, +Key, +Data:dict
   58            redis_scan/3,               % +Redis, -LazyList, +Options
   59            redis_sscan/4,              % +Redis, +Set, -LazyList, +Options
   60            redis_hscan/4,              % +Redis, +Hash, -LazyList, +Options
   61            redis_zscan/4,              % +Redis, +Set, -LazyList, +Options
   62                                        % Publish/Subscribe
   63            redis_subscribe/4,          % +Redis, +Channels, -Id, +Options
   64            redis_subscribe/2,          % +Id, +Channels
   65            redis_unsubscribe/2,        % +Id, +Channels
   66            redis_current_subscription/2, % ?Id,?Channels
   67            redis_write/2,              % +Redis, +Command
   68            redis_read/2,               % +Redis, -Reply
   69                                        % Building blocks
   70            redis_array_dict/3,         % ?Array, ?Tag, ?Dict
   71                                        % Admin stuff
   72            redis_property/2,           % +Reply, ?Property
   73            redis_current_command/2,    % +Redis,?Command
   74            redis_current_command/3     % +Redis, +Command, -Properties
   75          ]).   76:- autoload(library(socket), [tcp_connect/3]).   77:- autoload(library(apply), [maplist/2, convlist/3, maplist/3, maplist/5]).   78:- autoload(library(broadcast), [broadcast/1]).   79:- autoload(library(error),
   80            [ must_be/2,
   81              instantiation_error/1,
   82              uninstantiation_error/1,
   83              existence_error/2
   84            ]).   85:- autoload(library(lazy_lists), [lazy_list/2]).   86:- autoload(library(lists), [append/3, member/2]).   87:- autoload(library(option), [merge_options/3, option/2, option/3]).   88:- autoload(library(pairs), [group_pairs_by_key/2]).   89:- use_module(library(debug), [debug/3, assertion/1]).   90:- use_module(library(settings), [setting/4, setting/2]).   91
   92:- use_foreign_library(foreign(redis4pl)).   93
   94:- setting(max_retry_count, nonneg, 8640, % one day
   95           "Max number of retries").   96:- setting(max_retry_wait, number, 10,
   97           "Max time to wait between recovery attempts").   98
   99:- predicate_options(redis_server/3, 3,
  100                     [ pass_to(redis:redis_connect/3, 3)
  101                     ]).  102:- predicate_options(redis_connect/3, 3,
  103                     [ reconnect(boolean),
  104                       user(atom),
  105                       password(atomic),
  106                       version(between(2,3))
  107                     ]).  108:- predicate_options(redis_disconnect/2, 2,
  109                     [ force(boolean)
  110                     ]).  111:- predicate_options(redis_scan/3, 3,
  112                     [ match(atomic),
  113                       count(nonneg),
  114                       type(atom)
  115                     ]).  116% Actually not passing, but the same
  117:- predicate_options(redis_sscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  118:- predicate_options(redis_hscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  119:- predicate_options(redis_zscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  120
  121
  122/** <module> Redis client
  123
  124This library is a client  to   [Redis](https://redis.io),  a popular key
  125value store to  deal  with  caching   and  communication  between  micro
  126services.
  127
  128In the typical use case we register  the   details  of one or more Redis
  129servers using redis_server/3. Subsequently, redis/2-3  is  used to issue
  130commands on the server.  For example:
  131
  132```
  133?- redis_server(default, redis:6379, [password("secret")]).
  134?- redis(default, set(user, "Bob")).
  135?- redis(default, get(user), User).
  136User = "Bob"
  137```
  138*/
  139
  140:- dynamic server/3.  141
  142:- dynamic ( connection/2               % ServerName, Stream
  143           ) as volatile.  144
  145%!  redis_server(+ServerName, +Address, +Options) is det.
  146%
  147%   Register a redis server without  connecting   to  it. The ServerName
  148%   acts as a lazy connection alias.  Initially the ServerName `default`
  149%   points at `localhost:6379` with no   connect  options. The `default`
  150%   server is used for redis/1 and redis/2 and may be changed using this
  151%   predicate.  Options are described with redis_connect/3.
  152%
  153%   Connections  established  this  way  are  by  default  automatically
  154%   reconnected if the connection  is  lost   for  some  reason unless a
  155%   reconnect(false) option is specified.
  156
  157redis_server(Alias, Address, Options) :-
  158    must_be(ground, Alias),
  159    retractall(server(Alias, _, _)),
  160    asserta(server(Alias, Address, Options)).
  161
  162server(default, localhost:6379, []).
  163
  164%!  redis_connect(-Connection) is det.
  165%!  redis_connect(+Address, -Connection, +Options) is det.
  166%!  redis_connect(-Connection, +Host, +Port) is det.
  167%
  168%   Connect to a redis server. The  main mode is redis_connect(+Address,
  169%   -Connection,   +Options).   redis_connect/1   is     equivalent   to
  170%   redis_connect(localhost:6379, Connection, []).  Options:
  171%
  172%     - reconnect(+Boolean)
  173%       If `true`, try to reconnect to the service when the connection
  174%       seems lost.  Default is `true` for connections specified using
  175%       redis_server/3 and `false` for explicitly opened connections.
  176%     - user(+User)
  177%       If version(3) and password(Password) are specified, these
  178%       are used to authenticate using the `HELLO` command.
  179%     - password(+Password)
  180%       Authenticate using Password
  181%     - version(+Version)
  182%       Specify the connection protocol version.  Initially this is
  183%       version 2.  Redis 6 also supports version 3.  When specified
  184%       as `3`, the `HELLO` command is used to upgrade the protocol.
  185%
  186%   Instead of using these predicates, redis/2  and redis/3 are normally
  187%   used with a _server name_  argument registered using redis_server/3.
  188%   These  predicates  are  meant  for   creating  a  temporary parallel
  189%   connection or using a connection with a _blocking_ call.
  190%
  191%   @compat   redis_connect(-Connection,   +Host,     +Port)    provides
  192%   compatibility to the original GNU-Prolog interface and is equivalent
  193%   to redis_connect(Host:Port, Connection, []).
  194%
  195%   @arg Address is a term Host:Port, unix(File) or the name of a server
  196%   registered  using  redis_server/3.  The  latter   realises  a  _new_
  197%   connection that is typically used for   blocking redis commands such
  198%   as listening for published messages, waiting on a list or stream.
  199
  200redis_connect(Conn) :-
  201    redis_connect(default, Conn, []).
  202
  203redis_connect(Conn, Host, Port) :-
  204    var(Conn),
  205    ground(Host), ground(Port),
  206    !,                                  % GNU-Prolog compatibility
  207    redis_connect(Host:Port, Conn, []).
  208redis_connect(Server, Conn, Options) :-
  209    atom(Server),
  210    !,
  211    (   server(Server, Address, DefaultOptions)
  212    ->  merge_options(Options, DefaultOptions, Options2),
  213        do_connect(Server, Address, Conn, [address(Address)|Options2])
  214    ;   existence_error(redis_server, Server)
  215    ).
  216redis_connect(Address, Conn, Options) :-
  217    do_connect(Address, Address, Conn, [address(Address)|Options]).
  218
  219%!  do_connect(+Id, +Address, -Conn, +Options)
  220%
  221%   Open the connection.  A connection is a compound term of the shape
  222%
  223%       redis_connection(Id, Stream, Failures, Options)
  224
  225do_connect(Id, Address0, Conn, Options) :-
  226    tcp_address(Address0, Address),
  227    tcp_connect(Address, Stream, Options),
  228    Conn = redis_connection(Id, Stream, 0, Options),
  229    hello(Conn, Options).
  230
  231tcp_address(unix(Path), Path) :-
  232    !.                                  % Using an atom is ambiguous
  233tcp_address(Address, Address).
  234
  235
  236%!  hello(+Connection, +Option)
  237%
  238%   Initialize the connection. This is  used   to  upgrade  to the RESP3
  239%   protocol and/or to authenticate.
  240
  241hello(Con, Options) :-
  242    option(version(V), Options),
  243    V >= 3,
  244    !,
  245    (   option(user(User), Options),
  246        option(password(Password), Options)
  247    ->  redis(Con, hello(3, auth, User, Password))
  248    ;   redis(Con, hello(3))
  249    ).
  250hello(Con, Options) :-
  251    option(password(Password), Options),
  252    !,
  253    redis(Con, auth(Password)).
  254hello(_, _).
  255
  256%!  redis_stream(+Spec, --Stream, +DoConnect) is det.
  257%
  258%   Get the stream to a Redis server from  Spec. Spec is either the name
  259%   of       a       registered       server       or       a       term
  260%   redis_connection(Id,Stream,Failures,Options).  If  the    stream  is
  261%   disconnected it will be reconnected.
  262
  263redis_stream(Var, S, _) :-
  264    (   var(Var)
  265    ->  !, instantiation_error(Var)
  266    ;   nonvar(S)
  267    ->  !, uninstantiation_error(S)
  268    ).
  269redis_stream(ServerName, S, Connect) :-
  270    atom(ServerName),
  271    !,
  272    (   connection(ServerName, S0)
  273    ->  S = S0
  274    ;   Connect == true,
  275        server(ServerName, Address, Options)
  276    ->  redis_connect(Address, Connection, Options),
  277        redis_stream(Connection, S, false),
  278        asserta(connection(ServerName, S))
  279    ;   existence_error(redis_server, ServerName)
  280    ).
  281redis_stream(redis_connection(_,S0,_,_), S, _) :-
  282    S0 \== (-),
  283    !,
  284    S = S0.
  285redis_stream(Redis, S, _) :-
  286    Redis = redis_connection(Id,-,_,Options),
  287    option(address(Address), Options),
  288    do_connect(Id,Address,Redis2,Options),
  289    arg(2, Redis2, S0),
  290    nb_setarg(2, Redis, S0),
  291    S = S0.
  292
  293has_redis_stream(Var, _) :-
  294    var(Var),
  295    !,
  296    instantiation_error(Var).
  297has_redis_stream(Alias, S) :-
  298    atom(Alias),
  299    !,
  300    connection(Alias, S).
  301has_redis_stream(redis_connection(_,S,_,_), S) :-
  302    S \== (-).
  303
  304
  305%!  redis_disconnect(+Connection) is det.
  306%!  redis_disconnect(+Connection, +Options) is det.
  307%
  308%   Disconnect from a redis server. The   second  form takes one option,
  309%   similar to close/2:
  310%
  311%     - force(Force)
  312%       When `true` (default `false`), do not raise any errors if
  313%       Connection does not exist or closing the connection raises
  314%       a network or I/O related exception.  This version is used
  315%       internally if a connection is in a broken state, either due
  316%       to a protocol error or a network issue.
  317
  318redis_disconnect(Redis) :-
  319    redis_disconnect(Redis, []).
  320
  321redis_disconnect(Redis, Options) :-
  322    option(force(true), Options),
  323    !,
  324    (   Redis = redis_connection(_Id, S, _, _Opts)
  325    ->  (   S == (-)
  326        ->  true
  327        ;   close(S, [force(true)]),
  328            nb_setarg(2, Redis, -)
  329        )
  330    ;   has_redis_stream(Redis, S)
  331    ->  close(S, [force(true)]),
  332        retractall(connection(_,S))
  333    ;   true
  334    ).
  335redis_disconnect(Redis, _Options) :-
  336    redis_stream(Redis, S, false),
  337    close(S),
  338    retractall(connection(_,S)).
  339
  340%!  redis(+Connection, +Request) is semidet.
  341%
  342%   This predicate is overloaded to handle two types of requests. First,
  343%   it is a shorthand for `redis(Connection, Command, _)` and second, it
  344%   can be used to exploit  Redis   _pipelines_  and _transactions_. The
  345%   second form is activated if Request is  a _list_. In that case, each
  346%   element of the list is either a term  `Command -> Reply` or a simple
  347%   `Command`. Semantically this represents a   sequence  of redis/3 and
  348%   redis/2 calls.  It differs in the following aspects:
  349%
  350%     - All commands are sent in one batch, after which all replies are
  351%       read.  This reduces the number of _round trips_ and typically
  352%       greatly improves performance.
  353%     - If the first command is `multi` and the last `exec`, the
  354%       commands are executed as a Redis _transaction_, i.e., they
  355%       are executed _atomically_.
  356%     - If one of the commands returns an error, the subsequent commands
  357%       __are still executed__.
  358%     - You can not use variables from commands earlier in the list for
  359%       commands later in the list as a result of the above execution
  360%       order.
  361%
  362%   Procedurally, the process takes the following steps:
  363%
  364%     1. Send all commands
  365%     2. Read all replies and push messages
  366%     3. Handle all callbacks from push messages
  367%     4. Check whether one of the replies is an error.  If so,
  368%        raise this error (subsequent errors are lost)
  369%     5. Bind all replies for the `Command -> Reply` terms.
  370%
  371%   Examples
  372%
  373%   ```
  374%   ?- redis(default,
  375%            [ lpush(li,1),
  376%              lpush(li,2),
  377%              lrange(li,0,-1) -> List
  378%            ]).
  379%   List = ["2", "1"].
  380%   ```
  381
  382redis(Redis, PipeLine) :-
  383    is_list(PipeLine),
  384    !,
  385    redis_pipeline(Redis, PipeLine).
  386redis(Redis, Req) :-
  387    redis(Redis, Req, _).
  388
  389%!  redis(+Connection, +Command, -Reply) is semidet.
  390%
  391%   Execute a redis Command on  Connection.    Next,  bind  Reply to the
  392%   returned result. Command is a  callable   term  whose functor is the
  393%   name of the Redis command  and   whose  arguments  are translated to
  394%   Redis arguments according to the rules below.  Note that all text is
  395%   always represented using UTF-8 encoding.
  396%
  397%     - Atomic values are emitted verbatim
  398%     - A term A:B:... where all arguments are either atoms,
  399%       strings or integers (__no floats__) is translated into
  400%       a string `"A:B:..."`.  This is a common shorthand for
  401%       representing Redis keys.
  402%     - A term Term as prolog is emitted as "\u0000T\u0000" followed
  403%       by Term in canonical form.
  404%     - Any other term is emitted as write/1.
  405%
  406%   Reply is either a plain term (often a  variable) or a term `Value as
  407%   Type`. In the latter form,  `Type`   dictates  how  the Redis _bulk_
  408%   reply is translated to Prolog. The default equals to `auto`, i.e.,
  409%   as a number if the content satisfies the Prolog number syntax and
  410%   as an atom otherwise.
  411%
  412%     - status(Atom)
  413%       Returned if the server replies with ``+ Status``.  Atom
  414%       is the textual value of `Status` converted to lower case,
  415%       e.g., status(ok) or status(pong).
  416%     - `nil`
  417%       This atom is returned for a NIL/NULL value.  Note that if
  418%       the reply is only `nil`, redis/3 _fails_.  The `nil` value
  419%       may be embedded inside lists or maps.
  420%     - A number
  421%       Returned if the server replies an integer (":Int"), double
  422%       (",Num") or big integer ("(Num")
  423%     - A string
  424%       Returned on a _bulk_ reply.  Bulk replies are supposed to be
  425%       in UTF-8 encoding.  If the bulk reply starts with
  426%       "\u0000T\u0000" it is supposed to be a Prolog term.
  427%       Note that this interpretation means it is __not__ possible
  428%       to read arbitrary binary blobs.
  429%     - A list of replies.  A list may also contain `nil`.  If Reply
  430%       as a whole would be `nil` the call fails.
  431%     - A list of _pairs_.  This is returned for the redis version 3
  432%       protocol "%Map".  Both the key and value respect the same
  433%       rules as above.
  434%
  435%   Redis _bulk_ replies are translated depending  on the `as` `Type` as
  436%   explained above.
  437%
  438%     - string
  439%     - string(Encoding)
  440%       Create a SWI-Prolog string object interpreting the blob as
  441%       following Encoding. Encoding is a restricted set of SWI-Prolog's
  442%       encodings: `bytes` (`iso_latin_1`), `utf8` and `text` (the
  443%       current locale translation).
  444%     - atom
  445%     - atom(Encoding)
  446%       As above, producing an atom.
  447%     - codes
  448%     - codes(Encoding)
  449%       As above, producing a list of integers (Unicode code points)
  450%     - chars
  451%     - chars(Encoding)
  452%       As above, producing a list of one-character atoms.
  453%     - integer
  454%     - float
  455%     - rational
  456%     - number
  457%       Interpret the bytes as a string representing a number.  If
  458%       the string does not represent a number of the requested type
  459%       a type_error(Type, String) is raised.
  460%     - tagged_integer
  461%       Same as integer, but demands the value to be between the Prolog
  462%       flags `min_tagged_integer` and `max_tagged_integer`, allowing
  463%       the value to be used as a dict key.
  464%     - auto
  465%       Same as auto(atom, number)
  466%     - auto(AsText,AsNumber)
  467%       If the bulk string conforms to the syntax of AsNumber, convert
  468%       the value to the requested numerical type.  Else convert
  469%       the value to text according to AsText.  This is similar to
  470%       the Prolog predicate name/2.
  471%     - dict_key
  472%       Alias for auto(atom,tagged_integer).  This allows the value
  473%       to be used as a key for a SWI-Prolog dict.
  474%     - pairs(AsKey, AsValue)
  475%       Convert a map or array of even length into pairs for which the
  476%       key satisfies AsKey and the value AsValue.  The `pairs` type
  477%       can also be applied to a Redis array.  In this case the array
  478%       length must be even.  This notably allows fetching a Redis
  479%       _hash_ as pairs using ``HGETALL`` using version 2 of the
  480%       Redis protocol.
  481%     - dict(AsKey, AsValue)
  482%       Similar to pairs(AsKey, AsValue), but convert the resulting
  483%       pair list into a SWI-Prolog dict.  AsKey must convert to a
  484%       valid dict key, i.e., an atom or tagged integer. See `dict_key`.
  485%     - dict(AsValue)
  486%       Shorthand for dict(dict_key, AsValue).
  487%
  488%   Here are some simple examples
  489%
  490%   ```
  491%   ?- redis(default, set(a, 42), X).
  492%   X = status("OK").
  493%   ?- redis(default, get(a), X).
  494%   X = "42".
  495%   ?- redis(default, get(a), X as integer).
  496%   X = 42.
  497%   ?- redis(default, get(a), X as float).
  498%   X = 42.0.
  499%   ?- redis(default, set(swipl:version, 8)).
  500%   true.
  501%   ?- redis(default, incr(swipl:version), X).
  502%   X = 9.
  503%   ```
  504%
  505%   @error redis_error(Code, String)
  506
  507redis(Redis, Req, Out) :-
  508    out_val(Out, Val),
  509    redis1(Redis, Req, Out),
  510    Val \== nil.
  511
  512out_val(Out, Val) :-
  513    (   nonvar(Out),
  514        Out = (Val as _)
  515    ->  true
  516    ;   Val = Out
  517    ).
  518
  519redis1(Redis, Req, Out) :-
  520    Error = error(Formal, _),
  521    catch(redis2(Redis, Req, Out), Error, true),
  522    (   var(Formal)
  523    ->  true
  524    ;   recover(Error, Redis, redis1(Redis, Req, Out))
  525    ).
  526
  527redis2(Redis, Req, Out) :-
  528    atom(Redis),
  529    !,
  530    redis_stream(Redis, S, true),
  531    with_mutex(Redis,
  532               ( redis_write_msg(S, Req),
  533                 redis_read_stream(Redis, S, Out)
  534               )).
  535redis2(Redis, Req, Out) :-
  536    redis_stream(Redis, S, true),
  537    redis_write_msg(S, Req),
  538    redis_read_stream(Redis, S, Out).
  539
  540%!  redis_pipeline(+Redis, +PipeLine)
  541
  542redis_pipeline(Redis, PipeLine) :-
  543    Error = error(Formal, _),
  544    catch(redis_pipeline2(Redis, PipeLine), Error, true),
  545    (   var(Formal)
  546    ->  true
  547    ;   recover(Error, Redis, redis_pipeline(Redis, PipeLine))
  548    ).
  549
  550redis_pipeline2(Redis, PipeLine) :-
  551    atom(Redis),
  552    !,
  553    redis_stream(Redis, S, true),
  554    with_mutex(Redis,
  555               redis_pipeline3(Redis, S, PipeLine)).
  556redis_pipeline2(Redis, PipeLine) :-
  557    redis_stream(Redis, S, true),
  558    redis_pipeline3(Redis, S, PipeLine).
  559
  560redis_pipeline3(Redis, S, PipeLine) :-
  561    maplist(write_pipeline(S), PipeLine),
  562    flush_output(S),
  563    read_pipeline(Redis, S, PipeLine).
  564
  565write_pipeline(S, Command -> _Reply) :-
  566    !,
  567    redis_write_msg_no_flush(S, Command).
  568write_pipeline(S, Command) :-
  569    redis_write_msg_no_flush(S, Command).
  570
  571read_pipeline(Redis, S, PipeLine) :-
  572    E = error(Formal,_),
  573    catch(read_pipeline2(Redis, S, PipeLine), E, true),
  574    (   var(Formal)
  575    ->  true
  576    ;   reconnect_error(E)
  577    ->  redis_disconnect(Redis, [force(true)]),
  578        throw(E)
  579    ;   resync(Redis),
  580        throw(E)
  581    ).
  582
  583read_pipeline2(Redis, S, PipeLine) :-
  584    maplist(redis_read_msg3(S), PipeLine, Replies, Errors, Pushed),
  585    maplist(handle_push(Redis), Pushed),
  586    maplist(handle_error, Errors),
  587    maplist(bind_reply, PipeLine, Replies).
  588
  589redis_read_msg3(S, _Command -> ReplyIn, Reply, Error, Push) :-
  590    !,
  591    redis_read_msg(S, ReplyIn, Reply, Error, Push).
  592redis_read_msg3(S, Var, Reply, Error, Push) :-
  593    redis_read_msg(S, Var, Reply, Error, Push).
  594
  595handle_push(Redis, Pushed) :-
  596    handle_push_messages(Pushed, Redis).
  597handle_error(Error) :-
  598    (   var(Error)
  599    ->  true
  600    ;   throw(Error)
  601    ).
  602bind_reply(_Command -> Reply0, Reply) :-
  603    !,
  604    Reply0 = Reply.
  605bind_reply(_Command, _).
  606
  607
  608%!  recover(+Error, +Redis, :Goal)
  609%
  610%   Error happened while running Goal on Redis. If this is a recoverable
  611%   error (i.e., a network or disconnected peer),  wait a little and try
  612%   running Goal again.
  613
  614:- meta_predicate recover(+, +, 0).  615
  616recover(Error, Redis, Goal) :-
  617    reconnect_error(Error),
  618    auto_reconnect(Redis),
  619    !,
  620    debug(redis(recover), '~p: got error ~p; trying to reconnect',
  621          [Redis, Error]),
  622    redis_disconnect(Redis, [force(true)]),
  623    (   wait_to_retry(Redis, Error)
  624    ->  call(Goal),
  625        retractall(failure(Redis, _))
  626    ;   throw(Error)
  627    ).
  628recover(Error, _, _) :-
  629    throw(Error).
  630
  631auto_reconnect(redis_connection(_,_,_,Options)) :-
  632    !,
  633    option(reconnect(true), Options).
  634auto_reconnect(Server) :-
  635    ground(Server),
  636    server(Server, _, Options),
  637    option(reconnect(true), Options, true).
  638
  639reconnect_error(error(socket_error(_Code, _),_)).
  640reconnect_error(error(syntax_error(unexpected_eof),_)).
  641
  642%!  wait_to_retry(+Redis, +Error)
  643%
  644%   Wait for some time after a failure. First  we wait for 10ms. This is
  645%   doubled on each failure up to the  setting  `max_retry_wait`. If the
  646%   setting `max_retry_count` is exceeded we fail and the caller signals
  647%   an exception.
  648
  649:- dynamic failure/2 as volatile.  650
  651wait_to_retry(Redis, Error) :-
  652    redis_failures(Redis, Failures),
  653    setting(max_retry_count, Count),
  654    Failures < Count,
  655    Failures2 is Failures+1,
  656    redis_set_failures(Redis, Failures2),
  657    setting(max_retry_wait, MaxWait),
  658    Wait is min(MaxWait*100, 1<<Failures)/100.0,
  659    debug(redis(recover), '  Sleeping ~p seconds', [Wait]),
  660    retry_message_level(Failures, Level),
  661    print_message(Level, redis(retry(Redis, Failures, Wait, Error))),
  662    sleep(Wait).
  663
  664redis_failures(redis_connection(_,_,Failures0,_), Failures) :-
  665    !,
  666    Failures = Failures0.
  667redis_failures(Server, Failures) :-
  668    atom(Server),
  669    (   failure(Server, Failures)
  670    ->  true
  671    ;   Failures = 0
  672    ).
  673
  674redis_set_failures(Connection, Count) :-
  675    compound(Connection),
  676    !,
  677    nb_setarg(3, Connection, Count).
  678redis_set_failures(Server, Count) :-
  679    atom(Server),
  680    retractall(failure(Server, _)),
  681    asserta(failure(Server, Count)).
  682
  683retry_message_level(0, warning) :- !.
  684retry_message_level(_, silent).
  685
  686
  687%!  redis(+Request)
  688%
  689%   Connect to the default redis server,   call  redis/3  using Request,
  690%   disconnect and print the result.  This   predicate  is  intended for
  691%   interactive usage.
  692
  693redis(Req) :-
  694    setup_call_cleanup(
  695        redis_connect(default, C, []),
  696        redis1(C, Req, Out),
  697        redis_disconnect(C)),
  698    print(Out).
  699
  700%!  redis_write(+Redis, +Command) is det.
  701%!  redis_read(+Redis, -Reply) is det.
  702%
  703%   Write command and read replies from a Redis server. These are
  704%   building blocks for subscribing to event streams.
  705
  706redis_write(Redis, Command) :-
  707    redis_stream(Redis, S, true),
  708    redis_write_msg(S, Command).
  709
  710redis_read(Redis, Reply) :-
  711    redis_stream(Redis, S, true),
  712    redis_read_stream(Redis, S, Reply).
  713
  714
  715		 /*******************************
  716		 *      HIGH LEVEL ACCESS	*
  717		 *******************************/
  718
  719%!  redis_get_list(+Redis, +Key, -List) is det.
  720%!  redis_get_list(+Redis, +Key, +ChunkSize, -List) is det.
  721%
  722%   Get the content of a Redis list in   List. If ChunkSize is given and
  723%   smaller than the list length, List is returned as a _lazy list_. The
  724%   actual values are requested using   redis  ``LRANGE`` requests. Note
  725%   that this results in O(N^2) complexity. Using   a  lazy list is most
  726%   useful for relatively short lists holding possibly large items.
  727%
  728%   Note that values retrieved are _strings_, unless the value was added
  729%   using `Term as prolog`.
  730%
  731%   @see lazy_list/2 for a discussion  on   the  difference between lazy
  732%   lists and normal lists.
  733
  734redis_get_list(Redis, Key, List) :-
  735    redis_get_list(Redis, Key, -1, List).
  736
  737redis_get_list(Redis, Key, Chunk, List) :-
  738    redis(Redis, llen(Key), Len),
  739    (   (   Chunk >= Len
  740        ;   Chunk == -1
  741        )
  742    ->  (   Len == 0
  743        ->  List = []
  744        ;   End is Len-1,
  745            list_range(Redis, Key, 0, End, List)
  746        )
  747    ;   lazy_list(rlist_next(s(Redis,Key,0,Chunk,Len)), List)
  748    ).
  749
  750rlist_next(State, List, Tail) :-
  751    State = s(Redis,Key,Offset,Slice,Len),
  752    End is min(Len-1, Offset+Slice-1),
  753    list_range(Redis, Key, Offset, End, Elems),
  754    (   End =:= Len-1
  755    ->  List = Elems,
  756        Tail = []
  757    ;   Offset2 is Offset+Slice,
  758        nb_setarg(3, State, Offset2),
  759        append(Elems, Tail, List)
  760    ).
  761
  762% Redis LRANGE demands End > Start and returns inclusive.
  763
  764list_range(DB, Key, Start, Start, [Elem]) :-
  765    !,
  766    redis(DB, lindex(Key, Start), Elem).
  767list_range(DB, Key, Start, End, List) :-
  768    !,
  769    redis(DB, lrange(Key, Start, End), List).
  770
  771
  772
  773%!  redis_set_list(+Redis, +Key, +List) is det.
  774%
  775%   Associate a Redis key with a list.  As   Redis  has no concept of an
  776%   empty list, if List is `[]`, Key  is _deleted_. Note that key values
  777%   are always strings in  Redis.  The   same  conversion  rules  as for
  778%   redis/1-3 apply.
  779
  780redis_set_list(Redis, Key, List) :-
  781    redis(Redis, del(Key), _),
  782    (   List == []
  783    ->  true
  784    ;   Term =.. [rpush,Key|List],
  785        redis(Redis, Term, _Count)
  786    ).
  787
  788
  %!  redis_get_hash(+Redis, +Key, -Data:dict) is det.
  %!  redis_set_hash(+Redis, +Key, +Data:dict) is det.
  %
  %   Put/get a Redis hash as a Prolog  dict. Putting a dict first deletes
  %   Key. Putting an empty dict only deletes Key, analogous to
  %   redis_set_list/3 for `[]`: ``HSET`` needs at least one field/value
  %   pair. Note that in many cases applications will manage Redis hashes
  %   by key. redis_get_hash/3 is notably a user friendly alternative to
  %   the Redis ``HGETALL`` command. If the Redis hash is not used by
  %   other (non-Prolog) applications one may also consider using the
  %   `Term as prolog` syntax to store the Prolog dict as-is.

  redis_get_hash(Redis, Key, Dict) :-
      redis(Redis, hgetall(Key), Dict as dict(auto)).

  redis_set_hash(Redis, Key, Dict) :-
      redis_array_dict(Array, _, Dict),       % validate/convert before deleting
      redis(Redis, del(Key), _),
      (   Array == []
      ->  true                                % nothing to store: hset(Key) alone
                                              % would be rejected by the server
      ;   Term =.. [hset,Key|Array],
          redis(Redis, Term, _Count)
      ).
  807
  %!  redis_array_dict(?Array, ?Tag, ?Dict) is det.
  %
  %   Relate a Redis reply holding hash data to a SWI-Prolog dict. If
  %   Array is bound it may be a list of alternating keys and values or a
  %   list of _pairs_ and Dict is created from it. If Array is unbound it
  %   is unified with a list of alternating keys and values created from
  %   Dict.
  %
  %   @arg Tag is the SWI-Prolog dict tag.

  redis_array_dict(Array, Tag, Dict) :-
      (   nonvar(Array)
      ->  array_to_pairs(Array, Pairs),
          dict_pairs(Dict, Tag, Pairs)
      ;   dict_pairs(Dict, Tag, Pairs),
          pairs_to_array(Pairs, Array)
      ).
  825
  % array_to_pairs(+Array, -Pairs)
  %
  % Turn a hash reply into Name-Value pairs with atom names.  An element
  % that already is a pair is taken as-is (RESP3 returns a map as
  % pairs); otherwise two consecutive elements form one pair.

  array_to_pairs([], []) :-
      !.
  array_to_pairs([Text-Value|Rest0], [Name-Value|Rest]) :-
      !,
      atom_string(Name, Text),
      array_to_pairs(Rest0, Rest).
  array_to_pairs([Text,Value|Rest0], [Name-Value|Rest]) :-
      atom_string(Name, Text),
      array_to_pairs(Rest0, Rest).
  835
  % pairs_to_array(+Pairs, -Array)
  %
  % Flatten Name-Value pairs into a list of alternating names and
  % values, where each name is converted to a string.

  pairs_to_array([], []) :-
      !.
  pairs_to_array([Name-Value|Rest0], [Text,Value|Rest]) :-
      atom_string(Name, Text),
      pairs_to_array(Rest0, Rest).
  841
  %!  redis_scan(+Redis, -LazyList, +Options) is det.
  %!  redis_sscan(+Redis, +Set, -LazyList, +Options) is det.
  %!  redis_hscan(+Redis, +Hash, -LazyList, +Options) is det.
  %!  redis_zscan(+Redis, +Set, -LazyList, +Options) is det.
  %
  %   Expose the Redis ``SCAN``, ``SSCAN``, ``HSCAN`` and ``ZSCAN``
  %   commands as a _lazy list_. redis_scan/3 and redis_sscan/4 produce a
  %   list of strings; redis_hscan/4 and redis_zscan/4 produce a list of
  %   _pairs_. Options processed:
  %
  %     - match(Pattern)
  %       Adds the ``MATCH`` subcommand, only returning matches for
  %       Pattern.
  %     - count(Count)
  %       Adds the ``COUNT`` subcommand, giving a hint to the size of the
  %       chunks fetched.
  %     - type(Type)
  %       Adds the ``TYPE`` subcommand, only returning answers of the
  %       indicated type.
  %
  %   @see lazy_list/2.

  redis_scan(Redis, LazyList, Options) :-
      scan_lazy_list(scan, Redis, Options, LazyList).

  redis_sscan(Redis, Set, LazyList, Options) :-
      scan_lazy_list(sscan(Set), Redis, Options, LazyList).

  redis_hscan(Redis, Hash, LazyList, Options) :-
      scan_lazy_list(hscan(Hash), Redis, Options, LazyList).

  redis_zscan(Redis, Set, LazyList, Options) :-
      scan_lazy_list(zscan(Set), Redis, Options, LazyList).

  % scan_lazy_list(+Command, +Redis, +Options, -LazyList)
  %
  % Shared implementation: translate Options into subcommand parameters
  % and create a lazy list driven by scan_next/3, starting at cursor 0.

  scan_lazy_list(Command, Redis, Options, LazyList) :-
      scan_options([match,count,type], Options, Parms),
      lazy_list(scan_next(s(Command,Redis,0,Parms)), LazyList).
  879
  % scan_options(+Names, +Options, -Params)
  %
  % For each option name in Names that appears in Options as
  % Name(Value), add Name and Value to Params, preserving the order of
  % Names.  Names that are not in Options are skipped.

  scan_options([], _, []).
  scan_options([Name|Names], Options, Params) :-
      Opt =.. [Name,Value],
      (   option(Opt, Options)
      ->  Params = [Name,Value|Params1]
      ;   Params = Params1
      ),
      scan_options(Names, Options, Params1).
  888
  889
  % scan_next(+State, -List, -Tail)
  %
  % Fetch the next batch for a scan lazy list.  State is a term
  % s(Command,Redis,Cursor,Params).  The request is the command term
  % extended with the cursor and the parameters.  The reply is a new
  % cursor and a list of elements, which scan_pairs/3 post-processes.  A
  % new cursor of 0 ends the list; otherwise it is stored destructively
  % in State for the next call.

  scan_next(State, List, Tail) :-
      State = s(Command,Redis,Cursor,Params),
      Command =.. Prefix,
      append(Prefix, [Cursor|Params], Parts),
      Request =.. Parts,
      redis(Redis, Request, [NextCursor,Raw]),
      scan_pairs(Command, Raw, Elems),
      (   NextCursor == 0
      ->  List = Elems,
          Tail = []
      ;   nb_setarg(3, State, NextCursor),
          append(Elems, Tail, List)
      ).
  903
  % scan_pairs(+Command, +Elems0, -Elems)
  %
  % Post-process the elements of a scan reply.  For hscan(_) and
  % zscan(_) the elements are turned into Key-Value pairs; for any other
  % command they are passed unchanged.

  scan_pairs(hscan(_), List, Pairs) :-
      !,
      scan_pairs(List, Pairs).
  scan_pairs(zscan(_), List, Pairs) :-
      !,
      scan_pairs(List, Pairs).
  scan_pairs(_, List, List).

  % scan_pairs(+List, -Pairs)
  %
  % List is either a flat list of alternating keys and values or a list
  % that already holds Key-Value pairs.  The pair clause must come
  % first, as in array_to_pairs/2: a flat clause placed before it
  % matches any two elements and would combine two adjacent pairs into
  % (K1-V1)-(K2-V2).

  scan_pairs([], []).
  scan_pairs([Key-Value|T0], [Key-Value|T]) :-
      !,
      scan_pairs(T0, T).
  scan_pairs([Key,Value|T0], [Key-Value|T]) :-
      scan_pairs(T0, T).
  918
  919
  920		 /*******************************
  921		 *              ABOUT		*
  922		 *******************************/
  923
  %!  redis_current_command(+Redis, ?Command) is nondet.
  %!  redis_current_command(+Redis, ?Command, -Properties) is nondet.
  %
  %   True when Command is a command of the server with Properties. Fails
  %   if Command is not defined. If Command is bound a single ``COMMAND
  %   INFO`` request is made; otherwise all commands are fetched using
  %   ``COMMAND`` and enumerated on backtracking. Properties is the
  %   command argument specification. See the Redis documentation for an
  %   explanation.

  redis_current_command(Redis, Command) :-
      redis_current_command(Redis, Command, _Properties).

  redis_current_command(Redis, Command, Properties) :-
      (   nonvar(Command)
      ->  redis(Redis, command(info, Command), [[_|Properties]])
      ;   redis(Redis, command, Commands),
          member([Name|Properties], Commands),
          atom_string(Command, Name)
      ).
  942
  %!  redis_property(+Redis, ?Property) is nondet.
  %
  %   True if Property is a property of the Redis server. The current
  %   implementation sends ``INFO`` and parses the reply into terms
  %   Name(Value). As this is for machine usage, properties named
  %   *_human are skipped.

  redis_property(Redis, Property) :-
      redis(Redis, info, Info),
      info_terms(Info, Properties),
      member(Property, Properties).
  953
  % info_terms(+Info, -Terms)
  %
  % Split the text Info into lines, stripping carriage returns, newlines
  % and spaces from each line, and collect the term for every line that
  % info_line_term/2 can translate.

  info_terms(Info, Terms) :-
      split_string(Info, "\n", "\r\n ", Lines),
      convlist(info_line_term, Lines, Terms).
  957
  % info_line_term(+Line, -Term)
  %
  % Translate a line "name:value" into the term name(Value).  The line
  % is split at its first colon.  Value is a number if the text after
  % the colon parses as one and a string otherwise.  Fails for lines
  % without a colon and for names that end in `_human`.

  info_line_term(Line, Term) :-
      sub_string(Line, Before, _, After, :),
      !,                                      % commit to the first colon
      sub_atom(Line, 0, Before, _, Name),
      \+ sub_atom(Name, _, _, 0, '_human'),
      sub_string(Line, _, After, 0, Text),
      (   number_string(Number, Text)
      ->  Value = Number
      ;   Value = Text
      ),
      Term =.. [Name,Value].
  969
  970
  971		 /*******************************
  972		 *            SUBSCRIBE		*
  973		 *******************************/
  974
  975%!  redis_subscribe(+Redis, +Channels, -Id, +Options) is det.
  976%
  977%   Subscribe to one or more  Redis   PUB/SUB  channels.  This predicate
  978%   creates a thread using thread_create/3 with  the given Options. Once
  979%   running, the thread listens for messages.   The message content is a
  980%   string or Prolog term  as  described   in  redis/3.  On  receiving a
  %   message, the following message is broadcast:
  982%
  983%       redis(Id, Channel, Data)
  984%
  985%   If redis_unsubscribe/2 removes the  last   subscription,  the thread
  986%   terminates.
  987%
  %   To simply print the incoming messages use e.g.
  989%
  990%       ?- listen(redis(_, Channel, Data),
  991%                 format('Channel ~p got ~p~n', [Channel,Data])).
  992%       true.
  993%       ?- redis_subscribe(default, test, Id, []).
  994%       Id = redis_pubsub_3,
  995%       ?- redis(publish(test, "Hello world")).
  996%       Channel test got "Hello world"
  997%       1
  998%       true.
  999%
 1000%   @arg Id is the thread identifier of  the listening thread. Note that
 1001%   the Options alias(Name) can be used to get a system wide name.
 1002
 % Dynamic (volatile) bookkeeping for PUB/SUB:
 %
 %   - subscription(Id, Channel)
 %   - listening(Id, Connection, Thread)

 :- dynamic((
        subscription/2,
        listening/3
    ) as volatile).

 % If Redis is an atom, the listening thread creates its own connection
 % with reconnect(true) and disconnects it when done.  Otherwise Redis
 % itself is used as the connection.

 redis_subscribe(Redis, Spec, Id, Options) :-
     (   atom(Redis)
     ->  Goal = setup_call_cleanup(
                    redis_connect(Redis, Conn, [reconnect(true)]),
                    redis_subscribe1(Redis, Conn, Channels),
                    redis_disconnect(Conn))
     ;   Goal = redis_subscribe1(Redis, Redis, Channels)
     ),
     channels(Spec, Channels),
     pubsub_thread_options(ThreadOptions, Options),
     thread_create(Goal, Thread, ThreadOptions),
     pubsub_id(Thread, Id).
 1026
 % pubsub_thread_options(-ThreadOptions, +Options)
 %
 % ThreadOptions is Options merged with the default detached(true); an
 % option given by the user takes precedence over the default.

 pubsub_thread_options(ThreadOptions, Options) :-
     Defaults = [detached(true)],
     merge_options(Options, Defaults, ThreadOptions).
 1029
 % pubsub_id(?Thread, ?Id)
 %
 % The subscription id is the thread identifier itself.  A disabled
 % alternative that derives a name from the numeric thread id is kept
 % below for reference.

 pubsub_id(Thread, Id) :-
     Id = Thread.
 %pubsub_id(Thread, Id) :-
 %    thread_property(Thread, id(TID)),
 %    atom_concat('redis_pubsub_', TID, Id).
 1034
 % redis_subscribe1(+Redis, +Conn, +Channels)
 %
 % Subscribe to Channels and listen.  If this raises an error(_,_)
 % exception, recover/3 is called with an echo request on Conn and the
 % predicate restarts using the channels currently registered for this
 % thread.  Other exceptions are not caught.

 redis_subscribe1(Redis, Conn, Channels) :-
     Error = error(Formal, _),
     catch(redis_subscribe2(Redis, Conn, Channels), Error, true),
     (   nonvar(Formal)                  % bound only if an error was caught
     ->  recover(Error, Conn, redis1(Conn, echo("reconnect"), _)),
         thread_self(Me),
         pubsub_id(Me, Id),
         findall(Channel, subscription(Id, Channel), Current),
         redis_subscribe1(Redis, Conn, Current)
     ;   true
     ).
 1046
 % redis_subscribe2(+Redis, +Connection, +Channels)
 %
 % Send the subscribe request for Channels on Connection and then enter
 % the listen loop.

 redis_subscribe2(Redis, Connection, Channels) :-
     redis_subscribe3(Connection, Channels),
     redis_listen(Redis, Connection).
 1050
 % redis_subscribe3(+Connection, +Channels)
 %
 % Register pubsub_clean/1 to run when this thread exits, record
 % Channels as subscriptions of this thread and write the `subscribe`
 % request to the stream of Connection.

 redis_subscribe3(Connection, Channels) :-
     thread_self(Self),
     pubsub_id(Self, Id),
     prolog_listen(this_thread_exit, pubsub_clean(Id)),
     maplist(register_subscription(Id), Channels),
     redis_stream(Connection, Stream, true),
     Request =.. [subscribe|Channels],
     redis_write_msg(Stream, Request).
 1059
 % pubsub_clean(+Id)
 %
 % Remove all listening/3 and subscription/2 facts for Id.

 pubsub_clean(Id) :-
     retractall(listening(Id, _, _)),
     retractall(subscription(Id, _)).
 1063
 %!  redis_subscribe(+Id, +Channels) is det.
 %!  redis_unsubscribe(+Id, +Channels) is det.
 %
 %   Add channels to or remove channels from the subscription Id. If no
 %   subscriptions remain, the listening thread terminates.
 %
 %   @arg Channels is either a single  channel   or  a list thereof. Each
 %   channel specification is either an atom   or a term `A:B:...`, where
 %   all parts are atoms.
 %   @error existence_error(redis_pubsub, Id) if nothing is listening
 %   for Id.

 redis_subscribe(Id, Spec) :-
     pubsub_update(Id, Spec, register_subscription, subscribe).

 redis_unsubscribe(Id, Spec) :-
     pubsub_update(Id, Spec, unregister_subscription, unsubscribe).

 % pubsub_update(+Id, +Spec, +Update, +Command)
 %
 % Shared implementation.  Update names the predicate that is called as
 % Update(Id, Channel) for each channel to adjust the bookkeeping.
 % Command is the name of the request written to the connection.

 pubsub_update(Id, Spec, Update, Command) :-
     channels(Spec, Channels),
     (   listening(Id, Connection, _Thread)
     ->  true
     ;   existence_error(redis_pubsub, Id)
     ),
     Closure =.. [Update,Id],
     maplist(Closure, Channels),
     redis_stream(Connection, Stream, true),
     Request =.. [Command|Channels],
     redis_write_msg(Stream, Request).
 1095
 %!  redis_current_subscription(?Id, ?Channels)
 %
 %   True when a PUB/SUB subscription with Id is listening on Channels.
 %   Channels is the list of all channels registered for Id. Enumerates
 %   the subscriptions on backtracking if Id is unbound.

 redis_current_subscription(Id, Channels) :-
     findall(Id-Channel, subscription(Id, Channel), IdChannels),
     keysort(IdChannels, ById),
     group_pairs_by_key(ById, Groups),
     member(Id-Channels, Groups).
 1105
 % channels(+Spec, -Names)
 %
 % Names is a list of channel names for Spec, which is a single channel
 % specification or a proper list of them.

 channels(Spec, Names) :-
     (   is_list(Spec)
     ->  maplist(channel_name, Spec, Names)
     ;   Names = [Name],
         channel_name(Spec, Name)
     ).
 1112
 % channel_name(+Spec, -Name)
 %
 % Name is the atom naming channel Spec.  An atom stands for itself; a
 % term accepted by key_parts//1 is turned into its parts joined by
 % `:`.  Anything else raises type_error(redis_key, Spec).

 channel_name(Name, Name) :-
     atom(Name),
     !.
 channel_name(Spec, Name) :-
     (   phrase(key_parts(Spec), Parts)
     ->  atomic_list_concat(Parts, :, Name)
     ;   type_error(redis_key, Spec)
     ).
 1122
 % key_parts(+Key)//
 %
 % Emit the atoms that make up Key, where Key is an atom or a term A:B
 % whose sides are keys themselves.  Fails for an unbound Key and for
 % any other term.

 key_parts(Key) -->
     { atom(Key) },
     !,
     [Key].
 key_parts(Key) -->
     { nonvar(Key),
       Key = Left:Right
     },
     key_parts(Left),
     key_parts(Right).
 1132
 1133
 1134
 1135
 % register_subscription(+Id, +Channel)
 %
 % Record that Id is subscribed to Channel unless this is already
 % recorded.

 register_subscription(Id, Channel) :-
     subscription(Id, Channel),
     !.
 register_subscription(Id, Channel) :-
     assertz(subscription(Id, Channel)).
 1141
 % unregister_subscription(+Id, +Channel)
 %
 % Forget that Id is subscribed to Channel.  Succeeds also if no such
 % subscription is recorded.

 unregister_subscription(Id, Channel) :-
     Fact = subscription(Id, Channel),
     retractall(Fact).
 1144
 % redis_listen(+Redis, +Conn)
 %
 % Run the listen loop in the calling thread.  While the loop runs, a
 % fact listening(Id, Conn, Thread) is present; it is erased when the
 % loop ends, for whatever reason.

 redis_listen(Redis, Conn) :-
     thread_self(Self),
     pubsub_id(Self, Id),
     Fact = listening(Id, Conn, Self),
     setup_call_cleanup(
         assertz(Fact, Ref),
         redis_listen_loop(Redis, Id, Conn),
         erase(Ref)).
 1152
 % redis_listen_loop(+Redis, +Id, +Conn)
 %
 % Read and broadcast replies for as long as at least one subscription
 % is registered for Id.  The stream is obtained from Conn again in
 % each iteration.

 redis_listen_loop(Redis, Id, Conn) :-
     redis_stream(Conn, Stream, true),
     (   \+ subscription(Id, _)
     ->  true                            % nothing left to listen for
     ;   redis_read_stream(Redis, Stream, Reply),
         redis_broadcast(Redis, Reply),
         redis_listen_loop(Redis, Id, Conn)
     ).
 1161
 % redis_broadcast(+Redis, +Message)
 %
 % Handle a message received while listening.  Subscribe confirmations
 % are ignored.  A [message, Channel, Data] is broadcast as
 % redis(Redis, Channel, Data), printing errors raised by listeners
 % rather than propagating them.  Any other message is checked to be a
 % list of three elements starting with two atoms and reported on the
 % debug topic redis(warning).

 redis_broadcast(Redis, Message) :-
     (   Message = [subscribe, _Channel, _N]
     ->  true
     ;   Message = [message, Channel, Data]
     ->  catch(broadcast(redis(Redis, Channel, Data)),
               Error,
               print_message(error, Error))
     ;   assertion((Message = [Type, Chan, _Data],
                    atom(Type),
                    atom(Chan))),
         debug(redis(warning), '~p: Unknown message while listening: ~p',
               [Redis,Message])
     ).
 1175
 1176
 1177		 /*******************************
 1178		 *          READ/WRITE		*
 1179		 *******************************/
 1180
 %!  redis_read_stream(+Redis, +Stream, -Term) is det.
 %
 %   Read a message from a Redis stream.  Term is one of
 %
 %     - A list of terms (array)
 %     - A list of pairs (map, RESP3 only)
 %     - The atom `nil`
 %     - A number
 %     - A term status(String)
 %     - A string
 %     - A boolean (`true` or `false`).  RESP3 only.
 %
 %   Push messages that arrive with the reply are dispatched before the
 %   reply is returned. If reading raises an error(_,_) exception the
 %   connection is closed and the exception is re-thrown. If the reader
 %   reports an error instead, the connection is re-synchronized using
 %   resync/1 and the reported error is thrown.

 redis_read_stream(Redis, SI, Out) :-
     E = error(Formal,_),
     catch(redis_read_msg(SI, Out, Out0, Error, Push), E, true),
     (   nonvar(Formal)                  % reading itself raised an error
     ->  redis_disconnect(Redis, [force(true)]),
         throw(E)
     ;   handle_push_messages(Push, Redis),
         (   nonvar(Error)
         ->  resync(Redis),
             throw(Error)
         ;   Out = Out0
         )
     ).
 1209
 % handle_push_messages(+Messages, +Redis)
 %
 % Process each push message in turn.  An exception from a handler is
 % printed as a warning and a failing handler is ignored, so every
 % message is tried.

 handle_push_messages([], _).
 handle_push_messages([Msg|Msgs], Redis) :-
     ignore(catch(handle_push_message(Msg, Redis), E,
                  print_message(warning, E))),
     handle_push_messages(Msgs, Redis).
 1218
 % handle_push_message(+Message, +Redis)
 %
 % Broadcast a push message.  A message with a "pubsub" header is
 % broadcast without that header.  Some protocol version 3 push
 % messages (such as __keyspace@* events) seem to come directly without
 % a pubsub header; a list starting with `message` is broadcast as is.

 handle_push_message(["pubsub"|Payload], Redis) :-
     redis_broadcast(Redis, Payload).
 handle_push_message(Message, Redis) :-
     Message = [message|_],
     redis_broadcast(Redis, Message).
 1226
 1227
 %!  resync(+Redis) is det.
 %
 %   Re-synchronize the connection after an error. This is needed if,
 %   for example, a type conversion fails after reading only part of a
 %   reply. As it is hard to tell what remains to be read, a random
 %   magic sequence is echoed and input is read until that reply is
 %   found. If this raises an error(_,_) exception the connection is
 %   closed and the exception is re-thrown.

 resync(Redis) :-
     E = error(Formal,_),
     catch(do_resync(Redis), E, true),
     (   nonvar(Formal)
     ->  redis_disconnect(Redis, [force(true)]),
         throw(E)
     ;   true
     ).
 1243
 % do_resync(+Redis)
 %
 % Write an `echo` request carrying a random integer below 10^9 and
 % have '$redis_resync'/2 process the stream for that same number.

 do_resync(Redis) :-
     Magic is random(1_000_000_000),
     redis_stream(Redis, Stream, true),
     redis_write_msg(Stream, echo(Magic)),
     '$redis_resync'(Stream, Magic).
 1249
 1250
 1251%!  redis_read_msg(+Stream, -Message, -Error, -PushMessages) is det.
 1252%!  redis_write_msg(+Stream, +Message) is det.
 1253%
 1254%   Read/write a Redis message. Both these predicates are in the foreign
 1255%   module `redis4pl`.
 1256%
 1257%   @arg PushMessages is a list of push   messages that may be non-[] if
 1258%   protocol version 3 (see redis_connect/3) is selected. Using protocol
 1259%   version 2 this list is always empty.
 1260
 1261
 1262
 1263		 /*******************************
 1264		 *            MESSAGES		*
 1265		 *******************************/
 1266
 % Hooks into the message system that this file extends.

 :- multifile prolog:error_message//1.
 :- multifile prolog:message//1.

 % Message text for the error term redis_error(Code, String).

 prolog:error_message(redis_error(Code, String)) -->
     { Args = [Code, String] },
     [ 'REDIS: ~w: ~s'-Args ].
 1273
 1274prolog:message(redis(retry(_Redis, _Failures, Wait, Error))) -->
 1275    [ 'REDIS: connection error.  Retrying in ~2f seconds'-[Wait], nl ],
 1276    [ '    '-[] ], '$messages':translate_message(Error)