Module static_topo_impl.cli_processor

Expand source code
import logging
import os

from static_topo_impl.dsl.interpreter import TopologyInterpreter
from static_topo_impl.model.factory import TopologyFactory
from static_topo_impl.model.instance import Configuration
from static_topo_impl.model.stackstate_receiver import SyncStats
from static_topo_impl.stackstate import StackStateClient


class CliProcessor:
    def __init__(self, config: Configuration):
        self.config = config
        self.stackstate: StackStateClient = StackStateClient(config.stackstate)
        self.factory: TopologyFactory = TopologyFactory()

    def run(self, dry_run=False) -> SyncStats:
        interpreter = TopologyInterpreter(self.factory)
        for topo_dsl_file in self.config.topo_files:
            if topo_dsl_file.endswith(".topo"):
                topo_files = [topo_dsl_file]  # Single file
            else:
                topo_files = [os.path.join(topo_dsl_file, f) for f in os.listdir(topo_dsl_file) if f.endswith(".topo")]
            for topo_file in topo_files:
                logging.info(f"Processing '{topo_file}'")
                model = interpreter.model_from_file(topo_file)
                interpreter.interpret(model)

        stats = self.stackstate.publish(
            list(self.factory.components.values()), list(self.factory.relations.values()), dry_run
        )
        self.stackstate.publish_health_checks(list(self.factory.health.values()), dry_run=dry_run, stats=stats)
        return self.stackstate.publish_events(self.factory.events, dry_run=dry_run, stats=stats)

Classes

class CliProcessor (config: Configuration)
Expand source code
class CliProcessor:
    def __init__(self, config: Configuration):
        self.config = config
        self.stackstate: StackStateClient = StackStateClient(config.stackstate)
        self.factory: TopologyFactory = TopologyFactory()

    def run(self, dry_run=False) -> SyncStats:
        interpreter = TopologyInterpreter(self.factory)
        for topo_dsl_file in self.config.topo_files:
            if topo_dsl_file.endswith(".topo"):
                topo_files = [topo_dsl_file]  # Single file
            else:
                topo_files = [os.path.join(topo_dsl_file, f) for f in os.listdir(topo_dsl_file) if f.endswith(".topo")]
            for topo_file in topo_files:
                logging.info(f"Processing '{topo_file}'")
                model = interpreter.model_from_file(topo_file)
                interpreter.interpret(model)

        stats = self.stackstate.publish(
            list(self.factory.components.values()), list(self.factory.relations.values()), dry_run
        )
        self.stackstate.publish_health_checks(list(self.factory.health.values()), dry_run=dry_run, stats=stats)
        return self.stackstate.publish_events(self.factory.events, dry_run=dry_run, stats=stats)

Methods

def run(self, dry_run=False) ‑> SyncStats
Expand source code
def run(self, dry_run=False) -> SyncStats:
    interpreter = TopologyInterpreter(self.factory)
    for topo_dsl_file in self.config.topo_files:
        if topo_dsl_file.endswith(".topo"):
            topo_files = [topo_dsl_file]  # Single file
        else:
            topo_files = [os.path.join(topo_dsl_file, f) for f in os.listdir(topo_dsl_file) if f.endswith(".topo")]
        for topo_file in topo_files:
            logging.info(f"Processing '{topo_file}'")
            model = interpreter.model_from_file(topo_file)
            interpreter.interpret(model)

    stats = self.stackstate.publish(
        list(self.factory.components.values()), list(self.factory.relations.values()), dry_run
    )
    self.stackstate.publish_health_checks(list(self.factory.health.values()), dry_run=dry_run, stats=stats)
    return self.stackstate.publish_events(self.factory.events, dry_run=dry_run, stats=stats)