@@ -34,7 +34,50 @@ def get_engine(engine):
     return FastParquetImpl()


-class PyArrowImpl(object):
+class BaseImpl(object):
+
+    api = None  # module
+
+    @staticmethod
+    def _validate_index(df):
+        if not isinstance(df.index, Int64Index):
+            msg = (
+                "parquet does not support serializing {} for the index; "
+                "you can .reset_index() to make the index into column(s)"
+            )
+            raise ValueError(msg.format(type(df.index)))
+        if not df.index.equals(RangeIndex(len(df))):
+            raise ValueError(
+                "parquet does not support serializing a non-default index "
+                "for the index; you can .reset_index() to make the index "
+                "into column(s)"
+            )
+        if df.index.name is not None:
+            raise ValueError(
+                "parquet does not serialize index meta-data "
+                "on a default index"
+            )
+
+    @staticmethod
+    def _validate_columns(df):
+        # must have value column names (strings only)
+        if df.columns.inferred_type not in {'string', 'unicode'}:
+            raise ValueError("parquet must have string column names")
+
+    def validate_dataframe(self, df):
+        if not isinstance(df, DataFrame):
+            raise ValueError("to_parquet only supports IO with DataFrames")
+        self._validate_columns(df)
+        self._validate_index(df)
+
+    def write(self, df, path, compression, **kwargs):
+        raise NotImplementedError()
+
+    def read(self, path, columns=None, **kwargs):
+        raise NotImplementedError()
+
+
+class PyArrowImpl(BaseImpl):

     def __init__(self):
         # since pandas is a dependency of pyarrow
@@ -63,8 +106,14 @@ def __init__(self):
         self._pyarrow_lt_070 = LooseVersion(pyarrow.__version__) < '0.7.0'
         self.api = pyarrow

+    def _validate_index(self, df):
+        # pyarrow >= 0.7.0 supports multi-indexes so no need to validate
+        if self._pyarrow_lt_070:
+            super(PyArrowImpl, self)._validate_index(df)
+
     def write(self, df, path, compression='snappy',
               coerce_timestamps='ms', **kwargs):
+        self.validate_dataframe(df)
         path, _, _ = get_filepath_or_buffer(path)
         if self._pyarrow_lt_060:
             table = self.api.Table.from_pandas(df, timestamps_to_ms=True)
@@ -83,12 +132,11 @@ def read(self, path, columns=None, **kwargs):
                                           **kwargs).to_pandas()


-class FastParquetImpl(object):
+class FastParquetImpl(BaseImpl):

     def __init__(self):
         # since pandas is a dependency of fastparquet
         # we need to import on first use
-
         try:
             import fastparquet
         except ImportError:
@@ -109,6 +157,7 @@ def __init__(self):
         self.api = fastparquet

     def write(self, df, path, compression='snappy', **kwargs):
+        self.validate_dataframe(df)
         # thriftpy/protocol/compact.py:339:
         # DeprecationWarning: tostring() is deprecated.
         # Use tobytes() instead.
@@ -140,46 +189,7 @@ def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
     kwargs
         Additional keyword arguments passed to the engine
     """
-
     impl = get_engine(engine)
-
-    if not isinstance(df, DataFrame):
-        raise ValueError("to_parquet only support IO with DataFrames")
-
-    valid_types = {'string', 'unicode'}
-
-    # validate that we have only a default index
-    # raise on anything else as we don't serialize the index
-    # *unless* we're using pyarrow >= 0.7.1 which does support multi-indexes
-    if impl.api.__name__ == 'pyarrow' and not impl._pyarrow_lt_070:
-        validate_index = False
-    else:
-        validate_index = True
-
-    if validate_index:
-        if not isinstance(df.index, Int64Index):
-            raise ValueError("parquet does not support serializing {} "
-                             "for the index; you can .reset_index()"
-                             "to make the index into column(s)".format(
-                                 type(df.index)))
-
-        if not df.index.equals(RangeIndex.from_range(range(len(df)))):
-            raise ValueError("parquet does not support serializing a "
-                             "non-default index for the index; you "
-                             "can .reset_index() to make the index "
-                             "into column(s)")
-
-        if df.index.name is not None:
-            raise ValueError("parquet does not serialize index meta-data on a "
-                             "default index")
-
-    # validate columns
-    # ----------------
-
-    # must have value column names (strings only)
-    if df.columns.inferred_type not in valid_types:
-        raise ValueError("parquet must have string column names")
-
     return impl.write(df, path, compression=compression, **kwargs)


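For illustration only (not part of this commit): with the checks consolidated in BaseImpl.validate_dataframe, a malformed frame now fails inside the engine's write() rather than in to_parquet() itself. A minimal sketch, assuming a pandas version carrying this validation and a working pyarrow install; the output path "out.parquet" is just a placeholder.

import pandas as pd

# Non-string column names should trip the shared column check added here
# ("parquet must have string column names") before anything is written.
df = pd.DataFrame({0: [1, 2], 1: [3, 4]})

try:
    df.to_parquet("out.parquet", engine="pyarrow")
except ValueError as err:
    print(err)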