#!/usr/bin/env python # -*- coding: utf-8 -*- # vim: set fileencoding=utf-8 : import sys import argcomplete import argparse from pathlib import Path from .utils import terminal_width, print_tag_args, print_chat_hist, ChatType from .storage import save_answers, create_chat_hist from .api_client import ai, openai_api_key, print_models from .configuration import Config from .chat import ChatDB from .message import Message, MessageFilter, MessageError, Question from itertools import zip_longest from typing import Any default_config = '.config.yaml' def tags_completer(prefix: str, parsed_args: Any, **kwargs: Any) -> list[str]: config = Config.from_file(parsed_args.config) return list(Message.tags_from_dir(Path(config.db), prefix=prefix)) def create_question_with_hist(args: argparse.Namespace, config: Config, ) -> tuple[ChatType, str, list[str]]: """ Creates the "AI request", including the question and chat history as determined by the specified tags. """ tags = args.or_tags or [] xtags = args.exclude_tags or [] otags = args.output_tags or [] if not args.source_code_only: print_tag_args(tags, xtags, otags) question_parts = [] question_list = args.question if args.question is not None else [] source_list = args.source if args.source is not None else [] for question, source in zip_longest(question_list, source_list, fillvalue=None): if question is not None and source is not None: with open(source) as r: question_parts.append(f"{question}\n\n```\n{r.read().strip()}\n```") elif question is not None: question_parts.append(question) elif source is not None: with open(source) as r: question_parts.append(f"```\n{r.read().strip()}\n```") full_question = '\n\n'.join(question_parts) chat = create_chat_hist(full_question, tags, xtags, config, match_all_tags=True if args.and_tags else False, # FIXME with_tags=False, with_file=False) return chat, full_question, tags def tags_cmd(args: argparse.Namespace, config: Config) -> None: """ Handler for the 'tags' command. """ chat = ChatDB.from_dir(cache_path=Path('.'), db_path=Path(config.db)) if args.list: tags_freq = chat.tags_frequency(args.prefix, args.contain) for tag, freq in tags_freq.items(): print(f"- {tag}: {freq}") # TODO: add renaming def config_cmd(args: argparse.Namespace, config: Config) -> None: """ Handler for the 'config' command. """ if args.list_models: print_models() elif args.print_model: print(config.openai.model) elif args.model: config.openai.model = args.model config.to_file(args.config) def question_cmd(args: argparse.Namespace, config: Config) -> None: """ Handler for the 'question' command. """ chat = ChatDB.from_dir(cache_path=Path('.'), db_path=Path(config.db)) # if it's a new question, create and store it immediately if args.ask or args.create: message = Message(question=Question(args.question), tags=args.ouput_tags, # FIXME ai=args.ai, model=args.model) chat.add_to_cache([message]) if args.create: return elif args.ask: # TODO: # * select the correct AIConfig # * modify it according to the given arguments # * create AI instance and make AI request # * add answer to the message above (and create # more messages for any additional answers) pass elif args.repeat: # TODO: repeat either the last question or the # one(s) given in 'args.repeat' (overwrite # existing ones if 'args.overwrite' is True) pass elif args.process: # TODO: process either all questions without an # answer or the one(s) given in 'args.process' pass def ask_cmd(args: argparse.Namespace, config: Config) -> None: """ Handler for the 'ask' command. """ if args.max_tokens: config.openai.max_tokens = args.max_tokens if args.temperature: config.openai.temperature = args.temperature if args.model: config.openai.model = args.model chat, question, tags = create_question_with_hist(args, config) print_chat_hist(chat, False, args.source_code_only) otags = args.output_tags or [] answers, usage = ai(chat, config, args.number) save_answers(question, answers, tags, otags, config) print("-" * terminal_width()) print(f"Usage: {usage}") def hist_cmd(args: argparse.Namespace, config: Config) -> None: """ Handler for the 'hist' command. """ mfilter = MessageFilter(tags_or=args.or_tags, tags_and=args.and_tags, tags_not=args.exclude_tags, question_contains=args.question, answer_contains=args.answer) chat = ChatDB.from_dir(Path('.'), Path(config.db), mfilter=mfilter) chat.print(args.source_code_only, args.with_tags, args.with_files) def print_cmd(args: argparse.Namespace, config: Config) -> None: """ Handler for the 'print' command. """ fname = Path(args.file) try: message = Message.from_file(fname) if message: print(message.to_str(source_code_only=args.source_code_only)) except MessageError: print(f"File is not a valid message: {args.file}") sys.exit(1) def create_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="ChatMastermind is a Python application that automates conversation with AI") parser.add_argument('-C', '--config', help='Config file name.', default=default_config) # subcommand-parser cmdparser = parser.add_subparsers(dest='command', title='commands', description='supported commands', required=True) # a parent parser for all commands that support tag selection tag_parser = argparse.ArgumentParser(add_help=False) tag_arg = tag_parser.add_argument('-t', '--or-tags', nargs='+', help='List of tag names (one must match)', metavar='OTAGS') tag_arg.completer = tags_completer # type: ignore atag_arg = tag_parser.add_argument('-k', '--and-tags', nargs='+', help='List of tag names (all must match)', metavar='ATAGS') atag_arg.completer = tags_completer # type: ignore etag_arg = tag_parser.add_argument('-x', '--exclude-tags', nargs='+', help='List of tag names to exclude', metavar='XTAGS') etag_arg.completer = tags_completer # type: ignore otag_arg = tag_parser.add_argument('-o', '--output-tags', nargs='+', help='List of output tag names, default is input', metavar='OUTTAGS') otag_arg.completer = tags_completer # type: ignore # 'question' command parser question_cmd_parser = cmdparser.add_parser('question', parents=[tag_parser], help="ask, create and process questions.", aliases=['q']) question_cmd_parser.set_defaults(func=question_cmd) question_group = question_cmd_parser.add_mutually_exclusive_group(required=True) question_group.add_argument('-a', '--ask', nargs='+', help='Ask a question') question_group.add_argument('-c', '--create', nargs='+', help='Create a question') question_group.add_argument('-r', '--repeat', nargs='*', help='Repeat a question') question_group.add_argument('-p', '--process', nargs='*', help='Process existing questions') question_cmd_parser.add_argument('-O', '--overwrite', help='Overwrite existing messages when repeating them', action='store_true') question_cmd_parser.add_argument('-m', '--max-tokens', help='Max tokens to use', type=int) question_cmd_parser.add_argument('-T', '--temperature', help='Temperature to use', type=float) question_cmd_parser.add_argument('-A', '--AI', help='AI to use') question_cmd_parser.add_argument('-M', '--model', help='Model to use') question_cmd_parser.add_argument('-n', '--number', help='Number of answers to produce', type=int, default=1) question_cmd_parser.add_argument('-s', '--source', nargs='+', help='Source add content of a file to the query') question_cmd_parser.add_argument('-S', '--source-code-only', help='Add pure source code to the chat history', action='store_true') # 'ask' command parser ask_cmd_parser = cmdparser.add_parser('ask', parents=[tag_parser], help="Ask a question.", aliases=['a']) ask_cmd_parser.set_defaults(func=ask_cmd) ask_cmd_parser.add_argument('-q', '--question', nargs='+', help='Question to ask', required=True) ask_cmd_parser.add_argument('-m', '--max-tokens', help='Max tokens to use', type=int) ask_cmd_parser.add_argument('-T', '--temperature', help='Temperature to use', type=float) ask_cmd_parser.add_argument('-M', '--model', help='Model to use') ask_cmd_parser.add_argument('-n', '--number', help='Number of answers to produce', type=int, default=1) ask_cmd_parser.add_argument('-s', '--source', nargs='+', help='Source add content of a file to the query') ask_cmd_parser.add_argument('-S', '--source-code-only', help='Add pure source code to the chat history', action='store_true') # 'hist' command parser hist_cmd_parser = cmdparser.add_parser('hist', parents=[tag_parser], help="Print chat history.", aliases=['h']) hist_cmd_parser.set_defaults(func=hist_cmd) hist_cmd_parser.add_argument('-w', '--with-tags', help="Print chat history with tags.", action='store_true') hist_cmd_parser.add_argument('-W', '--with-files', help="Print chat history with filenames.", action='store_true') hist_cmd_parser.add_argument('-S', '--source-code-only', help='Print only source code', action='store_true') hist_cmd_parser.add_argument('-A', '--answer', help='Search for answer substring') hist_cmd_parser.add_argument('-Q', '--question', help='Search for question substring') # 'tags' command parser tags_cmd_parser = cmdparser.add_parser('tags', help="Manage tags.", aliases=['t']) tags_cmd_parser.set_defaults(func=tags_cmd) tags_group = tags_cmd_parser.add_mutually_exclusive_group(required=True) tags_group.add_argument('-l', '--list', help="List all tags and their frequency", action='store_true') tags_cmd_parser.add_argument('-p', '--prefix', help="Filter tags by prefix") tags_cmd_parser.add_argument('-c', '--contain', help="Filter tags by contained substring") # 'config' command parser config_cmd_parser = cmdparser.add_parser('config', help="Manage configuration", aliases=['c']) config_cmd_parser.set_defaults(func=config_cmd) config_group = config_cmd_parser.add_mutually_exclusive_group(required=True) config_group.add_argument('-l', '--list-models', help="List all available models", action='store_true') config_group.add_argument('-m', '--print-model', help="Print the currently configured model", action='store_true') config_group.add_argument('-M', '--model', help="Set model in the config file") # 'print' command parser print_cmd_parser = cmdparser.add_parser('print', help="Print message files.", aliases=['p']) print_cmd_parser.set_defaults(func=print_cmd) print_cmd_parser.add_argument('-f', '--file', help='File to print', required=True) print_cmd_parser.add_argument('-S', '--source-code-only', help='Print source code only (from the answer, if available)', action='store_true') argcomplete.autocomplete(parser) return parser def main() -> int: parser = create_parser() args = parser.parse_args() command = parser.parse_args() config = Config.from_file(args.config) openai_api_key(config.openai.api_key) command.func(command, config) return 0 if __name__ == '__main__': sys.exit(main())