Tinybird Python SDK

SDK around Tinybird APIs.

If you want to manage Workspaces, Data Sources, and Pipes, you might be looking for the tinybird-cli.

The SDK is meant to programmatically ingest NDJSON data or send any request to an API instance.

It contains handlers for:

  • logging events to a Tinybird Data Source from your Python module.
  • logging events from litellm to a Tinybird Data Source.

Ingest to a Tinybird DataSource

from tb.datasource import Datasource

with Datasource(datasource_name, tinybird_token) as ds:
    ds << {'key': 'value', 'key1': 'value1'}

You can also use the async version:

from tb.a.datasource import AsyncDatasource

async with AsyncDatasource(datasource_name, tinybird_token, api_url='https://api.us-east.tinybird.co') as ds:
    await ds << {'key': 'value', 'key1': 'value1'}

Notes:

  • The Datasource object does some in-memory buffering and uses the Events API, so you can write many events in a loop (see the sketch after these notes).
  • It only supports NDJSON data.
  • It automatically handles rate limits.
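
Since events are buffered in memory and sent via the Events API, you can push rows one at a time inside a loop. A minimal sketch, assuming datasource_name and tinybird_token are defined and the rows are illustrative placeholders:

from tb.datasource import Datasource

# each dict becomes one NDJSON event
rows = [{'key': 'value', 'index': i} for i in range(100)]

with Datasource(datasource_name, tinybird_token) as ds:
    for row in rows:
        ds << row  # buffered in memory and sent via the Events API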

Ingest using an API instance

from tb.a.api import AsyncAPI

async with AsyncAPI(tinybird_token, api_url) as api:
    await api.post('datasources',
        params={
            'name': 'datasource_name',
            'mode': 'append',
            'format': 'ndjson',
            'url': 'https://storage.googleapis.com/davidm-wadus/events.ndjson',
        }
    )

  • It automatically handles rate limits.
  • It works with any Tinybird API.
  • The post, get, and send methods have signatures equivalent to those of the requests library (see the example below).
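
For example, since get mirrors requests.get, you can list the Data Sources in a Workspace. A minimal sketch, assuming tinybird_token and api_url are defined and that the 'datasources' path is resolved the same way as in the post example above:

from tb.a.api import AsyncAPI

async with AsyncAPI(tinybird_token, api_url) as api:
    # list the Data Sources in the Workspace
    response = await api.get('datasources')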

Logging from your Python module to a Tinybird Data Source

import os
import logging
from tb.logger import TinybirdLoggingHandler
from dotenv import load_dotenv

load_dotenv()
TB_API_URL = os.getenv("TINYBIRD_API_URL")
TB_WRITE_TOKEN = os.getenv("TINYBIRD_WRITE_TOKEN")

logger = logging.getLogger('your-logger-name')
handler = TinybirdLoggingHandler(TB_API_URL, TB_WRITE_TOKEN, 'your-app-name')
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

Each time you call the logger, an event is sent to the tb_logs Data Source in your Workspace.
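
For instance, once the handler is attached, the standard logging calls emit events. This continues the snippet above; remember to set a level low enough for your messages:

logger.setLevel(logging.INFO)
logger.info('something happened')  # sent to the tb_logs Data Source by the handler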

To configure the Data Source name, initialize the TinybirdLoggingHandler like this:

handler = TinybirdLoggingHandler(TB_API_URL, TB_WRITE_TOKEN, 'your-app-name', ds_name="your_tb_ds_name")

Non-blocking logging

If you want to avoid blocking the main thread, you can use a queue to send the logs to a different thread.

import os
import logging
from multiprocessing import Queue
from tb.logger import TinybirdLoggingQueueHandler
from dotenv import load_dotenv

load_dotenv()
TB_API_URL = os.getenv("TINYBIRD_API_URL")
TB_WRITE_TOKEN = os.getenv("TINYBIRD_WRITE_TOKEN")

logger = logging.getLogger('your-logger-name')
handler = TinybirdLoggingQueueHandler(Queue(-1), TB_API_URL, TB_WRITE_TOKEN, 'your-app-name', ds_name="your_tb_ds_name")
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

Logging from Litellm to a Tinybird Data Source

Install the ai extra:

pip install tinybird-python-sdk[ai]

Then use the following handler:

import os
import litellm
from litellm import acompletion
from tb.litellm.handler import TinybirdLitellmAsyncHandler

customHandler = TinybirdLitellmAsyncHandler(
    api_url="https://api.us-east.aws.tinybird.co", 
    tinybird_token=os.getenv("TINYBIRD_TOKEN"), 
    datasource_name="litellm"
)

litellm.callbacks = [customHandler]

response = await acompletion(
    model="gpt-3.5-turbo", 
    messages=[{"role": "user", "content": "Hi 👋 - i'm openai"}],
    stream=True,
    metadata={
        "organization": "tinybird",
        "environment": "dev",
        "project": "litellm_test",
        "chat_id": "1234567890",
    },
)

Track custom metadata using the metadata dictionary.

This is the schema for the litellm data source:

SCHEMA >
    `model` LowCardinality(String) `json:$.model` DEFAULT 'unknown',
    `messages` Array(Map(String, String)) `json:$.messages[:]` DEFAULT [],
    `user` String `json:$.user` DEFAULT 'unknown',
    `start_time` DateTime `json:$.start_time` DEFAULT now(),
    `end_time` DateTime `json:$.end_time` DEFAULT now(),
    `id` String `json:$.id` DEFAULT '',
    `stream` Boolean `json:$.stream` DEFAULT false,
    `call_type` LowCardinality(String) `json:$.call_type` DEFAULT 'unknown',
    `provider` LowCardinality(String) `json:$.provider` DEFAULT 'unknown',
    `api_key` String `json:$.api_key` DEFAULT '',
    `log_event_type` LowCardinality(String) `json:$.log_event_type` DEFAULT 'unknown',
    `llm_api_duration_ms` Float32 `json:$.llm_api_duration_ms` DEFAULT 0,
    `cache_hit` Boolean `json:$.cache_hit` DEFAULT false,
    `response_status` LowCardinality(String) `json:$.standard_logging_object_status` DEFAULT 'unknown',
    `response_time` Float32 `json:$.standard_logging_object_response_time` DEFAULT 0,
    `proxy_metadata` String `json:$.proxy_metadata` DEFAULT '',
    `organization` String `json:$.proxy_metadata.organization` DEFAULT '',
    `environment` String `json:$.proxy_metadata.environment` DEFAULT '',
    `project` String `json:$.proxy_metadata.project` DEFAULT '',
    `chat_id` String `json:$.proxy_metadata.chat_id` DEFAULT '',
    `response` String `json:$.response` DEFAULT '',
    `response_id` String `json:$.response.id`,
    `response_object` String `json:$.response.object` DEFAULT 'unknown',
    `response_choices` Array(String) `json:$.response.choices[:]` DEFAULT [],
    `completion_tokens` UInt16 `json:$.response.usage.completion_tokens` DEFAULT 0,
    `prompt_tokens` UInt16 `json:$.response.usage.prompt_tokens` DEFAULT 0,
    `total_tokens` UInt16 `json:$.response.usage.total_tokens` DEFAULT 0,
    `cost` Float32 `json:$.cost` DEFAULT 0,
    `exception` String `json:$.exception` DEFAULT '',
    `traceback` String `json:$.traceback` DEFAULT '',
    `duration` Float32 `json:$.duration` DEFAULT 0


ENGINE MergeTree
ENGINE_SORTING_KEY start_time, organization, project, model
ENGINE_PARTITION_KEY toYYYYMM(start_time)
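
Once events are flowing, you can read them back with the SDK's API helper. A hedged sketch: it assumes the Data Source is named litellm and that the Query API is reachable as the 'sql' path, mirroring the 'datasources' path style used above:

import os
from tb.a.api import AsyncAPI

async with AsyncAPI(os.getenv("TINYBIRD_TOKEN"), "https://api.us-east.aws.tinybird.co") as api:
    # total tokens per model, straight from the litellm Data Source
    response = await api.get('sql', params={
        'q': 'SELECT model, sum(total_tokens) AS total_tokens FROM litellm GROUP BY model FORMAT JSON'
    })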