import pathlib
import tempfile
from typing import cast
from .test_main import CmmTestCase
from chatmastermind.message import source_code, Message, MessageError, Question, Answer, AILine, ModelLine, MessageFilter
from chatmastermind.tags import Tag, TagLine


class SourceCodeTestCase(CmmTestCase):
    """
    Tests for extracting fenced code blocks with 'source_code()'.
    """

    def test_source_code_with_include_delims(self) -> None:
        text = """
        Some text before the code block
        ```python
        print("Hello, World!")
        ```
        Some text after the code block
        ```python
        x = 10
        y = 20
        print(x + y)
        ```
        """
        expected_result = [
            "        ```python\n        print(\"Hello, World!\")\n        ```\n",
            "        ```python\n        x = 10\n        y = 20\n        print(x + y)\n        ```\n"
        ]
        result = source_code(text, include_delims=True)
        self.assertEqual(result, expected_result)

    def test_source_code_without_include_delims(self) -> None:
        text = """
        Some text before the code block
        ```python
        print("Hello, World!")
        ```
        Some text after the code block
        ```python
        x = 10
        y = 20
        print(x + y)
        ```
        """
        expected_result = [
            "        print(\"Hello, World!\")\n",
            "        x = 10\n        y = 20\n        print(x + y)\n"
        ]
        result = source_code(text, include_delims=False)
        self.assertEqual(result, expected_result)

    def test_source_code_with_single_code_block(self) -> None:
        text = "```python\nprint(\"Hello, World!\")\n```"
        expected_result = ["```python\nprint(\"Hello, World!\")\n```\n"]
        result = source_code(text, include_delims=True)
        self.assertEqual(result, expected_result)

    def test_source_code_with_no_code_blocks(self) -> None:
        text = "Some text without any code blocks"
        expected_result: list[str] = []
        result = source_code(text, include_delims=True)
        self.assertEqual(result, expected_result)


class QuestionTestCase(CmmTestCase):
    """
    Tests for constructing 'Question' instances.
    """

    def test_question_with_prefix(self) -> None:
        with self.assertRaises(MessageError):
            Question("=== QUESTION === What is your name?")

    def test_question_without_prefix(self) -> None:
        question = Question("What is your favorite color?")
        self.assertIsInstance(question, Question)
        self.assertEqual(question, "What is your favorite color?")


class AnswerTestCase(CmmTestCase):
    """
    Tests for constructing 'Answer' instances.
    """

    def test_answer_with_prefix(self) -> None:
        with self.assertRaises(MessageError):
            Answer("=== ANSWER === Yes")

    def test_answer_without_prefix(self) -> None:
        answer = Answer("No")
        self.assertIsInstance(answer, Answer)
        self.assertEqual(answer, "No")


class MessageToFileTxtTestCase(CmmTestCase):
    """
    Tests for writing messages to '.txt' files with 'Message.to_file()'.
    """

    def setUp(self) -> None:
        # 'delete=False' keeps the file on disk after close(), so tearDown
        # has to unlink it explicitly.
        self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
        self.file_path = pathlib.Path(self.file.name)
        self.message_complete = Message(Question('This is a question.'),
                                        Answer('This is an answer.'),
                                        {Tag('tag1'), Tag('tag2')},
                                        ai='ChatGPT',
                                        model='gpt-3.5-turbo',
                                        file_path=self.file_path)
        self.message_min = Message(Question('This is a question.'),
                                   file_path=self.file_path)

    def tearDown(self) -> None:
        self.file.close()
        self.file_path.unlink()

    def test_to_file_txt_complete(self) -> None:
        self.message_complete.to_file(self.file_path)

        with open(self.file_path, "r") as fd:
            content = fd.read()
        expected_content = f"""{TagLine.prefix} tag1 tag2
{AILine.prefix} ChatGPT
{ModelLine.prefix} gpt-3.5-turbo
{Question.txt_header}
This is a question.
{Answer.txt_header}
This is an answer.
"""
        self.assertEqual(content, expected_content)

    def test_to_file_txt_min(self) -> None:
        self.message_min.to_file(self.file_path)

        with open(self.file_path, "r") as fd:
            content = fd.read()
        expected_content = f"""{Question.txt_header}
This is a question.
"""
        self.assertEqual(content, expected_content)

    def test_to_file_unsupported_file_type(self) -> None:
        unsupported_file_path = pathlib.Path("example.doc")
        with self.assertRaises(MessageError) as cm:
            self.message_complete.to_file(unsupported_file_path)
        self.assertEqual(str(cm.exception), "File type '.doc' is not supported")

    def test_to_file_no_file_path(self) -> None:
        """
        Provoke an exception using an empty path.
        """
        with self.assertRaises(MessageError) as cm:
            # clear the internal file_path
            self.message_complete.file_path = None
            self.message_complete.to_file(None)
        # Checked after the context manager exits: statements following the
        # raising call inside the 'with' block would never be executed.
        self.assertEqual(str(cm.exception), "Got no valid path to write message")
        # reset the internal file_path
        self.message_complete.file_path = self.file_path


class MessageToFileYamlTestCase(CmmTestCase):
    """
    Tests for writing messages to '.yaml' files with 'Message.to_file()'.
    """

    def setUp(self) -> None:
        # 'delete=False' keeps the file on disk after close(), so tearDown
        # has to unlink it explicitly.
        self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
        self.file_path = pathlib.Path(self.file.name)
        self.message_complete = Message(Question('This is a question.'),
                                        Answer('This is an answer.'),
                                        {Tag('tag1'), Tag('tag2')},
                                        ai='ChatGPT',
                                        model='gpt-3.5-turbo',
                                        file_path=self.file_path)
        self.message_multiline = Message(Question('This is a\nmultiline question.'),
                                         Answer('This is a\nmultiline answer.'),
                                         {Tag('tag1'), Tag('tag2')},
                                         ai='ChatGPT',
                                         model='gpt-3.5-turbo',
                                         file_path=self.file_path)
        self.message_min = Message(Question('This is a question.'),
                                   file_path=self.file_path)

    def tearDown(self) -> None:
        self.file.close()
        self.file_path.unlink()

    def test_to_file_yaml_complete(self) -> None:
        self.message_complete.to_file(self.file_path)

        with open(self.file_path, "r") as fd:
            content = fd.read()
        expected_content = f"""{Question.yaml_key}: This is a question.
{Answer.yaml_key}: This is an answer.
{Message.ai_yaml_key}: ChatGPT
{Message.model_yaml_key}: gpt-3.5-turbo
{Message.tags_yaml_key}:
- tag1
- tag2
"""
        self.assertEqual(content, expected_content)

    def test_to_file_yaml_multiline(self) -> None:
        self.message_multiline.to_file(self.file_path)

        with open(self.file_path, "r") as fd:
            content = fd.read()
        expected_content = f"""{Question.yaml_key}: |-
  This is a
  multiline question.
{Answer.yaml_key}: |-
  This is a
  multiline answer.
{Message.ai_yaml_key}: ChatGPT
{Message.model_yaml_key}: gpt-3.5-turbo
{Message.tags_yaml_key}:
- tag1
- tag2
"""
        self.assertEqual(content, expected_content)

    def test_to_file_yaml_min(self) -> None:
        self.message_min.to_file(self.file_path)

        with open(self.file_path, "r") as fd:
            content = fd.read()
        expected_content = f"{Question.yaml_key}: This is a question.\n"
        self.assertEqual(content, expected_content)


