Skip to content

Instantly share code, notes, and snippets.

@wybiral
Created February 17, 2023 18:23
Show Gist options
  • Save wybiral/6444686a3dd33d70f1df996bde57f0ef to your computer and use it in GitHub Desktop.
Save wybiral/6444686a3dd33d70f1df996bde57f0ef to your computer and use it in GitHub Desktop.
Watch Mastodon stream for notifications and blink Thingy52 LED
'''
Watch Mastodon stream for notifications and blink Thingy52

The script connects to a Thingy52, subscribes to the 'user:notification'
stream of the configured Mastodon instance, and makes the LED breathe purple
when a notification arrives. Pressing the Thingy52 button turns the LED off
again. The Mastodon access token is read from the MASTODON_TOKEN environment
variable.
'''
import asyncio
import os
import ssl
# Third-party dependencies. Each import is guarded so that a missing package
# produces an install hint and a non-zero exit status instead of a traceback.
try:
    import certifi
except ImportError:
    print('certifi module required: pip install certifi')
    exit(1)
try:
    from aiomast import MastodonAPI
except ImportError:
    print('aiomast module required: pip install aiomast')
    exit(1)
try:
    from aiothingy import Thingy52
except ImportError:
    print('aiothingy module required: pip install aiothingy')
    exit(1)
# Hostname of the Mastodon instance whose API and stream are used.
MASTODON_INSTANCE = 'mastodon.social'
# The access token comes from the environment so it never lives in the source.
# A missing variable is reported the same way as a missing module: a short
# message and a non-zero exit status rather than a KeyError traceback.
try:
    ACCESS_TOKEN = os.environ['MASTODON_TOKEN']
except KeyError:
    print('MASTODON_TOKEN environment variable required')
    exit(1)
# True while a notification is being signalled on the LED and has not yet been
# dismissed with the button.
active = False
async def main():
    '''
    Connect to the Thingy52 and to Mastodon, then signal notifications.

    The LED is switched off at start-up. Every 'notification' event received
    from the 'user:notification' stream starts the purple breathing effect
    (unless it is already running) and prints the notification type, or the
    account name for mentions. A button press while the effect is running
    switches the LED off again.

    The stream is re-opened whenever it ends, so this coroutine only returns
    by way of an exception or cancellation.
    '''
    global active
    # Seconds to wait before reconnecting. Without a pause, a stream that
    # closes immediately would be re-opened in a tight loop.
    reconnect_delay = 5

    print('connecting to thingy')
    thingy = Thingy52()
    await thingy.connect()
    print('turning led off')
    await thingy.led.off()

    async def reset(event):
        '''Button callback: dismiss the active notification, if any.'''
        global active
        # Only react to button events with truthy data (presumably the
        # "pressed" state -- confirm against aiothingy).
        if event['type'] == 'button' and event['data']:
            if active:
                print('notification dismissed')
                await thingy.led.off()
                active = False

    print('subscribing to button')
    await thingy.button.subscribe(reset)

    api = MastodonAPI(MASTODON_INSTANCE)
    api.set_access_token(ACCESS_TOKEN)
    # Verify TLS against the certifi CA bundle.
    api.ssl = ssl.create_default_context(cafile=certifi.where())

    print('requesting mastodon instance')
    instance = await api.instance.info()
    # The instance advertises the URL of its streaming endpoint.
    streaming_url = instance['configuration']['urls']['streaming']

    async with api.session:
        while True:
            print('connecting to mastodon stream')
            stream = await api.websockets.connect(streaming_url)
            await stream.subscribe('user:notification')
            print('streaming')
            print('')
            async for event in stream:
                if event['event'] != 'notification':
                    continue
                if not active:
                    await thingy.led.breathe(color='purple')
                    active = True
                data = event['data']
                if data['type'] == 'mention':
                    print('mentioned by ' + data['account']['acct'])
                else:
                    print(data['type'])
            # The stream ended; wait a moment, then reconnect.
            print('disconnected')
            await asyncio.sleep(reconnect_delay)
# Run only when executed as a script, so importing the module has no side
# effects beyond its definitions.
if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the endless stream loop; exit
        # quietly instead of printing a traceback.
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment