Skip to content

Instantly share code, notes, and snippets.

@Lucifer1590
Created March 12, 2024 15:14
Show Gist options
  • Save Lucifer1590/e542fb480a372fe593ec81bd16d63efb to your computer and use it in GitHub Desktop.
Save Lucifer1590/e542fb480a372fe593ec81bd16d63efb to your computer and use it in GitHub Desktop.
kuzco monitor
import requests
from datetime import datetime
import time
import sys
# Batched tRPC endpoint on the Kuzco relay: call 0 is `worker.list` and call 1
# is `metrics.worker`. The URL-encoded `input` parameter carries a null input
# for call 0 and the worker id for call 1.
url = (
    "https://relay.kuzco.xyz/api/trpc/worker.list,metrics.worker"
    "?batch=1"
    "&input=%7B%220%22%3A%7B%22json%22%3Anull%2C%22meta%22%3A%7B%22values%22%3A%5B%22undefined%22%5D%7D%7D"
    "%2C%221%22%3A%7B%22json%22%3A%7B%22workerId%22%3A%22r-s-RXhWHZ%22%7D%7D%7D"
)

# Headers sent with every poll.
# NOTE(review): the bearer token is embedded in the source; consider loading it
# from the environment or a config file instead of committing it.
headers = {}
headers['Authorization'] = 'Bearer eyJhbGciNzcwMDg4MzNAZ21haWwuY29tIXdL35_GmH7NwLVDuGEdFUWs'
headers['Content-Type'] = 'application/json'
def print_update_timer(seconds):
    """Write the refresh countdown message to standard output and flush it.

    The message starts with a carriage return and has no trailing newline,
    so successive calls on a terminal redraw the same line instead of
    scrolling.

    Args:
        seconds: Value interpolated into the message as the remaining time.
    """
    countdown_line = f"\rUpdating in {seconds} seconds"
    stream = sys.stdout
    stream.write(countdown_line)
    # Flush explicitly: without a newline the text may otherwise sit in the
    # output buffer and not appear before the caller sleeps.
    stream.flush()
# Seconds to wait on the relay before giving up on a single request; without
# a timeout one stalled connection would freeze the monitor indefinitely.
_REQUEST_TIMEOUT_SECONDS = 10
# Seconds counted down between two refreshes.
_REFRESH_SECONDS = 5
# 12-hour clock time followed by day, month name and year.
_TIMESTAMP_FORMAT = '%I:%M %p %d %B %Y'
# Shown in place of a field the response does not carry.
_MISSING = 'n/a'


def _format_timestamp(epoch_ms):
    """Format a millisecond epoch timestamp as local time, or ``n/a``.

    Anything that is not a real number (missing field, null, string) yields
    the placeholder instead of raising.
    """
    if isinstance(epoch_ms, bool) or not isinstance(epoch_ms, (int, float)):
        return _MISSING
    return datetime.fromtimestamp(epoch_ms / 1000).strftime(_TIMESTAMP_FORMAT)


def _format_count(value):
    """Format a numeric counter with thousands separators, or ``n/a``."""
    if value is None:
        return _MISSING
    if isinstance(value, (int, float)):
        return "{:,}".format(value)
    return str(value)


def _payload_of(item):
    """Return ``item['result']['data']['json']`` if it is a dict, else None.

    Batch entries that do not have this shape (for example an entry that
    reports an error instead of a result) are reported as None so the caller
    can skip them rather than crash.
    """
    node = item
    for key in ('result', 'data', 'json'):
        if not isinstance(node, dict):
            return None
        node = node.get(key)
    return node if isinstance(node, dict) else None


def _worker_lines(worker):
    """Return the printable lines describing one worker record."""
    info = worker.get('info')
    if not isinstance(info, dict):
        info = {}
    fields = {
        '_id': worker.get('_id', _MISSING),
        'name': worker.get('name', _MISSING),
        'status': worker.get('status', _MISSING),
        'ipAddress': info.get('ipAddress', _MISSING),
        'createdAt': _format_timestamp(worker.get('createdAt')),
        'updatedAt': _format_timestamp(worker.get('updatedAt')),
        'lastHeartbeatAt': _format_timestamp(worker.get('lastHeartbeatAt')),
    }
    lines = ["\nWorker Information:"]
    lines.extend(f"{key}: {value}" for key, value in fields.items())
    return lines


def _metrics_lines(payload):
    """Return the printable lines describing the metrics payload."""
    generations = _format_count(payload.get('generationsLast24Hours'))
    tokens_last_day = _format_count(payload.get('tokensLast24Hours'))
    tokens_total = _format_count(payload.get('tokensAllTime'))
    return [
        "\nMetrics Information:",
        f"Generations Last 24 Hours: {generations}",
        f"Tokens Last 24 Hours: {tokens_last_day}",
        f"Tokens All Time: {tokens_total}",
    ]


def render_batch(data):
    """Turn one decoded batch response into the lines to print.

    Args:
        data: The decoded JSON body; expected to be a list of batch entries.

    Returns:
        A list of strings, one per ``print`` call, in display order. Entries
        without a ``result.data.json`` dict are skipped, and a non-list
        ``data`` yields an empty list.
    """
    lines = []
    if not isinstance(data, list):
        return lines
    for item in data:
        payload = _payload_of(item)
        if payload is None:
            continue
        workers = payload.get('workers')
        if isinstance(workers, list):
            for worker in workers:
                if isinstance(worker, dict):
                    lines.extend(_worker_lines(worker))
        if 'generationsLast24Hours' in payload:
            lines.extend(_metrics_lines(payload))
    return lines


def _fetch_batch():
    """Poll the relay once and return the decoded batch list, or None.

    None covers every failure mode: connection error or timeout, a non-200
    status, a body that is not valid JSON, and a body that is not a list.
    """
    try:
        response = requests.get(
            url, headers=headers, timeout=_REQUEST_TIMEOUT_SECONDS
        )
    except requests.RequestException:
        return None
    if response.status_code != 200:
        return None
    try:
        data = response.json()
    except ValueError:
        # Invalid JSON surfaces as a ValueError subclass.
        return None
    return data if isinstance(data, list) else None


def run_monitor():
    """Poll, print and count down forever; a failed poll is just reported."""
    while True:
        data = _fetch_batch()
        if data is None:
            print("Failed to fetch data")
        else:
            for line in render_batch(data):
                print(line)
        for remaining in range(_REFRESH_SECONDS, 0, -1):
            print_update_timer(remaining)
            time.sleep(1)
        print("\n" * 100)  # Clear console output for the next update


if __name__ == "__main__":
    run_monitor()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment