Demonstrator for the OpenAI Assistants API

The code below is based on the Medium article How to Create Your Own GPT Voice Assistant with Infinite Chat Memory by Jordan Gibbs, with a few enhancements to make it more flexible.

It currently relies on assistants created via https://platform.openai.com, which the user can choose from, but it is also partly prepared for creating assistants from code. The user can additionally select whether text or voice should be used and, for voice, in which language.

As of now it is a demonstrator of possibilities. So far, Retrieval and Code Interpreter have been shown to work, but oddly with results that differ from the Playground (which produces better ones).

Noted issues

  • Latency is very high. A practical IVR solution would require much lower latency.
  • If no language is selected for Whisper (speech recognition), it can sometimes misinterpret the language entirely.
  • Whisper sometimes returns text that is not at all related to what was spoken (occasionally even before anything was spoken). The cause is under investigation.

Webification

Using this as a base for a web application (via e.g. Flask) requires adding multi-user support as well as the ability to record and play audio in the browser, so some JavaScript is needed as well. A database would replace the configuration and log files. Content files would be stored in one folder per user. Assistants would be shared by all users, while threads and the like would be user-specific to avoid crosstalk. A rough sketch of what such a server could look like follows below.
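
The sketch is illustrative only and not part of the demonstrator: the endpoint layout, the in-memory session store and the get_or_create_thread helper are assumptions made for the example, ASSISTANT_ID is a placeholder, and the browser-side recording JavaScript is left out.

# Hypothetical Flask sketch: one shared assistant, one thread per user.
# The dictionary below stands in for the database mentioned above.
import os
import time

import openai
from flask import Flask, request, jsonify

app = Flask(__name__)
client = openai.OpenAI()  # Reads OPENAI_API_KEY from the environment

ASSISTANT_ID = os.environ["ASSISTANT_ID"]  # Shared by all users (placeholder)
user_threads = {}  # user_id -> thread_id; would be a database table

def get_or_create_thread(user_id):
    # Each user gets a private thread, avoiding crosstalk between users.
    if user_id not in user_threads:
        user_threads[user_id] = client.beta.threads.create().id
    return user_threads[user_id]

@app.post("/chat/<user_id>")
def chat(user_id):
    thread_id = get_or_create_thread(user_id)
    client.beta.threads.messages.create(
        thread_id, role="user", content=request.json["message"]
    )
    run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=ASSISTANT_ID)
    # Same polling approach as run_assistant() in the source below.
    while run.status in ("queued", "in_progress"):
        time.sleep(1)
        run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
    messages = client.beta.threads.messages.list(thread_id=thread_id)
    return jsonify(reply=messages.data[0].content[0].text.value)

if __name__ == "__main__":
    app.run(debug=True)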

Prerequisites

The steps assume Windows; they are similar on other platforms. A quick way to verify the setup is sketched after the list.

  • Set the environment variable OPENAI_API_KEY to your OpenAI API Key
  • Install (copy) ffmpeg to c:\ffmpeg and add c:\ffmpeg\bin to PATH
  • Install Python libraries pynput, markdown, sounddevice, openai, numpy
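
The following small script, which is illustrative and not part of the demonstrator, checks all three prerequisites at once:

# Illustrative prerequisite check.
import importlib
import os
import shutil

# The OpenAI client authenticates using this environment variable.
assert os.environ.get("OPENAI_API_KEY"), "OPENAI_API_KEY is not set"

# ffplay (bundled with ffmpeg) must be on PATH for audio playback.
assert shutil.which("ffplay"), "ffplay not found on PATH; install ffmpeg"

# Import every Python dependency used by the source below.
for module in ("pynput", "markdown", "sounddevice", "openai", "numpy"):
    importlib.import_module(module)

print("All prerequisites look OK.")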

Modified source

# Based on https://medium.com/@jordan_gibbs/how-to-create-your-own-gpt-voice-assistant-with-infinite-chat-memory-in-python-d8b8e93f6b21
# Documentation:
# https://platform.openai.com/docs/guides/text-to-speech
# https://platform.openai.com/docs/guides/speech-to-text
# https://platform.openai.com/docs/assistants/overview
# https://platform.openai.com/docs/models/whisper
# https://platform.openai.com/docs/models/gpt-4-and-gpt-4-turbo
# https://en.wikipedia.org/wiki/List_of_ISO_639-1_codes
# https://openai.com/research/whisper
# https://github.com/openai/openai-python

import openai
import json
from pynput import keyboard
import wave
import sounddevice as sd
import time
import os
import subprocess
import datetime as dt
import markdown
import codecs

# List the assistants available in the account, so the user can pick one by number.
def display_assistants(client):
    assistants = client.beta.assistants.list()
    items = assistants.data
    for number, item in enumerate(items, start=1):
        print(f"Assistant {number}: {item.name}, {item.id}")

    return items

# This function creates a new assistant with the OpenAI Assistant API.
def setup_assistant(client, name, instructions, tools=[], model="gpt-4-1106-preview"):
    assistant = client.beta.assistants.create(name=name, instructions=instructions, tools=tools, model=model)
    return assistant.id

# Create a thread.
def setup_thread(client):
    thread = client.beta.threads.create()
    return thread.id    

# This function sends your message (typed or transcribed voice) into the thread object, which then gets passed to the AI.
def send_message(client, thread_id, task):
    thread_message = client.beta.threads.messages.create(
        thread_id,
        role="user",
        content=task,
    )
    return thread_message

# Runs the assistant with the given thread and assistant IDs.
def run_assistant(client, assistant_id, thread_id):
    run = client.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=assistant_id
    )

    # Poll until the run reaches a terminal state.
    while run.status in ("queued", "in_progress"):
        time.sleep(1)
        run = client.beta.threads.runs.retrieve(
            thread_id=thread_id,
            run_id=run.id
        )

    if run.status == "completed":
        return client.beta.threads.messages.list(thread_id=thread_id)

    # The run failed, was cancelled, expired or requires action.
    print(f"Run ended with status: {run.status}")
    return None

# This function saves your session data locally, so you can easily retrieve it from the JSON file at any time.
def save_session(assistant_id, thread_id, user_name_input, assistant_voice, file_path='chat_sessions.json'):
    # Read possible existing file
    if os.path.exists(file_path):
        with open(file_path, 'r') as file:
            data = json.load(file)
    else:
        data = {"sessions": {}}

    # Find the next session number
    next_session_number = str(len(data["sessions"]) + 1)

    # Add the new session
    data["sessions"][next_session_number] = {
        "Assistant ID": assistant_id,
        "Thread ID": thread_id,
        "User Name Input": user_name_input,
        "Assistant Voice": assistant_voice
    }

    # Save data back to file
    with open(file_path, 'w') as file:
        json.dump(data, file, indent=4)

# This function shows your available sessions when you request it.
def display_sessions(file_path='chat_sessions.json'):
    if not os.path.exists(file_path):
        print("No sessions available.")
        return

    with open(file_path, 'r') as file:
        data = json.load(file)

    print("Available Sessions:")
    for number, session in data["sessions"].items():
        print(f"Session {number}: {session['User Name Input']}")

# This function retrieves the session that you choose.
def get_session_data(session_number, file_path='chat_sessions.json'):
    with open(file_path, 'r') as file:
        data = json.load(file)

    session = data["sessions"].get(session_number)
    if session:
        return session["Assistant ID"], session["Thread ID"], session["User Name Input"], session["Assistant Voice"]
    else:
        print("Session not found.")
        return None, None, None, None

# This function downloads and writes your entire chat history to a text file, so you can keep your own records.
def collect_message_history(client, thread_id, user_name_input):
    messages = client.beta.threads.messages.list(thread_id)
    message_dict = json.loads(messages.model_dump_json())

    file_name = user_name_input.replace(" ", "_") + "_message_log.txt"
    with codecs.open(file_name, 'w', 'utf-8') as message_log:
        for message in reversed(message_dict['data']):
            # Extracting the text value from the message
            text_value = message['content'][0]['text']['value']

            # Adding a prefix to distinguish between user and assistant messages
            if message['role'] == 'assistant':
                prefix = f"{user_name_input}: "
            else:  # Assuming any other role is the user
                prefix = "You: "

            # Writing the prefixed message to the log
            output = prefix + text_value
            message_log.write(output + '\n')

    return f"Messages saved to {file_name}"

# This function uses OpenAI's Whisper speech-to-text model to convert your voice input to text.
def whisper(client, language):
    record_audio()
    with open("user_response.wav", "rb") as audio_file:
        transcript = client.audio.transcriptions.create(model="whisper-1", file=audio_file, language=language)
    return transcript.text

# This function allows you to record your voice with a press of a button, right now set to 'page down'. You could
# also bypass the keyboard input logic to consistently talk to the AI without pressing a button.
# TODO Automatic start and stop of speech recognition
def record_audio(duration=None):
    CHUNK = 1024
    FORMAT = 'int16'
    CHANNELS = 1
    RATE = 10000
    WAVE_OUTPUT_FILENAME = "user_response.wav"

    frames = []
    stream = None
    is_recording = False
    recording_stopped = False

    def start_recording():
        nonlocal frames, stream
        frames = []

        stream = sd.InputStream(
            samplerate=RATE,
            channels=CHANNELS,
            dtype=FORMAT,
            blocksize=CHUNK,
            callback=callback
        )

        stream.start()

    def callback(indata, frame_count, time, status):
        # Collect audio chunks while recording is active.
        if is_recording:
            frames.append(indata.copy())

    def stop_recording():
        nonlocal recording_stopped

        stream.stop()
        stream.close()

        wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(2)
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))
        wf.close()
        recording_stopped = True

    def on_key(key):
        nonlocal is_recording

        # Page Down toggles recording on and off.
        if key == keyboard.Key.page_down:
            if not is_recording:
                start_recording()
                is_recording = True
            else:
                stop_recording()
                is_recording = False

    listener = keyboard.Listener(on_press=on_key)
    listener.start()

    start_time = time.time()
    while listener.running:
        if recording_stopped:
            listener.stop()
        elif duration and (time.time() - start_time) > duration:
            listener.stop()
        time.sleep(0.01)

# This function takes the AI's text output and your voice selection and converts it into audio played by ffplay.
def voice_stream(client, input_text, voice):
    response = client.audio.speech.create(
        model="tts-1-hd",
        voice=voice,
        input=input_text
    )

    # Ensure the ffplay command is set up to read from stdin
    ffplay_cmd = ['ffplay', '-nodisp', '-autoexit', '-']
    ffplay_proc = subprocess.Popen(ffplay_cmd, stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
    binary_content = response.content

    # Stream the audio to ffplay
    try:
        ffplay_proc.stdin.write(binary_content)
        ffplay_proc.stdin.flush()  # Ensure the audio is sent to ffplay
    except BrokenPipeError:
        # Handle the case where ffplay closes the pipe
        pass
    finally:
        ffplay_proc.stdin.close()
        ffplay_proc.wait()  # Wait for ffplay to finish playing the audio

# Create canned assistants.
# TODO Add essential ones evaluating different aspects of the Assistants API
# TODO Support uploading of files for Retrieval
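# A rough sketch of that TODO (illustrative, not yet implemented; uses the
# file upload and retrieval tool calls of the v1 Assistants API):
#   file = client.files.create(file=open("notes.pdf", "rb"), purpose="assistants")
#   client.beta.assistants.create(name=..., instructions=..., model=model,
#                                 tools=[{"type": "retrieval"}], file_ids=[file.id])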
def setup_assistants(client, voice):
    model = "gpt-4-1106-preview"
    assistants = [
        {
            "name": "Friend",
            "instructions": "You are a friend. Your name is Amanda. You are having a vocal conversation with a user. You will never output any markdown or formatted text of any kind, and you will speak in a concise, highly conversational manner. You will adopt any persona that the user may ask of you.",
            "tools": [],
            "model": model
        },
        {
            "name": "Math Tutor",
            "instructions": "Act as a math tutor correctly calculating any formula that the user provides. Ask the user for clarifications if not clear enough. If needed explain how you came to the conclusion given.",
            "tools": [{"type": "code_interpreter"}],
            "model": model
        },        
        {
            "name": "Storyteller",
            "instructions": "Act as a storyteller inventing exciting and slightly scary stories directed to small children, where the moral is to be friendly to and tolerant of others. Don't use complicated language that would be hard to understand for children.",
            "tools": [],
            "model": model
        }
    ]

    for item in assistants:
        name = item["name"]
        instructions = item["instructions"]
        tools = item["tools"]
        model = item["model"]
        assistant_id = setup_assistant(client, name=name, instructions=instructions, tools=tools, model=model)
        thread_id = setup_thread(client)
        print(f"Created Assistant {name}")        
        save_session(assistant_id, thread_id, name, voice)

# Ask the user about voice to use for TTS.
def input_voice(client):
    print(
        """Voices:
1: Alloy - Androgynous, Neutral
2: Echo - Male, Neutral
3: Fable - Male, British Accent
4: Onyx - Male, Deep
5: Nova - Female, Neutral
6: Shimmer - Female, Deep"""
    )
    voice_index = int(input("Enter the number for the voice you want: ")) - 1
    voice_names = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]
    return voice_names[voice_index]

# Main loop: choose or create a session, then alternate user input (text or voice)
# with assistant replies until the user exits.
def main_loop():
    client = openai.OpenAI()

    user_choice = input("Type 'n' to create a new session, 'a' to create the canned assistants, or press Enter to choose an existing session: ")

    if user_choice == 'a':
        voice = input_voice(client)
        setup_assistants(client, voice)

    if user_choice == 'n':
        user_name_input = input("Please type a name for this chat session: ")
        assistants = display_assistants(client)
        assistant_index = int(input("Enter the number for the assistant to use: ")) - 1
        assistant_id = assistants[assistant_index].id
        thread_id = setup_thread(client)
        voice = input_voice(client)
        save_session(assistant_id, thread_id, user_name_input, voice)
    else:
        display_sessions()
        chosen_session_number = input("Enter the number for the session to use: ")
        assistant_id, thread_id, user_name_input, voice = get_session_data(chosen_session_number)

    print(f"Session with name {user_name_input}, assistant ID {assistant_id}, thread ID {thread_id}, voice {voice}")        
        
    if assistant_id and thread_id:
        # Voice
        voice_control = input("Type 'v' for voice control or press Enter for manual input: ").lower() == 'v'
        if voice_control:
            language = input("Type an ISO 639-1 language code or press Enter for English: ").lower()
            if language == "":
                language = "en"            

        first_iteration = True
        while True:
            if voice_control:
                print("Press Page Down to start/stop recording your voice message.")
                user_message = whisper(client, language)
                print(f"You: {user_message}")                
            else:
                user_message = input("Enter your prompt: ")

            # 'exit.' is included since Whisper transcriptions typically end with punctuation.
            if user_message.lower() in {'x', 'exit', 'exit.', 'q', 'quit'}:
                print("Exiting the program.")
                print(collect_message_history(client, thread_id, user_name_input))
                break            

            if first_iteration:
                current_time = dt.datetime.now().strftime("%Y-%m-%d %H:%M")
                user_message = f"It is now {current_time}. {user_message}"            
                first_iteration = False

            send_message(client, thread_id, user_message)
            messages = run_assistant(client, assistant_id, thread_id)
            if messages is None:
                # The run did not complete; let the user try again.
                continue
            message_dict = json.loads(messages.model_dump_json())
            most_recent_message = message_dict['data'][0]
            assistant_message = most_recent_message['content'][0]['text']['value']

            print(f"{user_name_input}:\n\n{assistant_message}")
            #print(markdown.markdown(f"{user_name_input}:\n\n{assistant_message}"))

            if voice_control:
                voice_stream(client, assistant_message, voice)

if __name__ == "__main__":
    main_loop()
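
To try it, save the source as e.g. assistant_demo.py (the name is arbitrary) and run it with python assistant_demo.py once the prerequisites above are in place.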