class MessageFromFileTxtTestCase(CmmTestCase):
    """
    Tests for reading (and filtering) messages from '.txt' files
    with 'Message.from_file()'.
    """

    def setUp(self) -> None:
        # A message file containing all optional values.
        self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
        self.file_path = pathlib.Path(self.file.name)
        with open(self.file_path, "w") as fd:
            fd.write(f"""{TagLine.prefix} tag1 tag2
{AILine.prefix} ChatGPT
{ModelLine.prefix} gpt-3.5-turbo
{Question.txt_header}
This is a question.
{Answer.txt_header}
This is an answer.
""")
        # A message file containing only the question.
        self.file_min = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
        self.file_path_min = pathlib.Path(self.file_min.name)
        with open(self.file_path_min, "w") as fd:
            fd.write(f"""{Question.txt_header}
This is a question.
""")

    def tearDown(self) -> None:
        self.file.close()
        self.file_min.close()
        self.file_path.unlink()
        self.file_path_min.unlink()

    def test_from_file_txt_complete(self) -> None:
        """
        Read a complete message (with all optional values).
        """
        message = Message.from_file(self.file_path)
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)
        if message:  # mypy bug
            self.assertEqual(message.question, 'This is a question.')
            self.assertEqual(message.answer, 'This is an answer.')
            self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
            self.assertEqual(message.ai, 'ChatGPT')
            self.assertEqual(message.model, 'gpt-3.5-turbo')
            self.assertEqual(message.file_path, self.file_path)

    def test_from_file_txt_min(self) -> None:
        """
        Read a message with only required values.
        """
        message = Message.from_file(self.file_path_min)
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)
        if message:  # mypy bug
            self.assertEqual(message.question, 'This is a question.')
            self.assertEqual(message.file_path, self.file_path_min)
            self.assertIsNone(message.answer)

    def test_from_file_txt_tags_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(tags_or={Tag('tag1')}))
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)
        if message:  # mypy bug
            self.assertEqual(message.question, 'This is a question.')
            self.assertEqual(message.answer, 'This is an answer.')
            self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
            self.assertEqual(message.file_path, self.file_path)

    def test_from_file_txt_tags_dont_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(tags_or={Tag('tag3')}))
        self.assertIsNone(message)

    def test_from_file_txt_no_tags_dont_match(self) -> None:
        message = Message.from_file(self.file_path_min,
                                    MessageFilter(tags_or={Tag('tag1')}))
        self.assertIsNone(message)

    def test_from_file_txt_no_tags_match_tags_not(self) -> None:
        message = Message.from_file(self.file_path_min,
                                    MessageFilter(tags_not={Tag('tag1')}))
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)
        if message:  # mypy bug
            self.assertEqual(message.question, 'This is a question.')
            self.assertSetEqual(cast(set[Tag], message.tags), set())
            self.assertEqual(message.file_path, self.file_path_min)

    def test_from_file_not_exists(self) -> None:
        file_not_exists = pathlib.Path("example.txt")
        with self.assertRaises(MessageError) as cm:
            Message.from_file(file_not_exists)
        self.assertEqual(str(cm.exception), f"Message file '{file_not_exists}' does not exist")

    def test_from_file_txt_question_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(question_contains='question'))
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)

    def test_from_file_txt_answer_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(answer_contains='answer'))
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)

    def test_from_file_txt_answer_available(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(answer_state='available'))
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)

    def test_from_file_txt_answer_missing(self) -> None:
        message = Message.from_file(self.file_path_min,
                                    MessageFilter(answer_state='missing'))
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)

    def test_from_file_txt_question_doesnt_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(question_contains='answer'))
        self.assertIsNone(message)

    def test_from_file_txt_answer_doesnt_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(answer_contains='question'))
        self.assertIsNone(message)

    def test_from_file_txt_answer_not_exists(self) -> None:
        message = Message.from_file(self.file_path_min,
                                    MessageFilter(answer_contains='answer'))
        self.assertIsNone(message)

    def test_from_file_txt_answer_not_available(self) -> None:
        message = Message.from_file(self.file_path_min,
                                    MessageFilter(answer_state='available'))
        self.assertIsNone(message)

    def test_from_file_txt_answer_not_missing(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(answer_state='missing'))
        self.assertIsNone(message)

    def test_from_file_txt_ai_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(ai='ChatGPT'))
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)

    def test_from_file_txt_ai_doesnt_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(ai='Foo'))
        self.assertIsNone(message)

    def test_from_file_txt_model_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(model='gpt-3.5-turbo'))
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)

    def test_from_file_txt_model_doesnt_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(model='Bar'))
        self.assertIsNone(message)


