Skip to content

Commit

Permalink
add voronoi grid test (xfail for now)
Browse files Browse the repository at this point in the history
  • Loading branch information
wpbonelli committed Nov 27, 2023
1 parent 636aa5d commit 6e0e6c5
Showing 1 changed file with 264 additions and 0 deletions.
264 changes: 264 additions & 0 deletions autotest/prt/test_prt_fmi08.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
"""
Tests a PRT model on the Voronoi grid demonstrated
in Flopy's Voronoi example:
https://flopy.readthedocs.io/en/latest/Notebooks/dis_voronoi_example.html
Two variants are included, first with straight
pathlines and then with more interesting paths.
Particles are released from the central cell
to which constant concentration is assigned
in the transport model.
TODO: support parallel adjacent cell faces,
filter duplicated vertices as can appear in
flopy.utils.voronoi output via scipy/Qhull,
expected to fail for now
"""


import os
from pathlib import Path
from pprint import pformat

import flopy
import matplotlib.pyplot as plt
import numpy as np
import pytest
from flopy.discretization import VertexGrid
from flopy.utils import GridIntersect
from flopy.utils.triangle import Triangle
from flopy.utils.voronoi import VoronoiGrid
from shapely.geometry import LineString, Point

from prt_test_utils import get_model_name

simname = "prtfmi08"
ex = [simname]
xmin = 0.0
xmax = 2000.0
ymin = 0.0
ymax = 1000.0
top = 1.0
botm = [0.0]
angle_min = 30
area_max = 1000.0
delr = area_max**0.5
nlay = 1
ncol = xmax / delr
nrow = ymax / delr
nodes = ncol * nrow
porosity = 0.1


def get_grid(workspace, targets):
workspace.mkdir(exist_ok=True)
tri = Triangle(
maximum_area=area_max,
angle=angle_min,
model_ws=workspace,
exe_name=targets.triangle,
)
poly = np.array(((xmin, ymin), (xmax, ymin), (xmax, ymax), (xmin, ymax)))
tri.add_polygon(poly)
tri.build(verbose=False)
return VoronoiGrid(tri)

# fig = plt.figure(figsize=(10, 10))
# ax = plt.subplot(1, 1, 1, aspect="equal")
# pc = tri.plot(ax=ax)


def build_gwf_sim(name, ws, targets):
ws = Path(ws)
sim_ws = ws / "tracking"
gwfname = get_model_name(name, "gwf")

# create grid
grid = get_grid(ws / "grid", targets)
vgrid = VertexGrid(**grid.get_gridprops_vertexgrid(), nlay=1)
ibd = np.zeros(vgrid.ncpl, dtype=int)
gi = GridIntersect(vgrid)

# identify cells on left edge
line = LineString([(xmin, ymin), (xmin, ymax)])
cells0 = gi.intersect(line)["cellids"]
cells0 = np.array(list(cells0))
ibd[cells0] = 1

# identify cells on right edge
line = LineString([(xmax, ymin), (xmax, ymax)])
cells1 = gi.intersect(line)["cellids"]
cells1 = np.array(list(cells1))
ibd[cells1] = 2

# identify release cell
point = Point((500, 500))
cells2 = gi.intersect(point)["cellids"]
cells2 = np.array(list(cells2))
ibd[cells2] = 3

# create simulation
sim_ws = ws / "flow"
sim = flopy.mf6.MFSimulation(
sim_name=name, version="mf6", exe_name=targets.mf6, sim_ws=sim_ws
)
tdis = flopy.mf6.ModflowTdis(
sim, time_units="DAYS", perioddata=[[1.0, 1, 1.0]]
)
gwf = flopy.mf6.ModflowGwf(sim, modelname=gwfname, save_flows=True)
ims = flopy.mf6.ModflowIms(
sim,
print_option="SUMMARY",
complexity="complex",
outer_dvclose=1.0e-8,
inner_dvclose=1.0e-8,
)
disv = flopy.mf6.ModflowGwfdisv(
gwf, nlay=nlay, **grid.get_disv_gridprops(), top=top, botm=botm
)
npf = flopy.mf6.ModflowGwfnpf(
gwf,
xt3doptions=[(True)],
k=10.0,
save_saturation=True,
save_specific_discharge=True,
)
ic = flopy.mf6.ModflowGwfic(gwf)

chdlist = []
for icpl in cells0:
chdlist.append([(0, icpl), 1.0])
for icpl in cells1:
chdlist.append([(0, icpl), 0.0])
chd = flopy.mf6.ModflowGwfchd(gwf, stress_period_data=chdlist)
oc = flopy.mf6.ModflowGwfoc(
gwf,
budget_filerecord=f"{gwfname}.bud",
head_filerecord=f"{gwfname}.hds",
saverecord=[("HEAD", "ALL"), ("BUDGET", "ALL")],
printrecord=[("HEAD", "LAST"), ("BUDGET", "LAST")],
)
return sim


