Risk assessment for flooding - building damage and population exposure#

Click Flood Damage and Population Exposure to go to this workflow’s GitHub repository.

Introduction#

This workflow assesses economic damage to buildings, exposure of critical infrastructure, and population exposure & displacement, by combining flood map data (hazard) and population and building data (exposure).

Building damage & critical infrastructure#

Flood risk in the form of economic damage at a building level is computed from flood depth maps and building data. The damages are computed for each flood event (flood map for a given return period) and then integrated over all of the event return periods to determine expected annual damage (EAD). Damage to each building varies based on the local flood depth, reconstruction costs, value of its contents, and its footprint area.

Datasets:

Furthermore, critical infrastructure is mapped onto the flood maps to visually assess its exposure to the hazard.

The code can be customized to use any flood map, building data, and depth-damage relationships.

Population exposure & population displaced#

Population exposure and displaced population are computed from flood maps and population maps. The population data by default is based on a global dataset and one can choose from a number of options for the time period (past years and projections for near future). Population is considered displaced when the population is exposed to flood depths over a given threshold. For both exposure and displacement, the results are integrated over all of the event return periods to determine expected annual population exposed (EAPE) and expected annual population displaced (EAPD).

Datasets:

The code can be customized to use any population data, flood map, and minimum water depth threshold to classify the exposed and displaced populations.

Limitations#

The flood maps that are used in this workflow by default do not take into consideration possible flood protection infrastructure that may in reality limit the impact of the hazard. Moreover, the resolution of 3 arc-seconds for both the flood maps and population maps may be unsitable for particularly complex regions. If possible, it is suggested to use local data which may lead to a better representation of the ground truth.

Furthermore, buildings that are close to water bodies (e.g: rivers) may overlap the water body, thus resulting in higher than expected flood depths (and damage) values being used in the damage calculations. This effect can be reduced by using higher resolution flood maps. Similarly, due to the resolution of both the population and the flood maps, it might be that part of the population appears to be over a water body and is counted towards the overall exposed and displaced statistics irregardless of flooding.

Preparation work#

Import modules#

import os
import sys
import urllib.request
from zipfile import ZipFile
import socket
import warnings
import numpy as np
import pandas as pd
import geopandas as gpd
import rasterio
from rasterio.windows import from_bounds
from rasterio.enums import Resampling
from rasterio.warp import reproject, calculate_default_transform
from rasterio.mask import mask
from rasterio.transform import array_bounds
import rasterstats
from osgeo import gdal, osr
from shapely.geometry import Polygon, Point
import osmnx as ox
from pyproj import Transformer
from matplotlib.colors import LinearSegmentedColormap
import matplotlib.lines as mlines
import matplotlib.pyplot as plt
import matplotlib.patheffects as pe
from mpl_toolkits.axes_grid1 import make_axes_locatable
import contextily as ctx

Define inputs#

In this section different input parameters are set. Please read the descriptions when provided to ensure the appropriate changes are made. Some of the things that can be set here are:

  • Geographical bounds of area of interest (a .shp or .gpkg file can be used for polygons delineating the area of interest, alternatively coordinates can be manually inserted for a rectangular outline).

  • Return periods for which calculations are made, projection of the population.

  • Code flags (e.g. choosing which maps get produced, what elements are shown in outputs, automatically saving images, etc.)

  • Directory locations

  • Colorbars for maps

## Name for saves
saveName = 'myLocation'

## Location, either from file or from coordinates ------------------
outlineFileFlag=False #If true, the outline found in the file location below will be used. If false, the rectangle created by the input coordinates will be used

if outlineFileFlag is True: 
    outlineFileLoc='.\\data\\myLocationOutline_GeoPackage.gpkg' #Input location of shapefile/geopackage with outline of interest
    try:
        outlineFile=gpd.read_file(outlineFileLoc)
    except Exception as exc:
        raise ValueError("Could not open outline file location (shapefile or geopackage). If you want to manually define a box with coordinates, set the outlineFileFlag to False, and set Latitude1, Latitude2, Longitude1, Longitude2 with your chosen coordinates.") from exc
    outlineGeometry=outlineFile.geometry.to_crs(4326)
    if outlineGeometry.ndim>1:
        raise ValueError(f"Make sure the given file contains only the outline needed, so that the dimension of the variable is 1. Current dimensions: {outlineGeometry.ndim}")
    outlineBounds=outlineGeometry.bounds
    Latitude1, Latitude2 = outlineBounds['miny'].min(),outlineBounds['maxy'].max()
    Longitude1, Longitude2 = outlineBounds['minx'].min(),outlineBounds['maxx'].max()

elif outlineFileFlag is False: 
    # Placeholder example, Italy
    Latitude1, Longitude1 = 43.77, 11.265 # Input coordinates of location, in EPSG:4326
    Latitude2, Longitude2 = 43.78, 11.29 # Input coordinates of location, in EPSG:4326
    
    Latitude1, Latitude2 = min(Latitude1, Latitude2), max(Latitude1, Latitude2)
    Longitude1, Longitude2 = min(Longitude1, Longitude2), max(Longitude1, Longitude2)
    outlineGeometry=gpd.GeoSeries(
    [Polygon([(Longitude1, Latitude1), (Longitude2, Latitude1),
              (Longitude2, Latitude2), (Longitude1, Latitude2)])],crs='EPSG:4326')
else:
    raise ValueError(f"outlineFileFlag must be set as either True or False. Currently: {outlineFileFlag}")

## Return Periods of Interest ------------------------------------
# Return period levels, if using original source pick from [10, 20, 30, 40, 50, 75, 100, 200, 500]
ReturnPeriods = [10, 50, 100, 500]  

## Population estimate/projection year ---------------------------
#Can choose any year with a 5 year interval between 1975 and 2030 (estimates until 2020, projections after 2020)
PopYear = 2025 

if not (isinstance(PopYear, int) and 1975 <= PopYear <= 2030 and PopYear % 5 == 0):
    raise ValueError("The variable must be an integer, between 1975 and 2030, and a multiple of 5.")

## Water Depths to Compute damages, based on Mean, Maximum, and/or Minimum depths at a building's location 
# Mean will assume the water depth at a building's location to be the mean of it's max and min value. Max will assume the whole building's location has water depth value as at it's largest value, and viceversa for Min
Depths = ['Mean'] #Options are ['Mean', 'Max', 'Min']


## Water Depths for Exposed and Displaced Population---------------
# If depth is greater than this, the population is exposed
minDepthExposed = 0.0 
# If depth is greater than this, the population is displaced
minDepthDisplaced = 1.0 


## Figure Options ------------------------------------------
# Save images in folder (True saves)
imageSaveFlag = False

# Return period for the optional figures. If using original source pick from [10, 20, 30, 40, 50, 75, 100, 200, 500]
ImageReturnPeriod = [10, 500] #At least one input, value/s MUST also be present in RetrunPeriods 

for value in ImageReturnPeriod:
    if value not in ReturnPeriods:
        raise ValueError(f"Value {value} in ImageReturnPeriod is not present in ReturnPeriods, i.e. workflow is trying to create graphics from a return period not being analysed.\nEither remove it from ImageReturnPeriod, or add it into ReturnPeriod.")

# Buffer size. How much additional space is given around the location of interest in the maps, for nicer looking map outputs. (in EPSG:4326)
ybuffer=0.0020
xbuffer=0.0040

# Outline of location settings
showOutlineFlag=True #Set to True to show outline, False to hide it
outlineColour='limegreen' #Colour of outline
outlineStyle='-' #Style of the outline. Choose from -, :, --, -.
outlineThickness=3 #Line thickness of outline
outlineFace='none' #Face colour of outline
outlineAlpha=1 #Transparency of edge and face colour for the outline
outlineLabelFlag=True #True prints the outline label, False hides it
outlineLabel=f'{saveName} Outline' #Text used for the label of the outline
outlineLabelSize=8 #Fontsize of otuline label
outlineLabelPosition='upper left' #Position of the label within the map, suggested either 'upper left' or 'lower right'
legend_outline = None

# Maximum Water level in legend. Useful if big discrepancy between river and flood depths exists.
customMaxDepthLegend=-1 #Set as -1 to automatically set highest value found in data as the legend's maximum

# Damage curves (True prints)
flagDamageCurve = True

# Building Images (True prints)
    #Building maps with classification 
flagBuilding = True
    #Building maps with flood level
flagBuildingH2o = True
    #Building maps with damage received
flagBuildingDmg = True
    #Annual damage by return period graph
flagBuildingDmgGraph = True

# Population Images (True prints)
    #Population exposed map
flagPopulationExp = True
    #Annual population exposed graph
flagPopulationExpGraph = True
    #Population displaced map
flagPopulationDis = True
    #Annual population displaced graph
flagPopulationDisGraph = True

## Figure Colour Scheme -------------------------------
# Water Depth Colorbar for Maps
cmap_h2o = LinearSegmentedColormap.from_list('gist_stern_inv',['blue', 'red'], N=256)  #cmap_h2o='Blues' can also be a good option.
# Building Class Colorbar for Maps
cmap_cls = LinearSegmentedColormap.from_list('gist_stern_inv',['orange','purple', 'blue', 'red'], N=256)
# Population Colorbar for Maps
cmap_pop = LinearSegmentedColormap.from_list('gist_stern_inv',['orange', 'red','fuchsia'], N=256)
# Damage Colorbar for Maps
cmap_dmg = LinearSegmentedColormap.from_list('gist_stern_inv',['blue', 'red','fuchsia'], N=256)


## Data Management ----------------------------------
RP, tile_id_max = True,0 #Temporary initiation, will automatically change in workflow

# Directory of main folder (working directory)
dirWork = '.'
os.chdir(dirWork)

# Input Hazard Path
dirDepths = os.path.join(dirWork, 'data')
if not os.path.exists(dirDepths):
    os.makedirs(dirDepths)

# OSM Output Path
dirOSM = os.path.join(dirWork, 'OSM')
if not os.path.exists(dirOSM):
    os.makedirs(dirOSM)

# Results directory
dirResults = os.path.join(dirWork, 'DamageBuildings')
if not os.path.exists(dirResults):
    os.makedirs(dirResults)
dirResultsPop = os.path.join(dirWork, 'ExposePopulation')
if not os.path.exists(dirResultsPop):
    os.makedirs(dirResultsPop)

# Saved images directory
dirImages = os.path.join(dirWork, 'Images')
if not os.path.exists(dirImages):
    os.makedirs(dirImages)

Download data#

In this section, the data required to run the analysis is downloaded.

This section can be modified to use local data.

#This workflow is using European data. Personal and local data is recommended.
#Note: There is a known issue with Mollweide projections. Standard projection used is 'EPSG:4326'

timeout = 90    # Download time out in seconds
max_retries = 5  # Maximum number of download attempts
socket.setdefaulttimeout(timeout)

##Flood data download
urlData='https://jeodpp.jrc.ec.europa.eu/ftp/jrc-opendata/CEMS-EFAS/flood_hazard/'

for RP in ReturnPeriods:
    print(f'Return Period={str(RP)}')
    rastDepths = os.path.join(dirDepths, f'Europe_RP{RP}_filled_depth.tif')
    if os.path.exists(rastDepths):
        print(f'Flood depth raster already exists (skipping download): {rastDepths}')
    else:
        rastTif = f'Europe_RP{RP}_filled_depth.tif'
        pathRastTif = os.path.join(dirDepths, rastTif)
        urlRastTif = os.path.join(urlData, rastTif)
        print(urlRastTif)
        for attempt in range(1, max_retries + 1):
            print(f'    Attempt: {attempt}')
            try:
                urllib.request.urlretrieve(urlRastTif, pathRastTif)
                break  # Break loop if download is successful
            except Exception as exc:
                print('      Timeout.  Retry data download')
                if attempt == max_retries:
                    print('    Maximum number of timeouts reached.  Data download failed')
                    print(f'      Consider increasing timeout value {timeout} seconds')
                    print(f'      Consider increasing maximum number of download attempts {max_retries}')
                    raise Exception(f'Timeout time {timeout} seconds exceeded {max_retries}') from exc
        print('  Unzipping downloaded file')


