Module static_topo_impl.agent_processor
Expand source code
import os
from typing import List
from stackstate_checks.base import AgentCheck, Health
from static_topo_impl.dsl.interpreter import TopologyInterpreter
from static_topo_impl.model.factory import TopologyFactory
from static_topo_impl.model.instance import InstanceInfo
from static_topo_impl.model.stackstate import (Component, HealthCheckState,
Relation)
class AgentProcessor:
def __init__(self, instance: InstanceInfo, agent_check: AgentCheck):
self.agent_check = agent_check
self.log = agent_check.log
self.instance = instance
self.factory = TopologyFactory()
def process(self):
interpreter = TopologyInterpreter(self.factory)
for topo_dsl_file in self.instance.topo_files:
if topo_dsl_file.endswith(".topo"):
topo_files = [topo_dsl_file] # Single file
else:
topo_files = [os.path.join(topo_dsl_file, f) for f in os.listdir(topo_dsl_file) if f.endswith(".topo")]
for topo_file in topo_files:
self.log.info(f"Processing '{topo_file}'")
model = interpreter.model_from_file(topo_file)
interpreter.interpret(model)
self._publish()
def _publish(self):
self.log.info(f"Publishing '{len(self.factory.components.values())}' components")
self.agent_check.start_snapshot()
components: List[Component] = self.factory.components.values()
for c in components:
c.properties.dedup_labels()
c_as_dict = c.properties.to_primitive()
self.agent_check.component(c.uid, c.get_type(), c_as_dict)
self.log.info(f"Publishing '{len(self.factory.relations)}' relations")
relations: List[Relation] = self.factory.relations.values()
for r in relations:
self.agent_check.relation(r.source_id, r.target_id, r.get_type(), r.properties)
self.agent_check.stop_snapshot()
self._publish_health()
self._publish_events()
def _publish_health(self):
self.log.info(f"Synchronizing '{len(self.factory.health)}' health states")
self.agent_check.health.start_snapshot()
deviating, clear, critical = 0, 0, 0
health_instances: List[HealthCheckState] = self.factory.health.values()
for health in health_instances:
health_value = health.health
if not isinstance(health_value, Health):
health_value = Health[health_value]
if health_value == Health.CLEAR:
clear += 1
elif health_value == Health.CRITICAL:
critical += 1
elif health_value == Health.DEVIATING:
deviating += 1
self.agent_check.health.check_state(
health.check_id,
health.check_name,
health_value,
health.topo_identifier,
health.message,
)
self.log.info(f"Critical -> {critical}, Deviating -> {deviating}, Clear -> {clear}")
self.agent_check.health.stop_snapshot()
def _publish_events(self):
self.log.info(f"Sending '{len(self.factory.events)}' events")
for event in self.factory.events:
event_dict = event.to_primitive(role="public")
self.agent_check.event(event_dict)
Classes
class AgentProcessor (instance: InstanceInfo, agent_check: stackstate_checks.base.checks.base.__AgentCheckPy3)
-
Expand source code
class AgentProcessor: def __init__(self, instance: InstanceInfo, agent_check: AgentCheck): self.agent_check = agent_check self.log = agent_check.log self.instance = instance self.factory = TopologyFactory() def process(self): interpreter = TopologyInterpreter(self.factory) for topo_dsl_file in self.instance.topo_files: if topo_dsl_file.endswith(".topo"): topo_files = [topo_dsl_file] # Single file else: topo_files = [os.path.join(topo_dsl_file, f) for f in os.listdir(topo_dsl_file) if f.endswith(".topo")] for topo_file in topo_files: self.log.info(f"Processing '{topo_file}'") model = interpreter.model_from_file(topo_file) interpreter.interpret(model) self._publish() def _publish(self): self.log.info(f"Publishing '{len(self.factory.components.values())}' components") self.agent_check.start_snapshot() components: List[Component] = self.factory.components.values() for c in components: c.properties.dedup_labels() c_as_dict = c.properties.to_primitive() self.agent_check.component(c.uid, c.get_type(), c_as_dict) self.log.info(f"Publishing '{len(self.factory.relations)}' relations") relations: List[Relation] = self.factory.relations.values() for r in relations: self.agent_check.relation(r.source_id, r.target_id, r.get_type(), r.properties) self.agent_check.stop_snapshot() self._publish_health() self._publish_events() def _publish_health(self): self.log.info(f"Synchronizing '{len(self.factory.health)}' health states") self.agent_check.health.start_snapshot() deviating, clear, critical = 0, 0, 0 health_instances: List[HealthCheckState] = self.factory.health.values() for health in health_instances: health_value = health.health if not isinstance(health_value, Health): health_value = Health[health_value] if health_value == Health.CLEAR: clear += 1 elif health_value == Health.CRITICAL: critical += 1 elif health_value == Health.DEVIATING: deviating += 1 self.agent_check.health.check_state( health.check_id, health.check_name, health_value, health.topo_identifier, health.message, ) self.log.info(f"Critical -> {critical}, Deviating -> {deviating}, Clear -> {clear}") self.agent_check.health.stop_snapshot() def _publish_events(self): self.log.info(f"Sending '{len(self.factory.events)}' events") for event in self.factory.events: event_dict = event.to_primitive(role="public") self.agent_check.event(event_dict)
Methods
def process(self)
-
Expand source code
def process(self): interpreter = TopologyInterpreter(self.factory) for topo_dsl_file in self.instance.topo_files: if topo_dsl_file.endswith(".topo"): topo_files = [topo_dsl_file] # Single file else: topo_files = [os.path.join(topo_dsl_file, f) for f in os.listdir(topo_dsl_file) if f.endswith(".topo")] for topo_file in topo_files: self.log.info(f"Processing '{topo_file}'") model = interpreter.model_from_file(topo_file) interpreter.interpret(model) self._publish()