def build_prt_sim(name, ws, targets):
ws = Path(ws)
sim_ws = ws / "tracking"
gwfname = get_model_name(name, "gwf")
prtname = get_model_name(name, "prt")

# create grid
grid = get_grid(ws / "grid", targets)
gridprops = grid.get_gridprops_vertexgrid()
vgrid = VertexGrid(**gridprops, nlay=1)
ibd = np.zeros(vgrid.ncpl, dtype=int)
gi = GridIntersect(vgrid)

# identify cells on left edge
line = LineString([(xmin, ymin), (xmin, ymax)])
cells0 = gi.intersect(line)["cellids"]
cells0 = np.array(list(cells0))
ibd[cells0] = 1

# identify cells on right edge
line = LineString([(xmax, ymin), (xmax, ymax)])
cells1 = gi.intersect(line)["cellids"]
cells1 = np.array(list(cells1))
ibd[cells1] = 2

# identify release cell
point = Point((500, 500))
cells2 = vgrid.intersect(point.x, point.y, 0.5)
cells2 = np.array([cells2])
ibd[cells2] = 3

# create simulation
sim = flopy.mf6.MFSimulation(
sim_name=name, version="mf6", exe_name=targets.mf6, sim_ws=sim_ws
)
tdis = flopy.mf6.ModflowTdis(
sim, time_units="DAYS", perioddata=[[1.0, 1, 1.0]]
)
prt = flopy.mf6.ModflowPrt(sim, modelname=prtname)
disv = flopy.mf6.ModflowGwfdisv(
prt, nlay=nlay, **grid.get_disv_gridprops(), top=top, botm=botm
)
flopy.mf6.ModflowPrtmip(prt, pname="mip", porosity=porosity)
releasepts = [
# index, (layer, cell index), x, y, z
(0, (0, cells2[0]), point.x, point.y, 0.5)
]
prp_track_file = f"{prtname}.prp.trk"
prp_track_csv_file = f"{prtname}.prp.trk.csv"
flopy.mf6.ModflowPrtprp(
prt,
pname="prp1",
filename=f"{prtname}_1.prp",
nreleasepts=len(releasepts),
packagedata=releasepts,
perioddata={0: ["FIRST"]},
track_filerecord=[prp_track_file],
trackcsv_filerecord=[prp_track_csv_file],
boundnames=True,
)
prt_track_file = f"{prtname}.trk"
prt_track_csv_file = f"{prtname}.trk.csv"
flopy.mf6.ModflowPrtoc(
prt,
pname="oc",
track_filerecord=[prt_track_file],
trackcsv_filerecord=[prt_track_csv_file],
)
gwf_budget_file = f"{gwfname}.bud"
gwf_head_file = f"{gwfname}.hds"
flopy.mf6.ModflowPrtfmi(
prt,
packagedata=[
("GWFHEAD", sim_ws.parent / "flow" / gwf_head_file),
("GWFBUDGET", sim_ws.parent / "flow" / gwf_budget_file),
],
)
ems = flopy.mf6.ModflowEms(
sim,
pname="ems",
filename=f"{prtname}.ems",
)
sim.register_solution_package(ems, [prt.name])
return sim


@pytest.mark.parametrize("name", ex)
@pytest.mark.xfail
def test_mf6model(name, function_tmpdir, targets):
# workspace
ws = function_tmpdir

# build mf6 models
gwfsim = build_gwf_sim(name, ws, targets)
prtsim = build_prt_sim(name, ws, targets)

# run gwf
gwfsim.write_simulation()
success, buff = gwfsim.run_simulation(report=True)
assert success, pformat(buff)

# get gwf results
gwf = gwfsim.get_model()
head = gwf.output.head().get_data()
bdobj = gwf.output.budget()
spdis = bdobj.get_data(text="DATA-SPDIS")[0]

# plot gwf results
fig = plt.figure(figsize=(15, 15))
ax = plt.subplot(1, 1, 1, aspect="equal")
pmv = flopy.plot.PlotMapView(gwf)
pmv.plot_array(head, cmap="jet", alpha=0.5)
pmv.plot_vector(spdis["qx"], spdis["qy"], alpha=0.25)
# plt.show()

# run prt
prtsim.write_simulation()
success, buff = prtsim.run_simulation(report=True)
assert success, pformat(buff)

0 comments on commit 6e0e6c5

Please sign in to comment.