124 lines
4.5 KiB
Python
124 lines
4.5 KiB
Python
import argparse
|
|
from pathlib import Path
|
|
from itertools import zip_longest
|
|
from ..configuration import Config
|
|
from ..chat import ChatDB
|
|
from ..message import Message, MessageFilter, MessageError, Question, source_code
|
|
from ..ai_factory import create_ai
|
|
from ..ai import AI, AIResponse
|
|
|
|
|
|
def add_file_as_text(question_parts: list[str], file: str) -> None:
    """
    Add the given file as plain text to the question part list.

    The file is first parsed as a Message. If that succeeds and the message
    has an answer, the answer is added. If parsing raises MessageError, the
    file is read as plain text and its stripped content is added instead.
    Empty content (including a Message without an answer) is never added.
    """
    file_path = Path(file)
    # Start with empty content: a Message that parses fine but carries no
    # answer must be skipped rather than leave 'content' unbound below.
    content: str = ''
    try:
        message = Message.from_file(file_path)
        if message and message.answer:
            content = message.answer
    except MessageError:
        # not a Message file -> fall back to the raw text
        with open(file) as r:
            content = r.read().strip()
    if len(content) > 0:
        question_parts.append(content)
|
|
|
|
|
|
def add_file_as_code(question_parts: list[str], file: str) -> None:
    """
    Add all source code from the given file. If no code segments can be extracted,
    the whole content is added as source code segment. If the file is a Message,
    extract the source code from the answer.

    A Message without an answer contributes nothing to the question parts.
    """
    file_path = Path(file)
    content: str
    try:
        message = Message.from_file(file_path)
        if not (message and message.answer):
            # The file parsed as a Message but there is no answer to take
            # code from; return instead of using an unbound 'content'.
            return
        content = message.answer
    except MessageError:
        # not a Message file -> fall back to the raw text
        with open(file) as r:
            content = r.read().strip()
    # extract and add source code
    code_parts = source_code(content, include_delims=True)
    if len(code_parts) > 0:
        question_parts += code_parts
    else:
        # no code segments found -> wrap the whole content in a code fence
        question_parts.append(f"```\n{content}\n```")
|
|
|
|
|
|
def create_message(chat: ChatDB, args: argparse.Namespace) -> Message:
    """
    Build a new message from the command line arguments and add it to
    the cache of the given chat.

    The questions ('args.ask'), text files ('args.source_text') and code
    files ('args.source_code') are consumed in lockstep: the n-th question
    is followed by the n-th text file and the n-th code file. All resulting
    parts are joined by a blank line to form the question of the message.
    """
    def _or_empty(value):
        # arguments that were not given are None -> treat them as empty
        return value if value is not None else []

    parts = []
    triples = zip_longest(_or_empty(args.ask),
                          _or_empty(args.source_text),
                          _or_empty(args.source_code),
                          fillvalue=None)
    for asked, txt_file, src_file in triples:
        # shorter lists are padded with None, blank entries are skipped
        if asked is not None and len(asked.strip()) > 0:
            parts.append(asked)
        if txt_file is not None and len(txt_file) > 0:
            add_file_as_text(parts, txt_file)
        if src_file is not None and len(src_file) > 0:
            add_file_as_code(parts, src_file)

    new_message = Message(
        question=Question('\n\n'.join(parts)),
        tags=args.output_tags,  # FIXME
        ai=args.AI,
        model=args.model,
    )
    chat.cache_add([new_message])
    return new_message
|
|
|
|
|
|
def question_cmd(args: argparse.Namespace, config: Config) -> None:
    """
    Handler for the 'question' command.

    Opens the chat (cache in the current directory, DB at 'config.db')
    using a filter built from the tag arguments. With 'args.ask' or
    'args.create' a new message is created and cached; with 'args.create'
    the handler stops right after that. With 'args.ask' the message is
    sent to the AI and all answers are printed. The 'repeat' and 'process'
    modes are not implemented yet.
    """
    # tag arguments that were not given are None -> use empty sets
    or_tags = set() if args.or_tags is None else args.or_tags
    and_tags = set() if args.and_tags is None else args.and_tags
    not_tags = set() if args.exclude_tags is None else args.exclude_tags
    tag_filter = MessageFilter(tags_or=or_tags,
                               tags_and=and_tags,
                               tags_not=not_tags)
    chat = ChatDB.from_dir(cache_path=Path('.'),
                           db_path=Path(config.db),
                           mfilter=tag_filter)

    # a new question is created and stored right away
    if args.ask or args.create:
        message = create_message(chat, args)
    if args.create:
        return

    # create the correct AI instance
    ai: AI = create_ai(args, config)

    if args.ask:
        ai.print()
        chat.print(paged=False)
        response: AIResponse = ai.request(message,
                                          chat,
                                          args.num_answers,  # FIXME
                                          args.output_tags)  # FIXME
        # the first returned message updates an existing one,
        # all further ones are added to the cache
        chat.msg_update([response.messages[0]])
        chat.cache_add(response.messages[1:])
        for idx, msg in enumerate(response.messages):
            print(f"=== ANSWER {idx+1} ===")
            print(msg.answer)
        if response.tokens:
            print("===============")
            print(response.tokens)
        return

    if args.repeat is not None:
        lmessage = chat.msg_latest()
        assert lmessage
        # TODO: repeat either the last question or the
        # one(s) given in 'args.repeat' (overwrite
        # existing ones if 'args.overwrite' is True)
        return

    if args.process is not None:
        # TODO: process either all questions without an
        # answer or the one(s) given in 'args.process'
        pass
|