import gdal
import json
import math
import numpy as np
import os
import osr
import scipy.signal
import uuid
import warnings
import xml.etree.ElementTree as ET
from .pipesegment import PipeSegment, LoadSegment, MergeSegment
from .image import Image
from . import image
[docs]class BandMath(PipeSegment):
Modify the array holding an image's pixel values,
using a user-supplied function.
def __init__(self, function, master=0):
self.function = function
self.master = master
def transform(self, pin):
if isinstance(pin, tuple):
pin = (pin * image.MergeToStack(self.master))()
data = self.function(
if data.ndim == 2:
data = np.expand_dims(data, axis=0)
return Image(data,, pin.metadata)
[docs]class Amplitude(PipeSegment):
Convert complex image to amplitude, by taking the magnitude of each pixel
def transform(self, pin):
return Image(np.absolute(,, pin.metadata)
[docs]class Intensity(PipeSegment):
Convert amplitude (or complex values) to intensity, by squaring each pixel
def transform(self, pin):
pout = Image(None,, pin.metadata)
if not np.iscomplexobj( = np.square(
else: = np.square(np.absolute(
return pout
[docs]class InPhase(PipeSegment):
Get in-phase (real) component of complex-valued data
def transform(self, pin):
return Image(np.real(,, pin.metadata)
[docs]class Quadrature(PipeSegment):
Get quadrature (imaginary) component of complex-valued data
def transform(self, pin):
return Image(np.imag(,, pin.metadata)
[docs]class Phase(PipeSegment):
Return the phase of the input image
def transform(self, pin):
return Image(np.angle(,, pin.metadata)
[docs]class Conjugate(PipeSegment):
Return complex conjugate of the input image
def transform(self, pin):
return Image(np.conj(,, pin.metadata)
[docs]class MultiplyConjugate(PipeSegment):
Given an iterable of two images, multiply the first
by the complex conjugate of the second.
def __init__(self, master=0):
self.master = master
def transform(self, pin):
return Image(
pin[0].data * np.conj(pin[1].data),
[docs]class Decibels(PipeSegment):
Express quantity in decibels
The 'flag' argument indicates how to handle nonpositive inputs:
'min' outputs the log of the image's smallest positive value,
'nan' outputs NaN, and any other value is used as the flag value itself.
def __init__(self, flag='min'):
self.flag = flag
def transform(self, pin):
pout = Image(None,, pin.metadata)
if isinstance(self.flag, str) and self.flag.lower() == 'min':
flagval = 10. * np.log10(([>0].min())
elif isinstance(self.flag, str) and self.flag.lower() == 'nan':
flagval = math.nan
flagval = self.flag / 10. = 10. * np.log10(,
out=np.full(np.shape(, flagval).astype(,>0
return pout
[docs]class Multilook(PipeSegment):
Multilook filter to reduce speckle in SAR magnitude imagery
Note: Set kernel_size to a tuple to vary it by direction.
def __init__(self, kernel_size=5, method='avg'):
self.kernel_size = kernel_size
self.method = method
def transform(self, pin):
if self.method == 'avg':
filter = scipy.ndimage.filters.uniform_filter
elif self.method == 'med':
filter = scipy.ndimage.filters.median_filter
elif self.method == 'max':
filter = scipy.ndimage.filters.maximum_filter
raise Exception('! Invalid method in Multilook.')
pout = Image(np.zeros(,,, pin.metadata)
for i in range([0]):[i, :, :] = filter([i, :, :],
return pout
[docs]class MultilookComplex(Multilook):
Like 'Multilook', but supports complex input
def transform(self, pin):
mkwargs = {'kernel_size':self.kernel_size, 'method':self.method}
pout = (pin
* (InPhase() * Multilook(**mkwargs) * image.Scale(1.+0.j)
+ Quadrature() * Multilook(**mkwargs) * image.Scale(1.j))
* image.MergeToSum()
return pout
[docs]class Orthorectify(PipeSegment):
Orthorectify an image using its ground control points (GCPs) with GDAL
def __init__(self, projection=3857, algorithm='lanczos',
row_res=1., col_res=1.):
self.projection = projection
self.algorithm = algorithm
self.row_res = row_res
self.col_res = col_res
def transform(self, pin):
drivername = 'GTiff'
srcpath = '/vsimem/orthorectify_input_' + str(uuid.uuid4()) + '.tif'
dstpath = '/vsimem/orthorectify_output_' + str(uuid.uuid4()) + '.tif'
(pin * image.SaveImage(srcpath, driver=drivername))()
gdal.Warp(dstpath, srcpath,
dstSRS='epsg:' + str(self.projection),
xRes=self.row_res, yRes=self.col_res,
pout = image.LoadImage(dstpath)() =
if in (bool, np.dtype('bool')): ='bool')
driver = gdal.GetDriverByName(drivername)
return pout
[docs]class DecompositionPauli(PipeSegment):
Compute the Pauli decomposition of quad-pol SAR data.
Note: Convention is alpha-->blue, beta-->red, gamma-->green
def __init__(self, hh_band=0, vv_band=1, xx_band=2):
self.hh_band = hh_band
self.vv_band = vv_band
self.xx_band = xx_band
def transform(self, pin):
hh =[self.hh_band]
vv =[self.vv_band]
xx =[self.xx_band]
alpha2 = 0.5 * np.square(np.absolute(hh + vv))
beta2 = 0.5 * np.square(np.absolute(hh - vv))
gamma2 = 2 * np.square(np.absolute(xx))
alpha2 = np.expand_dims(alpha2, axis=0)
beta2 = np.expand_dims(beta2, axis=0)
gamma2 = np.expand_dims(gamma2, axis=0)
pout = Image(np.concatenate((alpha2, beta2, gamma2), axis=0),,
return pout
[docs]class DecompositionFreemanDurden(PipeSegment):
Compute the three-component polarimetric decomposition of quad-pol SAR data
proposed by Freeman and Durden.
Note: Convention is Ps-->blue, Pd-->red, Pv-->green
def __init__(self, hh_band=0, vv_band=1, xx_band=2, kernel_size=5):
self.hh_band = hh_band
self.vv_band = vv_band
self.xx_band = xx_band
self.kernel_size = kernel_size
def transform(self, pin):
# Scattering matrix terms
hh = pin * image.SelectBands(self.hh_band)
vv = pin * image.SelectBands(self.vv_band)
xx = pin * image.SelectBands(self.xx_band)
# Covariance matrix terms
C11 = hh * Intensity()
C22 = vv * Intensity()
C33 = xx * Intensity()
C12 = (hh + vv) * MultiplyConjugate()
mkwargs = {'kernel_size':self.kernel_size, 'method':'avg'}
C11 = C11 * Multilook(**mkwargs)
C22 = C22 * Multilook(**mkwargs)
C33 = C33 * Multilook(**mkwargs)
C12 = C12 * MultilookComplex(**mkwargs)
# Volume amplitude, and volume-subtracted matrix terms
fv = C33 * image.Scale(1.5)
c11 = (C11 + fv) * BandMath(lambda x: x[0] - x[1])
c22 = (C22 + fv) * BandMath(lambda x: x[0] - x[1])
c12 = (C12 + fv) * BandMath(lambda x: x[0] - x[1] / 3.)
c12 = (c11 + c22 + c12) * BandMath(lambda x: np.where(x[0]*x[1]
< np.square(np.abs(x[2])), np.sqrt(x[0]*x[1]) \
* x[2]/np.abs(x[2]), x[2]))#
# Surface and dihedral amplitudes
surfacedominates = c12 * BandMath(lambda x: np.real(x) >= 0)
term1 = (c11 + c22 + c12 * InPhase() + c12 * Quadrature()
+ surfacedominates) * BandMath(lambda x:
(x[0]*x[1] - (x[2])**2 - (x[3])**2) /
(x[0] + x[1] + 2*x[2]*np.where(x[4], 1, -1)))
term1 = term1 * Amplitude()#
term2 = (c22 + term1) * BandMath(lambda x: x[0] - x[1])
term2 = term2 * Amplitude()#
term3 = (term1 + term2 + c12 * InPhase() + c12 * Quadrature()
+ surfacedominates) * BandMath(lambda x:
(x[2] + np.where(x[4], 1, -1) * x[0] + x[3] * 1.j) / x[1])
fs = ((term2 + surfacedominates) * image.SetMask(flag=0) + (term1
+ surfacedominates * image.InvertMask()) * image.SetMask(flag=0)) \
* image.MergeToSum()
fd = ((term1 + surfacedominates) * image.SetMask(flag=0) + (term2
+ surfacedominates * image.InvertMask()) * image.SetMask(flag=0)) \
* image.MergeToSum()
alpha = (surfacedominates * image.Scale(-1.) + (term3
+ surfacedominates * image.InvertMask()) * image.SetMask(flag=0)) \
* BandMath(lambda x: x[0] + x[1])
beta = ((term3 + surfacedominates) * image.SetMask(flag=0) \
+ surfacedominates * image.InvertMask() * image.Scale(1.)) \
* BandMath(lambda x: x[0] + x[1])
# Power
Ps = (fs + beta * Intensity()) * BandMath(lambda x: x[0] * (1. + x[1]))
Pd = (fd + alpha *Intensity()) * BandMath(lambda x: x[0] * (1. + x[1]))
Pv = fv
Pmask = (c11 + c22) * BandMath(lambda x: np.logical_and(
x[0]==0, x[1]==0)) * image.InvertMask()
Ps = (Ps + Pmask) * image.SetMask(flag=0)#
Pd = (Pd + Pmask) * image.SetMask(flag=0)#
Pstack = (Ps + Pd + Pv) * image.MergeToStack()
return Pstack()
[docs]class DecompositionHAlpha(PipeSegment):
Compute H-Alpha (Entropy-alpha) dual-polarization decomposition
def __init__(self, band0=0, band1=1, kernel_size=5):
self.band0 = band0
self.band1 = band1
self.kernel_size = kernel_size
def transform(self, pin):
mkwargs = {'kernel_size':self.kernel_size, 'method':'avg'}
image0 = pin * image.SelectBands(self.band0)
image1 = pin * image.SelectBands(self.band1)
# Coherence matrix terms
c00 = image0 * Intensity() * Multilook(**mkwargs)
c11 = image1 * Intensity() * Multilook(**mkwargs)
c01 = (image0 + image1) * MultiplyConjugate() \
* MultilookComplex(**mkwargs)
c01sq = c01 * Intensity()
# Calculate eigenvalues and some eigenvector terms (assumes c01 != 0)
# tr=trace; det=determinant; l1,l2=eigenvalues; v..=eigenvector terms
tr = (c00 + c11) * BandMath(lambda x: x[0] + x[1])
det = (c00 + c11 + c01sq) * BandMath(lambda x: x[0]*x[1] - x[2])
l1 = (tr + det) * BandMath(lambda x:
0.5*x[0] + np.sqrt(0.25*x[0]**2-x[1]))
l2 = (tr + det) * BandMath(lambda x:
0.5*x[0] - np.sqrt(0.25*x[0]**2-x[1]))
absv11 = (c00 + c01 + l1) * BandMath(lambda x: np.abs(x[1]) / np.sqrt(np.abs(x[1])**2 + np.abs(x[2] - x[0])**2))
absv12 = (c00 + c01 + l2) * BandMath(lambda x: np.abs(x[1]) / np.sqrt(np.abs(x[1])**2 + np.abs(x[2] - x[0])**2))
# Calculate entropy (H) and alpha
P1 = (l1 + l2) * BandMath(lambda x: x[0] / (x[0] + x[1]))
P2 = (l1 + l2) * BandMath(lambda x: x[1] / (x[0] + x[1]))
H = (P1 + P2) * BandMath(lambda x: -x[0] * np.log(x[0])
- x[1] * np.log(x[1]))
alpha = (P1 + P2 + absv11 + absv12) * BandMath(lambda x: x[0] * np.arccos(x[2]) + x[1] * np.arccos(x[3]))
outputs = (H + alpha) * image.MergeToStack()
return outputs()
[docs]class CapellaScaleFactor(PipeSegment):
Calibrate Capella single-look complex data (or amplitude thereof)
using the scale factor in the metadata
def transform(self, pin):
tiffjson = json.loads(pin.metadata['meta']['TIFFTAG_IMAGEDESCRIPTION'])
scale_factor = tiffjson['collect']['image']['scale_factor']
return Image(scale_factor *,, pin.metadata)
[docs]class CapellaGridToGCPs(PipeSegment):
Generate ground control points (GCPs) from a Capella grid file
and save them in a corresponding image's metadata. Input is a tuple
with the image in the 0 position and the grid in the 1 position.
Output is the image with modified metadata. Spacing between points
is in pixels.
def __init__(self, reverse_order=False, row_range=None, col_range=None,
spacing=150, row_spacing=None, col_spacing=None):
self.reverse_order = reverse_order
self.row_range = row_range
self.col_range = col_range
self.spacing = spacing
self.row_spacing = row_spacing
self.col_spacing = col_spacing
def transform(self, pin):
if not self.reverse_order:
img = pin[0]
grid = pin[1]
img = pin[1]
grid = pin[0]
pout = Image(,, img.metadata.copy())
if self.row_range is None:
rlo = 0
rhi =[1] - 1
rlo = self.row_range[0]
rhi = self.row_range[1]
if self.col_range is None:
clo = 0
chi =[2] - 1
clo = self.col_range[0]
chi = self.col_range[1]
rspace = self.spacing
cspace = self.spacing
if self.row_spacing is not None:
rspace = self.row_spacing
if self.col_spacing is not None:
cspace = self.col_spacing
gcps = []
for ri in range(rlo, rhi + 1, rspace):
for ci in range(clo, chi + 1, cspace):
gcps.append(gdal.GCP([1, ri, ci], #longitude[0, ri, ci], #latitude[2, ri, ci], #altitude
ci, ri #pixel=column=x, line=row=y
if len(gcps) > 10922:
warnings.warn('! Many GCPs generated in CapellaGridToGCPs.')
pout.metadata['gcps'] = gcps
return pout
[docs]class CapellaGridToPolygon(PipeSegment):
Given a Capella grid file, return a GeoJSON string indicating its boundary.
'step' is number of pixels between each recorded point.
def __init__(self, step=100, flags=False):
self.step = step
self.flags = flags
def transform(self, pin):
# Get indices of selected points along the edges of the grid file
nrows =[1]
ncols =[2]
step = self.step
allri = []
allci = []
cornerri = []
cornerci = []
for edge in range(4):
if edge == 0:
ri = list(range(0, nrows - 1, step))
ci = [0] * len(ri)
elif edge == 1:
ci = list(range(0, ncols - 1, step))
ri = [nrows - 1] * len(ci)
elif edge == 2:
ri = list(range(nrows - 1, 0, -step))
ci = [ncols - 1] * len(ri)
elif edge == 3:
ci = list(range(ncols - 1, 0, -step))
ri = [0] * len(ci)
# Get latitude/longitude values, and ensure they're counterclockwise
lats = [[0, ri, ci] for ri, ci in zip(allri, allci)]
lons = [[1, ri, ci] for ri, ci in zip(allri, allci)]
cornerlats = [[0, ri, ci] for ri,ci in zip(cornerri, cornerci)]
cornerlons = [[1, ri, ci] for ri,ci in zip(cornerri, cornerci)]
vi = (cornerlons[1] - cornerlons[0], cornerlats[1] - cornerlats[0])
vf = (cornerlons[0] - cornerlons[3], cornerlats[0] - cornerlats[3])
counterclockwise = vf[0] * vi[1] - vf[1] * vi[0] > 0
if not counterclockwise:
northlooking = cornerlats[3] > cornerlats[0]
eastlooking = cornerlons[3] > cornerlons[0]
flags = (counterclockwise, northlooking, eastlooking)
# Write latitudes & longitudes of the selected points to a JSON string
jsonstring = '{\n' \
'"type": "FeatureCollection",\n' \
'"name": "region_' + + '",\n' \
'"crs": { "type": "name", "properties": { "name": "urn:ogc:def:crs:EPSG::4326" } },\n' \
'"features": [\n' \
'{ "type": "Feature", "properties": { }, "geometry": { "type": "Polygon", "coordinates": [ [ '
for i, (lat, lon) in enumerate(zip(lats, lons)):
if i>0:
jsonstring += ', '
jsonstring += '[ ' + str(lon) + ', ' + str(lat) + ', 0.0 ]'
jsonstring += '] ] } }\n]\n}'
if self.flags:
return (jsonstring,) + flags
return jsonstring
[docs]class CapellaGridCommonWindow(PipeSegment):
Given an iterable of Capella grid files with equal orientations and pixel
sizes but translational offsets, find the overlapping region and return
its array indices for each grid file. Optionally, also return the subpixel
offset of each grid file needed for exact alignment.
def __init__(self, master=0, subpixel=True):
self.master = master
self.subpixel = subpixel
def transform(self, pin):
# Find the pixel in each grid that's closest to center of master grid.
# 'x' and 'y' are the latitude and longitude bands of the grid files,
# and (refx, refy) is the (lat, lon) of that center.
m = self.master
l = len(pin)
order = [m] + list(range(m)) + list(range(m+1, l))
localrefs = [[]] * len(pin)
fineoffsets = [[]] * len(pin)
extents = [[]] * len(pin)
windows = [[]] * len(pin)
for step, index in enumerate(order):
x = pin[index].data[0]
y = pin[index].data[1]
if step==0:
localrefs[index] = (int(0.5 * x.shape[0]),
int(0.5 * x.shape[1]))
fineoffsets[index] = (0., 0.)
# Get latitude and longitude of the reference point
refx = x[localrefs[index]]
refy = y[localrefs[index]]
# Find pixel closest to reference point
localrefs[index] = self.courseoffset(x, y, refx, refy)
# Find subpixel offset of reference point
fineoffsets[index] = self.fineoffset(x, y, refx, refy,
# Find how far from the reference pixel each grid extends
# Convention is [left, bottom, right, top]
extents[index] = [
x.shape[0] - localrefs[index][0] - 1,
x.shape[1] - localrefs[index][1] - 1,
if step==0:
minextents = extents[index].copy()
for i in range(4):
if extents[index][i] < minextents[i]:
minextents[i] = extents[index][i]
# Calculate col_min, row_max, col_max, row_min of overlapping window
for step, index in enumerate(order):
windows[index] = [
localrefs[index][1] - minextents[0],
localrefs[index][0] + minextents[1],
localrefs[index][1] + minextents[2],
localrefs[index][0] - minextents[3]
# Optionally return subpixel offsets
if self.subpixel:
finearray = np.array(fineoffsets)
return windows
[docs] def haversine(self, lat1, lon1, lat2, lon2, rad=False, radius=6.371E6):
Haversine formula for distance between two points given their
latitude and longitude, assuming a spherical earth.
if not rad:
lat1 = np.radians(lat1)
lon1 = np.radians(lon1)
lat2 = np.radians(lat2)
lon2 = np.radians(lon2)
dlat = lat2 - lat1
dlon = lon2 - lon1
a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
return 2 * radius * np.arcsin(np.sqrt(a))
[docs] def courseoffset(self, latgrid, longrid, lattarget, lontarget):
Given a latitude/longitude pair, find the closest point in
a grid of almost-regularly-spaced latitude/longitude pairs.
bound0 = np.shape(latgrid)[0] - 1
bound1 = np.shape(latgrid)[1] - 1
pos0 = int(bound0 / 2)
pos1 = int(bound1 / 2)
def score(pos0, pos1):
return self.haversine(latgrid[pos0, pos1], longrid[pos0, pos1], lattarget, lontarget)
while True:
scorenow = score(pos0, pos1)
if pos0>0 and score(pos0-1, pos1)<scorenow:
pos0 -= 1
elif pos0<bound0 and score(pos0+1, pos1)<scorenow:
pos0 += 1
elif pos1>0 and score(pos0, pos1-1)<scorenow:
pos1 -= 1
elif pos1<bound1 and score(pos0, pos1+1)<scorenow:
pos1 += 1
return (pos0, pos1)
[docs] def fineoffset(self, latgrid, longrid, lattarget, lontarget, uidx, vidx):
Given grids of almost-equally-spaced latitude and longitude, and an
exact latitude-longitude pair to aim for, and indices of the (ideally)
nearest point to that lat-long target, returns a first-order estimate
of the target offset, in pixels, relative to the specified point.
mlat = lattarget - latgrid[uidx, vidx]
mlon = lontarget - longrid[uidx, vidx]
ulat = latgrid[uidx+1, vidx] - latgrid[uidx, vidx]
ulon = longrid[uidx+1, vidx] - longrid[uidx, vidx]
vlat = latgrid[uidx, vidx+1] - latgrid[uidx, vidx]
vlon = longrid[uidx, vidx+1] - longrid[uidx, vidx]
uoffset = (mlat*ulat + mlon*ulon) / (ulat**2 + ulon**2)
voffset = (mlat*vlat + mlon*vlon) / (vlat**2 + vlon**2)
return uoffset, voffset
[docs]class TerraSARXScaleFactor(PipeSegment):
Calibrate TerraSAR-X complex data using the scale factor in the
accompanying xml file.
def __init__(self, reverse_order=False):
self.reverse_order = reverse_order
def transform(self, pin):
if not self.reverse_order:
img = pin[0]
info = pin[1]
img = pin[1]
info = pin[0]
root = ET.fromstring(info)
scale_factor = float(list(root.iter('calFactor'))[0].text)
return Image(math.sqrt(scale_factor) *,, img.metadata)
[docs]class TerraSARXGeorefToGCPs(PipeSegment):
Generate ground control points (GCPs) from a TerraSAR-X GEOREF.xml file
and save them in a corresponding image's metadata. Input is a tuple
with the image in the 0 position and the georef file in the 1 position.
Output is the image with modified metadata.
def __init__(self, reverse_order=False):
self.reverse_order = reverse_order
def transform(self, pin):
# Define output image
if not self.reverse_order:
img = pin[0]
georef = pin[1]
img = pin[1]
georef = pin[0]
pout = Image(,, img.metadata.copy())
# Set GCP values
gcps = []
root = ET.fromstring(georef)
gcpentries = root.findall('./geolocationGrid/gridPoint')
for gcpentry in gcpentries:
float(gcpentry.find('lon').text), #longitude
float(gcpentry.find('lat').text), #latitude
float(gcpentry.find('height').text), #altitude
float(gcpentry.find('col').text), #pixel=column=x
float(gcpentry.find('row').text) #line=row=y
pout.metadata['gcps'] = gcps
# Set GCP projection
crs = osr.SpatialReference()
pout.metadata['gcp_projection'] = crs.ExportToWkt()
return pout