Files
2025-01-26 19:24:23 -08:00

399 lines
11 KiB
Python

from io import BytesIO
from pathlib import Path
from zipfile import ZIP_DEFLATED, ZipFile
import numpy as np
from pyogrio import (
__gdal_version_string__,
__version__,
list_drivers,
)
from pyogrio._compat import (
HAS_ARROW_API,
HAS_ARROW_WRITE_API,
HAS_GDAL_GEOS,
HAS_PYARROW,
HAS_PYPROJ,
HAS_SHAPELY,
)
from pyogrio.core import vsi_rmtree
from pyogrio.raw import read, write
import pytest
# directory (next to this file) that holds the on-disk test datasets
_data_dir = Path(__file__).parent.resolve() / "fixtures"

# file extension -> driver name, for the well-supported drivers
DRIVERS = {
    ".fgb": "FlatGeobuf",
    ".geojson": "GeoJSON",
    ".geojsonl": "GeoJSONSeq",
    ".geojsons": "GeoJSONSeq",
    ".gpkg": "GPKG",
    ".shp": "ESRI Shapefile",
}

# driver name -> file extension; when a driver is listed under several
# extensions the last one in DRIVERS wins (GeoJSONSeq -> ".geojsons")
DRIVER_EXT = dict(zip(DRIVERS.values(), DRIVERS.keys()))

# extensions used to parametrize fixtures over every format
# (".geojsons" is left out; it shares its driver with ".geojsonl")
ALL_EXTS = [".fgb", ".geojson", ".geojsonl", ".gpkg", ".shp"]

# starting FID per extension: 1 for ".gpkg", 0 for every other extension
START_FID = {ext: (1 if ext == ".gpkg" else 0) for ext in DRIVERS}
def pytest_report_header(config):
    """Return extra lines for the header pytest prints at the start of a run.

    The text lists the pyogrio version, the GDAL version string and every
    driver reported by ``list_drivers()`` with its capability, in sorted
    order.  ``config`` is not used by this function.
    """
    driver_caps = sorted(list_drivers().items())
    drivers = ", ".join(f"{name}({caps})" for name, caps in driver_caps)
    header_lines = [
        f"pyogrio {__version__}",
        f"GDAL {__gdal_version_string__}",
        f"Supported drivers: {drivers}",
    ]
    return "\n".join(header_lines)
# marks that skip a test when an optional dependency is not present
requires_arrow_api = pytest.mark.skipif(
    not HAS_ARROW_API,
    reason="GDAL>=3.6 required",
)
requires_pyarrow_api = pytest.mark.skipif(
    not (HAS_ARROW_API and HAS_PYARROW),
    reason="GDAL>=3.6 and pyarrow required",
)
requires_pyproj = pytest.mark.skipif(
    not HAS_PYPROJ,
    reason="pyproj required",
)
# the Arrow write path is skipped unless both the write API and pyarrow exist
requires_arrow_write_api = pytest.mark.skipif(
    not (HAS_ARROW_WRITE_API and HAS_PYARROW),
    reason="GDAL>=3.8 required for Arrow write API",
)
requires_gdal_geos = pytest.mark.skipif(
    not HAS_GDAL_GEOS,
    reason="GDAL compiled with GEOS required",
)
requires_shapely = pytest.mark.skipif(
    not HAS_SHAPELY,
    reason="Shapely >= 2.0 required",
)
def prepare_testfile(testfile_path, dst_dir, ext):
    """Return a path to ``testfile_path`` in the format given by ``ext``.

    If the source file already has the suffix ``ext`` it is returned as-is.
    Otherwise the converted file lives at ``dst_dir / <stem><ext>``: an
    existing file at that location is reused, else the source is read and
    written out again under the new name.

    Parameters
    ----------
    testfile_path : Path
        Source dataset.
    dst_dir : Path
        Directory that receives the converted copy.
    ext : str
        Target extension, including the leading dot (e.g. ".gpkg").
    """
    # nothing to convert: the source is already in the requested format
    if testfile_path.suffix == ext:
        return testfile_path

    converted_path = dst_dir / f"{testfile_path.stem}{ext}"
    if converted_path.exists():
        # a previous call already produced this file
        return converted_path

    # extra write options per target format; spatial_index=False avoids the
    # rows being reordered
    write_overrides = {
        # "Unknown" allows mixed Polygons/MultiPolygons type
        ".fgb": {"spatial_index": False, "geometry_type": "Unknown"},
        ".gpkg": {"spatial_index": False, "geometry_type": "MultiPolygon"},
    }

    meta, _, geometry, field_data = read(testfile_path)
    meta.update(write_overrides.get(ext, {}))
    write(converted_path, geometry, field_data, **meta)
    return converted_path
@pytest.fixture(scope="session")
def data_dir():
    """Return the directory that holds the test fixture datasets."""
    fixtures_dir = _data_dir
    return fixtures_dir
@pytest.fixture(scope="function")
def naturalearth_lowres(tmp_path, request):
    """Return the naturalearth_lowres dataset in the requested format.

    The target extension is ``request.param`` when the fixture is
    parametrized and ".shp" otherwise; the file is prepared in ``tmp_path``
    via ``prepare_testfile``.
    """
    shapefile = _data_dir / "naturalearth_lowres" / "naturalearth_lowres.shp"
    requested_ext = getattr(request, "param", ".shp")
    return prepare_testfile(shapefile, tmp_path, requested_ext)
@pytest.fixture(scope="function", params=ALL_EXTS)
def naturalearth_lowres_all_ext(tmp_path, naturalearth_lowres, request):
    """Return naturalearth_lowres once for every extension in ``ALL_EXTS``."""
    target_ext = request.param
    return prepare_testfile(naturalearth_lowres, tmp_path, target_ext)
@pytest.fixture(scope="function")
def naturalearth_lowres_vsi(tmp_path, naturalearth_lowres):
    """Wrap naturalearth_lowres as a zip file for VSI tests.

    Returns a tuple ``(zip_path, vsi_path)`` where ``vsi_path`` is the
    "/vsizip/" path pointing at the dataset inside the archive.
    """
    zip_path = tmp_path / f"{naturalearth_lowres.name}.zip"
    src_dir = naturalearth_lowres.parent
    stem = naturalearth_lowres.stem

    # sidecar files that are added to the archive
    member_names = [
        f"{stem}.{suffix}" for suffix in ("dbf", "prj", "shp", "shx", "cpg")
    ]
    with ZipFile(
        zip_path, mode="w", compression=ZIP_DEFLATED, compresslevel=5
    ) as archive:
        for member in member_names:
            # store each file flat, under its bare file name
            archive.write(src_dir / member, member)

    return zip_path, f"/vsizip/{zip_path}/{naturalearth_lowres.name}"
@pytest.fixture(scope="function")
def naturalearth_lowres_vsimem(naturalearth_lowres):
    """Write naturalearth_lowres to a vsimem file for VSI tests.

    Yields the "/vsimem/..." path of the written GPKG; after the test the
    containing vsimem directory is removed with ``vsi_rmtree``.
    """
    meta, _, geometry, field_data = read(naturalearth_lowres)
    meta.update(spatial_index=False, geometry_type="MultiPolygon")

    fixture_name = f"pyogrio_fixture_{naturalearth_lowres.stem}"
    vsimem_dir = Path(f"/vsimem/{fixture_name}")
    gpkg_path = vsimem_dir / f"{fixture_name}.gpkg"

    write(gpkg_path, geometry, field_data, layer="naturalearth_lowres", **meta)

    yield gpkg_path

    # teardown: drop the directory that was created for this fixture
    vsi_rmtree(gpkg_path.parent)
@pytest.fixture(scope="session")
def line_zm_file():
    """Return the path of the ``line_zm.gpkg`` fixture file."""
    return _data_dir.joinpath("line_zm.gpkg")
@pytest.fixture(scope="session")
def curve_file():
    """Return the path of the ``curve.gpkg`` fixture file."""
    return _data_dir.joinpath("curve.gpkg")
@pytest.fixture(scope="session")
def curve_polygon_file():
    """Return the path of the ``curvepolygon.gpkg`` fixture file."""
    return _data_dir.joinpath("curvepolygon.gpkg")
@pytest.fixture(scope="session")
def multisurface_file():
    """Return the path of the ``multisurface.gpkg`` fixture file."""
    return _data_dir.joinpath("multisurface.gpkg")
@pytest.fixture(scope="session")
def test_gpkg_nulls():
    """Return the path of the ``test_gpkg_nulls.gpkg`` fixture file."""
    return _data_dir.joinpath("test_gpkg_nulls.gpkg")
@pytest.fixture(scope="function")
def no_geometry_file(tmp_path):
    """Write a GPKG layer that does not include geometry and return its path.

    The layer is named "no_geometry" and has one field, "col", holding the
    values "a", "b" and "c".
    """
    gpkg_path = tmp_path / "test_no_geometry.gpkg"
    column_values = np.array(["a", "b", "c"])
    write(
        gpkg_path,
        geometry=None,
        field_data=[column_values],
        fields=["col"],
        layer="no_geometry",
    )
    return gpkg_path
@pytest.fixture(scope="function")
def list_field_values_file(tmp_path):
    """Write a GeoJSON file with list values in a property; return its path.

    The file has five point features, each with an "int64" property and a
    two-element "list_int64" property.
    """
    geojson_text = """{
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"properties": { "int64": 1, "list_int64": [0, 1] },
"geometry": { "type": "Point", "coordinates": [0, 2] }
},
{
"type": "Feature",
"properties": { "int64": 2, "list_int64": [2, 3] },
"geometry": { "type": "Point", "coordinates": [1, 2] }
},
{
"type": "Feature",
"properties": { "int64": 3, "list_int64": [4, 5] },
"geometry": { "type": "Point", "coordinates": [2, 2] }
},
{
"type": "Feature",
"properties": { "int64": 4, "list_int64": [6, 7] },
"geometry": { "type": "Point", "coordinates": [3, 2] }
},
{
"type": "Feature",
"properties": { "int64": 5, "list_int64": [8, 9] },
"geometry": { "type": "Point", "coordinates": [4, 2] }
}
]
}"""
    geojson_path = tmp_path / "test_ogr_types_list.geojson"
    geojson_path.write_text(geojson_text)
    return geojson_path
@pytest.fixture(scope="function")
def nested_geojson_file(tmp_path):
    """Write a GeoJSON file with nested properties and return its path.

    The single point feature has a plain "top_level" property and an
    "intermediate_level" object that contains "bottom_level".
    """
    geojson_text = """{
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": [0, 0]
},
"properties": {
"top_level": "A",
"intermediate_level": {
"bottom_level": "B"
}
}
}
]
}"""
    geojson_path = tmp_path / "test_nested.geojson"
    geojson_path.write_text(geojson_text)
    return geojson_path
@pytest.fixture(scope="function")
def datetime_file(tmp_path):
    """Write a GeoJSON file with datetime values and return its path.

    The "col" property holds one value with millisecond precision and one
    with whole seconds; neither carries a timezone offset.
    """
    geojson_text = """{
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"properties": { "col": "2020-01-01T09:00:00.123" },
"geometry": { "type": "Point", "coordinates": [1, 1] }
},
{
"type": "Feature",
"properties": { "col": "2020-01-01T10:00:00" },
"geometry": { "type": "Point", "coordinates": [2, 2] }
}
]
}"""
    geojson_path = tmp_path / "test_datetime.geojson"
    geojson_path.write_text(geojson_text)
    return geojson_path
@pytest.fixture(scope="function")
def datetime_tz_file(tmp_path):
    """Write a GeoJSON file with timezone-aware datetimes; return its path.

    Both "datetime_col" values carry a "-05:00" offset; the first one also
    has millisecond precision.
    """
    geojson_text = """{
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"properties": { "datetime_col": "2020-01-01T09:00:00.123-05:00" },
"geometry": { "type": "Point", "coordinates": [1, 1] }
},
{
"type": "Feature",
"properties": { "datetime_col": "2020-01-01T10:00:00-05:00" },
"geometry": { "type": "Point", "coordinates": [2, 2] }
}
]
}"""
    geojson_path = tmp_path / "test_datetime_tz.geojson"
    geojson_path.write_text(geojson_text)
    return geojson_path
@pytest.fixture(scope="function")
def geojson_bytes(tmp_path):
    """Return the first 3 records of naturalearth_lowres as GeoJSON bytes.

    The records are read from the shapefile fixture, written to a GeoJSON
    file in ``tmp_path`` and the raw content of that file is returned.
    """
    shapefile = _data_dir / "naturalearth_lowres" / "naturalearth_lowres.shp"
    meta, _, geometry, field_data = read(shapefile, max_features=3)

    geojson_path = tmp_path / "test.geojson"
    write(geojson_path, geometry, field_data, **meta)

    return geojson_path.read_bytes()
@pytest.fixture(scope="function")
def geojson_filelike(tmp_path):
    """Yield an open binary handle on a 3-record GeoJSON file.

    The first 3 records of naturalearth_lowres are written to a GeoJSON file
    in ``tmp_path`` (layer "test"); the handle is closed again once the test
    that uses the fixture has finished.
    """
    shapefile = _data_dir / "naturalearth_lowres" / "naturalearth_lowres.shp"
    meta, _, geometry, field_data = read(shapefile, max_features=3)

    geojson_path = tmp_path / "test.geojson"
    write(geojson_path, geometry, field_data, layer="test", **meta)

    with open(geojson_path, "rb") as handle:
        yield handle
def nonseekable_bytes(tmp_path):
# mock a non-seekable byte stream, such as a zstandard handle
class NonSeekableBytesIO(BytesIO):
def seekable(self):
return False
def seek(self, *args, **kwargs):
raise OSError("cannot seek")
# wrap GeoJSON into a non-seekable BytesIO
geojson = """{
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"properties": { },
"geometry": { "type": "Point", "coordinates": [1, 1] }
}
]
}"""
return NonSeekableBytesIO(geojson.encode("UTF-8"))
@pytest.fixture(
    scope="session",
    params=[
        # Japanese (half-width katakana; the sample must not be empty, or
        # this case would not exercise the code page at all)
        ("CP932", "ﾎ"),
        # Chinese
        ("CP936", "中文"),
        # Central European
        ("CP1250", "Đ"),
        # Latin 1 / Western European
        ("CP1252", "ÿ"),
        # Greek
        ("CP1253", "Φ"),
        # Arabic
        ("CP1256", "ش"),
    ],
)
def encoded_text(request):
    """Return a tuple of encoding name and very short sample text.

    The sample text of each parameter consists of non-ASCII characters that
    can be represented in the paired code page.

    NOTE: it was determined through testing that code pages for MS-DOS do not
    consistently work across all Python installations (in particular, fail
    with conda), but ANSI code pages appear to work properly.
    """
    return request.param