Skip to content

Instantly share code, notes, and snippets.

@tuulos
Created September 27, 2023 04:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tuulos/56a5cae34c10028303792ad88fbd35dd to your computer and use it in GitHub Desktop.
Save tuulos/56a5cae34c10028303792ad88fbd35dd to your computer and use it in GitHub Desktop.
send events to OBP
import os
import json
from subprocess import check_call
import click
def assume_role(role_arn):
    """Assume an IAM role through STS and return its temporary credentials.

    Args:
        role_arn: ARN of the role to assume.

    Returns:
        A dict keyed by the environment-variable names
        ``AWS_ACCESS_KEY_ID``, ``AWS_SECRET_ACCESS_KEY`` and
        ``AWS_SESSION_TOKEN``, holding the matching values from the
        ``Credentials`` section of the STS response.
    """
    # Function-scope import: boto3 is loaded only when a role is assumed.
    import boto3

    response = boto3.client('sts').assume_role(
        RoleArn=role_arn,
        RoleSessionName="send_event",
    )
    creds = response['Credentials']

    # (environment variable name, field name in the STS response)
    env_to_field = (
        ('AWS_ACCESS_KEY_ID', 'AccessKeyId'),
        ('AWS_SECRET_ACCESS_KEY', 'SecretAccessKey'),
        ('AWS_SESSION_TOKEN', 'SessionToken'),
    )
    return {env_name: creds[field] for env_name, field in env_to_field}
def submit(event_name, payload):
    """Publish an Argo event named ``event_name`` carrying ``payload``.

    Prints a confirmation line after ``publish`` returns.

    Args:
        event_name: Name given to the ``ArgoEvent``.
        payload: Value forwarded as the ``payload`` argument of ``publish``.
    """
    # Function-scope import: metaflow is loaded on first call rather than
    # when this module is imported.
    # NOTE(review): presumably deferred so metaflow reads the configuration
    # written by `outerbounds configure` beforehand -- confirm.
    from metaflow.integrations import ArgoEvent

    event = ArgoEvent(name=event_name)
    event.publish(payload=payload)
    print(f"Event {event_name} published")
@click.command()
@click.option('--role-arn', help='Role ARN to assume', required=True)
@click.option('--event-name', help='Event name', required=True)
@click.option('--event-payload', help='Event payload', default='{}')
@click.option('--obp-token', help='OBP machine token', required=True)
def send(role_arn=None, event_name=None, event_payload=None, obp_token=None):
    """Assume an AWS role, configure Outerbounds, and publish an event."""
    # The docstring above is kept to one line because click displays it as
    # the command's --help text.
    #
    # Parameters (all supplied by click from the command line):
    #   role_arn      -- ARN passed to assume_role().
    #   event_name    -- name of the event to publish.
    #   event_payload -- JSON text, decoded and passed on as the payload.
    #   obp_token     -- token handed to `outerbounds configure -f`.
    #
    # Raises:
    #   click.BadParameter            -- --event-payload is not valid JSON.
    #   subprocess.CalledProcessError -- `outerbounds configure` exits
    #                                    with a non-zero status.

    # Decode the payload first so malformed input is rejected before any
    # AWS call or subprocess is made, and surface it as a usage error
    # instead of a raw JSONDecodeError traceback.
    try:
        payload = json.loads(event_payload)
    except json.JSONDecodeError as err:
        raise click.BadParameter(
            f"not valid JSON: {err}", param_hint='--event-payload'
        ) from err

    # Work on a copy of the environment: the temporary credentials are
    # visible only to the configure subprocess, os.environ is not modified.
    child_env = os.environ.copy()
    child_env.update(assume_role(role_arn))
    check_call(['outerbounds', 'configure', '-f', obp_token], env=child_env)

    submit(event_name, payload)
# Script entry point: run the click command only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    send()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment