Module static_topo_impl.dsl.interpreter

Expand source code
import re
from datetime import datetime
from typing import Any, Dict, List

import attr
from asteval import Interpreter
from six import string_types
from static_topo_impl.model.factory import TopologyFactory
from static_topo_impl.model.stackstate import (Component, Event,
                                               HealthCheckState, Relation,
                                               SourceLink)
from textx import metamodel_from_str, textx_isinstance
from textx.metamodel import TextXMetaModel
from textx.model import TextXSyntaxError


@attr.s(kw_only=True)
class TopologyContext:
    factory: TopologyFactory = attr.ib()
    component: Component = attr.ib(default=None)
    event: Event = attr.ib(default=None)


class PropertyInterpreter:
    def __init__(
        self,
        properties: Dict[str, Any],
        defaults: Dict[str, Any],
        source_name: str,
        ctx: TopologyContext,
        topology_meta: TextXMetaModel,
    ):
        self.source_name = source_name
        self.defaults = defaults
        self.properties = properties
        self.ctx = ctx
        self.topology_meta = topology_meta
        self.PropertyObjectClass = self.topology_meta["PropertyObject"]
        self.PropertyListClass = self.topology_meta["PropertyList"]
        self.PropertyCodeClass = self.topology_meta["PropertyCode"]
        self.PropertyClass = self.topology_meta["Property"]
        self.default_source = "default"

    def get_string_property(self, name: str, default=None) -> str:
        value = self.get_property_value(name, self.properties, self.source_name, default=None)
        if value is None:
            value = self.get_property_value(name, self.defaults, self.default_source, default=default)

        return self._assert_string(value, name, self.source_name)

    def get_map_property(self, name: str, default=None) -> Dict[str, Any]:
        value = self.get_property_value(name, self.properties, self.source_name, default=default)
        return self._assert_dict(value, name, self.source_name)

    def get_list_property(self, name: str, default=None) -> List[Any]:
        value = self.get_property_value(name, self.properties, self.source_name, default=default)
        return self._assert_list(value, name, self.source_name)

    def get_property(self, name: str, default=None) -> Any:
        value = self.get_property_value(name, self.properties, self.source_name, default=None)
        if value is None:
            value = self.get_property_value(name, self.defaults, self.default_source, default=default)
        return value

    def merge_map_property(self, name: str) -> Dict[str, Any]:
        default_value = self.get_property_value(name, self.defaults, self.default_source, default={})
        self._assert_dict(default_value, name, self.default_source)
        map_value = self.get_property_value(name, self.properties, self.source_name, default=None)
        self._assert_dict(map_value, name, self.source_name)
        if map_value is None:
            return default_value
        else:
            default_value.update(map_value)
            return default_value

    def merge_list_property(self, name: str):
        default_value = self.get_property_value(name, self.defaults, self.default_source, default=[])
        self._assert_list(default_value, name, self.default_source)
        list_value = self.get_property_value(name, self.properties, self.source_name, default=None)
        self._assert_list(list_value, name, self.source_name)
        if list_value is None:
            return default_value
        else:
            list_value.extend(default_value)
            return list_value

    def run_processors(self, defaults_name="processor"):
        name = "processor"
        if self._is_code(name, self.properties):
            self._run_code(self.properties[name].code, name, self.source_name)
        if self._is_code(name, self.defaults):
            self._run_code(self.defaults[defaults_name].code, defaults_name, self.default_source)

    def get_property_value(self, name: str, properties: Dict[str, Any], source_name: str, default: Any = None) -> Any:
        value_ast = properties.get(name, default)
        return self._convert_value(value_ast, name, source_name)

    def _is_code(self, name: str, properties: Dict[str, Any]) -> bool:
        value_ast = properties.get(name, None)
        return textx_isinstance(value_ast, self.PropertyCodeClass)

    def _convert_value(self, value_ast: Any, property_name: str, source_name: str) -> Any:
        if value_ast is None:
            return None
        elif textx_isinstance(value_ast, self.PropertyObjectClass):
            value = {}
            for member in value_ast.members:
                value[member.key] = self._convert_value(member.value, property_name, source_name)
            return value
        elif textx_isinstance(value_ast, self.PropertyListClass):
            value_list = []
            for v in value_ast.values:
                value_list.append(self._convert_value(v, property_name, source_name))
            return value_list
        elif textx_isinstance(value_ast, self.PropertyCodeClass):
            return self._run_code(value_ast.code, property_name, source_name)
        else:
            return value_ast

    @staticmethod
    def _assert_string(value: Any, name: str, source_name: str) -> str:
        if value is not None:
            if not isinstance(value, string_types):
                raise Exception(f"Expected string type for '{name}', but was {type(value)} on `{source_name}`")
        return value

    @staticmethod
    def _assert_dict(value: Any, name: str, source_name: str) -> Dict[str, Any]:
        if value is not None:
            if not isinstance(value, dict):
                raise Exception(f"Expected dict type for '{name}', but was {type(value)} on `{source_name}`")
        return value

    @staticmethod
    def _assert_list(value: Any, name: str, source_name: str) -> List[Any]:
        if value is not None:
            if not isinstance(value, list):
                raise Exception(f"Expected list type for '{name}', but was {type(value)} on `{source_name}`")
        return value

    def _run_code(self, code: str, property_name, source_name: str) -> Any:
        aeval = self._get_asteval_interpreter(self.ctx)
        code = code.strip()
        if code.endswith("```"):
            code = code[:-3]
        code_lines = code.split("\n")
        # Fix first line indentation
        if len(code_lines) > 1:
            padding_count = 0
            second_line = code_lines[1]
            for i in range(0, len(second_line)):
                if second_line[i] != " ":
                    break
                padding_count += 1
            for i in range(1, len(code_lines)):
                code_lines[i] = code_lines[i][padding_count:]
        code = "\n".join(code_lines)
        value = self._eval_expression(code, aeval, property_name, source_name)
        return value

    @staticmethod
    def _get_asteval_interpreter(ctx: TopologyContext) -> Interpreter:
        aeval = Interpreter()
        aeval.symtable["factory"] = ctx.factory
        aeval.symtable["component"] = ctx.component
        aeval.symtable["event"] = ctx.event
        return aeval

    @staticmethod
    def _eval_expression(
        expression: str, aeval: Interpreter, eval_property: str, source_name: str, fail_on_error: bool = True
    ):
        existing_errs = len(aeval.error)
        result = aeval.eval(expression)
        if len(aeval.error) > existing_errs and fail_on_error:
            error_messages = []
            for err in aeval.error:
                error_messages.append(err.get_error())
            raise Exception(
                f"Failed to evaluate property '{eval_property}' on `{source_name}`. "
                f"Expression |\n {expression} \n |.\n Errors:\n {error_messages}"
            )
        return result


