Added tags filtering (prefix and contained string) to TagLine and Message
This commit is contained in:
parent
fc1b8006a0
commit
c7b99fe9b4
@ -219,21 +219,57 @@ class Message():
|
||||
file_path=data.get(cls.file_yaml_key, None))
|
||||
|
||||
@classmethod
|
||||
def tags_from_file(cls: Type[MessageInst], file_path: pathlib.Path) -> set[Tag]:
|
||||
def tags_from_file(cls: Type[MessageInst],
|
||||
file_path: pathlib.Path,
|
||||
prefix: Optional[str] = None,
|
||||
contain: Optional[str] = None) -> set[Tag]:
|
||||
"""
|
||||
Return only the tags from the given Message file.
|
||||
Return only the tags from the given Message file,
|
||||
optionally filtered based on prefix or contained string.
|
||||
"""
|
||||
tags: set[Tag] = set()
|
||||
if not file_path.exists():
|
||||
raise MessageError(f"Message file '{file_path}' does not exist")
|
||||
if file_path.suffix not in cls.file_suffixes:
|
||||
raise MessageError(f"File type '{file_path.suffix}' is not supported")
|
||||
# for TXT, it's enough to read the TagLine
|
||||
if file_path.suffix == '.txt':
|
||||
with open(file_path, "r") as fd:
|
||||
tags = TagLine(fd.readline()).tags()
|
||||
try:
|
||||
tags = TagLine(fd.readline()).tags(prefix, contain)
|
||||
except TagError:
|
||||
pass # message without tags
|
||||
else: # '.yaml'
|
||||
with open(file_path, "r") as fd:
|
||||
data = yaml.load(fd, Loader=yaml.FullLoader)
|
||||
tags = set(sorted(data[cls.tags_yaml_key]))
|
||||
try:
|
||||
message = cls.from_file(file_path)
|
||||
if message:
|
||||
msg_tags = message.filter_tags(prefix=prefix, contain=contain)
|
||||
except MessageError as e:
|
||||
print(f"Error processing message in '{file_path}': {str(e)}")
|
||||
if msg_tags:
|
||||
tags = msg_tags
|
||||
return tags
|
||||
|
||||
@classmethod
|
||||
def tags_from_dir(cls: Type[MessageInst],
|
||||
path: pathlib.Path,
|
||||
glob: Optional[str] = None,
|
||||
prefix: Optional[str] = None,
|
||||
contain: Optional[str] = None) -> set[Tag]:
|
||||
|
||||
"""
|
||||
Return only the tags from message files in the given directory.
|
||||
The files can be filtered using 'glob', the tags by using 'prefix'
|
||||
and 'contain'.
|
||||
"""
|
||||
tags: set[Tag] = set()
|
||||
file_iter = path.glob(glob) if glob else path.iterdir()
|
||||
for file_path in sorted(file_iter):
|
||||
if file_path.is_file():
|
||||
try:
|
||||
tags |= cls.tags_from_file(file_path, prefix, contain)
|
||||
except MessageError as e:
|
||||
print(f"Error processing message in '{file_path}': {str(e)}")
|
||||
return tags
|
||||
|
||||
@classmethod
|
||||
@ -395,6 +431,19 @@ class Message():
|
||||
data[self.tags_yaml_key] = sorted([str(tag) for tag in self.tags])
|
||||
yaml.dump(data, fd, sort_keys=False)
|
||||
|
||||
def filter_tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
|
||||
"""
|
||||
Filter tags based on their prefix (i. e. the tag starts with a given string)
|
||||
or some contained string.
|
||||
"""
|
||||
res_tags = self.tags
|
||||
if res_tags:
|
||||
if prefix and len(prefix) > 0:
|
||||
res_tags -= {tag for tag in res_tags if not tag.startswith(prefix)}
|
||||
if contain and len(contain) > 0:
|
||||
res_tags -= {tag for tag in res_tags if contain not in tag}
|
||||
return res_tags or set()
|
||||
|
||||
def match(self, mfilter: MessageFilter) -> bool: # noqa: 13
|
||||
"""
|
||||
Matches the current Message to the given filter atttributes.
|
||||
|
||||
@ -118,9 +118,10 @@ class TagLine(str):
|
||||
"""
|
||||
return cls(' '.join([cls.prefix] + sorted([t for t in tags])))
|
||||
|
||||
def tags(self) -> set[Tag]:
|
||||
def tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
|
||||
"""
|
||||
Returns all tags contained in this line as a set.
|
||||
Returns all tags contained in this line as a set, optionally
|
||||
filtered based on prefix or contained string.
|
||||
"""
|
||||
tagstr = self[len(self.prefix):].strip()
|
||||
separator = Tag.default_separator
|
||||
@ -130,7 +131,12 @@ class TagLine(str):
|
||||
if s in tagstr:
|
||||
separator = s
|
||||
break
|
||||
return set(sorted([Tag(t.strip()) for t in tagstr.split(separator)]))
|
||||
res_tags = set(sorted([Tag(t.strip()) for t in tagstr.split(separator)]))
|
||||
if prefix and len(prefix) > 0:
|
||||
res_tags -= {tag for tag in res_tags if not tag.startswith(prefix)}
|
||||
if contain and len(contain) > 0:
|
||||
res_tags -= {tag for tag in res_tags if contain not in tag}
|
||||
return res_tags or set()
|
||||
|
||||
def merge(self, taglines: set['TagLine']) -> 'TagLine':
|
||||
"""
|
||||
|
||||
@ -543,11 +543,19 @@ class TagsFromFileTestCase(CmmTestCase):
|
||||
self.file_txt = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
|
||||
self.file_path_txt = pathlib.Path(self.file_txt.name)
|
||||
with open(self.file_path_txt, "w") as fd:
|
||||
fd.write(f"""{TagLine.prefix} tag1 tag2
|
||||
fd.write(f"""{TagLine.prefix} tag1 tag2 ptag3
|
||||
{Question.txt_header}
|
||||
This is a question.
|
||||
{Answer.txt_header}
|
||||
This is an answer.
|
||||
""")
|
||||
self.file_txt_no_tags = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
|
||||
self.file_path_txt_no_tags = pathlib.Path(self.file_txt_no_tags.name)
|
||||
with open(self.file_path_txt_no_tags, "w") as fd:
|
||||
fd.write(f"""{Question.txt_header}
|
||||
This is a question.
|
||||
{Answer.txt_header}
|
||||
This is an answer.
|
||||
""")
|
||||
self.file_yaml = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
|
||||
self.file_path_yaml = pathlib.Path(self.file_yaml.name)
|
||||
@ -560,6 +568,16 @@ This is an answer.
|
||||
{Message.tags_yaml_key}:
|
||||
- tag1
|
||||
- tag2
|
||||
- ptag3
|
||||
""")
|
||||
self.file_yaml_no_tags = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
|
||||
self.file_path_yaml_no_tags = pathlib.Path(self.file_yaml_no_tags.name)
|
||||
with open(self.file_path_yaml_no_tags, "w") as fd:
|
||||
fd.write(f"""
|
||||
{Question.yaml_key}: |-
|
||||
This is a question.
|
||||
{Answer.yaml_key}: |-
|
||||
This is an answer.
|
||||
""")
|
||||
|
||||
def tearDown(self) -> None:
|
||||
@ -570,11 +588,90 @@ This is an answer.
|
||||
|
||||
def test_tags_from_file_txt(self) -> None:
|
||||
tags = Message.tags_from_file(self.file_path_txt)
|
||||
self.assertSetEqual(tags, {Tag('tag1'), Tag('tag2')})
|
||||
self.assertSetEqual(tags, {Tag('tag1'), Tag('tag2'), Tag('ptag3')})
|
||||
|
||||
def test_tags_from_file_txt_no_tags(self) -> None:
|
||||
tags = Message.tags_from_file(self.file_path_txt_no_tags)
|
||||
self.assertSetEqual(tags, set())
|
||||
|
||||
def test_tags_from_file_yaml(self) -> None:
|
||||
tags = Message.tags_from_file(self.file_path_yaml)
|
||||
self.assertSetEqual(tags, {Tag('tag1'), Tag('tag2')})
|
||||
self.assertSetEqual(tags, {Tag('tag1'), Tag('tag2'), Tag('ptag3')})
|
||||
|
||||
def test_tags_from_file_yaml_no_tags(self) -> None:
|
||||
tags = Message.tags_from_file(self.file_path_yaml_no_tags)
|
||||
self.assertSetEqual(tags, set())
|
||||
|
||||
def test_tags_from_file_txt_prefix(self) -> None:
|
||||
tags = Message.tags_from_file(self.file_path_txt, prefix='p')
|
||||
self.assertSetEqual(tags, {Tag('ptag3')})
|
||||
tags = Message.tags_from_file(self.file_path_txt, prefix='R')
|
||||
self.assertSetEqual(tags, set())
|
||||
|
||||
def test_tags_from_file_yaml_prefix(self) -> None:
|
||||
tags = Message.tags_from_file(self.file_path_yaml, prefix='p')
|
||||
self.assertSetEqual(tags, {Tag('ptag3')})
|
||||
tags = Message.tags_from_file(self.file_path_yaml, prefix='R')
|
||||
self.assertSetEqual(tags, set())
|
||||
|
||||
def test_tags_from_file_txt_contain(self) -> None:
|
||||
tags = Message.tags_from_file(self.file_path_txt, contain='3')
|
||||
self.assertSetEqual(tags, {Tag('ptag3')})
|
||||
tags = Message.tags_from_file(self.file_path_txt, contain='R')
|
||||
self.assertSetEqual(tags, set())
|
||||
|
||||
def test_tags_from_file_yaml_contain(self) -> None:
|
||||
tags = Message.tags_from_file(self.file_path_yaml, contain='3')
|
||||
self.assertSetEqual(tags, {Tag('ptag3')})
|
||||
tags = Message.tags_from_file(self.file_path_yaml, contain='R')
|
||||
self.assertSetEqual(tags, set())
|
||||
|
||||
|
||||
class TagsFromDirTestCase(CmmTestCase):
|
||||
def setUp(self) -> None:
|
||||
self.temp_dir = tempfile.TemporaryDirectory()
|
||||
self.temp_dir_no_tags = tempfile.TemporaryDirectory()
|
||||
self.tag_sets = [
|
||||
{Tag('atag1'), Tag('atag2')},
|
||||
{Tag('btag3'), Tag('btag4')},
|
||||
{Tag('ctag5'), Tag('ctag6')}
|
||||
]
|
||||
self.files = [
|
||||
pathlib.Path(self.temp_dir.name, 'file1.txt'),
|
||||
pathlib.Path(self.temp_dir.name, 'file2.yaml'),
|
||||
pathlib.Path(self.temp_dir.name, 'file3.txt')
|
||||
]
|
||||
self.files_no_tags = [
|
||||
pathlib.Path(self.temp_dir_no_tags.name, 'file4.txt'),
|
||||
pathlib.Path(self.temp_dir_no_tags.name, 'file5.yaml'),
|
||||
pathlib.Path(self.temp_dir_no_tags.name, 'file6.txt')
|
||||
]
|
||||
for file, tags in zip(self.files, self.tag_sets):
|
||||
message = Message(Question('This is a question.'),
|
||||
Answer('This is an answer.'),
|
||||
tags)
|
||||
message.to_file(file)
|
||||
for file in self.files_no_tags:
|
||||
message = Message(Question('This is a question.'),
|
||||
Answer('This is an answer.'))
|
||||
message.to_file(file)
|
||||
|
||||
def tearDown(self) -> None:
|
||||
self.temp_dir.cleanup()
|
||||
|
||||
def test_tags_from_dir(self) -> None:
|
||||
all_tags = Message.tags_from_dir(pathlib.Path(self.temp_dir.name))
|
||||
expected_tags = self.tag_sets[0] | self.tag_sets[1] | self.tag_sets[2]
|
||||
self.assertEqual(all_tags, expected_tags)
|
||||
|
||||
def test_tags_from_dir_prefix(self) -> None:
|
||||
atags = Message.tags_from_dir(pathlib.Path(self.temp_dir.name), prefix='a')
|
||||
expected_tags = self.tag_sets[0]
|
||||
self.assertEqual(atags, expected_tags)
|
||||
|
||||
def test_tags_from_dir_no_tags(self) -> None:
|
||||
all_tags = Message.tags_from_dir(pathlib.Path(self.temp_dir_no_tags.name))
|
||||
self.assertSetEqual(all_tags, set())
|
||||
|
||||
|
||||
class MessageIDTestCase(CmmTestCase):
|
||||
|
||||
@ -40,15 +40,33 @@ class TestTagLine(CmmTestCase):
|
||||
self.assertEqual(tagline, 'TAGS: tag1 tag2')
|
||||
|
||||
def test_tags(self) -> None:
|
||||
tagline = TagLine('TAGS: tag1 tag2')
|
||||
tagline = TagLine('TAGS: atag1 btag2')
|
||||
tags = tagline.tags()
|
||||
self.assertEqual(tags, {Tag('tag1'), Tag('tag2')})
|
||||
self.assertEqual(tags, {Tag('atag1'), Tag('btag2')})
|
||||
|
||||
def test_tags_with_newline(self) -> None:
|
||||
tagline = TagLine('TAGS: tag1\n tag2')
|
||||
tags = tagline.tags()
|
||||
self.assertEqual(tags, {Tag('tag1'), Tag('tag2')})
|
||||
|
||||
def test_tags_prefix(self) -> None:
|
||||
tagline = TagLine('TAGS: atag1 stag2 stag3')
|
||||
tags = tagline.tags(prefix='a')
|
||||
self.assertSetEqual(tags, {Tag('atag1')})
|
||||
tags = tagline.tags(prefix='s')
|
||||
self.assertSetEqual(tags, {Tag('stag2'), Tag('stag3')})
|
||||
tags = tagline.tags(prefix='R')
|
||||
self.assertSetEqual(tags, set())
|
||||
|
||||
def test_tags_contain(self) -> None:
|
||||
tagline = TagLine('TAGS: atag1 stag2 stag3')
|
||||
tags = tagline.tags(contain='t')
|
||||
self.assertSetEqual(tags, {Tag('atag1'), Tag('stag2'), Tag('stag3')})
|
||||
tags = tagline.tags(contain='1')
|
||||
self.assertSetEqual(tags, {Tag('atag1')})
|
||||
tags = tagline.tags(contain='R')
|
||||
self.assertSetEqual(tags, set())
|
||||
|
||||
def test_merge(self) -> None:
|
||||
tagline1 = TagLine('TAGS: tag1 tag2')
|
||||
tagline2 = TagLine('TAGS: tag2 tag3')
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user