Compare commits

..

2 Commits

4 changed files with 69 additions and 22 deletions

View File

@ -221,48 +221,53 @@ class Message():
@classmethod @classmethod
def tags_from_file(cls: Type[MessageInst], def tags_from_file(cls: Type[MessageInst],
file_path: pathlib.Path, file_path: pathlib.Path,
prefix: Optional[str] = None) -> set[Tag]: prefix: Optional[str] = None,
contain: Optional[str] = None) -> set[Tag]:
""" """
Return only the tags from the given Message file, Return only the tags from the given Message file,
optionally filtered based on prefix. optionally filtered based on prefix or contained string.
""" """
tags: set[Tag] = set() tags: set[Tag] = set()
if not file_path.exists(): if not file_path.exists():
raise MessageError(f"Message file '{file_path}' does not exist") raise MessageError(f"Message file '{file_path}' does not exist")
if file_path.suffix not in cls.file_suffixes: if file_path.suffix not in cls.file_suffixes:
raise MessageError(f"File type '{file_path.suffix}' is not supported") raise MessageError(f"File type '{file_path.suffix}' is not supported")
# for TXT, it's enough to read the TagLine
if file_path.suffix == '.txt': if file_path.suffix == '.txt':
with open(file_path, "r") as fd: with open(file_path, "r") as fd:
try: try:
tags = TagLine(fd.readline()).tags(prefix) tags = TagLine(fd.readline()).tags(prefix, contain)
except TagError: except TagError:
pass # message without tags pass # message without tags
else: # '.yaml' else: # '.yaml'
with open(file_path, "r") as fd: try:
data = yaml.load(fd, Loader=yaml.FullLoader) message = cls.from_file(file_path)
if cls.tags_yaml_key in data: if message:
if prefix and len(prefix) > 0: msg_tags = message.filter_tags(prefix=prefix, contain=contain)
tags = set(sorted([t.strip() for t in data[cls.tags_yaml_key] if t.startswith(prefix)])) except MessageError as e:
else: print(f"Error processing message in '{file_path}': {str(e)}")
tags = set(sorted(data[cls.tags_yaml_key])) if msg_tags:
tags = msg_tags
return tags return tags
@classmethod @classmethod
def tags_from_dir(cls: Type[MessageInst], def tags_from_dir(cls: Type[MessageInst],
path: pathlib.Path, path: pathlib.Path,
glob: Optional[str] = None, glob: Optional[str] = None,
prefix: Optional[str] = None) -> set[Tag]: prefix: Optional[str] = None,
contain: Optional[str] = None) -> set[Tag]:
""" """
Return only the tags from message files in the given directory. Return only the tags from message files in the given directory.
The files can be filtered using 'glob', the tags by using 'prefix'. The files can be filtered using 'glob', the tags by using 'prefix'
and 'contain'.
""" """
tags: set[Tag] = set() tags: set[Tag] = set()
file_iter = path.glob(glob) if glob else path.iterdir() file_iter = path.glob(glob) if glob else path.iterdir()
for file_path in sorted(file_iter): for file_path in sorted(file_iter):
if file_path.is_file(): if file_path.is_file():
try: try:
tags |= cls.tags_from_file(file_path, prefix) tags |= cls.tags_from_file(file_path, prefix, contain)
except MessageError as e: except MessageError as e:
print(f"Error processing message in '{file_path}': {str(e)}") print(f"Error processing message in '{file_path}': {str(e)}")
return tags return tags
@ -426,6 +431,19 @@ class Message():
data[self.tags_yaml_key] = sorted([str(tag) for tag in self.tags]) data[self.tags_yaml_key] = sorted([str(tag) for tag in self.tags])
yaml.dump(data, fd, sort_keys=False) yaml.dump(data, fd, sort_keys=False)
def filter_tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
"""
Filter tags based on their prefix (i. e. the tag starts with a given string)
or some contained string.
"""
res_tags = self.tags
if res_tags:
if prefix and len(prefix) > 0:
res_tags -= {tag for tag in res_tags if not tag.startswith(prefix)}
if contain and len(contain) > 0:
res_tags -= {tag for tag in res_tags if contain not in tag}
return res_tags or set()
def match(self, mfilter: MessageFilter) -> bool: # noqa: 13 def match(self, mfilter: MessageFilter) -> bool: # noqa: 13
""" """
Matches the current Message to the given filter atttributes. Matches the current Message to the given filter atttributes.

View File

@ -118,10 +118,10 @@ class TagLine(str):
""" """
return cls(' '.join([cls.prefix] + sorted([t for t in tags]))) return cls(' '.join([cls.prefix] + sorted([t for t in tags])))
def tags(self, prefix: Optional[str] = None) -> set[Tag]: def tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
""" """
Returns all tags contained in this line as a set, optionally Returns all tags contained in this line as a set, optionally
filtered based on prefix. filtered based on prefix or contained string.
""" """
tagstr = self[len(self.prefix):].strip() tagstr = self[len(self.prefix):].strip()
separator = Tag.default_separator separator = Tag.default_separator
@ -131,10 +131,12 @@ class TagLine(str):
if s in tagstr: if s in tagstr:
separator = s separator = s
break break
res_tags = set(sorted([Tag(t.strip()) for t in tagstr.split(separator)]))
if prefix and len(prefix) > 0: if prefix and len(prefix) > 0:
return set(sorted([Tag(t.strip()) for t in tagstr.split(separator) if t.startswith(prefix)])) res_tags -= {tag for tag in res_tags if not tag.startswith(prefix)}
else: if contain and len(contain) > 0:
return set(sorted([Tag(t.strip()) for t in tagstr.split(separator)])) res_tags -= {tag for tag in res_tags if contain not in tag}
return res_tags or set()
def merge(self, taglines: set['TagLine']) -> 'TagLine': def merge(self, taglines: set['TagLine']) -> 'TagLine':
""" """

View File

@ -605,10 +605,26 @@ This is an answer.
def test_tags_from_file_txt_prefix(self) -> None: def test_tags_from_file_txt_prefix(self) -> None:
tags = Message.tags_from_file(self.file_path_txt, prefix='p') tags = Message.tags_from_file(self.file_path_txt, prefix='p')
self.assertSetEqual(tags, {Tag('ptag3')}) self.assertSetEqual(tags, {Tag('ptag3')})
tags = Message.tags_from_file(self.file_path_txt, prefix='R')
self.assertSetEqual(tags, set())
def test_tags_from_file_yaml_prefix(self) -> None: def test_tags_from_file_yaml_prefix(self) -> None:
tags = Message.tags_from_file(self.file_path_yaml, prefix='p') tags = Message.tags_from_file(self.file_path_yaml, prefix='p')
self.assertSetEqual(tags, {Tag('ptag3')}) self.assertSetEqual(tags, {Tag('ptag3')})
tags = Message.tags_from_file(self.file_path_yaml, prefix='R')
self.assertSetEqual(tags, set())
def test_tags_from_file_txt_contain(self) -> None:
tags = Message.tags_from_file(self.file_path_txt, contain='3')
self.assertSetEqual(tags, {Tag('ptag3')})
tags = Message.tags_from_file(self.file_path_txt, contain='R')
self.assertSetEqual(tags, set())
def test_tags_from_file_yaml_contain(self) -> None:
tags = Message.tags_from_file(self.file_path_yaml, contain='3')
self.assertSetEqual(tags, {Tag('ptag3')})
tags = Message.tags_from_file(self.file_path_yaml, contain='R')
self.assertSetEqual(tags, set())
class TagsFromDirTestCase(CmmTestCase): class TagsFromDirTestCase(CmmTestCase):

View File

@ -40,9 +40,9 @@ class TestTagLine(CmmTestCase):
self.assertEqual(tagline, 'TAGS: tag1 tag2') self.assertEqual(tagline, 'TAGS: tag1 tag2')
def test_tags(self) -> None: def test_tags(self) -> None:
tagline = TagLine('TAGS: tag1 tag2') tagline = TagLine('TAGS: atag1 btag2')
tags = tagline.tags() tags = tagline.tags()
self.assertEqual(tags, {Tag('tag1'), Tag('tag2')}) self.assertEqual(tags, {Tag('atag1'), Tag('btag2')})
def test_tags_with_newline(self) -> None: def test_tags_with_newline(self) -> None:
tagline = TagLine('TAGS: tag1\n tag2') tagline = TagLine('TAGS: tag1\n tag2')
@ -52,9 +52,20 @@ class TestTagLine(CmmTestCase):
def test_tags_prefix(self) -> None: def test_tags_prefix(self) -> None:
tagline = TagLine('TAGS: atag1 stag2 stag3') tagline = TagLine('TAGS: atag1 stag2 stag3')
tags = tagline.tags(prefix='a') tags = tagline.tags(prefix='a')
self.assertEqual(tags, {Tag('atag1')}) self.assertSetEqual(tags, {Tag('atag1')})
tags = tagline.tags(prefix='s') tags = tagline.tags(prefix='s')
self.assertEqual(tags, {Tag('stag2'), Tag('stag3')}) self.assertSetEqual(tags, {Tag('stag2'), Tag('stag3')})
tags = tagline.tags(prefix='R')
self.assertSetEqual(tags, set())
def test_tags_contain(self) -> None:
tagline = TagLine('TAGS: atag1 stag2 stag3')
tags = tagline.tags(contain='t')
self.assertSetEqual(tags, {Tag('atag1'), Tag('stag2'), Tag('stag3')})
tags = tagline.tags(contain='1')
self.assertSetEqual(tags, {Tag('atag1')})
tags = tagline.tags(contain='R')
self.assertSetEqual(tags, set())
def test_merge(self) -> None: def test_merge(self) -> None:
tagline1 = TagLine('TAGS: tag1 tag2') tagline1 = TagLine('TAGS: tag1 tag2')