Skip to content

Instantly share code, notes, and snippets.

@martinjc
Created May 29, 2019 21:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save martinjc/0d54358e1ddae6f48f689e899506e570 to your computer and use it in GitHub Desktop.
Save martinjc/0d54358e1ddae6f48f689e899506e570 to your computer and use it in GitHub Desktop.
Code to pull activity data out of Runkeeper API
import os
import json
import time
import requests
from urllib.parse import urlencode, quote
from _credentials import client_id, client_secret, access_token
DATA_DIR = os.path.join(os.getcwd(), "data")
def get_auth_url():
# create an authorisation url to copy paste into a browser window
# so we can get an access token to store in _credentials.py
base_url = "https://runkeeper.com/"
endpoint = "apps/authorize"
params = {
"response_type": "code",
"client_id": client_id,
"redirect_uri": "http://www.martinjc.com",
}
auth_url = base_url + endpoint + "?" + urlencode(params, quote_via=quote)
return auth_url
def do_auth():
base_url = "https://runkeeper.com/"
endpoint = "apps/token"
params = {
"grant_type": "authorization_code",
"code": "code",
"client_id": client_id,
"client_secret": client_secret,
"redirect_uri": "http://www.martinjc.com",
}
r = requests.post(base_url + endpoint, params)
print(r.json())
def get_data(endpoint, params=None):
base_url = "https://api.runkeeper.com/"
endpoint = endpoint
headers = {"Authorization": "Bearer " + access_token}
r = requests.get(base_url + endpoint, params=params, headers=headers)
print(r.status_code)
print(r.json())
if r.status_code != 200:
print(r.json())
return None
return r.json()
def get_fitness_activities():
# store activity URIs separately
with open("activities.json", "w") as activities_output_file, open(
"activity_uris.json", "w"
) as uri_output_file:
activities = []
activity_uris = []
activity_data = get_data("fitnessActivities")
num_activities = activity_data["size"]
num_calls = int(num_activities / 25)
call_count = 0
if activity_data.get("items"):
activities.extend(activity_data["items"])
print(call_count, len(activity_data["items"]), len(activities))
for activity in activity_data["items"]:
activity_uris.append(activity["uri"])
while activity_data.get("next") and call_count < num_calls:
activity_data = get_data(activity_data["next"])
call_count += 1
if activity_data.get("items"):
activities.extend(activity_data["items"])
print(call_count, len(activity_data["items"]), len(activities))
for activity in activity_data["items"]:
activity_uris.append(activity["uri"])
json.dump(activities, activities_output_file)
json.dump(activity_uris, uri_output_file)
def download_activities(activity_list):
for activity_uri in activity_list:
id_str = activity_uri.replace("/fitnessActivities/", "")
filename = os.path.join(DATA_DIR, "%s.json" % id_str)
if not os.path.exists(filename):
activity_data = get_data(activity_uri)
time.sleep(9)
if activity_data is not None:
with open(filename, "w") as output_file:
json.dump(activity_data, output_file)
if __name__ == "__main__":
print(get_auth_url())
do_auth()
get_fitness_activities()
with open("activity_uris.json", "r") as input_file:
activity_uris = json.load(input_file)
download_activities(activity_uris)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment