Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cloudflare AI: allow to use multiple models #338

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 61 additions & 25 deletions examples/pipelines/providers/anthropic_manifold_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""
title: Anthropic Manifold Pipeline
author: justinh-rahb, sriparashiva
date: 2024-06-20
version: 1.4
date: 2024-12-23
version: 1.5
license: MIT
description: A pipeline for generating text and processing images using the Anthropic API.
requirements: requests, sseclient-py
Expand Down Expand Up @@ -31,27 +31,49 @@ def __init__(self):
self.valves = self.Valves(
**{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", "your-api-key-here")}
)
self.url = 'https://api.anthropic.com/v1/messages'
self.url = "https://api.anthropic.com/v1/messages"
self.pipelines = []
self.update_headers()
self.get_anthropic_models()

def update_headers(self):
self.headers = {
'anthropic-version': '2023-06-01',
'content-type': 'application/json',
'x-api-key': self.valves.ANTHROPIC_API_KEY
"anthropic-version": "2023-06-01",
"content-type": "application/json",
"x-api-key": self.valves.ANTHROPIC_API_KEY,
}

def get_anthropic_models(self):
return [
{"id": "claude-3-haiku-20240307", "name": "claude-3-haiku"},
{"id": "claude-3-opus-20240229", "name": "claude-3-opus"},
{"id": "claude-3-sonnet-20240229", "name": "claude-3-sonnet"},
{"id": "claude-3-5-haiku-20241022", "name": "claude-3.5-haiku"},
{"id": "claude-3-5-sonnet-20241022", "name": "claude-3.5-sonnet"},
]
if self.valves.ANTHROPIC_API_KEY:
try:
list_models = requests.get(
"https://api.anthropic.com/v1/models",
headers=self.headers,
).json()

models = list_models["data"]
self.pipelines = [
{
"id": model["id"],
"name": model["display_name"],
}
for model in models
]
except Exception as e:
print(f"Error: {e}")
self.pipelines = [
{
"id": self.id,
"name": "Could not fetch models from Anthropic, please update the API Key in the valves.",
},
]
else:
self.pipelines = []

async def on_startup(self):
print(f"on_startup:{__name__}")
self.update_headers()
self.get_anthropic_models()
pass

async def on_shutdown(self):
Expand All @@ -60,9 +82,7 @@ async def on_shutdown(self):

async def on_valves_updated(self):
self.update_headers()

def pipelines(self) -> List[dict]:
return self.get_anthropic_models()
self.get_anthropic_models()

def process_image(self, image_data):
if image_data["url"].startswith("data:image"):
Expand All @@ -87,7 +107,7 @@ def pipe(
) -> Union[str, Generator, Iterator]:
try:
# Remove unnecessary keys
for key in ['user', 'chat_id', 'title']:
for key in ["user", "chat_id", "title"]:
body.pop(key, None)

system_message, messages = pop_system_message(messages)
Expand All @@ -101,28 +121,40 @@ def pipe(
if isinstance(message.get("content"), list):
for item in message["content"]:
if item["type"] == "text":
processed_content.append({"type": "text", "text": item["text"]})
processed_content.append(
{"type": "text", "text": item["text"]}
)
elif item["type"] == "image_url":
if image_count >= 5:
raise ValueError("Maximum of 5 images per API call exceeded")
raise ValueError(
"Maximum of 5 images per API call exceeded"
)

processed_image = self.process_image(item["image_url"])
processed_content.append(processed_image)

if processed_image["source"]["type"] == "base64":
image_size = len(processed_image["source"]["data"]) * 3 / 4
image_size = (
len(processed_image["source"]["data"]) * 3 / 4
)
else:
image_size = 0

total_image_size += image_size
if total_image_size > 100 * 1024 * 1024:
raise ValueError("Total size of images exceeds 100 MB limit")
raise ValueError(
"Total size of images exceeds 100 MB limit"
)

image_count += 1
else:
processed_content = [{"type": "text", "text": message.get("content", "")}]
processed_content = [
{"type": "text", "text": message.get("content", "")}
]

processed_messages.append({"role": message["role"], "content": processed_content})
processed_messages.append(
{"role": message["role"], "content": processed_content}
)

# Prepare the payload
payload = {
Expand All @@ -145,7 +177,9 @@ def pipe(
return f"Error: {e}"

def stream_response(self, payload: dict) -> Generator:
response = requests.post(self.url, headers=self.headers, json=payload, stream=True)
response = requests.post(
self.url, headers=self.headers, json=payload, stream=True
)

if response.status_code == 200:
client = sseclient.SSEClient(response)
Expand All @@ -170,6 +204,8 @@ def get_completion(self, payload: dict) -> str:
response = requests.post(self.url, headers=self.headers, json=payload)
if response.status_code == 200:
res = response.json()
return res["content"][0]["text"] if "content" in res and res["content"] else ""
return (
res["content"][0]["text"] if "content" in res and res["content"] else ""
)
else:
raise Exception(f"Error: {response.status_code} - {response.text}")
72 changes: 52 additions & 20 deletions examples/pipelines/providers/cloudflare_ai_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
from typing import List, Union, Generator, Iterator
from schemas import OpenAIChatMessage
from pydantic import BaseModel
import os
from typing import Generator, Iterator, List, Union

import requests
from pydantic import BaseModel


class Pipeline:
class Valves(BaseModel):
CLOUDFLARE_ACCOUNT_ID: str = ""
CLOUDFLARE_API_KEY: str = ""
CLOUDFLARE_MODEL: str = ""
pass

def __init__(self):
# Optionally, you can set the id and name of the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
# self.id = "openai_pipeline"
self.name = "Cloudlfare AI"
self.type = "manifold"
self.name = "Cloudflare/"
self.id = "cloudflare"
self.valves = self.Valves(
**{
"CLOUDFLARE_ACCOUNT_ID": os.getenv(
Expand All @@ -28,35 +24,71 @@ def __init__(self):
"CLOUDFLARE_API_KEY": os.getenv(
"CLOUDFLARE_API_KEY", "your-cloudflare-api-key"
),
"CLOUDFLARE_MODEL": os.getenv(
"CLOUDFLARE_MODELS",
"@cf/meta/llama-3.1-8b-instruct",
),
}
)
pass
self.pipelines = []
self.update_headers()
self.get_models()

def update_headers(self):
self.headers = {
"Authorization": f"Bearer {self.valves.CLOUDFLARE_API_KEY}",
"content-type": "application/json",
}

def get_models(self):
if self.valves.CLOUDFLARE_ACCOUNT_ID and self.valves.CLOUDFLARE_API_KEY:
try:
list_models = requests.get(
f"https://api.cloudflare.com/client/v4/accounts/{self.valves.CLOUDFLARE_ACCOUNT_ID}/ai/models/search?task=Text%20Generation",
headers=self.headers,
).json()

models = list_models["result"]
self.pipelines = [
{
"id": model["name"].replace("/", "___"),
"name": model["name"].replace("/", "___").split("___")[-1],
}
for model in models
]
except Exception as e:
print(f"Error: {e}")
self.pipelines = [
{
"id": self.id,
"name": "Could not fetch models from Cloudflare, please update the API Key in the valves.",
},
]
else:
self.pipelines = []

async def on_startup(self):
# This function is called when the server is started.
print(f"on_startup:{__name__}")
self.update_headers()
self.get_models()
pass

async def on_shutdown(self):
# This function is called when the server is stopped.
print(f"on_shutdown:{__name__}")
pass

async def on_valves_updated(self):
self.update_headers()
self.get_models()

def pipe(
self, user_message: str, model_id: str, messages: List[dict], body: dict
) -> Union[str, Generator, Iterator]:
# This is where you can add your custom pipelines like RAG.
print(f"pipe:{__name__}")

headers = {}
headers["Authorization"] = f"Bearer {self.valves.CLOUDFLARE_API_KEY}"
headers["Content-Type"] = "application/json"
# fix model name again, url messed up otherwise
model = model_id.replace("___", "/")

payload = {**body, "model": self.valves.CLOUDFLARE_MODEL}
payload = {**body, "model": model}

if "user" in payload:
del payload["user"]
Expand All @@ -69,7 +101,7 @@ def pipe(
r = requests.post(
url=f"https://api.cloudflare.com/client/v4/accounts/{self.valves.CLOUDFLARE_ACCOUNT_ID}/ai/v1/chat/completions",
json=payload,
headers=headers,
headers=self.headers,
stream=True,
)

Expand Down