Log tasks to phospho

Tasks are the basic bricks that make up your LLM apps. If you’re a programmer, you can think of a task as a function call.

A task is made of at least two things:

  • input (str): What goes into a task, e.g. what the user asks the assistant.
  • output (Optional[str]): What comes out of the task, e.g. what the assistant replies to the user.

Examples of tasks you can log to phospho:

  • Call to an LLM (input = query, output = llm response)
  • Answering a question (input = question, output = answer)
  • Searching in documents (input = search query, output = document)
  • Summarizing a text (input = text, output = summary)
  • Performing inference of a model (input = X, output = y)

Install phospho module

The phospho Python module is the easiest way to log to phospho. It is compatible with Python 3.9+.

pip install --upgrade phospho

The phospho module is an open source work in progress. Your help is deeply appreciated!

Initialize phospho

In your app, initialize the phospho module. By default, phospho will look for PHOSPHO_API_KEY and PHOSPHO_PROJECT_ID environment variables.

Learn how to get your API key and project ID in the phospho documentation.
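For example, a minimal sketch of setting these environment variables from Python before calling phospho.init (the values are placeholders):

import os

# Placeholder values: use your own API key and project ID
os.environ["PHOSPHO_API_KEY"] = "phospho-key"
os.environ["PHOSPHO_PROJECT_ID"] = "phospho-project-id"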

import phospho

phospho.init()

You can also pass the api_key and project_id parameters to phospho.init.

phospho.init(api_key="phospho-key", project_id="phospho-project-id")

Log with phospho.log

The simplest way to log a task is to use phospho.log.

Logging text inputs and outputs

input_text = "Hello! This is what the user asked to the system"
output_text = "This is the response showed to the user by the app."

# This is how you log a task to phospho
phospho.log(input=input_text, output=output_text)

Note that the output is optional. If you don’t pass an output, phospho will log None.
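For example, you can log a task that has no output yet:

# No output passed: phospho logs None as this task's output
phospho.log(input="Hello! This is what the user asked to the system")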

Logging OpenAI queries and responses

phospho aims to be batteries included. So if you pass something other than a str to phospho.log, phospho extracts what’s usually considered “the input” or “the output”.

For example, if you use the OpenAI API:

import openai
import phospho

phospho.init()
openai_client = openai.OpenAI(api_key="openai-key")

input_prompt = "Explain quantum computers in less than 20 words."

# This is your LLM app code
query = {
    "messages": [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": input_prompt},
    ],
    "model": "gpt-3.5-turbo",
}
response = openai_client.chat.completions.create(**query)

# You can directly pass a dict or a ChatCompletion as input and output
log = phospho.log(input=query, output=response)
print("input:", log["input"])
print("output:", log["output"])

Result:

input: Explain quantum computers in less than 20 words.
output: Qubits harness quantum physics for faster, more powerful computation.

Custom extractors

Pass custom extractors to phospho.log to extract the input and output from any object. The original object will be converted to a dict (if jsonable) or a string and stored in raw_input and raw_output.

phospho.log(
    input={"custom_input": "this is a complex object"},
    output={"custom_output": "which is not a string nor a standard object"},
    # Custom extractors return a string
    input_to_str_function=lambda x: x["custom_input"],
    output_to_str_function=lambda x: x["custom_output"],
)
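This also works with arbitrary Python objects. A minimal sketch with a hypothetical AgentStep dataclass standing in for an object from your own app:

from dataclasses import dataclass

@dataclass
class AgentStep:  # Hypothetical object from your own app
    query: str
    answer: str

step = AgentStep(query="What is phospho?", answer="An open source project.")

# The extractors pull the input and output strings out of the object
phospho.log(
    input=step,
    output=step,
    input_to_str_function=lambda s: s.query,
    output_to_str_function=lambda s: s.answer,
)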

Logging additional metadata

You can log additional data with each interaction (user id, version id,…) by passing arguments to phospho.log.

log = phospho.log(
    input="log this",
    output="and that",
    # There is a metadata field
    metadata={"always": "moooore"},
    # Every extra keyword argument is logged as metadata
    log_anything_and_everything="even this is ok",
)

Wrapping functions’ arguments and return values

Wrap any function with phospho.wrap to automatically log a task every time it is called.

  • The passed arguments are logged as input
  • The returned value is logged as output

Use the phospho.wrap decorator

from typing import Dict, List, Optional

@phospho.wrap
def answer(messages: List[Dict[str, str]]) -> Optional[str]:
    response = openai_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
    )
    # The returned value is logged as the task output
    return response.choices[0].message.content
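Calling the wrapped function then logs a task: the messages argument is logged as the input, and the returned string as the output. For example:

# This call is logged as a task to phospho
answer([{"role": "user", "content": "Explain quantum computers in less than 20 words."}])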

Like phospho.log, every extra keyword argument is logged as metadata.

How to log metadata with phospho.wrap?

@phospho.wrap(metadata={"more": "details"})
def answer(messages: List[Dict[str, str]]) -> Optional[str]:
    response = openai_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
    )
    return response.choices[0].message.content

Wrap an imported function with phospho.wrap

If you can’t change the function definition, you can wrap it this way.

# You can wrap any function call in phospho.wrap
response = phospho.wrap(
    openai_client.chat.completions.create,
    # Pass additional metadata
    metadata={"more": "details"},
)(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain quantum computers in less than 20 words."},
    ],
    model="gpt-3.5-turbo",
)

If you want to wrap all calls to a function, you can override the function definition.

openai_client.chat.completions.create = phospho.wrap(
    openai_client.chat.completions.create
)
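After this override, every call to openai_client.chat.completions.create in your app is logged as a task, with no other change to your code:

# This call is now automatically logged to phospho
response = openai_client.chat.completions.create(
    messages=[{"role": "user", "content": "Explain quantum computers in less than 20 words."}],
    model="gpt-3.5-turbo",
)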

Streaming

phospho supports streamed outputs. This is useful when you want to log the output of a streaming API.

Example with phospho.log

Pass stream=True to phospho.log to handle streaming responses. When iterating over the response, phospho will automatically log each chunk until the iteration is completed.

For example, you can pass streaming OpenAI responses to phospho.log the following way:

query = {
    "messages": [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain quantum computers in less than 20 words."},
    ],
    "model": "gpt-3.5-turbo",
    # Enable streaming on OpenAI
    "stream": True,
}
# The OpenAI completion function returns a Stream of chunks
response: Stream[ChatCompletionChunk] = openai_client.chat.completions.create(**query)

# Pass stream=True to phospho.log to handle this
phospho.log(input=query, output=response, stream=True)
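The task is saved once you finish iterating over the response:

# Iterating over the response triggers the logging: phospho collects
# each chunk and saves the task when the iteration is completed
for chunk in response:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="")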

Example with phospho.wrap

  • Pass stream=True. This tells phospho to concatenate the string outputs.
  • Pass a stop function, such that stop(output) is True when the streaming is finished. This triggers the logging of the task.

from typing import Any, Dict, Generator, List, Optional

@phospho.wrap(stream=True, stop=lambda token: token is None)
def answer(messages: List[Dict[str, str]]) -> Generator[Optional[str], Any, None]:
    streaming_response: Stream[ChatCompletionChunk] = openai_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
        stream=True,
    )
    for response in streaming_response:
        # The last chunk's content is None, which triggers the stop condition
        yield response.choices[0].delta.content
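When the wrapped generator yields None (the last OpenAI chunk), the stop condition is True and phospho logs the concatenated tokens as a single task:

# Consume the generator: phospho concatenates the yielded tokens
for token in answer([{"role": "user", "content": "Explain quantum computers in less than 20 words."}]):
    if token is not None:
        print(token, end="")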

Example with a generator

For generators, you need to wrap them with phospho.MutableGenerator (or phospho.MutableAsyncGenerator for async generators).

# Example with an Ollama endpoint
import json

import requests

# The prompt sent to the model
prompt = ...

r = requests.post(
    "http://localhost:11434/api/generate",
    json={
        "model": ...,
        "prompt": prompt,
        "context": ...,
    },
    # This connects to a streaming API endpoint
    stream=True,
)
r.raise_for_status()
response_iterator = r.iter_lines()

# In order to directly log this to phospho, we need to wrap it this way
response_iterator = phospho.MutableGenerator(
    generator=response_iterator,
    # Indicate when the streaming stops
    stop=lambda line: json.loads(line).get("done", False),
)

# Log the generated content to phospho with stream=True
phospho.log(input=prompt, output=response_iterator, stream=True)

# As you iterate over the response, phospho combines the chunks
# When stop(output) is True, the iteration is completed and the task is logged
for line in response_iterator:
    print(line)