diff --git a/include/cassandra.h b/include/cassandra.h
index 573fd8826..3e1c55cca 100644
--- a/include/cassandra.h
+++ b/include/cassandra.h
@@ -557,7 +557,8 @@ typedef enum CassColumnType_ {
   CASS_COLUMN_TYPE_REGULAR,
   CASS_COLUMN_TYPE_PARTITION_KEY,
   CASS_COLUMN_TYPE_CLUSTERING_KEY,
-  CASS_COLUMN_TYPE_STATIC
+  CASS_COLUMN_TYPE_STATIC,
+  CASS_COLUMN_TYPE_COMPACT_VALUE
 } CassColumnType;
 
 typedef enum  CassErrorSource_ {
@@ -2322,6 +2323,9 @@ cass_aggregate_meta_final_func(const CassAggregateMeta* aggregate_meta);
 /**
  * Gets the initial condition value for the aggregate.
  *
+ * Note: The value of the initial condition will always be
+ * a "varchar" type for Cassandra 3.0+.
+ *
  * @public @memberof CassAggregateMeta
  *
  * @param[in] aggregate_meta
@@ -4278,6 +4282,18 @@ cass_data_type_set_class_name_n(CassDataType* data_type,
                                 const char* class_name,
                                 size_t class_name_length);
 
+/**
+ * Gets the sub-data type count of a UDT (user defined type), tuple
+ * or collection.
+ *
+ * Note: Only valid for UDT, tuple and collection data types.
+ *
+ * @param[in] data_type
+ * @return Returns the number of sub-data types
+ */
+CASS_EXPORT size_t
+cass_data_sub_type_count(const CassDataType* data_type);
+
 /**
  * Gets the sub-data type of a UDT (user defined type), tuple or collection at
  * the specified index.
diff --git a/src/abstract_data.hpp b/src/abstract_data.hpp
index acac4e927..3fd5fad19 100644
--- a/src/abstract_data.hpp
+++ b/src/abstract_data.hpp
@@ -143,7 +143,7 @@ class AbstractData {
 protected:
   virtual size_t get_indices(StringRef name,
                              IndexVec* indices) = 0;
-  virtual const SharedRefPtr& get_type(size_t index) const = 0;
+  virtual const DataType::ConstPtr& get_type(size_t index) const = 0;
 
 private:
   template 
@@ -152,7 +152,7 @@ class AbstractData {
       return CASS_ERROR_LIB_INDEX_OUT_OF_BOUNDS;
     }
     IsValidDataType is_valid_type;
-    SharedRefPtr data_type(get_type(index));
+    DataType::ConstPtr data_type(get_type(index));
     if (data_type && !is_valid_type(value, data_type)) {
       return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
     }
diff --git a/src/collection.hpp b/src/collection.hpp
index 1c2dbf337..7863bc132 100644
--- a/src/collection.hpp
+++ b/src/collection.hpp
@@ -41,7 +41,7 @@ class Collection : public RefCounted {
     items_.reserve(item_count);
   }
 
-  Collection(const SharedRefPtr& data_type,
+  Collection(const CollectionType::ConstPtr& data_type,
              size_t item_count)
     : data_type_(data_type) {
     items_.reserve(item_count);
@@ -51,7 +51,7 @@ class Collection : public RefCounted {
     return static_cast(data_type_->value_type());
   }
 
-  const SharedRefPtr& data_type() const { return data_type_; }
+  const CollectionType::ConstPtr& data_type() const { return data_type_; }
   const BufferVec& items() const { return items_; }
 
 #define APPEND_TYPE(Type)                  \
@@ -129,7 +129,7 @@ class Collection : public RefCounted {
   void encode_items_uint16(char* buf) const;
 
 private:
-  SharedRefPtr data_type_;
+  CollectionType::ConstPtr data_type_;
   BufferVec items_;
 
 private:
diff --git a/src/collection_iterator.cpp b/src/collection_iterator.cpp
index 76c92e734..c652c5f3e 100644
--- a/src/collection_iterator.cpp
+++ b/src/collection_iterator.cpp
@@ -33,7 +33,7 @@ char* CollectionIterator::decode_value(char* position) {
   int32_t size;
   char* buffer = decode_size(protocol_version, position, size);
 
-  SharedRefPtr data_type;
+  DataType::ConstPtr data_type;
   if (collection_->value_type() == CASS_VALUE_TYPE_MAP) {
     data_type = (index_ % 2 == 0) ? collection_->primary_data_type()
                                   : collection_->secondary_data_type();
diff --git a/src/collection_iterator.hpp b/src/collection_iterator.hpp
index fbbe2dd30..fee7bb7c4 100644
--- a/src/collection_iterator.hpp
+++ b/src/collection_iterator.hpp
@@ -67,7 +67,7 @@ class TupleIterator : public ValueIterator {
       : ValueIterator(CASS_ITERATOR_TYPE_TUPLE)
       , tuple_(tuple)
       , position_(tuple->data()) {
-    SharedRefPtr collection_type(tuple->data_type());
+    CollectionType::ConstPtr collection_type(tuple->data_type());
     next_ = collection_type->types().begin();
     end_ = collection_type->types().end();
   }
diff --git a/src/control_connection.cpp b/src/control_connection.cpp
index 60b1637e8..61df6d3f3 100644
--- a/src/control_connection.cpp
+++ b/src/control_connection.cpp
@@ -38,12 +38,19 @@
 #define SELECT_PEERS "SELECT peer, data_center, rack, release_version, rpc_address FROM system.peers"
 #define SELECT_PEERS_TOKENS "SELECT peer, data_center, rack, release_version, rpc_address, tokens FROM system.peers"
 
-#define SELECT_KEYSPACES "SELECT * FROM system.schema_keyspaces"
-#define SELECT_COLUMN_FAMILIES "SELECT * FROM system.schema_columnfamilies"
-#define SELECT_COLUMNS "SELECT * FROM system.schema_columns"
-#define SELECT_USERTYPES "SELECT * FROM system.schema_usertypes"
-#define SELECT_FUNCTIONS "SELECT * FROM system.schema_functions"
-#define SELECT_AGGREGATES "SELECT * FROM system.schema_aggregates"
+#define SELECT_KEYSPACES_20 "SELECT * FROM system.schema_keyspaces"
+#define SELECT_COLUMN_FAMILIES_20 "SELECT * FROM system.schema_columnfamilies"
+#define SELECT_COLUMNS_20 "SELECT * FROM system.schema_columns"
+#define SELECT_USERTYPES_21 "SELECT * FROM system.schema_usertypes"
+#define SELECT_FUNCTIONS_22 "SELECT * FROM system.schema_functions"
+#define SELECT_AGGREGATES_22 "SELECT * FROM system.schema_aggregates"
+
+#define SELECT_KEYSPACES_30 "SELECT * FROM system_schema.keyspaces"
+#define SELECT_TABLES_30 "SELECT * FROM system_schema.tables"
+#define SELECT_COLUMNS_30 "SELECT * FROM system_schema.columns"
+#define SELECT_USERTYPES_30 "SELECT * FROM system_schema.types"
+#define SELECT_FUNCTIONS_30 "SELECT * FROM system_schema.functions"
+#define SELECT_AGGREGATES_30 "SELECT * FROM system_schema.aggregates"
 
 namespace cass {
 
@@ -194,7 +201,7 @@ void ControlConnection::on_ready(Connection* connection) {
 
   // The control connection has to refresh meta when there's a reconnect because
   // events could have been missed while not connected.
-  query_meta_all();
+  query_meta_hosts();
 }
 
 void ControlConnection::on_close(Connection* connection) {
@@ -359,31 +366,16 @@ void ControlConnection::on_event(EventResponse* response) {
   }
 }
 
-//TODO: query and callbacks should be in Metadata
-// punting for now because of tight coupling of Session and CC state
-void ControlConnection::query_meta_all() {
-  ScopedRefPtr > handler(
-        new ControlMultipleRequestHandler(this, ControlConnection::on_query_meta_all, QueryMetadataAllData()));
+void ControlConnection::query_meta_hosts() {
+  ScopedRefPtr > handler(
+        new ControlMultipleRequestHandler(this, ControlConnection::on_query_hosts, UnusedData()));
   handler->execute_query(SELECT_LOCAL_TOKENS);
   handler->execute_query(SELECT_PEERS_TOKENS);
-
-  if (session_->config().use_schema()) {
-    handler->execute_query(SELECT_KEYSPACES);
-    handler->execute_query(SELECT_COLUMN_FAMILIES);
-    handler->execute_query(SELECT_COLUMNS);
-    if (protocol_version_ >= 3) {
-      handler->execute_query(SELECT_USERTYPES);
-    }
-    if (protocol_version_ >= 4) {
-      handler->execute_query(SELECT_FUNCTIONS);
-      handler->execute_query(SELECT_AGGREGATES);
-    }
-  }
 }
 
-void ControlConnection::on_query_meta_all(ControlConnection* control_connection,
-                                          const QueryMetadataAllData& unused,
-                                          const MultipleRequestHandler::ResponseVec& responses) {
+void ControlConnection::on_query_hosts(ControlConnection* control_connection,
+                                       const UnusedData& data,
+                                       const MultipleRequestHandler::ResponseVec& responses) {
   Connection* connection = control_connection->connection_;
   if (connection == NULL) {
     return;
@@ -391,8 +383,6 @@ void ControlConnection::on_query_meta_all(ControlConnection* control_connection,
 
   Session* session = control_connection->session_;
 
-  session->metadata().clear_and_update_back();
-
   bool is_initial_connection = (control_connection->state_ == CONTROL_STATE_NEW);
 
   // If the 'system.local' table is empty the connection isn't used as a control
@@ -460,20 +450,73 @@ void ControlConnection::on_query_meta_all(ControlConnection* control_connection,
   session->purge_hosts(is_initial_connection);
 
   if (session->config().use_schema()) {
-    session->metadata().update_keyspaces(static_cast(responses[2].get()));
-    session->metadata().update_tables(static_cast(responses[3].get()),
-                                      static_cast(responses[4].get()));
-    if (control_connection->protocol_version_ >= 3) {
-      session->metadata().update_user_types(static_cast(responses[5].get()));
+    control_connection->query_meta_schema();
+  } else {
+    control_connection->state_ = CONTROL_STATE_READY;
+    session->on_control_connection_ready();
+    // Create a new query plan that considers all the new hosts from the
+    // "system" tables.
+    control_connection->query_plan_.reset(session->new_query_plan());
+  }
+}
+
+//TODO: query and callbacks should be in Metadata
+// punting for now because of tight coupling of Session and CC state
+void ControlConnection::query_meta_schema() {
+  ScopedRefPtr > handler(
+        new ControlMultipleRequestHandler(this, ControlConnection::on_query_meta_schema, UnusedData()));
+
+  if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
+    handler->execute_query(SELECT_KEYSPACES_30);
+    handler->execute_query(SELECT_TABLES_30);
+    handler->execute_query(SELECT_COLUMNS_30);
+    handler->execute_query(SELECT_USERTYPES_30);
+    handler->execute_query(SELECT_FUNCTIONS_30);
+    handler->execute_query(SELECT_AGGREGATES_30);
+  } else {
+    handler->execute_query(SELECT_KEYSPACES_20);
+    handler->execute_query(SELECT_COLUMN_FAMILIES_20);
+    handler->execute_query(SELECT_COLUMNS_20);
+    if (session_->metadata().cassandra_version() >= VersionNumber(2, 1, 0)) {
+      handler->execute_query(SELECT_USERTYPES_21);
     }
-    if (control_connection->protocol_version_ >= 4) {
-      session->metadata().update_functions(static_cast(responses[6].get()));
-      session->metadata().update_aggregates(static_cast(responses[7].get()));
+    if (session_->metadata().cassandra_version() >= VersionNumber(2, 2, 0)) {
+      handler->execute_query(SELECT_FUNCTIONS_22);
+      handler->execute_query(SELECT_AGGREGATES_22);
     }
-    session->metadata().swap_to_back_and_update_front();
-    if (control_connection->should_query_tokens_) session->metadata().build();
+  }
+}
+
+void ControlConnection::on_query_meta_schema(ControlConnection* control_connection,
+                                             const UnusedData& unused,
+                                             const MultipleRequestHandler::ResponseVec& responses) {
+  Connection* connection = control_connection->connection_;
+  if (connection == NULL) {
+    return;
   }
 
+  Session* session = control_connection->session_;
+
+  session->metadata().clear_and_update_back();
+
+  bool is_initial_connection = (control_connection->state_ == CONTROL_STATE_NEW);
+
+  session->metadata().update_keyspaces(static_cast(responses[0].get()));
+  session->metadata().update_tables(static_cast(responses[1].get()),
+      static_cast(responses[2].get()));
+
+  if (session->metadata().cassandra_version() >= VersionNumber(2, 1, 0)) {
+    session->metadata().update_user_types(static_cast(responses[3].get()));
+  }
+
+  if (session->metadata().cassandra_version() >= VersionNumber(2, 2, 0)) {
+    session->metadata().update_functions(static_cast(responses[4].get()));
+    session->metadata().update_aggregates(static_cast(responses[5].get()));
+  }
+
+  session->metadata().swap_to_back_and_update_front();
+  if (control_connection->should_query_tokens_) session->metadata().build();
+
   if (is_initial_connection) {
     control_connection->state_ = CONTROL_STATE_READY;
     session->on_control_connection_ready();
@@ -636,8 +679,9 @@ void ControlConnection::update_node_info(SharedRefPtr host, const Row* row
   }
 
   if (should_query_tokens_) {
+    bool is_connected_host = connection_ != NULL && host->address().compare(connection_->address()) == 0;
     std::string partitioner;
-    if (row->get_string_by_name("partitioner", &partitioner)) {
+    if (is_connected_host && row->get_string_by_name("partitioner", &partitioner)) {
       session_->metadata().set_partitioner(partitioner);
     }
     v = row->get_by_name("tokens");
@@ -655,7 +699,13 @@ void ControlConnection::update_node_info(SharedRefPtr host, const Row* row
 }
 
 void ControlConnection::refresh_keyspace(const StringRef& keyspace_name) {
-  std::string query(SELECT_KEYSPACES);
+  std::string query;
+
+  if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
+    query.assign(SELECT_KEYSPACES_30);
+  }  else {
+    query.assign(SELECT_KEYSPACES_20);
+  }
   query.append(" WHERE keyspace_name='")
        .append(keyspace_name.data(), keyspace_name.size())
        .append("'");
@@ -683,13 +733,26 @@ void ControlConnection::on_refresh_keyspace(ControlConnection* control_connectio
 
 void ControlConnection::refresh_table(const StringRef& keyspace_name,
                                       const StringRef& table_name) {
-  std::string cf_query(SELECT_COLUMN_FAMILIES);
-  cf_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
-          .append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'");
+  std::string cf_query;
+  std::string col_query;
+
+  if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
+    cf_query.assign(SELECT_TABLES_30);
+    cf_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
+        .append("' AND table_name='").append(table_name.data(), table_name.size()).append("'");
 
-  std::string col_query(SELECT_COLUMNS);
-  col_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
-           .append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'");
+    col_query.assign(SELECT_COLUMNS_30);
+    col_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
+        .append("' AND table_name='").append(table_name.data(), table_name.size()).append("'");
+  } else {
+    cf_query.assign(SELECT_COLUMN_FAMILIES_20);
+    cf_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
+        .append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'");
+
+    col_query.assign(SELECT_COLUMNS_20);
+    col_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
+        .append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'");
+  }
 
   LOG_DEBUG("Refreshing table %s; %s", cf_query.c_str(), col_query.c_str());
 
@@ -720,7 +783,13 @@ void ControlConnection::on_refresh_table(ControlConnection* control_connection,
 void ControlConnection::refresh_type(const StringRef& keyspace_name,
                                      const StringRef& type_name) {
 
-  std::string query(SELECT_USERTYPES);
+  std::string query;
+  if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
+    query.assign(SELECT_USERTYPES_30);
+  } else {
+    query.assign(SELECT_USERTYPES_21);
+  }
+
   query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
                 .append("' AND type_name='").append(type_name.data(), type_name.size()).append("'");
 
@@ -752,12 +821,22 @@ void ControlConnection::refresh_function(const StringRef& keyspace_name,
                                          bool is_aggregate) {
 
   std::string query;
-  if (is_aggregate) {
-    query.assign(SELECT_AGGREGATES);
-    query.append(" WHERE keyspace_name=? AND aggregate_name=? AND signature=?");
+  if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
+    if (is_aggregate) {
+      query.assign(SELECT_AGGREGATES_30);
+      query.append(" WHERE keyspace_name=? AND aggregate_name=? AND argument_types=?");
+    } else {
+      query.assign(SELECT_FUNCTIONS_30);
+      query.append(" WHERE keyspace_name=? AND function_name=? AND argument_types=?");
+    }
   } else {
-    query.assign(SELECT_FUNCTIONS);
-    query.append(" WHERE keyspace_name=? AND function_name=? AND signature=?");
+    if (is_aggregate) {
+      query.assign(SELECT_AGGREGATES_22);
+      query.append(" WHERE keyspace_name=? AND aggregate_name=? AND signature=?");
+    } else {
+      query.assign(SELECT_FUNCTIONS_22);
+      query.append(" WHERE keyspace_name=? AND function_name=? AND signature=?");
+    }
   }
 
   LOG_DEBUG("Refreshing %s %s in keyspace %s",
diff --git a/src/control_connection.hpp b/src/control_connection.hpp
index 4a72555e1..472167de3 100644
--- a/src/control_connection.hpp
+++ b/src/control_connection.hpp
@@ -106,7 +106,7 @@ class ControlConnection : public Connection::Listener {
     std::string table_name;
   };
 
-  struct QueryMetadataAllData {};
+  struct UnusedData {};
 
   template
   class ControlHandler : public Handler {
@@ -180,28 +180,32 @@ class ControlConnection : public Connection::Listener {
   virtual void on_availability_change(Connection* connection) {}
   virtual void on_event(EventResponse* response);
 
-  //TODO: possibly reorder callback functions to pair with initiator
-  static void on_query_meta_all(ControlConnection* control_connection,
-                                const QueryMetadataAllData& data,
-                                const MultipleRequestHandler::ResponseVec& responses);
-  static void on_refresh_node_info(ControlConnection* control_connection,
-                                   const RefreshNodeData& data,
-                                   Response* response);
-  static void on_refresh_node_info_all(ControlConnection* control_connection,
-                                       const RefreshNodeData& data,
-                                       Response* response);
-  void on_local_query(ResponseMessage* response);
-  void on_peer_query(ResponseMessage* response);
   static void on_reconnect(Timer* timer);
 
   bool handle_query_invalid_response(Response* response);
   void handle_query_failure(CassError code, const std::string& message);
   void handle_query_timeout();
 
-  void query_meta_all();
+  void query_meta_hosts();
+  static void on_query_hosts(ControlConnection* control_connection,
+                             const UnusedData& data,
+                             const MultipleRequestHandler::ResponseVec& responses);
+
+  void query_meta_schema();
+  static void on_query_meta_schema(ControlConnection* control_connection,
+                                const UnusedData& data,
+                                const MultipleRequestHandler::ResponseVec& responses);
+
   void refresh_node_info(SharedRefPtr host,
                          bool is_new_node,
                          bool query_tokens = false);
+  static void on_refresh_node_info(ControlConnection* control_connection,
+                                   const RefreshNodeData& data,
+                                   Response* response);
+  static void on_refresh_node_info_all(ControlConnection* control_connection,
+                                       const RefreshNodeData& data,
+                                       Response* response);
+
   void update_node_info(SharedRefPtr host, const Row* row);
 
   void refresh_keyspace(const StringRef& keyspace_name);
diff --git a/src/data_type.cpp b/src/data_type.cpp
index 101850cf4..286f42198 100644
--- a/src/data_type.cpp
+++ b/src/data_type.cpp
@@ -238,6 +238,19 @@ CassError cass_data_type_set_class_name_n(CassDataType* data_type,
   return CASS_OK;
 }
 
+size_t cass_data_sub_type_count(const CassDataType* data_type) {
+  if (data_type->is_collection() || data_type->is_tuple()) {
+    const cass::SubTypesDataType* sub_types
+        = static_cast(data_type->from());
+    return sub_types->types().size();
+  } else if (data_type->is_user_type()) {
+    const cass::UserType* user_type
+        = static_cast(data_type->from());
+    return user_type->fields().size();
+  }
+  return 0;
+}
+
 CassError cass_data_type_sub_type_name(const CassDataType* data_type,
                                        size_t index,
                                        const char** name,
@@ -360,20 +373,81 @@ void cass_data_type_free(CassDataType* data_type) {
 
 namespace cass {
 
-const SharedRefPtr DataType::NIL;
+const DataType::ConstPtr DataType::NIL;
+
+void NativeDataTypes::init_class_names() {
+  if (!by_class_names_.empty()) return;
+  by_class_names_["org.apache.cassandra.db.marshal.AsciiType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_ASCII));
+  by_class_names_["org.apache.cassandra.db.marshal.BooleanType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_BOOLEAN));
+  by_class_names_["org.apache.cassandra.db.marshal.ByteType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TINY_INT));
+  by_class_names_["org.apache.cassandra.db.marshal.BytesType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_BLOB));
+  by_class_names_["org.apache.cassandra.db.marshal.CounterColumnType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_COUNTER));
+  by_class_names_["org.apache.cassandra.db.marshal.DateType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TIMESTAMP));
+  by_class_names_["org.apache.cassandra.db.marshal.DecimalType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_DECIMAL));
+  by_class_names_["org.apache.cassandra.db.marshal.DoubleType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_DOUBLE));
+  by_class_names_["org.apache.cassandra.db.marshal.FloatType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_FLOAT));
+  by_class_names_["org.apache.cassandra.db.marshal.InetAddressType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_INET));
+  by_class_names_["org.apache.cassandra.db.marshal.Int32Type"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_INT));
+  by_class_names_["org.apache.cassandra.db.marshal.IntegerType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_INT));
+  by_class_names_["org.apache.cassandra.db.marshal.LongType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_BIGINT));
+  by_class_names_["org.apache.cassandra.db.marshal.ShortType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_SMALL_INT));
+  by_class_names_["org.apache.cassandra.db.marshal.SimpleDateType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_DATE));
+  by_class_names_["org.apache.cassandra.db.marshal.TimeType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TIME));
+  by_class_names_["org.apache.cassandra.db.marshal.TimestampType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TIMESTAMP));
+  by_class_names_["org.apache.cassandra.db.marshal.TimeUUIDType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TIMEUUID));
+  by_class_names_["org.apache.cassandra.db.marshal.UTF8Type"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TEXT));
+  by_class_names_["org.apache.cassandra.db.marshal.UUIDType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_UUID));
+}
+
+const DataType::ConstPtr& NativeDataTypes::by_class_name(const std::string& name) const {
+  DataTypeMap::const_iterator i = by_class_names_.find(name);
+  if (i == by_class_names_.end()) return DataType::NIL;
+  return i->second;
+}
+
+void NativeDataTypes::init_cql_names() {
+  if (!by_cql_names_.empty()) return;
+  by_cql_names_["ascii"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_ASCII));
+  by_cql_names_["bigint"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_BIGINT));
+  by_cql_names_["blob"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_BLOB));
+  by_cql_names_["boolean"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_BOOLEAN));
+  by_cql_names_["counter"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_COUNTER));
+  by_cql_names_["date"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_DATE));
+  by_cql_names_["decimal"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_DECIMAL));
+  by_cql_names_["double"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_DOUBLE));
+  by_cql_names_["float"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_FLOAT));
+  by_cql_names_["inet"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_INET));
+  by_cql_names_["int"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_INT));
+  by_cql_names_["smallint"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_SMALL_INT));
+  by_cql_names_["time"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TIME));
+  by_cql_names_["timestamp"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TIMESTAMP));
+  by_cql_names_["timeuuid"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TIMEUUID));
+  by_cql_names_["tinyint"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TINY_INT));
+  by_cql_names_["text"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TEXT));
+  by_cql_names_["uuid"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_UUID));
+  by_cql_names_["varchar"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_VARCHAR));
+  by_cql_names_["varint"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_VARINT));
+}
+
+const DataType::ConstPtr& NativeDataTypes::by_cql_name(const std::string& name) const {
+  DataTypeMap::const_iterator i = by_cql_names_.find(name);
+  if (i == by_cql_names_.end()) return DataType::NIL;
+  return i->second;
+}
+
 
 bool cass::IsValidDataType::operator()(const Collection* value,
-                                                          const SharedRefPtr& data_type) const {
+                                                          const DataType::ConstPtr& data_type) const {
   return value->data_type()->equals(data_type);
 }
 
 bool cass::IsValidDataType::operator()(const Tuple* value,
-                                                     const SharedRefPtr& data_type) const {
+                                                     const DataType::ConstPtr& data_type) const {
   return value->data_type()->equals(data_type);
 }
 
 bool cass::IsValidDataType::operator()(const UserTypeValue* value,
-                                                             const SharedRefPtr& data_type) const {
+                                                             const DataType::ConstPtr& data_type) const {
   return value->data_type()->equals(data_type);
 }
 
diff --git a/src/data_type.hpp b/src/data_type.hpp
index 638728ca7..411cee0cb 100644
--- a/src/data_type.hpp
+++ b/src/data_type.hpp
@@ -65,10 +65,10 @@ inline bool equals_both_not_empty(const std::string& s1,
 
 class DataType : public RefCounted {
 public:
-  typedef SharedRefPtr Ptr;
-  typedef std::vector Vec;
+  typedef SharedRefPtr ConstPtr;
+  typedef std::vector Vec;
 
-  static const SharedRefPtr NIL;
+  static const DataType::ConstPtr NIL;
 
   DataType(CassValueType value_type)
     : value_type_(value_type) { }
@@ -99,7 +99,7 @@ class DataType : public RefCounted {
     return value_type_ == CASS_VALUE_TYPE_CUSTOM;
   }
 
-  virtual bool equals(const SharedRefPtr& data_type) const {
+  virtual bool equals(const DataType::ConstPtr& data_type) const {
     switch (value_type_) {
       // "text" is an alias for  "varchar"
       case CASS_VALUE_TYPE_TEXT:
@@ -168,7 +168,7 @@ class CustomType : public DataType {
     class_name_ = class_name;
   }
 
-  virtual bool equals(const SharedRefPtr& data_type) const {
+  virtual bool equals(const DataType::ConstPtr& data_type) const {
     assert(value_type() == CASS_VALUE_TYPE_CUSTOM);
     if (data_type->value_type() != CASS_VALUE_TYPE_CUSTOM) {
       return false;
@@ -221,6 +221,8 @@ class SubTypesDataType : public DataType {
 
 class CollectionType : public SubTypesDataType {
 public:
+  typedef SharedRefPtr ConstPtr;
+
   CollectionType(CassValueType collection_type)
     : SubTypesDataType(collection_type) { }
 
@@ -233,7 +235,7 @@ class CollectionType : public SubTypesDataType {
   CollectionType(CassValueType collection_type, const DataType::Vec& types)
     : SubTypesDataType(collection_type, types) { }
 
-  virtual bool equals(const SharedRefPtr& data_type) const {
+  virtual bool equals(const DataType::ConstPtr& data_type) const {
     assert(value_type() == CASS_VALUE_TYPE_LIST ||
            value_type() == CASS_VALUE_TYPE_SET ||
            value_type() == CASS_VALUE_TYPE_MAP);
@@ -242,7 +244,7 @@ class CollectionType : public SubTypesDataType {
       return false;
     }
 
-    const SharedRefPtr& collection_type(data_type);
+    const CollectionType::ConstPtr& collection_type(data_type);
 
     // Only compare sub-types if both have sub-types
     if(!types_.empty() && !collection_type->types_.empty()) {
@@ -264,36 +266,38 @@ class CollectionType : public SubTypesDataType {
   }
 
 public:
-  static SharedRefPtr list(SharedRefPtr element_type) {
+  static DataType::ConstPtr list(DataType::ConstPtr element_type) {
     DataType::Vec types;
     types.push_back(element_type);
-    return SharedRefPtr(new CollectionType(CASS_VALUE_TYPE_LIST, types));
+    return DataType::ConstPtr(new CollectionType(CASS_VALUE_TYPE_LIST, types));
   }
 
-  static SharedRefPtr set(SharedRefPtr element_type) {
+  static DataType::ConstPtr set(DataType::ConstPtr element_type) {
     DataType::Vec types;
     types.push_back(element_type);
-    return SharedRefPtr(new CollectionType(CASS_VALUE_TYPE_SET, types));
+    return DataType::ConstPtr(new CollectionType(CASS_VALUE_TYPE_SET, types));
   }
 
-  static SharedRefPtr map(SharedRefPtr key_type,
-                                          SharedRefPtr value_type) {
+  static DataType::ConstPtr map(DataType::ConstPtr key_type,
+                                          DataType::ConstPtr value_type) {
     DataType::Vec types;
     types.push_back(key_type);
     types.push_back(value_type);
-    return SharedRefPtr(new CollectionType(CASS_VALUE_TYPE_MAP, types));
+    return DataType::ConstPtr(new CollectionType(CASS_VALUE_TYPE_MAP, types));
   }
 };
 
 class TupleType : public SubTypesDataType {
 public:
+  typedef SharedRefPtr ConstPtr;
+
   TupleType()
     : SubTypesDataType(CASS_VALUE_TYPE_TUPLE) { }
 
   TupleType(const DataType::Vec& types)
     : SubTypesDataType(CASS_VALUE_TYPE_TUPLE, types) { }
 
-  virtual bool equals(const SharedRefPtr& data_type) const {
+  virtual bool equals(const DataType::ConstPtr& data_type) const {
     assert(value_type() == CASS_VALUE_TYPE_TUPLE);
 
     if (value_type() != data_type->value_type()) {
@@ -322,17 +326,20 @@ class TupleType : public SubTypesDataType {
   }
 };
 
-
 class UserType : public DataType {
 public:
+  typedef SharedRefPtr Ptr;
+  typedef SharedRefPtr ConstPtr;
+  typedef std::map Map;
+
   struct Field : public HashTableEntry {
     Field(const std::string& field_name,
-          const SharedRefPtr& type)
+          const DataType::ConstPtr& type)
       : name(field_name)
       , type(type) { }
 
     std::string name;
-    SharedRefPtr type;
+    DataType::ConstPtr type;
   };
 
   typedef CaseInsensitiveHashTable::EntryVec FieldVec;
@@ -344,6 +351,12 @@ class UserType : public DataType {
     : DataType(CASS_VALUE_TYPE_UDT)
     , fields_(field_count) { }
 
+  UserType(const std::string& keyspace,
+           const std::string& type_name )
+    : DataType(CASS_VALUE_TYPE_UDT)
+    , keyspace_(keyspace)
+    , type_name_(type_name) { }
+
   UserType(const std::string& keyspace,
            const std::string& type_name,
            const FieldVec& fields)
@@ -370,17 +383,21 @@ class UserType : public DataType {
     return fields_.get_indices(name, result);
   }
 
-  void add_field(const std::string name, const SharedRefPtr& data_type) {
+  void add_field(const std::string name, const DataType::ConstPtr& data_type) {
     fields_.add(Field(name, data_type));
   }
 
-  virtual bool equals(const SharedRefPtr& data_type) const {
+  void set_fields(const FieldVec& fields) {
+    fields_.set_entries(fields);
+  }
+
+  virtual bool equals(const DataType::ConstPtr& data_type) const {
     assert(value_type() == CASS_VALUE_TYPE_UDT);
     if (data_type->value_type() != CASS_VALUE_TYPE_UDT) {
       return false;
     }
 
-    const SharedRefPtr& user_type(data_type);
+    const UserType::ConstPtr& user_type(data_type);
 
     if (!equals_both_not_empty(keyspace_, user_type->keyspace_)) {
       return false;
@@ -418,120 +435,134 @@ class UserType : public DataType {
   CaseInsensitiveHashTable fields_;
 };
 
+class NativeDataTypes {
+public:
+  void init_class_names();
+  const DataType::ConstPtr& by_class_name(const std::string& name) const;
+
+  void init_cql_names();
+  const DataType::ConstPtr& by_cql_name(const std::string& name) const;
+
+private:
+  typedef std::map DataTypeMap;
+  DataTypeMap by_class_names_;
+  DataTypeMap by_cql_names_;
+};
+
 template
 struct IsValidDataType;
 
 template<>
 struct IsValidDataType {
-  bool operator()(CassNull, const SharedRefPtr& data_type) const {
+  bool operator()(CassNull, const DataType::ConstPtr& data_type) const {
     return true;
   }
 };
 
 template<>
 struct IsValidDataType {
-  bool operator()(cass_int8_t, const SharedRefPtr& data_type) const {
+  bool operator()(cass_int8_t, const DataType::ConstPtr& data_type) const {
     return data_type->value_type() == CASS_VALUE_TYPE_TINY_INT;
   }
 };
 
 template<>
 struct IsValidDataType {
-  bool operator()(cass_int16_t, const SharedRefPtr& data_type) const {
+  bool operator()(cass_int16_t, const DataType::ConstPtr& data_type) const {
     return data_type->value_type() == CASS_VALUE_TYPE_SMALL_INT;
   }
 };
 
 template<>
 struct IsValidDataType {
-  bool operator()(cass_int32_t, const SharedRefPtr& data_type) const {
+  bool operator()(cass_int32_t, const DataType::ConstPtr& data_type) const {
     return data_type->value_type() == CASS_VALUE_TYPE_INT;
   }
 };
 
 template<>
 struct IsValidDataType {
-  bool operator()(cass_uint32_t, const SharedRefPtr& data_type) const {
+  bool operator()(cass_uint32_t, const DataType::ConstPtr& data_type) const {
     return data_type->value_type() == CASS_VALUE_TYPE_DATE;
   }
 };
 
 template<>
 struct IsValidDataType {
-  bool operator()(cass_int64_t, const SharedRefPtr& data_type) const {
+  bool operator()(cass_int64_t, const DataType::ConstPtr& data_type) const {
     return is_int64_type(data_type->value_type());
   }
 };
 
 template<>
 struct IsValidDataType {
-  bool operator()(cass_float_t, const SharedRefPtr& data_type) const {
+  bool operator()(cass_float_t, const DataType::ConstPtr& data_type) const {
     return data_type->value_type() == CASS_VALUE_TYPE_FLOAT;
   }
 };
 
 template<>
 struct IsValidDataType {
-  bool operator()(cass_double_t, const SharedRefPtr& data_type) const {
+  bool operator()(cass_double_t, const DataType::ConstPtr& data_type) const {
     return data_type->value_type() == CASS_VALUE_TYPE_DOUBLE;
   }
 };
 
 template<>
 struct IsValidDataType {
-  bool operator()(cass_bool_t, const SharedRefPtr& data_type) const {
+  bool operator()(cass_bool_t, const DataType::ConstPtr& data_type) const {
     return data_type->value_type() == CASS_VALUE_TYPE_BOOLEAN;
   }
 };
 
 template<>
 struct IsValidDataType {
-  bool operator()(CassString, const SharedRefPtr& data_type) const {
+  bool operator()(CassString, const DataType::ConstPtr& data_type) const {
     return is_string_type(data_type->value_type());
   }
 };
 
 template<>
 struct IsValidDataType {
-  bool operator()(CassBytes, const SharedRefPtr& data_type) const {
+  bool operator()(CassBytes, const DataType::ConstPtr& data_type) const {
     return is_bytes_type(data_type->value_type());
   }
 };
 
 template<>
 struct IsValidDataType {
-  bool operator()(CassUuid, const SharedRefPtr& data_type) const {
+  bool operator()(CassUuid, const DataType::ConstPtr& data_type) const {
     return is_uuid_type(data_type->value_type());
   }
 };
 
 template<>
 struct IsValidDataType {
-  bool operator()(CassInet, const SharedRefPtr& data_type) const {
+  bool operator()(CassInet, const DataType::ConstPtr& data_type) const {
     return data_type->value_type() == CASS_VALUE_TYPE_INET;
   }
 };
 
 template<>
 struct IsValidDataType {
-  bool operator()(CassDecimal, const SharedRefPtr& data_type) const {
+  bool operator()(CassDecimal, const DataType::ConstPtr& data_type) const {
     return data_type->value_type() == CASS_VALUE_TYPE_DECIMAL;
   }
 };
 
 template<>
 struct IsValidDataType {
-  bool operator()(const Collection* value, const SharedRefPtr& data_type) const;
+  bool operator()(const Collection* value, const DataType::ConstPtr& data_type) const;
 };
 
 template<>
 struct IsValidDataType {
-  bool operator()(const Tuple* value, const SharedRefPtr& data_type) const;
+  bool operator()(const Tuple* value, const DataType::ConstPtr& data_type) const;
 };
 
 template<>
 struct IsValidDataType {
-  bool operator()(const UserTypeValue* value, const SharedRefPtr& data_type) const;
+  bool operator()(const UserTypeValue* value, const DataType::ConstPtr& data_type) const;
 };
 
 } // namespace cass
diff --git a/src/type_parser.cpp b/src/data_type_parser.cpp
similarity index 54%
rename from src/type_parser.cpp
rename to src/data_type_parser.cpp
index 79dc81494..f233a10d6 100644
--- a/src/type_parser.cpp
+++ b/src/data_type_parser.cpp
@@ -14,7 +14,7 @@
   limitations under the License.
 */
 
-#include "type_parser.hpp"
+#include "data_type_parser.hpp"
 
 #include "utils.hpp"
 #include "logger.hpp"
@@ -34,71 +34,8 @@
 #define UDT_TYPE "org.apache.cassandra.db.marshal.UserType"
 #define TUPLE_TYPE "org.apache.cassandra.db.marshal.TupleType"
 
-#define MARSHAL_PACKAGE "org.apache.cassandra.db.marshal."
-
 namespace cass {
 
-static CassValueType get_value_type(const std::string& str) {
-  if (starts_with(str, MARSHAL_PACKAGE)) {
-    StringRef type(StringRef(str).substr(sizeof(MARSHAL_PACKAGE) - 1));
-    switch (type.front()) {
-      case 'A':
-        if (type == "AsciiType") return CASS_VALUE_TYPE_ASCII;
-        break;
-
-      case 'B':
-        if (type == "BooleanType") return CASS_VALUE_TYPE_BOOLEAN;
-        if (type == "ByteType") return CASS_VALUE_TYPE_TINY_INT;
-        if (type == "BytesType") return CASS_VALUE_TYPE_BLOB;
-        break;
-
-      case 'C':
-        if (type == "CounterColumnType") return CASS_VALUE_TYPE_COUNTER;
-        break;
-
-      case 'D':
-        if (type == "DateType") return CASS_VALUE_TYPE_TIMESTAMP;
-        if (type == "DecimalType") return CASS_VALUE_TYPE_DECIMAL;
-        if (type == "DoubleType") return CASS_VALUE_TYPE_DOUBLE;
-        break;
-
-      case 'F':
-        if (type == "FloatType") return CASS_VALUE_TYPE_FLOAT;
-        break;
-
-      case 'I':
-        if (type == "InetAddressType") return CASS_VALUE_TYPE_INET;
-        if (type == "Int32Type") return CASS_VALUE_TYPE_INT;
-        if (type == "IntegerType") return CASS_VALUE_TYPE_INT;
-        break;
-
-      case 'L':
-        if (type == "LongType") return CASS_VALUE_TYPE_BIGINT;
-        break;
-
-      case 'S':
-        if (type == "ShortType") return CASS_VALUE_TYPE_SMALL_INT;
-        if (type == "SimpleDateType") return CASS_VALUE_TYPE_DATE;
-        break;
-
-      case 'T':
-        if (type == "TimeType") return CASS_VALUE_TYPE_TIME;
-        if (type == "TimestampType") return CASS_VALUE_TYPE_TIMESTAMP;
-        if (type == "TimeUUIDType") return CASS_VALUE_TYPE_TIMEUUID;
-        break;
-
-      case 'U':
-        if (type == "UTF8Type") return CASS_VALUE_TYPE_TEXT;
-        if (type == "UUIDType") return CASS_VALUE_TYPE_UUID;
-        break;
-
-      default:
-        break;
-    }
-  }
-  return CASS_VALUE_TYPE_UNKNOWN;
-}
-
 int hex_value(int c) {
   if (c >= '0' && c <= '9') {
     return c - '0';
@@ -127,38 +64,218 @@ bool from_hex(const std::string& hex, std::string* result) {
   return true;
 }
 
-bool TypeParser::is_reversed(const std::string& type) {
+DataType::ConstPtr DataTypeCqlNameParser::parse(const std::string& type,
+                                           const NativeDataTypes& native_types,
+                                           KeyspaceMetadata* keyspace) {
+  Parser parser(type, 0);
+  std::string type_name;
+  Parser::TypeParamsVec params;
+
+  parser.parse_type_name(&type_name);
+  std::transform(type_name.begin(), type_name.end(), type_name.begin(), tolower);
+
+  DataType::ConstPtr native_type(native_types.by_cql_name(type_name));
+  if (native_type) {
+    return native_type;
+  }
+
+  if (type_name == "list") {
+    parser.parse_type_parameters(¶ms);
+    if (params.size() != 1) {
+      LOG_ERROR("Expecting single parameter for list %s", type.c_str());
+      return DataType::NIL;
+    }
+    DataType::ConstPtr element_type = parse(params[0], native_types, keyspace);
+    return CollectionType::list(element_type);
+  }
+
+  if (type_name == "set") {
+    parser.parse_type_parameters(¶ms);
+    if (params.size() != 1) {
+      LOG_ERROR("Expecting single parameter for set %s", type.c_str());
+      return DataType::NIL;
+    }
+    DataType::ConstPtr element_type = parse(params[0], native_types, keyspace);
+    return CollectionType::set(element_type);
+  }
+
+  if (type_name == "map") {
+    parser.parse_type_parameters(¶ms);
+    if (params.size() != 2) {
+      LOG_ERROR("Expecting two parameters for set %s", type.c_str());
+      return DataType::NIL;
+    }
+    DataType::ConstPtr key_type = parse(params[0], native_types, keyspace);
+    DataType::ConstPtr value_type = parse(params[1], native_types, keyspace);
+    return CollectionType::map(key_type, value_type);
+  }
+
+  if (type_name == "tuple") {
+    parser.parse_type_parameters(¶ms);
+    if (params.empty()) {
+      LOG_ERROR("Expecting at least a one parameter for tuple %s", type.c_str());
+      return DataType::NIL;
+    }
+    DataType::Vec types;
+    for (Parser::TypeParamsVec::iterator i = params.begin(),
+         end = params.end();
+         i != end;  ++i) {
+      types.push_back(parse(*i, native_types, keyspace));
+    }
+    return DataType::ConstPtr(new TupleType(types));
+  }
+
+  if (type_name == "frozen") {
+    parser.parse_type_parameters(¶ms);
+    if (params.size() != 1) {
+      LOG_ERROR("Expecting single parameter for frozen keyword %s", type.c_str());
+      return DataType::NIL;
+    }
+    return parse(params[0], native_types, keyspace);
+  }
+
+  if (type_name == "empty") {
+    return DataType::ConstPtr(new CustomType(type_name));
+  }
+
+  if (type_name.empty()) {
+    return DataType::NIL;
+  }
+
+  return keyspace->get_or_create_user_type(type_name);
+}
+
+void DataTypeCqlNameParser::Parser::parse_type_name(std::string* name) {
+  skip_blank();
+  read_next_identifier(name);
+}
+
+void DataTypeCqlNameParser::Parser::parse_type_parameters(TypeParamsVec* params) {
+  params->clear();
+
+  if (is_eos()) return;
+
+  skip_blank_and_comma();
+
+  if (str_[index_] != '<') {
+    LOG_ERROR("Expecting char %u of %s to be '<' but '%c' found",
+              (unsigned int)index_, str_.c_str(), str_[index_]);
+    return;
+  }
+
+  ++index_; // Skip '<'
+
+  std::string name;
+  std::string args;
+  while (skip_blank_and_comma()) {
+    if (str_[index_] == '>') {
+      ++index_;
+      return;
+    }
+    parse_type_name(&name);
+    if (!read_raw_type_parameters(&args))
+      return;
+    params->push_back(name + args);
+  }
+}
+
+void DataTypeCqlNameParser::Parser::read_next_identifier(std::string* name) {
+  size_t start_index = index_;
+  if (str_[start_index] == '"') {
+    ++index_;
+    while (!is_eos()) {
+      bool is_quote = str_[index_] == '"';
+      ++index_;
+      if (is_quote) {
+        if  (!is_eos() && str_[index_] == '"') {
+          ++index_;
+        } else {
+          break;
+        }
+      }
+    }
+  } else {
+    while (!is_eos() && (is_identifier_char(str_[index_]) || str_[index_] == '"')) {
+      ++index_;
+    }
+  }
+  name->assign(str_.begin() + start_index, str_.begin() + index_);
+}
+
+bool DataTypeCqlNameParser::Parser::read_raw_type_parameters(std::string* params) {
+  skip_blank();
+
+  params->clear();
+
+  if (is_eos() || str_[index_] == '>' || str_[index_] == ',') return true;
+
+  if (str_[index_] != '<') {
+    LOG_ERROR("Expecting char %u of %s to be '<' but '%c' found",
+              (unsigned int)index_, str_.c_str(), str_[index_]);
+    return false;
+  }
+
+  size_t start_index = index_;
+  int open = 1;
+  bool in_quotes = false;
+  while (open > 0) {
+    ++index_;
+
+    if (is_eos()) {
+      LOG_ERROR("Angle brackets not closed in type %s", str_.c_str());
+      return false;
+    }
+
+    if (!in_quotes) {
+      if (str_[index_] == '"') {
+        in_quotes = true;
+      } else if (str_[index_] == '<') {
+        open++;
+      } else if (str_[index_] == '>') {
+        open--;
+      }
+    } else if (str_[index_] == '"') {
+      in_quotes = false;
+    }
+  }
+
+  ++index_; // Move past the  trailing '>'
+  params->assign(str_.begin() + start_index, str_.begin() + index_);
+  return true;
+}
+
+bool DataTypeClassNameParser::is_reversed(const std::string& type) {
   return starts_with(type, REVERSED_TYPE);
 }
 
-bool TypeParser::is_frozen(const std::string& type) {
+bool DataTypeClassNameParser::is_frozen(const std::string& type) {
   return starts_with(type, FROZEN_TYPE);
 }
 
-bool TypeParser::is_composite(const std::string& type) {
+bool DataTypeClassNameParser::is_composite(const std::string& type) {
   return starts_with(type, COMPOSITE_TYPE);
 }
 
-bool TypeParser::is_collection(const std::string& type) {
+bool DataTypeClassNameParser::is_collection(const std::string& type) {
   return starts_with(type, COLLECTION_TYPE);
 }
 
-bool TypeParser::is_user_type(const std::string& type) {
+bool DataTypeClassNameParser::is_user_type(const std::string& type) {
   return starts_with(type, UDT_TYPE);
 }
 
-bool TypeParser::is_tuple_type(const std::string& type) {
+bool DataTypeClassNameParser::is_tuple_type(const std::string& type) {
   return starts_with(type, TUPLE_TYPE);
 }
 
-SharedRefPtr TypeParser::parse_one(const std::string& type) {
+DataType::ConstPtr DataTypeClassNameParser::parse_one(const std::string& type, const NativeDataTypes& native_types) {
   bool frozen = is_frozen(type);
 
   std::string class_name;
 
   if (is_reversed(type) || frozen) {
     if (!get_nested_class_name(type, &class_name)) {
-      return SharedRefPtr();
+      return DataType::ConstPtr();
     }
   } else {
     class_name = type;
@@ -172,32 +289,32 @@ SharedRefPtr TypeParser::parse_one(const std::string& type) {
   if (starts_with(next, LIST_TYPE)) {
     TypeParamsVec params;
     if (!parser.get_type_params(¶ms) || params.empty()) {
-      return SharedRefPtr();
+      return DataType::ConstPtr();
     }
-    SharedRefPtr element_type(parse_one(params[0]));
+    DataType::ConstPtr element_type(parse_one(params[0], native_types));
     if (!element_type) {
-      return SharedRefPtr();
+      return DataType::ConstPtr();
     }
     return CollectionType::list(element_type);
   } else if(starts_with(next, SET_TYPE)) {
     TypeParamsVec params;
     if (!parser.get_type_params(¶ms) || params.empty()) {
-      return SharedRefPtr();
+      return DataType::ConstPtr();
     }
-    SharedRefPtr element_type(parse_one(params[0]));
+    DataType::ConstPtr element_type(parse_one(params[0], native_types));
     if (!element_type) {
-      return SharedRefPtr();
+      return DataType::ConstPtr();
     }
     return CollectionType::set(element_type);
   } else if(starts_with(next, MAP_TYPE)) {
     TypeParamsVec params;
     if (!parser.get_type_params(¶ms) || params.size() < 2) {
-      return SharedRefPtr();
+      return DataType::ConstPtr();
     }
-    SharedRefPtr key_type(parse_one(params[0]));
-    SharedRefPtr value_type(parse_one(params[1]));
+    DataType::ConstPtr key_type(parse_one(params[0], native_types));
+    DataType::ConstPtr value_type(parse_one(params[1], native_types));
     if (!key_type || !value_type) {
-      return SharedRefPtr();
+      return DataType::ConstPtr();
     }
     return CollectionType::map(key_type, value_type);
   }
@@ -212,77 +329,80 @@ SharedRefPtr TypeParser::parse_one(const std::string& type) {
 
     std::string keyspace;
     if (!parser.read_one(&keyspace)) {
-      return SharedRefPtr();
+      return DataType::ConstPtr();
     }
     parser.skip_blank_and_comma();
 
     std::string hex;
     if (!parser.read_one(&hex)) {
-      return SharedRefPtr();
+      return DataType::ConstPtr();
     }
 
     std::string type_name;
     if (!from_hex(hex, &type_name)) {
       LOG_ERROR("Invalid hex string \"%s\" for parameter", hex.c_str());
-      return SharedRefPtr();
+      return DataType::ConstPtr();
     }
 
     if (keyspace.empty() || type_name.empty()) {
       LOG_ERROR("UDT has no keyspace or type name");
-      return SharedRefPtr();
+      return DataType::ConstPtr();
     }
 
     parser.skip_blank_and_comma();
     NameAndTypeParamsVec raw_fields;
     if (!parser.get_name_and_type_params(&raw_fields)) {
-      return SharedRefPtr();
+      return DataType::ConstPtr();
     }
 
     UserType::FieldVec fields;
     for (NameAndTypeParamsVec::const_iterator i = raw_fields.begin(),
          end = raw_fields.end(); i != end; ++i) {
-      SharedRefPtr data_type = parse_one(i->second);
+      DataType::ConstPtr data_type = parse_one(i->second, native_types);
       if (!data_type) {
-        return SharedRefPtr();
+        return DataType::ConstPtr();
       }
       fields.push_back(UserType::Field(i->first, data_type));
     }
 
-    return SharedRefPtr(new UserType(keyspace, type_name, fields));
+    return DataType::ConstPtr(new UserType(keyspace, type_name, fields));
   }
 
   if (is_tuple_type(type)) {
     TypeParamsVec raw_types;
     if (!parser.get_type_params(&raw_types)) {
-      return SharedRefPtr();
+      return DataType::ConstPtr();
     }
 
     DataType::Vec types;
     for (TypeParamsVec::const_iterator i = raw_types.begin(),
          end = raw_types.end(); i != end; ++i) {
-      SharedRefPtr data_type = parse_one(*i);
+      DataType::ConstPtr data_type = parse_one(*i, native_types);
       if (!data_type) {
-        return SharedRefPtr();
+        return DataType::ConstPtr();
       }
       types.push_back(data_type);
     }
 
-    return SharedRefPtr(new TupleType(types));
+    return DataType::ConstPtr(new TupleType(types));
+  }
+
+  DataType::ConstPtr native_type(native_types.by_class_name(next));
+  if (native_type) {
+    return native_type;
   }
 
-  CassValueType t = get_value_type(next);
-  return t == CASS_VALUE_TYPE_UNKNOWN ? SharedRefPtr(new CustomType(next))
-                                      : SharedRefPtr(new DataType(t));
+  return DataType::ConstPtr(new CustomType(next));
 }
 
-SharedRefPtr TypeParser::parse_with_composite(const std::string& type) {
+SharedRefPtr DataTypeClassNameParser::parse_with_composite(const std::string& type, const NativeDataTypes& native_types) {
   Parser parser(type, 0);
 
   std::string next;
   parser.get_next_name(&next);
 
   if (!is_composite(next)) {
-    SharedRefPtr data_type = parse_one(type);
+    DataType::ConstPtr data_type = parse_one(type, native_types);
     if (!data_type) {
       return SharedRefPtr();
     }
@@ -315,7 +435,7 @@ SharedRefPtr TypeParser::parse_with_composite(const std::string& ty
 
     for (NameAndTypeParamsVec::const_iterator i = params.begin(),
          end = params.end(); i != end; ++i) {
-      SharedRefPtr data_type = parse_one(i->second);
+      DataType::ConstPtr data_type = parse_one(i->second, native_types);
       if (!data_type) {
         return SharedRefPtr();
       }
@@ -326,7 +446,7 @@ SharedRefPtr TypeParser::parse_with_composite(const std::string& ty
   DataType::Vec types;
   ParseResult::ReversedVec reversed;
   for (size_t i = 0; i < count; ++i) {
-    SharedRefPtr data_type = parse_one(sub_class_names[i]);
+    DataType::ConstPtr data_type = parse_one(sub_class_names[i], native_types);
     if (!data_type) {
       return SharedRefPtr();
     }
@@ -337,7 +457,7 @@ SharedRefPtr TypeParser::parse_with_composite(const std::string& ty
   return SharedRefPtr(new ParseResult(true, types, reversed, collections));
 }
 
-bool TypeParser::get_nested_class_name(const std::string& type, std::string* class_name) {
+bool DataTypeClassNameParser::get_nested_class_name(const std::string& type, std::string* class_name) {
   Parser parser(type, 0);
   parser.get_next_name();
   TypeParamsVec params;
@@ -349,7 +469,7 @@ bool TypeParser::get_nested_class_name(const std::string& type, std::string* cla
   return true;
 }
 
-bool TypeParser::Parser::read_one(std::string* name_and_args) {
+bool DataTypeClassNameParser::Parser::read_one(std::string* name_and_args) {
   std::string name;
   get_next_name(&name);
   std::string args;
@@ -360,12 +480,12 @@ bool TypeParser::Parser::read_one(std::string* name_and_args) {
   return true;
 }
 
-void TypeParser::Parser::get_next_name(std::string* name) {
+void DataTypeClassNameParser::Parser::get_next_name(std::string* name) {
   skip_blank();
   read_next_identifier(name);
 }
 
-bool TypeParser::Parser::get_type_params(TypeParamsVec* params) {
+bool DataTypeClassNameParser::Parser::get_type_params(TypeParamsVec* params) {
   if (is_eos()) {
     params->clear();
     return true;
@@ -395,7 +515,7 @@ bool TypeParser::Parser::get_type_params(TypeParamsVec* params) {
   return false;
 }
 
-bool TypeParser::Parser::get_name_and_type_params(NameAndTypeParamsVec* params) {
+bool DataTypeClassNameParser::Parser::get_name_and_type_params(NameAndTypeParamsVec* params) {
   while (skip_blank_and_comma()) {
     if (str_[index_] == ')') {
       ++index_;
@@ -408,7 +528,7 @@ bool TypeParser::Parser::get_name_and_type_params(NameAndTypeParamsVec* params)
     std::string name;
     if (!from_hex(hex, &name)) {
       LOG_ERROR("Invalid hex string \"%s\" for parameter", hex.c_str());
-      return SharedRefPtr();
+      return DataType::ConstPtr();
     }
 
     skip_blank();
@@ -434,7 +554,7 @@ bool TypeParser::Parser::get_name_and_type_params(NameAndTypeParamsVec* params)
   return false;
 }
 
-bool TypeParser::Parser::get_collection_params(NameAndTypeParamsVec* params) {
+bool DataTypeClassNameParser::Parser::get_collection_params(NameAndTypeParamsVec* params) {
   if (is_eos()) {
     params->clear();
     return true;
@@ -450,31 +570,7 @@ bool TypeParser::Parser::get_collection_params(NameAndTypeParamsVec* params) {
   return get_name_and_type_params(params);
 }
 
-void TypeParser::Parser::skip_blank() {
-  while (!is_eos() && is_blank(str_[index_])) {
-    ++index_;
-  }
-}
-
-bool TypeParser::Parser::skip_blank_and_comma() {
-  bool comma_found = false;
-  while (!is_eos()) {
-    int c = str_[index_];
-    if (c == ',') {
-      if (comma_found) {
-        return true;
-      } else {
-        comma_found = true;
-      }
-    } else if (!is_blank(c)) {
-      return true;
-    }
-    ++index_;
-  }
-  return false;
-}
-
-bool TypeParser::Parser::read_raw_arguments(std::string* args) {
+bool DataTypeClassNameParser::Parser::read_raw_arguments(std::string* args) {
   skip_blank();
 
   if (is_eos() || str_[index_] == ')' || str_[index_] == ',') {
@@ -509,7 +605,7 @@ bool TypeParser::Parser::read_raw_arguments(std::string* args) {
   return true;
 }
 
-void TypeParser::Parser::read_next_identifier(std::string* name) {
+void DataTypeClassNameParser::Parser::read_next_identifier(std::string* name) {
   size_t i = index_;
   while (!is_eos() && is_identifier_char(str_[index_]))
     ++index_;
@@ -522,7 +618,7 @@ void TypeParser::Parser::read_next_identifier(std::string* name) {
   }
 }
 
-void TypeParser::Parser::parse_error(const std::string& str,
+void DataTypeClassNameParser::Parser::parse_error(const std::string& str,
                                      size_t index,
                                      const char* error) {
   LOG_ERROR("Error parsing '%s' at %u index: %s",
diff --git a/src/type_parser.hpp b/src/data_type_parser.hpp
similarity index 65%
rename from src/type_parser.hpp
rename to src/data_type_parser.hpp
index cec0aac54..b1e0aaa3e 100644
--- a/src/type_parser.hpp
+++ b/src/data_type_parser.hpp
@@ -19,6 +19,7 @@
 
 #include "cassandra.h"
 #include "data_type.hpp"
+#include "metadata.hpp"
 #include "ref_counted.hpp"
 #include "value.hpp"
 
@@ -28,12 +29,86 @@
 
 namespace cass {
 
+class ParserBase {
+public:
+  ParserBase(const std::string& str, size_t index)
+    : str_(str)
+    , index_(index) { }
+
+    void skip() { ++index_; }
+
+    void skip_blank() {
+      while (!is_eos() && is_blank(str_[index_])) {
+        ++index_;
+      }
+    }
+
+    bool skip_blank_and_comma() {
+      bool comma_found = false;
+      while (!is_eos()) {
+        int c = str_[index_];
+        if (c == ',') {
+          if (comma_found) {
+            return true;
+          } else {
+            comma_found = true;
+          }
+        } else if (!is_blank(c)) {
+          return true;
+        }
+        ++index_;
+      }
+      return false;
+    }
+
+    bool is_eos() const {
+      return index_ >= str_.length();
+    }
+
+    static bool is_identifier_char(int c) {
+      return (c >= '0' && c <= '9')
+          || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
+          || c == '-' || c == '+' || c == '.' || c == '_' || c == '&';
+    }
+
+    static bool is_blank(int c) {
+      return c == ' ' || c == '\t' || c == '\n';
+    }
+
+protected:
+  const std::string str_;
+  size_t index_;
+};
+
+class DataTypeCqlNameParser {
+public:
+  static DataType::ConstPtr parse(const std::string& type,
+                             const NativeDataTypes& native_types,
+                             KeyspaceMetadata* keyspace);
+
+private:
+  class Parser : public ParserBase {
+  public:
+    typedef std::vector TypeParamsVec;
+
+    Parser(const std::string& str, size_t index)
+      : ParserBase(str, index) { }
+
+    void parse_type_name(std::string* name);
+    void parse_type_parameters(TypeParamsVec* params);
+
+  private:
+    void read_next_identifier(std::string* name);
+    bool read_raw_type_parameters(std::string* param);
+  };
+};
+
 class ParseResult : public RefCounted {
 public:
   typedef std::vector ReversedVec;
-  typedef std::map > CollectionMap;
+  typedef std::map CollectionMap;
 
-  ParseResult(SharedRefPtr type, bool reversed)
+  ParseResult(DataType::ConstPtr type, bool reversed)
     : is_composite_(false) {
     types_.push_back(type);
     reversed_.push_back(reversed);
@@ -60,7 +135,7 @@ class ParseResult : public RefCounted {
   CollectionMap collections_;
 };
 
-class TypeParser {
+class DataTypeClassNameParser {
 public:
   static bool is_reversed(const std::string& type);
   static bool is_frozen(const std::string& type);
@@ -70,8 +145,8 @@ class TypeParser {
   static bool is_user_type(const std::string& type);
   static bool is_tuple_type(const std::string& type);
 
-  static SharedRefPtr parse_one(const std::string& type);
-  static SharedRefPtr parse_with_composite(const std::string& type);
+  static DataType::ConstPtr parse_one(const std::string& type, const NativeDataTypes& native_types);
+  static SharedRefPtr parse_with_composite(const std::string& type, const NativeDataTypes& native_types);
 
 private:
   static bool get_nested_class_name(const std::string& type, std::string* class_name);
@@ -79,15 +154,10 @@ class TypeParser {
   typedef std::vector TypeParamsVec;
   typedef std::vector > NameAndTypeParamsVec;
 
-  class Parser {
+  class Parser : public ParserBase {
   public:
     Parser(const std::string& str, size_t index)
-      : str_(str)
-      , index_(index) {}
-
-    void skip() { ++index_; }
-    void skip_blank();
-    bool skip_blank_and_comma();
+      : ParserBase(str, index) { }
 
     bool read_one(std::string* name_and_args);
     void get_next_name(std::string* name = NULL);
@@ -102,27 +172,9 @@ class TypeParser {
     static void parse_error(const std::string& str,
                             size_t index,
                             const char* error);
-
-    bool is_eos() const {
-      return index_ >= str_.length();
-    }
-
-    static bool is_identifier_char(int c) {
-      return (c >= '0' && c <= '9')
-          || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
-          || c == '-' || c == '+' || c == '.' || c == '_' || c == '&';
-    }
-
-    static bool is_blank(int c) {
-      return c == ' ' || c == '\t' || c == '\n';
-    }
-
-  private:
-    const std::string str_;
-    size_t index_;
   };
 
-  TypeParser();
+  DataTypeClassNameParser();
 };
 
 } // namespace cass
diff --git a/src/execute_request.hpp b/src/execute_request.hpp
index 91e31b610..7aeec814c 100644
--- a/src/execute_request.hpp
+++ b/src/execute_request.hpp
@@ -50,7 +50,7 @@ class ExecuteRequest : public Statement {
     return metadata_->get_indices(name, indices);
   }
 
-  virtual const SharedRefPtr& get_type(size_t index) const {
+  virtual const DataType::ConstPtr& get_type(size_t index) const {
     return metadata_->get_column_definition(index).data_type;
   }
 
diff --git a/src/hash_table.hpp b/src/hash_table.hpp
index 19c4b40ee..3c70b85f2 100644
--- a/src/hash_table.hpp
+++ b/src/hash_table.hpp
@@ -89,6 +89,8 @@ class CaseInsensitiveHashTable {
   size_t add(const T& entry);
 
   const EntryVec& entries() const { return entries_; }
+  void set_entries(const EntryVec& entries);
+
   size_t size() const { return entries_.size(); }
 
 private:
@@ -114,10 +116,7 @@ CaseInsensitiveHashTable::CaseInsensitiveHashTable(size_t capacity) {
 
 template
 CaseInsensitiveHashTable::CaseInsensitiveHashTable(const EntryVec& entries) {
-  reset(entries.size());
-  for (size_t i = 0; i < entries.size(); ++i) {
-    add(entries[i]);
-  }
+  set_entries(entries);
 }
 
 template
@@ -177,6 +176,16 @@ size_t CaseInsensitiveHashTable::add(const T& entry) {
   return index;
 }
 
+
+template
+void CaseInsensitiveHashTable::set_entries(const EntryVec& entries) {
+  entries_.clear();
+  reset(entries.size());
+  for (size_t i = 0; i < entries.size(); ++i) {
+    add(entries[i]);
+  }
+}
+
 template
 void CaseInsensitiveHashTable::add_index(T* entry) {
   size_t h = fnv1a_hash_lower(entry->name) & index_mask_;
diff --git a/src/host.hpp b/src/host.hpp
index 8dba02e46..9f3c4a716 100644
--- a/src/host.hpp
+++ b/src/host.hpp
@@ -53,6 +53,28 @@ class VersionNumber {
     , minor_(0)
     , patch_(0) { }
 
+  VersionNumber(int major, int minor, int patch)
+    : major_(major)
+    , minor_(minor)
+    , patch_(patch) { }
+
+  bool operator >=(const VersionNumber& other) const {
+    return compare(other) >= 0;
+  }
+
+  int compare(const VersionNumber& other) const {
+    if (major_ < other.major_) return -1;
+    if (major_ > other.major_) return  1;
+
+    if (minor_ < other.minor_) return -1;
+    if (minor_ > other.minor_) return  1;
+
+    if (patch_ < other.patch_) return -1;
+    if (patch_ > other.patch_) return  1;
+
+    return 0;
+  }
+
   bool parse(const std::string& version);
 
   int major() const { return major_; }
diff --git a/src/metadata.cpp b/src/metadata.cpp
index b1d7f7189..9af2bb829 100644
--- a/src/metadata.cpp
+++ b/src/metadata.cpp
@@ -26,7 +26,7 @@
 #include "row.hpp"
 #include "row_iterator.hpp"
 #include "scoped_lock.hpp"
-#include "type_parser.hpp"
+#include "data_type_parser.hpp"
 #include "value.hpp"
 
 #include "third_party/rapidjson/rapidjson/document.h"
@@ -231,10 +231,6 @@ const CassDataType* cass_column_meta_data_type(const CassColumnMeta* column_meta
   return CassDataType::to(column_meta->data_type().get());
 }
 
-cass_bool_t cass_column_meta_is_reversed(const CassColumnMeta* column_meta) {
-  return column_meta->is_reversed() ? cass_true : cass_false;
-}
-
 const CassValue*
 cass_column_meta_field_by_name(const CassColumnMeta* column_meta,
                            const char* name) {
@@ -505,6 +501,14 @@ const CassValue* cass_iterator_get_meta_field_value(const CassIterator* iterator
 
 namespace cass {
 
+static const char* table_column_name(const cass::VersionNumber& cassandra_version) {
+  return cassandra_version >= VersionNumber(3, 0, 0) ? "table_name" : "columnfamily_name";
+}
+
+static const char* signature_column_name(const cass::VersionNumber& cassandra_version) {
+  return cassandra_version >= VersionNumber(3, 0, 0) ? "argument_types" : "signature";
+}
+
 template 
 const T& as_const(const T& x) { return x; }
 
@@ -562,7 +566,7 @@ std::string Metadata::full_function_name(const std::string& name, const StringVe
 Metadata::SchemaSnapshot Metadata::schema_snapshot() const {
   ScopedMutex l(&mutex_);
   return SchemaSnapshot(schema_snapshot_version_,
-                        protocol_version_,
+                        config_.protocol_version,
                         front_.keyspaces());
 }
 
@@ -573,9 +577,9 @@ void Metadata::update_keyspaces(ResultResponse* result) {
 
   if (is_front_buffer()) {
     ScopedMutex l(&mutex_);
-    updating_->update_keyspaces(protocol_version_, result, updates);
+    updating_->update_keyspaces(config_, result, updates);
   } else {
-    updating_->update_keyspaces(protocol_version_, result, updates);
+    updating_->update_keyspaces(config_, result, updates);
   }
 
   for (KeyspaceMetadata::Map::const_iterator i = updates.begin(); i != updates.end(); ++i) {
@@ -588,9 +592,9 @@ void Metadata::update_tables(ResultResponse* tables_result, ResultResponse* colu
 
   if (is_front_buffer()) {
     ScopedMutex l(&mutex_);
-    updating_->update_tables(protocol_version_, cassandra_version_, tables_result, columns_result);
+    updating_->update_tables(config_, tables_result, columns_result);
   } else {
-    updating_->update_tables(protocol_version_, cassandra_version_, tables_result, columns_result);
+    updating_->update_tables(config_, tables_result, columns_result);
   }
 }
 
@@ -599,9 +603,9 @@ void Metadata::update_user_types(ResultResponse* result) {
 
   if (is_front_buffer()) {
     ScopedMutex l(&mutex_);
-    updating_->update_user_types(result);
+    updating_->update_user_types(config_, result);
   } else {
-    updating_->update_user_types(result);
+    updating_->update_user_types(config_, result);
   }
 }
 
@@ -610,9 +614,9 @@ void Metadata::update_functions(ResultResponse* result) {
 
   if (is_front_buffer()) {
     ScopedMutex l(&mutex_);
-    updating_->update_functions(result);
+    updating_->update_functions(config_, result);
   } else {
-    updating_->update_functions(result);
+    updating_->update_functions(config_, result);
   }
 }
 
@@ -621,9 +625,9 @@ void Metadata::update_aggregates(ResultResponse* result) {
 
   if (is_front_buffer()) {
     ScopedMutex l(&mutex_);
-    updating_->update_aggregates(protocol_version_, result);
+    updating_->update_aggregates(config_, result);
   } else {
-    updating_->update_aggregates(protocol_version_, result);
+    updating_->update_aggregates(config_, result);
   }
 }
 
@@ -683,6 +687,11 @@ void Metadata::drop_aggregate(const std::string& keyspace_name, const std::strin
 }
 
 void Metadata::clear_and_update_back() {
+  if (config_.cassandra_version >= VersionNumber(3, 0, 0)) {
+    config_.native_types.init_cql_names();
+  } else {
+    config_.native_types.init_class_names();
+  }
   token_map_.clear();
   back_.clear();
   updating_ = &back_;
@@ -731,7 +740,11 @@ const Value* MetadataBase::add_field(const SharedRefPtr& buffer, cons
   return value;
 }
 
-void MetadataBase::add_json_list_field(int version, const Row* row, const std::string& name) {
+void MetadataBase::add_field(const SharedRefPtr& buffer, const Value& value, const std::string& name) {
+  fields_[name] = MetadataField(name, value, buffer);
+}
+
+void MetadataBase::add_json_list_field(int protocol_version, const Row* row, const std::string& name) {
   const Value* value = row->get_by_name(name);
   if (value == NULL) return;
   if (value->size() <= 0) {
@@ -764,12 +777,12 @@ void MetadataBase::add_json_list_field(int version, const Row* row, const std::s
     collection.append(cass::CassString(i->GetString(), i->GetStringLength()));
   }
 
-  size_t encoded_size = collection.get_items_size(version);
+  size_t encoded_size = collection.get_items_size(protocol_version);
   SharedRefPtr encoded(RefBuffer::create(encoded_size));
 
-  collection.encode_items(version, encoded->data());
+  collection.encode_items(protocol_version, encoded->data());
 
-  Value list(version,
+  Value list(protocol_version,
              collection.data_type(),
              d.Size(),
              encoded->data(),
@@ -777,12 +790,12 @@ void MetadataBase::add_json_list_field(int version, const Row* row, const std::s
   fields_[name] = MetadataField(name, list, encoded);
 }
 
-void MetadataBase::add_json_map_field(int version, const Row* row, const std::string& name) {
+const Value* MetadataBase::add_json_map_field(int protocol_version, const Row* row, const std::string& name) {
   const Value* value = row->get_by_name(name);
-  if (value == NULL) return;
+  if (value == NULL) return NULL;
   if (value->size() <= 0) {
     fields_[name] = MetadataField(name);
-    return;
+    return NULL;
   }
 
   int32_t buffer_size = value->size();
@@ -795,13 +808,13 @@ void MetadataBase::add_json_map_field(int version, const Row* row, const std::st
 
   if (d.HasParseError()) {
     LOG_ERROR("Unable to parse JSON (object) for column '%s'", name.c_str());
-    return;
+    return NULL;
   }
 
   if (!d.IsObject()) {
     LOG_DEBUG("Expected JSON object for column '%s' (probably null or empty)", name.c_str());
     fields_[name] = MetadataField(name);
-    return;
+    return NULL;
   }
 
   Collection collection(CollectionType::map(SharedRefPtr(new DataType(CASS_VALUE_TYPE_TEXT)),
@@ -812,17 +825,18 @@ void MetadataBase::add_json_map_field(int version, const Row* row, const std::st
     collection.append(CassString(i->value.GetString(), i->value.GetStringLength()));
   }
 
-  size_t encoded_size = collection.get_items_size(version);
+  size_t encoded_size = collection.get_items_size(protocol_version);
   SharedRefPtr encoded(RefBuffer::create(encoded_size));
 
-  collection.encode_items(version, encoded->data());
+  collection.encode_items(protocol_version, encoded->data());
 
-  Value map(version,
+  Value map(protocol_version,
             collection.data_type(),
             d.MemberCount(),
             encoded->data(),
             encoded_size);
-  fields_[name] = MetadataField(name, map, encoded);
+
+  return (fields_[name] = MetadataField(name, map, encoded)).value();
 }
 
 const TableMetadata* KeyspaceMetadata::get_table(const std::string& name) const {
@@ -848,21 +862,60 @@ void KeyspaceMetadata::drop_table(const std::string& table_name) {
   tables_->erase(table_name);
 }
 
+const UserType::Ptr& KeyspaceMetadata::get_or_create_user_type(const std::string& name) {
+  UserType::Map::iterator i = user_types_->find(name);
+  if (i == user_types_->end()) {
+    i = user_types_->insert(std::make_pair(name,
+                                           UserType::Ptr(new UserType(MetadataBase::name(), name)))).first;
+  }
+  return i->second;
+}
+
 const UserType* KeyspaceMetadata::get_user_type(const std::string& name) const {
-  UserTypeMap::const_iterator i = user_types_->find(name);
+  UserType::Map::const_iterator i = user_types_->find(name);
   if (i == user_types_->end()) return NULL;
   return i->second.get();
 }
 
-void KeyspaceMetadata::update(int version, const SharedRefPtr& buffer, const Row* row) {
+void KeyspaceMetadata::update(const MetadataConfig& config, const SharedRefPtr& buffer, const Row* row) {
   add_field(buffer, row, "keyspace_name");
   add_field(buffer, row, "durable_writes");
-  add_field(buffer, row, "strategy_class");
-  add_json_map_field(version, row, "strategy_options");
-}
+  if (config.cassandra_version >= VersionNumber(3, 0, 0))  {
+    const Value* map = add_field(buffer, row, "replication");
+    if (map != NULL &&
+        map->value_type() == CASS_VALUE_TYPE_MAP &&
+        is_string_type(map->primary_value_type()) &&
+        is_string_type(map->secondary_value_type())) {
+      MapIterator iterator(map);
+      while (iterator.next()) {
+        const Value* key = iterator.key();
+        const Value* value  = iterator.value();
+        if (key->to_string_ref() == "class") {
+          strategy_class_ = value->to_string_ref();
+        }
+        strategy_options_[key->to_string_ref()] = value->to_string_ref();
+      }
+    }
+  } else {
+    const Value* value = add_field(buffer, row, "strategy_class");
+    if (value != NULL &&
+        is_string_type(value->value_type())) {
+      strategy_class_ = value->to_string_ref();
+    }
 
-void KeyspaceMetadata::add_user_type(const SharedRefPtr& user_type) {
-  (*user_types_)[user_type->type_name()] = user_type;
+    const Value* map = add_json_map_field(config.protocol_version, row, "strategy_options");
+    if (map != NULL &&
+        map->value_type() == CASS_VALUE_TYPE_MAP &&
+        is_string_type(map->primary_value_type()) &&
+        is_string_type(map->secondary_value_type())) {
+      MapIterator iterator(map);
+      while (iterator.next()) {
+        const Value* key = iterator.key();
+        const Value* value  = iterator.value();
+        strategy_options_[key->to_string_ref()] = value->to_string_ref();
+      }
+    }
+  }
 }
 
 void KeyspaceMetadata::drop_user_type(const std::string& type_name) {
@@ -897,43 +950,56 @@ void KeyspaceMetadata::drop_aggregate(const std::string& full_aggregate_name) {
   aggregates_->erase(full_aggregate_name);
 }
 
-TableMetadata::TableMetadata(const std::string& name,
-                             int version, const SharedRefPtr& buffer, const Row* row)
+TableMetadata::TableMetadata(const MetadataConfig& config,
+                             const std::string& name, const SharedRefPtr& buffer, const Row* row)
   : MetadataBase(name) {
   add_field(buffer, row, "keyspace_name");
-  add_field(buffer, row, "columnfamily_name");
-  add_field(buffer, row, "cf_id");
+  add_field(buffer, row, table_column_name(config.cassandra_version));
   add_field(buffer, row, "bloom_filter_fp_chance");
   add_field(buffer, row, "caching");
-  add_field(buffer, row, "id");
-  add_json_list_field(version, row, "column_aliases");
   add_field(buffer, row, "comment");
-  add_field(buffer, row, "compaction_strategy_class");
-  add_json_map_field(version, row, "compaction_strategy_options");
-  add_field(buffer, row, "comparator");
-  add_json_map_field(version, row, "compression_parameters");
   add_field(buffer, row, "default_time_to_live");
-  add_field(buffer, row, "default_validator");
-  add_field(buffer, row, "dropped_columns");
   add_field(buffer, row, "gc_grace_seconds");
-  add_field(buffer, row, "index_interval");
-  add_field(buffer, row, "is_dense");
-  add_field(buffer, row, "key_alias");
-  add_json_list_field(version, row, "key_aliases");
-  add_field(buffer, row, "key_validator");
-  add_field(buffer, row, "local_read_repair_chance");
-  add_field(buffer, row, "max_compaction_threshold");
+  add_field(buffer, row, "id");
+  add_field(buffer, row, "speculative_retry");
   add_field(buffer, row, "max_index_interval");
-  add_field(buffer, row, "memtable_flush_period_in_ms");
-  add_field(buffer, row, "min_compaction_threshold");
   add_field(buffer, row, "min_index_interval");
-  add_field(buffer, row, "populate_io_cache_on_flush");
+  add_field(buffer, row, "memtable_flush_period_in_ms");
   add_field(buffer, row, "read_repair_chance");
-  add_field(buffer, row, "replicate_on_write");
-  add_field(buffer, row, "speculative_retry");
-  add_field(buffer, row, "subcomparator");
-  add_field(buffer, row, "type");
-  add_field(buffer, row, "value_alias");
+
+  if (config.cassandra_version >= VersionNumber(3, 0, 0)) {
+    add_field(buffer, row, "dclocal_read_repair_chance");
+    add_field(buffer, row, "crc_check_chance");
+    add_field(buffer, row, "compaction");
+    add_field(buffer, row, "compression");
+    add_field(buffer, row, "extensions");
+    add_field(buffer, row, "flags");
+  } else {
+    add_field(buffer, row, "cf_id");
+    add_field(buffer, row, "local_read_repair_chance");
+
+    add_field(buffer, row, "compaction_strategy_class");
+    add_json_map_field(config.protocol_version, row, "compaction_strategy_options");
+    add_json_map_field(config.protocol_version, row, "compression_parameters");
+
+    add_json_list_field(config.protocol_version, row, "column_aliases");
+    add_field(buffer, row, "comparator");
+    add_field(buffer, row, "subcomparator");
+    add_field(buffer, row, "default_validator");
+    add_field(buffer, row, "key_alias");
+    add_json_list_field(config.protocol_version, row, "key_aliases");
+    add_field(buffer, row, "value_alias");
+    add_field(buffer, row, "key_validator");
+    add_field(buffer, row, "type");
+
+    add_field(buffer, row, "dropped_columns");
+    add_field(buffer, row, "index_interval");
+    add_field(buffer, row, "is_dense");
+    add_field(buffer, row, "max_compaction_threshold");
+    add_field(buffer, row, "min_compaction_threshold");
+    add_field(buffer, row, "populate_io_cache_on_flush");
+    add_field(buffer, row, "replicate_on_write");
+  }
 }
 
 const ColumnMetadata* TableMetadata::get_column(const std::string& name) const {
@@ -973,13 +1039,13 @@ size_t get_column_count(const ColumnMetadata::Vec& columns, CassColumnType type)
   return count;
 }
 
-void TableMetadata::build_keys_and_sort(const VersionNumber& cassandra_version) {
+void TableMetadata::build_keys_and_sort(const MetadataConfig& config) {
   // Also, Reorders columns so that the order is:
   // 1) Parition key
   // 2) Clustering keys
   // 3) Other columns
 
-  if (cassandra_version.major() >= 2) {
+  if (config.cassandra_version.major() >= 2) {
     partition_key_.resize(get_column_count(columns_, CASS_COLUMN_TYPE_PARTITION_KEY));
     clustering_key_.resize(get_column_count(columns_, CASS_COLUMN_TYPE_CLUSTERING_KEY));
     for (ColumnMetadata::Vec::const_iterator i = columns_.begin(),
@@ -1012,7 +1078,8 @@ void TableMetadata::build_keys_and_sort(const VersionNumber& cassandra_version)
         }
       }
 
-      SharedRefPtr key_validator = TypeParser::parse_with_composite(get_string_field("key_validator"));
+      SharedRefPtr key_validator
+          = DataTypeClassNameParser::parse_with_composite(get_string_field("key_validator"), config.native_types);
       size_t size = key_validator->types().size();
       partition_key_.reserve(size);
       for (size_t i = 0; i < size; ++i) {
@@ -1045,7 +1112,8 @@ void TableMetadata::build_keys_and_sort(const VersionNumber& cassandra_version)
       }
 
       // TODO: Figure out how to test these special cases and properly document them here
-      SharedRefPtr comparator = TypeParser::parse_with_composite(get_string_field("comparator"));
+      SharedRefPtr comparator
+          = DataTypeClassNameParser::parse_with_composite(get_string_field("comparator"), config.native_types);
       size_t size = comparator->types().size();
       if (comparator->is_composite()) {
         if (!comparator->collections().empty() ||
@@ -1087,7 +1155,7 @@ void TableMetadata::build_keys_and_sort(const VersionNumber& cassandra_version)
   }
 }
 
-void TableMetadata::key_aliases(KeyAliases* output) const {
+void TableMetadata::key_aliases(const NativeDataTypes& native_types, KeyAliases* output) const {
   const Value* aliases = get_field("key_aliases");
   if (aliases != NULL) {
     output->reserve(aliases->count());
@@ -1097,7 +1165,8 @@ void TableMetadata::key_aliases(KeyAliases* output) const {
     }
   }
   if (output->empty()) {// C* 1.2 tables created via CQL2 or thrift don't have col meta or key aliases
-    SharedRefPtr key_validator_type = TypeParser::parse_with_composite(get_string_field("key_validator"));
+    SharedRefPtr key_validator_type
+        = DataTypeClassNameParser::parse_with_composite(get_string_field("key_validator"), native_types);
     const size_t count = key_validator_type->types().size();
     std::ostringstream ss("key");
     for (size_t i = 0; i < count; ++i) {
@@ -1110,7 +1179,9 @@ void TableMetadata::key_aliases(KeyAliases* output) const {
   }
 }
 
-FunctionMetadata::FunctionMetadata(const std::string& name, const Value* signature,
+FunctionMetadata::FunctionMetadata(const MetadataConfig& config,
+                                   const std::string& name, const Value* signature,
+                                   KeyspaceMetadata* keyspace,
                                    const SharedRefPtr& buffer, const Row* row)
   : MetadataBase(Metadata::full_function_name(name, signature->as_stringlist()))
   , simple_name_(name) {
@@ -1130,18 +1201,31 @@ FunctionMetadata::FunctionMetadata(const std::string& name, const Value* signatu
       value2->primary_value_type() == CASS_VALUE_TYPE_VARCHAR) {
     CollectionIterator iterator1(value1);
     CollectionIterator iterator2(value2);
-    while (iterator1.next() && iterator2.next()) {
-      StringRef arg_name(iterator1.value()->to_string_ref());
-      DataType::Ptr arg_type(TypeParser::parse_one(iterator2.value()->to_string()));
-      args_.push_back(Argument(arg_name, arg_type));
-      args_by_name_[arg_name] = arg_type;
+    if (config.cassandra_version >= VersionNumber(3, 0, 0)) {
+      while (iterator1.next() && iterator2.next()) {
+        StringRef arg_name(iterator1.value()->to_string_ref());
+        DataType::ConstPtr arg_type(DataTypeCqlNameParser::parse(iterator2.value()->to_string(), config.native_types, keyspace));
+        args_.push_back(Argument(arg_name, arg_type));
+        args_by_name_[arg_name] = arg_type;
+      }
+    } else {
+      while (iterator1.next() && iterator2.next()) {
+        StringRef arg_name(iterator1.value()->to_string_ref());
+        DataType::ConstPtr arg_type(DataTypeClassNameParser::parse_one(iterator2.value()->to_string(), config.native_types));
+        args_.push_back(Argument(arg_name, arg_type));
+        args_by_name_[arg_name] = arg_type;
+      }
     }
   }
 
   value1 = add_field(buffer, row, "return_type");
   if (value1 != NULL &&
       value1->value_type() == CASS_VALUE_TYPE_VARCHAR) {
-    return_type_ = TypeParser::parse_one(value1->to_string());
+    if (config.cassandra_version >= VersionNumber(3, 0, 0)) {
+      return_type_ = DataTypeCqlNameParser::parse(value1->to_string(), config.native_types, keyspace);
+    } else {
+      return_type_ = DataTypeClassNameParser::parse_one(value1->to_string(), config.native_types);
+    }
   }
 
   value1 = add_field(buffer, row, "body");
@@ -1169,12 +1253,14 @@ const DataType* FunctionMetadata::get_arg_type(StringRef name) const {
   return i->second.get();
 }
 
-AggregateMetadata::AggregateMetadata(const std::string& name, const Value* signature,
-                                     const FunctionMetadata::Map& functions,
-                                     int version, const SharedRefPtr& buffer, const Row* row)
+AggregateMetadata::AggregateMetadata(const MetadataConfig& config,
+                                     const std::string& name, const Value* signature,
+                                     KeyspaceMetadata* keyspace,
+                                     const SharedRefPtr& buffer, const Row* row)
   : MetadataBase(Metadata::full_function_name(name, signature->as_stringlist()))
   , simple_name_(name) {
   const Value* value;
+  const FunctionMetadata::Map& functions = keyspace->functions();
 
   add_field(buffer, row, "keyspace_name");
   add_field(buffer, row, "aggregate_name");
@@ -1184,21 +1270,35 @@ AggregateMetadata::AggregateMetadata(const std::string& name, const Value* signa
       value->value_type() == CASS_VALUE_TYPE_LIST &&
       value->primary_value_type() == CASS_VALUE_TYPE_VARCHAR) {
     CollectionIterator iterator(value);
-    while (iterator.next()) {
-      arg_types_.push_back(TypeParser::parse_one(iterator.value()->to_string()));
+    if (config.cassandra_version >= VersionNumber(3, 0, 0)) {
+      while (iterator.next()) {
+        arg_types_.push_back(DataTypeCqlNameParser::parse(iterator.value()->to_string(), config.native_types, keyspace));
+      }
+    } else {
+      while (iterator.next()) {
+        arg_types_.push_back(DataTypeClassNameParser::parse_one(iterator.value()->to_string(), config.native_types));
+      }
     }
   }
 
   value = add_field(buffer, row, "return_type");
   if (value != NULL &&
       value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
-    return_type_ = TypeParser::parse_one(value->to_string());
+    if (config.cassandra_version >= VersionNumber(3, 0, 0)) {
+      return_type_ = DataTypeCqlNameParser::parse(value->to_string(), config.native_types, keyspace);
+    } else {
+      return_type_ = DataTypeClassNameParser::parse_one(value->to_string(), config.native_types);
+    }
   }
 
   value = add_field(buffer, row, "state_type");
   if (value != NULL &&
       value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
-    state_type_ = TypeParser::parse_one(value->to_string());
+    if (config.cassandra_version >= VersionNumber(3, 0, 0)) {
+      state_type_ = DataTypeCqlNameParser::parse(value->to_string(), config.native_types, keyspace);
+    } else {
+      state_type_ = DataTypeClassNameParser::parse_one(value->to_string(), config.native_types);
+    }
   }
 
   value = add_field(buffer, row, "final_func");
@@ -1226,74 +1326,104 @@ AggregateMetadata::AggregateMetadata(const std::string& name, const Value* signa
   }
 
   value = add_field(buffer, row, "initcond");
-  if (value != NULL &&
-      value->value_type() == CASS_VALUE_TYPE_BLOB) {
-      init_cond_ = Value(version, state_type_, value->data(), value->size());
+  if (value != NULL) {
+    if (value->value_type() == CASS_VALUE_TYPE_BLOB) {
+      init_cond_ = Value(config.protocol_version, state_type_, value->data(), value->size());
+    } else if (config.cassandra_version >= VersionNumber(3, 0, 0) &&
+               value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
+      init_cond_ = Value(config.protocol_version,
+                         config.native_types.by_cql_name("varchar"),
+                         value->data(), value->size());
+    }
   }
 }
 
-ColumnMetadata::ColumnMetadata(const std::string& name,
-                               int version, const SharedRefPtr& buffer, const Row* row)
+ColumnMetadata::ColumnMetadata(const MetadataConfig& config,
+                               const std::string& name,
+                               KeyspaceMetadata* keyspace,
+                               const SharedRefPtr& buffer, const Row* row)
   : MetadataBase(name)
   , type_(CASS_COLUMN_TYPE_REGULAR)
-  , position_(0)
-  , is_reversed_(false) {
+  , position_(0) {
   const Value* value;
 
   add_field(buffer, row, "keyspace_name");
-  add_field(buffer, row, "columnfamily_name");
+  add_field(buffer, row, table_column_name(config.cassandra_version));
   add_field(buffer, row, "column_name");
 
-  value = add_field(buffer, row, "type");
-  if (value != NULL &&
-      value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
-    StringRef type = value->to_string_ref();
-    if (type == "partition_key") {
-      type_ = CASS_COLUMN_TYPE_PARTITION_KEY;
-    } else  if (type == "clustering_key") {
-      type_ = CASS_COLUMN_TYPE_CLUSTERING_KEY;
-    } else  if (type == "static") {
-      type_ = CASS_COLUMN_TYPE_STATIC;
-    } else {
-      type_ = CASS_COLUMN_TYPE_REGULAR;
+  if (config.cassandra_version >= VersionNumber(3, 0, 0)) {
+    add_field(buffer, row, "clustering_order");
+    add_field(buffer, row, "column_name_bytes");
+
+    value = add_field(buffer, row, "kind");
+    if (value != NULL &&
+        value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
+      StringRef type = value->to_string_ref();
+      if (type == "partition_key") {
+        type_ = CASS_COLUMN_TYPE_PARTITION_KEY;
+      } else  if (type == "clustering") {
+        type_ = CASS_COLUMN_TYPE_CLUSTERING_KEY;
+      } else  if (type == "static") {
+        type_ = CASS_COLUMN_TYPE_STATIC;
+      } else {
+        type_ = CASS_COLUMN_TYPE_REGULAR;
+      }
     }
-  }
 
-  value = add_field(buffer, row, "component_index");
-  // For C* 2.0 to 2.2 this is "null" for single component partition keys
-  // so the default position of 0 works. C* 1.2 and below don't use this.
-  if (value != NULL &&
-      value->value_type() == CASS_VALUE_TYPE_INT) {
-    position_ = value->as_int32();
-  }
+    value = add_field(buffer, row, "position");
+    if (value != NULL &&
+        value->value_type() == CASS_VALUE_TYPE_INT) {
+      position_ = value->as_int32();
+      if (position_ < 0) position_ = 0;
+    }
 
-  value = add_field(buffer, row, "validator");
-  if (value != NULL &&
-      value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
-    std::string validator(value->to_string());
-    // TODO: Use const changes from CPP-295
-    data_type_ = SharedRefPtr(TypeParser::parse_one(validator));
-    is_reversed_ = TypeParser::is_reversed(validator);
-  }
+    value = add_field(buffer, row, "type");
+    if (value != NULL &&
+        value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
+      std::string type(value->to_string());
+      data_type_ = DataTypeCqlNameParser::parse(type, config.native_types, keyspace);
+    }
+  } else {
+    value = add_field(buffer, row, "type");
+    if (value != NULL &&
+        value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
+      StringRef type = value->to_string_ref();
+      if (type == "partition_key") {
+        type_ = CASS_COLUMN_TYPE_PARTITION_KEY;
+      } else  if (type == "clustering_key") {
+        type_ = CASS_COLUMN_TYPE_CLUSTERING_KEY;
+      } else  if (type == "static") {
+        type_ = CASS_COLUMN_TYPE_STATIC;
+      } else  if (type == "compact_value") {
+        type_ = CASS_COLUMN_TYPE_COMPACT_VALUE;
+      } else {
+        type_ = CASS_COLUMN_TYPE_REGULAR;
+      }
+    }
 
-  add_field(buffer, row, "index_name");
-  add_json_map_field(version, row, "index_options");
-  add_field(buffer, row, "index_type");
-}
+    value = add_field(buffer, row, "component_index");
+    // For C* 2.0 to 2.2 this is "null" for single component partition keys
+    // so the default position of 0 works. C* 1.2 and below don't use this.
+    if (value != NULL &&
+        value->value_type() == CASS_VALUE_TYPE_INT) {
+      position_ = value->as_int32();
+    }
 
-void Metadata::SchemaSnapshot::get_table_key_columns(const std::string& ks_name,
-                                                     const std::string& table_name,
-                                                     std::vector* output) const {
-  const KeyspaceMetadata* keyspace = get_keyspace(ks_name);
-  if (keyspace != NULL) {
-    const TableMetadata* table = keyspace->get_table(table_name);
-    if (table != NULL) {
-      table->key_aliases(output);
+    value = add_field(buffer, row, "validator");
+    if (value != NULL &&
+        value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
+      std::string validator(value->to_string());
+      data_type_ = DataTypeClassNameParser::parse_one(validator, config.native_types);
     }
+
+    add_field(buffer, row, "index_name");
+    add_json_map_field(config.protocol_version, row, "index_options");
+    add_field(buffer, row, "index_type");
   }
 }
 
-void Metadata::InternalData::update_keyspaces(int version, ResultResponse* result, KeyspaceMetadata::Map& updates) {
+void Metadata::InternalData::update_keyspaces(const MetadataConfig& config,
+                                              ResultResponse* result, KeyspaceMetadata::Map& updates) {
   SharedRefPtr buffer = result->buffer();
   result->decode_first_row();
   ResultIterator rows(result);
@@ -1308,19 +1438,20 @@ void Metadata::InternalData::update_keyspaces(int version, ResultResponse* resul
     }
 
     KeyspaceMetadata* keyspace = get_or_create_keyspace(keyspace_name);
-    keyspace->update(version, buffer, row);
+    keyspace->update(config, buffer, row);
     updates.insert(std::make_pair(keyspace_name, *keyspace));
   }
 }
 
-void Metadata::InternalData::update_tables(int version, const VersionNumber& cassandra_version, ResultResponse* tables_result, ResultResponse* columns_result) {
+void Metadata::InternalData::update_tables(const MetadataConfig& config,
+                                           ResultResponse* tables_result, ResultResponse* columns_result) {
   SharedRefPtr buffer = tables_result->buffer();
 
   tables_result->decode_first_row();
   ResultIterator rows(tables_result);
 
   std::string keyspace_name;
-  std::string columnfamily_name;
+  std::string table_name;
   KeyspaceMetadata* keyspace = NULL;
 
   while (rows.next()) {
@@ -1328,8 +1459,8 @@ void Metadata::InternalData::update_tables(int version, const VersionNumber& cas
     const Row* row = rows.row();
 
     if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
-        !row->get_string_by_name("columnfamily_name", &columnfamily_name)) {
-      LOG_ERROR("Unable to get column value for 'keyspace_name' or 'columnfamily_name'");
+        !row->get_string_by_name(table_column_name(config.cassandra_version), &table_name)) {
+      LOG_ERROR("Unable to get column value for 'keyspace_name' or '%s'", table_column_name(config.cassandra_version));
       continue;
     }
 
@@ -1338,13 +1469,13 @@ void Metadata::InternalData::update_tables(int version, const VersionNumber& cas
       keyspace = get_or_create_keyspace(keyspace_name);
     }
 
-    keyspace->add_table(TableMetadata::Ptr(new TableMetadata(columnfamily_name, version, buffer, row)));
+    keyspace->add_table(TableMetadata::Ptr(new TableMetadata(config, table_name, buffer, row)));
   }
 
-  update_columns(version, cassandra_version, columns_result);
+  update_columns(config, columns_result);
 }
 
-void Metadata::InternalData::update_user_types(ResultResponse* result) {
+void Metadata::InternalData::update_user_types(const MetadataConfig& config, ResultResponse* result) {
   result->decode_first_row();
   ResultIterator rows(result);
 
@@ -1404,7 +1535,14 @@ void Metadata::InternalData::update_user_types(ResultResponse* result) {
 
       std::string field_name(name->to_string());
 
-      SharedRefPtr data_type = TypeParser::parse_one(type->to_string());
+      DataType::ConstPtr data_type;
+
+      if (config.cassandra_version >= VersionNumber(3, 0, 0)) {
+        data_type = DataTypeCqlNameParser::parse(type->to_string(), config.native_types, keyspace);
+      } else {
+        data_type = DataTypeClassNameParser::parse_one(type->to_string(), config.native_types);
+      }
+
       if (!data_type) {
         LOG_ERROR("Invalid 'field_type' for field \"%s\", keyspace \"%s\" and type \"%s\"",
                   field_name.c_str(),
@@ -1416,12 +1554,12 @@ void Metadata::InternalData::update_user_types(ResultResponse* result) {
       fields.push_back(UserType::Field(field_name, data_type));
     }
 
-    keyspace->add_user_type(
-          SharedRefPtr(new UserType(keyspace_name, type_name, fields)));
+    keyspace->get_or_create_user_type(type_name)->set_fields(fields);
   }
 }
 
-void Metadata::InternalData::update_functions(ResultResponse* result) {
+void Metadata::InternalData::update_functions(const MetadataConfig& config,
+                                              ResultResponse* result) {
   SharedRefPtr buffer = result->buffer();
 
   result->decode_first_row();
@@ -1435,7 +1573,7 @@ void Metadata::InternalData::update_functions(ResultResponse* result) {
     std::string function_name;
     const Row* row = rows.row();
 
-    const Value* signature = row->get_by_name("signature");
+    const Value* signature = row->get_by_name(signature_column_name(config.cassandra_version));
     if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
         !row->get_string_by_name("function_name", &function_name) ||
         signature == NULL) {
@@ -1448,13 +1586,15 @@ void Metadata::InternalData::update_functions(ResultResponse* result) {
       keyspace = get_or_create_keyspace(keyspace_name);
     }
 
-    keyspace->add_function(FunctionMetadata::Ptr(new FunctionMetadata(function_name,
-                                                                      signature, buffer, row)));
+    keyspace->add_function(FunctionMetadata::Ptr(new FunctionMetadata(config,
+                                                                      function_name, signature,
+                                                                      keyspace,
+                                                                      buffer, row)));
 
   }
 }
 
-void Metadata::InternalData::update_aggregates(int version, ResultResponse* result) {
+void Metadata::InternalData::update_aggregates(const MetadataConfig& config, ResultResponse* result) {
   SharedRefPtr buffer = result->buffer();
 
   result->decode_first_row();
@@ -1468,7 +1608,7 @@ void Metadata::InternalData::update_aggregates(int version, ResultResponse* resu
     std::string aggregate_name;
     const Row* row = rows.row();
 
-    const Value* signature = row->get_by_name("signature");
+    const Value* signature = row->get_by_name(signature_column_name(config.cassandra_version));
     if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
         !row->get_string_by_name("aggregate_name", &aggregate_name) ||
         signature == NULL) {
@@ -1481,9 +1621,10 @@ void Metadata::InternalData::update_aggregates(int version, ResultResponse* resu
       keyspace = get_or_create_keyspace(keyspace_name);
     }
 
-    keyspace->add_aggregate(AggregateMetadata::Ptr(new AggregateMetadata(aggregate_name, signature,
-                                                                         keyspace->functions(),
-                                                                         version, buffer, row)));
+    keyspace->add_aggregate(AggregateMetadata::Ptr(new AggregateMetadata(config,
+                                                                         aggregate_name, signature,
+                                                                         keyspace,
+                                                                         buffer, row)));
   }
 }
 
@@ -1515,14 +1656,14 @@ void Metadata::InternalData::drop_aggregate(const std::string& keyspace_name, co
   i->second.drop_aggregate(full_aggregate_name);
 }
 
-void Metadata::InternalData::update_columns(int version, const VersionNumber& cassandra_version, ResultResponse* result) {
+void Metadata::InternalData::update_columns(const MetadataConfig& config, ResultResponse* result) {
   SharedRefPtr buffer = result->buffer();
 
   result->decode_first_row();
   ResultIterator rows(result);
 
   std::string keyspace_name;
-  std::string columnfamily_name;
+  std::string table_name;
   std::string column_name;
 
   KeyspaceMetadata* keyspace = NULL;
@@ -1530,13 +1671,14 @@ void Metadata::InternalData::update_columns(int version, const VersionNumber& ca
 
   while (rows.next()) {
     std::string temp_keyspace_name;
-    std::string temp_columnfamily_name;
+    std::string temp_table_name;
     const Row* row = rows.row();
 
     if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
-        !row->get_string_by_name("columnfamily_name", &temp_columnfamily_name) ||
+        !row->get_string_by_name(table_column_name(config.cassandra_version), &temp_table_name) ||
         !row->get_string_by_name("column_name", &column_name)) {
-      LOG_ERROR("Unable to get column value for 'keyspace_name', 'columnfamily_name' or 'column_name'");
+      LOG_ERROR("Unable to get column value for 'keyspace_name', '%s' or 'column_name'",
+                table_column_name(config.cassandra_version));
       continue;
     }
 
@@ -1545,22 +1687,23 @@ void Metadata::InternalData::update_columns(int version, const VersionNumber& ca
       keyspace = get_or_create_keyspace(keyspace_name);
     }
 
-    if (columnfamily_name != temp_columnfamily_name) {
+    if (table_name != temp_table_name) {
       // Build keys for the previous table
       if (table) {
-        table->build_keys_and_sort(cassandra_version);
+        table->build_keys_and_sort(config);
       }
-      columnfamily_name = temp_columnfamily_name;
-      table = keyspace->get_or_create_table(columnfamily_name);
+      table_name = temp_table_name;
+      table = keyspace->get_or_create_table(table_name);
       table->clear_columns();
     }
 
-    table->add_column(ColumnMetadata::Ptr(new ColumnMetadata(column_name, version, buffer, row)));
+    table->add_column(ColumnMetadata::Ptr(new ColumnMetadata(config, column_name,
+                                                             keyspace, buffer, row)));
   }
 
   // Build keys for the last table
   if (table) {
-    table->build_keys_and_sort(cassandra_version);
+    table->build_keys_and_sort(config);
   }
 }
 
diff --git a/src/metadata.hpp b/src/metadata.hpp
index ec39012b4..60004d95b 100644
--- a/src/metadata.hpp
+++ b/src/metadata.hpp
@@ -24,7 +24,8 @@
 #include "scoped_lock.hpp"
 #include "scoped_ptr.hpp"
 #include "token_map.hpp"
-#include "type_parser.hpp"
+#include "data_type.hpp"
+#include "value.hpp"
 
 #include