#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim: set fileencoding=utf-8 :
"""Command-line entry point of ChatMastermind.

Parses the command line, loads the YAML configuration and dispatches to
one of the sub-actions: print a stored file, ask a question, show the
chat history, or list the known tags.
"""

import argparse
import pathlib
import sys
from itertools import zip_longest
from typing import Optional

import argcomplete
import yaml

from .utils import (
    terminal_width,
    process_tags,
    display_chat,
    display_source_code,
    display_tags_frequency,
)
from .storage import (
    save_answers,
    create_chat,
    get_tags,
    get_tags_unique,
    read_file,
    dump_data,
)
from .api_client import ai, openai_api_key


def _load_yaml(path):
    """Parse the YAML file at *path* and return the resulting object."""
    # NOTE(review): FullLoader is kept to preserve existing behavior. If these
    # files can ever come from an untrusted source, consider yaml.safe_load.
    with open(path, 'r') as f:
        return yaml.load(f, Loader=yaml.FullLoader)


def _format_question_part(question: Optional[str],
                          source: Optional[str]
                          ) -> Optional[str]:
    """Build one part of the full question.

    The content of *source* (a file name), if given, is stripped and wrapped
    in a fenced block; it is appended to *question* when both are present.
    Returns None when neither a question nor a source is given.
    """
    if source is None:
        return question
    with open(source) as r:
        code = f"```\n{r.read().strip()}\n```"
    if question is None:
        return code
    return f"{question}\n\n{code}"


def run_print_command(args: argparse.Namespace, config: dict) -> None:
    """Print the file named by ``args.print``.

    '.yaml' files are parsed as YAML, '.txt' files are read with
    ``read_file``. Any other suffix prints an error and exits with status 1.
    With ``args.only_source_code`` set, only the source code found in the
    'answer' entry is displayed; otherwise the whole data is dumped.

    *config* is accepted for a uniform handler signature but is not used.
    """
    fname = pathlib.Path(args.print)
    if fname.suffix == '.yaml':
        data = _load_yaml(args.print)
    elif fname.suffix == '.txt':
        data = read_file(fname)
    else:
        print(f"Unknown file type: {args.print}")
        sys.exit(1)

    if args.only_source_code:
        display_source_code(data['answer'])
    else:
        print(dump_data(data).strip())


def process_and_display_chat(args: argparse.Namespace,
                             config: dict,
                             dump: bool = False
                             ) -> tuple[list[dict[str, str]], str, list[str]]:
    """Assemble the question, create the chat and display it.

    Questions (``args.question``) and source files (``args.source``) are
    paired positionally; when one list is longer, the surplus entries are
    used on their own. All parts are joined with a blank line.

    Returns a tuple ``(chat, full_question, tags)``.
    """
    tags = args.tags or []
    extags = args.extags or []
    otags = args.output_tags or []

    if not args.only_source_code:
        process_tags(tags, extags, otags)

    question_parts = []
    pairs = zip_longest(args.question or [], args.source or [], fillvalue=None)
    for question, source in pairs:
        part = _format_question_part(question, source)
        if part is not None:
            question_parts.append(part)

    full_question = '\n\n'.join(question_parts)
    chat = create_chat(full_question, tags, extags, config,
                       args.match_all_tags, args.with_tags, args.with_file)
    display_chat(chat, dump, args.only_source_code)
    return chat, full_question, tags


def process_and_display_tags(args: argparse.Namespace,
                             config: dict,
                             dump: bool = False
                             ) -> None:
    """Display the tags obtained from ``get_tags`` and their frequency.

    *args* is accepted for a uniform handler signature but is not used.
    """
    display_tags_frequency(get_tags(config, None), dump)


def handle_question(args: argparse.Namespace,
                    config: dict,
                    dump: bool = False
                    ) -> None:
    """Ask the AI the given question, save the answers and print the usage.

    The chat is built and displayed first, then ``args.number`` answers are
    requested and stored together with the input and output tags.
    """
    chat, question, tags = process_and_display_chat(args, config, dump)
    otags = args.output_tags or []
    answers, usage = ai(chat, config, args.number)
    save_answers(question, answers, tags, otags, config)
    print("-" * terminal_width())
    print(f"Usage: {usage}")


