@cristianpb
Last active January 8, 2021 23:00
Bulk CSV request to the deces.matchid.io death records matching API
import requests
import time
url = "https://deces.matchid.io/deces/api/v1/search/csv"
files = {
    'file': ('clients_test.csv', open('clients_test.csv', 'rb'), 'application/csv', {'Expires': '0'}),
    'sep': (None, ';'),
    'firstName': (None, 'Prenom'),
    'lastName': (None, 'Nom'),
    'birthDate': (None, 'Date'),
    'dateFormat': (None, 'DD/MM/YYYY')
}

# Submit the CSV for bulk matching; the response contains the job id
r = requests.post(url, files=files)
print(r.text)
res = r.json()
print(res['id'])
url = "https://deces.matchid.io/deces/api/v1/search/csv/"
url_job = url + res['id']
print("url: ", url_job)
r = requests.request("GET", url_job)
print(r.text)
res = r.json()
print(res)
while res['status'] == 'created' or res['status'] == 'waiting' or res['status'] == 'active':
r = requests.request("GET", url_job)
try:
res = r.json()
except:
break
print(res)
print(r.text)
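
Once the polling loop exits, the job URL serves the matched rows as a semicolon-separated CSV rather than a JSON status (the chunked script below parses it that way). A minimal sketch for saving that result, assuming the final GET body is the CSV; the output file name is hypothetical:

import io
import pandas as pd

# Assumption: when the job is finished, GET on the job URL returns the result
# CSV (UTF-8 with BOM, ';'-separated) instead of a JSON status document.
r = requests.get(url_job)
result = pd.read_csv(io.StringIO(r.content.decode('utf-8-sig')), sep=';', dtype=object)
result.to_csv('clients_test_matched.csv', sep=';', index=False)  # hypothetical output file
print(result.columns.tolist())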
from multiprocessing import Process, Queue
import itertools
import io
import os
import logging
import pandas as pd
import numpy as np
import requests
import sys, time, traceback
import json
server_address = 'http://localhost:8084'

# Process config
lines_per_request = 20   # rows sent to the API per request
verbosechunksize = 100   # print progress every N input rows
threads = 1              # maximum number of chunks in flight (size of the process queue)
timeout = 60             # request timeout in seconds
maxtries = 5             # retries per chunk before giving up
#limit = None            # process the whole input
limit = 120              # only process the first 120 rows

# Input fields configuration
columns = ['Prenom', 'Nom', 'Date',
           'PERS_lastSeenAliveDate'
           ]
request_data = {
    'firstName': 'Prenom',
    'lastName': 'Nom',
    'birthDate': 'Date',
    'dateFormat': 'DD/MM/YYYY',
    'sep': ';',
    'lastSeenAliveDate': 'PERS_lastSeenAliveDate'
}
match_from_date = '2020-07-31'

# Output fields configuration
output_prefix = ''
error_prefix = 'error'
error_col = '{}{}'.format(output_prefix, error_prefix) if error_prefix else None
def process_chunk(i, df, process_queue, write_queue, schema_check=[]):
    """Submit one chunk to the API and push the annotated result to the write queue."""
    try:
        df = request_submit(df, i, schema_check)
        if (((i + 1) * lines_per_request) % verbosechunksize) == 0:
            print("chunk {}-{} ok".format(i * lines_per_request + 1, (i + 1) * lines_per_request))
    except Exception:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        logging.warning("chunk {}-{} failed - {}".format(
            i * lines_per_request + 1, (i + 1) * lines_per_request,
            ''.join(traceback.format_exception(exc_type, exc_obj, exc_tb))))
    write_queue.put(df)
    process_queue.get()  # free a slot in the bounded process queue
def request_submit(df, i=0, schema_check=[]):
    """Does the actual request to the backend server."""
    global maxtries
    string_io = io.StringIO()
    response = None
    if not isinstance(df, pd.DataFrame):
        return df
    df['PERS_lastSeenAliveDate'] = match_from_date
    df.reset_index(inplace=True)
    df[columns].to_csv(string_io, encoding="utf-8", index=False)
    kwargs = {
        'data': request_data,
        'files': {'data': string_io.getvalue()},
        'timeout': timeout,
        'url': "{}/deces/api/v1/search/csv".format(server_address)
    }
    # Submit the chunk, retrying with exponential backoff on failure
    tries = 1
    failed = True
    while failed and (tries <= maxtries):
        try:
            response = requests.post(**kwargs)
            status_code = response.status_code
        except requests.exceptions.ReadTimeout:
            status_code = "timeout"
        if status_code == 200:
            failed = False
        else:
            logging.warning("Retry: {}".format(tries))
            tries += 1
            if tries <= maxtries:
                time.sleep(3 ** (tries - 1))
    if not failed:
        # Poll the job until it leaves the queued/running states; once done,
        # the endpoint returns the result CSV instead of a JSON status.
        jobId = response.json()['id']
        jobProgress = 'created'
        while jobProgress in ['active', 'waiting', 'created']:
            jobStatus = requests.get("{}/deces/api/v1/search/csv/{}".format(server_address, jobId))
            try:
                jobProgress = jobStatus.json()['status']
            except Exception:
                jobProgress = {}
                break
            time.sleep(0.5)
        content = io.StringIO(jobStatus.content.decode('utf-8-sig'))
        result = pd.read_csv(content, dtype=object, sep=";")
        if error_col:
            df[error_col] = None
        if tries > 1:
            logging.warning("chunk {}-{} needed {} tries".format(
                i * lines_per_request + 1, (i + 1) * lines_per_request, tries))
        # Append the columns returned by the API to the original chunk
        diff = [x for x in result.axes[1].difference(df.axes[1])]
        df.drop(['index'], axis=1, inplace=True)
        for result_column in diff:
            new_column = result_column.replace("result_", output_prefix)
            df[new_column] = result[result_column]
    if failed:
        tries -= 1
        logging.warning("chunk {}-{} failed after {} tries".format(
            i * lines_per_request + 1, (i + 1) * lines_per_request, tries))
        df[output_prefix + 'score'] = -1
        if error_col:
            df[error_col] = "HTTP Status: {}".format(status_code)
    if len(schema_check) > len(df.axes[1]):
        # Make sure every column of the expected output schema is present
        diff = [x for x in schema_check.difference(df.axes[1])]
        for col in diff:
            df[col] = None
    return df
def grouper(iterable, n, fillvalue=None):
    """Collect data into fixed-length chunks or blocks (not used below)."""
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)
def compare_datasets(ids):
    """Process a local CSV file in chunks and append the matched rows to an output file."""
    # Iterate over the input CSV in chunks of lines_per_request rows
    #dataset_iter = ids.iter_dataframes(chunksize=lines_per_request, infer_with_pandas=False, limit=limit)
    dataset_iter = pd.read_csv(ids, iterator=True, chunksize=lines_per_request, nrows=limit, sep=";")
    process_queue = Queue(threads)
    write_queue = Queue()
    ow = open("demofile2.txt", "a")
    for i, chunk in enumerate(dataset_iter):
        process_queue.put(i)
        thread = Process(target=process_chunk, args=[i, chunk, process_queue, write_queue])
        thread.start()
        while write_queue.qsize() > 0:
            ow.write(write_queue.get().to_csv(encoding="utf-8", index=False))
    # Wait for the remaining worker processes, then flush what is left in the write queue
    logging.info("waiting {} chunk processes".format(process_queue.qsize()))
    c = 0
    while (process_queue.qsize() > 0) and c < 300:
        c += 1
        time.sleep(1)
    logging.info("flushing {} chunks".format(write_queue.qsize()))
    c = 0
    while (write_queue.qsize() > 0) and c < 300:
        c += 1
        ow.write(write_queue.get().to_csv(encoding="utf-8", index=False))
    ow.close()
def compare_datasets_dss(ids, ods):
    '''
    Compare each row of the input dataset and produce a row in the output dataset with additional fields.
    ids: the input dataset
    ods: the output dataset
    '''
    # First a small pass to produce the output schema
    small = ids.get_dataframe(sampling='head', limit=3, infer_with_pandas=False)
    initial_index = small.axes[1]
    match_result = request_submit(small)
    output_index = match_result.axes[1]
    schema = ids.read_schema()
    floats = [output_prefix + column for column in ['birth.latitude', 'birth.longitude', 'death.longitude',
                                                    'death.latitude', 'death.age', 'score']]
    for column in output_index.difference(initial_index):
        schema.append({'name': column, 'type': 'float' if column in floats else 'string'})
    ods.write_schema(schema)
    ow = ods.get_writer()
    # Then the full pass
    dataset_iter = ids.iter_dataframes(chunksize=lines_per_request, infer_with_pandas=False, limit=limit)
    process_queue = Queue(threads)
    write_queue = Queue()
    for i, chunk in enumerate(dataset_iter):
        process_queue.put(i)
        thread = Process(target=process_chunk, args=[i, chunk, process_queue, write_queue, output_index])
        thread.start()
        while write_queue.qsize() > 0:
            ow.write_dataframe(write_queue.get())
    # Wait for the remaining worker processes, then flush the write queue
    logging.info("waiting {} chunk processes".format(process_queue.qsize()))
    c = 0
    while (process_queue.qsize() > 0) and c < 300:
        c += 1
        time.sleep(1)
    logging.info("flushing {} chunks".format(write_queue.qsize()))
    c = 0
    while (write_queue.qsize() > 0) and c < 300:
        c += 1
        ow.write_dataframe(write_queue.get())
ids = 'clients_test.csv'
compare_datasets(ids)
#ids = dataiku.Dataset("SIVESIV.esiv_pers_phys")
#ods = dataiku.Dataset("dcd_siv_test")
#compare_datasets_dss(ids, ods)
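
Since each chunk appends its own CSV block (header line included) to demofile2.txt, the results can be read back into a single DataFrame afterwards. A minimal sketch, assuming a fresh run in which all chunks succeeded and share the same columns:

import io
import pandas as pd

# Each worker wrote its chunk with its own header line; keep the first header
# and drop the repeated ones before parsing the whole file.
with open("demofile2.txt", encoding="utf-8") as fh:
    lines = fh.readlines()
header = lines[0]
body = [line for line in lines[1:] if line != header]
results = pd.read_csv(io.StringIO(header + "".join(body)), dtype=object)
print("{} matched rows".format(len(results)))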