Compare commits
4 Commits
1fd615baaa
...
d39ff8f28e
| Author | SHA1 | Date | |
|---|---|---|---|
| d39ff8f28e | |||
| a33f6afd5f | |||
| 548a049cc4 | |||
| c3439db5fd |
@ -6,11 +6,13 @@ import yaml
|
||||
import sys
|
||||
import argcomplete
|
||||
import argparse
|
||||
import pathlib
|
||||
from .utils import terminal_width, print_tag_args, print_chat_hist, display_source_code, print_tags_frequency, ChatType
|
||||
from .storage import save_answers, create_chat_hist, get_tags, get_tags_unique, read_file, dump_data
|
||||
from pathlib import Path
|
||||
from .utils import terminal_width, print_tag_args, print_chat_hist, display_source_code, ChatType
|
||||
from .storage import save_answers, create_chat_hist, read_file, dump_data
|
||||
from .api_client import ai, openai_api_key, print_models
|
||||
from .configuration import Config
|
||||
from .chat import ChatDB
|
||||
from .message import Message, MessageFilter
|
||||
from itertools import zip_longest
|
||||
from typing import Any
|
||||
|
||||
@ -18,9 +20,8 @@ default_config = '.config.yaml'
|
||||
|
||||
|
||||
def tags_completer(prefix: str, parsed_args: Any, **kwargs: Any) -> list[str]:
|
||||
with open(parsed_args.config, 'r') as f:
|
||||
config = yaml.load(f, Loader=yaml.FullLoader)
|
||||
return get_tags_unique(config, prefix)
|
||||
config = Config.from_file(parsed_args.config)
|
||||
return list(Message.tags_from_dir(Path(config.db), prefix=prefix))
|
||||
|
||||
|
||||
def create_question_with_hist(args: argparse.Namespace,
|
||||
@ -31,11 +32,11 @@ def create_question_with_hist(args: argparse.Namespace,
|
||||
by the specified tags.
|
||||
"""
|
||||
tags = args.tags or []
|
||||
extags = args.extags or []
|
||||
etags = args.etags or []
|
||||
otags = args.output_tags or []
|
||||
|
||||
if not args.only_source_code:
|
||||
print_tag_args(tags, extags, otags)
|
||||
if not args.source_code_only:
|
||||
print_tag_args(tags, etags, otags)
|
||||
|
||||
question_parts = []
|
||||
question_list = args.question if args.question is not None else []
|
||||
@ -52,17 +53,23 @@ def create_question_with_hist(args: argparse.Namespace,
|
||||
question_parts.append(f"```\n{r.read().strip()}\n```")
|
||||
|
||||
full_question = '\n\n'.join(question_parts)
|
||||
chat = create_chat_hist(full_question, tags, extags, config,
|
||||
args.match_all_tags, False, False)
|
||||
chat = create_chat_hist(full_question, tags, etags, config,
|
||||
match_all_tags=True if args.atags else False, # FIXME
|
||||
with_tags=False,
|
||||
with_file=False)
|
||||
return chat, full_question, tags
|
||||
|
||||
|
||||
def tag_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||
def tags_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||
"""
|
||||
Handler for the 'tag' command.
|
||||
Handler for the 'tags' command.
|
||||
"""
|
||||
chat = ChatDB.from_dir(cache_path=Path('.'),
|
||||
db_path=Path(config.db))
|
||||
if args.list:
|
||||
print_tags_frequency(get_tags(config, None))
|
||||
tags_freq = chat.tags_frequency(args.prefix, args.contain)
|
||||
for tag, freq in tags_freq.items():
|
||||
print(f"- {tag}: {freq}")
|
||||
|
||||
|
||||
def config_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||
@ -89,7 +96,7 @@ def ask_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||
if args.model:
|
||||
config.openai.model = args.model
|
||||
chat, question, tags = create_question_with_hist(args, config)
|
||||
print_chat_hist(chat, False, args.only_source_code)
|
||||
print_chat_hist(chat, False, args.source_code_only)
|
||||
otags = args.output_tags or []
|
||||
answers, usage = ai(chat, config, args.number)
|
||||
save_answers(question, answers, tags, otags, config)
|
||||
@ -101,21 +108,23 @@ def hist_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||
"""
|
||||
Handler for the 'hist' command.
|
||||
"""
|
||||
tags = args.tags or []
|
||||
extags = args.extags or []
|
||||
|
||||
chat = create_chat_hist(None, tags, extags, config,
|
||||
args.match_all_tags,
|
||||
args.with_tags,
|
||||
args.with_files)
|
||||
print_chat_hist(chat, args.dump, args.only_source_code)
|
||||
mfilter = MessageFilter(tags_or=args.tags,
|
||||
tags_and=args.atags,
|
||||
tags_not=args.etags)
|
||||
chat = ChatDB.from_dir(Path('.'),
|
||||
Path(config.db),
|
||||
mfilter=mfilter)
|
||||
chat.print(args.source_code_only,
|
||||
args.with_tags,
|
||||
args.with_files)
|
||||
|
||||
|
||||
def print_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||
"""
|
||||
Handler for the 'print' command.
|
||||
"""
|
||||
fname = pathlib.Path(args.file)
|
||||
fname = Path(args.file)
|
||||
if fname.suffix == '.yaml':
|
||||
with open(args.file, 'r') as f:
|
||||
data = yaml.load(f, Loader=yaml.FullLoader)
|
||||
@ -124,7 +133,7 @@ def print_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||
else:
|
||||
print(f"Unknown file type: {args.file}")
|
||||
sys.exit(1)
|
||||
if args.only_source_code:
|
||||
if args.source_code_only:
|
||||
display_source_code(data['answer'])
|
||||
else:
|
||||
print(dump_data(data).strip())
|
||||
@ -144,18 +153,17 @@ def create_parser() -> argparse.ArgumentParser:
|
||||
# a parent parser for all commands that support tag selection
|
||||
tag_parser = argparse.ArgumentParser(add_help=False)
|
||||
tag_arg = tag_parser.add_argument('-t', '--tags', nargs='+',
|
||||
help='List of tag names', metavar='TAGS')
|
||||
help='List of tag names (one must match)', metavar='TAGS')
|
||||
tag_arg.completer = tags_completer # type: ignore
|
||||
extag_arg = tag_parser.add_argument('-e', '--extags', nargs='+',
|
||||
help='List of tag names to exclude', metavar='EXTAGS')
|
||||
extag_arg.completer = tags_completer # type: ignore
|
||||
atag_arg = tag_parser.add_argument('-a', '--atags', nargs='+',
|
||||
help='List of tag names (all must match)', metavar='TAGS')
|
||||
atag_arg.completer = tags_completer # type: ignore
|
||||
etag_arg = tag_parser.add_argument('-e', '--etags', nargs='+',
|
||||
help='List of tag names to exclude', metavar='ETAGS')
|
||||
etag_arg.completer = tags_completer # type: ignore
|
||||
otag_arg = tag_parser.add_argument('-o', '--output-tags', nargs='+',
|
||||
help='List of output tag names, default is input', metavar='OTAGS')
|
||||
otag_arg.completer = tags_completer # type: ignore
|
||||
tag_parser.add_argument('-a', '--match-all-tags',
|
||||
help="All given tags must match when selecting chat history entries",
|
||||
action='store_true')
|
||||
# enable autocompletion for tags
|
||||
|
||||
# 'ask' command parser
|
||||
ask_cmd_parser = cmdparser.add_parser('ask', parents=[tag_parser],
|
||||
@ -170,7 +178,7 @@ def create_parser() -> argparse.ArgumentParser:
|
||||
ask_cmd_parser.add_argument('-n', '--number', help='Number of answers to produce', type=int,
|
||||
default=1)
|
||||
ask_cmd_parser.add_argument('-s', '--source', nargs='+', help='Source add content of a file to the query')
|
||||
ask_cmd_parser.add_argument('-S', '--only-source-code', help='Add pure source code to the chat history',
|
||||
ask_cmd_parser.add_argument('-S', '--source-code-only', help='Add pure source code to the chat history',
|
||||
action='store_true')
|
||||
|
||||
# 'hist' command parser
|
||||
@ -178,23 +186,23 @@ def create_parser() -> argparse.ArgumentParser:
|
||||
help="Print chat history.",
|
||||
aliases=['h'])
|
||||
hist_cmd_parser.set_defaults(func=hist_cmd)
|
||||
hist_cmd_parser.add_argument('-d', '--dump', help="Print chat history as Python structure",
|
||||
action='store_true')
|
||||
hist_cmd_parser.add_argument('-w', '--with-tags', help="Print chat history with tags.",
|
||||
action='store_true')
|
||||
hist_cmd_parser.add_argument('-W', '--with-files', help="Print chat history with filenames.",
|
||||
action='store_true')
|
||||
hist_cmd_parser.add_argument('-S', '--only-source-code', help='Print only source code',
|
||||
hist_cmd_parser.add_argument('-S', '--source-code-only', help='Print only source code',
|
||||
action='store_true')
|
||||
|
||||
# 'tag' command parser
|
||||
tag_cmd_parser = cmdparser.add_parser('tag',
|
||||
help="Manage tags.",
|
||||
aliases=['t'])
|
||||
tag_cmd_parser.set_defaults(func=tag_cmd)
|
||||
tag_group = tag_cmd_parser.add_mutually_exclusive_group(required=True)
|
||||
tag_group.add_argument('-l', '--list', help="List all tags and their frequency",
|
||||
action='store_true')
|
||||
# 'tags' command parser
|
||||
tags_cmd_parser = cmdparser.add_parser('tags',
|
||||
help="Manage tags.",
|
||||
aliases=['t'])
|
||||
tags_cmd_parser.set_defaults(func=tags_cmd)
|
||||
tags_group = tags_cmd_parser.add_mutually_exclusive_group(required=True)
|
||||
tags_group.add_argument('-l', '--list', help="List all tags and their frequency",
|
||||
action='store_true')
|
||||
tags_cmd_parser.add_argument('-p', '--prefix', help="Filter tags by prefix")
|
||||
tags_cmd_parser.add_argument('-c', '--contain', help="Filter tags by contained substring")
|
||||
|
||||
# 'config' command parser
|
||||
config_cmd_parser = cmdparser.add_parser('config',
|
||||
@ -214,7 +222,7 @@ def create_parser() -> argparse.ArgumentParser:
|
||||
aliases=['p'])
|
||||
print_cmd_parser.set_defaults(func=print_cmd)
|
||||
print_cmd_parser.add_argument('-f', '--file', help='File to print', required=True)
|
||||
print_cmd_parser.add_argument('-S', '--only-source-code', help='Print only source code',
|
||||
print_cmd_parser.add_argument('-S', '--source-code-only', help='Print only source code',
|
||||
action='store_true')
|
||||
|
||||
argcomplete.autocomplete(parser)
|
||||
|
||||
@ -128,29 +128,29 @@ class ModelLine(str):
|
||||
return cls(' '.join([cls.prefix, model]))
|
||||
|
||||
|
||||
class Question(str):
|
||||
class Answer(str):
|
||||
"""
|
||||
A single question with a defined header.
|
||||
A single answer with a defined header.
|
||||
"""
|
||||
tokens: int = 0 # tokens used by this question
|
||||
txt_header: ClassVar[str] = '=== QUESTION ==='
|
||||
yaml_key: ClassVar[str] = 'question'
|
||||
tokens: int = 0 # tokens used by this answer
|
||||
txt_header: ClassVar[str] = '=== ANSWER ==='
|
||||
yaml_key: ClassVar[str] = 'answer'
|
||||
|
||||
def __new__(cls: Type[QuestionInst], string: str) -> QuestionInst:
|
||||
def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst:
|
||||
"""
|
||||
Make sure the question string does not contain the header.
|
||||
Make sure the answer string does not contain the header as a whole line.
|
||||
"""
|
||||
if cls.txt_header in string:
|
||||
raise MessageError(f"Question '{string}' contains the header '{cls.txt_header}'")
|
||||
if cls.txt_header in string.split('\n'):
|
||||
raise MessageError(f"Answer '{string}' contains the header '{cls.txt_header}'")
|
||||
instance = super().__new__(cls, string)
|
||||
return instance
|
||||
|
||||
@classmethod
|
||||
def from_list(cls: Type[QuestionInst], strings: list[str]) -> QuestionInst:
|
||||
def from_list(cls: Type[AnswerInst], strings: list[str]) -> AnswerInst:
|
||||
"""
|
||||
Build Question from a list of strings. Make sure strings do not contain the header.
|
||||
"""
|
||||
if any(cls.txt_header in string for string in strings):
|
||||
if cls.txt_header in strings:
|
||||
raise MessageError(f"Question contains the header '{cls.txt_header}'")
|
||||
instance = super().__new__(cls, '\n'.join(strings).strip())
|
||||
return instance
|
||||
@ -162,29 +162,33 @@ class Question(str):
|
||||
return source_code(self, include_delims)
|
||||
|
||||
|
||||
class Answer(str):
|
||||
class Question(str):
|
||||
"""
|
||||
A single answer with a defined header.
|
||||
A single question with a defined header.
|
||||
"""
|
||||
tokens: int = 0 # tokens used by this answer
|
||||
txt_header: ClassVar[str] = '=== ANSWER ==='
|
||||
yaml_key: ClassVar[str] = 'answer'
|
||||
tokens: int = 0 # tokens used by this question
|
||||
txt_header: ClassVar[str] = '=== QUESTION ==='
|
||||
yaml_key: ClassVar[str] = 'question'
|
||||
|
||||
def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst:
|
||||
def __new__(cls: Type[QuestionInst], string: str) -> QuestionInst:
|
||||
"""
|
||||
Make sure the answer string does not contain the header.
|
||||
Make sure the question string does not contain the header as a whole line
|
||||
(also not that from 'Answer', so it's always clear where the answer starts).
|
||||
"""
|
||||
if cls.txt_header in string:
|
||||
raise MessageError(f"Answer '{string}' contains the header '{cls.txt_header}'")
|
||||
string_lines = string.split('\n')
|
||||
if cls.txt_header in string_lines:
|
||||
raise MessageError(f"Question '{string}' contains the header '{cls.txt_header}'")
|
||||
if Answer.txt_header in string_lines:
|
||||
raise MessageError(f"Question '{string}' contains the header '{Answer.txt_header}'")
|
||||
instance = super().__new__(cls, string)
|
||||
return instance
|
||||
|
||||
@classmethod
|
||||
def from_list(cls: Type[AnswerInst], strings: list[str]) -> AnswerInst:
|
||||
def from_list(cls: Type[QuestionInst], strings: list[str]) -> QuestionInst:
|
||||
"""
|
||||
Build Question from a list of strings. Make sure strings do not contain the header.
|
||||
"""
|
||||
if any(cls.txt_header in string for string in strings):
|
||||
if cls.txt_header in strings:
|
||||
raise MessageError(f"Question contains the header '{cls.txt_header}'")
|
||||
instance = super().__new__(cls, '\n'.join(strings).strip())
|
||||
return instance
|
||||
|
||||
@ -78,8 +78,3 @@ def print_chat_hist(chat: ChatType, dump: bool = False, source_code: bool = Fals
|
||||
print(message['content'])
|
||||
else:
|
||||
print(f"{message['role'].upper()}: {message['content']}")
|
||||
|
||||
|
||||
def print_tags_frequency(tags: list[str]) -> None:
|
||||
for tag in sorted(set(tags)):
|
||||
print(f"- {tag}: {tags.count(tag)}")
|
||||
|
||||
@ -115,11 +115,12 @@ class TestHandleQuestion(CmmTestCase):
|
||||
self.question = "test question"
|
||||
self.args = argparse.Namespace(
|
||||
tags=['tag1'],
|
||||
extags=['extag1'],
|
||||
atags=None,
|
||||
etags=['etag1'],
|
||||
output_tags=None,
|
||||
question=[self.question],
|
||||
source=None,
|
||||
only_source_code=False,
|
||||
source_code_only=False,
|
||||
number=3,
|
||||
max_tokens=None,
|
||||
temperature=None,
|
||||
@ -143,16 +144,18 @@ class TestHandleQuestion(CmmTestCase):
|
||||
with patch("chatmastermind.storage.open", open_mock):
|
||||
ask_cmd(self.args, self.config)
|
||||
mock_print_tag_args.assert_called_once_with(self.args.tags,
|
||||
self.args.extags,
|
||||
self.args.etags,
|
||||
[])
|
||||
mock_create_chat_hist.assert_called_once_with(self.question,
|
||||
self.args.tags,
|
||||
self.args.extags,
|
||||
self.args.etags,
|
||||
self.config,
|
||||
False, False, False)
|
||||
match_all_tags=False,
|
||||
with_tags=False,
|
||||
with_file=False)
|
||||
mock_print_chat_hist.assert_called_once_with('test_chat',
|
||||
False,
|
||||
self.args.only_source_code)
|
||||
self.args.source_code_only)
|
||||
mock_ai.assert_called_with("test_chat",
|
||||
self.config,
|
||||
self.args.number)
|
||||
@ -227,7 +230,7 @@ class TestCreateParser(CmmTestCase):
|
||||
mock_add_subparsers.assert_called_once_with(dest='command', title='commands', description='supported commands', required=True)
|
||||
mock_cmdparser.add_parser.assert_any_call('ask', parents=ANY, help=ANY, aliases=ANY)
|
||||
mock_cmdparser.add_parser.assert_any_call('hist', parents=ANY, help=ANY, aliases=ANY)
|
||||
mock_cmdparser.add_parser.assert_any_call('tag', help=ANY, aliases=ANY)
|
||||
mock_cmdparser.add_parser.assert_any_call('tags', help=ANY, aliases=ANY)
|
||||
mock_cmdparser.add_parser.assert_any_call('config', help=ANY, aliases=ANY)
|
||||
mock_cmdparser.add_parser.assert_any_call('print', help=ANY, aliases=ANY)
|
||||
self.assertTrue('.config.yaml' in parser.get_default('config'))
|
||||
|
||||
@ -61,22 +61,39 @@ class SourceCodeTestCase(CmmTestCase):
|
||||
|
||||
|
||||
class QuestionTestCase(CmmTestCase):
|
||||
def test_question_with_prefix(self) -> None:
|
||||
def test_question_with_header(self) -> None:
|
||||
with self.assertRaises(MessageError):
|
||||
Question("=== QUESTION === What is your name?")
|
||||
Question(f"{Question.txt_header}\nWhat is your name?")
|
||||
|
||||
def test_question_without_prefix(self) -> None:
|
||||
def test_question_with_answer_header(self) -> None:
|
||||
with self.assertRaises(MessageError):
|
||||
Question(f"{Answer.txt_header}\nBob")
|
||||
|
||||
def test_question_with_legal_header(self) -> None:
|
||||
"""
|
||||
If the header is just a part of a line, it's fine.
|
||||
"""
|
||||
question = Question(f"This is a line contaning '{Question.txt_header}'\nWhat does that mean?")
|
||||
self.assertIsInstance(question, Question)
|
||||
self.assertEqual(question, f"This is a line contaning '{Question.txt_header}'\nWhat does that mean?")
|
||||
|
||||
def test_question_without_header(self) -> None:
|
||||
question = Question("What is your favorite color?")
|
||||
self.assertIsInstance(question, Question)
|
||||
self.assertEqual(question, "What is your favorite color?")
|
||||
|
||||
|
||||
class AnswerTestCase(CmmTestCase):
|
||||
def test_answer_with_prefix(self) -> None:
|
||||
def test_answer_with_header(self) -> None:
|
||||
with self.assertRaises(MessageError):
|
||||
Answer("=== ANSWER === Yes")
|
||||
Answer(f"{Answer.txt_header}\nno")
|
||||
|
||||
def test_answer_without_prefix(self) -> None:
|
||||
def test_answer_with_legal_header(self) -> None:
|
||||
answer = Answer(f"This is a line contaning '{Answer.txt_header}'\nIt is what it is.")
|
||||
self.assertIsInstance(answer, Answer)
|
||||
self.assertEqual(answer, f"This is a line contaning '{Answer.txt_header}'\nIt is what it is.")
|
||||
|
||||
def test_answer_without_header(self) -> None:
|
||||
answer = Answer("No")
|
||||
self.assertIsInstance(answer, Answer)
|
||||
self.assertEqual(answer, "No")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user