From 2aa397d609e8ef12790fbede786170850d9fb7e9 Mon Sep 17 00:00:00 2001 From: juk0de Date: Fri, 18 Aug 2023 16:07:50 +0200 Subject: [PATCH] added new module 'message.py' --- chatmastermind/message.py | 135 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 chatmastermind/message.py diff --git a/chatmastermind/message.py b/chatmastermind/message.py new file mode 100644 index 0000000..5136a4e --- /dev/null +++ b/chatmastermind/message.py @@ -0,0 +1,135 @@ +""" +Module implementing message related functions and classes. +""" +import pathlib +from typing import Type, TypeVar, Optional, Any, Final +from dataclasses import dataclass, asdict, field +from .tags import Tag + +QuestionInst = TypeVar('QuestionInst', bound='Question') +AnswerInst = TypeVar('AnswerInst', bound='Answer') +MessageInst = TypeVar('MessageInst', bound='Message') + + +class MessageError(Exception): + pass + + +def source_code(text: str, include_delims: bool = False) -> list[str]: + """ + Extract all source code sections from the given text, i. e. all lines + surrounded by lines tarting with '```'. If 'include_delims' is True, + the surrounding lines are included, otherwise they are omitted. The + result list contains every source code section as a single string. + The order in the list represents the order of the sections in the text. + """ + code_sections: list[str] = [] + code_lines: list[str] = [] + in_code_block = False + + for line in text.split('\n'): + if line.strip().startswith('```'): + if include_delims: + code_lines.append(line) + if in_code_block: + code_sections.append('\n'.join(code_lines) + '\n') + code_lines.clear() + in_code_block = not in_code_block + elif in_code_block: + code_lines.append(line) + + return code_sections + + +class Question(str): + """ + A single question with a defined header. + """ + header: Final[str] = '=== QUESTION ===' + + def __new__(cls: Type[QuestionInst], string: str) -> QuestionInst: + """ + Make sure the question string does not contain the header. + """ + if cls.header in string: + raise MessageError(f"Question '{string}' contains the header '{cls.header}'") + instance = super().__new__(cls, string) + return instance + + def source_code(self, include_delims: bool = False) -> list[str]: + """ + Extract and return all source code sections. + """ + return source_code(self, include_delims) + + +class Answer(str): + """ + A single answer with a defined header. + """ + header: Final[str] = '=== ANSWER ===' + + def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst: + """ + Make sure the answer string does not contain the header. + """ + if cls.header in string: + raise MessageError(f"Answer '{string}' contains the header '{cls.header}'") + instance = super().__new__(cls, string) + return instance + + def source_code(self, include_delims: bool = False) -> list[str]: + """ + Extract and return all source code sections. + """ + return source_code(self, include_delims) + + +@dataclass +class Message(): + """ + Single message. Consists of a question and optionally an answer, a set of tags + and a file path. + """ + question: Question + answer: Optional[Answer] + tags: Optional[set[Tag]] + file_path: Optional[pathlib.Path] + file_suffixes: Final[list[str]] = field(default_factory=lambda: ['.txt', '.yaml']) + + # @classmethod + # def from_file(cls: Type[MessageInst], file_path: pathlib.Path) -> MessageInst: + # """ + # Create a Message from the given file. Expects the following file structure: + # * TagLine (from 'self.tags') + # * Question.Header + # * Question + # * Answer.Header + # """ + # if file_path: + # self.file_path = file_path + # if not self.file_path: + # raise MessageError("Got no valid path to read message") + # if self.file_path.suffix not in self.file_suffixes: + # raise MessageError(f"File type '{self.file_path.suffix}' is not supported") + # pass + + def to_file(self, file_path: Optional[pathlib.Path]) -> None: + """ + Write Message to the given file. Creates the following file structure: + * TagLine (from 'self.tags') + * Question.Header + * Question + * Answer.Header + * Answer + """ + if file_path: + self.file_path = file_path + if not self.file_path: + raise MessageError("Got no valid path to write message") + if self.file_path.suffix not in self.file_suffixes: + raise MessageError(f"File type '{self.file_path.suffix}' is not supported") + pass + + def asdict(self) -> dict[str, Any]: + return asdict(self)