class TopologyInterpreter:
    def __init__(self, factory: TopologyFactory):
        self.factory = factory
        self.topology_meta = metamodel_from_str(TOPOLOGY_TX)
        self.ElementPropertiesChangedClass = self.topology_meta["ElementPropertiesChanged"]
        self.link_pattern = re.compile("\\[([\\s\\w-]*)\\]\\((.*)\\)")

    def model_from_file(self, model_file_name: str):
        try:
            return self.topology_meta.model_from_file(model_file_name)
        except TextXSyntaxError as e:
            raise Exception(e.message)

    def interpret(self, model) -> TopologyFactory:
        defaults: Dict[str, Any] = {}
        if hasattr(model, "defaults") and model.defaults is not None:
            defaults = self._index_properties(model.defaults.properties)
        if hasattr(model, "components") and model.components is not None:
            components_ast = model.components
            for component_ast in components_ast.components:
                self._interpret_component(component_ast, defaults)
            self._resolve_relations()
        if hasattr(model, "events") and model.events is not None:
            events_ast = model.events
            for event_ast in events_ast.events:
                self._interpret_event(event_ast, defaults)

        return self.factory

    def _interpret_event(self, event_ast, defaults):
        event = Event()
        properties = self._index_properties(event_ast.properties)
        ctx = TopologyContext(factory=self.factory, event=event)
        property_interpreter = PropertyInterpreter(properties, defaults, "event", ctx, self.topology_meta)

        event.msg_title = property_interpreter.get_string_property("title", "Unknown")
        property_interpreter.source_name = f"Event with title '{event.msg_title}"
        event.msg_text = property_interpreter.get_string_property("message", "")
        event.tags.extend(property_interpreter.merge_list_property("tags"))
        event.timestamp = datetime.now()
        identifiers = property_interpreter.get_list_property("identifiers", [])
        if len(identifiers) == 0:
            raise Exception(f"Event must have at least 1 identifier '{event.msg_title}'.")
        event.context.element_identifiers = self._resolve_identifiers(identifiers)

        links = property_interpreter.get_list_property("links", [])
        for link in links:
            match = self.link_pattern.match(link)
            if not match:
                raise Exception(f"Link '{link}' must have the format '[description](url)'")
            source_link = SourceLink()
            source_link.title = match.group(1)
            source_link.url = match.group(2)
            event.context.source_links.append(source_link)

        if textx_isinstance(event_ast, self.ElementPropertiesChangedClass):
            event.context.category = "Changes"
            event.event_type = "Element Properties Changed"
            previous = property_interpreter.get_map_property("previous", {})
            current = property_interpreter.get_map_property("current", {})
            event.context.data = {"old": previous, "new": current}

        property_interpreter.run_processors(defaults_name="eventProcessor")
        self.factory.add_event(event)

    def _resolve_identifiers(self, identifiers):
        resolved_identifiers = []
        for identifier in identifiers:
            if self.factory.component_exists(identifier):
                resolved_identifiers.append(identifier)
            else:
                target_component = self.factory.get_component_by_name(identifier, raise_not_found=False)
                if target_component:
                    resolved_identifiers.append(target_component.uid)
                else:
                    # Reference to another component on StackState Server
                    resolved_identifiers.append(identifier)
        return resolved_identifiers

    def _interpret_component(self, component_ast, defaults):
        component = Component()
        component.set_type(component_ast.component_type)
        properties = self._index_properties(component_ast.properties)
        ctx = TopologyContext(factory=self.factory, component=component)
        property_interpreter = PropertyInterpreter(properties, defaults, component.get_type(), ctx, self.topology_meta)

        component.set_name(property_interpreter.get_property("name"))
        if component.get_name() is None:
            raise Exception(f"Component name is required for '{component.get_type()}'.")

        property_interpreter.source_name = component.get_name()
        component.properties.update_properties(property_interpreter.merge_map_property("data"))
        component.properties.layer = property_interpreter.get_string_property("layer", "Unknown")
        component.properties.domain = property_interpreter.get_string_property("domain", "Unknown")
        component.properties.environment = property_interpreter.get_string_property("environment", "Unknown")
        component.properties.labels.extend(property_interpreter.merge_list_property("labels"))
        component.uid = property_interpreter.get_string_property("id", None)
        component.properties.identifiers.extend(property_interpreter.merge_list_property("identifiers"))
        property_interpreter.run_processors()

        if component.uid is None:
            component.uid = f"urn:{component.get_type().lower()}:{component.get_name().lower()}"

        if len(component.properties.identifiers) == 0:
            component.properties.identifiers.append(component.uid)

        self._interpret_health(component, property_interpreter)
        self._interpret_relations(component, property_interpreter)
        self.factory.add_component(component)

    def _resolve_relations(self):
        components: List[Component] = self.factory.components.values()
        for source in components:
            for relation in source.relations:
                if self.factory.component_exists(relation.target_id):
                    self.factory.add_relation(relation.source_id, relation.target_id, relation.get_type())
                else:
                    target_component = self.factory.get_component_by_name(relation.target_id, raise_not_found=False)
                    if target_component:
                        self.factory.add_relation(relation.source_id, target_component.uid, relation.get_type())
                    else:
                        raise Exception(
                            f"Failed to find related component '{relation.target_id}'. "
                            f"Reference from component {source.uid}."
                        )
            source.relations = []

    @staticmethod
    def _interpret_relations(component: Component, property_interpreter: PropertyInterpreter):
        relations: List[str] = property_interpreter.merge_list_property("relations")
        for relation in relations:
            rel_parts = relation.split("|")
            rel_type = "uses"
            if len(rel_parts) == 2:
                rel_type = rel_parts[1]
            rel_id = f"{component.uid} --> {rel_parts[0]}"
            relation = Relation({"source_id": component.uid, "target_id": rel_parts[0], "external_id": rel_id})
            relation.set_type(rel_type)
            component.relations.append(relation)

    def _interpret_health(self, component: Component, property_interpreter: PropertyInterpreter):
        cid = component.uid
        if len(component.properties.identifiers) > 0 and cid not in component.properties.identifiers:
            cid = component.properties.identifiers[0]
        health_info = property_interpreter.get_string_property("health", "HealthCheck|CLEAR")
        health_parts = health_info.split("|")
        health_name = "HealthCheck"
        if len(health_parts) == 2:
            health_name = health_parts[0]
            health_state = health_parts[1]
        else:
            health_state = health_parts[0]
        health_msg = property_interpreter.get_string_property("healthMessage", "")
        health = HealthCheckState()
        health.check_name = health_name
        health.check_id = f"{component.get_name()}_static_states"
        health.topo_identifier = cid
        health.health = health_state.upper()
        health.message = health_msg
        health.validate()
        self.factory.health[health.check_id] = health

    @staticmethod
    def _index_properties(properties) -> Dict[str, Any]:
        index: Dict[str, Any] = {}
        for p in properties:
            index[p.name] = p.value
        return index


TOPOLOGY_TX = """
TopologyModel:
    defaults=Defaults?
    components=Components
    events=Events?
;

Defaults:
    'defaults' '{'
        properties*=DefaultProperty
    '}'
;

Components:
    'components' '{'
        components*=Component
    '}'
;

Events:
    'events' '{'
        events*=Event
    '}'
;

Event:
    ElementPropertiesChanged
;

ElementPropertiesChanged:
    "ElementPropertiesChanged" "(" properties*=ElementPropertiesChangedProperty ")"
;

ElementPropertiesChangedProperty:
  name=ElementPropertiesChangedPropertyKeyword value=PropertyValue
;

ElementPropertiesChangedPropertyKeyword:
     EventPropertyKeyword | 'previous' | 'current'
;

Component:
    ComponentMultiline | ComponentInline
;

ComponentMultiline:
   component_type=ComponentType "(" properties*=Property ")"
;

ComponentInline:
   component_type=ComponentType "(" properties*=Property[','] ")"
;

ComponentType:
    /[^\\d\\W][\\w.\\s]*\\b/

;

Property:
  name=PropertyKeyword value=PropertyValue
;

PropertyKeyword:
     'identifiers' | 'id' | 'name' | 'layer' | 'domain' | 'environment' | 'labels' | 'processor' | 'data' |
     'relations' | 'healthMessage' |'health'
;

DefaultProperty:
  name=DefaultPropertyKeyword value=PropertyValue
;

DefaultPropertyKeyword:
     PropertyKeyword | 'eventProcessor' | 'tags'
;

EventPropertyKeyword:
     'title' | 'message' | 'identifiers' | 'tags' | 'links' | 'processor'
;

PropertyString:
    ID | STRING
;

PropertyValue:
    PropertyString | FLOAT | INT | BOOL | PropertyObject | PropertyList | PropertyCode
;

PropertyList:
    '[' values*=PropertyValue[','] ']'
;

PropertyObject:
    "{" members*=PropertyMember[','] "}"
;

PropertyCode:
   "```" code=/(?ms)(.*?)[`]{3}/
;

PropertyMember:
    key=ID  value=PropertyValue
;

Comment:
  /#.*$/
;
"""

