import sys
import argparse
from pathlib import Path
from itertools import zip_longest
from typing import Optional
from ..configuration import Config
from ..chat import ChatDB
from ..message import Message, MessageFilter, MessageError, Question, source_code
from ..ai_factory import create_ai
from ..ai import AI, AIResponse


def _read_file_content(file: str) -> Optional[str]:
    """
    Return the text that the given file contributes to a question.

    If the file can be parsed as a Message, its answer is returned, or
    None when that message has no answer. If parsing raises MessageError,
    the file is read as plain text and its stripped content is returned.
    """
    try:
        message = Message.from_file(Path(file))
    except MessageError:
        # not a message file: fall back to the raw file content
        with open(file) as r:
            return r.read().strip()
    if message and message.answer:
        return message.answer
    # A message without an answer has nothing to contribute. Returning None
    # (instead of leaving a variable unassigned) lets the callers skip it.
    return None


def add_file_as_text(question_parts: list[str], file: str) -> None:
    """
    Add the given file as plain text to the question part list.
    If the file is a Message, add the answer. Nothing is added if the
    content is empty or the Message has no answer.
    """
    content = _read_file_content(file)
    if content:
        question_parts.append(content)


def add_file_as_code(question_parts: list[str], file: str) -> None:
    """
    Add all source code from the given file. If no code segments can be extracted,
    the whole content is added as source code segment. If the file is a Message,
    extract the source code from the answer. Nothing is added if the file is a
    Message without an answer.
    """
    content = _read_file_content(file)
    if content is None:
        return
    # extract and add source code
    code_parts = source_code(content, include_delims=True)
    if code_parts:
        question_parts.extend(code_parts)
    else:
        question_parts.append(f"```\n{content}\n```")


def create_message(chat: ChatDB, args: argparse.Namespace) -> Message:
    """
    Creates a new message from the given arguments and writes it
    to the cache directory.

    The question texts ('args.ask'), text files ('args.source_text') and
    code files ('args.source_code') are interleaved position by position:
    first question, first text file, first code file, second question, ...
    The resulting parts are joined with blank lines.
    """
    question_parts: list[str] = []
    question_list = args.ask if args.ask is not None else []
    text_files = args.source_text if args.source_text is not None else []
    code_files = args.source_code if args.source_code is not None else []

    # zip_longest pads the shorter lists with None, so the lists
    # may have different lengths
    for question, text_file, code_file in zip_longest(question_list, text_files, code_files, fillvalue=None):
        if question is not None and question.strip():
            question_parts.append(question)
        if text_file:
            add_file_as_text(question_parts, text_file)
        if code_file:
            add_file_as_code(question_parts, code_file)

    full_question = '\n\n'.join(question_parts)

    message = Message(question=Question(full_question),
                      tags=args.output_tags,  # FIXME
                      ai=args.AI,
                      model=args.model)
    # only write the message (as a backup), don't add it
    # to the current chat history
    chat.cache_write([message])
    return message


def make_request(ai: AI, chat: ChatDB, message: Message, args: argparse.Namespace) -> None:
    """
    Make an AI request with the given AI, chat history, message and
    CLI arguments. Write all answers to the cache and print them,
    followed by the token information of the response (if any).
    """
    ai.print()
    chat.print(paged=False)
    print(message.to_str() + '\n')
    response: AIResponse = ai.request(message,
                                      chat,
                                      args.num_answers,
                                      args.output_tags)
    # write all answers to the cache, don't add them to the chat history
    chat.cache_write(response.messages)
    for idx, msg in enumerate(response.messages):
        print(f"=== ANSWER {idx+1} ===")
        print(msg.answer)
    if response.tokens:
        print("===============")
        print(response.tokens)


def question_cmd(args: argparse.Namespace, config: Config) -> None:
    """
    Handler for the 'question' command.

    Depending on the arguments, a new message is created (and optionally
    sent to the AI), or the latest cached message is repeated. Exits with
    status 1 if a repeat is requested but no cached message exists.
    """
    mfilter = MessageFilter(tags_or=args.or_tags if args.or_tags is not None else set(),
                            tags_and=args.and_tags if args.and_tags is not None else set(),
                            tags_not=args.exclude_tags if args.exclude_tags is not None else set())
    chat = ChatDB.from_dir(cache_path=Path('.'),
                           db_path=Path(config.db),
                           mfilter=mfilter)
    # if it's a new question, create and store it immediately
    if args.ask or args.create:
        message = create_message(chat, args)
        if args.create:
            return

    # create the correct AI instance
    ai: AI = create_ai(args, config)

    # === ASK ===
    if args.ask:
        make_request(ai, chat, message, args)
    # === REPEAT ===
    elif args.repeat is not None:
        lmessage = chat.msg_latest(source='cache')
        if lmessage is None:
            print("No message found to repeat!")
            sys.exit(1)
        print(f"Repeating message '{lmessage.msg_id()}':")
        # overwrite the latest message if requested or empty
        if lmessage.answer is None or args.overwrite is True:
            lmessage.clear_answer()
            make_request(ai, chat, lmessage, args)
        # otherwise create a new one
        else:
            args.ask = [lmessage.question]
            message = create_message(chat, args)
            make_request(ai, chat, message, args)
    # === PROCESS ===
    elif args.process is not None:
        # TODO: process either all questions without an
        # answer or the one(s) given in 'args.process'
        pass