bulk csv request: upload a CSV of people to the matchID deces API (https://deces.matchid.io), poll the job until it finishes, and print the matched result
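The script below expects a ';'-separated input file containing the Prenom, Nom and Date columns declared in the form fields, with dates in DD/MM/YYYY format. A minimal sketch to create such a file (the rows are made-up sample values):

# Hypothetical sample input for the script below
rows = "Prenom;Nom;Date\nJean;Dupont;01/01/1950\nMarie;Martin;15/03/1942\n"
with open('clients_test.csv', 'w', encoding='utf-8') as f:
    f.write(rows)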
import requests
import time

url = "https://deces.matchid.io/deces/api/v1/search/csv"

# Submit the CSV as a multipart request; the (None, value) tuples are sent
# as plain form fields mapping the CSV columns to the API's search fields.
with open('clients_test.csv', 'rb') as csv_file:
    files = {
        'file': ('clients_test.csv', csv_file, 'text/csv', {'Expires': '0'}),
        'sep': (None, ';'),
        'firstName': (None, 'Prenom'),
        'lastName': (None, 'Nom'),
        'birthDate': (None, 'Date'),
        'dateFormat': (None, 'DD/MM/YYYY')
    }
    r = requests.post(url, files=files)
print(r.text)
res = r.json()
print(res['id'])

# Poll the job until it leaves the created/waiting/active states; once the
# job is done, the endpoint returns the result CSV instead of a JSON status,
# so json() raises and the loop stops.
url_job = url + '/' + res['id']
print("url: ", url_job)
r = requests.get(url_job)
print(r.text)
res = r.json()
print(res)
while res['status'] in ('created', 'waiting', 'active'):
    time.sleep(0.5)
    r = requests.get(url_job)
    try:
        res = r.json()
    except ValueError:
        break
    print(res)
print(r.text)
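When the loop exits via the except branch, r holds the result CSV rather than a JSON status, so it can be written straight to disk. A minimal sketch (the output filename is an arbitrary choice):

# Persist the matched CSV returned by the completed job
with open('clients_test_matched.csv', 'w', encoding='utf-8') as out:
    out.write(r.text)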
Chunked variant: the same bulk matching split into small parallel requests with multiprocessing against a local matchID backend, with retries and an optional Dataiku DSS flow.
from multiprocessing import Process, Queue
import itertools
import io
import logging
import sys, time, traceback
import pandas as pd
import requests

server_address = 'http://localhost:8084'

# Process config
lines_per_request = 20   # rows per chunk sent to the API
verbosechunksize = 100   # log progress every N processed rows
threads = 1              # max concurrent chunk processes
timeout = 60             # per-request timeout (seconds)
maxtries = 5             # attempts per chunk before giving up
#limit = None
limit = 120              # only process the first N input rows

# Input fields configuration
columns = ['Prenom', 'Nom', 'Date',
           'PERS_lastSeenAliveDate'
           ]
request_data = {
    'firstName': 'Prenom',
    'lastName': 'Nom',
    'birthDate': 'Date',
    'dateFormat': 'DD/MM/YYYY',
    'sep': ';',
    'lastSeenAliveDate': 'PERS_lastSeenAliveDate'
}
match_from_date = '2020-07-31'

# Output fields configuration
output_prefix = ''
error_prefix = 'error'
error_col = '{}{}'.format(output_prefix, error_prefix) if error_prefix else None

def process_chunk(i, df, process_queue, write_queue, schema_check=[]):
    """Run one chunk through the API and hand the result to the writer queue."""
    try:
        df = request_submit(df, i, schema_check)
        if (((i + 1) * lines_per_request) % verbosechunksize) == 0:
            print("chunk {}-{} ok".format(i * lines_per_request + 1, (i + 1) * lines_per_request))
    except Exception:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        logging.warning("chunk {}-{} failed - {}".format(
            i * lines_per_request + 1, (i + 1) * lines_per_request,
            ''.join(traceback.format_exception(exc_type, exc_obj, exc_tb))))
    write_queue.put(df)
    process_queue.get()  # free our slot in the bounded process queue

def request_submit(df, i=0, schema_check=[]):
    """Does the actual request to the backend server"""
    string_io = io.StringIO()
    response = None
    if not isinstance(df, pd.DataFrame):
        return df
    df['PERS_lastSeenAliveDate'] = match_from_date
    df.reset_index(inplace=True)
    # Serialize the chunk with the same separator declared in request_data
    df[columns].to_csv(string_io, sep=request_data['sep'], encoding="utf-8", index=False)
    kwargs = {
        'data': request_data,
        'files': {'data': string_io.getvalue()},
        'timeout': timeout,
        'url': "{}/deces/api/v1/search/csv".format(server_address)
    }
    tries = 1
    failed = True
    # Retry with exponential backoff: 3, 9, 27, 81 seconds between attempts
    while failed and tries <= maxtries:
        try:
            response = requests.post(**kwargs)
            status_code = response.status_code
        except requests.exceptions.ReadTimeout:
            status_code = "timeout"
        if status_code == 200:
            failed = False
        else:
            logging.warning("Retry: {}".format(tries))
            tries += 1
            if tries <= maxtries:
                time.sleep(3 ** (tries - 1))
    if not failed:
        jobId = response.json()['id']
        jobProgress = 'created'
        # Poll until the job leaves the created/waiting/active states; once it
        # finishes, the endpoint returns the result CSV instead of a JSON
        # status, so json() raises and polling stops.
        while jobProgress in ['active', 'waiting', 'created']:
            jobStatus = requests.get("{}/deces/api/v1/search/csv/{}".format(server_address, jobId))
            try:
                jobProgress = jobStatus.json()['status']
            except ValueError:
                break
            time.sleep(0.5)
        content = io.StringIO(jobStatus.content.decode('utf-8-sig'))
        result = pd.read_csv(content, dtype=object, sep=";")
        if error_col:
            df[error_col] = None
        if tries > 1:
            logging.warning("chunk {}-{} needed {} tries".format(
                i * lines_per_request + 1, (i + 1) * lines_per_request, tries))
        # Copy over only the columns the API added, renaming result_* columns
        diff = list(result.axes[1].difference(df.axes[1]))
        df.drop(['index'], axis=1, inplace=True)
        for result_column in diff:
            new_column = result_column.replace("result_", output_prefix)
            df[new_column] = result[result_column]
    if failed:
        tries -= 1
        logging.warning("chunk {}-{} failed after {} tries".format(
            i * lines_per_request + 1, (i + 1) * lines_per_request, tries))
        df[output_prefix + 'score'] = -1
        if error_col:
            df[error_col] = "HTTP Status: {}".format(status_code)
    # Pad missing columns so every chunk matches the reference schema
    if len(schema_check) > len(df.axes[1]):
        diff = list(schema_check.difference(df.axes[1]))
        for col in diff:
            df[col] = None
    return df
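
# A possible standalone smoke test for request_submit (hypothetical sample
# values; assumes the matchID backend is reachable at server_address):
#   sample = pd.DataFrame({'Prenom': ['Jean'], 'Nom': ['Dupont'],
#                          'Date': ['01/01/1950']})
#   print(request_submit(sample))
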
def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)

def compare_datasets(ids):
    # Stream the input CSV in chunks of lines_per_request rows
    #dataset_iter = ids.iter_dataframes(chunksize=lines_per_request, infer_with_pandas=False, limit=limit)
    dataset_iter = pd.read_csv(ids, iterator=True, chunksize=lines_per_request, nrows=limit, sep=";")
    process_queue = Queue(threads)  # bounded queue caps concurrent workers
    write_queue = Queue()
    ow = open("demofile2.txt", "a")
    for i, chunk in enumerate(dataset_iter):
        process_queue.put(i)  # blocks while `threads` workers are still running
        thread = Process(target=process_chunk, args=[i, chunk, process_queue, write_queue])
        thread.start()
        # Drain finished chunks as we go; each to_csv() re-emits a header line
        # (see the read-back note at the end of the script)
        while write_queue.qsize() > 0:
            ow.write(write_queue.get().to_csv(encoding="utf-8", index=False))
    logging.info("waiting {} chunk processes".format(process_queue.qsize()))
    c = 0
    while (process_queue.qsize() > 0) and c < 300:
        c += 1
        time.sleep(1)
    logging.info("flushing {} chunks".format(write_queue.qsize()))
    c = 0
    while (write_queue.qsize() > 0) and c < 300:
        c += 1
        ow.write(write_queue.get().to_csv(encoding="utf-8", index=False))
    ow.close()

def compare_datasets_dss(ids, ods):
    '''
    Compares each row of the input dataset and produces a row in the output
    dataset with additional match fields.
    ids: the input dataset
    ods: the output dataset
    '''
    # First a small pass to produce the output schema
    small = ids.get_dataframe(sampling='head', limit=3, infer_with_pandas=False)
    initial_index = small.axes[1]
    match_result = request_submit(small)
    output_index = match_result.axes[1]
    schema = ids.read_schema()
    floats = [output_prefix + column for column in ['birth.latitude', 'birth.longitude', 'death.longitude',
                                                    'death.latitude', 'death.age', 'score']]
    for column in output_index.difference(initial_index):
        schema.append({'name': column, 'type': 'float' if column in floats else 'string'})
    ods.write_schema(schema)
    ow = ods.get_writer()
    # Then the full pass
    dataset_iter = ids.iter_dataframes(chunksize=lines_per_request, infer_with_pandas=False, limit=limit)
    process_queue = Queue(threads)
    write_queue = Queue()
    for i, chunk in enumerate(dataset_iter):
        process_queue.put(i)
        thread = Process(target=process_chunk, args=[i, chunk, process_queue, write_queue, output_index])
        thread.start()
        while write_queue.qsize() > 0:
            ow.write_dataframe(write_queue.get())
    logging.info("waiting {} chunk processes".format(process_queue.qsize()))
    c = 0
    while (process_queue.qsize() > 0) and c < 300:
        c += 1
        time.sleep(1)
    logging.info("flushing {} chunks".format(write_queue.qsize()))
    c = 0
    while (write_queue.qsize() > 0) and c < 300:
        c += 1
        ow.write_dataframe(write_queue.get())

if __name__ == '__main__':  # guard required by multiprocessing on spawn platforms
    ids = 'clients_test.csv'
    compare_datasets(ids)
    #ids = dataiku.Dataset("SIVESIV.esiv_pers_phys")
    #ods = dataiku.Dataset("dcd_siv_test")
    #compare_datasets_dss(ids, ods)
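Because every chunk's to_csv() call re-emits a header line, demofile2.txt accumulates repeated headers. A possible way to read the results back, assuming all chunks produced the same columns (the 'Nom' column comes from the input configuration above):

import pandas as pd

results = pd.read_csv("demofile2.txt", dtype=object)
# Drop the header lines that each chunk re-emitted
results = results[results['Nom'] != 'Nom'].reset_index(drop=True)
print(results.head())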