Classes

class PropertyInterpreter (properties: Dict[str, Any], defaults: Dict[str, Any], source_name: str, ctx: TopologyContext, topology_meta: textx.metamodel.TextXMetaModel)
Expand source code
class PropertyInterpreter:
    def __init__(
        self,
        properties: Dict[str, Any],
        defaults: Dict[str, Any],
        source_name: str,
        ctx: TopologyContext,
        topology_meta: TextXMetaModel,
    ):
        self.source_name = source_name
        self.defaults = defaults
        self.properties = properties
        self.ctx = ctx
        self.topology_meta = topology_meta
        self.PropertyObjectClass = self.topology_meta["PropertyObject"]
        self.PropertyListClass = self.topology_meta["PropertyList"]
        self.PropertyCodeClass = self.topology_meta["PropertyCode"]
        self.PropertyClass = self.topology_meta["Property"]
        self.default_source = "default"

    def get_string_property(self, name: str, default=None) -> str:
        value = self.get_property_value(name, self.properties, self.source_name, default=None)
        if value is None:
            value = self.get_property_value(name, self.defaults, self.default_source, default=default)

        return self._assert_string(value, name, self.source_name)

    def get_map_property(self, name: str, default=None) -> Dict[str, Any]:
        value = self.get_property_value(name, self.properties, self.source_name, default=default)
        return self._assert_dict(value, name, self.source_name)

    def get_list_property(self, name: str, default=None) -> List[Any]:
        value = self.get_property_value(name, self.properties, self.source_name, default=default)
        return self._assert_list(value, name, self.source_name)

    def get_property(self, name: str, default=None) -> Any:
        value = self.get_property_value(name, self.properties, self.source_name, default=None)
        if value is None:
            value = self.get_property_value(name, self.defaults, self.default_source, default=default)
        return value

    def merge_map_property(self, name: str) -> Dict[str, Any]:
        default_value = self.get_property_value(name, self.defaults, self.default_source, default={})
        self._assert_dict(default_value, name, self.default_source)
        map_value = self.get_property_value(name, self.properties, self.source_name, default=None)
        self._assert_dict(map_value, name, self.source_name)
        if map_value is None:
            return default_value
        else:
            default_value.update(map_value)
            return default_value

    def merge_list_property(self, name: str):
        default_value = self.get_property_value(name, self.defaults, self.default_source, default=[])
        self._assert_list(default_value, name, self.default_source)
        list_value = self.get_property_value(name, self.properties, self.source_name, default=None)
        self._assert_list(list_value, name, self.source_name)
        if list_value is None:
            return default_value
        else:
            list_value.extend(default_value)
            return list_value

    def run_processors(self, defaults_name="processor"):
        name = "processor"
        if self._is_code(name, self.properties):
            self._run_code(self.properties[name].code, name, self.source_name)
        if self._is_code(name, self.defaults):
            self._run_code(self.defaults[defaults_name].code, defaults_name, self.default_source)

    def get_property_value(self, name: str, properties: Dict[str, Any], source_name: str, default: Any = None) -> Any:
        value_ast = properties.get(name, default)
        return self._convert_value(value_ast, name, source_name)

    def _is_code(self, name: str, properties: Dict[str, Any]) -> bool:
        value_ast = properties.get(name, None)
        return textx_isinstance(value_ast, self.PropertyCodeClass)

    def _convert_value(self, value_ast: Any, property_name: str, source_name: str) -> Any:
        if value_ast is None:
            return None
        elif textx_isinstance(value_ast, self.PropertyObjectClass):
            value = {}
            for member in value_ast.members:
                value[member.key] = self._convert_value(member.value, property_name, source_name)
            return value
        elif textx_isinstance(value_ast, self.PropertyListClass):
            value_list = []
            for v in value_ast.values:
                value_list.append(self._convert_value(v, property_name, source_name))
            return value_list
        elif textx_isinstance(value_ast, self.PropertyCodeClass):
            return self._run_code(value_ast.code, property_name, source_name)
        else:
            return value_ast

    @staticmethod
    def _assert_string(value: Any, name: str, source_name: str) -> str:
        if value is not None:
            if not isinstance(value, string_types):
                raise Exception(f"Expected string type for '{name}', but was {type(value)} on `{source_name}`")
        return value

    @staticmethod
    def _assert_dict(value: Any, name: str, source_name: str) -> Dict[str, Any]:
        if value is not None:
            if not isinstance(value, dict):
                raise Exception(f"Expected dict type for '{name}', but was {type(value)} on `{source_name}`")
        return value

    @staticmethod
    def _assert_list(value: Any, name: str, source_name: str) -> List[Any]:
        if value is not None:
            if not isinstance(value, list):
                raise Exception(f"Expected list type for '{name}', but was {type(value)} on `{source_name}`")
        return value

    def _run_code(self, code: str, property_name, source_name: str) -> Any:
        aeval = self._get_asteval_interpreter(self.ctx)
        code = code.strip()
        if code.endswith("```"):
            code = code[:-3]
        code_lines = code.split("\n")
        # Fix first line indentation
        if len(code_lines) > 1:
            padding_count = 0
            second_line = code_lines[1]
            for i in range(0, len(second_line)):
                if second_line[i] != " ":
                    break
                padding_count += 1
            for i in range(1, len(code_lines)):
                code_lines[i] = code_lines[i][padding_count:]
        code = "\n".join(code_lines)
        value = self._eval_expression(code, aeval, property_name, source_name)
        return value

    @staticmethod
    def _get_asteval_interpreter(ctx: TopologyContext) -> Interpreter:
        aeval = Interpreter()
        aeval.symtable["factory"] = ctx.factory
        aeval.symtable["component"] = ctx.component
        aeval.symtable["event"] = ctx.event
        return aeval

    @staticmethod
    def _eval_expression(
        expression: str, aeval: Interpreter, eval_property: str, source_name: str, fail_on_error: bool = True
    ):
        existing_errs = len(aeval.error)
        result = aeval.eval(expression)
        if len(aeval.error) > existing_errs and fail_on_error:
            error_messages = []
            for err in aeval.error:
                error_messages.append(err.get_error())
            raise Exception(
                f"Failed to evaluate property '{eval_property}' on `{source_name}`. "
                f"Expression |\n {expression} \n |.\n Errors:\n {error_messages}"
            )
        return result

