2025-01-26 19:24:23 -08:00
parent 32cd60e92b
commit d1dde0dbc6
4155 changed files with 29170 additions and 216373 deletions


@@ -1,19 +1,31 @@
import json
import warnings

import numpy as np
from pandas import DataFrame, Series

import shapely
from packaging.version import Version

import geopandas
from geopandas import GeoDataFrame
from geopandas._compat import import_optional_dependency
from geopandas.array import from_shapely, from_wkb

from .file import _expand_user
METADATA_VERSION = "1.0.0"
SUPPORTED_VERSIONS = ["0.1.0", "0.4.0", "1.0.0-beta.1", "1.0.0"]
SUPPORTED_VERSIONS = ["0.1.0", "0.4.0", "1.0.0-beta.1", "1.0.0", "1.1.0"]
GEOARROW_ENCODINGS = [
"point",
"linestring",
"polygon",
"multipoint",
"multilinestring",
"multipolygon",
]
SUPPORTED_ENCODINGS = ["WKB"] + GEOARROW_ENCODINGS
# reference: https://github.com/opengeospatial/geoparquet
# Metadata structure:
@@ -68,7 +80,40 @@ def _remove_id_from_member_of_ensembles(json_dict):
member.pop("id", None)
def _create_metadata(df, schema_version=None):
# type ids 0 to 7
_geometry_type_names = [
"Point",
"LineString",
"LineString",
"Polygon",
"MultiPoint",
"MultiLineString",
"MultiPolygon",
"GeometryCollection",
]
_geometry_type_names += [geom_type + " Z" for geom_type in _geometry_type_names]
def _get_geometry_types(series):
"""
Get unique geometry types from a GeoSeries.
"""
arr_geometry_types = shapely.get_type_id(series.array._data)
# ensure to include "... Z" for 3D geometries
has_z = shapely.has_z(series.array._data)
arr_geometry_types[has_z] += 8
geometry_types = Series(arr_geometry_types).unique().tolist()
# drop missing values (shapely.get_type_id returns -1 for those)
if -1 in geometry_types:
geometry_types.remove(-1)
return sorted([_geometry_type_names[idx] for idx in geometry_types])
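# Editor's sketch (illustration, not part of the diff; assumes shapely >= 2.0):
# a 3D point gets type id 0 + 8 and resolves to "Point Z" in the table above.
#
#     from shapely.geometry import LineString, Point
#     s = geopandas.GeoSeries([Point(0, 0), Point(0, 1, 2), LineString([(0, 0), (1, 1)])])
#     _get_geometry_types(s)  # -> ['LineString', 'Point', 'Point Z']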
def _create_metadata(
df, schema_version=None, geometry_encoding=None, write_covering_bbox=False
):
"""Create and encode geo metadata dict.
Parameters
@@ -77,13 +122,22 @@ def _create_metadata(df, schema_version=None):
schema_version : {'0.1.0', '0.4.0', '1.0.0-beta.1', '1.0.0', '1.1.0', None}
GeoParquet specification version; if not provided will default to
latest supported version.
geometry_encoding : dict, optional
Mapping of geometry column name to encoding ("WKB" or one of the
native GeoArrow-based encodings), as returned by geopandas_to_arrow.
write_covering_bbox : bool, default False
Writes the bounding box column for each row entry with column
name 'bbox'. Writing a bbox column can be computationally
expensive, hence the default setting is False.
Returns
-------
dict
"""
schema_version = schema_version or METADATA_VERSION
if schema_version is None:
if geometry_encoding and any(
encoding != "WKB" for encoding in geometry_encoding.values()
):
schema_version = "1.1.0"
else:
schema_version = METADATA_VERSION
if schema_version not in SUPPORTED_VERSIONS:
raise ValueError(
@@ -94,7 +148,8 @@ def _create_metadata(df, schema_version=None):
column_metadata = {}
for col in df.columns[df.dtypes == "geometry"]:
series = df[col]
geometry_types = sorted(Series(series.geom_type.unique()).dropna())
geometry_types = _get_geometry_types(series)
if schema_version[0] == "0":
geometry_types_name = "geometry_type"
if len(geometry_types) == 1:
@@ -111,7 +166,7 @@ def _create_metadata(df, schema_version=None):
_remove_id_from_member_of_ensembles(crs)
column_metadata[col] = {
"encoding": "WKB",
"encoding": geometry_encoding[col],
"crs": crs,
geometry_types_name: geometry_types,
}
@@ -121,10 +176,20 @@ def _create_metadata(df, schema_version=None):
# don't add bbox with NaNs for empty / all-NA geometry column
column_metadata[col]["bbox"] = bbox
if write_covering_bbox:
column_metadata[col]["covering"] = {
"bbox": {
"xmin": ["bbox", "xmin"],
"ymin": ["bbox", "ymin"],
"xmax": ["bbox", "xmax"],
"ymax": ["bbox", "ymax"],
},
}
return {
"primary_column": df._geometry_column_name,
"columns": column_metadata,
"version": schema_version or METADATA_VERSION,
"version": schema_version,
"creator": {"library": "geopandas", "version": geopandas.__version__},
}
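# Editor's sketch (illustrative values, not part of the diff): for a single
# WKB-encoded point column, the dict returned above looks roughly like
#
#     {
#         "primary_column": "geometry",
#         "columns": {
#             "geometry": {
#                 "encoding": "WKB",
#                 "crs": None,  # or a PROJJSON dict
#                 "geometry_types": ["Point"],
#                 "bbox": [0.0, 0.0, 1.0, 1.0],
#             }
#         },
#         "version": "1.0.0",
#         "creator": {"library": "geopandas", "version": "1.0.0"},
#     }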
@@ -188,7 +253,7 @@ def _validate_dataframe(df):
raise ValueError("Index level names must be strings")
def _validate_metadata(metadata):
def _validate_geo_metadata(metadata):
"""Validate geo metadata.
Must not be empty, and must contain the structure specified above.
@@ -232,8 +297,12 @@ def _validate_metadata(metadata):
"'{key}' for column '{col}'".format(key=key, col=col)
)
if column_metadata["encoding"] != "WKB":
raise ValueError("Only WKB geometry encoding is supported")
if column_metadata["encoding"] not in SUPPORTED_ENCODINGS:
raise ValueError(
"Only WKB geometry encoding or one of the native encodings "
f"({GEOARROW_ENCODINGS!r}) are supported, "
f"got: {column_metadata['encoding']}"
)
if column_metadata.get("edges", "planar") == "spherical":
warnings.warn(
@@ -245,37 +314,59 @@ def _validate_metadata(metadata):
stacklevel=4,
)
if "covering" in column_metadata:
covering = column_metadata["covering"]
if "bbox" in covering:
bbox = covering["bbox"]
for var in ["xmin", "ymin", "xmax", "ymax"]:
if var not in bbox.keys():
raise ValueError("Metadata for bbox column is malformed.")
def _geopandas_to_arrow(df, index=None, schema_version=None):
def _geopandas_to_arrow(
df,
index=None,
geometry_encoding="WKB",
schema_version=None,
write_covering_bbox=None,
):
"""
Helper function with main, shared logic for to_parquet/to_feather.
"""
from pyarrow import Table
from pyarrow import StructArray
from geopandas.io._geoarrow import geopandas_to_arrow
_validate_dataframe(df)
# create geo metadata before altering incoming data frame
geo_metadata = _create_metadata(df, schema_version=schema_version)
if schema_version is not None:
if geometry_encoding != "WKB" and schema_version != "1.1.0":
raise ValueError(
"'geoarrow' encoding is only supported with schema version >= 1.1.0"
)
kwargs = {}
if compat.USE_SHAPELY_20:
kwargs = {"flavor": "iso"}
else:
for col in df.columns[df.dtypes == "geometry"]:
series = df[col]
if series.has_z.any():
warnings.warn(
"The GeoDataFrame contains 3D geometries, and when using "
"shapely < 2.0, such geometries will be written not exactly "
"following to the GeoParquet spec (not using ISO WKB). For "
"most use cases this should not be a problem (GeoPandas can "
"read such files fine).",
stacklevel=2,
)
break
df = df.to_wkb(**kwargs)
table, geometry_encoding_dict = geopandas_to_arrow(
df, geometry_encoding=geometry_encoding, index=index, interleaved=False
)
geo_metadata = _create_metadata(
df,
schema_version=schema_version,
geometry_encoding=geometry_encoding_dict,
write_covering_bbox=write_covering_bbox,
)
table = Table.from_pandas(df, preserve_index=index)
if write_covering_bbox:
if "bbox" in df.columns:
raise ValueError(
"An existing column 'bbox' already exists in the dataframe. "
"Please rename to write covering bbox."
)
bounds = df.bounds
bbox_array = StructArray.from_arrays(
[bounds["minx"], bounds["miny"], bounds["maxx"], bounds["maxy"]],
names=["xmin", "ymin", "xmax", "ymax"],
)
table = table.append_column("bbox", bbox_array)
# Store geopandas specific file-level metadata
# This must be done AFTER creating the table or it is not persisted
@@ -286,7 +377,14 @@ def _geopandas_to_arrow(df, index=None, schema_version=None):
def _to_parquet(
df, path, index=None, compression="snappy", schema_version=None, **kwargs
df,
path,
index=None,
compression="snappy",
geometry_encoding="WKB",
schema_version=None,
write_covering_bbox=False,
**kwargs,
):
"""
Write a GeoDataFrame to the Parquet format.
@@ -312,9 +410,17 @@ def _to_parquet(
output except `RangeIndex` which is stored as metadata only.
compression : {'snappy', 'gzip', 'brotli', None}, default 'snappy'
Name of the compression to use. Use ``None`` for no compression.
geometry_encoding : {'WKB', 'geoarrow'}, default 'WKB'
The encoding to use for the geometry columns. Defaults to "WKB"
for maximum interoperability. Specify "geoarrow" to use one of the
native GeoArrow-based single-geometry type encodings.
schema_version : {'0.1.0', '0.4.0', '1.0.0', '1.1.0', None}
GeoParquet specification version; if not provided will default to
latest supported version.
write_covering_bbox : bool, default False
Writes the bounding box column for each row entry with column
name 'bbox'. Writing a bbox column can be computationally
expensive, hence the default setting is False.
**kwargs
Additional keyword arguments passed to pyarrow.parquet.write_table().
"""
@@ -322,19 +428,14 @@ def _to_parquet(
"pyarrow.parquet", extra="pyarrow is required for Parquet support."
)
if kwargs and "version" in kwargs and kwargs["version"] is not None:
if schema_version is None and kwargs["version"] in SUPPORTED_VERSIONS:
warnings.warn(
"the `version` parameter has been replaced with `schema_version`. "
"`version` will instead be passed directly to the underlying "
"parquet writer unless `version` is 0.1.0 or 0.4.0.",
FutureWarning,
stacklevel=2,
)
schema_version = kwargs.pop("version")
path = _expand_user(path)
table = _geopandas_to_arrow(df, index=index, schema_version=schema_version)
table = _geopandas_to_arrow(
df,
index=index,
geometry_encoding=geometry_encoding,
schema_version=schema_version,
write_covering_bbox=write_covering_bbox,
)
parquet.write_table(table, path, compression=compression, **kwargs)
@@ -379,47 +480,26 @@ def _to_feather(df, path, index=None, compression=None, schema_version=None, **k
if Version(pyarrow.__version__) < Version("0.17.0"):
raise ImportError("pyarrow >= 0.17 required for Feather support")
if kwargs and "version" in kwargs and kwargs["version"] is not None:
if schema_version is None and kwargs["version"] in SUPPORTED_VERSIONS:
warnings.warn(
"the `version` parameter has been replaced with `schema_version`. "
"`version` will instead be passed directly to the underlying "
"feather writer unless `version` is 0.1.0 or 0.4.0.",
FutureWarning,
stacklevel=2,
)
schema_version = kwargs.pop("version")
path = _expand_user(path)
table = _geopandas_to_arrow(df, index=index, schema_version=schema_version)
feather.write_feather(table, path, compression=compression, **kwargs)
def _arrow_to_geopandas(table, metadata=None):
def _arrow_to_geopandas(table, geo_metadata=None):
"""
Helper function with main, shared logic for read_parquet/read_feather.
"""
df = table.to_pandas()
metadata = metadata or table.schema.metadata
if metadata is None or b"geo" not in metadata:
raise ValueError(
"""Missing geo metadata in Parquet/Feather file.
Use pandas.read_parquet/read_feather() instead."""
)
try:
metadata = _decode_metadata(metadata.get(b"geo", b""))
except (TypeError, json.decoder.JSONDecodeError):
raise ValueError("Missing or malformed geo metadata in Parquet/Feather file")
_validate_metadata(metadata)
if geo_metadata is None:
# Note: this path of not passing metadata is also used by dask-geopandas
geo_metadata = _validate_and_decode_metadata(table.schema.metadata)
# Find all geometry columns that were read from the file. May
# be a subset if 'columns' parameter is used.
geometry_columns = df.columns.intersection(metadata["columns"])
geometry_columns = [
col for col in geo_metadata["columns"] if col in table.column_names
]
# zero-row slice: cheap conversion to pandas just to learn the column order
result_column_names = list(table.slice(0, 0).to_pandas().columns)
geometry_columns.sort(key=result_column_names.index)
if not len(geometry_columns):
raise ValueError(
@@ -428,7 +508,7 @@ def _arrow_to_geopandas(table, metadata=None):
use pandas.read_parquet/read_feather() instead."""
)
geometry = metadata["primary_column"]
geometry = geo_metadata["primary_column"]
# Missing geometry likely indicates a subset of columns was read;
# promote the first available geometry to the primary geometry.
@@ -443,9 +523,12 @@ def _arrow_to_geopandas(table, metadata=None):
stacklevel=3,
)
table_attr = table.drop(geometry_columns)
df = table_attr.to_pandas()
# Convert the WKB columns that are present back to geometry.
for col in geometry_columns:
col_metadata = metadata["columns"][col]
col_metadata = geo_metadata["columns"][col]
if "crs" in col_metadata:
crs = col_metadata["crs"]
if isinstance(crs, dict):
@@ -455,7 +538,19 @@ def _arrow_to_geopandas(table, metadata=None):
# OGC:CRS84
crs = "OGC:CRS84"
df[col] = from_wkb(df[col].values, crs=crs)
if col_metadata["encoding"] == "WKB":
geom_arr = from_wkb(np.array(table[col]), crs=crs)
else:
from geopandas.io._geoarrow import construct_shapely_array
geom_arr = from_shapely(
construct_shapely_array(
table[col].combine_chunks(), "geoarrow." + col_metadata["encoding"]
),
crs=crs,
)
df.insert(result_column_names.index(col), col, geom_arr)
return GeoDataFrame(df, geometry=geometry)
@@ -521,7 +616,59 @@ def _ensure_arrow_fs(filesystem):
return filesystem
def _read_parquet(path, columns=None, storage_options=None, **kwargs):
def _validate_and_decode_metadata(metadata):
if metadata is None or b"geo" not in metadata:
raise ValueError(
"""Missing geo metadata in Parquet/Feather file.
Use pandas.read_parquet/read_feather() instead."""
)
# check for malformed metadata
try:
decoded_geo_metadata = _decode_metadata(metadata.get(b"geo", b""))
except (TypeError, json.decoder.JSONDecodeError):
raise ValueError("Missing or malformed geo metadata in Parquet/Feather file")
_validate_geo_metadata(decoded_geo_metadata)
return decoded_geo_metadata
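# Editor's sketch: schema.metadata maps bytes keys to bytes values, and the
# b"geo" entry holds UTF-8 encoded JSON, so decoding boils down to
#
#     geo = json.loads(table.schema.metadata[b"geo"])
#     geo["version"], geo["primary_column"]  # e.g. ('1.1.0', 'geometry')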
def _read_parquet_schema_and_metadata(path, filesystem):
"""
Opening the Parquet file/dataset a first time to get the schema and metadata.
TODO: we should look into how we can reuse opened dataset for reading the
actual data, to avoid discovering the dataset twice (problem right now is
that the ParquetDataset interface doesn't allow passing the filters on read)
"""
import pyarrow
from pyarrow import parquet
kwargs = {}
if Version(pyarrow.__version__) < Version("15.0.0"):
kwargs = dict(use_legacy_dataset=False)
try:
schema = parquet.ParquetDataset(path, filesystem=filesystem, **kwargs).schema
except Exception:
schema = parquet.read_schema(path, filesystem=filesystem)
metadata = schema.metadata
# read metadata separately to get the raw Parquet FileMetaData metadata
# (pyarrow doesn't properly expose those in schema.metadata for files
# created by GDAL - https://issues.apache.org/jira/browse/ARROW-16688)
if metadata is None or b"geo" not in metadata:
try:
metadata = parquet.read_metadata(path, filesystem=filesystem).metadata
except Exception:
pass
return schema, metadata
def _read_parquet(path, columns=None, storage_options=None, bbox=None, **kwargs):
"""
Load a Parquet object from the file path, returning a GeoDataFrame.
@@ -565,8 +712,13 @@ def _read_parquet(path, columns=None, storage_options=None, **kwargs):
both ``pyarrow.fs`` and ``fsspec`` (e.g. "s3://") then the ``pyarrow.fs``
filesystem is preferred. Provide the instantiated fsspec filesystem using
the ``filesystem`` keyword if you wish to use its implementation.
bbox : tuple, optional
Bounding box used to filter the selection from the GeoParquet data. This
is only usable if the file was written with the bbox covering metadata
(or with the "point" geometry encoding). The tuple has the format
(xmin, ymin, xmax, ymax).
**kwargs
Any additional kwargs passed to pyarrow.parquet.read_table().
Any additional kwargs passed to :func:`pyarrow.parquet.read_table`.
Returns
-------
@@ -595,29 +747,36 @@ def _read_parquet(path, columns=None, storage_options=None, **kwargs):
filesystem, path = _get_filesystem_path(
path, filesystem=filesystem, storage_options=storage_options
)
path = _expand_user(path)
schema, metadata = _read_parquet_schema_and_metadata(path, filesystem)
geo_metadata = _validate_and_decode_metadata(metadata)
bbox_filter = (
_get_parquet_bbox_filter(geo_metadata, bbox) if bbox is not None else None
)
if_bbox_column_exists = _check_if_covering_in_geo_metadata(geo_metadata)
# by default, the bbox column is not read in, so when it exists we must
# list the remaining columns explicitly.
if not columns and if_bbox_column_exists:
columns = _get_non_bbox_columns(schema, geo_metadata)
# if both bbox and filters kwargs are used, must splice together.
if "filters" in kwargs:
filters_kwarg = kwargs.pop("filters")
filters = _splice_bbox_and_filters(filters_kwarg, bbox_filter)
else:
filters = bbox_filter
kwargs["use_pandas_metadata"] = True
table = parquet.read_table(path, columns=columns, filesystem=filesystem, **kwargs)
# read metadata separately to get the raw Parquet FileMetaData metadata
# (pyarrow doesn't properly expose those in schema.metadata for files
# created by GDAL - https://issues.apache.org/jira/browse/ARROW-16688)
metadata = None
if table.schema.metadata is None or b"geo" not in table.schema.metadata:
try:
# read_metadata does not accept a filesystem keyword, so need to
# handle this manually (https://issues.apache.org/jira/browse/ARROW-16719)
if filesystem is not None:
pa_filesystem = _ensure_arrow_fs(filesystem)
with pa_filesystem.open_input_file(path) as source:
metadata = parquet.read_metadata(source).metadata
else:
metadata = parquet.read_metadata(path).metadata
except Exception:
pass
table = parquet.read_table(
path, columns=columns, filesystem=filesystem, filters=filters, **kwargs
)
return _arrow_to_geopandas(table, metadata)
return _arrow_to_geopandas(table, geo_metadata)
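# Editor's usage sketch (hypothetical path): bbox filtering requires a file
# written with write_covering_bbox=True or with "point" geometry encoding.
#
#     gdf = geopandas.read_parquet(
#         "example.parquet",
#         bbox=(-10.0, -10.0, 10.0, 10.0),  # (xmin, ymin, xmax, ymax)
#     )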
def _read_feather(path, columns=None, **kwargs):
@@ -677,11 +836,78 @@ def _read_feather(path, columns=None, **kwargs):
)
# TODO move this into `import_optional_dependency`
import pyarrow
import geopandas.io._pyarrow_hotfix # noqa: F401
if Version(pyarrow.__version__) < Version("0.17.0"):
raise ImportError("pyarrow >= 0.17 required for Feather support")
path = _expand_user(path)
table = feather.read_table(path, columns=columns, **kwargs)
return _arrow_to_geopandas(table)
def _get_parquet_bbox_filter(geo_metadata, bbox):
primary_column = geo_metadata["primary_column"]
if _check_if_covering_in_geo_metadata(geo_metadata):
bbox_column_name = _get_bbox_encoding_column_name(geo_metadata)
return _convert_bbox_to_parquet_filter(bbox, bbox_column_name)
elif geo_metadata["columns"][primary_column]["encoding"] == "point":
import pyarrow.compute as pc
# GeoArrow "point" encoding stores geometry as a struct of coordinates,
# so nested field references can filter on x/y directly
return (
(pc.field((primary_column, "x")) >= bbox[0])
& (pc.field((primary_column, "x")) <= bbox[2])
& (pc.field((primary_column, "y")) >= bbox[1])
& (pc.field((primary_column, "y")) <= bbox[3])
)
else:
raise ValueError(
"Specifying 'bbox' not supported for this Parquet file (it should either "
"have a bbox covering column or use 'point' encoding)."
)
def _convert_bbox_to_parquet_filter(bbox, bbox_column_name):
import pyarrow.compute as pc
# two boxes intersect unless one lies entirely beyond the other on some
# axis; negating the four "disjoint" comparisons keeps intersecting rows
return ~(
(pc.field((bbox_column_name, "xmin")) > bbox[2])
| (pc.field((bbox_column_name, "ymin")) > bbox[3])
| (pc.field((bbox_column_name, "xmax")) < bbox[0])
| (pc.field((bbox_column_name, "ymax")) < bbox[1])
)
def _check_if_covering_in_geo_metadata(geo_metadata):
primary_column = geo_metadata["primary_column"]
return "covering" in geo_metadata["columns"][primary_column].keys()
def _get_bbox_encoding_column_name(geo_metadata):
primary_column = geo_metadata["primary_column"]
return geo_metadata["columns"][primary_column]["covering"]["bbox"]["xmin"][0]
def _get_non_bbox_columns(schema, geo_metadata):
bbox_column_name = _get_bbox_encoding_column_name(geo_metadata)
columns = schema.names
if bbox_column_name in columns:
columns.remove(bbox_column_name)
return columns
def _splice_bbox_and_filters(kwarg_filters, bbox_filter):
parquet = import_optional_dependency(
"pyarrow.parquet", extra="pyarrow is required for Parquet support."
)
if bbox_filter is None:
return kwarg_filters
filters_expression = parquet.filters_to_expression(kwarg_filters)
return bbox_filter & filters_expression
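# Editor's sketch: user-supplied list filters are converted to a pyarrow
# expression and AND-ed with the bbox expression, e.g. (illustrative)
#
#     filters = [("population", ">", 1000)]
#     expr = parquet.filters_to_expression(filters) & bbox_filter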