The code below is based on Medium – How to Create Your Own GPT Voice Assistant with Infinite Chat Memory by Jordan Gibbs, with a few enhancements to make it a bit more flexible.
It currently relies on assistants being created via https://platform.openai.com, that the user can choose from, yet it’s somewhat prepared for generating assistants from code as well. It’s also possible to select whether text or voice should be used, and if so in what language.
As of now it’s a demonstrator of possibilities. So far, Retrieval and Code Interpreter have been proven to work, but oddly with different results from Playground (that generates better results).
Noted issues
- Latency is very high. To create a practical IVR solution latency needs to be much lower.
- If language is not selected for Whisper (speech recognition) it can sometimes misinterpret the language entirely.
- Whisper sometimes returns text that’s not at all related to what was spoken (and even before anything was spoken). Investigation is ongoing for why.
Webification
Using this as a base for a web application (via e.g. Flask) requires adding multi-user support as well as the ability to record and play audio via a browser, so some JavaScript is needed as well. A database would replace the use of configuration and log file. Content files would be stored in one folder per user. Assistants would be the same for all users. Threads etc would be user-specific to avoid crosstalk.
Prerequisites
Assuming Windows. Similar if not.
- Set the environment variable OPENAI_API_KEY to your OpenAI API Key
- Install (copy) ffmpeg to c:\ffmpeg and add c:\ffmpeg\bin to PATH
- Install Python libraries pynput, markdown, sounddevice, openai, numpy
Modified source
# Based on https://medium.com/@jordan_gibbs/how-to-create-your-own-gpt-voice-assistant-with-infinite-chat-memory-in-python-d8b8e93f6b21 # Documentation: # https://platform.openai.com/docs/guides/text-to-speech # https://platform.openai.com/docs/guides/speech-to-text # https://platform.openai.com/docs/assistants/overview # https://platform.openai.com/docs/models/whisper # https://platform.openai.com/docs/models/gpt-4-and-gpt-4-turbo # https://en.wikipedia.org/wiki/List_of_ISO_639-1_codes # https://openai.com/research/whisper # https://github.com/openai/openai-python import openai import json from pynput import keyboard import wave import sounddevice as sd import time import os import subprocess import datetime as dt import markdown import codecs def display_assistants(client): assistants = client.beta.assistants.list() items = assistants.data number = 1 for item in items: print(f"Assistant {number}: {item.name}, {item.id}") number += 1 return items # This function creates a new assistant with the OpenAI Assistant API. def setup_assistant(client, name, instructions, tools=[], model="gpt-4-1106-preview"): assistant = client.beta.assistants.create(name=name, instructions=instructions, tools=tools, model=model) return assistant.id # Create a thread. def setup_thread(client): thread = client.beta.threads.create() return thread.id # This function sends your voice message into the thread object, which then gets passed to the AI. def send_message(client, thread_id, task): thread_message = client.beta.threads.messages.create( thread_id, role="user", content=task, ) return thread_message # Runs the assistant with the given thread and assistant IDs. def run_assistant(client, assistant_id, thread_id): run = client.beta.threads.runs.create( thread_id=thread_id, assistant_id=assistant_id ) while run.status == "in_progress" or run.status == "queued": time.sleep(1) run = client.beta.threads.runs.retrieve( thread_id=thread_id, run_id=run.id ) if run.status == "completed": return client.beta.threads.messages.list(thread_id=thread_id) # This function saves your session data locally, so you can easily retrieve it from the JSON file at any time. def save_session(assistant_id, thread_id, user_name_input, assistant_voice, file_path='chat_sessions.json'): # Read possible existing file if os.path.exists(file_path): with open(file_path, 'r') as file: data = json.load(file) else: data = {"sessions": {}} # Find the next session number next_session_number = str(len(data["sessions"]) + 1) # Add the new session data["sessions"][next_session_number] = { "Assistant ID": assistant_id, "Thread ID": thread_id, "User Name Input": user_name_input, "Assistant Voice": assistant_voice } # Save data back to file with open(file_path, 'w') as file: json.dump(data, file, indent=4) # This function shows your available sessions when you request it. def display_sessions(file_path='chat_sessions.json'): if not os.path.exists(file_path): print("No sessions available.") return with open(file_path, 'r') as file: data = json.load(file) print("Available Sessions:") for number, session in data["sessions"].items(): print(f"Session {number}: {session['User Name Input']}") # This function retrieves the session that you choose. def get_session_data(session_number, file_path='chat_sessions.json'): with open(file_path, 'r') as file: data = json.load(file) session = data["sessions"].get(session_number) if session: return session["Assistant ID"], session["Thread ID"], session["User Name Input"], session["Assistant Voice"] else: print("Session not found.") return None, None, None, None # This function downloads and writes your entire chat history to a text file, so you can keep your own records. def collect_message_history(client, thread_id, user_name_input): messages = client.beta.threads.messages.list(thread_id) message_dict = json.loads(messages.model_dump_json()) file_name = user_name_input.replace(" ", "_") + "_message_log.txt" with codecs.open(file_name, 'w', 'utf-8') as message_log: for message in reversed(message_dict['data']): # Extracting the text value from the message text_value = message['content'][0]['text']['value'] # Adding a prefix to distinguish between user and assistant messages if message['role'] == 'assistant': prefix = f"{user_name_input}: " else: # Assuming any other role is the user prefix = "You: " # Writing the prefixed message to the log output = prefix + text_value message_log.write(output + '\n') return f"Messages saved to {file_name}" # This function uses OpenAI's whisper voice to text model to convert your voice input to text. def whisper(client, language): record_audio() audio_file = open("user_response.wav", "rb") transcript = client.audio.transcriptions.create(model="whisper-1", file=audio_file, language=language) return transcript.text # This function allows you to record your voice with a press of a button, right now set to 'page down'. You could # also bypass the keyboard input logic to consistently talk to the AI without pressing a button. # TODO Automatic start and stop of speech recognition def record_audio(duration=None): CHUNK = 1024 FORMAT = 'int16' CHANNELS = 1 RATE = 10000 WAVE_OUTPUT_FILENAME = "user_response.wav" frames = [] stream = None is_recording = False recording_stopped = False def record_audio(): nonlocal frames, stream frames = [] stream = sd.InputStream( samplerate=RATE, channels=CHANNELS, dtype=FORMAT, blocksize=CHUNK, callback=callback ) stream.start() def callback(indata, frame_count, time, status): nonlocal stream if is_recording: frames.append(indata.copy()) def stop_recording(): nonlocal frames, stream, recording_stopped stream.stop() stream.close() wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(2) wf.setframerate(RATE) wf.writeframes(b''.join(frames)) wf.close() recording_stopped = True def on_key(key): nonlocal is_recording if key == keyboard.Key.page_down: if not is_recording: record_audio() is_recording = True else: stop_recording() is_recording = False listener = keyboard.Listener(on_press=on_key) listener.start() start_time = time.time() while listener.running: if recording_stopped: listener.stop() elif duration and (time.time() - start_time) > duration: listener.stop() time.sleep(0.01) # This function takes the AI's text output and your voice selection and converts it into audio played by ffplay. def voice_stream(client, input_text, voice): response = client.audio.speech.create( model="tts-1-hd", voice=voice, input=input_text ) # Ensure the ffplay command is set up to read from stdin ffplay_cmd = ['ffplay', '-nodisp', '-autoexit', '-'] ffplay_proc = subprocess.Popen(ffplay_cmd, stdin=subprocess.PIPE, stdout=open(os.devnull, 'wb'), stderr=subprocess.STDOUT) binary_content = response.content # Stream the audio to ffplay try: ffplay_proc.stdin.write(binary_content) ffplay_proc.stdin.flush() # Ensure the audio is sent to ffplay except BrokenPipeError: # Handle the case where ffplay closes the pipe pass finally: ffplay_proc.stdin.close() ffplay_proc.wait() # Wait for ffplay to finish playing the audio # Create canned assistants. # TODO Add essential ones evalualating different aspects of the Assistants API # TODO Support uploading of files for Retrieval def setup_assistants(client, voice): model = "gpt-4-1106-preview" assistants = [ { "name": "Friend", "instructions": "You are a friend. Your name is Amanda. You are having a vocal conversation with a user. You will never output any markdown or formatted text of any kind, and you will speak in a concise, highly conversational manner. You will adopt any persona that the user may ask of you.", "tools": [], "model": model }, { "name": "Math Tutor", "instructions": "Act as a math tutor correctly calculating any formula that the user provides. Ask the user for clarifications if not clear enough. If needed explain how you came to the conclusion given.", "tools": [{"type": "code_interpreter"}], "model": model }, { "name": "Storyteller", "instructions": "Act as a storyteller inventing exciting and slightly scary stories directed to small children, where the moral is to be friendly to and tolerant of others. Don't use complicated language that would be hard to understand for children.", "tools": [], "model": model } ] for item in assistants: name = item["name"] instructions = item["instructions"] tools = item["tools"] model = item["model"] assistant_id = setup_assistant(client, name=name, instructions=instructions, tools=tools, model=model) thread_id = setup_thread(client) print(f"Created Assistant {name}") save_session(assistant_id, thread_id, name, voice) # Ask the user about voice to use for TTS. def input_voice(client): print( """Voices: 1: Alloy - Androgynous, Neutral 2: Echo - Male, Neutral 3: Fable - Male, British Accent 4: Onyx - Male, Deep 5: Nova - Female, Neutral 6: Shimmer - Female, Deep""" ) voice_index = int(input("Enter the number for the voice you want: ")) - 1 voice_names = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"] return voice_names[voice_index] def main_loop(): client = openai.OpenAI() user_choice = input("Type 'n' to create a new session. Press Enter to choose an existing session.") if user_choice == 'a': voice = input_voice(client) setup_assistants(client, voice) if user_choice == 'n': user_name_input = input("Please type a name for this chat session: ") assistants = display_assistants(client) assistant_index = int(input("Enter the number for the assistant to use: ")) - 1 assistant_id = assistants[assistant_index].id thread_id = setup_thread(client) voice = input_voice(client) save_session(assistant_id, thread_id, user_name_input, voice) else: display_sessions() chosen_session_number = input("Enter the number for the session to use: ") assistant_id, thread_id, user_name_input, voice = get_session_data(chosen_session_number) print(f"Session with name {user_name_input}, assistant ID {assistant_id}, thread ID {thread_id}, voice {voice}") if assistant_id and thread_id: # Voice voice_control = input("Type 'v' for voice control or press Enter for manual input: ").lower() == 'v' if voice_control: language = input("Type an ISO 639-1 language code or press Enter for English: ").lower() if language == "": language = "en" first_iteration = True while True: if voice_control: print("Press Page Down to start/stop recording your voice message.") user_message = whisper(client, language) print(f"You: {user_message}") else: user_message = input("Enter your prompt: ") if user_message.lower() in {'x', 'exit', 'exit.', 'q', 'quit'}: print("Exiting the program.") print(collect_message_history(client, thread_id, user_name_input)) break if first_iteration: current_time = dt.datetime.now().strftime("%Y-%m-%d %H:%M") user_message = f"It is now {current_time}. {user_message}" first_iteration = False send_message(client, thread_id, user_message) messages = run_assistant(client, assistant_id, thread_id) message_dict = json.loads(messages.model_dump_json()) most_recent_message = message_dict['data'][0] assistant_message = most_recent_message['content'][0]['text']['value'] print(f"{user_name_input}:\n\n{assistant_message}") #print(markdown.markdown(f"{user_name_input}:\n\n{assistant_message}")) if voice_control: voice_stream(client, assistant_message, voice) if __name__ == "__main__": main_loop()