Methods

def get_list_property(self, name: str, default=None) ‑> List[Any]
Expand source code
def get_list_property(self, name: str, default=None) -> List[Any]:
    value = self.get_property_value(name, self.properties, self.source_name, default=default)
    return self._assert_list(value, name, self.source_name)
def get_map_property(self, name: str, default=None) ‑> Dict[str, Any]
Expand source code
def get_map_property(self, name: str, default=None) -> Dict[str, Any]:
    value = self.get_property_value(name, self.properties, self.source_name, default=default)
    return self._assert_dict(value, name, self.source_name)
def get_property(self, name: str, default=None) ‑> Any
Expand source code
def get_property(self, name: str, default=None) -> Any:
    value = self.get_property_value(name, self.properties, self.source_name, default=None)
    if value is None:
        value = self.get_property_value(name, self.defaults, self.default_source, default=default)
    return value
def get_property_value(self, name: str, properties: Dict[str, Any], source_name: str, default: Any = None) ‑> Any
Expand source code
def get_property_value(self, name: str, properties: Dict[str, Any], source_name: str, default: Any = None) -> Any:
    value_ast = properties.get(name, default)
    return self._convert_value(value_ast, name, source_name)
def get_string_property(self, name: str, default=None) ‑> str
Expand source code
def get_string_property(self, name: str, default=None) -> str:
    value = self.get_property_value(name, self.properties, self.source_name, default=None)
    if value is None:
        value = self.get_property_value(name, self.defaults, self.default_source, default=default)

    return self._assert_string(value, name, self.source_name)
