Compare commits

..

2 Commits

4 changed files with 22 additions and 69 deletions

View File

@ -221,53 +221,48 @@ class Message():
@classmethod
def tags_from_file(cls: Type[MessageInst],
file_path: pathlib.Path,
prefix: Optional[str] = None,
contain: Optional[str] = None) -> set[Tag]:
prefix: Optional[str] = None) -> set[Tag]:
"""
Return only the tags from the given Message file,
optionally filtered based on prefix or contained string.
optionally filtered based on prefix.
"""
tags: set[Tag] = set()
if not file_path.exists():
raise MessageError(f"Message file '{file_path}' does not exist")
if file_path.suffix not in cls.file_suffixes:
raise MessageError(f"File type '{file_path.suffix}' is not supported")
# for TXT, it's enough to read the TagLine
if file_path.suffix == '.txt':
with open(file_path, "r") as fd:
try:
tags = TagLine(fd.readline()).tags(prefix, contain)
tags = TagLine(fd.readline()).tags(prefix)
except TagError:
pass # message without tags
else: # '.yaml'
try:
message = cls.from_file(file_path)
if message:
msg_tags = message.filter_tags(prefix=prefix, contain=contain)
except MessageError as e:
print(f"Error processing message in '{file_path}': {str(e)}")
if msg_tags:
tags = msg_tags
with open(file_path, "r") as fd:
data = yaml.load(fd, Loader=yaml.FullLoader)
if cls.tags_yaml_key in data:
if prefix and len(prefix) > 0:
tags = set(sorted([t.strip() for t in data[cls.tags_yaml_key] if t.startswith(prefix)]))
else:
tags = set(sorted(data[cls.tags_yaml_key]))
return tags
@classmethod
def tags_from_dir(cls: Type[MessageInst],
path: pathlib.Path,
glob: Optional[str] = None,
prefix: Optional[str] = None,
contain: Optional[str] = None) -> set[Tag]:
prefix: Optional[str] = None) -> set[Tag]:
"""
Return only the tags from message files in the given directory.
The files can be filtered using 'glob', the tags by using 'prefix'
and 'contain'.
The files can be filtered using 'glob', the tags by using 'prefix'.
"""
tags: set[Tag] = set()
file_iter = path.glob(glob) if glob else path.iterdir()
for file_path in sorted(file_iter):
if file_path.is_file():
try:
tags |= cls.tags_from_file(file_path, prefix, contain)
tags |= cls.tags_from_file(file_path, prefix)
except MessageError as e:
print(f"Error processing message in '{file_path}': {str(e)}")
return tags
@ -431,19 +426,6 @@ class Message():
data[self.tags_yaml_key] = sorted([str(tag) for tag in self.tags])
yaml.dump(data, fd, sort_keys=False)
def filter_tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
"""
Filter tags based on their prefix (i. e. the tag starts with a given string)
or some contained string.
"""
res_tags = self.tags
if res_tags:
if prefix and len(prefix) > 0:
res_tags -= {tag for tag in res_tags if not tag.startswith(prefix)}
if contain and len(contain) > 0:
res_tags -= {tag for tag in res_tags if contain not in tag}
return res_tags or set()
def match(self, mfilter: MessageFilter) -> bool: # noqa: 13
"""
Matches the current Message to the given filter atttributes.

View File

@ -118,10 +118,10 @@ class TagLine(str):
"""
return cls(' '.join([cls.prefix] + sorted([t for t in tags])))
def tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
def tags(self, prefix: Optional[str] = None) -> set[Tag]:
"""
Returns all tags contained in this line as a set, optionally
filtered based on prefix or contained string.
filtered based on prefix.
"""
tagstr = self[len(self.prefix):].strip()
separator = Tag.default_separator
@ -131,12 +131,10 @@ class TagLine(str):
if s in tagstr:
separator = s
break
res_tags = set(sorted([Tag(t.strip()) for t in tagstr.split(separator)]))
if prefix and len(prefix) > 0:
res_tags -= {tag for tag in res_tags if not tag.startswith(prefix)}
if contain and len(contain) > 0:
res_tags -= {tag for tag in res_tags if contain not in tag}
return res_tags or set()
return set(sorted([Tag(t.strip()) for t in tagstr.split(separator) if t.startswith(prefix)]))
else:
return set(sorted([Tag(t.strip()) for t in tagstr.split(separator)]))
def merge(self, taglines: set['TagLine']) -> 'TagLine':
"""

View File

@ -605,26 +605,10 @@ This is an answer.
def test_tags_from_file_txt_prefix(self) -> None:
tags = Message.tags_from_file(self.file_path_txt, prefix='p')
self.assertSetEqual(tags, {Tag('ptag3')})
tags = Message.tags_from_file(self.file_path_txt, prefix='R')
self.assertSetEqual(tags, set())
def test_tags_from_file_yaml_prefix(self) -> None:
tags = Message.tags_from_file(self.file_path_yaml, prefix='p')
self.assertSetEqual(tags, {Tag('ptag3')})
tags = Message.tags_from_file(self.file_path_yaml, prefix='R')
self.assertSetEqual(tags, set())
def test_tags_from_file_txt_contain(self) -> None:
tags = Message.tags_from_file(self.file_path_txt, contain='3')
self.assertSetEqual(tags, {Tag('ptag3')})
tags = Message.tags_from_file(self.file_path_txt, contain='R')
self.assertSetEqual(tags, set())
def test_tags_from_file_yaml_contain(self) -> None:
tags = Message.tags_from_file(self.file_path_yaml, contain='3')
self.assertSetEqual(tags, {Tag('ptag3')})
tags = Message.tags_from_file(self.file_path_yaml, contain='R')
self.assertSetEqual(tags, set())
class TagsFromDirTestCase(CmmTestCase):

View File

@ -40,9 +40,9 @@ class TestTagLine(CmmTestCase):
self.assertEqual(tagline, 'TAGS: tag1 tag2')
def test_tags(self) -> None:
tagline = TagLine('TAGS: atag1 btag2')
tagline = TagLine('TAGS: tag1 tag2')
tags = tagline.tags()
self.assertEqual(tags, {Tag('atag1'), Tag('btag2')})
self.assertEqual(tags, {Tag('tag1'), Tag('tag2')})
def test_tags_with_newline(self) -> None:
tagline = TagLine('TAGS: tag1\n tag2')
@ -52,20 +52,9 @@ class TestTagLine(CmmTestCase):
def test_tags_prefix(self) -> None:
tagline = TagLine('TAGS: atag1 stag2 stag3')
tags = tagline.tags(prefix='a')
self.assertSetEqual(tags, {Tag('atag1')})
self.assertEqual(tags, {Tag('atag1')})
tags = tagline.tags(prefix='s')
self.assertSetEqual(tags, {Tag('stag2'), Tag('stag3')})
tags = tagline.tags(prefix='R')
self.assertSetEqual(tags, set())
def test_tags_contain(self) -> None:
tagline = TagLine('TAGS: atag1 stag2 stag3')
tags = tagline.tags(contain='t')
self.assertSetEqual(tags, {Tag('atag1'), Tag('stag2'), Tag('stag3')})
tags = tagline.tags(contain='1')
self.assertSetEqual(tags, {Tag('atag1')})
tags = tagline.tags(contain='R')
self.assertSetEqual(tags, set())
self.assertEqual(tags, {Tag('stag2'), Tag('stag3')})
def test_merge(self) -> None:
tagline1 = TagLine('TAGS: tag1 tag2')