class MessageFromFileYamlTestCase(CmmTestCase):
    """
    Tests for reading (and filtering) messages from '.yaml' files
    with 'Message.from_file()'.
    """

    def setUp(self) -> None:
        # A message file containing all optional values.
        self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
        self.file_path = pathlib.Path(self.file.name)
        with open(self.file_path, "w") as fd:
            fd.write(f"""
{Question.yaml_key}: |-
  This is a question.
{Answer.yaml_key}: |-
  This is an answer.
{Message.ai_yaml_key}: ChatGPT
{Message.model_yaml_key}: gpt-3.5-turbo
{Message.tags_yaml_key}:
  - tag1
  - tag2
""")
        # A message file containing only the question.
        self.file_min = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
        self.file_path_min = pathlib.Path(self.file_min.name)
        with open(self.file_path_min, "w") as fd:
            fd.write(f"""
{Question.yaml_key}: |-
  This is a question.
""")

    def tearDown(self) -> None:
        self.file.close()
        self.file_path.unlink()
        self.file_min.close()
        self.file_path_min.unlink()

    def test_from_file_yaml_complete(self) -> None:
        """
        Read a complete message (with all optional values).
        """
        message = Message.from_file(self.file_path)
        self.assertIsInstance(message, Message)
        self.assertIsNotNone(message)
        if message:  # mypy bug
            self.assertEqual(message.question, 'This is a question.')
            self.assertEqual(message.answer, 'This is an answer.')
            self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
            self.assertEqual(message.ai, 'ChatGPT')
            self.assertEqual(message.model, 'gpt-3.5-turbo')
            self.assertEqual(message.file_path, self.file_path)

    def test_from_file_yaml_min(self) -> None:
        """
        Read a message with only the required values.
        """
        message = Message.from_file(self.file_path_min)
        self.assertIsInstance(message, Message)
        self.assertIsNotNone(message)
        if message:  # mypy bug
            self.assertEqual(message.question, 'This is a question.')
            self.assertSetEqual(cast(set[Tag], message.tags), set())
            self.assertEqual(message.file_path, self.file_path_min)
            self.assertIsNone(message.answer)

    def test_from_file_not_exists(self) -> None:
        file_not_exists = pathlib.Path("example.yaml")
        with self.assertRaises(MessageError) as cm:
            Message.from_file(file_not_exists)
        self.assertEqual(str(cm.exception), f"Message file '{file_not_exists}' does not exist")

    def test_from_file_yaml_tags_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(tags_or={Tag('tag1')}))
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)
        if message:  # mypy bug
            self.assertEqual(message.question, 'This is a question.')
            self.assertEqual(message.answer, 'This is an answer.')
            self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
            self.assertEqual(message.file_path, self.file_path)

    def test_from_file_yaml_tags_dont_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(tags_or={Tag('tag3')}))
        self.assertIsNone(message)

    def test_from_file_yaml_no_tags_dont_match(self) -> None:
        message = Message.from_file(self.file_path_min,
                                    MessageFilter(tags_or={Tag('tag1')}))
        self.assertIsNone(message)

    def test_from_file_yaml_no_tags_match_tags_not(self) -> None:
        message = Message.from_file(self.file_path_min,
                                    MessageFilter(tags_not={Tag('tag1')}))
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)
        if message:  # mypy bug
            self.assertEqual(message.question, 'This is a question.')
            self.assertSetEqual(cast(set[Tag], message.tags), set())
            self.assertEqual(message.file_path, self.file_path_min)

    def test_from_file_yaml_question_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(question_contains='question'))
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)

    def test_from_file_yaml_answer_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(answer_contains='answer'))
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)

    def test_from_file_yaml_answer_available(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(answer_state='available'))
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)

    def test_from_file_yaml_answer_missing(self) -> None:
        message = Message.from_file(self.file_path_min,
                                    MessageFilter(answer_state='missing'))
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)

    def test_from_file_yaml_question_doesnt_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(question_contains='answer'))
        self.assertIsNone(message)

    def test_from_file_yaml_answer_doesnt_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(answer_contains='question'))
        self.assertIsNone(message)

    def test_from_file_yaml_answer_not_exists(self) -> None:
        message = Message.from_file(self.file_path_min,
                                    MessageFilter(answer_contains='answer'))
        self.assertIsNone(message)

    def test_from_file_yaml_answer_not_available(self) -> None:
        message = Message.from_file(self.file_path_min,
                                    MessageFilter(answer_state='available'))
        self.assertIsNone(message)

    def test_from_file_yaml_answer_not_missing(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(answer_state='missing'))
        self.assertIsNone(message)

    def test_from_file_yaml_ai_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(ai='ChatGPT'))
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)

    def test_from_file_yaml_ai_doesnt_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(ai='Foo'))
        self.assertIsNone(message)

    def test_from_file_yaml_model_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(model='gpt-3.5-turbo'))
        self.assertIsNotNone(message)
        self.assertIsInstance(message, Message)

    def test_from_file_yaml_model_doesnt_match(self) -> None:
        message = Message.from_file(self.file_path,
                                    MessageFilter(model='Bar'))
        self.assertIsNone(message)


