@@ -39,9 +39,33 @@ def bucket(session, cloudformation_outputs):
    session.s3.delete_objects(path=f"s3://{bucket}/")


-def test_read_csv(session, bucket):
-    boto3.client("s3").upload_file("data_samples/small.csv", bucket,
-                                   "data_samples/small.csv")
-    path = f"s3://{bucket}/data_samples/small.csv"
-    dataframe = session.spark.read_csv(path=path)
-    assert dataframe.count() == 100
+@pytest.mark.parametrize(
+    "sample_name",
+    ["nano", "micro", "small"],
+)
+def test_read_csv(session, bucket, sample_name):
+    path = f"data_samples/{sample_name}.csv"
+    if sample_name == "micro":
+        schema = "id SMALLINT, name STRING, value FLOAT, date TIMESTAMP"
+        timestamp_format = "yyyy-MM-dd"
+    elif sample_name == "small":
+        schema = "id BIGINT, name STRING, date DATE"
+        timestamp_format = "dd-MM-yy"
+    elif sample_name == "nano":
+        schema = "id INTEGER, name STRING, value DOUBLE, date TIMESTAMP, time TIMESTAMP"
+        timestamp_format = "yyyy-MM-dd"
+    dataframe = session.spark.read_csv(path=path,
+                                       schema=schema,
+                                       timestampFormat=timestamp_format,
+                                       dateFormat=timestamp_format,
+                                       header=True)
+
+    boto3.client("s3").upload_file(path, bucket, path)
+    path2 = f"s3://{bucket}/{path}"
+    dataframe2 = session.spark.read_csv(path=path2,
+                                        schema=schema,
+                                        timestampFormat=timestamp_format,
+                                        dateFormat=timestamp_format,
+                                        header=True)
+    assert dataframe.count() == dataframe2.count()
+    assert len(list(dataframe.columns)) == len(list(dataframe2.columns))