##Population data download ------------------------
def download_pop_rast(tile_id):#Downloading the raster files for Population
    print(f'Population tile id: {tile_id}')
    urlDataPop=f'https://jeodpp.jrc.ec.europa.eu/ftp/jrc-opendata/GHSL/GHS_POP_GLOBE_R2023A/GHS_POP_E{PopYear}_GLOBE_R2023A_4326_3ss/V1-0/tiles/'
    rastPopulation= os.path.join(dirDepths, f'GHS_POP_E{PopYear}_GLOBE_R2023A_4326_3ss_V1_0_{tile_id}.tif')
    if os.path.exists(rastPopulation):
        print('Population raster already exists (skipping download)')
    else:
        rastZip = f'GHS_POP_E{PopYear}_GLOBE_R2023A_4326_3ss_V1_0_{tile_id}.zip'
        pathRastZip = os.path.join(dirDepths, rastZip)
        urlRastZip = os.path.join(urlDataPop, rastZip)
        print(urlRastZip)
        for attempt in range(1, max_retries + 1):
            print(f'    Attempt: {attempt}')
            try:
                urllib.request.urlretrieve(urlRastZip, pathRastZip)
                break  # Break loop if download is successful
            except Exception as exc:
                print('      Timeout.  Retry data download')
                if attempt == max_retries:
                    print('    Maximum number of timeouts reached.  Data download failed')
                    print(f'      Consider increasing timeout value {timeout} seconds')
                    print(f'      Consider increasing maximum number of download attempts {max_retries}')
                    raise Exception(f'Timeout time {timeout} seconds exceeded {max_retries}') from exc
        print('  Unzipping downloaded file')
        with ZipFile(pathRastZip, 'r') as zip_ref:
            zip_ref.extract(f'GHS_POP_E{PopYear}_GLOBE_R2023A_4326_3ss_V1_0_{tile_id}.tif',dirDepths)

def stitch_on_right(file1, file2, output_file):
    if os.path.exists(output_file):
        print('Stitched population raster already exists (skipping download)')
    else:
        with rasterio.open(file1) as src1, rasterio.open(file2) as src2:
            # Get metadata and transform of the first raster
            meta = src1.meta.copy()

            # Calculate new dimensions for the stitched raster
            new_width = src1.width + src2.width
            new_height = max(src1.height, src2.height)

            # Update metadata with new dimensions
            meta.update(width=new_width, height=new_height)

            # Create output raster
            with rasterio.open(output_file, 'w', **meta) as dst:
                # Write the first raster to the output raster
                dst.write(src1.read(), window=rasterio.windows.Window(col_off=0, row_off=0, width=src1.width, height=src1.height))

                # Write the second raster to the output raster
                for i in range(1, src2.count + 1):
                    dst.write(src2.read(i), i, window=rasterio.windows.Window(col_off=src1.width, row_off=0, width=src2.width, height=src2.height))

def stitch_on_top(file1, file2, output_file):#Stitching two rasters vertically
    if os.path.exists(output_file):
        print('Stitched population raster already exists (skipping download)')
    else:
        with rasterio.open(file1) as src1, rasterio.open(file2) as src2:
            # Get metadata and transform of the first raster
            meta = src2.meta.copy()

            # Calculate new dimensions for the stitched raster
            new_width = max(src1.width, src2.width)
            new_height = src1.height + src2.height

            # Update metadata with new dimensions
            meta.update(width=new_width, height=new_height)

            # Create output raster
            with rasterio.open(output_file, 'w', **meta) as dst:
                # Write the second raster to the output raster
                dst.write(src2.read(), window=rasterio.windows.Window(col_off=0, row_off=0, width=src2.width, height=src2.height))

                # Write the first raster to the output raster
                for i in range(1, src1.count + 1):
                    dst.write(src1.read(i), i, window=rasterio.windows.Window(col_off=0, row_off=src1.height, width=src1.width, height=src1.height))

def stitch_all(file1, file2, file3, file4, output_file):#Stitching four rasters
    if os.path.exists(output_file):
        print('Stitched population raster already exists (skipping download)')
    else:
        with rasterio.open(file1) as src1, rasterio.open(file2) as src2, rasterio.open(file3) as src3, rasterio.open(file4) as src4:
            # Get metadata and transform of the first raster
            meta = src3.meta.copy()

            # Calculate new dimensions for the stitched raster
            new_width = src3.width + src4.width
            new_height = src1.height + src3.height

            # Update metadata with new dimensions
            meta.update(width=new_width, height=new_height)

            # Create output raster
            with rasterio.open(output_file, 'w', **meta) as dst:
                # Write the first raster to the output raster
                dst.write(src3.read(), window=rasterio.windows.Window(col_off=0, row_off=0, width=src3.width, height=src3.height))

                # Write the second raster to the output raster
                for i in range(1, src4.count + 1):
                    dst.write(src4.read(i), i, window=rasterio.windows.Window(col_off=src3.width, row_off=0, width=src4.width, height=src4.height))

                # Write the third raster to the output raster
                for i in range(1, src1.count + 1):
                    dst.write(src1.read(i), i, window=rasterio.windows.Window(col_off=0, row_off=src3.height, width=src1.width, height=src1.height))
                
                # Write the fourth raster to the output raster
                for i in range(1, src2.count + 1):
                    dst.write(src2.read(i), i, window=rasterio.windows.Window(col_off=src1.width, row_off=src3.height, width=src2.width, height=src2.height))

urlDataPopScheme='https://ghsl.jrc.ec.europa.eu/download/'
shpPopScheme = os.path.join(dirDepths, 'WGS84_tile_schema.shp')
if os.path.exists(shpPopScheme):
    print('Population scheme shapefile already exists (skipping download)')
else:
    rastZip = 'GHSL_data_4326_shapefile.zip'
    pathRastZip = os.path.join(dirDepths, rastZip)
    urlRastZip = os.path.join(urlDataPopScheme, rastZip)
    print(urlRastZip)
    for attempt in range(1, max_retries + 1):
        print(f'    Attempt: {attempt}')
        try:
            urllib.request.urlretrieve(urlRastZip, pathRastZip)
            break  # Break loop if download is successful
        except Exception as exc:
            print('      Timeout.  Retry data download')
            if attempt == max_retries:
                print('    Maximum number of timeouts reached.  Data download failed')
                print(f'      Consider increasing timeout value {timeout} seconds')
                print(f'      Consider increasing maximum number of download attempts {max_retries}')
                raise Exception(f'Timeout time {timeout} seconds exceeded {max_retries}') from exc
    print('  Unzipping downloaded file')
    with ZipFile(pathRastZip, 'r') as zip_ref:
        zip_ref.extractall(dirDepths)

# Load the shapefile
gdf = gpd.read_file(shpPopScheme)

# Iterate through each polygon and check if the point is inside
tile_id_min, tile_id_max = None, None 
for index, row in gdf.iterrows():
    left, top, right, bottom = row['left'], row['top'], row['right'], row['bottom']
    if left <= Longitude1 <= right and bottom <= Latitude1 <= top:
        tile_id_min = row['tile_id']
    if left <= Longitude2 <= right and bottom <= Latitude2 <= top:
        tile_id_max = row['tile_id']
    if tile_id_min is not None and tile_id_max is not None:
        break

tileMax=tile_id_max.split('_')
Rmax, Cmax = tileMax[0][1:].split('R')[0], tileMax[1][1:].split('C')[0]
tileMin=tile_id_min.split('_')
Rmin, Cmin = tileMin[0][1:].split('R')[0], tileMin[1][1:].split('C')[0]

if tile_id_min == tile_id_max:
    download_pop_rast(tile_id_max)
    rastPopulation= os.path.join(dirDepths, f'GHS_POP_E{PopYear}_GLOBE_R2023A_4326_3ss_V1_0_{tile_id_max}.tif')

elif Rmin == Rmax and Cmin < Cmax:  # Stitch on the right
    download_pop_rast(tile_id_max)
    download_pop_rast(tile_id_min)  
    file1 = os.path.join(dirDepths, f'GHS_POP_E{PopYear}_GLOBE_R2023A_4326_3ss_V1_0_{tile_id_min}.tif')
    file2 = os.path.join(dirDepths, f'GHS_POP_E{PopYear}_GLOBE_R2023A_4326_3ss_V1_0_{tile_id_max}.tif')
    rastPopulation = os.path.join(dirDepths, f'GHS_POP_E{PopYear}_GLOBE_R2023A_4326_3ss_V1_0_{tile_id_min}_to_{tile_id_max}.tif')
    stitch_on_right(file1, file2, rastPopulation)
    print(f'Stitched horizontally tiles {tile_id_min} and {tile_id_max}.')
elif Rmin > Rmax and Cmin == Cmax: # Stitch on the top
    download_pop_rast(tile_id_max)
    download_pop_rast(tile_id_min) 
    file1 = os.path.join(dirDepths, f'GHS_POP_E{PopYear}_GLOBE_R2023A_4326_3ss_V1_0_{tile_id_min}.tif')
    file2 = os.path.join(dirDepths, f'GHS_POP_E{PopYear}_GLOBE_R2023A_4326_3ss_V1_0_{tile_id_max}.tif')
    rastPopulation = os.path.join(dirDepths, f'GHS_POP_E{PopYear}_GLOBE_R2023A_4326_3ss_V1_0_{tile_id_min}_to_{tile_id_max}.tif')
    stitch_on_top(file1, file2, rastPopulation)
    print(f'Stitched vertically tiles {tile_id_min} and {tile_id_max}.')
elif Rmin > Rmax and Cmin < Cmax:
    download_pop_rast(tile_id_max)
    download_pop_rast(tile_id_min)
    tile_id_tmp1=f'R{Rmin}_C{Cmax}'
    tile_id_tmp2=f'R{Rmax}_C{Cmin}'
    download_pop_rast(tile_id_tmp1)
    download_pop_rast(tile_id_tmp2)
    file1 = os.path.join(dirDepths, f'GHS_POP_E{PopYear}_GLOBE_R2023A_4326_3ss_V1_0_{tile_id_min}.tif')
    file2 = os.path.join(dirDepths, f'GHS_POP_E{PopYear}_GLOBE_R2023A_4326_3ss_V1_0_{tile_id_tmp1}.tif')
    file3 = os.path.join(dirDepths, f'GHS_POP_E{PopYear}_GLOBE_R2023A_4326_3ss_V1_0_{tile_id_tmp2}.tif')
    file4 = os.path.join(dirDepths, f'GHS_POP_E{PopYear}_GLOBE_R2023A_4326_3ss_V1_0_{tile_id_max}.tif')
    rastPopulation = os.path.join(dirDepths, f'GHS_POP_E{PopYear}_GLOBE_R2023A_4326_3ss_V1_0_{tile_id_min}_{tile_id_tmp1}_to_{tile_id_max}_{tile_id_tmp2}.tif')
    stitch_all(file1,file2,file3,file4,rastPopulation)
    print(f'Stitched together tiles {tile_id_min}, {tile_id_tmp1}, {tile_id_max}, {tile_id_tmp2}.')
    print(f'Filen location: {rastPopulation}')
else:
    raise ValueError(f"Location crosses population rasters {tile_id_min} and {tile_id_max}. Modify location or use personal data.)")
Return Period=10
Flood depth raster already exists (skipping download): .\data\Europe_RP10_filled_depth.tif
Return Period=50
Flood depth raster already exists (skipping download): .\data\Europe_RP50_filled_depth.tif
Return Period=100
Flood depth raster already exists (skipping download): .\data\Europe_RP100_filled_depth.tif
Return Period=500
Flood depth raster already exists (skipping download): .\data\Europe_RP500_filled_depth.tif
Population scheme shapefile already exists (skipping download)
Population tile id: R5_C20
Population raster already exists (skipping download)

Define depth-damage functions#

Damage caused to buildings can be determined in relation to flood depth that the buildings are subjected to. In this section the relationship between water depth and damage are determined.

Maximum damage values are based on:

  • Huizinga, J., Moel, H. de, Szewczyk, W. (2017). Global flood depth-damage functions. Methodology and the database with guidelines. EUR 28552 EN. doi: 10.2760/16510 With following assumed values:

  • CPI2010 = 2010 World Bank Consumer Price Index for country of interest.

  • CPI2022 = 2022 Consumer Price Index for country of interest (latest value).

  • In calculating maximum damage, first array value is 2010 building reconstruction costs per square meter.

  • Second array value is 2010 building content replacement value per square meter.

