Created
September 27, 2023 04:52
-
-
Save tuulos/56a5cae34c10028303792ad88fbd35dd to your computer and use it in GitHub Desktop.
send events to OBP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
from subprocess import check_call | |
import click | |
def assume_role(role_arn): | |
import boto3 | |
sts_client = boto3.client('sts') | |
assumed_role_object=sts_client.assume_role( | |
RoleArn=role_arn, | |
RoleSessionName="send_event" | |
) | |
credentials=assumed_role_object['Credentials'] | |
return {'AWS_ACCESS_KEY_ID': credentials['AccessKeyId'], | |
'AWS_SECRET_ACCESS_KEY': credentials['SecretAccessKey'], | |
'AWS_SESSION_TOKEN': credentials['SessionToken']} | |
def submit(event_name, payload): | |
from metaflow.integrations import ArgoEvent | |
ArgoEvent(name=event_name).publish(payload=payload) | |
print(f"Event {event_name} published") | |
@click.command() | |
@click.option('--role-arn', help='Role ARN to assume', required=True) | |
@click.option('--event-name', help='Event name', required=True) | |
@click.option('--event-payload', help='Event payload', default='{}') | |
@click.option('--obp-token', help='OBP machine token', required=True) | |
def send(role_arn=None, event_name=None, event_payload=None, obp_token=None): | |
payload = json.loads(event_payload) | |
env = os.environ.copy() | |
env.update(assume_role(role_arn)) | |
check_call(['outerbounds', 'configure', '-f', obp_token], env=env) | |
submit(event_name, payload) | |
if __name__ == '__main__': | |
send() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment