Source code for axon_sdk.visualization.server
import http.server
import json
import socketserver
import webbrowser
import socket
import os
import time
import threading
[docs]
class Handler(http.server.SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
script_dir = os.path.dirname(os.path.abspath(__file__))
super().__init__(*args, directory=script_dir, **kwargs)
[docs]
def do_GET(self):
if self.path == "/graph_data":
graph_data = self.server.graph_data
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(graph_data).encode())
else:
super().do_GET()
[docs]
def open_browser(port):
webbrowser.open(f"http://localhost:{port}")
[docs]
def is_port_available(port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
try:
sock.bind(("localhost", port))
return True
except OSError:
return False
[docs]
def find_available_port(initial_port):
port = initial_port
while not is_port_available(port):
print(f"Port {port} not available. Trying next one...")
port += 1
return port
[docs]
def start_server_on_port(graph_data, port):
with socketserver.TCPServer(("", port), Handler) as httpd:
print(f"Serving HTTP on localhost port {port}...")
httpd.graph_data = graph_data
httpd.serve_forever()
[docs]
def start_server(graph_data):
"""
The server will only stay open for a fixed amount of time, enough to load the visualization.
Afterwards, the simulator execution will continue.
To achieve so, the server is run on a different thread with a timeout.
"""
print("Launching topology visualization...")
print("=========================================")
port = find_available_port(initial_port=8000)
open_browser(port)
server_thread = threading.Thread(
target=start_server_on_port, args=(graph_data, port)
)
server_thread.daemon = (
True # This allows the program to exit even if the thread is running
)
server_thread.start()
time.sleep(1)
server_thread.join(timeout=1)
print("Stopping server and continuing program execution...")
print(
"[Refreshing the visualization tab will not work since the server is stopped]"
)
print("=========================================")