Damage classes:

  • Options are Residential, Commercial, Industrial, Agriculture, Cultural, and Transportation, as well as an Universal class.

  • In the default code, Agriculture, Cultural, and Transportation as well as unclassified buildings are set to Universal.

Damage function:

  • Based on polynomial functions fit to the JRC depth-damage curves, with the order depending on fit (coefs).

  • In the default code, a combined damage function is applied based on Residential, Commercial, and Industrial JRC depth-damage values

# Define arrays for damage values based on 2010 estimates
CPI2010 = 100                                  # 2010 EU Value
CPI2022 = 121.8                                # 2022 EU Value
CPI_Frac = CPI2022 / CPI2010
MaxDmgRES = np.array([480, 240]) * CPI_Frac    # EU Value
MaxDmgCOM = np.array([502, 502]) * CPI_Frac    # EU Value
MaxDmgIND = np.array([328, 492]) * CPI_Frac    # EU Value
MaxDmgAGR = np.array([0.23, 0.46]) * CPI_Frac  # Italy 2021 (AGR), 0.23 EUR/m2
MaxDmgCUL = MaxDmgCOM                          # EU Value
MaxDmgTRS = MaxDmgIND                          # Italy 2021 (TRS)
MaxDmgUNI = (MaxDmgRES+MaxDmgCOM+MaxDmgIND)/3
# Combine damage arrays into a single array
MaxDmg = np.column_stack((MaxDmgRES, MaxDmgCOM, MaxDmgIND, MaxDmgUNI))

# Damage classes
DamageClasses = ['Residential', 'Commercial', 'Industrial','Universal']

def DamageFunction(wd1, coefs, wd_range=(0, 6)):
    wd = np.clip(wd1, *wd_range)
    y = coefs[0] * wd**5 + coefs[1] * wd**4 + coefs[2] * wd**3 \
        + coefs[3] * wd**2 + coefs[4] * wd + coefs[5]
    y = np.clip(y, 0, 1)
    return y

# Polynomial coefficients for each function
#   - Up to 5th order
#   - 1st value is highest order (5th) and last is intercept
coefs_UNI = [0.0,  0.0, 0.0, -0.02787, 0.3334, 0.0]
coefs_RES = [0.0005869, -0.01077, 0.07497, -0.2602, 0.5959, 0.0]
coefs_COM = [0.0, 0.0, -0.0009149, -0.02021, 0.3216, 0.0]
coefs_IND = [0.0, 0.0, -0.001202, -0.01225, 0.2852, 0.0]
coefs_TRS = [0.0, -0.00938, 0.07734, -0.2906, 0.7625, 0.0]
coefs_AGR = [0.0, -0.004601, 0.06114, -0.3061, 0.7773, 0.0]

# Plot Depth Damage Functions
# Water depth values from 0 to 6m
wd_values = np.linspace(0, 6, 100)
dmgRES = DamageFunction(wd_values, coefs_RES)
dmgCOM = DamageFunction(wd_values, coefs_COM)
dmgIND = DamageFunction(wd_values, coefs_IND)
dmgUNI = DamageFunction(wd_values, coefs_UNI)
if flagDamageCurve is True:
    plt.plot(wd_values, dmgUNI, color='black', linewidth=2, label='Universal')
    plt.plot(wd_values, dmgRES, color='darkgreen', linestyle='--', linewidth=1,
            label='Residential')
    plt.plot(wd_values, dmgCOM, color='blue', linestyle='--', linewidth=1,
            label='Commercial')
    plt.plot(wd_values, dmgIND, color='darkred', linestyle='--', linewidth=1,
            label='Industrial')
    plt.grid(color='grey', linestyle='-', linewidth=0.5)
    plt.xlabel('Water Depth (m)')
    plt.ylabel('Damage Fraction')
    plt.title('JRC Residential, Commercial & Industrial Depth-Damage Functions')
    plt.legend()
    plt.show()
../../../../_images/54abf10c28e3cacc38565c90a12d9a1887bab3e47693c2dd54a04228763b7c28.png

Retrieving data for the area of interest#

In this section the bounding box is used to crop the data to the area of interest. The code converts latitude and longitude values to the equivalent projection used by the flood map raster & writes bounding box to shapefile. We also define the bounding box for OpenStreetMap data.

If the region of interest is not as desired, the latitude and longitude values can be changed in the “Define inputs” section above.

#Set shape box location
shpBBox = os.path.join(dirOSM, f'outline_{saveName}.shp')

# Determine Raster EPSG Code (only works with osgeo)
ds = gdal.Open(rastDepths)
srs = osr.SpatialReference()
srs.ImportFromWkt(ds.GetProjection())
epsgRast = f'EPSG:{srs.GetAuthorityCode(None)}'
ds = None

print(f'Water Depth Raster Projection: {epsgRast}')

# Create a transformer for coordinate conversion
transformer = Transformer.from_crs('EPSG:4326', epsgRast, always_xy=True)

# Convert bounding box coordinates to raster CRS
xMin, yMin = transformer.transform(Longitude1-xbuffer, Latitude1-ybuffer)
xMax, yMax = transformer.transform(Longitude2+xbuffer, Latitude2+ybuffer)

print('Converted Coordinate Bounds with Buffer:')
print(f'  Longitudes: {Longitude1-xbuffer}E --> {xMin} meters & {Longitude2+xbuffer}E --> {xMax} meters')
print(f'  Latitudes: {Latitude1-ybuffer}N --> {yMin} meters & {Latitude2+ybuffer}N --> {yMax} meters')

# Define the bounding box coordinates based on the zoomed region
bounding_box = gpd.GeoDataFrame(geometry=outlineGeometry)
# Write the GeoDataFrame to a shapefile
bounding_box.to_file(shpBBox)
print('Bounding Box Shapefile written to: '+shpBBox)

# Read GeoDataFrame from the bounding box shapefile
gdfBBox = gpd.read_file(shpBBox)
crsRast = gdfBBox.crs

for RP in ImageReturnPeriod:
    rastDepths = os.path.join(dirDepths, f'Europe_RP{RP}_filled_depth.tif')
    # Read the raster using rasterio
    with rasterio.open(rastDepths) as src:
        window = from_bounds(xMin, yMin, xMax, yMax, src.transform)
        rDepths = src.read(1, window=window)
        rDepths = np.ma.masked_where((rDepths < -999) | (rDepths > 1000), rDepths)
        max_depth  = rDepths.max()
        if customMaxDepthLegend == -1:
            maxDepthLegend = ((max_depth // 1) + 1)
        else:
            maxDepthLegend=customMaxDepthLegend
        missing_data_value = src.nodata
    fig, ax = plt.subplots()
    im = ax.imshow(rDepths, vmin=0, vmax=maxDepthLegend, cmap=cmap_h2o, extent=(xMin, xMax, yMin, yMax),
                zorder=2, alpha=0.7)
    plt.title(f'River flood map with {RP}-year return period')
    plt.xlabel('Longitude', fontsize='small')
    plt.ylabel('Latitude', fontsize='small')
    divider = make_axes_locatable(ax)
    cax = divider.append_axes("right", size="2%", pad=0.1)
    plt.colorbar(im, cax=cax,label='Depth (m)',extend='max')
    plt.text(.02,.02,f'Projection in {epsgRast}', fontsize=8,
             path_effects=[pe.withStroke(linewidth=3, foreground="white")], transform=ax.transAxes,zorder=7)
    ax.set_xlim(Longitude1-xbuffer,Longitude2+xbuffer)
    ax.set_ylim(Latitude1-ybuffer,Latitude2+ybuffer)
    ctx.add_basemap(ax=ax, crs=epsgRast, source=ctx.providers.OpenStreetMap.Mapnik, attribution_size='6', alpha=0.5)
    txt = ax.texts[-1]
    txt.set_position([0.99,0.98])
    txt.set_ha('right')
    txt.set_va('top')
    if showOutlineFlag is True:
        outlineGeometry.plot(ax=ax, edgecolor=outlineColour, linestyle=outlineStyle, linewidth=outlineThickness, facecolor=outlineFace, alpha=outlineAlpha, zorder=5)
    if outlineLabelFlag is True:
        outline_legend = mlines.Line2D([], [], color=outlineColour, linewidth=outlineThickness, label=outlineLabel)
        legend_outline=ax.legend(handles=[outline_legend], loc=outlineLabelPosition, fontsize=outlineLabelSize, frameon=True)
    if imageSaveFlag is True:
        plt.savefig(os.path.join(dirImages, f'{saveName}_floodmap_{RP}RP.png'), bbox_inches='tight')
    plt.show()

##--------------------------------------------------------
## Population map
# Open the raster file
ds = gdal.Open(rastPopulation)
if ds is None:
    raise RuntimeError(f"Failed to open the raster file: {rastPopulation}")

# Get the projection from the raster file
proj_wkt = ds.GetProjection()
srs = osr.SpatialReference()
srs.ImportFromWkt(proj_wkt)

# Try to get the EPSG code
epsg_code = srs.GetAuthorityCode(None)

# Check if the EPSG code was successfully retrieved
if epsg_code is None:
    # If not, you might need to handle specific cases manually
    # Check if the projection matches known projections
    proj4_str = srs.ExportToProj4()
    if '+proj=moll' in proj4_str and '+datum=WGS84' in proj4_str:
        epsg_code = 'EPSG:54009'  # ESRI:54009 World Mollweide projection
    else:
        raise RuntimeError("Unable to determine EPSG code for the given projection.")
else:
    epsg_code = f"EPSG:{epsg_code}"

epsgRastPop = epsg_code
# Close the dataset
ds = None
print(f'Population Raster Projection: {epsgRastPop}')

# Create a transformer for coordinate conversion
if epsg_code == 'EPSG:54009':
    PopProj = '+proj=moll +datum=WGS84 +units=m' #MOLLWEIDE HAS TO BE MANUALLY INSERTED
else:
    PopProj = epsg_code

transformer = Transformer.from_crs('EPSG:4326', PopProj, always_xy=True)

# Convert bounding box coordinates to raster CRS
xMinPop, yMinPop = transformer.transform(Longitude1-xbuffer, Latitude1-ybuffer)
xMaxPop, yMaxPop = transformer.transform(Longitude2+xbuffer, Latitude2+ybuffer)

# Ensure xMin < xMax and yMin < yMax
xMinPop, xMaxPop = min(xMinPop, xMaxPop), max(xMinPop, xMaxPop)
yMinPop, yMaxPop = min(yMinPop, yMaxPop), max(yMinPop, yMaxPop)

print('Converted Coordinate Bounds with Buffer:')
print(f'  Longitudes: {Longitude1-xbuffer}E --> {xMinPop} meters & {Longitude2+xbuffer}E --> {xMaxPop} meters')
print(f'  Latitudes: {Latitude1-ybuffer}N --> {yMinPop} meters & {Latitude2+ybuffer}N --> {yMaxPop} meters')

# Read the raster using rasterio
with rasterio.open(rastPopulation) as src:
    window = from_bounds(xMinPop, yMinPop, xMaxPop, yMaxPop, src.transform)
    rPopulation = src.read(1, window=window)
    rPopulation = np.ma.masked_where((rPopulation < 0.1), rPopulation)
    max_population = rPopulation.max()
    maxPopLegend = ((max_population // 100) + 1) * 100
    missing_data_value = src.nodata


fig, ax = plt.subplots()
im = ax.imshow(rPopulation, vmin=0, vmax=maxPopLegend, cmap=cmap_pop, extent=(xMinPop, xMaxPop, yMinPop, yMaxPop),
               zorder=2, alpha=0.7)
plt.title(f'Population Raster for the year {PopYear}')
plt.xlabel('Longitude', fontsize='small')
plt.ylabel('Latitude', fontsize='small')
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="2%", pad=0.1)
plt.colorbar(im, cax=cax, label='People per 3 arcseconds\u00B2', extend='max')
plt.text(.02,.02,f'Projection in {epsgRastPop}', fontsize=8,
             path_effects=[pe.withStroke(linewidth=3, foreground="white")], transform=ax.transAxes,zorder=7)
ax.set_xlim(Longitude1-xbuffer,Longitude2+xbuffer)
ax.set_ylim(Latitude1-ybuffer,Latitude2+ybuffer)
ctx.add_basemap(ax=ax, crs=epsgRastPop, source=ctx.providers.OpenStreetMap.Mapnik, attribution_size='6', alpha=1)
txt = ax.texts[-1]
txt.set_position([0.99,0.98])
txt.set_ha('right')
txt.set_va('top')
if showOutlineFlag is True:
    outlineGeometry.plot(ax=ax, edgecolor=outlineColour, linestyle=outlineStyle, linewidth=outlineThickness, facecolor=outlineFace, alpha=outlineAlpha, zorder=5)
if outlineLabelFlag is True:
    outline_legend = mlines.Line2D([], [], color=outlineColour, linewidth=outlineThickness, label=outlineLabel)
    legend_outline=ax.legend(handles=[outline_legend], loc=outlineLabelPosition, fontsize=outlineLabelSize, frameon=True)
if imageSaveFlag is True:
    plt.savefig(os.path.join(dirImages, f'{saveName}_populationmap_{PopYear}.png'), bbox_inches='tight')
plt.show()
Water Depth Raster Projection: EPSG:4326
Converted Coordinate Bounds with Buffer:
  Longitudes: 11.262923196531974E --> 11.262923196531974 meters & 11.292889716317687E --> 11.292889716317687 meters
  Latitudes: 43.76740233777335N --> 43.76740233777335 meters & 43.78725243139119N --> 43.78725243139119 meters
Bounding Box Shapefile written to: .\OSM\outline_myLocation.shp
../../../../_images/923a69c2e593195867c67c33ff92c6225bb2ec0cc29df1700fbb9a11b73ffbc5.png ../../../../_images/4184ed754a5ee2ee63d7408d5ac95942ac69cb3da748a39f2a3eade1b389ae08.png
Population Raster Projection: EPSG:4326
Converted Coordinate Bounds with Buffer:
  Longitudes: 11.262923196531974E --> 11.262923196531974 meters & 11.292889716317687E --> 11.292889716317687 meters
  Latitudes: 43.76740233777335N --> 43.76740233777335 meters & 43.78725243139119N --> 43.78725243139119 meters
../../../../_images/27d9a84be981ddb17f3eb1a90e17fdb63395ac2cb0dc6282680178fd8ad11fe7.png

OpenStreetMap buildings data#

In this section the OSM data are loaded based on the bounding box defined above. The extracted data represents the building use (unclassified) and is written to a shapefile.

# Output shapefile for unclassified buildings
shpOSM = os.path.join(dirOSM, f'{saveName}_OSM_Building_Unclassified.shp')

# Define tags for OSM data
tags = {'building': True,'amenity': True}
# Retrieve OSM geometries within the bounding box
gdfOSM = ox.features_from_polygon(outlineGeometry.iloc[0], tags)
# Filter out non-Polygon geometries
gdfOSM = gdfOSM[gdfOSM.geom_type == 'Polygon']
# Confirm that all of the GDF elements are compatible with shp format
gdfOSM = gdfOSM.map(lambda x: str(x) if isinstance(x, list) else x)
# Clip the OSM data to the bounding box
gdfOSM = gpd.clip(gdfOSM, outlineGeometry)
# Save the polygon-only gdp to shapefile
warnings.simplefilter("ignore",category=UserWarning) #Removing UserWarning regarding truncated columns when saving to ESRI Shapefile
gdfOSM.to_file(shpOSM, driver='ESRI Shapefile', encoding='utf-8')
warnings.resetwarnings()
print('OSM data read in and saved to shapefile')
print('  '+shpOSM)

if flagBuilding is True:
    # Plot data
    fig, ax = plt.subplots()  # Adjust figsize as needed
    plt.title('OSM Buildings: Prior to residential, commericial and industrial classification')
    plt.xlabel('Longitude', fontsize='small')
    plt.ylabel('Latitude', fontsize='small')
    plt.text(.02,.02,f'Projection in {gdfOSM.crs}', fontsize=8,
              path_effects=[pe.withStroke(linewidth=3, foreground="white")], transform=ax.transAxes,zorder=7)
    if showOutlineFlag is True:
        outlineGeometry.plot(ax=ax, edgecolor=outlineColour, linestyle=outlineStyle, linewidth=outlineThickness, facecolor=outlineFace, alpha=outlineAlpha, zorder=5)
    if outlineLabelFlag is True:
        outline_legend = mlines.Line2D([], [], color=outlineColour, linewidth=outlineThickness, label=outlineLabel)
        legend_outline=ax.legend(handles=[outline_legend], loc=outlineLabelPosition, fontsize=outlineLabelSize, frameon=True)
    gdfOSM.plot(column='building',ax=ax, legend=True, cmap='coolwarm',
                      legend_kwds={'ncol': 3, 'bbox_to_anchor': (.5, -0.15), 'loc': 'upper center', 'title': 'Building Legend:'})
    ax.set_xlim(Longitude1-xbuffer,Longitude2+xbuffer)
    ax.set_ylim(Latitude1-ybuffer,Latitude2+ybuffer)
    ctx.add_basemap(ax=ax, crs=gdfOSM.crs, source=ctx.providers.OpenStreetMap.Mapnik, attribution_size='6', alpha=0.5)
    txt = ax.texts[-1]
    txt.set_position([0.99,0.98])
    txt.set_ha('right')
    txt.set_va('top')
    if outlineLabelFlag is True:
        ax.add_artist(legend_outline)
    if imageSaveFlag is True:
        plt.savefig(os.path.join(dirImages, f'{saveName}_OSMbuilding_preclassification.png'), bbox_inches='tight')
    plt.show()
OSM data read in and saved to shapefile
  .\OSM\myLocation_OSM_Building_Unclassified.shp
../../../../_images/2e66d02d12f573c5d273071ada4db4df0ed42e20a48cc07ce8efb67ba29b3357.png

Reproject OSM data to raster projection#

In this section, in order to perform the damage analysis, the OSM and raster data need to be on the same Coordinate Reference System (CRS). The OSM are on an EPSG:4326 CRS and the raster data could be another. In the cell, the OSM data are converted to match the raster CRS.

# Determine CRSs from shapefile and Raster
crsOSM = gdfOSM.crs

if crsOSM not in (epsgRast, epsgRast.lower()):
    print(f'Reprojecting from OSM {crsOSM} to raster EPSG:{epsgRast}')
    # Read the GeoDataFrame if reprojection is needed
    gdfOSM = gpd.read_file(shpOSM)
    # Reproject the GeoDataFrame to match the raster CRS
    gdfOSM = gdfOSM.to_crs(epsg=epsgRast)
    # Save the reprojected data back to the shapefile
    gdfOSM.to_file(shpOSM)
    print('  Overwriting reprojected data:', shpOSM)

else:
    print('No reprojection performed. Both projections are the same:', crsOSM)

print('Done')
No reprojection performed. Both projections are the same: epsg:4326
Done

Building classifications#

In this section, the building types are classified to Residential, Commercial, Industrial, etc.

  • This procedure needs to be performed manually by looking at the list of building types.

  • If None is listed for a building class (bldgClass column), the building type (building column) should be assigned to one of the lists (e.g., classResidential, classCommercial, classIndustrial).

  • Buildings with type “yes” will be classified as Universal by default in a later step.

As calculated in an earlier section, the Universal class has a damage curve equal to the average of the Residential, Commercial, and Industrial ones.

Below we have added a classification of OSM building types already.

# CSV file with classifications (classResidential, classCommercial, etc)
csvOSMclasses = os.path.join(dirOSM, f'{saveName}_OSM_Building_Reclassified.csv')
csvOSMamenity = os.path.join(dirOSM, f'{saveName}_OSM_Amenity_Classified.csv')
 # Keep only the building and geometry columns
gdfBuildings = gdfOSM[['building','amenity','geometry']].copy()

# Building classifications
#   - Change and add as needed, 
classResidential = ['hut', 'apartments', 'detached', 'residential', 'house', 'barn', 'garage',
                    'carport', 'semidetached_house', 'shed', 'bungalow', 'roof', 'terrace',
                    'allotment_house']
classCommercial = ['commercial', 'office', 'retail', 'kiosk', 'supermarket', 'warehouse',
                   'garages', 'hotel', 'stadium', 'grandstand', 'sports_centre',  'pavilion',
                   'government', 'school', 'kindergarten', 'university', 'dormitory', 'public',
                   'service', 'hospital', 'civic', 'terminal', 'fire_station',
                   'train_station', 'boathouse', 'toilets', 'tech_cab', 'tower', 'portal',
                   'columbarium', 'greenhouse', 'guardhouse', 'construction',
                   'funeral_hall']
classIndustrial = ['industrial', 'manufacture']
classCultural = ['church', 'cathedral', 'baptistery', 'obelisk', 'basilica', 'monastery', 'ruins',
                 'column', 'chapel', 'synagogue', 'shrine', 'religious', 'convent', 'fort']
classAgricultural = ['farm_auxiliary']
classTransportation = ['bridge', 'parking']
classUniversal = ['universal']

# Critical infrastructure, add infrastructure of interest ------- 
criticalInfrastructureList = ['hospital','fuel','bank','clinic','pharmacy', 
                             'police','prison','refugee_site', 
                             'train_station',
                             'fire_station','transformer_tower','water_tower','bridge',
                             'transportation']
# This affects the look of the maps in the 'Critical Infrastructure' section
critMarkersColours = {
    'hospital': {'marker': 'P', 'color': 'red'}, 
    'police': {'marker': 's', 'color': 'blue'}, 
    'train_station': {'marker': 'o', 'color': 'green'}, 
    'transformer_tower': {'marker': '*', 'color': 'purple'}, 
    'water_tower': {'marker': 'v', 'color': 'orange'}, 
    'bridge': {'marker': 'd', 'color': 'brown'}, 
    'fire_station': {'marker': 'X', 'color': 'pink'}, 
    'transportation': {'marker': '>', 'color': 'cyan'}, 
    'refugee_site': {'marker': 'o', 'color': 'lime'},
    'fuel': {'marker': '2', 'color': 'yellow'},
}  
    #Here are some more example of marker and colours in case they are needed. Feel free to experiment:
    #OtherMarkerList=['^','p','D','H','X','h']; OtherColourList=['lime','darkgreen','navy','slategray','pink','magenta']

# For now, set transportation and cultural to universal (can add/change if desired)
classUniversal = classUniversal + classCultural + classAgricultural + classTransportation
classCultural = []
classAgricultural = []
classTransportation = []

# Convert building classes to dataframe
bldgClasses = pd.DataFrame({'building': gdfBuildings['building'].unique()})

# Classify each structure to Residential, Commercial, Industrial, etc.
#   - Structures not listed in above class lists are classified as none
bldgClasses['bldgClass'] = None
bldgClasses.loc[bldgClasses['building'].isin(classResidential), 'bldgClass'] = 'Residential'
bldgClasses.loc[bldgClasses['building'].isin(classCommercial), 'bldgClass'] = 'Commercial'
bldgClasses.loc[bldgClasses['building'].isin(classIndustrial), 'bldgClass'] = 'Industrial'
bldgClasses.loc[bldgClasses['building'].isin(classCultural), 'bldgClass'] = 'Cultural'
bldgClasses.loc[bldgClasses['building'].isin(classAgricultural), 'bldgClass'] = 'Agricultural'
bldgClasses.loc[bldgClasses['building'].isin(classTransportation), 'bldgClass'] = 'Transportation'
bldgClasses.loc[bldgClasses['building'].isin(classUniversal), 'bldgClass'] = 'Universal'

#   - Adding critical infrastructure
bldgClasses['critInfrastructure'] = None
bldgClasses.loc[bldgClasses['building'].isin(criticalInfrastructureList), 'critInfrastructure'] = True

amenityClasses = pd.DataFrame({'amenity': gdfBuildings['amenity'].unique()})
amenityClasses['critInfrastructure'] = None
amenityClasses.loc[amenityClasses['amenity'].isin(criticalInfrastructureList), 'critInfrastructure'] = True


# Write structure and classification to CSV
bldgClasses.to_csv(csvOSMclasses, index=False)
amenityClasses.to_csv(csvOSMamenity, index=False)

print('Building Classifications')
print('  - If None is listed for a building class, add the building one of the above lists.')
print("  - Building 'yes' will be classified as Universal in a later step.")
print(bldgClasses)
print(amenityClasses)
Building Classifications
  - If None is listed for a building class, add the building one of the above lists.
  - Building 'yes' will be classified as Universal in a later step.
              building    bldgClass critInfrastructure
0                  yes         None               None
1               church    Universal               None
2           apartments  Residential               None
3                  NaN         None               None
4                house  Residential               None
5             detached  Residential               None
6               school   Commercial               None
7               retail   Commercial               None
8   semidetached_house  Residential               None
9               garage  Residential               None
10       train_station   Commercial               True
11                roof  Residential               None
12              office   Commercial               None
13          government   Commercial               None
14             stadium   Commercial               None
15          grandstand   Commercial               None
16               hotel   Commercial               None
17            hospital   Commercial               True
18          industrial   Industrial               None
             amenity critInfrastructure
0                NaN               None
1   place_of_worship               None
2             school               None
3            parking               None
4         university               None
5             police               True
6                bar               None
7             clinic               True
8         grave_yard               None
9       fire_station               True
10          hospital               True

In the code below, the building types are classified and written to a ShapeFile with a column.

  • The first map shows the buildings that have been classified based on the above assignments.

  • The second map shows the same, but the with building type “yes” classified as Universal. The difference between the maps provides an idea of the how many building have a type assigned to them.

# Shapefile name with reclassied buildings (output)
shpOSMreclass = os.path.join(dirOSM,f'{saveName}_OSM_Building_Reclassified.shp')

# Building classification created earlier
bldgClasses = pd.read_csv(csvOSMclasses)

# Merge the spatial data with the new information
gdfOSMreclass = pd.merge(gdfOSM, bldgClasses, on='building', how='left')

if flagBuilding is True:
    # Plot without unclassified buildings
    fig, ax = plt.subplots()  # Adjust figsize as needed
    plt.title('Building Data without Unclassified Buildings')
    plt.xlabel('Longitude', fontsize='small')
    plt.ylabel('Latitude', fontsize='small')
    plt.text(.02,.02,f'Projection in {gdfOSM.crs}', fontsize=8,
                path_effects=[pe.withStroke(linewidth=3, foreground="white")], transform=ax.transAxes,zorder=7)
    if showOutlineFlag is True:
        outlineGeometry.plot(ax=ax, edgecolor=outlineColour, linestyle=outlineStyle, linewidth=outlineThickness, facecolor=outlineFace, alpha=outlineAlpha, zorder=5)
    if outlineLabelFlag is True:
        outline_legend = mlines.Line2D([], [], color=outlineColour, linewidth=outlineThickness, label=outlineLabel)
        legend_outline=ax.legend(handles=[outline_legend], loc=outlineLabelPosition, fontsize=outlineLabelSize, frameon=True)
    gdfOSMreclass.plot(column='bldgClass', ax=ax, legend=True, cmap=cmap_cls,
                       legend_kwds={'ncol': 4, 'bbox_to_anchor': (.5, -0.15), 'loc': 'upper center', 'title': 'Building Classes:'})
    ax.set_xlim(Longitude1-xbuffer,Longitude2+xbuffer)
    ax.set_ylim(Latitude1-ybuffer,Latitude2+ybuffer)
    ctx.add_basemap(ax=ax, crs=gdfOSM.crs, source=ctx.providers.OpenStreetMap.Mapnik, attribution_size='6', alpha=0.5)
    txt = ax.texts[-1]
    txt.set_position([0.99,0.98])
    txt.set_ha('right')
    txt.set_va('top')
    if outlineLabelFlag is True:
        ax.add_artist(legend_outline)
    if imageSaveFlag is True:
        plt.savefig(os.path.join(dirImages, f'{saveName}_OSMbuilding_unclassified_simple.png'), bbox_inches='tight')
    plt.show()

# Substitute undefined (null) building classes
gdfOSMreclass['bldgClass'] = gdfOSMreclass['bldgClass'].fillna('Universal')

# Write classified structures to file
warnings.simplefilter("ignore",category=UserWarning) #Removing UserWarning regarding truncated columns when saving to ESRI Shapefile
gdfOSMreclass.to_file(shpOSMreclass, encoding='utf-8')
warnings.resetwarnings()

if flagBuilding is True:
    # Plot with unclassified buildings as classified
    fig, ax = plt.subplots()  # Adjust figsize as needed
    plt.title('Building Data with Unclassified Buildings as Universal Class')
    plt.xlabel('Longitude', fontsize='small')
    plt.ylabel('Latitude', fontsize='small')
    plt.text(.02,.02,f'Projection in {gdfOSM.crs}', fontsize=8,
                path_effects=[pe.withStroke(linewidth=3, foreground="white")], transform=ax.transAxes,zorder=7)
    if showOutlineFlag is True:
        outlineGeometry.plot(ax=ax, edgecolor=outlineColour, linestyle=outlineStyle, linewidth=outlineThickness, facecolor=outlineFace, alpha=outlineAlpha, zorder=5)
    if outlineLabelFlag is True:
        outline_legend = mlines.Line2D([], [], color=outlineColour, linewidth=outlineThickness, label=outlineLabel)
        legend_outline=ax.legend(handles=[outline_legend], loc=outlineLabelPosition, fontsize=outlineLabelSize, frameon=True)
    gdfOSMreclass.plot(column='bldgClass', ax=ax, legend=True, cmap=cmap_cls,
                       legend_kwds={'ncol': 4, 'bbox_to_anchor': (.5, -0.15), 'loc': 'upper center', 'title': 'Building Classes:'})
    ax.set_xlim(Longitude1-xbuffer,Longitude2+xbuffer)
    ax.set_ylim(Latitude1-ybuffer,Latitude2+ybuffer)
    ctx.add_basemap(ax=ax, crs=gdfOSM.crs, source=ctx.providers.OpenStreetMap.Mapnik, attribution_size='6', alpha=0.5)
    txt = ax.texts[-1]
    txt.set_position([0.99,0.98])
    txt.set_ha('right')
    txt.set_va('top')
    if outlineLabelFlag is True:
        ax.add_artist(legend_outline)
    if imageSaveFlag is True:
        plt.savefig(os.path.join(dirImages, f'{saveName}_OSMbuilding_classified_simple.png'), bbox_inches='tight')
    plt.show()
    
../../../../_images/3640eb30b2dba3f175764e96e97d4b05edb036d9dcf985c0a2ee137e35a22500.png ../../../../_images/34f4168ec1ab4aeb8c2c30ad998bed6a03f2fd18d85e2659c9915179f0a8304a.png

Flood depths at building locations#

In this section, the flood map rasters for each return period (extreme event) are loaded.

  • The rasters are then translated to flood depths for each building based on the desired statistic (mean, maximum or minimum depth or all three).

  • For each return period a plot of a flood map can be generated as well as a plot with the flood depths corresponding to each building.

for RP in ReturnPeriods:

    print("Computing Building Water Depths:  RP=", str(RP))
    print('  Loading water depth for selected bounds:  RP=', str(RP))
    rastDepths = os.path.join(dirDepths, f'Europe_RP{RP}_filled_depth.tif')
    rastDepths_zoom = rastDepths.replace('Europe', f'{saveName}_Europe')
    print(f'  {rastDepths}')

    # Keep only the building, bldgClass and geometry columns

    gdfDamage = gdfOSMreclass[['building', 'bldgClass', 'geometry']].copy()

    # Compute building areas in m2
    gdfDamage_ESPG3035=gdfDamage.to_crs(3035)
    gdfDamage['Area_m2'] = gdfDamage_ESPG3035.geometry.area

    # Read the raster using rasterio
    with rasterio.open(rastDepths) as src:
        rDepths, out_transform = mask(src, outlineGeometry, crop=True)
        rDepths = rDepths[0]
        rDepths = np.ma.masked_where((rDepths < -999) | (rDepths > 1000), rDepths)
        missing_data_value = src.nodata
        max_depth  = rDepths.max()
        with rasterio.open(
            rastDepths_zoom,
            'w',
            driver='GTiff',
            height=rDepths.shape[0],
            width=rDepths.shape[1],
            count=1,
            dtype=rDepths.dtype,
            crs=src.crs,
            transform=out_transform,
            nodata=missing_data_value
        ) as dst:
            dst.write(rDepths, 1)
    height, width = rDepths.shape
    xMinDepth, yMinDepth, xMaxDepth, yMaxDepth = array_bounds(height, width, out_transform)    
    # Perform zonal statistics directly on the raster array        
    result = rasterstats.zonal_stats(
        gdfDamage,
        rDepths,
        nodata=src.nodata,
        affine=out_transform,
        stats=['mean', 'min', 'max'],
        all_touched=True
    )

    # Update geodataframe with zonal statistics
    gdfDamage["MeanDepth"] = [entry["mean"] for entry in result]
    gdfDamage["MinDepth"] = [entry["min"] for entry in result]
    gdfDamage["MaxDepth"] = [entry["max"] for entry in result]

    if customMaxDepthLegend == -1:
        maxDepthLegend = ((max_depth // 1) + 1)
    else:
        maxDepthLegend=customMaxDepthLegend

    if RP in ImageReturnPeriod and flagBuildingH2o is True:
        fig, ax = plt.subplots()
        plt.title(f'Buildings map and river flood map with {RP}-year return period')
        plt.xlabel('Longitude', fontsize='small')
        plt.ylabel('Latitude', fontsize='small')
        im = ax.imshow(rDepths, vmin=0, vmax=maxDepthLegend, cmap=cmap_h2o, extent=(xMinDepth, xMaxDepth, yMinDepth, yMaxDepth), zorder=1, alpha=0.8)
        divider = make_axes_locatable(ax)
        cax = divider.append_axes("right", size="2%", pad=0.1)
        plt.colorbar(im, cax=cax,label='Depth (m)',extend='max')
        plt.text(.02,.02,f'Projection in {epsgRast}', fontsize=8,
             path_effects=[pe.withStroke(linewidth=3, foreground="white")], transform=ax.transAxes,zorder=7)
        gdfDamage.plot(ax=ax, edgecolor='grey', linewidth=0.25, facecolor='none')
        ax.set_xlim(Longitude1-xbuffer,Longitude2+xbuffer)
        ax.set_ylim(Latitude1-ybuffer,Latitude2+ybuffer)
        ctx.add_basemap(ax=ax, crs=epsgRast, source=ctx.providers.OpenStreetMap.Mapnik, attribution_size='6', alpha=0.4)
        txt = ax.texts[-1]
        txt.set_position([0.99,0.98])
        txt.set_ha('right')
        txt.set_va('top')
        if showOutlineFlag is True:
            outlineGeometry.plot(ax=ax, edgecolor=outlineColour, linestyle=outlineStyle, linewidth=outlineThickness, facecolor=outlineFace, alpha=outlineAlpha, zorder=5)
            if outlineLabelFlag is True:
                outline_legend = mlines.Line2D([], [], color=outlineColour, linewidth=outlineThickness, label=outlineLabel)
                ax.legend(handles=[outline_legend], loc=outlineLabelPosition, fontsize=outlineLabelSize, frameon=True)
        if imageSaveFlag is True:
            plt.savefig(os.path.join(dirImages, f'{saveName}_buildingoutline_floodmap_{RP}RP.png'), bbox_inches='tight')
        plt.show()
        
        # Map depths > 0 at building level
        gdf_filtered = gdfDamage[(gdfDamage['MeanDepth'] > 0) | (gdfDamage['MinDepth'] > 0) | (gdfDamage['MaxDepth'] > 0)]
        
        fig, ax = plt.subplots()
        plt.title(f'Mean flood depth at building locations \n derived from the flood map with {RP}-year return period')
        plt.xlabel('Longitude', fontsize='small')
        plt.ylabel('Latitude', fontsize='small')
        plot = gdf_filtered.plot(column='MeanDepth', vmin=0, vmax=maxDepthLegend, cmap=cmap_h2o, ax=ax)
        divider = make_axes_locatable(ax)
        cax = divider.append_axes("right", size="2%", pad=0.1)
        plt.colorbar(plot.collections[0], cax=cax,label='Depth (m)',extend='max')
        plt.text(.02,.02,f'Projection in {epsgRast}', fontsize=8,
             path_effects=[pe.withStroke(linewidth=3, foreground="white")], transform=ax.transAxes,zorder=7)
        ax.set_xlim(Longitude1-xbuffer,Longitude2+xbuffer)
        ax.set_ylim(Latitude1-ybuffer,Latitude2+ybuffer)
        ctx.add_basemap(ax=ax, crs=gdf_filtered.crs, source=ctx.providers.OpenStreetMap.Mapnik, attribution_size='6', alpha=0.9)
        txt = ax.texts[-1]
        txt.set_position([0.99,0.98])
        txt.set_ha('right')
        txt.set_va('top')  
        if showOutlineFlag is True:
            outlineGeometry.plot(ax=ax, edgecolor=outlineColour, linestyle=outlineStyle, linewidth=outlineThickness, facecolor=outlineFace, alpha=outlineAlpha, zorder=5)
            if outlineLabelFlag is True:
                outline_legend = mlines.Line2D([], [], color=outlineColour, linewidth=outlineThickness, label=outlineLabel)
                ax.legend(handles=[outline_legend], loc=outlineLabelPosition, fontsize=outlineLabelSize, frameon=True)
        if imageSaveFlag is True:
            plt.savefig(os.path.join(dirImages, f'{saveName}_building_flooddepth_{RP}RP.png'), bbox_inches='tight')
        plt.show()

    # Save the updated geodataframe to a shapefile
    shpDepths = os.path.join(dirResults, f'{saveName}_Depths_Building_RP{RP}.shp')
    gdfDamage.to_file(shpDepths, driver='ESRI Shapefile')

print('Done')
Computing Building Water Depths:  RP= 10
  Loading water depth for selected bounds:  RP= 10
  .\data\Europe_RP10_filled_depth.tif
../../../../_images/6fadc8485f6bcf13373d328682b9855b03881a894ab263b290bb753cb3605314.png ../../../../_images/18ad65de5e42ca2774aa53e0b94a2a54deb34251c7831fb87fcb544b737a03a5.png
Computing Building Water Depths:  RP= 50
  Loading water depth for selected bounds:  RP= 50
  .\data\Europe_RP50_filled_depth.tif
Computing Building Water Depths:  RP= 100
  Loading water depth for selected bounds:  RP= 100
  .\data\Europe_RP100_filled_depth.tif
Computing Building Water Depths:  RP= 500
  Loading water depth for selected bounds:  RP= 500
  .\data\Europe_RP500_filled_depth.tif
../../../../_images/60823b04110812101f59ba0e341d0ffb032ae1f320857117a1bbd5f58a8726fb.png ../../../../_images/ea1b62b860f7317f44d3534cacd324a53cd7e012947bed23f25165420fa84fec.png
Done

Calculating economic damage to buildings#

Based on the flood water depths, the damage to the buildings (reconstruction costs) and for its contents are determined.

  • First the fractional building damage is calculated applying the JRC damage functions for each classifiction (residential, commerical, etc).

  • Then the fractional damage is multiplied with the maximum damage value per square meter and the building footprint area in meters and written to a shapefile.

  • The damages in millions of Euros summed over all of the classes and plotted for each return period level.

for RP in ReturnPeriods:
    # Read Building Water Depth Shapefile
    shpDepths = os.path.join(dirResults, f'{saveName}_Depths_Building_RP{RP}.shp')
    gdfDamage = gpd.read_file(shpDepths)

    for Depth in Depths:

        if Depth == 'Mean':
            depthStat = 'mean'
        elif Depth == 'Max':
            depthStat = 'max'
        elif Depth == 'Min':
            depthStat = 'min'
        else:
            print('Depth statistic does not exist.')
            print('  Current options are Mean, Max, and Min')
            sys.exit()

        print(f'Computing damage for {Depth.lower()} building depth. RP={str(RP)}.')
        
        statName = depthStat.title() + 'Depth'

        # Compute the damage factor for each building class

        gdfDamage['TotDamage'] = 0  # Initialize TotalDamage column

        for dmgClass in DamageClasses:

            bldgDamage='f'+dmgClass[:3].upper()+depthStat
            dmgName = 'Dmg'+bldgDamage[1:]
            # Damage factors and maximum damage value including contents
            if dmgClass == 'Residential':
                gdfDamage[bldgDamage] = DamageFunction(gdfDamage[statName], coefs_RES)
                MaxDmg = MaxDmgRES.sum()
            elif dmgClass == 'Commercial':
                gdfDamage[bldgDamage] = DamageFunction(gdfDamage[statName], coefs_COM)
                MaxDmg = MaxDmgCOM.sum()
            elif dmgClass == 'Industrial':
                gdfDamage[bldgDamage] = DamageFunction(gdfDamage[statName], coefs_IND)
                MaxDmg = MaxDmgIND.sum()
            elif dmgClass == 'Transportation':
                gdfDamage[bldgDamage] = DamageFunction(gdfDamage[statName], coefs_TRS)
                MaxDmg = MaxDmgTRS.sum()
            elif dmgClass == 'Agriculture':
                gdfDamage[bldgDamage] = DamageFunction(gdfDamage[statName], coefs_AGR)
                MaxDmg = MaxDmgAGR.sum()
            elif dmgClass == 'Universal':
                gdfDamage[bldgDamage] = DamageFunction(gdfDamage[statName], coefs_UNI)
                MaxDmg = MaxDmgUNI.sum()
            else:
                gdfDamage[bldgDamage] = DamageFunction(gdfDamage[statName], coefs_UNI)
                MaxDmg = MaxDmgUNI.sum()

            # Damage computation
            gdfDamage.loc[gdfDamage['bldgClass'] != dmgClass, bldgDamage] = 0
            gdfDamage[dmgName] = gdfDamage[bldgDamage] * gdfDamage['Area_m2'] * MaxDmg

            # Add TotalDamage in millions of €
            gdfDamage['TotDamage'] += gdfDamage[dmgName] / 10**6

        gdfDamage.to_file(shpDepths, driver='ESRI Shapefile')

        # Plotting the GeoDataFrame with filtered values
        if RP in ImageReturnPeriod and flagBuildingDmg is True:
            fig, ax = plt.subplots()
            plt.title(f'Damage to buildings by {Depth.lower()} flood depth\nbased on the flood map with {str(RP)}-year return period')
            plt.xlabel('Longitude', fontsize='small')
            plt.ylabel('Latitude', fontsize='small')
            plot = gdfDamage.plot(column='TotDamage', cmap=cmap_dmg, ax=ax,zorder=2)
            
            divider = make_axes_locatable(ax)
            cax = divider.append_axes("right", size="2%", pad=0.1)
            plt.colorbar(plot.collections[0], cax=cax,label='Damage (Mil €)',extend='max')
            
            plt.text(.02,.02,f'Projection in {gdfDamage.crs}', fontsize=8,path_effects=[pe.withStroke(linewidth=3, foreground="white")], transform=ax.transAxes,zorder=7)
            ax.set_xlim(Longitude1-xbuffer,Longitude2+xbuffer)
            ax.set_ylim(Latitude1-ybuffer,Latitude2+ybuffer)
            ctx.add_basemap(ax=ax, crs=gdfDamage.crs, source=ctx.providers.OpenStreetMap.Mapnik, attribution_size='6', alpha=0.9)
            txt = ax.texts[-1]
            txt.set_position([0.99,0.98])
            txt.set_ha('right')
            txt.set_va('top')
            if showOutlineFlag is True:
                outlineGeometry.plot(ax=ax, edgecolor=outlineColour, linestyle=outlineStyle, linewidth=outlineThickness, facecolor=outlineFace, alpha=outlineAlpha, zorder=5)
                if outlineLabelFlag is True:
                    outline_legend = mlines.Line2D([], [], color=outlineColour, linewidth=outlineThickness, label=outlineLabel)
                    ax.legend(handles=[outline_legend], loc=outlineLabelPosition, fontsize=outlineLabelSize, frameon=True)
            if imageSaveFlag is True:
                plt.savefig(os.path.join(dirImages, f'{saveName}_building_damage_{Depth.lower()}depth_{RP}RP.png'), bbox_inches='tight')
            plt.show()

print('Done computing damage for each building')
Computing damage for mean building depth. RP=10.
../../../../_images/4397a9fd89f0a428a6fda21b796bb70a89bdb661e8ba8daefcd33baf33d037b1.png
Computing damage for mean building depth. RP=50.
Computing damage for mean building depth. RP=100.
Computing damage for mean building depth. RP=500.
../../../../_images/d0072f01f4a4f4f137c6708a775f82226972b2ed960e4eb6b95851e562402f53.png
Done computing damage for each building

Total damage to buildings#

In this section the total damage for the region of interest is summed and written to a CSV file.

nDmg = np.append(DamageClasses, 'Total')

dfDMGs = pd.DataFrame(columns=[])
dfDMGs.index = nDmg
dfDMGs.index.name = 'Building Class'

for Depth in Depths:

    if Depth not in ['Mean', 'Max', 'Min']:
        print('Depth statistic does not exist.')
        print('  Current options are Mean, Minimum, and Maximum')
        sys.exit()

    print(f'Damage for {Depth} depth')

    for RP in ReturnPeriods:

        shpOSM = os.path.join(dirResults, f'{saveName}_Depths_Building_RP{RP}.shp')
        gdfOSM = gpd.read_file(shpOSM)

        vDmg = pd.DataFrame(columns=[])
        for dmgClass in DamageClasses:
            dmgName = f'DmgTRS{Depth.lower()}' if dmgClass == 'Transportation' else \
            f'Dmg{dmgClass[:3].upper()}{Depth.lower()}'
            vDmg = np.append(vDmg, gdfOSM[dmgName].sum())

        totDmg = sum(vDmg)
        vDmg = np.append(vDmg, totDmg)
        totDmg_byclass = pd.DataFrame(vDmg)
        # Assign names to the dataframe headers
        totDmg_byclass.columns = [f'{RP}-yr']
        totDmg_byclass.index = [nDmg]

        # Compute the total damage across the entire area of interest
        print(f'  RP={RP}: Total damage (€) = {round(totDmg, 3)}')

        dfDMGs[f'{RP}-yr'] = np.array(totDmg_byclass)

    damage_csv_filename = os.path.join(dirResults, f'{saveName}_DamageTotal_{Depth}.csv')
    dfDMGs.to_csv(damage_csv_filename)

print('Done')
Damage for Mean depth
  RP=10: Total damage (€) = 291661603.94
  RP=50: Total damage (€) = 381318957.091
  RP=100: Total damage (€) = 409315826.375
  RP=500: Total damage (€) = 456376769.802
Done

Expected annual damage#

In this section the plot of building damages vs return periods of the flood maps is generated. Moreover, by integrating the curve, an estimate of the expected annual damage (EAD) in millions of Euros is provided. EAD is the damage that the region would expect on average in any given year.

max_yaxis=0
vert_spacer=0
for Depth in Depths:
    if Depth == 'Mean':
        depthStat = 'mean'
    elif Depth == 'Max':
        depthStat = 'max'
    elif Depth == 'Min':
        depthStat = 'min'
    else:
        print('Depth statistic does not exist.')
        print('  Current options are Mean, Minimum, and Maximum')
        sys.exit()

    # Load damage data for the current depth statistic
    damage_csv_filename = os.path.join(dirResults, f'{saveName}_DamageTotal_{Depth}.csv')
    dfDMGs = pd.read_csv(damage_csv_filename, index_col=0)

    # Compute the total Estimated Annual Damage (EAD) over all return periods
    probRPs = 1 / np.array(ReturnPeriods)
    iTot = dfDMGs.index.get_loc('Total')
    EAD = 0
    for iRP in range(len(ReturnPeriods)-1):
        diffRP = probRPs[iRP] - probRPs[iRP+1]
        avgDMG = (dfDMGs.iloc[iTot, iRP+1] + dfDMGs.iloc[iTot, iRP]) / 2
        EAD = EAD + avgDMG * diffRP
        graphText = f'{Depth} Expected Annual Damage: {round(EAD/10**6, 2)} Mil €'

    # Plot estimated direct damage vs exceedance probability
    if flagBuildingDmgGraph is True:
        totDmg = dfDMGs.loc['Total'] / 10**6
        max_yaxis_current=totDmg.max()
        max_yaxis=max(max_yaxis,max_yaxis_current)
        plt.plot(np.array(ReturnPeriods), totDmg, marker='o', linestyle='-')
        plt.ylim(0)
        plt.grid(which='both', linestyle=':', linewidth=0.5, color='gray',dashes=(1,5))
        plt.xlabel('Event return period (Years)')
        plt.ylabel('Estimated Direct Damage (Mil €)')
        plt.text(0.96, 0.05+vert_spacer, graphText, transform=plt.gca().transAxes, fontsize=10,
             verticalalignment='bottom', horizontalalignment='right', bbox=dict(facecolor='white', alpha=0.5, edgecolor='black'))
    print(graphText)
    vert_spacer=vert_spacer+0.08
yaxis_buffer=50 #This sets the y axis max to be the smallest multiple larger than the max value (eg: if yaxis_buffer=100, and the max value is 280, the yaxis max will be 300)
plt.ylim(0, ((max_yaxis // yaxis_buffer) + 1) * yaxis_buffer)
if len(Depths) > 1:
    plt.legend(Depths,title="Flood depth used at\nbuilding location:",fontsize='small',title_fontsize='small',
           loc='upper left', bbox_to_anchor=(1, 1),
          fancybox=True)
    plt.title('Estimated damage to buildings based on\nflood depth at building locations')
else:
    plt.title('Estimated damage to buildings based on\n{Depth.lower()} flood depth at building locations')
if imageSaveFlag is True:
    plt.savefig(os.path.join(dirImages, f'{saveName}_damage_graph.png'), bbox_inches='tight')
plt.show()
Mean Expected Annual Damage: 34.34 Mil €
../../../../_images/e0cc5794b19ec324ccc27889462f7f1648e0d904c798fcce2d071de052cae7aa.png

Critical Infrastructure#

In order to visualise the exposure of critical infrastructure for the area of interest, the OSM dataset is used:

  • Markers and colours are attributed for each type of critical infrastructure.

  • Flood water depths are read.

  • Critical infrastructure and floods are mapped together.

Note that preference will be given to amenity classes over building classes, to avoid duplicates. Therefore there might be cases where certain OSM entries will not show up in the map.

# Ensure the critical column is boolean
gdfOSMreclass['critInfrastructure'] = gdfOSMreclass['critInfrastructure'].infer_objects(copy=False)

for RP in ImageReturnPeriod:
    rastDepths = os.path.join(dirDepths, f'{saveName}_Europe_RP{RP}_filled_depth.tif')
    # Read the TIFF image
    with rasterio.open(rastDepths) as src:
        rDepths = src.read(1)  # Reading the first band
        rDepths = np.ma.masked_where((rDepths < -999) | (rDepths > 1000), rDepths)
        # Compute the maximum value from the masked data
        max_depth  = rDepths.max()
        if customMaxDepthLegend == -1:
            maxDepthLegend = ((max_depth // 1) + 1)
        else:
            maxDepthLegend=customMaxDepthLegend
        missing_data_value = src.nodata

    fig=plt.figure()
    ax = plt.axes()
    im = ax.imshow(rDepths, vmin=0, vmax=maxDepthLegend, cmap=cmap_h2o, extent=(xMinDepth, xMaxDepth, yMinDepth, yMaxDepth),
                zorder=1, alpha=0.6)
    plt.title(f'Critical infrastructure exposure to river floods with {RP}-year return period')
    divider = make_axes_locatable(ax)
    plt.xlabel('Longitude', fontsize='small')
    plt.ylabel('Latitude', fontsize='small')
    cax = divider.append_axes("right", size="2%", pad=0.1)
    plt.colorbar(im, cax=cax,label='Depth (m)',extend='max')
    plt.text(.02,.02,f'Projection in {gdfDamage.crs}', fontsize=8,
                path_effects=[pe.withStroke(linewidth=3, foreground="white")], transform=ax.transAxes,zorder=7)

    # Collect critical buildings for all building types
    for building_type, props in critMarkersColours.items():
        markerItem = props['marker']
        colourItem = props['color']
        # Check for building type in 'amenity' or 'building' columns
        if building_type in gdfOSMreclass['amenity'].unique():
            crit_buildings = gdfOSMreclass[gdfOSMreclass['amenity'] == building_type]
        elif building_type in gdfOSMreclass['building'].unique():
            crit_buildings = gdfOSMreclass[gdfOSMreclass['building'] == building_type]
        else:
            continue
        crit_buildings_geometry = crit_buildings.geometry
        crit_buildings_geometry = crit_buildings_geometry.to_crs('EPSG:3857')
        crit_buildings_centroids = crit_buildings_geometry.centroid
        transformer = Transformer.from_crs('EPSG:3857', gdfOSMreclass.crs, always_xy=True)
        reprojected_centroids = []
        for centroid in crit_buildings_centroids:
            x, y = transformer.transform(centroid.x, centroid.y)  # Reproject the coordinates
            reprojected_centroids.append(Point(x, y))
        crit_buildings_centroids_reprojected = gpd.GeoSeries(reprojected_centroids, crs=gdfOSMreclass.crs)
        ax.scatter(crit_buildings_centroids_reprojected.x, crit_buildings_centroids_reprojected.y, color=colourItem, marker=markerItem, s=100,
                linewidths=.8, edgecolors='k', label=f'{building_type.capitalize()}',zorder=6) #change s value for the size of markers
    # Optionally, add basemap if needed
    ax.set_xlim(Longitude1-xbuffer,Longitude2+xbuffer)
    ax.set_ylim(Latitude1-ybuffer,Latitude2+ybuffer)
    ctx.add_basemap(ax=ax, crs=gdfDamage.crs, source=ctx.providers.OpenStreetMap.Mapnik, attribution_size='6', alpha=0.5)
    txt = ax.texts[-1]
    txt.set_position([0.99,0.98])
    txt.set_ha('right')
    txt.set_va('top')
    #Legend
    if showOutlineFlag is True:
        outlineGeometry.plot(ax=ax, edgecolor=outlineColour, linestyle=outlineStyle, linewidth=outlineThickness, facecolor=outlineFace, alpha=outlineAlpha, zorder=5)
    if outlineLabelFlag is True:
        outline_legend = mlines.Line2D([], [], color=outlineColour, linewidth=outlineThickness, label=outlineLabel)
        legend_outline=ax.legend(handles=[outline_legend], loc=outlineLabelPosition, fontsize=outlineLabelSize, frameon=True)

    legend = ax.legend(loc='upper center', ncol=4, bbox_to_anchor=(0.5, -0.11),
                        title='Critical infrastructure type:')
    ax.tick_params(direction='out', length=8, width=.8,
                    labelsize=7)
    
    if outlineLabelFlag is True:
        ax.add_artist(legend_outline)
        
    if imageSaveFlag is True:
        plt.savefig(os.path.join(dirImages, f'{saveName}_criticalinfrastructure_{RP}RP.png'), bbox_inches='tight')
    plt.show()
../../../../_images/2822753908690ed2281790fcda545368f6365adbfd0f372bc3c235f5623c0c49.png ../../../../_images/a79cd30e6c0d18e91c7b32c3db0f6cf67baba2eece441975140fb150cdc55937.png

Exposed population#

Based on the flood depth maps, the exposed population is determined.

  • The population and flood rasters are compared.

  • The exposed population is written to a CSV file.

  • A map of the exposed popoulation is produced.

  • The exposed population is plotted against the flood map return period.

Expected annual exposed population is also calculated, representing the expected number of people exposed on average in any given year.

Please note that due to the resolution of both the population and the flood maps, it might be that part of the population appears to be over a water body (eg: a river) and is counted towards the overall exposed statistics.

print('  Loading population for selected bounds:')
rastPopulation_zoom = rastPopulation.replace('.tif', f'_{saveName}.tif')
print(f'  {rastPopulation_zoom}')

# Load the population data within the specified bounds
with rasterio.open(rastPopulation) as src:
    window = from_bounds(xMinPop, yMinPop, xMaxPop, yMaxPop, src.transform)
    rPopulation = src.read(1, window=window)
    rPopulation = np.ma.masked_where(rPopulation < 0.0, rPopulation)
    missing_data_value = src.nodata

    # Save the zoomed population raster
    with rasterio.open(
        rastPopulation_zoom,
        'w',
        driver='GTiff',
        height=rPopulation.shape[0],
        width=rPopulation.shape[1],
        count=1,
        dtype=rPopulation.dtype,
        crs=src.crs,
        transform=src.window_transform(window),
        nodata=missing_data_value
    ) as dst:
        dst.write(rPopulation, 1)


#Initialise population exposed array
exposedPop = []

# Process each return period
for RP in ReturnPeriods:
    rastDepths_zoom = os.path.join(dirDepths, f'{saveName}_Europe_RP{RP}_filled_depth.tif')
    with rasterio.open(rastDepths_zoom) as flood_src:
        rastDepths_data = flood_src.read(1)

        # Reproject the population raster to match the projection of the depths raster
        with rasterio.open(rastPopulation_zoom) as pop_src:
            pop_data = pop_src.read(1)
            pop_transform, pop_width, pop_height = calculate_default_transform(
                pop_src.crs, flood_src.crs, pop_src.width, pop_src.height, *pop_src.bounds
            )
            pop_profile = pop_src.profile.copy()
            pop_profile.update({
                'crs': flood_src.crs,
                'transform': pop_transform,
                'width': pop_width,
                'height': pop_height
            })

            # Create an empty array to store the reprojected population data
            reprojected_pop_data = np.zeros((flood_src.height, flood_src.width), dtype=pop_data.dtype)
            reproject(
                pop_data,
                reprojected_pop_data,
                src_transform=pop_src.transform,
                src_crs=pop_src.crs,
                dst_transform=pop_transform,
                dst_crs=flood_src.crs,
                resampling=Resampling.nearest
            )

            # Find the spatial intersection between the depths raster and the reprojected population raster
            exposed_population = np.where(rastDepths_data > minDepthExposed, 1, 0) * reprojected_pop_data

            # Sum the exposed population
            total_exposed = np.sum(exposed_population)
            exposedPop.append(total_exposed)

            # Save the result raster for the exposed population
            result_raster = os.path.join(dirResultsPop, os.path.basename(rastDepths_zoom).replace('.tif', '_exposed_population.tif'))
            with rasterio.open(
                result_raster,
                'w',
                driver='GTiff',
                height=exposed_population.shape[0],
                width=exposed_population.shape[1],
                count=1,
                dtype=exposed_population.dtype,
                crs=flood_src.crs,
                transform=flood_src.transform,
            ) as dst:
                dst.write(exposed_population, 1)
    if len(exposed_population.shape) == 3:
        height, width = exposed_population.shape[1], exposed_population.shape[2]
    elif len(exposed_population.shape) == 2:
        height, width = exposed_population.shape
    xMinExpPop, yMinExpPop, xMaxExpPop, yMaxExpPop = array_bounds(height, width, out_transform)   

    if RP in ImageReturnPeriod and flagPopulationExp is True:
        # Plot rasters
        fig, ax = plt.subplots()
        plt.title(f'Exposed population for the river flood event with {RP}-year return period\n(Population statistics based on estimate for the year {PopYear})')
        im = ax.imshow(np.ma.masked_where((exposed_population <= 0), exposed_population), cmap=cmap_pop, extent=(xMinExpPop, xMaxExpPop, yMinExpPop, yMaxExpPop),
                       zorder=2,alpha=0.8)
        plt.xlabel('Longitude', fontsize='small')
        plt.ylabel('Latitude', fontsize='small')
       
        divider = make_axes_locatable(ax)
        cax = divider.append_axes("right", size="2%", pad=0.1)
        plt.colorbar(im, cax=cax, label='People per 3 arcseconds\u00B2', extend='max')
        plt.text(0, -.27, f'Exposed if Water Depth >{minDepthExposed}m', transform=ax.transAxes, fontsize=8,
             verticalalignment='top', horizontalalignment='left', bbox=dict(facecolor='white', alpha=0.5, edgecolor='black'))
        plt.text(.02,.02,f'Projection in {epsgRastPop}', fontsize=8,path_effects=[pe.withStroke(linewidth=3, foreground="white")], transform=ax.transAxes,zorder=7)
        ax.set_xlim(Longitude1-xbuffer,Longitude2+xbuffer)
        ax.set_ylim(Latitude1-ybuffer,Latitude2+ybuffer)
        ctx.add_basemap(ax=ax, crs=epsgRastPop, source=ctx.providers.OpenStreetMap.Mapnik, attribution_size='6', alpha=1)
        txt = ax.texts[-1]
        txt.set_position([0.99,0.98])
        txt.set_ha('right')
        txt.set_va('top')
        if showOutlineFlag is True:
            outlineGeometry.plot(ax=ax, edgecolor=outlineColour, linestyle=outlineStyle, linewidth=outlineThickness, facecolor=outlineFace, alpha=outlineAlpha, zorder=5)
        if outlineLabelFlag is True:
            outline_legend = mlines.Line2D([], [], color=outlineColour, linewidth=outlineThickness, label=outlineLabel)
            legend_outline=ax.legend(handles=[outline_legend], loc=outlineLabelPosition, fontsize=outlineLabelSize, frameon=True)
        if imageSaveFlag is True:
            plt.savefig(os.path.join(dirImages, f'{saveName}_popexposed_map_{RP}RP.png'), bbox_inches='tight')
        plt.show()              


# Compute the total Estimated Annual Exposed Population (EAEP) over all return periods
EAEP = 0
for iRP in range(len(ReturnPeriods)-1):
    diffRP = probRPs[iRP] - probRPs[iRP+1]
    avgPOP = (exposedPop[iRP+1] + exposedPop[iRP]) / 2
    EAEP = EAEP + avgPOP * diffRP

if flagPopulationExpGraph is True:
    plt.plot(np.array(ReturnPeriods), exposedPop, marker='o', linestyle='-')
    plt.ylim(0)
    plt.grid(which='both', linestyle=':', linewidth=0.5, color='gray',dashes=(1,5))
    plt.xlabel('Event Return Period (Years)')
    plt.ylabel('People')
    plt.title(f'Estimated exposed population per flood event return period\n(Population statistics based on estimate for the year {PopYear})')
    graphText = f'Expected annual population exposed: {round(EAEP)} people.'
    plt.text(0.96, 0.05, graphText, transform=plt.gca().transAxes, fontsize=10,
             verticalalignment='bottom', horizontalalignment='right', bbox=dict(facecolor='white', alpha=0.5, edgecolor='black'))
    
    if imageSaveFlag is True:
        plt.savefig(os.path.join(dirImages, f'{saveName}_popexposed_graph.png'), bbox_inches='tight')
    plt.show()

print(graphText)

#CSV FILE
dfPOP = pd.DataFrame(columns=[])
dfPOP.index = ReturnPeriods
dfPOP.index.name = 'Flood event return period (years)'
dfPOP['People Exposed'] = exposedPop
population_csv_filename = os.path.join(dirResultsPop, f'{saveName}_ExposedPopulationTotal.csv')
dfPOP.to_csv(population_csv_filename)
  Loading population for selected bounds:
  .\data\GHS_POP_E2025_GLOBE_R2023A_4326_3ss_V1_0_R5_C20_myLocation.tif
../../../../_images/0a325b1930f0c1d05a25606b729b03e34e60465a7e77782f4509b20fb706e347.png ../../../../_images/ffea0b3430df7547ba91cda70a8eceb8adb13692cce45e44f092aa754dfabd52.png ../../../../_images/69121df01b9c7fb8b6e1d9d18672cff70f74ffd245cfe17c97ca5f5de6e0013b.png
Expected annual population exposed: 1419 people.

Displaced population#

Based on the flood depth maps, the displaced population is a subset of the exposed population that experience flood depths above a given threshold.

  • The population and flood rasters are compared.

  • The displaced population is written to a CSV file.

  • A map of the displaced population is produced.

  • The plot of displaced population vs flood map return period is produced.

Expected annual displaced population is also calculated, representing the expected number of people displaced on average per year.

Please note that due to the resolution of both the population and the flood maps, it might be that part of the population appears to be over a water body (eg: a river) and is counted towards the overall displaced statistics.

# Initialise displaced population array
displacedPop = []
# Process each return period
for RP in ReturnPeriods:
    rastDepths_zoom = os.path.join(dirDepths, f'{saveName}_Europe_RP{RP}_filled_depth.tif')
    with rasterio.open(rastDepths_zoom) as flood_src:
        rastDepths_data = flood_src.read(1)

        # Reproject the population raster to match the projection of the depths raster
        with rasterio.open(rastPopulation_zoom) as pop_src:
            pop_data = pop_src.read(1)
            pop_transform, pop_width, pop_height = calculate_default_transform(
                pop_src.crs, flood_src.crs, pop_src.width, pop_src.height, *pop_src.bounds
            )
            pop_profile = pop_src.profile.copy()
            pop_profile.update({
                'crs': flood_src.crs,
                'transform': pop_transform,
                'width': pop_width,
                'height': pop_height
            })

            # Create an empty array to store the reprojected population data
            reprojected_pop_data = np.zeros((flood_src.height, flood_src.width), dtype=pop_data.dtype)
            reproject(
                pop_data,
                reprojected_pop_data,
                src_transform=pop_src.transform,
                src_crs=pop_src.crs,
                dst_transform=pop_transform,
                dst_crs=flood_src.crs,
                resampling=Resampling.nearest
            )

            # Find the spatial intersection between the depths raster and the reprojected population raster
            displaced_population = np.where(rastDepths_data > minDepthDisplaced, 1, 0) * reprojected_pop_data

            # Sum the exposed population
            total_displaced = np.sum(displaced_population)
            displacedPop.append(total_displaced)

            # Save the result raster for the exposed population
            result_raster = os.path.join(dirResultsPop, os.path.basename(rastDepths_zoom).replace('.tif', '_displaced_population.tif'))
            with rasterio.open(
                result_raster,
                'w',
                driver='GTiff',
                height=displaced_population.shape[0],
                width=displaced_population.shape[1],
                count=1,
                dtype=displaced_population.dtype,
                crs=flood_src.crs,
                transform=flood_src.transform,
            ) as dst:
                dst.write(displaced_population, 1)
    if len(displaced_population.shape) == 3:
        height, width = displaced_population.shape[1], displaced_population.shape[2]
    elif len(displaced_population.shape) == 2:
        height, width = displaced_population.shape
    xMinDisPop, yMinDisPop, xMaxDisPop, yMaxDisPop = array_bounds(height, width, out_transform)  

    if RP in ImageReturnPeriod and flagPopulationDis is True:
    # Plot rasters
        fig, ax = plt.subplots()
        plt.title(f'Displaced population for the river flood event with {RP}-year return period\n(Population statistics based on estimate for the year {PopYear})')
        im = ax.imshow(np.ma.masked_where((displaced_population <= 0), displaced_population), cmap=cmap_pop, extent=(xMinDisPop, xMaxDisPop, yMinDisPop, yMaxDisPop),
                       zorder=2,alpha=0.8)
        plt.xlabel('Longitude', fontsize='small')
        plt.ylabel('Latitude', fontsize='small')
        divider = make_axes_locatable(ax)
        cax = divider.append_axes("right", size="2%", pad=0.1)
        plt.colorbar(im, cax=cax, label='People per 3 arcseconds\u00B2', extend='max')
        plt.text(0.00, -0.27, f'Displaced if flood depth >{minDepthDisplaced}m', transform=ax.transAxes, fontsize=8,
             verticalalignment='top', horizontalalignment='left', bbox=dict(facecolor='white', alpha=0.5, edgecolor='black'))
        plt.text(.02,.02,f'Projection in {epsgRastPop}', fontsize=8,path_effects=[pe.withStroke(linewidth=3, foreground="white")], transform=ax.transAxes,zorder=7)
        ax.set_xlim(Longitude1-xbuffer,Longitude2+xbuffer)
        ax.set_ylim(Latitude1-ybuffer,Latitude2+ybuffer)
        ctx.add_basemap(ax=ax, crs=epsgRastPop, source=ctx.providers.OpenStreetMap.Mapnik, attribution_size='6', alpha=1)
        txt = ax.texts[-1]
        txt.set_position([0.99,0.98])
        txt.set_ha('right')
        txt.set_va('top')
        if showOutlineFlag is True:
            outlineGeometry.plot(ax=ax, edgecolor=outlineColour, linestyle=outlineStyle, linewidth=outlineThickness, facecolor=outlineFace, alpha=outlineAlpha, zorder=5)
        if outlineLabelFlag is True:
            outline_legend = mlines.Line2D([], [], color=outlineColour, linewidth=outlineThickness, label=outlineLabel)
            legend_outline=ax.legend(handles=[outline_legend], loc=outlineLabelPosition, fontsize=outlineLabelSize, frameon=True)
        if imageSaveFlag is True:
            plt.savefig(os.path.join(dirImages, f'{saveName}_popdisplaced_map.png'), bbox_inches='tight')
        plt.show()              


# Compute the total Estimated Annual Displaced Population (EADP) over all return periods
EADP = 0
for iRP in range(len(ReturnPeriods)-1):
    diffRP = probRPs[iRP] - probRPs[iRP+1]
    avgPOP = (displacedPop[iRP+1] + displacedPop[iRP]) / 2
    EADP = EADP + avgPOP * diffRP

if flagPopulationDisGraph is True:
    plt.plot(np.array(ReturnPeriods), displacedPop, marker='o', linestyle='-')
    plt.ylim(0)
    plt.grid(which='both', linestyle=':', linewidth=0.5, color='gray',dashes=(1,5))
    plt.xlabel('Event Return Period (Years)')
    plt.ylabel('People')
    plt.title(f'Estimated displaced population per flood event return period\n(Population statistics based on estimate for the year {PopYear})')
    graphText = f'Expected Annual Population Displaced: {round(EADP)} people.'
    plt.text(0.96, 0.13, f'Displaced if flood depth >{minDepthDisplaced}m', transform=plt.gca().transAxes, fontsize=8,
             verticalalignment='bottom', horizontalalignment='right', bbox=dict(facecolor='white', alpha=0.5, edgecolor='black'))
    plt.text(0.96, 0.05, graphText, transform=plt.gca().transAxes, fontsize=10,
             verticalalignment='bottom', horizontalalignment='right', bbox=dict(facecolor='white', alpha=0.5, edgecolor='black'))
    if imageSaveFlag is True:
        plt.savefig(os.path.join(dirImages, f'{saveName}_popdisplaced_graph.png'), bbox_inches='tight')
    plt.show()

print(graphText)

#CSV FILE
dfPOP = pd.DataFrame(columns=[])
dfPOP.index = ReturnPeriods
dfPOP.index.name = 'Event Return Period (years)'
dfPOP['People Exposed'] = exposedPop
population_csv_filename = os.path.join(dirResultsPop, f'{saveName}_DisplacedPopulationTotal.csv')
dfPOP.to_csv(population_csv_filename)
../../../../_images/4c81a77a11661713996fed252a6b4af819c19bec070841a77d03592292a89130.png ../../../../_images/35d79fff56cbdb97c4cd810d9f418f43889d266ec8a94af6c618560ad380f6d0.png ../../../../_images/83737c398df39927cad5617139c8edab66e63bd2fff1296ce75838fdcca6ca95.png
Expected Annual Population Displaced: 1251 people.

Conclusion#

In the risk assessment:

  • Flood and population maps where generated.

  • By combining hazards with exposure and vulnerabilities the risk was computed

    • Building damage maps and estimated yearly damage.

    • Population displacement and estimated yearly displacement.

  • Moreover, overall population and critical infrastructure exposed were also showcased.

Authors#

CMCC

Main contributors:
Jeremy Pal, Davide Serrao