def merge_list_property(self, name: str)
Expand source code
def merge_list_property(self, name: str):
    default_value = self.get_property_value(name, self.defaults, self.default_source, default=[])
    self._assert_list(default_value, name, self.default_source)
    list_value = self.get_property_value(name, self.properties, self.source_name, default=None)
    self._assert_list(list_value, name, self.source_name)
    if list_value is None:
        return default_value
    else:
        list_value.extend(default_value)
        return list_value
def merge_map_property(self, name: str) ‑> Dict[str, Any]
Expand source code
def merge_map_property(self, name: str) -> Dict[str, Any]:
    default_value = self.get_property_value(name, self.defaults, self.default_source, default={})
    self._assert_dict(default_value, name, self.default_source)
    map_value = self.get_property_value(name, self.properties, self.source_name, default=None)
    self._assert_dict(map_value, name, self.source_name)
    if map_value is None:
        return default_value
    else:
        default_value.update(map_value)
        return default_value
def run_processors(self, defaults_name='processor')
Expand source code
def run_processors(self, defaults_name="processor"):
    name = "processor"
    if self._is_code(name, self.properties):
        self._run_code(self.properties[name].code, name, self.source_name)
    if self._is_code(name, self.defaults):
        self._run_code(self.defaults[defaults_name].code, defaults_name, self.default_source)