class TagsFromFileTestCase(CmmTestCase):
    """
    Tests for reading tags from single files with 'Message.tags_from_file()'.
    """

    def setUp(self) -> None:
        # All four files are created with 'delete=False' and therefore have
        # to be removed again in tearDown.
        self.file_txt = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
        self.file_path_txt = pathlib.Path(self.file_txt.name)
        with open(self.file_path_txt, "w") as fd:
            fd.write(f"""{TagLine.prefix} tag1 tag2 ptag3
{Question.txt_header}
This is a question.
{Answer.txt_header}
This is an answer.
""")
        self.file_txt_no_tags = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
        self.file_path_txt_no_tags = pathlib.Path(self.file_txt_no_tags.name)
        with open(self.file_path_txt_no_tags, "w") as fd:
            fd.write(f"""{Question.txt_header}
This is a question.
{Answer.txt_header}
This is an answer.
""")
        self.file_yaml = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
        self.file_path_yaml = pathlib.Path(self.file_yaml.name)
        with open(self.file_path_yaml, "w") as fd:
            fd.write(f"""
{Question.yaml_key}: |-
  This is a question.
{Answer.yaml_key}: |-
  This is an answer.
{Message.tags_yaml_key}:
  - tag1
  - tag2
  - ptag3
""")
        self.file_yaml_no_tags = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
        self.file_path_yaml_no_tags = pathlib.Path(self.file_yaml_no_tags.name)
        with open(self.file_path_yaml_no_tags, "w") as fd:
            fd.write(f"""
{Question.yaml_key}: |-
  This is a question.
{Answer.yaml_key}: |-
  This is an answer.
""")

    def tearDown(self) -> None:
        self.file_txt.close()
        self.file_path_txt.unlink()
        self.file_yaml.close()
        self.file_path_yaml.unlink()
        # Also remove the '*_no_tags' files, otherwise they stay behind in
        # the temp directory after every test.
        self.file_txt_no_tags.close()
        self.file_path_txt_no_tags.unlink()
        self.file_yaml_no_tags.close()
        self.file_path_yaml_no_tags.unlink()

    def test_tags_from_file_txt(self) -> None:
        tags = Message.tags_from_file(self.file_path_txt)
        self.assertSetEqual(tags, {Tag('tag1'), Tag('tag2'), Tag('ptag3')})

    def test_tags_from_file_txt_no_tags(self) -> None:
        tags = Message.tags_from_file(self.file_path_txt_no_tags)
        self.assertSetEqual(tags, set())

    def test_tags_from_file_yaml(self) -> None:
        tags = Message.tags_from_file(self.file_path_yaml)
        self.assertSetEqual(tags, {Tag('tag1'), Tag('tag2'), Tag('ptag3')})

    def test_tags_from_file_yaml_no_tags(self) -> None:
        tags = Message.tags_from_file(self.file_path_yaml_no_tags)
        self.assertSetEqual(tags, set())

    def test_tags_from_file_txt_prefix(self) -> None:
        tags = Message.tags_from_file(self.file_path_txt, prefix='p')
        self.assertSetEqual(tags, {Tag('ptag3')})

    def test_tags_from_file_yaml_prefix(self) -> None:
        tags = Message.tags_from_file(self.file_path_yaml, prefix='p')
        self.assertSetEqual(tags, {Tag('ptag3')})


