Merge pull request #15 from UV-CDAT/cdms_tests
Cdms tests
Showing 3 changed files with 336 additions and 1 deletion.
@@ -0,0 +1,69 @@
import requests
import cdms2
import cdutil
import os
import sys
import cdat_info
import numpy
import unittest


class TestMIPS(unittest.TestCase):
    def setUp(self):
        super(TestMIPS, self).setUp()
        self.filename = "obs_timeseries.nc"
        myurl = "http://uvcdat.llnl.gov/cdat/sample_data/" + self.filename
        r = requests.get(myurl, stream=True)
        with open(self.filename, 'wb') as fd:
            for chunk in r.iter_content(chunk_size=1024):
                fd.write(chunk)

    def tearDown(self):
        super(TestMIPS, self).tearDown()
        os.remove(self.filename)

    def testAnnualSeasonalAverage(self):
        f = cdms2.open(self.filename, "r")

        # Read in the raw data EXCLUDING a leap year
        obs_timeseries1 = f('obs', time=slice(0, 48))  # 1900.1 to 1903.12
        # Read in the raw data INCLUDING a leap year
        obs_timeseries2 = f('obs', time=slice(0, 60))  # 1900.1 to 1904.12; 1904 is a leap year

        # Truncate the first Jan/Feb and the last Dec before computing the annual-cycle
        # anomaly, so that the DJF seasonal means later are built from complete seasons.
        obs_timeseries1 = obs_timeseries1[2:-1]
        obs_timeseries2 = obs_timeseries2[2:-1]

        # Set monthly time bounds
        cdutil.setTimeBoundsMonthly(obs_timeseries1)
        cdutil.setTimeBoundsMonthly(obs_timeseries2)

        # Remove the annual cycle
        obs_timeseries_ano1 = cdutil.ANNUALCYCLE.departures(obs_timeseries1)
        obs_timeseries_ano2 = cdutil.ANNUALCYCLE.departures(obs_timeseries2)

        # Calculate the time average
        obs_timeseries_ano_timeave1 = cdutil.averager(obs_timeseries_ano1, axis='t')  # This should be zero, and it is
        obs_timeseries_ano_timeave2 = cdutil.averager(obs_timeseries_ano2, axis='t')  # This should be zero, BUT it is NOT

        # SEASONAL MEAN TEST
        obs_timeseries_ano1_DJF = cdutil.DJF(obs_timeseries_ano1, criteriaarg=[0.95, None])
        obs_timeseries_ano2_DJF = cdutil.DJF(obs_timeseries_ano2, criteriaarg=[0.95, None])
        obs_timeseries_ano1_JJA = cdutil.JJA(obs_timeseries_ano1, criteriaarg=[0.95, None])
        obs_timeseries_ano2_JJA = cdutil.JJA(obs_timeseries_ano2, criteriaarg=[0.95, None])

        # Calculate the time average
        obs_timeseries_ano1_DJF_timeave = cdutil.averager(obs_timeseries_ano1_DJF, axis='t')  # This should be zero, and it is
        obs_timeseries_ano2_DJF_timeave = cdutil.averager(obs_timeseries_ano2_DJF, axis='t')  # This should be zero, BUT it is NOT

        obs_timeseries_ano1_JJA_timeave = cdutil.averager(obs_timeseries_ano1_JJA, axis='t')  # This should be zero, and it is
        obs_timeseries_ano2_JJA_timeave = cdutil.averager(obs_timeseries_ano2_JJA, axis='t')  # This should be zero, and it is

        numpy.testing.assert_almost_equal(obs_timeseries_ano_timeave2, obs_timeseries_ano_timeave1, 10)
        numpy.testing.assert_almost_equal(obs_timeseries_ano1_JJA_timeave, obs_timeseries_ano2_JJA_timeave, 10)
        numpy.testing.assert_almost_equal(obs_timeseries_ano1_DJF_timeave, obs_timeseries_ano2_DJF_timeave, 10)


if __name__ == "__main__":
    unittest.main()
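For reference, the pattern the test above exercises reduces to a few calls: set monthly time bounds, remove the mean annual cycle with cdutil.ANNUALCYCLE.departures, and time-average the anomalies, which should come out near zero over complete years. The following is only an illustrative sketch (not part of this commit), assuming obs_timeseries.nc has already been downloaded as in setUp above; it uses only the cdutil calls shown in the diff.

# Illustrative sketch: anomaly time mean over complete (non-leap) years.
# Assumes obs_timeseries.nc is present locally, as downloaded in setUp() above.
import cdms2
import cdutil

f = cdms2.open("obs_timeseries.nc")
obs = f('obs', time=slice(0, 48))         # four complete years, no leap year
cdutil.setTimeBoundsMonthly(obs)          # ANNUALCYCLE needs monthly time bounds
ano = cdutil.ANNUALCYCLE.departures(obs)  # anomalies w.r.t. the mean annual cycle
print(cdutil.averager(ano, axis='t'))     # expected to be ~0 for complete years
f.close()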
@@ -0,0 +1,266 @@
import unittest
import cdat_info
import os
import cdms2
import numpy
import MV2
import regrid2
import cdutil
import cdtime
import datetime
import tempfile


class TestRegressions(unittest.TestCase):
    def getDataFile(self, name):
        pth = cdat_info.get_sampledata_path()
        return self.getFile(os.path.join(pth, name))

    def setUp(self):
        self.orig_cwd = os.getcwd()
        self.files = []
        self.tempdir = tempfile.mkdtemp()

    def getFile(self, path, mode="r"):
        f = cdms2.open(path, mode)
        self.files.append(f)
        return f

    def getTempFile(self, path, mode="r"):
        return self.getFile(os.path.join(self.tempdir, path), mode)

    def testCreateCopyLoseDType(self):
        incat = self.getFile(os.path.join(cdat_info.get_sampledata_path(), "tas_ccsr-95a.xml"))
        invar = incat['tas']
        intype = invar.dtype

        outfile = self.getTempFile('newfile.nc', 'w')
        outfile.createVariableCopy(invar)

        outvar = outfile['tas']
        outtype = outvar.dtype

        self.assertEqual(outtype, intype)

    def testDeleteAttributes(self):
        test_nm = 'CDMS_Test_del_attributes.nc'
        f = self.getTempFile(test_nm, "w")
        s = MV2.ones((20, 20))
        s.id = "test"
        s.test_attribute = "some variable attribute"
        f.test_attribute = "some file attribute"
        f.write(s)
        f.close()
        f = self.getTempFile(test_nm, "r+")
        delattr(f, 'test_attribute')
        s = f["test"]
        del(s.test_attribute)
        f.close()
        f = self.getTempFile(test_nm)

        self.assertFalse(hasattr(f, 'test_attribute'))
        s = f["test"]
        self.assertFalse(hasattr(s, 'test_attribute'))

    def testContiguousRegridNANIssue(self):
        a = MV2.reshape(MV2.sin(MV2.arange(20000)), (2, 1, 100, 100))
        lon = cdms2.createAxis(MV2.arange(100) * 3.6)
        lon.designateLongitude()
        lon.units = "degrees_east"
        lon.id = "longitude"

        lat = cdms2.createAxis(MV2.arange(100) * 1.8 - 90.)
        lat.id = "latitude"
        lat.designateLatitude()
        lat.units = "degrees_north"

        lev = cdms2.createAxis([1000.])
        lev.id = "plev"
        lev.designateLevel()
        lev.units = "hPa"

        t = cdms2.createAxis([0, 31.])
        t.id = "time"
        t.designateTime()
        t.units = "days since 2014"

        cdutil.setTimeBoundsMonthly(t)
        a.setAxisList((t, lev, lat, lon))
        a = MV2.masked_less(a, .5)
        grd = cdms2.createGaussianGrid(64)

        a = a.ascontiguous()
        a = a.regrid(grd, regridTool="regrid2")
        a = cdutil.averager(a, axis='txy')
        self.assertEqual(a[0], 0.7921019540305255)

    def testBadCalendar(self):
        t = cdms2.createAxis([1, 2, 3, 4])
        t.designateTime()
        t.setCalendar(cdtime.ClimCalendar)
        with self.assertRaises(cdms2.CDMSError):
            t.setCalendar(3421)

    def testAxisDatetime(self):
        ax = cdms2.createAxis([10.813224335543, ], id="time")
        ax.units = "seconds since 2014-10-06 10:12:13"
        ax.designateTime()

        dt = ax.asdatetime()

        self.assertEqual(dt[0], datetime.datetime(2014, 10, 6, 10, 12, 23, 813))

    def testFileURI(self):
        pth = os.path.join(cdat_info.get_sampledata_path(), "clt.nc")
        f = cdms2.open("file://" + pth)
        self.assertEqual("file://" + pth, f.uri)

    def testDefaultFillValueNotNAN(self):
        self.assertFalse(numpy.isnan(MV2.array(0.).fill_value))

    def testDimUnlimited(self):
        f = self.getDataFile("tas_mo_clim.nc")
        v = f.variables['climseas']
        t = v.getTime()
        self.assertTrue(t.isUnlimited())

    def testJSON(self):
        f = self.getFile(cdat_info.get_sampledata_path() + "/clt.nc")
        s = f("clt")
        jsn = s.dumps()
        s2 = cdms2.createVariable(jsn, fromJSON=True)
        self.assertTrue(numpy.allclose(s2, s))

    def testFullAveraging(self):
        cdms2.setAutoBounds("on")
        a = MV2.masked_greater(MV2.array([1, 4, 5, 6, 7, 8, 9.]), .5)
        self.assertTrue(numpy.ma.is_masked(cdutil.averager(a)))

    def testAxisDetection(self):
        val = [1, 2, 3]
        a = cdms2.createAxis(val)

        # First let's make sure it does not detect anything
        self.assertFalse(a.isLatitude())
        self.assertFalse(a.isLongitude())
        self.assertFalse(a.isLevel())
        self.assertFalse(a.isTime())

        # Now quick tests for making it latitude
        for u in ["DEGREESN", " deGREEn ", "degrees_north", "degree_north", "degree_n", "degrees_n", "degreen", "degreesn"]:
            a.units = u
            self.assertTrue(a.isLatitude())
        a.units = ""
        self.assertFalse(a.isLatitude())
        for i in ["lat", "LAT", "latitude", "latituDE"]:
            a.id = i
            self.assertTrue(a.isLatitude())
        a.id = "axis"
        self.assertFalse(a.isLatitude())
        a.axis = "Y"
        self.assertTrue(a.isLatitude())
        del(a.axis)
        self.assertFalse(a.isLatitude())
        # Now quick tests for making it longitude
        for u in ["DEGREESe", " deGREEe ", "degrees_east", "degree_east", "degree_e", "degrees_e", "degreee", "degreese"]:
            a.units = u
            self.assertTrue(a.isLongitude())
        a.units = ""
        self.assertFalse(a.isLongitude())
        for i in ["lon", "LON", "longitude", "lOngituDE"]:
            a.id = i
            self.assertTrue(a.isLongitude())
        a.id = "axis"
        self.assertFalse(a.isLongitude())
        a.axis = "X"
        self.assertTrue(a.isLongitude())
        del(a.axis)
        self.assertFalse(a.isLongitude())
        # Now quick tests for making it level
        for u in ["Pa", "hPa", "psi", "N/m2", "N*m-2", "kg*m-1*s-2", "atm", "bar", "torr"]:
            a.units = u
            self.assertTrue(a.isLevel())
        a.units = ""
        self.assertFalse(a.isLevel())
        for i in ["lev", "LEV", "level", "lEvEL", "depth", " depth"]:
            a.id = i
            self.assertTrue(a.isLevel())
        a.id = "axis"
        self.assertFalse(a.isLevel())
        a.axis = "Z"
        self.assertTrue(a.isLevel())
        del(a.axis)
        self.assertFalse(a.isLevel())
        a.positive = "up"
        self.assertTrue(a.isLevel())
        a.positive = "positive"
        self.assertFalse(a.isLevel())

    def testSimpleWrite(self):
        f = self.getTempFile("test_simple_write.nc", "w")
        data = numpy.random.random((20, 64, 128))
        data = MV2.array(data)
        data.getAxis(0).designateTime()
        f.write(data, dtype=numpy.float32, id="test_simple")

    def testRegridZonal(self):
        f = self.getFile(os.path.join(cdat_info.get_sampledata_path(), "clt.nc"))
        s = f("clt", slice(0, 1))
        g = cdms2.createGaussianGrid(64)
        gl = cdms2.createZonalGrid(g)
        regridded = s.regrid(gl)

    def testAurore(self):
        """
        No idea what this is testing.
        """
        from cdms2.coord import TransientAxis2D, TransientVirtualAxis
        from cdms2.hgrid import TransientCurveGrid
        from cdms2.gengrid import TransientGenericGrid

        def CurveGrid(v, lat, lon):
            ni, nj = lat.shape
            idi = "i"
            idj = "j"
            lat_units = 'degrees_north'
            lon_units = 'degrees_east'
            iaxis = TransientVirtualAxis(idi, ni)
            jaxis = TransientVirtualAxis(idj, nj)
            lataxis = TransientAxis2D(lat, axes=(iaxis, jaxis), attributes={'units': lat_units}, id="latitude")
            lonaxis = TransientAxis2D(lon, axes=(iaxis, jaxis), attributes={'units': lon_units}, id="longitude")
            curvegrid = TransientGenericGrid(lataxis, lonaxis, tempmask=None)
            attributs = None
            vid = None
            if hasattr(v, 'attributes'):
                attributs = v.attributes
            if hasattr(v, 'id'):
                vid = v.id
            axis0 = v.getAxis(0)
            return cdms2.createVariable(v, axes=[axis0, iaxis, jaxis], grid=curvegrid, attributes=attributs, id=v.id)

        lat = MV2.array([[-20, -10, 0, -15, -5]], 'f')
        lon = MV2.array([[0, 10, 20, 50, 60]], 'f')

        data1 = MV2.array([[[2, 3, 1, 6, 2]]], 'f')
        data2 = MV2.array([[[2, 3, 1, 6, 2]]], 'f')

        data1 = CurveGrid(data1, lat, lon)
        data2 = CurveGrid(data2, lat, lon)

        result = MV2.concatenate([data1, data2], axis=0)

    def testSliceMetadata(self):
        f = self.getFile(cdat_info.get_sampledata_path() + "/clt.nc")
        s = f("clt")
        s0 = s[0]
        self.assertTrue(s0.getAxis(0).isLatitude())
        self.assertTrue(s0.getAxis(1).isLongitude())

    def testReshapeMaskedAverage(self):
        a = MV2.arange(100)
        a = MV2.reshape(a, (10, 10))
        self.assertEqual(a.shape, (10, 10))
        self.assertEqual(len(a.getAxisList()), 2)
        a = MV2.masked_greater(a, 23)
        b = MV2.average(a, axis=0)
        c = a - b

if __name__ == "__main__":
    unittest.main()
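Both new modules are plain unittest test cases, so they can be run together with unittest's standard discovery. A minimal runner sketch follows; the test_*.py file names are placeholders, since the new files' names are not visible in this diff view.

# Minimal sketch of a test runner (assumed file names, not taken from this commit).
import unittest

if __name__ == "__main__":
    suite = unittest.defaultTestLoader.discover(".", pattern="test_*.py")
    unittest.TextTestRunner(verbosity=2).run(suite)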