class TopologyContext (*, factory: TopologyFactory, component: Component = None, event: Event = None)

Method generated by attrs for class TopologyContext.

Expand source code
class TopologyContext:
    factory: TopologyFactory = attr.ib()
    component: Component = attr.ib(default=None)
    event: Event = attr.ib(default=None)

Class variables

var componentComponent
var eventEvent
var factoryTopologyFactory
class TopologyInterpreter (factory: TopologyFactory)
Expand source code
class TopologyInterpreter:
    def __init__(self, factory: TopologyFactory):
        self.factory = factory
        self.topology_meta = metamodel_from_str(TOPOLOGY_TX)
        self.ElementPropertiesChangedClass = self.topology_meta["ElementPropertiesChanged"]
        self.link_pattern = re.compile("\\[([\\s\\w-]*)\\]\\((.*)\\)")

    def model_from_file(self, model_file_name: str):
        try:
            return self.topology_meta.model_from_file(model_file_name)
        except TextXSyntaxError as e:
            raise Exception(e.message)

    def interpret(self, model) -> TopologyFactory:
        defaults: Dict[str, Any] = {}
        if hasattr(model, "defaults") and model.defaults is not None:
            defaults = self._index_properties(model.defaults.properties)
        if hasattr(model, "components") and model.components is not None:
            components_ast = model.components
            for component_ast in components_ast.components:
                self._interpret_component(component_ast, defaults)
            self._resolve_relations()
        if hasattr(model, "events") and model.events is not None:
            events_ast = model.events
            for event_ast in events_ast.events:
                self._interpret_event(event_ast, defaults)

        return self.factory

    def _interpret_event(self, event_ast, defaults):
        event = Event()
        properties = self._index_properties(event_ast.properties)
        ctx = TopologyContext(factory=self.factory, event=event)
        property_interpreter = PropertyInterpreter(properties, defaults, "event", ctx, self.topology_meta)

        event.msg_title = property_interpreter.get_string_property("title", "Unknown")
        property_interpreter.source_name = f"Event with title '{event.msg_title}"
        event.msg_text = property_interpreter.get_string_property("message", "")
        event.tags.extend(property_interpreter.merge_list_property("tags"))
        event.timestamp = datetime.now()
        identifiers = property_interpreter.get_list_property("identifiers", [])
        if len(identifiers) == 0:
            raise Exception(f"Event must have at least 1 identifier '{event.msg_title}'.")
        event.context.element_identifiers = self._resolve_identifiers(identifiers)

        links = property_interpreter.get_list_property("links", [])
        for link in links:
            match = self.link_pattern.match(link)
            if not match:
                raise Exception(f"Link '{link}' must have the format '[description](url)'")
            source_link = SourceLink()
            source_link.title = match.group(1)
            source_link.url = match.group(2)
            event.context.source_links.append(source_link)

        if textx_isinstance(event_ast, self.ElementPropertiesChangedClass):
            event.context.category = "Changes"
            event.event_type = "Element Properties Changed"
            previous = property_interpreter.get_map_property("previous", {})
            current = property_interpreter.get_map_property("current", {})
            event.context.data = {"old": previous, "new": current}

        property_interpreter.run_processors(defaults_name="eventProcessor")
        self.factory.add_event(event)

    def _resolve_identifiers(self, identifiers):
        resolved_identifiers = []
        for identifier in identifiers:
            if self.factory.component_exists(identifier):
                resolved_identifiers.append(identifier)
            else:
                target_component = self.factory.get_component_by_name(identifier, raise_not_found=False)
                if target_component:
                    resolved_identifiers.append(target_component.uid)
                else:
                    # Reference to another component on StackState Server
                    resolved_identifiers.append(identifier)
        return resolved_identifiers

    def _interpret_component(self, component_ast, defaults):
        component = Component()
        component.set_type(component_ast.component_type)
        properties = self._index_properties(component_ast.properties)
        ctx = TopologyContext(factory=self.factory, component=component)
        property_interpreter = PropertyInterpreter(properties, defaults, component.get_type(), ctx, self.topology_meta)

        component.set_name(property_interpreter.get_property("name"))
        if component.get_name() is None:
            raise Exception(f"Component name is required for '{component.get_type()}'.")

        property_interpreter.source_name = component.get_name()
        component.properties.update_properties(property_interpreter.merge_map_property("data"))
        component.properties.layer = property_interpreter.get_string_property("layer", "Unknown")
        component.properties.domain = property_interpreter.get_string_property("domain", "Unknown")
        component.properties.environment = property_interpreter.get_string_property("environment", "Unknown")
        component.properties.labels.extend(property_interpreter.merge_list_property("labels"))
        component.uid = property_interpreter.get_string_property("id", None)
        component.properties.identifiers.extend(property_interpreter.merge_list_property("identifiers"))
        property_interpreter.run_processors()

        if component.uid is None:
            component.uid = f"urn:{component.get_type().lower()}:{component.get_name().lower()}"

        if len(component.properties.identifiers) == 0:
            component.properties.identifiers.append(component.uid)

        self._interpret_health(component, property_interpreter)
        self._interpret_relations(component, property_interpreter)
        self.factory.add_component(component)

    def _resolve_relations(self):
        components: List[Component] = self.factory.components.values()
        for source in components:
            for relation in source.relations:
                if self.factory.component_exists(relation.target_id):
                    self.factory.add_relation(relation.source_id, relation.target_id, relation.get_type())
                else:
                    target_component = self.factory.get_component_by_name(relation.target_id, raise_not_found=False)
                    if target_component:
                        self.factory.add_relation(relation.source_id, target_component.uid, relation.get_type())
                    else:
                        raise Exception(
                            f"Failed to find related component '{relation.target_id}'. "
                            f"Reference from component {source.uid}."
                        )
            source.relations = []

    @staticmethod
    def _interpret_relations(component: Component, property_interpreter: PropertyInterpreter):
        relations: List[str] = property_interpreter.merge_list_property("relations")
        for relation in relations:
            rel_parts = relation.split("|")
            rel_type = "uses"
            if len(rel_parts) == 2:
                rel_type = rel_parts[1]
            rel_id = f"{component.uid} --> {rel_parts[0]}"
            relation = Relation({"source_id": component.uid, "target_id": rel_parts[0], "external_id": rel_id})
            relation.set_type(rel_type)
            component.relations.append(relation)

    def _interpret_health(self, component: Component, property_interpreter: PropertyInterpreter):
        cid = component.uid
        if len(component.properties.identifiers) > 0 and cid not in component.properties.identifiers:
            cid = component.properties.identifiers[0]
        health_info = property_interpreter.get_string_property("health", "HealthCheck|CLEAR")
        health_parts = health_info.split("|")
        health_name = "HealthCheck"
        if len(health_parts) == 2:
            health_name = health_parts[0]
            health_state = health_parts[1]
        else:
            health_state = health_parts[0]
        health_msg = property_interpreter.get_string_property("healthMessage", "")
        health = HealthCheckState()
        health.check_name = health_name
        health.check_id = f"{component.get_name()}_static_states"
        health.topo_identifier = cid
        health.health = health_state.upper()
        health.message = health_msg
        health.validate()
        self.factory.health[health.check_id] = health

    @staticmethod
    def _index_properties(properties) -> Dict[str, Any]:
        index: Dict[str, Any] = {}
        for p in properties:
            index[p.name] = p.value
        return index

Methods

def interpret(self, model) ‑> TopologyFactory
Expand source code
def interpret(self, model) -> TopologyFactory:
    defaults: Dict[str, Any] = {}
    if hasattr(model, "defaults") and model.defaults is not None:
        defaults = self._index_properties(model.defaults.properties)
    if hasattr(model, "components") and model.components is not None:
        components_ast = model.components
        for component_ast in components_ast.components:
            self._interpret_component(component_ast, defaults)
        self._resolve_relations()
    if hasattr(model, "events") and model.events is not None:
        events_ast = model.events
        for event_ast in events_ast.events:
            self._interpret_event(event_ast, defaults)

    return self.factory
def model_from_file(self, model_file_name: str)
Expand source code
def model_from_file(self, model_file_name: str):
    try:
        return self.topology_meta.model_from_file(model_file_name)
    except TextXSyntaxError as e:
        raise Exception(e.message)