Compare commits

...

4 Commits

7 changed files with 108 additions and 77 deletions

View File

@ -146,7 +146,7 @@ class Chat:
return sum(m.tokens() for m in self.messages) return sum(m.tokens() for m in self.messages)
def print(self, dump: bool = False, source_code_only: bool = False, def print(self, dump: bool = False, source_code_only: bool = False,
with_tags: bool = False, with_file: bool = False, with_tags: bool = False, with_files: bool = False,
paged: bool = True) -> None: paged: bool = True) -> None:
if dump: if dump:
pp(self) pp(self)
@ -157,15 +157,15 @@ class Chat:
output.extend(source_code(message.question, include_delims=True)) output.extend(source_code(message.question, include_delims=True))
continue continue
output.append('-' * terminal_width()) output.append('-' * terminal_width())
if with_tags:
output.append(message.tags_str())
if with_files:
output.append('FILE: ' + str(message.file_path))
output.append(Question.txt_header) output.append(Question.txt_header)
output.append(message.question) output.append(message.question)
if message.answer: if message.answer:
output.append(Answer.txt_header) output.append(Answer.txt_header)
output.append(message.answer) output.append(message.answer)
if with_tags:
output.append(message.tags_str())
if with_file:
output.append('FILE: ' + str(message.file_path))
if paged: if paged:
print_paged('\n'.join(output)) print_paged('\n'.join(output))
else: else:

View File