class TagsFromDirTestCase(CmmTestCase):
    """
    Tests for collecting tags from a directory with 'Message.tags_from_dir()'.
    """

    def setUp(self) -> None:
        self.temp_dir = tempfile.TemporaryDirectory()
        self.temp_dir_no_tags = tempfile.TemporaryDirectory()
        self.tag_sets = [
            {Tag('atag1'), Tag('atag2')},
            {Tag('btag3'), Tag('btag4')},
            {Tag('ctag5'), Tag('ctag6')}
        ]
        self.files = [
            pathlib.Path(self.temp_dir.name, 'file1.txt'),
            pathlib.Path(self.temp_dir.name, 'file2.yaml'),
            pathlib.Path(self.temp_dir.name, 'file3.txt')
        ]
        self.files_no_tags = [
            pathlib.Path(self.temp_dir_no_tags.name, 'file4.txt'),
            pathlib.Path(self.temp_dir_no_tags.name, 'file5.yaml'),
            pathlib.Path(self.temp_dir_no_tags.name, 'file6.txt')
        ]
        # One message file per tag set in the first directory ...
        for file, tags in zip(self.files, self.tag_sets):
            message = Message(Question('This is a question.'),
                              Answer('This is an answer.'),
                              tags)
            message.to_file(file)
        # ... and only untagged message files in the second one.
        for file in self.files_no_tags:
            message = Message(Question('This is a question.'),
                              Answer('This is an answer.'))
            message.to_file(file)

    def tearDown(self) -> None:
        self.temp_dir.cleanup()
        # Clean up the second directory explicitly as well instead of
        # leaving it to the garbage collector.
        self.temp_dir_no_tags.cleanup()

    def test_tags_from_dir(self) -> None:
        all_tags = Message.tags_from_dir(pathlib.Path(self.temp_dir.name))
        expected_tags = self.tag_sets[0] | self.tag_sets[1] | self.tag_sets[2]
        self.assertEqual(all_tags, expected_tags)

    def test_tags_from_dir_prefix(self) -> None:
        atags = Message.tags_from_dir(pathlib.Path(self.temp_dir.name), prefix='a')
        expected_tags = self.tag_sets[0]
        self.assertEqual(atags, expected_tags)

    def test_tags_from_dir_no_tags(self) -> None:
        all_tags = Message.tags_from_dir(pathlib.Path(self.temp_dir_no_tags.name))
        self.assertSetEqual(all_tags, set())


class MessageIDTestCase(CmmTestCase):
    """
    Tests for 'Message.msg_id()'.
    """

    def setUp(self) -> None:
        self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
        self.file_path = pathlib.Path(self.file.name)
        self.message = Message(Question('This is a question.'),
                               file_path=self.file_path)
        self.message_no_file_path = Message(Question('This is a question.'))

    def tearDown(self) -> None:
        self.file.close()
        self.file_path.unlink()

    def test_msg_id_txt(self) -> None:
        self.assertEqual(self.message.msg_id(), self.file_path.name)

    def test_msg_id_txt_exception(self) -> None:
        with self.assertRaises(MessageError):
            self.message_no_file_path.msg_id()


class MessageHashTestCase(CmmTestCase):
    """
    Tests for hashing and comparing 'Message' instances.
    """

    def setUp(self) -> None:
        self.message1 = Message(Question('This is a question.'),
                                tags={Tag('tag1')},
                                file_path=pathlib.Path('/tmp/foo/bla'))
        self.message2 = Message(Question('This is a new question.'),
                                file_path=pathlib.Path('/tmp/foo/bla'))
        self.message3 = Message(Question('This is a question.'),
                                Answer('This is an answer.'),
                                file_path=pathlib.Path('/tmp/foo/bla'))
        # message4 is a copy of message1, because only question and
        # answer are used for hashing and comparison
        self.message4 = Message(Question('This is a question.'),
                                tags={Tag('tag1'), Tag('tag2')},
                                ai='Blabla',
                                file_path=pathlib.Path('foobla'))

    def test_set_hashing(self) -> None:
        msgs: set[Message] = {self.message1, self.message2, self.message3, self.message4}
        self.assertEqual(len(msgs), 3)
        for msg in [self.message1, self.message2, self.message3]:
            self.assertIn(msg, msgs)