Compare commits
2 Commits
a41c13de37
...
da01a94a1e
| Author | SHA1 | Date | |
|---|---|---|---|
| da01a94a1e | |||
| c7b99fe9b4 |
@ -221,48 +221,53 @@ class Message():
|
|||||||
@classmethod
|
@classmethod
|
||||||
def tags_from_file(cls: Type[MessageInst],
|
def tags_from_file(cls: Type[MessageInst],
|
||||||
file_path: pathlib.Path,
|
file_path: pathlib.Path,
|
||||||
prefix: Optional[str] = None) -> set[Tag]:
|
prefix: Optional[str] = None,
|
||||||
|
contain: Optional[str] = None) -> set[Tag]:
|
||||||
"""
|
"""
|
||||||
Return only the tags from the given Message file,
|
Return only the tags from the given Message file,
|
||||||
optionally filtered based on prefix.
|
optionally filtered based on prefix or contained string.
|
||||||
"""
|
"""
|
||||||
tags: set[Tag] = set()
|
tags: set[Tag] = set()
|
||||||
if not file_path.exists():
|
if not file_path.exists():
|
||||||
raise MessageError(f"Message file '{file_path}' does not exist")
|
raise MessageError(f"Message file '{file_path}' does not exist")
|
||||||
if file_path.suffix not in cls.file_suffixes:
|
if file_path.suffix not in cls.file_suffixes:
|
||||||
raise MessageError(f"File type '{file_path.suffix}' is not supported")
|
raise MessageError(f"File type '{file_path.suffix}' is not supported")
|
||||||
|
# for TXT, it's enough to read the TagLine
|
||||||
if file_path.suffix == '.txt':
|
if file_path.suffix == '.txt':
|
||||||
with open(file_path, "r") as fd:
|
with open(file_path, "r") as fd:
|
||||||
try:
|
try:
|
||||||
tags = TagLine(fd.readline()).tags(prefix)
|
tags = TagLine(fd.readline()).tags(prefix, contain)
|
||||||
except TagError:
|
except TagError:
|
||||||
pass # message without tags
|
pass # message without tags
|
||||||
else: # '.yaml'
|
else: # '.yaml'
|
||||||
with open(file_path, "r") as fd:
|
try:
|
||||||
data = yaml.load(fd, Loader=yaml.FullLoader)
|
message = cls.from_file(file_path)
|
||||||
if cls.tags_yaml_key in data:
|
if message:
|
||||||
if prefix and len(prefix) > 0:
|
msg_tags = message.filter_tags(prefix=prefix, contain=contain)
|
||||||
tags = set(sorted([t.strip() for t in data[cls.tags_yaml_key] if t.startswith(prefix)]))
|
except MessageError as e:
|
||||||
else:
|
print(f"Error processing message in '{file_path}': {str(e)}")
|
||||||
tags = set(sorted(data[cls.tags_yaml_key]))
|
if msg_tags:
|
||||||
|
tags = msg_tags
|
||||||
return tags
|
return tags
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def tags_from_dir(cls: Type[MessageInst],
|
def tags_from_dir(cls: Type[MessageInst],
|
||||||
path: pathlib.Path,
|
path: pathlib.Path,
|
||||||
glob: Optional[str] = None,
|
glob: Optional[str] = None,
|
||||||
prefix: Optional[str] = None) -> set[Tag]:
|
prefix: Optional[str] = None,
|
||||||
|
contain: Optional[str] = None) -> set[Tag]:
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Return only the tags from message files in the given directory.
|
Return only the tags from message files in the given directory.
|
||||||
The files can be filtered using 'glob', the tags by using 'prefix'.
|
The files can be filtered using 'glob', the tags by using 'prefix'
|
||||||
|
and 'contain'.
|
||||||
"""
|
"""
|
||||||
tags: set[Tag] = set()
|
tags: set[Tag] = set()
|
||||||
file_iter = path.glob(glob) if glob else path.iterdir()
|
file_iter = path.glob(glob) if glob else path.iterdir()
|
||||||
for file_path in sorted(file_iter):
|
for file_path in sorted(file_iter):
|
||||||
if file_path.is_file():
|
if file_path.is_file():
|
||||||
try:
|
try:
|
||||||
tags |= cls.tags_from_file(file_path, prefix)
|
tags |= cls.tags_from_file(file_path, prefix, contain)
|
||||||
except MessageError as e:
|
except MessageError as e:
|
||||||
print(f"Error processing message in '{file_path}': {str(e)}")
|
print(f"Error processing message in '{file_path}': {str(e)}")
|
||||||
return tags
|
return tags
|
||||||
@ -426,6 +431,19 @@ class Message():
|
|||||||
data[self.tags_yaml_key] = sorted([str(tag) for tag in self.tags])
|
data[self.tags_yaml_key] = sorted([str(tag) for tag in self.tags])
|
||||||
yaml.dump(data, fd, sort_keys=False)
|
yaml.dump(data, fd, sort_keys=False)
|
||||||
|
|
||||||
|
def filter_tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
|
||||||
|
"""
|
||||||
|
Filter tags based on their prefix (i. e. the tag starts with a given string)
|
||||||
|
or some contained string.
|
||||||
|
"""
|
||||||
|
res_tags = self.tags
|
||||||
|
if res_tags:
|
||||||
|
if prefix and len(prefix) > 0:
|
||||||
|
res_tags -= {tag for tag in res_tags if not tag.startswith(prefix)}
|
||||||
|
if contain and len(contain) > 0:
|
||||||
|
res_tags -= {tag for tag in res_tags if contain not in tag}
|
||||||
|
return res_tags or set()
|
||||||
|
|
||||||
def match(self, mfilter: MessageFilter) -> bool: # noqa: 13
|
def match(self, mfilter: MessageFilter) -> bool: # noqa: 13
|
||||||
"""
|
"""
|
||||||
Matches the current Message to the given filter atttributes.
|
Matches the current Message to the given filter atttributes.
|
||||||
|
|||||||
@ -118,10 +118,10 @@ class TagLine(str):
|
|||||||
"""
|
"""
|
||||||
return cls(' '.join([cls.prefix] + sorted([t for t in tags])))
|
return cls(' '.join([cls.prefix] + sorted([t for t in tags])))
|
||||||
|
|
||||||
def tags(self, prefix: Optional[str] = None) -> set[Tag]:
|
def tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
|
||||||
"""
|
"""
|
||||||
Returns all tags contained in this line as a set, optionally
|
Returns all tags contained in this line as a set, optionally
|
||||||
filtered based on prefix.
|
filtered based on prefix or contained string.
|
||||||
"""
|
"""
|
||||||
tagstr = self[len(self.prefix):].strip()
|
tagstr = self[len(self.prefix):].strip()
|
||||||
separator = Tag.default_separator
|
separator = Tag.default_separator
|
||||||
@ -131,10 +131,12 @@ class TagLine(str):
|
|||||||
if s in tagstr:
|
if s in tagstr:
|
||||||
separator = s
|
separator = s
|
||||||
break
|
break
|
||||||
|
res_tags = set(sorted([Tag(t.strip()) for t in tagstr.split(separator)]))
|
||||||
if prefix and len(prefix) > 0:
|
if prefix and len(prefix) > 0:
|
||||||
return set(sorted([Tag(t.strip()) for t in tagstr.split(separator) if t.startswith(prefix)]))
|
res_tags -= {tag for tag in res_tags if not tag.startswith(prefix)}
|
||||||
else:
|
if contain and len(contain) > 0:
|
||||||
return set(sorted([Tag(t.strip()) for t in tagstr.split(separator)]))
|
res_tags -= {tag for tag in res_tags if contain not in tag}
|
||||||
|
return res_tags or set()
|
||||||
|
|
||||||
def merge(self, taglines: set['TagLine']) -> 'TagLine':
|
def merge(self, taglines: set['TagLine']) -> 'TagLine':
|
||||||
"""
|
"""
|
||||||
|
|||||||
@ -605,10 +605,26 @@ This is an answer.
|
|||||||
def test_tags_from_file_txt_prefix(self) -> None:
|
def test_tags_from_file_txt_prefix(self) -> None:
|
||||||
tags = Message.tags_from_file(self.file_path_txt, prefix='p')
|
tags = Message.tags_from_file(self.file_path_txt, prefix='p')
|
||||||
self.assertSetEqual(tags, {Tag('ptag3')})
|
self.assertSetEqual(tags, {Tag('ptag3')})
|
||||||
|
tags = Message.tags_from_file(self.file_path_txt, prefix='R')
|
||||||
|
self.assertSetEqual(tags, set())
|
||||||
|
|
||||||
def test_tags_from_file_yaml_prefix(self) -> None:
|
def test_tags_from_file_yaml_prefix(self) -> None:
|
||||||
tags = Message.tags_from_file(self.file_path_yaml, prefix='p')
|
tags = Message.tags_from_file(self.file_path_yaml, prefix='p')
|
||||||
self.assertSetEqual(tags, {Tag('ptag3')})
|
self.assertSetEqual(tags, {Tag('ptag3')})
|
||||||
|
tags = Message.tags_from_file(self.file_path_yaml, prefix='R')
|
||||||
|
self.assertSetEqual(tags, set())
|
||||||
|
|
||||||
|
def test_tags_from_file_txt_contain(self) -> None:
|
||||||
|
tags = Message.tags_from_file(self.file_path_txt, contain='3')
|
||||||
|
self.assertSetEqual(tags, {Tag('ptag3')})
|
||||||
|
tags = Message.tags_from_file(self.file_path_txt, contain='R')
|
||||||
|
self.assertSetEqual(tags, set())
|
||||||
|
|
||||||
|
def test_tags_from_file_yaml_contain(self) -> None:
|
||||||
|
tags = Message.tags_from_file(self.file_path_yaml, contain='3')
|
||||||
|
self.assertSetEqual(tags, {Tag('ptag3')})
|
||||||
|
tags = Message.tags_from_file(self.file_path_yaml, contain='R')
|
||||||
|
self.assertSetEqual(tags, set())
|
||||||
|
|
||||||
|
|
||||||
class TagsFromDirTestCase(CmmTestCase):
|
class TagsFromDirTestCase(CmmTestCase):
|
||||||
|
|||||||
@ -40,9 +40,9 @@ class TestTagLine(CmmTestCase):
|
|||||||
self.assertEqual(tagline, 'TAGS: tag1 tag2')
|
self.assertEqual(tagline, 'TAGS: tag1 tag2')
|
||||||
|
|
||||||
def test_tags(self) -> None:
|
def test_tags(self) -> None:
|
||||||
tagline = TagLine('TAGS: tag1 tag2')
|
tagline = TagLine('TAGS: atag1 btag2')
|
||||||
tags = tagline.tags()
|
tags = tagline.tags()
|
||||||
self.assertEqual(tags, {Tag('tag1'), Tag('tag2')})
|
self.assertEqual(tags, {Tag('atag1'), Tag('btag2')})
|
||||||
|
|
||||||
def test_tags_with_newline(self) -> None:
|
def test_tags_with_newline(self) -> None:
|
||||||
tagline = TagLine('TAGS: tag1\n tag2')
|
tagline = TagLine('TAGS: tag1\n tag2')
|
||||||
@ -52,9 +52,20 @@ class TestTagLine(CmmTestCase):
|
|||||||
def test_tags_prefix(self) -> None:
|
def test_tags_prefix(self) -> None:
|
||||||
tagline = TagLine('TAGS: atag1 stag2 stag3')
|
tagline = TagLine('TAGS: atag1 stag2 stag3')
|
||||||
tags = tagline.tags(prefix='a')
|
tags = tagline.tags(prefix='a')
|
||||||
self.assertEqual(tags, {Tag('atag1')})
|
self.assertSetEqual(tags, {Tag('atag1')})
|
||||||
tags = tagline.tags(prefix='s')
|
tags = tagline.tags(prefix='s')
|
||||||
self.assertEqual(tags, {Tag('stag2'), Tag('stag3')})
|
self.assertSetEqual(tags, {Tag('stag2'), Tag('stag3')})
|
||||||
|
tags = tagline.tags(prefix='R')
|
||||||
|
self.assertSetEqual(tags, set())
|
||||||
|
|
||||||
|
def test_tags_contain(self) -> None:
|
||||||
|
tagline = TagLine('TAGS: atag1 stag2 stag3')
|
||||||
|
tags = tagline.tags(contain='t')
|
||||||
|
self.assertSetEqual(tags, {Tag('atag1'), Tag('stag2'), Tag('stag3')})
|
||||||
|
tags = tagline.tags(contain='1')
|
||||||
|
self.assertSetEqual(tags, {Tag('atag1')})
|
||||||
|
tags = tagline.tags(contain='R')
|
||||||
|
self.assertSetEqual(tags, set())
|
||||||
|
|
||||||
def test_merge(self) -> None:
|
def test_merge(self) -> None:
|
||||||
tagline1 = TagLine('TAGS: tag1 tag2')
|
tagline1 = TagLine('TAGS: tag1 tag2')
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user