Skip to content

Instantly share code, notes, and snippets.

@yellowcap
Last active February 2, 2023 16:30
Show Gist options
  • Save yellowcap/4a0c5f23b5c6997202a5a1df7ea4b3e3 to your computer and use it in GitHub Desktop.
Save yellowcap/4a0c5f23b5c6997202a5a1df7ea4b3e3 to your computer and use it in GitHub Desktop.
Sentinel-2 RGB image overlapping with GeoJSON feature served through fastapi
import requests
response = requests.post(
"http://127.0.0.1:8000/rgb/?start=2022-01-01&end=2022-01-31&mode=ndvi",
json={
"bbox": [128.774676, -22.256217, 128.789557, -22.241022],
"geometry": {
"coordinates": [
[
[128.77626, -22.24742],
[128.783557, -22.241022],
[128.789557, -22.244932],
[128.786677, -22.253995],
[128.780149, -22.256217],
[128.774676, -22.251907],
[128.77626, -22.24742],
]
],
"type": "Polygon",
},
"properties": None,
"type": "Feature",
},
)
if response.status_code != 200:
print(response.json())
else:
with open("fastapi-test.png", "wb") as f:
f.write(response.content)
import io
from datetime import date
from enum import Enum
from math import ceil
from typing import Union
from rio_tiler.colormap import cmap
import numpy as np
import pyproj
from fastapi import FastAPI, Query, Response
from geojson_pydantic import Feature
from geojson_pydantic.types import BBox
from PIL import Image, ImageOps
from rasterio import Affine
from rasterio.features import bounds, rasterize
from rio_tiler.io import COGReader
from satsearch import Search
from shapely import Geometry
from shapely.geometry import shape
from shapely.ops import transform
RGB_BANDS = ["red", "green", "blue"]
NDVI_BANDS = ["red", "nir"]
SCALE = 10
app = FastAPI()
def transform_geometry(geom: Feature, epsg: int) -> Geometry:
"""
Reproject a GeoJson Pydantic Feature and return it as Shapely Geometry.
"""
project = pyproj.Transformer.from_proj(
pyproj.Proj(init="epsg:4326"),
pyproj.Proj(init=f"epsg:{epsg}"),
)
if geom.geometry is None:
raise ValueError("Can not transform an empty geometry")
return transform(
project.transform,
shape(geom.geometry.dict()),
)
def compute_transform(geom: Geometry, scale: int) -> tuple[Affine, int, int]:
"""
Compute the geotransform and image size for the rasterized geometry. The
input scale is expected to be in the coordinate system units of the epsg
provided.
"""
extent = bounds(geom)
width = ceil((extent[2] - extent[0]) / scale)
height = ceil((extent[3] - extent[1]) / scale)
geotransform = Affine(scale, 0, extent[0], 0, -scale, extent[3])
print(f"Computed width {width} and height {height}")
return geotransform, width, height
def get_pixels(
href: str, bbox: Union[BBox, None], width: int, height: int
) -> np.ndarray:
"""
Retrieve pixel values for a raster over a bounding box at the
given resolution.
"""
print(f"Getting data for {href}")
with COGReader(href) as raster:
return raster.part(bbox, width=width, height=height)
class BandCombo(str, Enum):
rgb = "rgb"
ndvi = "ndvi"
def bands(self):
if self == "rgb":
return RGB_BANDS
else:
return NDVI_BANDS
def get_array(aoi: Feature, start: date, end: date, mode: BandCombo) -> np.ndarray:
"""
Get
"""
search = Search(
url="https://earth-search.aws.element84.com/v0",
bbox=aoi.bbox,
datetime=f"{start}T00:00:00Z/{end}T23:59:59Z",
query={"eo:cloud_cover": {"lt": 90}},
collections=["sentinel-s2-l2a-cogs"],
)
items = [item for item in search.items()]
print(f"Search found {len(items)} scenes")
items.sort(key=lambda x: x.properties["eo:cloud_cover"])
item = items[0]
print(item)
epsg = item.properties["proj:epsg"]
aoi_transformed = transform_geometry(aoi, epsg)
geotransform, width, height = compute_transform(aoi_transformed, SCALE)
aoi_rasterized = rasterize(
[aoi_transformed],
out_shape=(height, width),
transform=geotransform,
all_touched=True,
fill=0,
default_value=1,
)
hrefs = [item.asset(band)["href"] for band in mode.bands()]
X = []
for href in hrefs:
pixels = get_pixels(href, aoi.bbox, width, height)
pixels_data = pixels.data[0]
pixels_data_masked = pixels_data * aoi_rasterized
X.append(pixels_data_masked)
return np.array(X, dtype="float32")
@app.post("/rgb/", response_class=Response)
async def rgb(aoi: Feature, start: date, end: date, mode: BandCombo):
"""
Return an RGB image over the input geometry based on imagery from
within the given time interval.
"""
X = get_array(aoi, start, end, mode)
if mode == BandCombo.rgb:
X = np.multiply(np.clip(X / 3000, 0, 1), 255)
X = X.swapaxes(0, 1).swapaxes(1, 2)
img = Image.fromarray(X.astype("uint8"), mode="RGB")
else:
ndvi = (X[1] - X[0]) / (X[1] + X[0])
ndvi_scaled = 255* (np.nan_to_num(ndvi, nan=-1) + 1) / 2
img = Image.fromarray(ndvi_scaled.astype("uint8"))
with io.BytesIO() as output:
img.save(output, format="PNG")
data = output.getvalue()
return Response(content=data, media_type="image/png")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment