Module static_topo_impl.stackstate.client
Expand source code
import datetime
import json
import logging
import zlib
from hashlib import md5
from typing import Dict, List
from urllib.parse import quote
import requests
from static_topo_impl.model.instance import StackStateSpec
from static_topo_impl.model.stackstate import (Component, Event,
HealthCheckState, Relation)
from static_topo_impl.model.stackstate_receiver import (
HealthStream, HealthSync, HealthSyncStartSnapshot, Instance, ReceiverApi,
SyncStats, TopologySync)
class StackStateClient:
    """Client that publishes topology, health checks and events to a StackState receiver.

    Each ``publish*`` method builds a ``ReceiverApi`` payload, serializes it to
    JSON, compresses it with zlib and POSTs it to the receiver intake URL.
    With ``dry_run=True`` nothing is sent; the pretty-printed JSON payload is
    appended to ``stats.payloads`` instead.
    """

    def __init__(self, config: StackStateSpec):
        """Store the configuration and derive the intake URL from it.

        Args:
            config: Receiver settings. ``receiver_url`` and ``api_key`` are read
                here; other fields are read lazily when payloads are built.
        """
        self.config = config
        # The API key travels as a query parameter of the intake URL.
        self.intake_url = f"{self.config.receiver_url}/stsAgent/intake?api_key={self.config.api_key}"

    def publish_health_checks(
        self, health_checks: List[HealthCheckState], dry_run=False, stats=None
    ) -> SyncStats:
        """Publish a health synchronization snapshot containing ``health_checks``.

        Args:
            health_checks: Check states to include in the snapshot.
            dry_run: When true, record the JSON payload in ``stats.payloads``
                instead of sending it.
            stats: Statistics object to update. A fresh ``SyncStats`` is created
                for this call when omitted.

        Returns:
            The statistics object with ``checks`` set to ``len(health_checks)``.

        Raises:
            Exception: If the receiver answers with a non-OK HTTP status.
        """
        # A default of ``SyncStats()`` in the signature would be created once and
        # shared by every call; build a new one per call instead.
        if stats is None:
            stats = SyncStats()
        stats.checks = len(health_checks)
        payload = self._prepare_health_sync_payload(health_checks)
        return self._post_data(payload, dry_run, stats)

    def publish_events(self, events: List[Event], dry_run=False, stats=None) -> SyncStats:
        """Publish ``events`` grouped by their ``event_type``.

        Args:
            events: Events to send.
            dry_run: When true, record the JSON payload in ``stats.payloads``
                instead of sending it.
            stats: Statistics object to update. A fresh ``SyncStats`` is created
                for this call when omitted.

        Returns:
            The statistics object with ``events`` set to ``len(events)``.

        Raises:
            Exception: If the receiver answers with a non-OK HTTP status.
        """
        # See publish_health_checks: never share a default stats instance.
        if stats is None:
            stats = SyncStats()
        stats.events = len(events)
        payload = self._prepare_event_sync_payload(events)
        return self._post_data(payload, dry_run, stats)

    def publish(
        self, components: List[Component], relations: List[Relation], dry_run=False, stats=None
    ) -> SyncStats:
        """Publish a topology made of ``components`` and ``relations``.

        Args:
            components: Components to include in the topology sync.
            relations: Relations to include in the topology sync.
            dry_run: When true, record the JSON payload in ``stats.payloads``
                instead of sending it.
            stats: Statistics object to update. A fresh ``SyncStats`` is created
                for this call when omitted.

        Returns:
            The statistics object with ``components`` and ``relations`` set to
            the lengths of the respective lists.

        Raises:
            Exception: If the receiver answers with a non-OK HTTP status.
        """
        # See publish_health_checks: never share a default stats instance.
        if stats is None:
            stats = SyncStats()
        stats.components = len(components)
        stats.relations = len(relations)
        payload = self._prepare_topo_payload(components, relations)
        return self._post_data(payload, dry_run, stats)

    def _post_data(self, payload: ReceiverApi, dry_run: bool, stats: SyncStats) -> SyncStats:
        """Serialize ``payload`` and either record it (dry run) or POST it.

        Args:
            payload: Receiver payload; converted with ``to_primitive(role="public")``.
            dry_run: When true, append the indented JSON to ``stats.payloads``
                and skip the HTTP call.
            stats: Statistics object that is updated (dry run) and returned.

        Returns:
            The ``stats`` object that was passed in.

        Raises:
            Exception: If the receiver answers with a non-OK HTTP status.
        """
        primitive = payload.to_primitive(role="public")
        if dry_run:
            stats.payloads.append(json.dumps(primitive, indent=4))
            return stats

        serialized_payload = json.dumps(primitive)
        zipped = zlib.compress(serialized_payload.encode("utf-8"))
        # zlib output always carries a header and checksum, so it is never
        # empty and the ratio below cannot divide by zero.
        logging.debug(
            "payload_size=%d, compressed_size=%d, compression_ratio=%.3f",
            len(serialized_payload),
            len(zipped),
            len(serialized_payload) / len(zipped),
        )
        headers: Dict[str, str] = {
            "Content-Type": "application/json",
            "Content-Encoding": "deflate",
            # Digest of the compressed body, i.e. of the bytes actually sent.
            "Content-MD5": md5(zipped).hexdigest(),
        }
        # NOTE(review): no timeout is passed to requests.post, so this call can
        # block indefinitely if the receiver never answers.
        self._handle_failed_call(requests.post(self.intake_url, data=zipped, headers=headers))
        return stats

    def _prepare_health_sync_payload(self, checks: List[HealthCheckState]) -> ReceiverApi:
        """Build a receiver payload holding one health sync snapshot for ``checks``.

        The stream URN and the snapshot intervals come from
        ``self.config.health_sync``.
        """
        spec = self.config.health_sync

        health_stream = HealthStream()
        # safe="" percent-encodes every reserved character (including "/"), so
        # the source and stream names cannot introduce extra URN separators.
        encoded_source = quote(spec.source_name, safe="")
        encoded_stream = quote(spec.stream_id, safe="")
        health_stream.urn = f"urn:health:{encoded_source}:{encoded_stream}"

        start_snapshot = HealthSyncStartSnapshot()
        start_snapshot.expiry_interval_s = spec.expiry_interval_seconds
        start_snapshot.repeat_interval_s = spec.repeat_interval_seconds

        sync = HealthSync()
        sync.start_snapshot = start_snapshot
        sync.stream = health_stream
        sync.check_states = checks

        payload = self._prepare_receiver_payload()
        payload.health = [sync]
        return payload

    def _prepare_event_sync_payload(self, events: List[Event]) -> ReceiverApi:
        """Build a receiver payload whose ``events`` mapping groups events by type."""
        payload = self._prepare_receiver_payload()
        for event in events:
            # Relies on ``payload.events`` being a dict-like mapping.
            payload.events.setdefault(event.event_type, []).append(event)
        return payload

    def _prepare_topo_payload(self, components: List[Component], relations: List[Relation]) -> ReceiverApi:
        """Build a receiver payload holding a single topology sync.

        The sync's instance type and URL are taken from ``self.config``.
        """
        instance = Instance()
        instance.instance_type = self.config.instance_type
        instance.url = self.config.instance_url

        topology_sync = TopologySync()
        topology_sync.instance = instance
        topology_sync.components = components
        topology_sync.relations = relations

        payload = self._prepare_receiver_payload()
        payload.topologies = [topology_sync]
        return payload

    def _prepare_receiver_payload(self) -> ReceiverApi:
        """Create a ``ReceiverApi`` with the API key, a timestamp and the hostname set."""
        payload = ReceiverApi()
        payload.apiKey = self.config.api_key
        # Naive local time, as returned by datetime.now() without a tz argument.
        payload.collection_timestamp = datetime.datetime.now()
        payload.internal_hostname = self.config.internal_hostname
        return payload

    @staticmethod
    def _handle_failed_call(response: requests.Response) -> requests.Response:
        """Return ``response`` unchanged, or log and raise if it is not OK.

        Args:
            response: HTTP response to check via its ``ok`` attribute.

        Returns:
            The same response object when ``response.ok`` is true.

        Raises:
            Exception: With the URL and status code in the message when
                ``response.ok`` is false.
        """
        if not response.ok:
            msg = "Failed to call [%s] . Status code %s" % (
                response.url,
                response.status_code,
            )
            logging.error(msg)
            logging.error("Response: %s", response.text)
            raise Exception(msg)
        return response
Classes
class StackStateClient (config: StackStateSpec)
-
Expand source code
class StackStateClient:
    """Client that publishes topology, health checks and events to a StackState receiver.

    Each ``publish*`` method builds a ``ReceiverApi`` payload, serializes it to
    JSON, compresses it with zlib and POSTs it to the receiver intake URL.
    With ``dry_run=True`` nothing is sent; the pretty-printed JSON payload is
    appended to ``stats.payloads`` instead.
    """

    def __init__(self, config: StackStateSpec):
        """Store the configuration and derive the intake URL from it.

        Args:
            config: Receiver settings. ``receiver_url`` and ``api_key`` are read
                here; other fields are read lazily when payloads are built.
        """
        self.config = config
        # The API key travels as a query parameter of the intake URL.
        self.intake_url = f"{self.config.receiver_url}/stsAgent/intake?api_key={self.config.api_key}"

    def publish_health_checks(
        self, health_checks: List[HealthCheckState], dry_run=False, stats=None
    ) -> SyncStats:
        """Publish a health synchronization snapshot containing ``health_checks``.

        Args:
            health_checks: Check states to include in the snapshot.
            dry_run: When true, record the JSON payload in ``stats.payloads``
                instead of sending it.
            stats: Statistics object to update. A fresh ``SyncStats`` is created
                for this call when omitted.

        Returns:
            The statistics object with ``checks`` set to ``len(health_checks)``.

        Raises:
            Exception: If the receiver answers with a non-OK HTTP status.
        """
        # A default of ``SyncStats()`` in the signature would be created once and
        # shared by every call; build a new one per call instead.
        if stats is None:
            stats = SyncStats()
        stats.checks = len(health_checks)
        payload = self._prepare_health_sync_payload(health_checks)
        return self._post_data(payload, dry_run, stats)

    def publish_events(self, events: List[Event], dry_run=False, stats=None) -> SyncStats:
        """Publish ``events`` grouped by their ``event_type``.

        Args:
            events: Events to send.
            dry_run: When true, record the JSON payload in ``stats.payloads``
                instead of sending it.
            stats: Statistics object to update. A fresh ``SyncStats`` is created
                for this call when omitted.

        Returns:
            The statistics object with ``events`` set to ``len(events)``.

        Raises:
            Exception: If the receiver answers with a non-OK HTTP status.
        """
        # See publish_health_checks: never share a default stats instance.
        if stats is None:
            stats = SyncStats()
        stats.events = len(events)
        payload = self._prepare_event_sync_payload(events)
        return self._post_data(payload, dry_run, stats)

    def publish(
        self, components: List[Component], relations: List[Relation], dry_run=False, stats=None
    ) -> SyncStats:
        """Publish a topology made of ``components`` and ``relations``.

        Args:
            components: Components to include in the topology sync.
            relations: Relations to include in the topology sync.
            dry_run: When true, record the JSON payload in ``stats.payloads``
                instead of sending it.
            stats: Statistics object to update. A fresh ``SyncStats`` is created
                for this call when omitted.

        Returns:
            The statistics object with ``components`` and ``relations`` set to
            the lengths of the respective lists.

        Raises:
            Exception: If the receiver answers with a non-OK HTTP status.
        """
        # See publish_health_checks: never share a default stats instance.
        if stats is None:
            stats = SyncStats()
        stats.components = len(components)
        stats.relations = len(relations)
        payload = self._prepare_topo_payload(components, relations)
        return self._post_data(payload, dry_run, stats)

    def _post_data(self, payload: ReceiverApi, dry_run: bool, stats: SyncStats) -> SyncStats:
        """Serialize ``payload`` and either record it (dry run) or POST it.

        Args:
            payload: Receiver payload; converted with ``to_primitive(role="public")``.
            dry_run: When true, append the indented JSON to ``stats.payloads``
                and skip the HTTP call.
            stats: Statistics object that is updated (dry run) and returned.

        Returns:
            The ``stats`` object that was passed in.

        Raises:
            Exception: If the receiver answers with a non-OK HTTP status.
        """
        primitive = payload.to_primitive(role="public")
        if dry_run:
            stats.payloads.append(json.dumps(primitive, indent=4))
            return stats

        serialized_payload = json.dumps(primitive)
        zipped = zlib.compress(serialized_payload.encode("utf-8"))
        # zlib output always carries a header and checksum, so it is never
        # empty and the ratio below cannot divide by zero.
        logging.debug(
            "payload_size=%d, compressed_size=%d, compression_ratio=%.3f",
            len(serialized_payload),
            len(zipped),
            len(serialized_payload) / len(zipped),
        )
        headers: Dict[str, str] = {
            "Content-Type": "application/json",
            "Content-Encoding": "deflate",
            # Digest of the compressed body, i.e. of the bytes actually sent.
            "Content-MD5": md5(zipped).hexdigest(),
        }
        # NOTE(review): no timeout is passed to requests.post, so this call can
        # block indefinitely if the receiver never answers.
        self._handle_failed_call(requests.post(self.intake_url, data=zipped, headers=headers))
        return stats

    def _prepare_health_sync_payload(self, checks: List[HealthCheckState]) -> ReceiverApi:
        """Build a receiver payload holding one health sync snapshot for ``checks``.

        The stream URN and the snapshot intervals come from
        ``self.config.health_sync``.
        """
        spec = self.config.health_sync

        health_stream = HealthStream()
        # safe="" percent-encodes every reserved character (including "/"), so
        # the source and stream names cannot introduce extra URN separators.
        encoded_source = quote(spec.source_name, safe="")
        encoded_stream = quote(spec.stream_id, safe="")
        health_stream.urn = f"urn:health:{encoded_source}:{encoded_stream}"

        start_snapshot = HealthSyncStartSnapshot()
        start_snapshot.expiry_interval_s = spec.expiry_interval_seconds
        start_snapshot.repeat_interval_s = spec.repeat_interval_seconds

        sync = HealthSync()
        sync.start_snapshot = start_snapshot
        sync.stream = health_stream
        sync.check_states = checks

        payload = self._prepare_receiver_payload()
        payload.health = [sync]
        return payload

    def _prepare_event_sync_payload(self, events: List[Event]) -> ReceiverApi:
        """Build a receiver payload whose ``events`` mapping groups events by type."""
        payload = self._prepare_receiver_payload()
        for event in events:
            # Relies on ``payload.events`` being a dict-like mapping.
            payload.events.setdefault(event.event_type, []).append(event)
        return payload

    def _prepare_topo_payload(self, components: List[Component], relations: List[Relation]) -> ReceiverApi:
        """Build a receiver payload holding a single topology sync.

        The sync's instance type and URL are taken from ``self.config``.
        """
        instance = Instance()
        instance.instance_type = self.config.instance_type
        instance.url = self.config.instance_url

        topology_sync = TopologySync()
        topology_sync.instance = instance
        topology_sync.components = components
        topology_sync.relations = relations

        payload = self._prepare_receiver_payload()
        payload.topologies = [topology_sync]
        return payload

    def _prepare_receiver_payload(self) -> ReceiverApi:
        """Create a ``ReceiverApi`` with the API key, a timestamp and the hostname set."""
        payload = ReceiverApi()
        payload.apiKey = self.config.api_key
        # Naive local time, as returned by datetime.now() without a tz argument.
        payload.collection_timestamp = datetime.datetime.now()
        payload.internal_hostname = self.config.internal_hostname
        return payload

    @staticmethod
    def _handle_failed_call(response: requests.Response) -> requests.Response:
        """Return ``response`` unchanged, or log and raise if it is not OK.

        Args:
            response: HTTP response to check via its ``ok`` attribute.

        Returns:
            The same response object when ``response.ok`` is true.

        Raises:
            Exception: With the URL and status code in the message when
                ``response.ok`` is false.
        """
        if not response.ok:
            msg = "Failed to call [%s] . Status code %s" % (
                response.url,
                response.status_code,
            )
            logging.error(msg)
            logging.error("Response: %s", response.text)
            raise Exception(msg)
        return response
Methods
def publish(self, components: List[Component], relations: List[Relation], dry_run=False, stats=<SyncStats instance>) ‑> SyncStats
-
Expand source code
def publish(
    self, components: List[Component], relations: List[Relation], dry_run=False, stats=None
) -> SyncStats:
    """Publish a topology made of ``components`` and ``relations``.

    Args:
        components: Components to include in the topology sync.
        relations: Relations to include in the topology sync.
        dry_run: Forwarded to ``_post_data``.
        stats: Statistics object to update. A fresh ``SyncStats`` is created
            for this call when omitted.

    Returns:
        Whatever ``_post_data`` returns for the prepared payload and ``stats``.
    """
    # A default of ``SyncStats()`` in the signature would be created once and
    # shared by every call; build a new one per call instead.
    if stats is None:
        stats = SyncStats()
    stats.components = len(components)
    stats.relations = len(relations)
    payload = self._prepare_topo_payload(components, relations)
    return self._post_data(payload, dry_run, stats)
def publish_events(self, events: List[Event], dry_run=False, stats=<SyncStats instance>) ‑> SyncStats
-
Expand source code
def publish_events(self, events: List[Event], dry_run=False, stats=None) -> SyncStats:
    """Publish ``events`` to the receiver.

    Args:
        events: Events to send.
        dry_run: Forwarded to ``_post_data``.
        stats: Statistics object to update. A fresh ``SyncStats`` is created
            for this call when omitted.

    Returns:
        Whatever ``_post_data`` returns for the prepared payload and ``stats``.
    """
    # A default of ``SyncStats()`` in the signature would be created once and
    # shared by every call; build a new one per call instead.
    if stats is None:
        stats = SyncStats()
    stats.events = len(events)
    payload = self._prepare_event_sync_payload(events)
    return self._post_data(payload, dry_run, stats)
def publish_health_checks(self, health_checks: List[HealthCheckState], dry_run=False, stats=<SyncStats instance>) ‑> SyncStats
-
Expand source code
def publish_health_checks(
    self, health_checks: List[HealthCheckState], dry_run=False, stats=None
) -> SyncStats:
    """Publish a health synchronization payload containing ``health_checks``.

    Args:
        health_checks: Check states to send.
        dry_run: Forwarded to ``_post_data``.
        stats: Statistics object to update. A fresh ``SyncStats`` is created
            for this call when omitted.

    Returns:
        Whatever ``_post_data`` returns for the prepared payload and ``stats``.
    """
    # A default of ``SyncStats()`` in the signature would be created once and
    # shared by every call; build a new one per call instead.
    if stats is None:
        stats = SyncStats()
    stats.checks = len(health_checks)
    payload = self._prepare_health_sync_payload(health_checks)
    return self._post_data(payload, dry_run, stats)