def tags_completer(prefix, parsed_args, **kwargs):
    """Shell-completion callback for the tag options.

    Loads the config file named by ``parsed_args.config`` and returns the
    result of ``get_tags_unique`` for *prefix*.
    """
    config = _load_yaml(parsed_args.config)
    return get_tags_unique(config, prefix)


def create_parser() -> argparse.ArgumentParser:
    """Create the command-line parser and enable argcomplete on it.

    Exactly one of the action options (-p, -q, -D, -d, -l) is required;
    all other options modify the chosen action.
    """
    default_config = '.config.yaml'
    parser = argparse.ArgumentParser(
        description="ChatMastermind is a Python application that automates conversation with AI")

    # Mutually exclusive actions.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-p', '--print', help='File to print')
    group.add_argument('-q', '--question', nargs='*', help='Question to ask')
    group.add_argument('-D', '--chat-dump', help="Print chat history as Python structure", action='store_true')
    group.add_argument('-d', '--chat', help="Print chat history as readable text", action='store_true')
    group.add_argument('-l', '--list-tags', help="List all tags and their frequency", action='store_true')

    # General options.
    parser.add_argument('-c', '--config', help='Config file name.', default=default_config)
    parser.add_argument('-m', '--max-tokens', help='Max tokens to use', type=int)
    parser.add_argument('-T', '--temperature', help='Temperature to use', type=float)
    parser.add_argument('-M', '--model', help='Model to use')
    parser.add_argument('-n', '--number', help='Number of answers to produce', type=int, default=1)
    parser.add_argument('-s', '--source', nargs='*', help='Source add content of a file to the query')
    parser.add_argument('-S', '--only-source-code', help='Print only source code', action='store_true')
    parser.add_argument('-w', '--with-tags', help="Print chat history with tags.", action='store_true')
    parser.add_argument('-W', '--with-file', help="Print chat history with filename.", action='store_true')
    parser.add_argument('-a', '--match-all-tags',
                        help="All given tags must match when selecting chat history entries.",
                        action='store_true')

    # Tag options, each with tag-name completion.
    tags_arg = parser.add_argument('-t', '--tags', nargs='*', help='List of tag names', metavar='TAGS')
    tags_arg.completer = tags_completer  # type: ignore
    extags_arg = parser.add_argument('-e', '--extags', nargs='*',
                                     help='List of tag names to exclude', metavar='EXTAGS')
    extags_arg.completer = tags_completer  # type: ignore
    otags_arg = parser.add_argument('-o', '--output-tags', nargs='*',
                                    help='List of output tag names, default is input', metavar='OTAGS')
    otags_arg.completer = tags_completer  # type: ignore

    argcomplete.autocomplete(parser)
    return parser


def main() -> int:
    """Parse the command line, load the config and run the chosen action.

    Command-line values for max tokens, temperature and model override the
    corresponding entries in the 'openai' section of the config.

    Returns 0 as the process exit status.
    """
    parser = create_parser()
    args = parser.parse_args()

    config = _load_yaml(args.config)
    openai_api_key(config['openai']['api_key'])

    if args.max_tokens:
        config['openai']['max_tokens'] = args.max_tokens
    # Compare against None: 0.0 is falsy, so a plain truthiness test would
    # silently discard an explicit `-T 0`.
    if args.temperature is not None:
        config['openai']['temperature'] = args.temperature
    if args.model:
        config['openai']['model'] = args.model

    if args.print:
        run_print_command(args, config)
    elif args.question:
        handle_question(args, config)
    elif args.chat_dump:
        process_and_display_chat(args, config, dump=True)
    elif args.chat:
        process_and_display_chat(args, config)
    elif args.list_tags:
        process_and_display_tags(args, config)

    return 0


if __name__ == '__main__':
    sys.exit(main())