@ -7,10 +7,12 @@ import sys
import argcomplete import argcomplete
import argparse import argparse
import pathlib import pathlib
from .utils import terminal_width, print_tag_args, print_chat_hist, display_source_code, print_tags_frequency, ChatType from .utils import terminal_width, print_tag_args, print_chat_hist, display_source_code, ChatType
from .storage import save_answers, create_chat_hist, get_tags, get_tags_unique, read_file, dump_data from .storage import save_answers, create_chat_hist, get_tags_unique, read_file, dump_data
from .api_client import ai, openai_api_key, print_models from .api_client import ai, openai_api_key, print_models
from .configuration import Config from .configuration import Config
from .chat import ChatDB
from .message import MessageFilter
from itertools import zip_longest from itertools import zip_longest
from typing import Any from typing import Any
@ -31,11 +33,11 @@ def create_question_with_hist(args: argparse.Namespace,
by the specified tags. by the specified tags.
""" """
tags = args.tags or [] tags = args.tags or []
extags = args.extags or [] etags = args.etags or []
otags = args.output_tags or [] otags = args.output_tags or []
if not args.only_source_code: if not args.source_code_only:
print_tag_args(tags, extags, otags) print_tag_args(tags, etags, otags)
question_parts = [] question_parts = []
question_list = args.question if args.question is not None else [] question_list = args.question if args.question is not None else []
@ -52,8 +54,10 @@ def create_question_with_hist(args: argparse.Namespace,
question_parts.append(f"```\n{r.read().strip()}\n```") question_parts.append(f"```\n{r.read().strip()}\n```")
full_question = '\n\n'.join(question_parts) full_question = '\n\n'.join(question_parts)
chat = create_chat_hist(full_question, tags, extags, config, chat = create_chat_hist(full_question, tags, etags, config,
args.match_all_tags, False, False) match_all_tags=True if args.atags else False, # FIXME
with_tags=False,
with_file=False)
return chat, full_question, tags return chat, full_question, tags
@ -61,8 +65,12 @@ def tag_cmd(args: argparse.Namespace, config: Config) -> None:
""" """
Handler for the 'tag' command. Handler for the 'tag' command.
""" """
chat = ChatDB.from_dir(cache_path=pathlib.Path('.'),
db_path=pathlib.Path(config.db))
if args.list: if args.list:
print_tags_frequency(get_tags(config, None)) tags_freq = chat.tags_frequency(args.prefix, args.contain)
for tag, freq in tags_freq.items():
print(f"- {tag}: {freq}")
def config_cmd(args: argparse.Namespace, config: Config) -> None: def config_cmd(args: argparse.Namespace, config: Config) -> None:
@ -89,7 +97,7 @@ def ask_cmd(args: argparse.Namespace, config: Config) -> None:
if args.model: if args.model:
config.openai.model = args.model config.openai.model = args.model
chat, question, tags = create_question_with_hist(args, config) chat, question, tags = create_question_with_hist(args, config)
print_chat_hist(chat, False, args.only_source_code) print_chat_hist(chat, False, args.source_code_only)
otags = args.output_tags or [] otags = args.output_tags or []
answers, usage = ai(chat, config, args.number) answers, usage = ai(chat, config, args.number)
save_answers(question, answers, tags, otags, config) save_answers(question, answers, tags, otags, config)
@ -101,14 +109,17 @@ def hist_cmd(args: argparse.Namespace, config: Config) -> None:
""" """
Handler for the 'hist' command. Handler for the 'hist' command.
""" """
tags = args.tags or []
extags = args.extags or []
chat = create_chat_hist(None, tags, extags, config, mfilter = MessageFilter(tags_or=args.tags,
args.match_all_tags, tags_and=args.atags,
args.with_tags, tags_not=args.etags)
args.with_files) chat = ChatDB.from_dir(pathlib.Path('.'),
print_chat_hist(chat, args.dump, args.only_source_code) pathlib.Path(config.db),
mfilter=mfilter)
chat.print(args.dump,
args.source_code_only,
args.with_tags,
args.with_files)
def print_cmd(args: argparse.Namespace, config: Config) -> None: def print_cmd(args: argparse.Namespace, config: Config) -> None:
@ -124,7 +135,7 @@ def print_cmd(args: argparse.Namespace, config: Config) -> None:
else: else:
print(f"Unknown file type: {args.file}") print(f"Unknown file type: {args.file}")
sys.exit(1) sys.exit(1)
if args.only_source_code: if args.source_code_only:
display_source_code(data['answer']) display_source_code(data['answer'])
else: else:
print(dump_data(data).strip()) print(dump_data(data).strip())
@ -144,18 +155,17 @@ def create_parser() -> argparse.ArgumentParser:
# a parent parser for all commands that support tag selection # a parent parser for all commands that support tag selection
tag_parser = argparse.ArgumentParser(add_help=False) tag_parser = argparse.ArgumentParser(add_help=False)
tag_arg = tag_parser.add_argument('-t', '--tags', nargs='+', tag_arg = tag_parser.add_argument('-t', '--tags', nargs='+',
help='List of tag names', metavar='TAGS') help='List of tag names (one must match)', metavar='TAGS')
tag_arg.completer = tags_completer # type: ignore tag_arg.completer = tags_completer # type: ignore
extag_arg = tag_parser.add_argument('-e', '--extags', nargs='+', atag_arg = tag_parser.add_argument('-a', '--atags', nargs='+',
help='List of tag names to exclude', metavar='EXTAGS') help='List of tag names (all must match)', metavar='TAGS')
extag_arg.completer = tags_completer # type: ignore atag_arg.completer = tags_completer # type: ignore
etag_arg = tag_parser.add_argument('-e', '--etags', nargs='+',
help='List of tag names to exclude', metavar='ETAGS')
etag_arg.completer = tags_completer # type: ignore
otag_arg = tag_parser.add_argument('-o', '--output-tags', nargs='+', otag_arg = tag_parser.add_argument('-o', '--output-tags', nargs='+',
help='List of output tag names, default is input', metavar='OTAGS') help='List of output tag names, default is input', metavar='OTAGS')
otag_arg.completer = tags_completer # type: ignore otag_arg.completer = tags_completer # type: ignore
tag_parser.add_argument('-a', '--match-all-tags',
help="All given tags must match when selecting chat history entries",
action='store_true')
# enable autocompletion for tags
# 'ask' command parser # 'ask' command parser
ask_cmd_parser = cmdparser.add_parser('ask', parents=[tag_parser], ask_cmd_parser = cmdparser.add_parser('ask', parents=[tag_parser],
@ -170,7 +180,7 @@ def create_parser() -> argparse.ArgumentParser:
ask_cmd_parser.add_argument('-n', '--number', help='Number of answers to produce', type=int, ask_cmd_parser.add_argument('-n', '--number', help='Number of answers to produce', type=int,
default=1) default=1)
ask_cmd_parser.add_argument('-s', '--source', nargs='+', help='Source add content of a file to the query') ask_cmd_parser.add_argument('-s', '--source', nargs='+', help='Source add content of a file to the query')
ask_cmd_parser.add_argument('-S', '--only-source-code', help='Add pure source code to the chat history', ask_cmd_parser.add_argument('-S', '--source-code-only', help='Add pure source code to the chat history',
action='store_true') action='store_true')
# 'hist' command parser # 'hist' command parser
@ -184,7 +194,7 @@ def create_parser() -> argparse.ArgumentParser:
action='store_true') action='store_true')
hist_cmd_parser.add_argument('-W', '--with-files', help="Print chat history with filenames.", hist_cmd_parser.add_argument('-W', '--with-files', help="Print chat history with filenames.",
action='store_true') action='store_true')
hist_cmd_parser.add_argument('-S', '--only-source-code', help='Print only source code', hist_cmd_parser.add_argument('-S', '--source-code-only', help='Print only source code',
action='store_true') action='store_true')
# 'tag' command parser # 'tag' command parser
@ -195,6 +205,8 @@ def create_parser() -> argparse.ArgumentParser:
tag_group = tag_cmd_parser.add_mutually_exclusive_group(required=True) tag_group = tag_cmd_parser.add_mutually_exclusive_group(required=True)
tag_group.add_argument('-l', '--list', help="List all tags and their frequency", tag_group.add_argument('-l', '--list', help="List all tags and their frequency",
action='store_true') action='store_true')
tag_cmd_parser.add_argument('-p', '--prefix', help="Filter tags by prefix")
tag_cmd_parser.add_argument('-c', '--contain', help="Filter tags by contained substring")
# 'config' command parser # 'config' command parser
config_cmd_parser = cmdparser.add_parser('config', config_cmd_parser = cmdparser.add_parser('config',
@ -214,7 +226,7 @@ def create_parser() -> argparse.ArgumentParser:
aliases=['p']) aliases=['p'])
print_cmd_parser.set_defaults(func=print_cmd) print_cmd_parser.set_defaults(func=print_cmd)
print_cmd_parser.add_argument('-f', '--file', help='File to print', required=True) print_cmd_parser.add_argument('-f', '--file', help='File to print', required=True)
print_cmd_parser.add_argument('-S', '--only-source-code', help='Print only source code', print_cmd_parser.add_argument('-S', '--source-code-only', help='Print only source code',
action='store_true') action='store_true')
argcomplete.autocomplete(parser) argcomplete.autocomplete(parser)

View File

@ -128,29 +128,29 @@ class ModelLine(str):
return cls(' '.join([cls.prefix, model])) return cls(' '.join([cls.prefix, model]))
class Question(str): class Answer(str):
""" """
A single question with a defined header. A single answer with a defined header.
""" """
tokens: int = 0 # tokens used by this question tokens: int = 0 # tokens used by this answer
txt_header: ClassVar[str] = '=== QUESTION ===' txt_header: ClassVar[str] = '=== ANSWER ==='
yaml_key: ClassVar[str] = 'question' yaml_key: ClassVar[str] = 'answer'
def __new__(cls: Type[QuestionInst], string: str) -> QuestionInst: def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst:
""" """
Make sure the question string does not contain the header. Make sure the answer string does not contain the header as a whole line.
""" """
if cls.txt_header in string: if cls.txt_header in string.split('\n'):
raise MessageError(f"Question '{string}' contains the header '{cls.txt_header}'") raise MessageError(f"Answer '{string}' contains the header '{cls.txt_header}'")
instance = super().__new__(cls, string) instance = super().__new__(cls, string)
return instance return instance
@classmethod @classmethod
def from_list(cls: Type[QuestionInst], strings: list[str]) -> QuestionInst: def from_list(cls: Type[AnswerInst], strings: list[str]) -> AnswerInst:
""" """
Build Question from a list of strings. Make sure strings do not contain the header. Build Question from a list of strings. Make sure strings do not contain the header.
""" """
if any(cls.txt_header in string for string in strings): if cls.txt_header in strings:
raise MessageError(f"Question contains the header '{cls.txt_header}'") raise MessageError(f"Question contains the header '{cls.txt_header}'")
instance = super().__new__(cls, '\n'.join(strings).strip()) instance = super().__new__(cls, '\n'.join(strings).strip())
return instance return instance
@ -162,29 +162,33 @@ class Question(str):
return source_code(self, include_delims) return source_code(self, include_delims)
class Answer(str): class Question(str):
""" """
A single answer with a defined header. A single question with a defined header.
""" """
tokens: int = 0 # tokens used by this answer tokens: int = 0 # tokens used by this question
txt_header: ClassVar[str] = '=== ANSWER ===' txt_header: ClassVar[str] = '=== QUESTION ==='
yaml_key: ClassVar[str] = 'answer' yaml_key: ClassVar[str] = 'question'
def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst: def __new__(cls: Type[QuestionInst], string: str) -> QuestionInst:
""" """
Make sure the answer string does not contain the header. Make sure the question string does not contain the header as a whole line
(also not that from 'Answer', so it's always clear where the answer starts).
""" """
if cls.txt_header in string: string_lines = string.split('\n')
raise MessageError(f"Answer '{string}' contains the header '{cls.txt_header}'") if cls.txt_header in string_lines:
raise MessageError(f"Question '{string}' contains the header '{cls.txt_header}'")
if Answer.txt_header in string_lines:
raise MessageError(f"Question '{string}' contains the header '{Answer.txt_header}'")
instance = super().__new__(cls, string) instance = super().__new__(cls, string)
return instance return instance
@classmethod @classmethod
def from_list(cls: Type[AnswerInst], strings: list[str]) -> AnswerInst: def from_list(cls: Type[QuestionInst], strings: list[str]) -> QuestionInst:
""" """
Build Question from a list of strings. Make sure strings do not contain the header. Build Question from a list of strings. Make sure strings do not contain the header.
""" """
if any(cls.txt_header in string for string in strings): if cls.txt_header in strings:
raise MessageError(f"Question contains the header '{cls.txt_header}'") raise MessageError(f"Question contains the header '{cls.txt_header}'")
instance = super().__new__(cls, '\n'.join(strings).strip()) instance = super().__new__(cls, '\n'.join(strings).strip())
return instance return instance

View File

@ -78,8 +78,3 @@ def print_chat_hist(chat: ChatType, dump: bool = False, source_code: bool = Fals
print(message['content']) print(message['content'])
else: else:
print(f"{message['role'].upper()}: {message['content']}") print(f"{message['role'].upper()}: {message['content']}")
def print_tags_frequency(tags: list[str]) -> None:
for tag in sorted(set(tags)):
print(f"- {tag}: {tags.count(tag)}")

View File

@ -82,21 +82,21 @@ Answer 2
@patch('sys.stdout', new_callable=StringIO) @patch('sys.stdout', new_callable=StringIO)
def test_print_with_tags_and_file(self, mock_stdout: StringIO) -> None: def test_print_with_tags_and_file(self, mock_stdout: StringIO) -> None:
self.chat.add_msgs([self.message1, self.message2]) self.chat.add_msgs([self.message1, self.message2])
self.chat.print(paged=False, with_tags=True, with_file=True) self.chat.print(paged=False, with_tags=True, with_files=True)
expected_output = f"""{'-'*terminal_width()} expected_output = f"""{'-'*terminal_width()}
{TagLine.prefix} atag1 btag2
FILE: 0001.txt
{Question.txt_header} {Question.txt_header}
Question 1 Question 1
{Answer.txt_header} {Answer.txt_header}
Answer 1 Answer 1
{TagLine.prefix} atag1 btag2
FILE: 0001.txt
{'-'*terminal_width()} {'-'*terminal_width()}
{TagLine.prefix} btag2
FILE: 0002.txt
{Question.txt_header} {Question.txt_header}
Question 2 Question 2
{Answer.txt_header} {Answer.txt_header}
Answer 2 Answer 2
{TagLine.prefix} btag2
FILE: 0002.txt
""" """
self.assertEqual(mock_stdout.getvalue(), expected_output) self.assertEqual(mock_stdout.getvalue(), expected_output)

View File

@ -115,11 +115,12 @@ class TestHandleQuestion(CmmTestCase):
self.question = "test question" self.question = "test question"
self.args = argparse.Namespace( self.args = argparse.Namespace(
tags=['tag1'], tags=['tag1'],
extags=['extag1'], atags=None,
etags=['etag1'],
output_tags=None, output_tags=None,
question=[self.question], question=[self.question],
source=None, source=None,
only_source_code=False, source_code_only=False,
number=3, number=3,
max_tokens=None, max_tokens=None,
temperature=None, temperature=None,
@ -143,16 +144,18 @@ class TestHandleQuestion(CmmTestCase):
with patch("chatmastermind.storage.open", open_mock): with patch("chatmastermind.storage.open", open_mock):
ask_cmd(self.args, self.config) ask_cmd(self.args, self.config)
mock_print_tag_args.assert_called_once_with(self.args.tags, mock_print_tag_args.assert_called_once_with(self.args.tags,
self.args.extags, self.args.etags,
[]) [])
mock_create_chat_hist.assert_called_once_with(self.question, mock_create_chat_hist.assert_called_once_with(self.question,
self.args.tags, self.args.tags,
self.args.extags, self.args.etags,
self.config, self.config,
False, False, False) match_all_tags=False,
with_tags=False,
with_file=False)
mock_print_chat_hist.assert_called_once_with('test_chat', mock_print_chat_hist.assert_called_once_with('test_chat',
False, False,
self.args.only_source_code) self.args.source_code_only)
mock_ai.assert_called_with("test_chat", mock_ai.assert_called_with("test_chat",
self.config, self.config,
self.args.number) self.args.number)

View File

@ -61,22 +61,39 @@ class SourceCodeTestCase(CmmTestCase):
class QuestionTestCase(CmmTestCase): class QuestionTestCase(CmmTestCase):
def test_question_with_prefix(self) -> None: def test_question_with_header(self) -> None:
with self.assertRaises(MessageError): with self.assertRaises(MessageError):
Question("=== QUESTION === What is your name?") Question(f"{Question.txt_header}\nWhat is your name?")
def test_question_without_prefix(self) -> None: def test_question_with_answer_header(self) -> None:
with self.assertRaises(MessageError):
Question(f"{Answer.txt_header}\nBob")
def test_question_with_legal_header(self) -> None:
"""
If the header is just a part of a line, it's fine.
"""
question = Question(f"This is a line contaning '{Question.txt_header}'\nWhat does that mean?")
self.assertIsInstance(question, Question)
self.assertEqual(question, f"This is a line contaning '{Question.txt_header}'\nWhat does that mean?")
def test_question_without_header(self) -> None:
question = Question("What is your favorite color?") question = Question("What is your favorite color?")
self.assertIsInstance(question, Question) self.assertIsInstance(question, Question)
self.assertEqual(question, "What is your favorite color?") self.assertEqual(question, "What is your favorite color?")
class AnswerTestCase(CmmTestCase): class AnswerTestCase(CmmTestCase):
def test_answer_with_prefix(self) -> None: def test_answer_with_header(self) -> None:
with self.assertRaises(MessageError): with self.assertRaises(MessageError):
Answer("=== ANSWER === Yes") Answer(f"{Answer.txt_header}\nno")
def test_answer_without_prefix(self) -> None: def test_answer_with_legal_header(self) -> None:
answer = Answer(f"This is a line contaning '{Answer.txt_header}'\nIt is what it is.")
self.assertIsInstance(answer, Answer)
self.assertEqual(answer, f"This is a line contaning '{Answer.txt_header}'\nIt is what it is.")
def test_answer_without_header(self) -> None:
answer = Answer("No") answer = Answer("No")
self.assertIsInstance(answer, Answer) self.assertIsInstance(answer, Answer)
self.assertEqual(answer, "No") self.assertEqual(answer, "No")