kinto-http is the Python library to interact with a Kinto server.
There is also a similar client in JavaScript.
Use pip:
$ pip install kinto-http
Here is an overview of what the API provides:
import kinto_http

client = kinto_http.Client(server_url="http://localhost:8888/v1",
                           auth=('alexis', 'p4ssw0rd'))

records = client.get_records(bucket='default', collection='todos')
for i, record in enumerate(records):
    record['title'] = 'Todo {}'.format(i)
    client.update_record(data=record)
The passed auth parameter is a requests authentication policy.
By default, a simple tuple will become a Basic Auth authorization request header, which can authenticate users with Kinto Accounts.
import kinto_http

auth = ('alexis', 'p4ssw0rd')
client = kinto_http.Client(server_url='http://localhost:8888/v1',
                           auth=auth)
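To make the tuple-to-header conversion concrete, here is a minimal sketch of how a username/password pair is turned into a Basic Auth header value. It uses only the standard library; `basic_auth_header` is a hypothetical helper written for illustration and is not part of kinto-http or requests.

```python
import base64


def basic_auth_header(username: str, password: str) -> str:
    """Build the value of a Basic Authorization header (RFC 7617)."""
    credentials = "{}:{}".format(username, password).encode("utf-8")
    return "Basic " + base64.b64encode(credentials).decode("ascii")


# Hypothetical helper, for illustration only.
header = basic_auth_header("alexis", "p4ssw0rd")
print(header)
```

The credentials are only encoded, not encrypted, so this scheme should be used over HTTPS.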
It is also possible to pass a bucket ID and/or collection ID to set them as default values for the parameters of the client operations.
client = Client(bucket="payments", collection="receipts", auth=auth)
After creating a client, you can also replicate an existing one and overwrite some key arguments.
client2 = client.clone(collection="orders")
An asynchronous client is also available. It has all the same endpoints as the sync client except for the batch operations.
from kinto_http import AsyncClient
auth = ('alexis', 'p4ssw0rd')
client = AsyncClient(server_url='http://localhost:8888/v1', auth=auth)
info = await client.server_info()
assert 'schema' in info['capabilities'], "Server doesn't support schema validation."
The dry_mode parameter can be set to simulate requests without actually sending them over the network.
When enabled, dry mode ensures that no external calls are made, making it useful for testing or debugging.
Instead of performing real HTTP operations, the client logs the requests at the DEBUG level.
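The idea behind dry mode can be sketched with the standard library alone. The `DryRunSender` class below is hypothetical and is not how kinto-http is implemented; it only illustrates the pattern of logging a request at DEBUG level instead of sending it. Returning an empty dict as a placeholder result is an assumption made for this sketch.

```python
import logging

logger = logging.getLogger("dry_mode_sketch")


class DryRunSender:
    """Hypothetical stand-in for an HTTP sender; not part of kinto-http."""

    def __init__(self, dry_mode: bool = False):
        self.dry_mode = dry_mode

    def request(self, method: str, url: str, payload=None):
        if self.dry_mode:
            # Log the request instead of opening a network connection.
            logger.debug("(dry mode) %s %s %r", method, url, payload)
            return {}  # Placeholder result (assumption of this sketch).
        raise NotImplementedError("A real sender would perform the HTTP call here.")


logging.basicConfig(level=logging.DEBUG)
sender = DryRunSender(dry_mode=True)
result = sender.request("PUT", "/buckets/blog", {"data": {"id": "blog"}})
```

Because the requests are logged at DEBUG level, they are only visible when the logger is configured to show debug messages.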
import kinto_http
client = kinto_http.Client(auth=kinto_http.BearerTokenAuth("XYPJTNsFKV2"))
The authorization header is prefixed with Bearer by default. If the header_type is customized on the server, the client must specify the expected type: kinto_http.BearerTokenAuth("XYPJTNsFKV2", type="Bearer+OIDC")
Note: Passing a string containing Bearer will automatically instantiate a kinto_http.BearerTokenAuth() object.
In other words, kinto_http.Client(auth="Bearer+OIDC XYPJTNsFKV2") is equivalent to kinto_http.Client(auth=kinto_http.BearerTokenAuth("XYPJTNsFKV2", type="Bearer+OIDC"))
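The equivalence above amounts to splitting the string into a header type and a token. A minimal sketch of that split follows; `split_bearer` is a hypothetical helper, and the library's actual parsing rules may differ.

```python
from typing import Tuple


def split_bearer(auth: str) -> Tuple[str, str]:
    """Split a string such as 'Bearer+OIDC XYPJTNsFKV2' into (type, token)."""
    header_type, _, token = auth.partition(" ")
    token = token.strip()
    if "bearer" not in header_type.lower() or not token:
        raise ValueError("Not a bearer authorization string: {!r}".format(auth))
    return header_type, token


# Hypothetical helper, for illustration only.
print(split_bearer("Bearer+OIDC XYPJTNsFKV2"))
print(split_bearer("Bearer XYPJTNsFKV2"))
```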
import kinto_http
client = kinto_http.Client(server_url='http://localhost:8888/v1', auth=kinto_http.BrowserOAuth())
The client will open a browser page and will catch the Bearer token obtained after the OAuth dance.
Custom headers can be specified in the Client constructor, and will be sent in every request:
import kinto_http

client = kinto_http.Client(server_url="http://server/v1", headers={
    "Allow-Access": "CDN",
    "User-Agent": "blocklist-updater"
})
You can use the server_info() method to fetch the server information:
from kinto_http import Client
client = Client(server_url='http://localhost:8888/v1')
info = client.server_info()
assert 'schema' in info['capabilities'], "Server doesn't support schema validation."
get_bucket(id=None, **kwargs): retrieve single bucket
get_buckets(**kwargs): retrieve all readable buckets
create_bucket(id=None, data=None, **kwargs): create a bucket
update_bucket(id=None, data=None, **kwargs): create or replace an existing bucket
patch_bucket(id=None, changes=None, **kwargs): modify some fields in an existing bucket
delete_bucket(id=None, **kwargs): delete a bucket and everything under it
delete_buckets(**kwargs): delete all writable buckets
get_group(id=None, bucket=None, **kwargs): retrieve single group
get_groups(bucket=None, **kwargs): retrieve all readable groups
create_group(id=None, data=None, bucket=None, **kwargs): create a group
update_group(id=None, data=None, bucket=None, **kwargs): create or replace an existing group
patch_group(id=None, changes=None, bucket=None, **kwargs): modify some fields in an existing group
delete_group(id=None, bucket=None, **kwargs): delete a group and everything under it
delete_groups(bucket=None, **kwargs): delete all writable groups
get_collection(id=None, bucket=None, **kwargs): retrieve single collection
get_collections(bucket=None, **kwargs): retrieve all readable collections
create_collection(id=None, data=None, bucket=None, **kwargs): create a collection
update_collection(id=None, data=None, bucket=None, **kwargs): create or replace an existing collection
patch_collection(id=None, changes=None, bucket=None, **kwargs): modify some fields in an existing collection
delete_collection(id=None, bucket=None, **kwargs): delete a collection and everything under it
delete_collections(bucket=None, **kwargs): delete all writable collections
get_record(id=None, bucket=None, collection=None, **kwargs): retrieve single record
get_records(bucket=None, collection=None, **kwargs): retrieve all readable records
get_paginated_records(bucket=None, collection=None, **kwargs): paginated list of records
get_records_timestamp(bucket=None, collection=None, **kwargs): return the records timestamp of this collection
create_record(id=None, data=None, bucket=None, collection=None, **kwargs): create a record
update_record(id=None, data=None, bucket=None, collection=None, **kwargs): create or replace an existing record
patch_record(id=None, changes=None, bucket=None, collection=None, **kwargs): modify some fields in an existing record
delete_record(id=None, bucket=None, collection=None, **kwargs): delete a record and everything under it
delete_records(bucket=None, collection=None, **kwargs): delete all writable records
Object permissions can be specified or modified by passing a permissions argument to the create_*(), patch_*(), or update_*() methods:
client.create_record(data={'foo': 'bar'},
                     permissions={'read': ['group:groupid']})
record = client.get_record(id='123', collection='todos', bucket='alexis')
record['permissions']['write'].append('leplatrem')
client.update_record(data=record)
To obtain a list of all the permissions on every object, use the get_permissions() method:
all_perms = client.get_permissions(exclude_resource_names=("record",))

has_collection_perms = any(
    p for p in all_perms
    if p["collection_id"] == "my-collection"
    and "write" in p["permissions"]
)
In some cases, you might want to create a bucket, collection, group or record only if it doesn't exist already. To do so, you can pass if_not_exists=True to the create_*() methods:
client.create_bucket(id='blog', if_not_exists=True)
client.create_collection(id='articles', bucket='blog', if_not_exists=True)
In some cases, you might want to delete a bucket, collection, group or record only if it exists already. To do so, you can pass if_exists=True to the delete_*() methods:
client.delete_bucket(id='bucket', if_exists=True)
The patch_*() operations receive a changes parameter.
from kinto_http.patch_type import BasicPatch, MergePatch, JSONPatch

client.patch_record(id='abc', changes=BasicPatch({'over': 'write'}))
client.patch_record(id='todo', changes=MergePatch({'assignee': 'bob'}))
client.patch_record(id='receipts', changes=JSONPatch([
    {'op': 'add', 'path': '/data/members/0', 'value': 'ldap:user@corp.com'}
]))
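For readers unfamiliar with merge semantics, the following sketch applies a JSON Merge Patch as defined in RFC 7396 to a plain dictionary. It assumes that MergePatch corresponds to that standard, as its name suggests; the patch itself is applied by the server, and `merge_patch` is a hypothetical local helper used only to show the expected effect.

```python
def merge_patch(target, patch):
    """Apply a JSON Merge Patch (RFC 7396) and return the merged result."""
    if not isinstance(patch, dict):
        # A non-object patch replaces the target entirely.
        return patch
    result = dict(target) if isinstance(target, dict) else {}
    for key, value in patch.items():
        if value is None:
            # A null value removes the field.
            result.pop(key, None)
        else:
            result[key] = merge_patch(result.get(key), value)
    return result


# Hypothetical record content, for illustration only.
record = {"title": "Pay rent", "assignee": "alice", "meta": {"a": 1, "b": 2}}
patched = merge_patch(record, {"assignee": "bob", "meta": {"b": None}})
print(patched)
```

Nested objects are merged field by field, and a null value deletes the corresponding field rather than storing null.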
The create_*(), patch_*(), and update_*() methods take a safe argument (default: True).
If True, the client will ensure that the object doesn't already exist for creations, or wasn't modified on the server side since it was fetched. The timestamp is implicitly read from the last_modified field in the passed data object, or taken explicitly from the if_match parameter.
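In terms of the Kinto HTTP protocol, these checks rely on conditional request headers: If-None-Match for creations and If-Match with the quoted timestamp for modifications. The sketch below shows how such headers could be derived; `concurrency_headers` and its `creating` flag are hypothetical and do not mirror the library's internal code.

```python
from typing import Optional


def concurrency_headers(safe: bool = True,
                        data: Optional[dict] = None,
                        if_match: Optional[int] = None,
                        creating: bool = False) -> dict:
    """Return the conditional headers implied by the safe argument."""
    if not safe:
        return {}
    if creating:
        # Fail if an object with this ID already exists.
        return {"If-None-Match": "*"}
    # An explicit if_match takes precedence over data["last_modified"].
    timestamp = if_match
    if timestamp is None:
        timestamp = (data or {}).get("last_modified")
    if timestamp is None:
        return {}
    # Fail if the object changed on the server since this timestamp.
    return {"If-Match": '"{}"'.format(timestamp)}


# Hypothetical helper, for illustration only.
record = {"id": "abc", "last_modified": 1533762576015}
print(concurrency_headers(creating=True))
print(concurrency_headers(data=record))
```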
Rather than issuing a request for each and every operation, it is possible to batch several operations in one request (sync client only), using the batch() method as a Python context manager (with):
with client.batch() as batch:
    for idx in range(0, 100):
        batch.update_record(data={'id': idx})
Note: Besides the results() method, a batch object shares all the same methods as another client.
Reading data from batch operations is achieved by using the results() method, available after a batch context is closed.
with client.batch() as batch:
    batch.get_record(id='r1')
    batch.get_record(id='r2')
    batch.get_record(id='r3')

r1, r2, r3 = batch.results()
Failing operations will raise a KintoException, which has request and response attributes.
try:
    client.create_group(id="friends")
except kinto_http.KintoException as e:
    # Compare with None: a requests response is falsy for 4xx/5xx statuses.
    if e.response is not None and e.response.status_code == 403:
        print("Not allowed!")
A timeout value in seconds can be specified in the client constructor:
client = Client(server_url="...", timeout=5)
To distinguish the connect timeout from the read timeout, use a tuple:
client = Client(server_url="...", timeout=(3.05, 27))
For an infinite timeout, use None:
client = Client(server_url="...", timeout=None)
See the timeout documentation of the underlying requests library.
When the server is throttled (under heavy load or maintenance), it can return error responses.
The client can then retry sending the same request until it succeeds. To enable this, specify the number of retries on the client:
client = Client(server_url='http://localhost:8888/v1',
                auth=credentials,
                retry=10)
The Kinto protocol lets the server define the duration in seconds between retries. It is possible (but not recommended) to force this value in the clients:
client = Client(server_url='http://localhost:8888/v1',
                auth=credentials,
                retry=10,
                retry_after=5)
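The interplay between retry and retry_after can be sketched as a small loop. This is an illustration under stated assumptions, not the library's implementation: `send_with_retry` and the `send` callable are hypothetical, only 5xx statuses are treated as retryable here, and the server-provided delay is assumed to arrive in a Retry-After header.

```python
import time


def send_with_retry(send, retry=0, retry_after=None, sleep=time.sleep):
    """Call send() until it succeeds or the retry budget is exhausted.

    send() is a hypothetical callable returning (status_code, headers).
    """
    while True:
        status, headers = send()
        if status < 500 or retry <= 0:
            return status, headers
        retry -= 1
        if retry_after is not None:
            # Delay forced on the client side.
            delay = retry_after
        else:
            # Delay provided by the server (assumed header name).
            delay = float(headers.get("Retry-After", 0))
        sleep(delay)


# Simulated server: two throttled responses, then a success.
responses = iter([(503, {"Retry-After": "3"}), (503, {"Retry-After": "3"}), (200, {})])
waits = []  # Record the delays instead of actually sleeping.
result = send_with_retry(lambda: next(responses), retry=10, sleep=waits.append)
print(result, waits)
```

Passing the sleep function as a parameter keeps the sketch testable without real delays.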
When the server responses are paginated, the client will download every page and merge them transparently.
The get_paginated_records() method returns a generator that will yield each page:
for page in client.get_paginated_records():
    records = page["data"]
It is possible to specify a limit for the number of items to be retrieved in one page:
records = client.get_records(_limit=10)
To retrieve all available pages with a limited number of items in each of them, you can specify the number of pages:
records = client.get_records(_limit=10, pages=float('inf')) # Infinity
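The relationship between a per-page limit and the number of pages can be sketched with a generator that follows "next page" links. Everything below is hypothetical (the `iter_pages` helper, the `fetch` callable, and the fake URLs); it only illustrates the page-following pattern, not the library's code.

```python
def iter_pages(fetch, first_url, pages=1):
    """Yield page bodies, following next-page links up to `pages` pages.

    fetch(url) is a hypothetical callable returning (body, next_url).
    """
    url, fetched = first_url, 0
    while url is not None and fetched < pages:
        body, next_url = fetch(url)
        fetched += 1
        yield body
        url = next_url


# Simulated server with three records split over two pages.
FAKE_PAGES = {
    "/records?_limit=2": ({"data": [{"id": "a"}, {"id": "b"}]}, "/records?_limit=2&page=2"),
    "/records?_limit=2&page=2": ({"data": [{"id": "c"}]}, None),
}

first_only = [r["id"] for page in iter_pages(FAKE_PAGES.__getitem__, "/records?_limit=2")
              for r in page["data"]]
everything = [r["id"] for page in iter_pages(FAKE_PAGES.__getitem__, "/records?_limit=2",
                                             pages=float("inf"))
              for r in page["data"]]
print(first_only, everything)
```

Using float('inf') as the page count means the loop stops only when the server reports no further page.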
If the built-in history plugin is enabled, it is possible to retrieve the history of changes:
# Get the complete history of a bucket
changes = client.get_history(bucket='default')
# and optionally use filters
hist = client.get_history(bucket='default', _limit=2, _sort='-last_modified', _since='1533762576015')
hist = client.get_history(bucket='default', resource_name='collection')
The history of a bucket can also be purged with:
client.purge_history(bucket='default')
If the kinto-attachment plugin is enabled, it is possible to fetch, add, or remove attachments on records:
filepath = client.download_attachment(record_obj)
client.add_attachment(id="record-id", filepath="/path/to/image.png")
Or remove them:
client.remove_attachment(id="record-id")
The get_endpoint() method returns an endpoint URL on the server:
client = Client(server_url='http://localhost:8888/v1',
                auth=('token', 'your-token'),
                bucket="payments",
                collection="receipts")

print(client.get_endpoint("record",
                          id="c6894b2c-1856-11e6-9415-3c970ede22b0"))

# '/buckets/payments/collections/receipts/records/c6894b2c-1856-11e6-9415-3c970ede22b0'
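The way client defaults combine with explicit arguments to form such a path can be sketched with string templates. The `ENDPOINTS` table and `build_endpoint` helper are hypothetical; only the resulting record path is taken from the example above.

```python
# Hypothetical path templates, for illustration only.
ENDPOINTS = {
    "bucket": "/buckets/{bucket}",
    "collection": "/buckets/{bucket}/collections/{collection}",
    "record": "/buckets/{bucket}/collections/{collection}/records/{id}",
}


def build_endpoint(name, defaults, **kwargs):
    """Fill the template for `name`, letting kwargs override the defaults."""
    values = dict(defaults)
    values.update(kwargs)
    return ENDPOINTS[name].format(**values)


client_defaults = {"bucket": "payments", "collection": "receipts"}
path = build_endpoint("record", client_defaults,
                      id="c6894b2c-1856-11e6-9415-3c970ede22b0")
print(path)
```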
In addition to the data types supported by JSON, kinto-http.py also supports native Python date and datetime objects.
If a payload contains a date or a datetime object, kinto-http.py will encode it as an ISO-formatted string.
Please note that this transformation is one-way only. When reading a record, if a field contains an ISO-formatted string, kinto-http.py will not convert it to a native Python date or datetime object.
If you know that a field will be a datetime, you might consider encoding it yourself to be more explicit about it being a string for Kinto.
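The one-way nature of this encoding can be reproduced with the standard json module. The `IsoDateEncoder` class below is a hypothetical sketch of the general technique, not the encoder used by kinto-http.py, and the payload is made up for illustration.

```python
import datetime
import json


class IsoDateEncoder(json.JSONEncoder):
    """Encode date and datetime objects as ISO-formatted strings."""

    def default(self, obj):
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        return super().default(obj)


# Hypothetical payload, for illustration only.
payload = {
    "title": "Pay rent",
    "due": datetime.date(2024, 5, 1),
    "created": datetime.datetime(2024, 4, 30, 9, 15),
}
encoded = json.dumps(payload, cls=IsoDateEncoder)

# Decoding yields plain strings; converting back must be done explicitly.
decoded = json.loads(encoded)
due = datetime.date.fromisoformat(decoded["due"])
print(decoded, due)
```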
To provide common arguments and options for scripts, some utilities are provided to ease configuration and initialization of the client from command-line arguments.
import argparse
import logging

from kinto_http import cli_utils

logger = logging.getLogger(__name__)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Download records")
    cli_utils.set_parser_server_options(parser)

    args = parser.parse_args()

    cli_utils.setup_logger(logger, args)

    logger.debug("Instantiate Kinto client.")
    client = cli_utils.create_client_from_args(args)

    logger.info("Fetch records.")
    records = client.get_records()
    logger.warning("{} records.".format(len(records)))
The script now accepts basic options:
$ python example.py --help
usage: example.py [-h] [-s SERVER] [-a AUTH] [-b BUCKET] [-c COLLECTION] [-v]
                  [-q] [-D]

Download records

optional arguments:
  -h, --help            show this help message and exit
  -s SERVER, --server SERVER
                        The location of the remote server (with prefix)
  -a AUTH, --auth AUTH  BasicAuth credentials: `token:my-secret` or
                        Authorization header: `Bearer token`
  -b BUCKET, --bucket BUCKET
                        Bucket name.
  -c COLLECTION, --collection COLLECTION
                        Collection name.
  --retry RETRY         Number of retries when a request fails
  --retry-after RETRY_AFTER
                        Delay in seconds between retries when requests fail
                        (default: provided by server)
  -v, --verbose         Show all messages.
  -q, --quiet           Show only critical errors.
  -D, --debug           Show all messages, including debug messages.