329 lines
14 KiB
Python
Executable File

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim: set fileencoding=utf-8 :
import sys
import argcomplete
import argparse
from pathlib import Path
from .utils import terminal_width, print_tag_args, print_chat_hist, ChatType
from .storage import save_answers, create_chat_hist
from .api_client import ai, openai_api_key, print_models
from .configuration import Config
from .chat import ChatDB
from .message import Message, MessageFilter, MessageError, Question
from .ai_factory import create_ai
from .ai import AI, AIResponse
from itertools import zip_longest
from typing import Any
default_config = '.config.yaml'
def tags_completer(prefix: str, parsed_args: Any, **kwargs: Any) -> list[str]:
config = Config.from_file(parsed_args.config)
return get_tags_unique(config, prefix)
def create_question_with_hist(args: argparse.Namespace,
config: Config,
) -> tuple[ChatType, str, list[str]]:
"""
Creates the "AI request", including the question and chat history as determined
by the specified tags.
"""
tags = args.or_tags or []
xtags = args.exclude_tags or []
otags = args.output_tags or []
if not args.source_code_only:
print_tag_args(tags, xtags, otags)
question_parts = []
question_list = args.question if args.question is not None else []
source_list = args.source if args.source is not None else []
for question, source in zip_longest(question_list, source_list, fillvalue=None):
if question is not None and source is not None:
with open(source) as r:
question_parts.append(f"{question}\n\n```\n{r.read().strip()}\n```")
elif question is not None:
question_parts.append(question)
elif source is not None:
with open(source) as r:
question_parts.append(f"```\n{r.read().strip()}\n```")
full_question = '\n\n'.join(question_parts)
chat = create_chat_hist(full_question, tags, xtags, config,
match_all_tags=True if args.and_tags else False, # FIXME
with_tags=False,
with_file=False)
return chat, full_question, tags
def tags_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'tags' command.
"""
chat = ChatDB.from_dir(cache_path=Path('.'),
db_path=Path(config.db))
if args.list:
tags_freq = chat.tags_frequency(args.prefix, args.contain)
for tag, freq in tags_freq.items():
print(f"- {tag}: {freq}")
# TODO: add renaming
def config_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'config' command.
"""
if args.list_models:
print_models()
elif args.print_model:
print(config.openai.model)
elif args.model:
config.openai.model = args.model
config.to_file(args.config)
def question_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'question' command.
"""
chat = ChatDB.from_dir(cache_path=Path('.'),
db_path=Path(config.db))
# if it's a new question, create and store it immediately
if args.ask or args.create:
message = Message(question=Question(args.question),
tags=args.ouput_tags, # FIXME
ai=args.ai,
model=args.model)
chat.add_to_cache([message])
if args.create:
return
# create the correct AI instance
ai: AI = create_ai(args, config)
if args.ask:
response: AIResponse = ai.request(message,
chat,
args.num_answers, # FIXME
args.otags) # FIXME
assert response
# TODO:
# * add answer to the message above (and create
# more messages for any additional answers)
pass
elif args.repeat:
lmessage = chat.latest_message()
assert lmessage
# TODO: repeat either the last question or the
# one(s) given in 'args.repeat' (overwrite
# existing ones if 'args.overwrite' is True)
pass
elif args.process:
# TODO: process either all questions without an
# answer or the one(s) given in 'args.process'
pass
def ask_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'ask' command.
"""
if args.max_tokens:
config.openai.max_tokens = args.max_tokens
if args.temperature:
config.openai.temperature = args.temperature
if args.model:
config.openai.model = args.model
chat, question, tags = create_question_with_hist(args, config)
print_chat_hist(chat, False, args.source_code_only)
otags = args.output_tags or []
answers, usage = ai(chat, config, args.num_answers)
save_answers(question, answers, tags, otags, config)
print("-" * terminal_width())
print(f"Usage: {usage}")
def hist_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'hist' command.
"""
mfilter = MessageFilter(tags_or=args.or_tags,
tags_and=args.and_tags,
tags_not=args.exclude_tags,
question_contains=args.question,
answer_contains=args.answer)
chat = ChatDB.from_dir(Path('.'),
Path(config.db),
mfilter=mfilter)
chat.print(args.source_code_only,
args.with_tags,
args.with_files)
def print_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'print' command.
"""
fname = Path(args.file)
try:
message = Message.from_file(fname)
if message:
print(message.to_str(source_code_only=args.source_code_only))
except MessageError:
print(f"File is not a valid message: {args.file}")
sys.exit(1)
if args.source_code_only:
display_source_code(data['answer'])
elif args.answer:
print(data['answer'].strip())
elif args.question:
print(data['question'].strip())
else:
print(dump_data(data).strip())
def create_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="ChatMastermind is a Python application that automates conversation with AI")
parser.add_argument('-C', '--config', help='Config file name.', default=default_config)
# subcommand-parser
cmdparser = parser.add_subparsers(dest='command',
title='commands',
description='supported commands',
required=True)
# a parent parser for all commands that support tag selection
tag_parser = argparse.ArgumentParser(add_help=False)
tag_arg = tag_parser.add_argument('-t', '--or-tags', nargs='+',
help='List of tag names (one must match)', metavar='OTAGS')
tag_arg.completer = tags_completer # type: ignore
atag_arg = tag_parser.add_argument('-k', '--and-tags', nargs='+',
help='List of tag names (all must match)', metavar='ATAGS')
atag_arg.completer = tags_completer # type: ignore
etag_arg = tag_parser.add_argument('-x', '--exclude-tags', nargs='+',
help='List of tag names to exclude', metavar='XTAGS')
etag_arg.completer = tags_completer # type: ignore
otag_arg = tag_parser.add_argument('-o', '--output-tags', nargs='+',
help='List of output tag names, default is input', metavar='OUTTAGS')
otag_arg.completer = tags_completer # type: ignore
# 'question' command parser
question_cmd_parser = cmdparser.add_parser('question', parents=[tag_parser],
help="ask, create and process questions.",
aliases=['q'])
question_cmd_parser.set_defaults(func=question_cmd)
question_group = question_cmd_parser.add_mutually_exclusive_group(required=True)
question_group.add_argument('-a', '--ask', nargs='+', help='Ask a question')
question_group.add_argument('-c', '--create', nargs='+', help='Create a question')
question_group.add_argument('-r', '--repeat', nargs='*', help='Repeat a question')
question_group.add_argument('-p', '--process', nargs='*', help='Process existing questions')
question_cmd_parser.add_argument('-O', '--overwrite', help='Overwrite existing messages when repeating them',
action='store_true')
question_cmd_parser.add_argument('-m', '--max-tokens', help='Max tokens to use', type=int)
question_cmd_parser.add_argument('-T', '--temperature', help='Temperature to use', type=float)
question_cmd_parser.add_argument('-A', '--AI', help='AI to use')
question_cmd_parser.add_argument('-M', '--model', help='Model to use')
question_cmd_parser.add_argument('-n', '--num-answers', help='Number of answers to produce', type=int,
default=1)
question_cmd_parser.add_argument('-s', '--source', nargs='+', help='Source add content of a file to the query')
question_cmd_parser.add_argument('-S', '--source-code-only', help='Add pure source code to the chat history',
action='store_true')
# 'ask' command parser
ask_cmd_parser = cmdparser.add_parser('ask', parents=[tag_parser],
help="Ask a question.",
aliases=['a'])
ask_cmd_parser.set_defaults(func=ask_cmd)
ask_cmd_parser.add_argument('-q', '--question', nargs='+', help='Question to ask',
required=True)
ask_cmd_parser.add_argument('-m', '--max-tokens', help='Max tokens to use', type=int)
ask_cmd_parser.add_argument('-T', '--temperature', help='Temperature to use', type=float)
ask_cmd_parser.add_argument('-M', '--model', help='Model to use')
ask_cmd_parser.add_argument('-n', '--num-answers', help='Number of answers to produce', type=int,
default=1)
ask_cmd_parser.add_argument('-s', '--source', nargs='+', help='Source add content of a file to the query')
ask_cmd_parser.add_argument('-S', '--source-code-only', help='Add pure source code to the chat history',
action='store_true')
# 'hist' command parser
hist_cmd_parser = cmdparser.add_parser('hist', parents=[tag_parser],
help="Print chat history.",
aliases=['h'])
hist_cmd_parser.set_defaults(func=hist_cmd)
hist_cmd_parser.add_argument('-w', '--with-tags', help="Print chat history with tags.",
action='store_true')
hist_cmd_parser.add_argument('-W', '--with-files', help="Print chat history with filenames.",
action='store_true')
hist_cmd_parser.add_argument('-S', '--source-code-only', help='Print only source code',
action='store_true')
hist_cmd_parser.add_argument('-A', '--answer', help='Search for answer substring')
hist_cmd_parser.add_argument('-Q', '--question', help='Search for question substring')
# 'tags' command parser
tags_cmd_parser = cmdparser.add_parser('tags',
help="Manage tags.",
aliases=['t'])
tags_cmd_parser.set_defaults(func=tags_cmd)
tags_group = tags_cmd_parser.add_mutually_exclusive_group(required=True)
tags_group.add_argument('-l', '--list', help="List all tags and their frequency",
action='store_true')
tags_cmd_parser.add_argument('-p', '--prefix', help="Filter tags by prefix")
tags_cmd_parser.add_argument('-c', '--contain', help="Filter tags by contained substring")
# 'config' command parser
config_cmd_parser = cmdparser.add_parser('config',
help="Manage configuration",
aliases=['c'])
config_cmd_parser.set_defaults(func=config_cmd)
config_group = config_cmd_parser.add_mutually_exclusive_group(required=True)
config_group.add_argument('-l', '--list-models', help="List all available models",
action='store_true')
config_group.add_argument('-m', '--print-model', help="Print the currently configured model",
action='store_true')
config_group.add_argument('-M', '--model', help="Set model in the config file")
# 'print' command parser
print_cmd_parser = cmdparser.add_parser('print',
help="Print message files.",
aliases=['p'])
print_cmd_parser.set_defaults(func=print_cmd)
print_cmd_parser.add_argument('-f', '--file', help='File to print', required=True)
<<<<<<< HEAD
print_cmd_modes = print_cmd_parser.add_mutually_exclusive_group()
print_cmd_modes.add_argument('-q', '--question', help='Print only question', action='store_true')
print_cmd_modes.add_argument('-a', '--answer', help='Print only answer', action='store_true')
print_cmd_modes.add_argument('-S', '--only-source-code', help='Print only source code', action='store_true')
||||||| parent of bf1cbff (cmm: the 'print' command now uses 'Message.from_file()')
print_cmd_parser.add_argument('-S', '--source-code-only', help='Print only source code',
action='store_true')
=======
print_cmd_parser.add_argument('-S', '--source-code-only', help='Print source code only (from the answer, if available)',
action='store_true')
>>>>>>> bf1cbff (cmm: the 'print' command now uses 'Message.from_file()')
argcomplete.autocomplete(parser)
return parser
def main() -> int:
parser = create_parser()
args = parser.parse_args()
command = parser.parse_args()
config = Config.from_file(args.config)
openai_api_key(config.openai.api_key)
command.func(command, config)
return 0
if __name__ == '__main__':
sys.exit(main())