implemented sub-commands

This commit is contained in:
juk0de 2023-08-11 18:12:49 +02:00
parent df91ca863a
commit 8c35196a3a
5 changed files with 176 additions and 116 deletions

View File

@ -5,7 +5,7 @@ def openai_api_key(api_key: str) -> None:
openai.api_key = api_key
def display_models() -> None:
def print_models() -> None:
not_ready = []
for engine in sorted(openai.Engine.list()['data'], key=lambda x: x['id']):
if engine['ready']:

View File

@ -7,32 +7,33 @@ import sys
import argcomplete
import argparse
import pathlib
from .utils import terminal_width, process_tags, display_chat, display_source_code, display_tags_frequency
from .storage import save_answers, create_chat, get_tags, get_tags_unique, read_file, dump_data
from .api_client import ai, openai_api_key, display_models
from .utils import terminal_width, process_tags, print_chat_hist, display_source_code, print_tags_frequency
from .storage import save_answers, create_chat_hist, get_tags, get_tags_unique, read_file, dump_data
from .api_client import ai, openai_api_key, print_models
from itertools import zip_longest
def run_print_command(args: argparse.Namespace, config: dict) -> None:
fname = pathlib.Path(args.print)
if fname.suffix == '.yaml':
with open(args.print, 'r') as f:
data = yaml.load(f, Loader=yaml.FullLoader)
elif fname.suffix == '.txt':
data = read_file(fname)
else:
print(f"Unknown file type: {args.print}")
sys.exit(1)
if args.only_source_code:
display_source_code(data['answer'])
else:
print(dump_data(data).strip())
default_config = '.config.yaml'
def process_and_display_chat(args: argparse.Namespace,
def tags_completer(prefix, parsed_args, **kwargs):
with open(parsed_args.config, 'r') as f:
config = yaml.load(f, Loader=yaml.FullLoader)
return get_tags_unique(config, prefix)
def read_config(path: str):
with open(path, 'r') as f:
config = yaml.load(f, Loader=yaml.FullLoader)
return config
def create_question_and_chat(args: argparse.Namespace,
config: dict,
dump: bool = False
) -> tuple[list[dict[str, str]], str, list[str]]:
"""
Creates the "SI request", including the question and chat history as determined
by the specified tags.
"""
tags = args.tags or []
extags = args.extags or []
otags = args.output_tags or []
@ -55,25 +56,42 @@ def process_and_display_chat(args: argparse.Namespace,
question_parts.append(f"```\n{r.read().strip()}\n```")
full_question = '\n\n'.join(question_parts)
chat = create_chat(full_question, tags, extags, config,
args.match_all_tags, args.with_tags,
args.with_file)
display_chat(chat, dump, args.only_source_code)
chat = create_chat_hist(full_question, tags, extags, config,
args.match_all_tags, args.with_tags,
args.with_file)
return chat, full_question, tags
def process_and_display_tags(args: argparse.Namespace,
config: dict,
dump: bool = False
) -> None:
display_tags_frequency(get_tags(config, None), dump)
def tag_cmd(args: argparse.Namespace) -> None:
"""
Handler for the 'tag' command.
"""
config = read_config(args.config)
if args.list:
print_tags_frequency(get_tags(config, None), args.dump)
def handle_question(args: argparse.Namespace,
config: dict,
dump: bool = False
) -> None:
chat, question, tags = process_and_display_chat(args, config, dump)
def model_cmd(args: argparse.Namespace) -> None:
"""
Handler for the 'model' command.
"""
if args.list:
print_models()
def ask_cmd(args: argparse.Namespace) -> None:
"""
Handler for the 'ask' command.
"""
config = read_config(args.config)
if args.max_tokens:
config['openai']['max_tokens'] = args.max_tokens
if args.temperature:
config['openai']['temperature'] = args.temperature
if args.model:
config['openai']['model'] = args.model
chat, question, tags = create_question_and_chat(args, config)
print_chat_hist(chat, args.dump, args.only_source_code)
otags = args.output_tags or []
answers, usage = ai(chat, config, args.number)
save_answers(question, answers, tags, otags, config)
@ -81,77 +99,119 @@ def handle_question(args: argparse.Namespace,
print(f"Usage: {usage}")
def tags_completer(prefix, parsed_args, **kwargs):
with open(parsed_args.config, 'r') as f:
config = yaml.load(f, Loader=yaml.FullLoader)
return get_tags_unique(config, prefix)
def hist_cmd(args: argparse.Namespace) -> None:
"""
Handler for the 'hist' command.
"""
config = read_config(args.config)
chat, q, t = create_question_and_chat(args, config)
print_chat_hist(chat, args.dump, args.only_source_code)
def print_cmd(args: argparse.Namespace) -> None:
"""
Handler for the 'print' command.
"""
fname = pathlib.Path(args.print)
if fname.suffix == '.yaml':
with open(args.print, 'r') as f:
data = yaml.load(f, Loader=yaml.FullLoader)
elif fname.suffix == '.txt':
data = read_file(fname)
else:
print(f"Unknown file type: {args.print}")
sys.exit(1)
if args.only_source_code:
display_source_code(data['answer'])
else:
print(dump_data(data).strip())
def create_parser() -> argparse.ArgumentParser:
default_config = '.config.yaml'
parser = argparse.ArgumentParser(
description="ChatMastermind is a Python application that automates conversation with AI")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-p', '--print', help='File to print')
group.add_argument('-q', '--question', nargs='*', help='Question to ask')
group.add_argument('-D', '--chat-dump', help="Print chat history as Python structure", action='store_true')
group.add_argument('-d', '--chat', help="Print chat history as readable text", action='store_true')
group.add_argument('-l', '--list-tags', help="List all tags and their frequency", action='store_true')
group.add_argument('-L', '--list-models', help="List all available models", action='store_true')
parser.add_argument('-c', '--config', help='Config file name.', default=default_config)
parser.add_argument('-m', '--max-tokens', help='Max tokens to use', type=int)
parser.add_argument('-T', '--temperature', help='Temperature to use', type=float)
parser.add_argument('-M', '--model', help='Model to use')
parser.add_argument('-n', '--number', help='Number of answers to produce', type=int, default=1)
parser.add_argument('-s', '--source', nargs='*', help='Source add content of a file to the query')
parser.add_argument('-S', '--only-source-code', help='Print only source code', action='store_true')
parser.add_argument('-w', '--with-tags', help="Print chat history with tags.", action='store_true')
parser.add_argument('-W', '--with-file',
help="Print chat history with filename.",
action='store_true')
parser.add_argument('-a', '--match-all-tags',
help="All given tags must match when selecting chat history entries.",
action='store_true')
tags_arg = parser.add_argument('-t', '--tags', nargs='*', help='List of tag names', metavar='TAGS')
tags_arg.completer = tags_completer # type: ignore
extags_arg = parser.add_argument('-e', '--extags', nargs='*', help='List of tag names to exclude', metavar='EXTAGS')
extags_arg.completer = tags_completer # type: ignore
otags_arg = parser.add_argument('-o', '--output-tags', nargs='*', help='List of output tag names, default is input', metavar='OTAGS')
otags_arg.completer = tags_completer # type: ignore
argcomplete.autocomplete(parser)
# subcommand-parser
cmdparser = parser.add_subparsers(dest='command',
title='commands',
description='supported commands')
cmdparser.required = True
# a parent parser for all commands that support tag selection
tag_parser = argparse.ArgumentParser(add_help=False)
tag_arg = tag_parser.add_argument('-t', '--tags', nargs='*',
help='List of tag names', metavar='TAGS')
tag_arg.completer = tags_completer # type: ignore
extag_arg = tag_parser.add_argument('-e', '--extags', nargs='*',
help='List of tag names to exclude', metavar='EXTAGS')
extag_arg.completer = tags_completer # type: ignore
otag_arg = tag_parser.add_argument('-o', '--output-tags', nargs='*',
help='List of output tag names, default is input', metavar='OTAGS')
otag_arg.completer = tags_completer # type: ignore
tag_parser.add_argument('-a', '--match-all-tags',
help="All given tags must match when selecting chat history entries",
action='store_true')
# enable autocompletion for tags
argcomplete.autocomplete(tag_parser)
# 'ask' command parser
ask_cmd_parser = cmdparser.add_parser('ask', parents=[tag_parser],
help="Ask a question.")
ask_cmd_parser.set_defaults(func=ask_cmd)
ask_cmd_parser.add_argument('-q', '--question', nargs='+', help='Question to ask', required=True)
ask_cmd_parser.add_argument('-m', '--max-tokens', help='Max tokens to use', type=int)
ask_cmd_parser.add_argument('-T', '--temperature', help='Temperature to use', type=float)
ask_cmd_parser.add_argument('-M', '--model', help='Model to use')
ask_cmd_parser.add_argument('-n', '--number', help='Number of answers to produce', type=int, default=1)
ask_cmd_parser.add_argument('-s', '--source', nargs='*', help='Source add content of a file to the query')
# 'hist' command parser
hist_cmd_parser = cmdparser.add_parser('hist', parents=[tag_parser],
help="Print chat history.")
hist_cmd_parser.set_defaults(func=hist_cmd)
hist_cmd_parser.add_argument('-d', '--dump', help="Print chat history as Python structure",
action='store_true')
hist_cmd_parser.add_argument('-w', '--with-tags', help="Print chat history with tags.",
action='store_true')
hist_cmd_parser.add_argument('-W', '--with-files', help="Print chat history with filenames.",
action='store_true')
hist_cmd_parser.add_argument('-S', '--only-source-code', help='Print only source code',
action='store_true')
# 'tag' command parser
tag_cmd_parser = cmdparser.add_parser('tag',
help="Manage tags.")
tag_cmd_parser.set_defaults(func=tag_cmd)
tag_cmd_parser.add_argument('-l', '--list', help="List all tags and their frequency",
action='store_true')
# 'model' command parser
model_cmd_parser = cmdparser.add_parser('model',
help="Manage models.")
model_cmd_parser.set_defaults(func=model_cmd)
model_cmd_parser.add_argument('-l', '--list', help="List all available models",
action='store_true')
# 'print' command parser
print_cmd_parser = cmdparser.add_parser('print',
help="Print files.")
print_cmd_parser.set_defaults(func=print_cmd)
print_cmd_parser.add_argument('-f', '--file', help='File to print', required=True)
print_cmd_parser.add_argument('-S', '--only-source-code', help='Print only source code',
action='store_true')
return parser
def main() -> int:
parser = create_parser()
args = parser.parse_args()
command = parser.parse_args()
with open(args.config, 'r') as f:
config = yaml.load(f, Loader=yaml.FullLoader)
openai_api_key(read_config(args.config)['openai']['api_key'])
openai_api_key(config['openai']['api_key'])
if args.max_tokens:
config['openai']['max_tokens'] = args.max_tokens
if args.temperature:
config['openai']['temperature'] = args.temperature
if args.model:
config['openai']['model'] = args.model
if args.print:
run_print_command(args, config)
elif args.question:
handle_question(args, config)
elif args.chat_dump:
process_and_display_chat(args, config, dump=True)
elif args.chat:
process_and_display_chat(args, config)
elif args.list_tags:
process_and_display_tags(args, config)
elif args.list_models:
display_models()
command.func(command)
return 0

View File

@ -63,14 +63,14 @@ def save_answers(question: str,
f.write(f'{num}')
def create_chat(question: Optional[str],
tags: Optional[List[str]],
extags: Optional[List[str]],
config: Dict[str, Any],
match_all_tags: bool = False,
with_tags: bool = False,
with_file: bool = False
) -> List[Dict[str, str]]:
def create_chat_hist(question: Optional[str],
tags: Optional[List[str]],
extags: Optional[List[str]],
config: Dict[str, Any],
match_all_tags: bool = False,
with_tags: bool = False,
with_file: bool = False
) -> List[Dict[str, str]]:
chat: List[Dict[str, str]] = []
append_message(chat, 'system', config['system'].strip())
for file in sorted(pathlib.Path(config['db']).iterdir()):

View File

@ -57,7 +57,7 @@ def display_source_code(content: str) -> None:
pass
def display_chat(chat, dump=False, source_code=False) -> None:
def print_chat_hist(chat, dump=False, source_code=False) -> None:
if dump:
pp(chat)
return
@ -75,7 +75,7 @@ def display_chat(chat, dump=False, source_code=False) -> None:
print(f"{message['role'].upper()}: {message['content']}")
def display_tags_frequency(tags: List[str], dump=False) -> None:
def print_tags_frequency(tags: List[str], dump=False) -> None:
if dump:
pp(tags)
return

View File

@ -3,9 +3,9 @@ import io
import pathlib
import argparse
from chatmastermind.utils import terminal_width
from chatmastermind.main import create_parser, handle_question
from chatmastermind.main import create_parser, ask_cmd
from chatmastermind.api_client import ai
from chatmastermind.storage import create_chat, save_answers, dump_data
from chatmastermind.storage import create_chat_hist, save_answers, dump_data
from unittest import mock
from unittest.mock import patch, MagicMock, Mock
@ -30,7 +30,7 @@ class TestCreateChat(unittest.TestCase):
{'question': 'test_content', 'answer': 'some answer',
'tags': ['test_tag']}))
test_chat = create_chat(self.question, self.tags, None, self.config)
test_chat = create_chat_hist(self.question, self.tags, None, self.config)
self.assertEqual(len(test_chat), 4)
self.assertEqual(test_chat[0],
@ -52,7 +52,7 @@ class TestCreateChat(unittest.TestCase):
{'question': 'test_content', 'answer': 'some answer',
'tags': ['other_tag']}))
test_chat = create_chat(self.question, self.tags, None, self.config)
test_chat = create_chat_hist(self.question, self.tags, None, self.config)
self.assertEqual(len(test_chat), 2)
self.assertEqual(test_chat[0],
@ -75,7 +75,7 @@ class TestCreateChat(unittest.TestCase):
'tags': ['test_tag2']})),
)
test_chat = create_chat(self.question, [], None, self.config)
test_chat = create_chat_hist(self.question, [], None, self.config)
self.assertEqual(len(test_chat), 6)
self.assertEqual(test_chat[0],
@ -112,24 +112,24 @@ class TestHandleQuestion(unittest.TestCase):
'setting2': 'value2'
}
@patch("chatmastermind.main.create_chat", return_value="test_chat")
@patch("chatmastermind.main.create_chat_hist", return_value="test_chat")
@patch("chatmastermind.main.process_tags")
@patch("chatmastermind.main.ai", return_value=(["answer1", "answer2", "answer3"], "test_usage"))
@patch("chatmastermind.utils.pp")
@patch("builtins.print")
def test_handle_question(self, mock_print, mock_pp, mock_ai,
mock_process_tags, mock_create_chat):
def test_ask_cmd(self, mock_print, mock_pp, mock_ai,
mock_process_tags, mock_create_chat_hist):
open_mock = MagicMock()
with patch("chatmastermind.storage.open", open_mock):
handle_question(self.args, self.config, True)
ask_cmd(self.args, self.config, True)
mock_process_tags.assert_called_once_with(self.args.tags,
self.args.extags,
[])
mock_create_chat.assert_called_once_with(self.question,
self.args.tags,
self.args.extags,
self.config,
False, False, False)
mock_create_chat_hist.assert_called_once_with(self.question,
self.args.tags,
self.args.extags,
self.config,
False, False, False)
mock_pp.assert_called_once_with("test_chat")
mock_ai.assert_called_with("test_chat",
self.config,