Created
September 8, 2019 06:36
-
-
Save DIRKMJK/d02ef2f5845228a3df96c5cf543bf4a4 to your computer and use it in GitHub Desktop.
Holleweg
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from pathlib import Path | |
import requests | |
import pandas as pd | |
import geopandas as gpd | |
from shapely.geometry import Point | |
import numpy as np | |
import geopy.distance | |
OSM = Path('../data/osm') | |
COUNTRIES = '../data/ne_10m_admin_0_countries/ne_10m_admin_0_countries.shp' | |
API_KEY = XXX | |
BASE_URL = 'https://maps.googleapis.com/maps/api/elevation/json?locations={}&key={}' | |
ELEVATION = '../data/elevation.txt' | |
PREC = 5 | |
MILE = 1.609344 | |
GET_ELEVATION = input('Get new elevation data? y/[n]') | |
def length(linestring): | |
"""Calculate length of linestring""" | |
length = 0 | |
coords = linestring.coords | |
coords = [(lat, lon) for (lon, lat) in coords] | |
for i in range(len(coords) - 1): | |
start = coords[i] | |
end = coords[i + 1] | |
length += geopy.distance.distance(start, end).m | |
return length | |
def convert_speed(speed): | |
"""Convert mph to kph""" | |
if pd.isnull(speed): | |
return speed | |
if 'mph' in speed: | |
speed = speed.replace('mph', '').strip() | |
speed = MILE * float(speed) | |
return float(speed) | |
def find_country(line): | |
"""Find country line is in""" | |
for i in countries.index: | |
geom = countries.geometry[i] | |
if line.within(geom): | |
return countries.ADMIN[i] | |
return None | |
def find_shapes(line, geo): | |
"""Find multipolygons point is in""" | |
coords = line.coords | |
start = Point(coords[0]) | |
end = Point(coords[-1]) | |
matches = [] | |
for i in geo.index: | |
multipolygon = geo.loc[i, 'geometry'] | |
for polygon in list(multipolygon): | |
if start.within(polygon) or end.within(polygon): | |
matches.append(i) | |
return matches | |
def add_geo(var, geo, gdf_to_geo): | |
"""Lookup geo characteristics""" | |
for i in gdf[gdf.country == 'Netherlands'].index: | |
geo_indices = gdf_to_geo[i] | |
values = geo.loc[geo_indices, var] | |
values = [v for v in values if pd.notnull(v)] | |
gdf.loc[i, var] = '|'.join(values) | |
def location_query(elevation, n): | |
"""Create n-length query for locations with no elevation data yet""" | |
subset = elevation[pd.isnull(elevation.elevation)].head(n) | |
coords = ['{:f},{:f}'.format(c[0], c[1]) for c in zip(subset.lat, subset.lon)] | |
query = '|'.join(coords) | |
return subset.index, query | |
def request_elevation(elevation, n): | |
"""Request elevation for next n locations""" | |
index, query = location_query(elevation, n) | |
url = BASE_URL.format(query, API_KEY) | |
r = requests.get(url) | |
if r.status_code != 200: | |
print(r.text) | |
print(query) | |
else: | |
results = r.json()['results'] | |
for i, result in enumerate(results): | |
ind = index[i] | |
elevation.loc[ind, 'lat_g'] = result['location']['lat'] | |
elevation.loc[ind, 'lon_g'] = result['location']['lng'] | |
elevation.loc[ind, 'elevation'] = result['elevation'] | |
elevation.loc[ind, 'resolution'] = result['resolution'] | |
return elevation, r | |
def lookup_elevation(geometry): | |
"""Look up elevation for start or end of edge""" | |
coords = geometry.coords | |
elevations = [] | |
resolutions = [] | |
key_start = f'{round(coords[0][0], PREC)},{round(coords[0][1], PREC)}' | |
if key_start in elev_dict: | |
elevations.append(elev_dict[key_start][0]) | |
resolutions.append(elev_dict[key_start][1]) | |
key_end = f'{round(coords[-1][0], PREC)},{round(coords[-1][1], PREC)}' | |
if key_end in elev_dict: | |
elevations.append(elev_dict[key_end][0]) | |
resolutions.append(elev_dict[key_end][1]) | |
if not elevations: | |
return None, None, None | |
return min(elevations), max(elevations), np.mean(resolutions) | |
def create_elevation(gdf): | |
"""Create df with elevation for start and and points of ways""" | |
gdf['start'] = gdf.geometry.map(lambda x: x.coords[0]) | |
gdf['end'] = gdf.geometry.map(lambda x: x.coords[-1]) | |
start_and_end_coords = set(list(gdf.start) + list(gdf.end)) | |
elevation = pd.DataFrame(start_and_end_coords) | |
elevation.columns = ['lon', 'lat'] | |
for c in ['lat_g', 'lon_g', 'elevation', 'resolution']: | |
if c not in elevation.columns: | |
elevation[c] = None | |
count = 0 | |
todo = sum(pd.isnull(elevation.elevation)) | |
n = 10 | |
while count < todo: | |
count += n | |
if count % 500 == 0: | |
print(count) | |
elevation, r = request_elevation(elevation, n) | |
if count % 100 == 0: | |
time.sleep(1) | |
if len(r.json()['results']) < n: | |
break | |
elevation.to_csv(ELEVATION, index=False) | |
return elevation | |
# Merge OSM data into gdf and create additional variables | |
gdfs = [gpd.read_file(p) for p in OSM.glob('*.json')] | |
lans = [p.stem for p in OSM.glob('*.json')] | |
for i, gdf in enumerate(gdfs): | |
gdfs[i]['lan'] = lans[i] | |
gdf = pd.concat(gdfs, sort=False) | |
gdf = gdf.reset_index() | |
gdf['length_m'] = gdf.geometry.apply(length) | |
gdf['max_speed'] = gdf.maxspeed.apply(convert_speed) | |
# Add countries (shapefile from Natural Earth) | |
countries = gpd.read_file(COUNTRIES) | |
gdf['country'] = gdf.geometry.apply(find_country) | |
# Add geology for NL (from WUR Geomorfologie map) | |
geo = gpd.read_file('../data/geomorfologie2017/Geomorfologie2017_v2.gml') | |
geo = geo.to_crs({'init': 'epsg:4326'}) | |
gdf_to_geo = {} | |
for i, row in gdf[gdf.country == 'Netherlands'].iterrows(): | |
gdf_to_geo[i] = find_shapes(row.geometry, geo) | |
columns_geo = [c for c in geo.columns | |
if 'omschrijving' in c or c.endswith('naam')] | |
for c in columns_geo: | |
add_geo(c, geo, gdf_to_geo) | |
# Add elevation data | |
if GET_ELEVATION.lower() in ['y', 'yes']: | |
elevation = create_elevation(gdf) | |
else: | |
elevation = pd.read_csv(ELEVATION) | |
elev_dict = {f'{round(row.lon, PREC)},{round(row.lat, PREC)}': | |
(row.elevation, row.resolution) | |
for _, row in elevation.iterrows()} | |
gdf['elev_min'] = gdf.geometry.map(lambda x: lookup_elevation(x)[0]) | |
gdf['elev_max'] = gdf.geometry.map(lambda x: lookup_elevation(x)[1]) | |
gdf['elev_mean'] = (gdf.elev_min + gdf.elev_max) / 2 | |
gdf['resolution'] = gdf.geometry.map(lambda x: lookup_elevation(x)[2]) | |
gdf['gradient'] = 100 * (gdf.elev_max - gdf.elev_min) / gdf.length_m | |
# Store data | |
keep = [ | |
'geometry', 'name', 'lan', 'oneway', 'length_m', 'max_speed', | |
'country', 'elev_min', 'elev_max', 'elev_mean', 'resolution', | |
'gradient', | |
] | |
geojson_data = gdf[keep].to_json() | |
with open('../map/data.js', 'w') as f: | |
f.write('var data = {};\n'.format(geojson_data)) | |
f.write('var languages = {};'.format(list(set(gdf.lan)))) | |
keep += columns_geo | |
rename_cols = { | |
'BRO_Relief_omschrijving': 'relief', | |
'BRO_Vormsubgroep_omschrijving': 'vormsubgroep', | |
'GENESE_omschrijving_kort': 'genese', | |
'Bovengrond_omschrijving': 'bovengrond', | |
'BovengrondReliëf_omschrijving': 'bovengr_rlf', | |
'Geologischtijdperk_vormingsproces_eindigt_naam': 'geo_eind', | |
'Geologischtijdperk_vormingsproces_begint_naam': 'geo_begin', | |
'BRO_Landvormgroep_omschrijving': 'landvormgroep', | |
} | |
gdf[keep].rename(columns=rename_cols).to_file('../data/enriched/enriched.shp') | |
geo['holleweg'] = geo.BRO_Vormsubgroep_omschrijving.map(lambda x: | |
'Holle weg' in str(x)) | |
holleweg_geojson = geo[geo.holleweg][['geometry']].to_json() | |
with open('../map/geo.js', 'w') as f: | |
f.write('var holleweg = {};'.format(holleweg_geojson)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import json | |
NAMES = { | |
'nl': 'Holleweg', | |
'en': 'Holloway', | |
'fr': 'Chemin Creux', | |
'de': 'Hohlweg', | |
'gl': 'Corredoira', | |
'da': 'Hulvejen', | |
'no': 'Hulvei', | |
'sv': 'Hålväg' | |
} | |
URL = "http://overpass-api.de/api/interpreter" | |
query = "[out:json]; way[name~'.*{}']; out geom;" | |
def query_op(name): | |
"""Query overpass api for street name""" | |
params = {'data': query.format(name)} | |
response = requests.get(URL, params=params) | |
return response | |
def to_geojson(data): | |
"""Convert overpass response to geojson""" | |
geojson = { | |
'type': 'FeatureCollection', | |
'features': [] | |
} | |
for element in data['elements']: | |
coordinates = [[v for k, v in coords.items()] | |
for coords in element['geometry']] | |
coordinates = [[v2, v1] for [v1, v2] in coordinates] | |
feature = { | |
'type': 'Feature', | |
'geometry': { | |
'type': 'LineString', | |
'coordinates': coordinates | |
}, | |
'properties': element['tags'] | |
} | |
geojson['features'].append(feature) | |
return geojson | |
def main(): | |
"""Query overpass for all street names and store results""" | |
for lan, name in NAMES.items(): | |
print(lan) | |
response = query_op(name) | |
if response.status_code != 200: | |
print(response.text) | |
break | |
geojson = to_geojson(response.json()) | |
with open('../data/osm/{}.json'.format(lan), 'w') as f: | |
f.write(json.dumps(geojson)) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment