ParquetDataset

Description

  1. ParquetDataset supports reading data from Parquet files.

  2. ParquetDataset supports reading Parquet files from local, S3, OSS, or HDFS file systems.

User API

Environment Variable

# If ARROW_NUM_THREADS > 0, specified number of threads will be used.
# If ARROW_NUM_THREADS = 0, no threads will be used.
# If ARROW_NUM_THREADS < 0, all threads will be used.
os.environ['ARROW_NUM_THREADS'] = '2'

ParquetDataset API

class ParquetDataset(dataset_ops.DatasetV2):
  def __init__(
      self, filenames,
      batch_size=1,
      fields=None,
      partition_count=1,
      partition_index=0,
      drop_remainder=False,
      num_parallel_reads=None,
      num_sequential_reads=1):

# Create a `ParquetDataset` from a filenames dataset.
def read_parquet(
    batch_size,
    fields=None,
    partition_count=1,
    partition_index=0,
    drop_remainder=False,
    num_parallel_reads=None,
    num_sequential_reads=1):

  • filenames: Filename(s) of the Parquet file(s) to read. This parameter accepts the following types:

    • A 0-D or 1-D tf.string tensor

    • string

    • list or tuple of string

    • Dataset containing one or more filenames.

  • batch_size: (Optional.) Maximum number of samples in an output batch.

  • fields: (Optional.) List of DataFrame fields. Whether fields is required, and which types it accepts, depends on the type of filenames:

    | filenames parameter type         | fields parameter requirement                        | fields parameter type                                                                  |
    |----------------------------------|-----------------------------------------------------|----------------------------------------------------------------------------------------|
    | Tensor / Dataset                 | required                                            | DataFrame.Field / list or tuple of DataFrame.Field                                     |
    | string / list or tuple of string | optional, the default value means read all columns  | DataFrame.Field / list or tuple of DataFrame.Field / string / list or tuple of string  |

  • partition_count: (Optional.) Count of row-group partitions; together with partition_index this shards the dataset across readers (see the sketch after this parameter list).

  • partition_index: (Optional.) Index of the row-group partition to read.

  • drop_remainder: (Optional.) If True, only keep batches with exactly batch_size samples.

  • num_parallel_reads: (Optional.) A tf.int64 scalar representing the number of files to read in parallel. Defaults to reading files sequentially.

  • num_sequential_reads: (Optional.) A tf.int64 scalar representing the number of batches to read sequentially. Defaults to 1.
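
A minimal sketch of how these parameters fit together in a sharded (multi-worker) setup. The file paths, field names, and the two-worker layout are illustrative assumptions, not part of the API:

import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops

# Hypothetical two-worker sharding: every worker passes the same
# partition_count and its own partition_index to read a disjoint
# subset of row groups.
worker_count = 2
worker_index = 0  # would be 1 on the second worker

ds = parquet_dataset_ops.ParquetDataset(
    ['/path/to/f1.parquet', '/path/to/f2.parquet'],  # string filenames, so fields may be plain column names
    batch_size=1024,
    fields=['a', 'c'],             # optional here; omit to read all columns
    partition_count=worker_count,  # total count of row-group partitions
    partition_index=worker_index,  # which partition this worker reads
    drop_remainder=True,           # only keep full batches of 1024 samples
    num_parallel_reads=2)          # read two files in parallel
ds = ds.prefetch(4)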

DataFrame

A data frame is a table consisting of multiple named columns. Each named column has a logical data type and a physical data type.

Logical Type of DataFrame

| Logical Type                | Output Type                       |
|-----------------------------|-----------------------------------|
| Scalar                      | tf.Tensor / DataFrame.Value       |
| Fixed-Length List           | tf.Tensor / DataFrame.Value       |
| Variable-Length List        | tf.SparseTensor / DataFrame.Value |
| Variable-Length Nested List | tf.SparseTensor / DataFrame.Value |
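
As a rough illustration of this mapping (column names, dtypes, and the file path are hypothetical; the to_sparse conversion used here is described later in this document):

import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops
from tensorflow.python.data.experimental.ops import dataframe

ds = parquet_dataset_ops.ParquetDataset(
    '/path/to/example.parquet',
    batch_size=32,
    fields=[
        # Scalar column -> dense tf.Tensor.
        parquet_dataset_ops.DataFrame.Field('label', tf.int64),
        # Variable-length list column -> tf.SparseTensor after to_sparse.
        parquet_dataset_ops.DataFrame.Field('tags', tf.int64, ragged_rank=1)])
ds = ds.apply(dataframe.to_sparse())
batch = tf.data.make_one_shot_iterator(ds).get_next()
# batch['label']: tf.Tensor        (Scalar)
# batch['tags']:  tf.SparseTensor  (Variable-Length List)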

Physical Type of DataFrame

| Category | Physical Type                        |
|----------|--------------------------------------|
| Integers | int64 uint64 int32 uint32 int8 uint8 |
| Numerics | float64 float32 float16              |
| Text     | string                               |

DataFrame API

class DataFrame(object):
    class Field(object):
        def __init__(self, name,
            type=None,
            ragged_rank=None,
            shape=None):

    class Value(collections.namedtuple(
        'DataFrameValue', ['values', 'nested_row_splits'])):
        def to_sparse(self, name=None):

# Convert values to tensors or sparse tensors from input dataset.
def to_sparse(num_parallel_calls=None):

DataFrame.Field API
  • name: Name of the column.

  • type: Data type of the column, such as tf.int64.

  • ragged_rank: (Optional.) Number of nesting levels when the column is a variable-length (nested) list.

  • shape: (Optional.) Shape of the column when the column is a fixed-length list.

Attention: For a fixed-length list column, only shape needs to be specified; ragged_rank is not required. See the sketch below.
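
A minimal sketch of Field definitions covering these cases. The column names and dtypes are hypothetical; shape is given as a plain Python list here:

import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops

DataFrame = parquet_dataset_ops.DataFrame

fields = [
    # Scalar column: only name and type are needed.
    DataFrame.Field('label', tf.int64),
    # Fixed-length list column: specify shape only.
    DataFrame.Field('embedding', tf.float32, shape=[8]),
    # Variable-length list column: one nesting level.
    DataFrame.Field('tags', tf.int64, ragged_rank=1),
    # Variable-length nested list column: two nesting levels.
    DataFrame.Field('sessions', tf.int64, ragged_rank=2),
]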

DataFrame.Value Conversion API (use when needed)

Since the output of ParquetDataset may contain DataFrame.Value values that the model cannot consume directly, they need to be converted to SparseTensor. Use the to_sparse API for this conversion.

import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops
from tensorflow.python.data.experimental.ops import dataframe

ds = parquet_dataset_ops.ParquetDataset(...)
ds = ds.apply(dataframe.to_sparse())
...

Examples

1. Read from one file on local filesystem

import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops

# Read from a parquet file.
ds = parquet_dataset_ops.ParquetDataset('/path/to/f1.parquet',
                                        batch_size=1024)
ds = ds.prefetch(4)
it = tf.data.make_one_shot_iterator(ds)
batch = it.get_next()
# batch is a dict of column name to tensor, e.g. {'a': tensor_a, 'c': tensor_c}

2. Read from filenames dataset

import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops

# `func` is a user-defined generator that yields parquet filename strings.
filenames = tf.data.Dataset.from_generator(func, tf.string, tf.TensorShape([]))
# Define data frame fields.
fields = [
    parquet_dataset_ops.DataFrame.Field('A', tf.int64),
    parquet_dataset_ops.DataFrame.Field('C', tf.int64, ragged_rank=1)]
# Read from parquet files by reading upstream filename dataset.
ds = filenames.apply(parquet_dataset_ops.read_parquet(1024, fields=fields))
ds = ds.prefetch(4)
it = tf.data.make_one_shot_iterator(ds)
batch = it.get_next()
# {'A': tensor_a, 'C': value_c}
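
Since field 'C' above is declared with ragged_rank=1, its column is variable-length and may be returned as a DataFrame.Value. If the model consumes SparseTensor directly, the conversion from the DataFrame.Value Conversion API section can be appended; a sketch reusing the dataset from this example:

from tensorflow.python.data.experimental.ops import dataframe

# Convert DataFrame.Value outputs (e.g. the variable-length column 'C')
# into tf.SparseTensor before feeding the model.
ds = ds.apply(dataframe.to_sparse())
it = tf.data.make_one_shot_iterator(ds)
batch = it.get_next()
# batch['C'] is now a tf.SparseTensor.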

3. Read from files on HDFS

import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops

# Read from parquet files on remote services for selected fields.
ds = parquet_dataset_ops.ParquetDataset(
    ['hdfs://host:port/path/to/f3.parquet'],
    batch_size=1024,
    fields=['a', 'c'])
ds = ds.prefetch(4)
it = tf.data.make_one_shot_iterator(ds)
batch = it.get_next()
# {'a': tensor_a, 'c': tensor_c}
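
Reading from S3 or OSS follows the same pattern as the HDFS example, assuming the corresponding filesystem scheme is available in your TensorFlow build. The bucket and object names below are placeholders:

import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops

# Hypothetical S3 URI; an OSS path would use its own scheme instead.
ds = parquet_dataset_ops.ParquetDataset(
    ['s3://bucket/path/to/f4.parquet'],
    batch_size=1024,
    fields=['a', 'c'])
ds = ds.prefetch(4)
it = tf.data.make_one_shot_iterator(ds)
batch = it.get_next()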