Skip to content

Instantly share code, notes, and snippets.

@rochoa
Created March 20, 2020 10:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rochoa/e8d4aff09dbe83279bc41ccc8359ce36 to your computer and use it in GitHub Desktop.
Save rochoa/e8d4aff09dbe83279bc41ccc8359ce36 to your computer and use it in GitHub Desktop.
import click
import csv
import json
import requests
from io import StringIO
from datetime import datetime
from kafka import KafkaConsumer
import atexit
def flush(endpoint, datasource_name, token, rows, *, timeout=(10, 300)):
    """Append a batch of rows to a Tinybird data source as CSV.

    Each element of ``rows`` is written as one CSV record with ``csv.writer``.
    Rows that cannot be serialized are reported and skipped; the remaining
    ones are sent in a single POST to ``{endpoint}/v0/datasources`` with
    ``mode=append``.

    Args:
        endpoint: Base API URL, e.g. ``https://api.tinybird.co``.
        datasource_name: Name of the target data source.
        token: Sent as ``Authorization: Bearer <token>``.
        rows: Sized sequence of iterables, one per CSV record.
        timeout: Forwarded to ``requests.post`` as ``(connect, read)``
            seconds so a stalled server cannot block the caller forever.

    Returns:
        The ``requests.Response`` of the append call, or ``None`` when
        ``rows`` is empty (in which case no request is made).
    """
    if not rows:
        # Nothing buffered (e.g. a final flush right after a full batch):
        # don't POST an empty body.
        return None

    body, sent = _rows_to_csv(rows)
    response = requests.post(
        f"{endpoint.rstrip('/')}/v0/datasources",
        # Let requests URL-encode the query so reserved characters in the
        # data source name cannot corrupt the URL.
        params={'mode': 'append', 'name': datasource_name},
        # Encode explicitly: a str body may be encoded as ISO-8859-1 by
        # http.client, which fails on any non-Latin-1 character.
        data=body.encode('utf-8'),
        headers={'Authorization': f'Bearer {token}'},
        timeout=timeout,
    )
    print(f"[{datetime.now()}] status={response.status_code} datasource={datasource_name}, rows={sent}")
    return response


def _new_csv_writer():
    """Return a fresh ``(buffer, writer)`` pair using this script's CSV dialect."""
    buffer = StringIO()
    return buffer, csv.writer(buffer, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)


def _rows_to_csv(rows):
    """Serialize ``rows`` to CSV text; return ``(text, rows_written)``."""
    buffer, writer = _new_csv_writer()
    try:
        # Fast path: one C-level call for the whole batch.
        writer.writerows(rows)
        return buffer.getvalue(), len(rows)
    except Exception as err:  # not bare: Ctrl-C / SystemExit must propagate
        print(f"Failed to process rows as block ({err}), row-by-row processing fallback")

    # writerows may have emitted part of the batch before failing; start over
    # with a clean buffer and skip only the rows that cannot be serialized.
    buffer, writer = _new_csv_writer()
    written = 0
    for row in rows:
        try:
            writer.writerow(row)
        except Exception as err:
            print(f"Failed to process row = {row!r}: {err}")
        else:
            written += 1
    return buffer.getvalue(), written
@click.command()
@click.argument('topic')
@click.argument('datasource_name')
@click.option('--token', envvar='TOKEN')
@click.option('--endpoint', default='https://api.tinybird.co')
@click.option('--batch-size', default=20000, show_default=True, type=click.IntRange(min=1),
              help='Number of buffered messages that triggers an append.')
def consume(topic, datasource_name, token, endpoint, batch_size=20000):
    """Stream JSON messages from Kafka TOPIC into Tinybird DATASOURCE_NAME."""
    # NOTE(review): each decoded message becomes one CSV record via
    # csv.writer, so a JSON object would contribute its *keys*, not its
    # values. Presumably the topic carries JSON arrays; confirm.
    # NOTE(review): offsets follow KafkaConsumer's default commit policy
    # (auto-commit in kafka-python), independent of whether flush()
    # succeeded. Rows are only sent once batch_size is reached or the loop
    # stops, so a quiet topic can hold rows for a long time.
    consumer = KafkaConsumer(
        topic,
        value_deserializer=json.loads,
        group_id=f"tb_{datasource_name}",
    )
    rows = []
    try:
        for msg in consumer:
            rows.append(msg.value)
            if len(rows) >= batch_size:
                flush(endpoint, datasource_name, token, rows)
                rows = []
    finally:
        # Runs on Ctrl-C or when flush() raises mid-loop: send what is still
        # buffered (the un-reset batch is retried once), then leave the
        # consumer group cleanly even if that final flush fails.
        try:
            if rows:
                flush(endpoint, datasource_name, token, rows)
        finally:
            consumer.close()
if __name__ == "__main__":
    # Script entry point: click parses sys.argv and dispatches to `consume`.
    consume()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment