module tarantool.connection_pool

This module provides an API for interacting with a cluster of Tarantool servers.

class tarantool.connection_pool.ConnectionPool(addrs, user=None, password=None, socket_timeout=None, reconnect_max_attempts=0, reconnect_delay=0, connect_now=True, encoding='utf-8', call_16=False, connection_timeout=None, strategy_class=<class 'tarantool.connection_pool.RoundRobinStrategy'>, refresh_delay=1, fetch_schema=True)

Represents the pool of connections to a cluster of Tarantool servers.

To work with ConnectionPool, box.info must be callable for the user on each server.

ConnectionPool is best suited to work with a single replicaset. Its API is the same as that of a single-server Connection, but requests support a mode parameter (a tarantool.Mode value) to choose between read-write and read-only pool instances:

>>> resp = conn.select('demo', 'AAAA', mode=tarantool.Mode.PREFER_RO)
>>> resp
- ['AAAA', 'Alpha']
Parameters:
  • addrs (list) –

    List of dictionaries describing server addresses:

    {
        "host": "str" or None,     # mandatory
        "port": int or "str",      # mandatory
        "socket_fd": int,          # optional
        "transport": "str",        # optional
        "ssl_key_file": "str",     # optional
        "ssl_cert_file": "str",    # optional
        "ssl_ca_file": "str",      # optional
        "ssl_ciphers": "str",      # optional
        "ssl_password": "str",     # optional
        "ssl_password_file": "str", # optional
        "auth_type": "str"         # optional
    }
    

    Refer to corresponding Connection parameters.

  • user – Refer to user. The value is used for each connection in the pool.

  • password – Refer to password. The value is used for each connection in the pool.

  • socket_timeout – Refer to socket_timeout. The value is used for each connection in the pool.

  • reconnect_max_attempts – Refer to reconnect_max_attempts. The value is used for each connection in the pool. Note that this is the internal Connection reconnect mechanism, which is unrelated to the pool's own reconnect mechanisms.

  • reconnect_delay – Refer to reconnect_delay. The value is used for each connection in the pool. Note that this is the internal Connection reconnect mechanism, which is unrelated to the pool's own reconnect mechanisms.

  • connect_now (bool, optional) – If True, connect to all pool servers on initialization. Otherwise, you have to call connect() manually after initialization.

  • encoding – Refer to encoding. The value is used for each connection in the pool.

  • call_16 – Refer to call_16. The value is used for each connection in the pool.

  • connection_timeout (float, optional) – Refer to connection_timeout. The value is used for each connection in the pool.

  • strategy_class (StrategyInterface, optional) – Strategy for choosing a server based on a request mode. Defaults to the round-robin strategy.

  • refresh_delay – Minimum time, in seconds, between background refreshes of each pool server's box.info.ro status.

  • fetch_schema – Refer to fetch_schema.

Raise:

ConfigurationError, Connection exceptions
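
As an illustration of the constructor parameters above, here is a minimal sketch of preparing the addrs list and creating a pool. It assumes the tarantool-python package is installed when open_pool is called. The helpers build_addrs and open_pool, the host names and the ports are hypothetical and not part of the module.

```python
def build_addrs(endpoints):
    """Turn (host, port) pairs into the list of dicts expected by `addrs`."""
    addrs = []
    for host, port in endpoints:
        # "host" and "port" are the two mandatory keys; all others are optional.
        addrs.append({"host": host, "port": port})
    return addrs


def open_pool(addrs, user=None, password=None):
    """Create a pool without connecting immediately (connect_now=False)."""
    import tarantool  # imported lazily so build_addrs works without the package

    return tarantool.ConnectionPool(
        addrs=addrs,
        user=user,
        password=password,
        connect_now=False,  # the caller runs pool.connect() later
        refresh_delay=1,    # seconds between box.info.ro refreshes
    )


# Hypothetical endpoints, used only to show the resulting structure.
addrs = build_addrs([("localhost", 3301), ("localhost", 3302)])
print(addrs)
```

Keeping the address list separate from pool creation makes it easy to add the optional keys (for example "transport" or the ssl_* entries) per server before the pool is built.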

call(func_name, *args, mode=None, on_push=None, on_push_ctx=None)

Execute a CALL request on the pool server: call a stored Lua function. Refer to call().

Parameters:
Return type:

Response

Raise:

ValueError, call() exceptions

close()

Stop request processing, close each connection in the pool.

connect()

Create a connection to each address specified on initialization and start background processing threads for them. There is no need to call this method explicitly unless you have set connect_now=False on initialization.

If some connections fail to connect or to provide box.info status (including the case when all of them fail), no exceptions are raised. Attempts to reconnect and refresh the info are made in the background.
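
Because connect() does not raise when servers are unreachable, a caller may want to probe the pool before sending real traffic. The following is a rough sketch of a generic retry helper. The names wait_until_ready and flaky_probe are hypothetical, and the stand-in probe only imitates servers that become reachable shortly after connect() returns.

```python
import time


def wait_until_ready(probe, timeout=5.0, interval=0.1):
    """Call `probe` until it stops raising or `timeout` seconds elapse.

    Returns True on the first successful call, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            probe()
            return True
        except Exception:  # a real caller would catch the pool's specific errors
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Stand-in probe that fails twice before succeeding.
attempts = {"count": 0}


def flaky_probe():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("no server available yet")


ready = wait_until_ready(flaky_probe, timeout=2.0, interval=0.01)
```

With a real pool, the probe could be something like `lambda: pool.ping(mode=tarantool.Mode.ANY)`. Whether that request raises while no server is healthy is an assumption to verify against the library version in use.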

crud_count(space_name, conditions=None, opts=None, *, mode=Mode.ANY)

Execute a crud_count request on the pool server: get the row count through crud. Refer to crud_count().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_delete(space_name, key, opts=None, *, mode=Mode.ANY)

Execute a crud_delete request on the pool server: delete a row through crud. Refer to crud_delete().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_get(space_name, key, opts=None, *, mode=Mode.ANY)

Execute a crud_get request on the pool server: get a row through crud. Refer to crud_get().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_insert(space_name, values, opts=None, *, mode=Mode.ANY)

Execute a crud_insert request on the pool server: insert a row through crud. Refer to crud_insert().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_insert_many(space_name, values, opts=None, *, mode=Mode.ANY)

Execute a crud_insert_many request on the pool server: insert a batch of rows through crud. Refer to crud_insert_many().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_insert_object(space_name, values, opts=None, *, mode=Mode.ANY)

Execute a crud_insert_object request on the pool server: insert an object row through crud. Refer to crud_insert_object().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_insert_object_many(space_name, values, opts=None, *, mode=Mode.ANY)

Execute a crud_insert_object_many request on the pool server: insert a batch of object rows through crud. Refer to crud_insert_object_many().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_len(space_name, opts=None, *, mode=Mode.ANY)

Execute a crud_len request on the pool server: get the number of tuples in the space through crud. Refer to crud_len().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_max(space_name, index_name, opts=None, *, mode=Mode.ANY)

Execute a crud_max request on the pool server: get rows with the maximum value in the specified index through crud. Refer to crud_max().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_min(space_name, index_name, opts=None, *, mode=Mode.ANY)

Execute a crud_min request on the pool server: get rows with the minimum value in the specified index through crud. Refer to crud_min().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_replace(space_name, values, opts=None, *, mode=Mode.ANY)

Execute a crud_replace request on the pool server: replace a row through crud. Refer to crud_replace().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_replace_many(space_name, values, opts=None, *, mode=Mode.ANY)

Execute a crud_replace_many request on the pool server: replace a batch of rows through crud. Refer to crud_replace_many().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_replace_object(space_name, values, opts=None, *, mode=Mode.ANY)

Execute a crud_replace_object request on the pool server: replace an object row through crud. Refer to crud_replace_object().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_replace_object_many(space_name, values, opts=None, *, mode=Mode.ANY)

Execute a crud_replace_object_many request on the pool server: replace a batch of object rows through crud. Refer to crud_replace_object_many().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_select(space_name, conditions=None, opts=None, *, mode=Mode.ANY)

Execute a crud_select request on the pool server: select rows through crud. Refer to crud_select().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_stats(space_name=None, *, mode=Mode.ANY)

Execute a crud_stats request on the pool server: get statistics through crud. Refer to crud_stats().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_storage_info(opts=None, *, mode=Mode.ANY)

Execute a crud_storage_info request on the pool server: get the status of storages through crud. Refer to crud_storage_info().

Parameters:
  • opts – Refer to opts.

  • mode (Mode, optional) – Request mode.

Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_truncate(space_name, opts=None, *, mode=Mode.ANY)

Execute a crud_truncate request on the pool server: truncate rows through crud. Refer to crud_truncate().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_unflatten_rows(rows, metadata, *, mode=Mode.ANY)

Unflatten rows through crud. Refer to crud_unflatten_rows().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_update(space_name, key, operations=None, opts=None, *, mode=Mode.ANY)

Execute a crud_update request on the pool server: update a row through crud. Refer to crud_update().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_upsert(space_name, values, operations=None, opts=None, *, mode=Mode.ANY)

Execute a crud_upsert request on the pool server: upsert a row through crud. Refer to crud_upsert().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_upsert_many(space_name, values_operation, opts=None, *, mode=Mode.ANY)

Execute a crud_upsert_many request on the pool server: upsert a batch of rows through crud. Refer to crud_upsert_many().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_upsert_object(space_name, values, operations=None, opts=None, *, mode=Mode.ANY)

Execute a crud_upsert_object request on the pool server: upsert an object row through crud. Refer to crud_upsert_object().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

crud_upsert_object_many(space_name, values_operation, opts=None, *, mode=Mode.ANY)

Execute a crud_upsert_object_many request on the pool server: upsert a batch of object rows through crud. Refer to crud_upsert_object_many().

Parameters:
Return type:

CrudResult

Raise:

CrudModuleError, DatabaseError

delete(space_name, key, *, index=0, mode=Mode.RW, on_push=None, on_push_ctx=None)

Execute a DELETE request on the pool server: delete a tuple from the space. Refer to delete().

Parameters:
Return type:

Response

Raise:

delete() exceptions

eval(expr, *args, mode=None, on_push=None, on_push_ctx=None)

Execute an EVAL request on the pool server: evaluate a Lua expression. Refer to eval().

Parameters:
Return type:

Response

Raise:

ValueError, eval() exceptions

execute(query, params=None, *, mode=None)

Execute an SQL request on the pool server. Refer to execute().

Parameters:
Return type:

Response

Raise:

ValueError, execute() exceptions

insert(space_name, values, *, mode=Mode.RW, on_push=None, on_push_ctx=None)

Execute an INSERT request on the pool server: insert a tuple into the space. Refer to insert().

Parameters:
Return type:

Response

Raise:

insert() exceptions

is_closed()

Returns False if at least one connection is not closed and is ready to process requests. Otherwise, returns True.

Return type:

bool

ping(notime=False, *, mode=None)

Execute a PING request on the pool server: send an empty request and receive an empty response from the server. Refer to ping().

Parameters:
Returns:

Refer to ping().

Raise:

ValueError, ping() exceptions

replace(space_name, values, *, mode=Mode.RW, on_push=None, on_push_ctx=None)

Execute a REPLACE request on the pool server: replace a tuple in the space. Refer to replace().

Parameters:
Return type:

Response

Raise:

replace() exceptions

select(space_name, key, *, offset=0, limit=4294967295, index=0, iterator=None, mode=Mode.ANY, on_push=None, on_push_ctx=None)

Execute a SELECT request on the pool server: select tuples from the space. Refer to select().

Parameters:
Return type:

Response

Raise:

select() exceptions

update(space_name, key, op_list, *, index=0, mode=Mode.RW, on_push=None, on_push_ctx=None)

Execute an UPDATE request on the pool server: update a tuple in the space. Refer to update().

Parameters:
Return type:

Response

Raise:

update() exceptions

upsert(space_name, tuple_value, op_list, *, index=0, mode=Mode.RW, on_push=None, on_push_ctx=None)

Execute an UPSERT request on the pool server: upsert a tuple into the space. Refer to upsert().

Parameters:
Return type:

Response

Raise:

upsert() exceptions
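
The signatures above differ in their default mode: write requests default to Mode.RW, select() defaults to Mode.ANY, and call(), eval(), execute() and ping() have mode=None. The sketch below restates those defaults as data. The helper effective_mode is hypothetical, and raising ValueError for a missing mode is an assumption inferred from the Raise entries of those four methods.

```python
# Default `mode` per method, copied from the signatures in this section.
# None means the signature provides no usable default.
# (All crud_* methods default to Mode.ANY and are omitted for brevity.)
DEFAULT_MODES = {
    "insert": "RW", "replace": "RW", "delete": "RW",
    "update": "RW", "upsert": "RW",
    "select": "ANY",
    "call": None, "eval": None, "execute": None, "ping": None,
}


def effective_mode(method, mode=None):
    """Return the mode a request would use, or raise if none is available."""
    if mode is not None:
        return mode
    default = DEFAULT_MODES[method]
    if default is None:
        # Assumption: mirrors the ValueError listed for these methods.
        raise ValueError(f"{method}() needs an explicit mode argument")
    return default


print(effective_mode("insert"))       # uses the signature default
print(effective_mode("call", "RO"))   # explicit mode wins
```

In practice this means that a call such as `pool.call('func', mode=tarantool.Mode.RW)` should always spell out the mode, while `pool.insert(...)` can rely on the default.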

class tarantool.connection_pool.InstanceState(status: Status = Status.UNHEALTHY, read_only: Optional[bool] = None)

State of a single server in the cluster.

read_only: Optional[bool] = None
Type:

bool, optional

status: Status = 2
Type:

Status

class tarantool.connection_pool.Mode(value)

Request mode.

ANY = 1

Send a request to any server.

PREFER_RO = 5

Send a request to an RO server if possible, otherwise to an RW server.

PREFER_RW = 4

Send a request to an RW server if possible, otherwise to an RO server.

RO = 3

Send a request to an RO server.

RW = 2

Send a request to an RW server.
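
The fallback rules described for each mode can be sketched as a small decision function. This is an illustration only: the Mode enum below is a local stand-in that copies the member names and values listed above, pick_role is a hypothetical helper, and LookupError stands in for whatever error the library raises when no suitable server exists.

```python
from enum import Enum


class Mode(Enum):
    """Local stand-in with the same member names and values as listed above."""
    ANY = 1
    RW = 2
    RO = 3
    PREFER_RW = 4
    PREFER_RO = 5


def pick_role(mode, has_rw, has_ro):
    """Return "rw", "ro" or "any": the group of servers a request should use."""
    if mode is Mode.ANY:
        if has_rw or has_ro:
            return "any"
    elif mode is Mode.RW:
        if has_rw:
            return "rw"
    elif mode is Mode.RO:
        if has_ro:
            return "ro"
    elif mode is Mode.PREFER_RW:
        if has_rw:
            return "rw"
        if has_ro:
            return "ro"  # fall back to a read-only server
    elif mode is Mode.PREFER_RO:
        if has_ro:
            return "ro"
        if has_rw:
            return "rw"  # fall back to a read-write server
    raise LookupError(f"no suitable server for {mode.name}")


print(pick_role(Mode.PREFER_RO, has_rw=True, has_ro=False))
```

The strict modes (RW, RO) fail when the requested role is missing, while the PREFER_* modes degrade to the other role.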

class tarantool.connection_pool.PoolTask(method_name: str, args: tuple, kwargs: dict)

Store request type and arguments to pass them to some server thread.

args: tuple

Connection method args.

Type:

tuple

kwargs: dict

Connection method kwargs.

Type:

dict

method_name: str

Connection method name.

Type:

str

class tarantool.connection_pool.PoolUnit(addr: dict, conn: ~tarantool.connection.Connection, input_queue: ~queue.Queue = <factory>, output_queue: ~queue.Queue = <factory>, thread: ~typing.Optional[~threading.Thread] = None, state: ~tarantool.connection_pool.InstanceState = <factory>, request_processing_enabled: bool = False)

Stores metadata about a Tarantool server and is used to work with that server as part of a connection pool.

addr: dict

{"host": host, "port": port, "socket_fd": socket_fd} info.

Type:

dict

conn: Connection
Type:

Connection

get_address()

Get an address string representation.

input_queue: Queue

Channel to pass requests for the server thread.

Type:

queue.Queue

output_queue: Queue

Channel to receive responses from the server thread.

Type:

queue.Queue

request_processing_enabled: bool = False

Flag used to stop request processing in the background thread on connection close or destruction.

Type:

bool

state: InstanceState

Current server state.

Type:

InstanceState

thread: Optional[Thread] = None

Background thread to process requests for the server.

Type:

threading.Thread
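
The fields above describe a per-server worker: tasks go in through one queue, responses come back through another, and a flag stops the loop. The sketch below shows that pattern with the standard library only. Task, FakeConnection and worker are hypothetical stand-ins, not the module's classes, and a threading.Event plays the role of the request_processing_enabled flag.

```python
import queue
import threading
from dataclasses import dataclass, field


@dataclass
class Task:
    """Stand-in for PoolTask: which method to call and with what arguments."""
    method_name: str
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)


class FakeConnection:
    """Hypothetical connection used only for this sketch."""

    def select(self, space_name, key):
        return [[key, "Alpha"]]


def worker(conn, input_queue, output_queue, stop_event):
    """Take tasks from input_queue, run them on conn, put results on output_queue."""
    while not stop_event.is_set():
        try:
            task = input_queue.get(timeout=0.05)
        except queue.Empty:
            continue  # nothing to do yet; re-check the stop flag
        try:
            result = getattr(conn, task.method_name)(*task.args, **task.kwargs)
        except Exception as exc:  # hand errors back instead of killing the thread
            result = exc
        output_queue.put(result)


inbox, outbox = queue.Queue(), queue.Queue()
stop = threading.Event()
thread = threading.Thread(
    target=worker, args=(FakeConnection(), inbox, outbox, stop), daemon=True
)
thread.start()

inbox.put(Task("select", ("demo", "AAAA")))
reply = outbox.get(timeout=2)

stop.set()  # analogous to clearing request_processing_enabled
thread.join(timeout=2)
```

Passing the method name as a string keeps the task a plain data object, so the same worker loop can serve any request type.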

class tarantool.connection_pool.RoundRobinStrategy(pool)

Simple round-robin rotation of pool servers.

Type:

list of PoolUnit objects

build()

Initialize (or re-initialize) internal pools to rotate servers based on box.info.ro state.

getnext(mode)

Get a server based on the request mode.

Parameters:

mode (Mode) – Request mode

Return type:

PoolUnit

Raise:

PoolTolopogyError

update()

Set flag to re-initialize internal pools on next getnext() call.
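
The interplay of build(), update() and getnext() can be sketched as follows. This is a simplified illustration, not the library's implementation: RoundRobinSketch is a hypothetical class, servers are plain dicts rather than PoolUnit objects, and getnext takes a boolean instead of a Mode value.

```python
from itertools import cycle


class RoundRobinSketch:
    """Rotate over servers grouped by read-only state; rebuild lazily."""

    def __init__(self, servers):
        # servers: list of dicts like {"name": str, "healthy": bool, "ro": bool}
        self.servers = servers
        self.rebuild_needed = True
        self.rw_iter = None
        self.ro_iter = None

    def build(self):
        """Regroup healthy servers into read-write and read-only rotations."""
        healthy = [s for s in self.servers if s["healthy"]]
        rw = [s for s in healthy if not s["ro"]]
        ro = [s for s in healthy if s["ro"]]
        self.rw_iter = cycle(rw) if rw else None
        self.ro_iter = cycle(ro) if ro else None
        self.rebuild_needed = False

    def update(self):
        # Only set the flag; the work happens on the next getnext() call.
        self.rebuild_needed = True

    def getnext(self, want_ro):
        if self.rebuild_needed:
            self.build()
        rotation = self.ro_iter if want_ro else self.rw_iter
        if rotation is None:
            raise LookupError("no healthy server with the requested role")
        return next(rotation)


servers = [
    {"name": "a", "healthy": True, "ro": False},
    {"name": "b", "healthy": True, "ro": True},
    {"name": "c", "healthy": True, "ro": True},
]
strategy = RoundRobinSketch(servers)
picked = [strategy.getnext(want_ro=True)["name"] for _ in range(3)]

servers[1]["healthy"] = False  # server "b" drops out
strategy.update()              # request a rebuild on the next getnext()
after = [strategy.getnext(want_ro=True)["name"] for _ in range(2)]
```

Deferring the rebuild to getnext() keeps update() cheap, which matters if state changes are reported from background threads more often than requests are sent.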

class tarantool.connection_pool.Status(value)

Status of a single server in the cluster.

HEALTHY = 1

Server is healthy: the connection is successful, box.info.ro can be extracted, and box.info.status is “running”.

UNHEALTHY = 2

Server is unhealthy: the connection has failed, box.info cannot be extracted, or box.info.status is not “running”.

class tarantool.connection_pool.StrategyInterface(pool)

Defines strategy to choose a pool server based on a request mode.

Type:

list of PoolUnit objects

abstract getnext(mode)

Get a pool server based on a request mode.

Parameters:

mode (Mode) – Request mode.

abstract update()

Refresh the strategy state.

tarantool.connection_pool.queue_factory()

Build a queue-based channel.