from warnings import catch_warnings
from distutils.version import LooseVersion
from pandas import DataFrame, RangeIndex, Int64Index, get_option
-from pandas.compat import range
+from pandas.compat import string_types
+from pandas.core.common import AbstractMethodError
from pandas.io.common import get_filepath_or_buffer


@@ -39,37 +40,75 @@ def get_engine(engine):
    return FastParquetImpl()


-class PyArrowImpl(object):
+class BaseImpl(object):
+
+    api = None  # module
+
+    @staticmethod
+    def validate_dataframe(df):
+
+        if not isinstance(df, DataFrame):
+            raise ValueError("to_parquet only supports IO with DataFrames")
+
+        # must have value column names (strings only)
+        if df.columns.inferred_type not in {'string', 'unicode'}:
+            raise ValueError("parquet must have string column names")
+
+        # index level names must be strings
+        valid_names = all(
+            isinstance(name, string_types)
+            for name in df.index.names
+            if name is not None
+        )
+        if not valid_names:
+            raise ValueError("Index level names must be strings")
+
+    def write(self, df, path, compression, **kwargs):
+        raise AbstractMethodError(self)
+
+    def read(self, path, columns=None, **kwargs):
+        raise AbstractMethodError(self)
+
+
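The shared validation now lives on the base class, so both engines reject the same inputs. A minimal sketch of what `validate_dataframe` accepts and rejects, assuming this branch is installed (the frames are illustrative):

import pandas as pd
from pandas.io.parquet import BaseImpl

# string column names pass validation
BaseImpl.validate_dataframe(pd.DataFrame({'a': [1, 2]}))

# non-string column names raise
try:
    BaseImpl.validate_dataframe(pd.DataFrame({0: [1, 2]}))
except ValueError as err:
    print(err)  # parquet must have string column names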
+class PyArrowImpl(BaseImpl):

    def __init__(self):
        # since pandas is a dependency of pyarrow
        # we need to import on first use
-
        try:
            import pyarrow
            import pyarrow.parquet
        except ImportError:
-            raise ImportError("pyarrow is required for parquet support\n\n"
-                              "you can install via conda\n"
-                              "conda install pyarrow -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U pyarrow\n")
-
+            raise ImportError(
+                "pyarrow is required for parquet support\n\n"
+                "you can install via conda\n"
+                "conda install pyarrow -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U pyarrow\n"
+            )
        if LooseVersion(pyarrow.__version__) < '0.4.1':
-            raise ImportError("pyarrow >= 0.4.1 is required for parquet"
-                              "support\n\n"
-                              "you can install via conda\n"
-                              "conda install pyarrow -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U pyarrow\n")
-
-        self._pyarrow_lt_050 = LooseVersion(pyarrow.__version__) < '0.5.0'
-        self._pyarrow_lt_060 = LooseVersion(pyarrow.__version__) < '0.6.0'
+            raise ImportError(
+                "pyarrow >= 0.4.1 is required for parquet support\n\n"
+                "you can install via conda\n"
+                "conda install pyarrow -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U pyarrow\n"
+            )
+
+        self._pyarrow_lt_060 = (
+            LooseVersion(pyarrow.__version__) < LooseVersion('0.6.0'))
+        self._pyarrow_lt_070 = (
+            LooseVersion(pyarrow.__version__) < LooseVersion('0.7.0'))
+
        self.api = pyarrow

    def write(self, df, path, compression='snappy',
              coerce_timestamps='ms', **kwargs):
+        self.validate_dataframe(df)
+        if self._pyarrow_lt_070:
+            self._validate_write_lt_070(df)
        path, _, _ = get_filepath_or_buffer(path)
+
        if self._pyarrow_lt_060:
            table = self.api.Table.from_pandas(df, timestamps_to_ms=True)
            self.api.parquet.write_table(
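The `_pyarrow_lt_060` / `_pyarrow_lt_070` flags computed in `__init__` gate the write and read paths by installed version; wrapping both sides in `LooseVersion` avoids comparing a version object against a bare string. The same pattern in isolation (the module stand-in and threshold are illustrative):

from distutils.version import LooseVersion

class FakeModule(object):
    # stand-in for an installed package; only __version__ matters here
    __version__ = '0.6.2'

def is_older_than(module, minimum):
    # True when the installed version predates `minimum`
    return LooseVersion(module.__version__) < LooseVersion(minimum)

print(is_older_than(FakeModule, '0.7.0'))  # True -> take the legacy path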
@@ -83,36 +122,75 @@ def write(self, df, path, compression='snappy',

    def read(self, path, columns=None, **kwargs):
        path, _, _ = get_filepath_or_buffer(path)
+        if self._pyarrow_lt_070:
+            return self.api.parquet.read_pandas(path, columns=columns,
+                                                **kwargs).to_pandas()
+        kwargs['use_pandas_metadata'] = True
        return self.api.parquet.read_table(path, columns=columns,
                                           **kwargs).to_pandas()

-
-class FastParquetImpl(object):
+    def _validate_write_lt_070(self, df):
+        # Compatibility shim for pyarrow < 0.7.0
+        # TODO: Remove in pandas 0.22.0
+        from pandas.core.indexes.multi import MultiIndex
+        if isinstance(df.index, MultiIndex):
+            msg = (
+                "Multi-index DataFrames are only supported "
+                "with pyarrow >= 0.7.0"
+            )
+            raise ValueError(msg)
+        # Validate index
+        if not isinstance(df.index, Int64Index):
+            msg = (
+                "pyarrow < 0.7.0 does not support serializing {} for the "
+                "index; you can .reset_index() to make the index into "
+                "column(s), or install the latest version of pyarrow or "
+                "fastparquet."
+            )
+            raise ValueError(msg.format(type(df.index)))
+        if not df.index.equals(RangeIndex(len(df))):
+            raise ValueError(
+                "pyarrow < 0.7.0 does not support serializing a non-default "
+                "index; you can .reset_index() to make the index into "
+                "column(s), or install the latest version of pyarrow or "
+                "fastparquet."
+            )
+        if df.index.name is not None:
+            raise ValueError(
+                "pyarrow < 0.7.0 does not serialize indexes with a name; you "
+                "can set the index.name to None or install the latest version "
+                "of pyarrow or fastparquet."
+            )
+
+
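With an older pyarrow, the shim above surfaces an actionable error instead of silently writing a lossy file. A hedged repro, only meaningful with pyarrow < 0.7.0 installed (file name illustrative):

import pandas as pd
from pandas.io.parquet import to_parquet

df = pd.DataFrame({'a': [1, 2, 3]})
df.index.name = 'idx'  # named indexes are rejected by the shim

try:
    to_parquet(df, 'out.parquet', engine='pyarrow')
except ValueError as err:
    # "pyarrow < 0.7.0 does not serialize indexes with a name; ..."
    print(err)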
+class FastParquetImpl(BaseImpl):

    def __init__(self):
        # since pandas is a dependency of fastparquet
        # we need to import on first use
-
        try:
            import fastparquet
        except ImportError:
-            raise ImportError("fastparquet is required for parquet support\n\n"
-                              "you can install via conda\n"
-                              "conda install fastparquet -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U fastparquet")
-
+            raise ImportError(
+                "fastparquet is required for parquet support\n\n"
+                "you can install via conda\n"
+                "conda install fastparquet -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U fastparquet"
+            )
        if LooseVersion(fastparquet.__version__) < '0.1.0':
-            raise ImportError("fastparquet >= 0.1.0 is required for parquet "
-                              "support\n\n"
-                              "you can install via conda\n"
-                              "conda install fastparquet -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U fastparquet")
-
+            raise ImportError(
+                "fastparquet >= 0.1.0 is required for parquet "
+                "support\n\n"
+                "you can install via conda\n"
+                "conda install fastparquet -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U fastparquet"
+            )
        self.api = fastparquet

    def write(self, df, path, compression='snappy', **kwargs):
+        self.validate_dataframe(df)
        # thriftpy/protocol/compact.py:339:
        # DeprecationWarning: tostring() is deprecated.
        # Use tobytes() instead.
@@ -123,7 +201,8 @@ def write(self, df, path, compression='snappy', **kwargs):

    def read(self, path, columns=None, **kwargs):
        path, _, _ = get_filepath_or_buffer(path)
-        return self.api.ParquetFile(path).to_pandas(columns=columns, **kwargs)
+        parquet_file = self.api.ParquetFile(path)
+        return parquet_file.to_pandas(columns=columns, **kwargs)


def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
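A quick round trip through the fastparquet engine, exercising the split-out `ParquetFile` read above (requires fastparquet >= 0.1.0; `read_parquet` is defined elsewhere in this module; file name illustrative):

import pandas as pd
from pandas.io.parquet import to_parquet, read_parquet

df = pd.DataFrame({'a': [1.0, 2.0], 'b': ['x', 'y']})
to_parquet(df, 'example.parquet', engine='fastparquet', compression='gzip')
result = read_parquet('example.parquet', engine='fastparquet', columns=['a'])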
@@ -144,43 +223,7 @@ def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
    kwargs
        Additional keyword arguments passed to the engine
    """
-
    impl = get_engine(engine)
-
-    if not isinstance(df, DataFrame):
-        raise ValueError("to_parquet only support IO with DataFrames")
-
-    valid_types = {'string', 'unicode'}
-
-    # validate index
-    # --------------
-
-    # validate that we have only a default index
-    # raise on anything else as we don't serialize the index
-
-    if not isinstance(df.index, Int64Index):
-        raise ValueError("parquet does not support serializing {} "
-                         "for the index; you can .reset_index()"
-                         "to make the index into column(s)".format(
-                             type(df.index)))
-
-    if not df.index.equals(RangeIndex.from_range(range(len(df)))):
-        raise ValueError("parquet does not support serializing a "
-                         "non-default index for the index; you "
-                         "can .reset_index() to make the index "
-                         "into column(s)")
-
-    if df.index.name is not None:
-        raise ValueError("parquet does not serialize index meta-data on a "
-                         "default index")
-
-    # validate columns
-    # ----------------
-
-    # must have value column names (strings only)
-    if df.columns.inferred_type not in valid_types:
-        raise ValueError("parquet must have string column names")
-
    return impl.write(df, path, compression=compression, **kwargs)

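End to end, `to_parquet` now only resolves the engine and delegates; the validation moved into the implementations. Engine resolution can also be driven by the `io.parquet.engine` option (hence the `get_option` import at the top); a sketch, assuming at least one engine is installed:

import pandas as pd
from pandas.io.parquet import to_parquet

# 'auto' defers to the io.parquet.engine option when resolving an engine
pd.set_option('io.parquet.engine', 'auto')

df = pd.DataFrame({'a': [1, 2, 3]})
to_parquet(df, 'data.parquet')  # engine='auto' is the default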