ParquetDataset
Description
ParquetDataset supports reading data from Parquet files stored on the local file system or on S3/OSS/HDFS file systems.
User API
Environment Variable
import os

# If ARROW_NUM_THREADS > 0, the specified number of threads will be used.
# If ARROW_NUM_THREADS = 0, no threads will be used.
# If ARROW_NUM_THREADS < 0, all threads will be used.
os.environ['ARROW_NUM_THREADS'] = '2'
ParquetDataset API
class ParquetDataset(dataset_ops.DatasetV2):
  def __init__(
      self, filenames,
      batch_size=1,
      fields=None,
      partition_count=1,
      partition_index=0,
      drop_remainder=False,
      num_parallel_reads=None,
      num_sequential_reads=1):

# Create a `ParquetDataset` from a filenames dataset.
def read_parquet(
    batch_size,
    fields=None,
    partition_count=1,
    partition_index=0,
    drop_remainder=False,
    num_parallel_reads=None,
    num_sequential_reads=1):
`filenames`
: The filename(s) of the Parquet file(s). This parameter accepts the following types: a 0-D or 1-D `tf.string` tensor, a `string`, a `list` or `tuple` of `string`, or a `Dataset` containing one or more filenames.

`batch_size`
: (Optional.) Maximum number of samples in an output batch.

`fields`
: (Optional.) List of DataFrame fields. The requirement and accepted types depend on the type of `filenames`:

| `filenames` parameter type | `fields` parameter requirement | `fields` parameter type |
|---|---|---|
| `Tensor` / `Dataset` | required | `DataFrame.Field` / `list` or `tuple` of `DataFrame.Field` |
| `string` / `list` or `tuple` of `string` | optional; the default value means read all columns | `DataFrame.Field` / `list` or `tuple` of `DataFrame.Field` / `string` / `list` or `tuple` of `string` |

`partition_count`
: (Optional.) Count of row group partitions; see the sketch after this list.

`partition_index`
: (Optional.) Index of row group partitions; see the sketch after this list.

`drop_remainder`
: (Optional.) If True, only keep batches with exactly `batch_size` samples.

`num_parallel_reads`
: (Optional.) A `tf.int64` scalar representing the number of files to read in parallel. Defaults to reading files sequentially.

`num_sequential_reads`
: (Optional.) A `tf.int64` scalar representing the number of batches to read sequentially. Defaults to 1.
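For instance, `partition_count` and `partition_index` can be used together to shard row groups across workers. A minimal sketch, assuming a file `/path/to/f1.parquet` with multiple row groups and two workers (the `worker_index` variable is hypothetical):

import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops
# Each worker reads a disjoint subset of row groups from the same file.
worker_index = 0  # would be 1 on the second worker
ds = parquet_dataset_ops.ParquetDataset(
    '/path/to/f1.parquet',
    batch_size=1024,
    partition_count=2,             # total number of partitions/workers
    partition_index=worker_index)  # this worker's partition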
DataFrame
A data frame is a table consisting of multiple named columns. A named column has a logical data type and a physical data type.
Logical Type of DataFrame
| Logical Type | Output Type |
|---|---|
| Scalar | `tf.Tensor` |
| Fixed-Length List | `tf.Tensor` |
| Variable-Length List | `DataFrame.Value` (convertible to `tf.SparseTensor`) |
| Variable-Length Nested List | `DataFrame.Value` (convertible to `tf.SparseTensor`) |
Physical Type of DataFrame
| Category | Physical Type |
|---|---|
| Integers | `tf.int8`, `tf.int16`, `tf.int32`, `tf.int64`, `tf.uint8`, `tf.uint16`, `tf.uint32`, `tf.uint64` |
| Numerics | `tf.float32`, `tf.float64` |
| Text | `tf.string` |
DataFrame API
class DataFrame(object):
  class Field(object):
    def __init__(self, name,
                 type=None,
                 ragged_rank=None,
                 shape=None):

  class Value(collections.namedtuple(
      'DataFrameValue', ['values', 'nested_row_splits'])):
    def to_sparse(self, name=None):

# Convert values to tensors or sparse tensors from input dataset.
def to_sparse(num_parallel_calls=None):
DataFrame.Field API
`name`
: Name of the column.

`type`
: Data type of the column, such as `tf.int64`.

`ragged_rank`
: (Optional.) Specifies the number of nesting levels when the column is a nested list.

`shape`
: (Optional.) Specifies the shape of the column when the column is a fixed-length list.

Attention: For a fixed-length list, only `shape` needs to be specified; see the sketch below.
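A minimal sketch of field definitions for each case (the column names `label`, `embedding`, `clicks`, and `sessions` are hypothetical):

import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops
DataFrame = parquet_dataset_ops.DataFrame
fields = [
    # A scalar column.
    DataFrame.Field('label', tf.int64),
    # A fixed-length list column: only `shape` is specified.
    DataFrame.Field('embedding', tf.float32, shape=[8]),
    # A variable-length list column: one nesting level.
    DataFrame.Field('clicks', tf.int64, ragged_rank=1),
    # A variable-length nested list column: two nesting levels.
    DataFrame.Field('sessions', tf.int64, ragged_rank=2)]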
DataFrame.Value Conversion API (use as needed)
Since the output of ParquetDataset may contain `DataFrame.Value` types that cannot be accessed by the model directly, they need to be converted to `SparseTensor`. Please use the `to_sparse` API for the conversion.
import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops
from tensorflow.python.data.experimental.ops import dataframe
ds = parquet_dataset_ops.ParquetDataset(...)
ds = ds.apply(dataframe.to_sparse())
...
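After the conversion, each variable-length column is produced as a `tf.SparseTensor`. A short sketch of consuming one, assuming a field named 'c' and densification via `tf.sparse.to_dense` (both are illustrative):

it = tf.data.make_one_shot_iterator(ds)
batch = it.get_next()
# batch['c'] is assumed to be a tf.SparseTensor after to_sparse;
# densify it for ops that require dense inputs.
dense_c = tf.sparse.to_dense(batch['c'], default_value=0)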
Examples
1. Read from one file on local filesystem
import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops
# Read from a parquet file.
ds = parquet_dataset_ops.ParquetDataset('/path/to/f1.parquet',
batch_size=1024)
ds = ds.prefetch(4)
it = tf.data.make_one_shot_iterator(ds)
batch = it.get_next()
# {'a': tensor_a, 'c': tensor_c}
2. Read from filenames dataset
import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops
filenames = tf.data.Dataset.from_generator(func, tf.string, tf.TensorShape([]))
# Define data frame fields.
fields = [
parquet_dataset_ops.DataFrame.Field('A', tf.int64),
parquet_dataset_ops.DataFrame.Field('C', tf.int64, ragged_rank=1)]
# Read from parquet files by reading upstream filename dataset.
ds = filenames.apply(parquet_dataset_ops.read_parquet(1024, fields=fields))
ds = ds.prefetch(4)
it = tf.data.make_one_shot_iterator(ds)
batch = it.get_next()
# {'A': tensor_a, 'C': tensor_c}
3. Read from files on HDFS
import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops
# Read from parquet files on remote services for selected fields.
ds = parquet_dataset_ops.ParquetDataset(
['hdfs://host:port/path/to/f3.parquet'],
batch_size=1024,
fields=['a', 'c'])
ds = ds.prefetch(4)
it = tf.data.make_one_shot_iterator(ds)
batch = it.get_next()
# {'a': tensor_a, 'c': tensor_c}
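4. Read from files on S3/OSS
The description above also mentions S3/OSS file systems; the following sketch assumes the same URI-style access as the HDFS example (the bucket and path are hypothetical):

import tensorflow as tf
from tensorflow.python.data.experimental.ops import parquet_dataset_ops
# Read from a parquet file on S3 for selected fields; access is assumed
# to follow the same URI pattern as the HDFS example.
ds = parquet_dataset_ops.ParquetDataset(
    's3://bucket/path/to/f4.parquet',
    batch_size=1024,
    fields=['a', 'c'])
ds = ds.prefetch(4)
it = tf.data.make_one_shot_iterator(ds)
batch = it.get_next()
# {'a': tensor_a, 'c': tensor_c}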