Impala

One goal of Ibis is to provide an integrated Python API for an Impala cluster without requiring you to switch back and forth between Python code and the Impala shell.

ibis.memtable support

The Impala backend supports memtables by constructing a string with the contents of the in-memory object. This will be very inefficient for medium to large in-memory tables. Please file an issue if you observe performance issues when using in-memory tables.

Install

Install ibis and dependencies for the Impala backend:

pip install 'ibis-framework[impala]'
conda install -c conda-forge ibis-impala
mamba install -c conda-forge ibis-impala

Connect

API

Create a client by passing in connection parameters to ibis.impala.connect.

See ibis.backends.impala.Backend.do_connect for connection parameter information.

ibis.impala.connect is a thin wrapper around ibis.backends.impala.Backend.do_connect.

Connection Parameters

do_connect(host='localhost', port=21050, database='default', timeout=45, use_ssl=False, ca_cert=None, user=None, password=None, auth_mechanism='NOSASL', kerberos_service_name='impala', pool_size=8, hdfs_client=None)

Create an Impala Backend for use with Ibis.

Parameters:

Name Type Description Default
host str

Host name of the impalad or HiveServer2 in Hive

'localhost'
port int

Impala's HiveServer2 port

21050
database str

Default database when obtaining new cursors

'default'
timeout int

Connection timeout in seconds when communicating with HiveServer2

45
use_ssl bool

Use SSL when connecting to HiveServer2

False
ca_cert str | Path | None

Local path to 3rd party CA certificate or copy of server certificate for self-signed certificates. If SSL is enabled, but this argument is None, then certificate validation is skipped.

None
user str | None

LDAP user to authenticate

None
password str | None

LDAP password to authenticate

None
auth_mechanism Literal['NOSASL', 'PLAIN', 'GSSAPI', 'LDAP']

Authentication mechanism. One of: 'NOSASL' (insecure Impala connections), 'PLAIN' (insecure Hive clusters), 'LDAP' (LDAP authenticated connections), 'GSSAPI' (Kerberos-secured clusters).

'NOSASL'
kerberos_service_name str

Specify a particular impalad service principal.

'impala'
pool_size int

Size of the connection pool. Typically this is not necessary to configure.

8
hdfs_client fsspec.spec.AbstractFileSystem | None

An existing HDFS client.

None

Examples:

>>> import os
>>> import ibis
>>> hdfs_host = os.environ.get('IBIS_TEST_NN_HOST', 'localhost')
>>> hdfs_port = int(os.environ.get('IBIS_TEST_NN_PORT', 50070))
>>> impala_host = os.environ.get('IBIS_TEST_IMPALA_HOST', 'localhost')
>>> impala_port = int(os.environ.get('IBIS_TEST_IMPALA_PORT', 21050))
>>> hdfs = ibis.impala.hdfs_connect(host=hdfs_host, port=hdfs_port)
>>> client = ibis.impala.connect(
...     host=impala_host,
...     port=impala_port,
...     hdfs_client=hdfs,
... )
>>> client
<ibis.backends.impala.Backend object at 0x...>

Both method calls can take auth_mechanism='GSSAPI' or auth_mechanism='LDAP' to connect to Kerberos-secured or LDAP-authenticated clusters, respectively. Depending on your cluster setup, this may also require SSL. See the API reference for more, along with the Impala shell reference, as the connection semantics are identical.
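As a minimal sketch of wiring these parameters together, the helper below collects keyword arguments for ibis.impala.connect from a mapping such as os.environ. The IMPALA_* variable names and the `impala_connect_kwargs` function are assumptions made for this example; only the parameter names and defaults come from the do_connect signature above.

```python
import os

# Values accepted by the auth_mechanism parameter documented above.
AUTH_MECHANISMS = ("NOSASL", "PLAIN", "GSSAPI", "LDAP")


def impala_connect_kwargs(env=None):
    """Collect keyword arguments for ibis.impala.connect from a mapping.

    The IMPALA_* variable names are hypothetical, chosen for this sketch.
    """
    env = os.environ if env is None else env
    auth = env.get("IMPALA_AUTH_MECHANISM", "NOSASL").upper()
    if auth not in AUTH_MECHANISMS:
        raise ValueError(f"unsupported auth_mechanism: {auth!r}")

    kwargs = {
        "host": env.get("IMPALA_HOST", "localhost"),
        "port": int(env.get("IMPALA_PORT", 21050)),
        "database": env.get("IMPALA_DATABASE", "default"),
        "auth_mechanism": auth,
        "use_ssl": env.get("IMPALA_USE_SSL", "false").lower() == "true",
    }
    if auth == "LDAP":
        # user and password are documented as the LDAP credentials.
        kwargs["user"] = env.get("IMPALA_USER")
        kwargs["password"] = env.get("IMPALA_PASSWORD")
    return kwargs


kwargs = impala_connect_kwargs(
    {"IMPALA_AUTH_MECHANISM": "ldap", "IMPALA_USER": "alice"}
)
print(kwargs)
# client = ibis.impala.connect(**kwargs)  # requires a reachable Impala cluster
```

Keeping the connection settings in one dictionary makes it easy to validate them before any network call is attempted.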

These methods are available on the Impala client object after connecting to your HDFS cluster (ibis.impala.hdfs_connect) and connecting to Impala with ibis.impala.connect. See backends.impala for a tutorial on using this backend.

Database methods

Backend

Bases: BaseSQLBackend

Functions

create_database(name, path=None, force=False)

Create a new Impala database.

Parameters:

Name Type Description Default
name

Database name

required
path

HDFS path where to store the database data; otherwise uses Impala default

None
force

Forcibly create the database

False
drop_database(name, force=False)

Drop an Impala database.

Parameters:

Name Type Description Default
name

Database name

required
force

If False and there are any tables in this database, raises an IntegrityError

False
list_databases(like=None)

Table methods

The Backend object itself has many helper utility methods. You'll find most of the table-specific methods on ImpalaTable.

Backend

Bases: BaseSQLBackend

Functions

table(name, database=None, **kwargs)
list_tables(like=None, database=None)
drop_table(name, *, database=None, force=False)

Drop an Impala table.

Parameters:

Name Type Description Default
name str

Table name

required
database str | None

Database name

None
force bool

If False, the database may throw an exception if the table does not exist

False

Examples:

>>> table = 'my_table'
>>> db = 'operations'
>>> con.drop_table(table, database=db, force=True)
create_table(name, obj=None, *, schema=None, database=None, temp=None, overwrite=False, external=False, format='parquet', location=None, partition=None, like_parquet=None)

Create a new table in Impala using an Ibis table expression.

This is currently designed for tables whose data is stored in HDFS.

Parameters:

Name Type Description Default
name str

Table name

required
obj ir.Table | None

If passed, creates table from select statement results

None
schema

Mutually exclusive with obj, creates an empty table with a particular schema

None
database

Database name

None
temp bool | None

Whether a table is temporary

None
overwrite bool

Do not create table if table with indicated name already exists

False
external bool

Create an external table; Impala will not delete the underlying data when the table is dropped

False
format

File format

'parquet'
location

Specify the directory location where Impala reads and writes files for the table

None
partition

Must pass a schema to use this. Cannot partition from an expression.

None
like_parquet

Can specify instead of a schema

None
insert(table_name, obj=None, database=None, overwrite=False, partition=None, values=None, validate=True)

Insert data into an existing table.

See ImpalaTable.insert for parameters.

Examples:

>>> table = 'my_table'
>>> con.insert(table, table_expr)

Completely overwrite contents

>>> con.insert(table, table_expr, overwrite=True)
invalidate_metadata(name=None, database=None)

Issue an INVALIDATE METADATA command.

Optionally this applies to a specific table. See Impala documentation.

Parameters:

Name Type Description Default
name str | None

Table name. Can be fully qualified (with database)

None
database str | None

Database name

None
truncate_table(name, database=None)

Delete all rows from an existing table.

Parameters:

Name Type Description Default
name str

Table name

required
database str | None

Database name

None
get_schema(table_name, database=None)

Return a Schema object for the indicated table and database.

Parameters:

Name Type Description Default
table_name str

Table name

required
database str | None

Database name

None

Returns:

Type Description
Schema

Ibis schema

cache_table(table_name, *, database=None, pool='default')

Caches a table in cluster memory in the given pool.

Parameters:

Name Type Description Default
table_name

Table name

required
database

Database name

None
pool

The name of the pool in which to cache the table

'default'

Examples:

>>> table = 'my_table'
>>> db = 'operations'
>>> pool = 'op_4GB_pool'
>>> con.cache_table('my_table', database=db, pool=pool)
get_options()

Return current query options for the Impala session.

set_options(options)
set_compression_codec(codec)

The best way to interact with a single table is through the ImpalaTable object you get back from Backend.table.

ImpalaTable

Bases: ir.Table

A physical table in the Impala-Hive metastore.

Attributes

describe_formatted = metadata class-attribute instance-attribute
is_partitioned property

True if the table is partitioned.

Functions

add_partition(spec, location=None)

Add a new table partition.

This API creates any necessary new directories in HDFS.

Partition parameters can be set in a single DDL statement or you can use alter_partition to set them after the fact.

alter(location=None, format=None, tbl_properties=None, serde_properties=None)

Change settings and parameters of the table.

Parameters:

Name Type Description Default
location

For partitioned tables, you may want the alter_partition function

None
format

Table format

None
tbl_properties

Table properties

None
serde_properties

Serialization/deserialization properties

None
alter_partition(spec, location=None, format=None, tbl_properties=None, serde_properties=None)

Change settings and parameters of an existing partition.

Parameters:

Name Type Description Default
spec

The partition keys for the partition being modified

required
location

Location of the partition

None
format

Table format

None
tbl_properties

Table properties

None
serde_properties

Serialization/deserialization properties

None
column_stats()

Return results of SHOW COLUMN STATS.

Returns:

Type Description
DataFrame

Column statistics

compute_stats(incremental=False)

Invoke Impala COMPUTE STATS command on the table.

drop()

Drop the table from the database.

drop_partition(spec)

Drop an existing table partition.

files()

Return results of SHOW FILES statement.

insert(obj=None, overwrite=False, partition=None, values=None, validate=True)

Insert into an Impala table.

Parameters:

Name Type Description Default
obj

Table expression or DataFrame

None
overwrite

If True, will replace existing contents of table

False
partition

For partitioned tables, indicate the partition that's being inserted into, either with an ordered list of partition keys or a dict of partition field name to value. For example for the partition (year=2007, month=7), this can be either (2007, 7) or {'year': 2007, 'month': 7}.

None
values

Unsupported and unused

None
validate

If True, do more rigorous validation that schema of table being inserted is compatible with the existing table

True

Examples:

>>> t.insert(table_expr)

Completely overwrite contents

>>> t.insert(table_expr, overwrite=True)
invalidate_metadata()
load_data(path, overwrite=False, partition=None)

Load data into an Impala table.

Parameters:

Name Type Description Default
path

Data to load

required
overwrite

Overwrite the existing data in the entire table or indicated partition

False
partition

If specified, the partition must already exist

None
metadata()

Return results of DESCRIBE FORMATTED statement.

partition_schema()

Return the schema for the partition columns.

partitions()

Return information about the table's partitions.

Raises an exception if the table is not partitioned.

refresh()
rename(new_name, database=None)

Rename table inside Impala.

References to the old table are no longer valid.

stats()

Return results of SHOW TABLE STATS.

If not partitioned, contains only one row.

Returns:

Type Description
DataFrame

Table statistics

Creating views

Backend

Bases: BaseSQLBackend

Functions

drop_table_or_view(name, *, database=None, force=False)

Drop view or table.

create_view(name, obj, *, database=None, overwrite=False)

Accessing data formats in HDFS

Backend

Bases: BaseSQLBackend

Functions

delimited_file(hdfs_dir, schema, name=None, database=None, delimiter=',', na_rep=None, escapechar=None, lineterminator=None, external=True, persist=False)

Interpret delimited text files as an Ibis table expression.

See the parquet_file method for more details on what happens under the hood.

Parameters:

Name Type Description Default
hdfs_dir

HDFS directory containing delimited text files

required
schema

Ibis schema

required
name

Name for temporary or persistent table; otherwise random names are generated

None
database

Database to create the table in

None
delimiter

Character used to delimit columns

','
na_rep

Character used to represent NULL values

None
escapechar

Character used to escape special characters

None
lineterminator

Character used to delimit lines

None
external

Create table as EXTERNAL (data will not be deleted on drop). Note that if persist=False and external=False, whatever data you reference will be deleted

True
persist

If True, do not delete the table upon garbage collection of ibis table object

False

Returns:

Type Description
ImpalaTable

Impala table expression

parquet_file(hdfs_dir, schema=None, name=None, database=None, external=True, like_file=None, like_table=None, persist=False)

Make indicated parquet file in HDFS available as an Ibis table.

The table created can be optionally named and persisted, otherwise a unique name will be generated. Temporarily, for any non-persistent external table created by Ibis we will attempt to drop it when the underlying object is garbage collected (or the Python interpreter shuts down normally).

Parameters:

Name Type Description Default
hdfs_dir

Path in HDFS

required
schema

If no schema is provided and neither of the like_* arguments is passed, one will be inferred from one of the parquet files in the directory.

None
like_file

Absolute path to Parquet file in HDFS to use for schema definitions. An alternative to having to supply an explicit schema

None
like_table

Fully scoped and escaped string to an Impala table whose schema we will use for the newly created table.

None
name

Random unique name generated otherwise

None
database

Database to create the (possibly temporary) table in

None
external

If a table is external, the referenced data will not be deleted when the table is dropped in Impala. Otherwise (external=False) Impala takes ownership of the Parquet file.

True
persist

Do not drop the table during garbage collection

False

Returns:

Type Description
ImpalaTable

Impala table expression

avro_file(hdfs_dir, avro_schema, name=None, database=None, external=True, persist=False)

Create a table to read a collection of Avro data.

Parameters:

Name Type Description Default
hdfs_dir

Absolute HDFS path to directory containing avro files

required
avro_schema

The Avro schema for the data as a Python dict

required
name

Table name

None
database

Database name

None
external

Whether the table is external

True
persist

Persist the table

False

Returns:

Type Description
ImpalaTable

Impala table expression

HDFS Interaction

Ibis delegates all HDFS interaction to the fsspec library.

The Impala client object

To use Ibis with Impala, you first must connect to a cluster using the ibis.impala.connect function, optionally supplying an HDFS connection:

import ibis

hdfs = ibis.impala.hdfs_connect(host=webhdfs_host, port=webhdfs_port)
client = ibis.impala.connect(host=impala_host, port=impala_port, hdfs_client=hdfs)

All examples here use the following block of code to connect to Impala using Docker:

import ibis

hdfs = ibis.impala.hdfs_connect(host="localhost", port=50070)
client = ibis.impala.connect(host="localhost", hdfs_client=hdfs)

You can accomplish many tasks directly through the client object, but we additionally provide APIs to streamline tasks involving a single Impala table or database.

Table objects

table(name, database=None)

Construct a table expression.

Parameters:

Name Type Description Default
name str

Table name

required
database str | None

Database name

None

Returns:

Type Description
Table

Table expression

The client's table method allows you to create an Ibis table expression referencing a physical Impala table:

table = client.table('functional_alltypes', database='ibis_testing')

ImpalaTable is a Python subclass of the more general Ibis Table that has additional Impala-specific methods. So you can use it interchangeably with any code expecting a Table.

Like all table expressions in Ibis, ImpalaTable has a schema method you can use to examine its schema:

ImpalaTable

Bases: ir.Table

A physical table in the Impala-Hive metastore.

The client has a drop_table method you can use to drop tables, and the table itself also has a drop method:

table.drop()

Expression execution

Ibis expressions have execution methods like to_pandas that compile and run the expressions on Impala or whichever backend is being referenced.

For example:

>>> fa = db.functional_alltypes
>>> expr = fa.double_col.sum()
>>> expr.to_pandas()
331785.00000000006

For longer-running queries, Ibis will attempt to cancel the query in progress if an interrupt is received.

Creating tables

There are several ways to create new Impala tables:

  • From an Ibis table expression
  • Empty, from a declared schema
  • Empty and partitioned

In all cases, you should use the create_table method either on the top-level client connection or a database object.

create_table(name, obj=None, *, schema=None, database=None, temp=None, overwrite=False, external=False, format='parquet', location=None, partition=None, like_parquet=None)

Create a new table in Impala using an Ibis table expression.

This is currently designed for tables whose data is stored in HDFS.

Parameters:

Name Type Description Default
name str

Table name

required
obj ir.Table | None

If passed, creates table from select statement results

None
schema

Mutually exclusive with obj, creates an empty table with a particular schema

None
database

Database name

None
temp bool | None

Whether a table is temporary

None
overwrite bool

Do not create table if table with indicated name already exists

False
external bool

Create an external table; Impala will not delete the underlying data when the table is dropped

False
format

File format

'parquet'
location

Specify the directory location where Impala reads and writes files for the table

None
partition

Must pass a schema to use this. Cannot partition from an expression.

None
like_parquet

Can specify instead of a schema

None

Creating tables from a table expression

If you pass an Ibis expression to create_table, Ibis issues a CREATE TABLE ... AS SELECT (CTAS) statement:

>>> table = db.table('functional_alltypes')
>>> expr = table.group_by('string_col').size()
>>> db.create_table('string_freqs', expr, format='parquet')

>>> freqs = db.table('string_freqs')
>>> freqs.to_pandas()
  string_col  count
0          9    730
1          3    730
2          6    730
3          4    730
4          1    730
5          8    730
6          2    730
7          7    730
8          5    730
9          0    730

>>> files = freqs.files()
>>> files
                                                Path  Size Partition
0  hdfs://impala:8020/user/hive/warehouse/ibis_te...  584B

>>> freqs.drop()

You can also choose to create an empty table and use insert (see below).

Creating an empty table

To create an empty table, you must declare an Ibis schema that will be translated to the appropriate Impala schema and data types.

As Ibis types are simplified compared with Impala types, this may expand in the future to include a more fine-grained schema declaration.

You can use the create_table method either on a database or client object.

schema = ibis.schema([('foo', 'string'),
                      ('year', 'int32'),
                      ('month', 'int16')])
name = 'new_table'
db.create_table(name, schema=schema)

By default, this stores the data files in the database default location. You can force a particular path with the location option.

from getpass import getuser
schema = ibis.schema([('foo', 'string'),
                      ('year', 'int32'),
                      ('month', 'int16')])
name = 'new_table'
location = '/home/{}/new-table-data'.format(getuser())
db.create_table(name, schema=schema, location=location)

If the schema matches a known table schema, you can always use the schema method to get a schema object:

>>> t = db.table('functional_alltypes')
>>> t.schema()
ibis.Schema {
  id               int32
  bool_col         boolean
  tinyint_col      int8
  smallint_col     int16
  int_col          int32
  bigint_col       int64
  float_col        float32
  double_col       float64
  date_string_col  string
  string_col       string
  timestamp_col    timestamp
  year             int32
  month            int32
}

Creating a partitioned table

To create an empty partitioned table, include a list of columns to be used as the partition keys.

schema = ibis.schema([('foo', 'string'),
                      ('year', 'int32'),
                      ('month', 'int16')])
name = 'new_table'
db.create_table(name, schema=schema, partition=['year', 'month'])

Partitioned tables

Ibis enables you to manage partitioned tables in various ways. Since each partition behaves as its own "subtable" sharing a common schema, each partition can have its own file format, directory path, serialization properties, and so forth.

There are a handful of table methods for adding and removing partitions and getting information about the partition schema and any existing partition data:

ImpalaTable

Bases: ir.Table

A physical table in the Impala-Hive metastore.

Attributes

is_partitioned property

True if the table is partitioned.

Functions

add_partition(spec, location=None)

Add a new table partition.

This API creates any necessary new directories in HDFS.

Partition parameters can be set in a single DDL statement or you can use alter_partition to set them after the fact.

drop_partition(spec)

Drop an existing table partition.

partition_schema()

Return the schema for the partition columns.

partitions()

Return information about the table's partitions.

Raises an exception if the table is not partitioned.

To address a specific partition in any method that is partition specific, you can either use a dict with the partition key names and values, or pass a list of the partition values:

schema = ibis.schema([('foo', 'string'),
                      ('year', 'int32'),
                      ('month', 'int16')])
name = 'new_table'
db.create_table(name, schema=schema, partition=['year', 'month'])

table = db.table(name)

table.add_partition({'year': 2007, 'month': 4})
table.add_partition([2007, 5])
table.add_partition([2007, 6])

table.drop_partition([2007, 6])

We'll cover partition metadata management and data loading below.
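The two accepted forms of a partition spec can be reconciled with a small helper. The sketch below is not part of Ibis; `normalize_partition_spec` is a hypothetical function that turns either form into a dict keyed by partition column, which is handy for logging or validation before calling add_partition or drop_partition.

```python
def normalize_partition_spec(spec, partition_keys):
    """Return a partition spec as a dict keyed by partition column name.

    `spec` may be a dict of column name to value, or a list/tuple of values
    given in the same order as `partition_keys`.
    """
    keys = list(partition_keys)
    if isinstance(spec, dict):
        unknown = set(spec) - set(keys)
        if unknown:
            raise KeyError(f"not partition columns: {sorted(unknown)}")
        missing = [k for k in keys if k not in spec]
        if missing:
            raise KeyError(f"missing partition columns: {missing}")
        # Re-order the entries to match the partition key order.
        return {k: spec[k] for k in keys}
    values = list(spec)
    if len(values) != len(keys):
        raise ValueError(f"expected {len(keys)} values, got {len(values)}")
    return dict(zip(keys, values))


# Assumption: with a live table, the key order could be taken from
# table.partition_schema().names instead of being written out by hand.
keys = ["year", "month"]
print(normalize_partition_spec([2007, 5], keys))
print(normalize_partition_spec({"month": 4, "year": 2007}, keys))
```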

Inserting data into tables

If the schemas are compatible, you can insert into a table directly from an Ibis table expression:

>>> t = db.functional_alltypes
>>> db.create_table('insert_test', schema=t.schema())
>>> target = db.table('insert_test')

>>> target.insert(t[:3])
>>> target.insert(t[:3])
>>> target.insert(t[:3])

>>> target.to_pandas()
     id  bool_col  tinyint_col  ...           timestamp_col  year  month
0  5770      True            0  ... 2010-08-01 00:00:00.000  2010      8
1  5771     False            1  ... 2010-08-01 00:01:00.000  2010      8
2  5772      True            2  ... 2010-08-01 00:02:00.100  2010      8
3  5770      True            0  ... 2010-08-01 00:00:00.000  2010      8
4  5771     False            1  ... 2010-08-01 00:01:00.000  2010      8
5  5772      True            2  ... 2010-08-01 00:02:00.100  2010      8
6  5770      True            0  ... 2010-08-01 00:00:00.000  2010      8
7  5771     False            1  ... 2010-08-01 00:01:00.000  2010      8
8  5772      True            2  ... 2010-08-01 00:02:00.100  2010      8

[9 rows x 13 columns]

>>> target.drop()

If the table is partitioned, you must indicate the partition you are inserting into:

part = {'year': 2007, 'month': 4}
table.insert(expr, partition=part)
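When several partitions need to be filled, the same call can be repeated per partition. The following is a sketch under stated assumptions: `insert_partitions` is a hypothetical helper, and `table` is expected to behave like an ImpalaTable, exposing the is_partitioned property and the insert method documented above.

```python
def insert_partitions(table, exprs_by_partition, overwrite=False):
    """Insert one expression per partition into a partitioned table.

    `exprs_by_partition` is an iterable of (partition_spec, expression)
    pairs, where each spec is a dict or an ordered list of key values.
    Returns the number of inserts issued.
    """
    if not table.is_partitioned:
        raise ValueError("table is not partitioned; use table.insert(expr)")
    count = 0
    for spec, expr in exprs_by_partition:
        table.insert(expr, overwrite=overwrite, partition=spec)
        count += 1
    return count


# Example call shape (needs a live ImpalaTable and Ibis expressions):
# insert_partitions(table, [({'year': 2007, 'month': 4}, april_expr),
#                           ([2007, 5], may_expr)])
```

Checking is_partitioned up front gives a clearer error than letting the insert fail on the cluster.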

Managing table metadata

Ibis has functions that wrap many of the DDL commands for Impala table metadata.

Detailed table metadata: DESCRIBE FORMATTED

To get a parsed version of the DESCRIBE FORMATTED output, use the metadata method.

metadata()

Return results of DESCRIBE FORMATTED statement.

>>> t = client.table('ibis_testing.functional_alltypes')
>>> meta = t.metadata()
>>> meta
<class 'ibis.backends.impala.metadata.TableMetadata'>
{'info': {'CreateTime': datetime.datetime(2021, 1, 14, 21, 23, 8),
          'Database': 'ibis_testing',
          'LastAccessTime': 'UNKNOWN',
          'Location': 'hdfs://impala:8020/__ibis/ibis-testing-data/parquet/functional_alltypes',
          'Owner': 'root',
          'Protect Mode': 'None',
          'Retention': 0,
          'Table Parameters': {'COLUMN_STATS_ACCURATE': False,
                               'EXTERNAL': True,
                               'STATS_GENERATED_VIA_STATS_TASK': True,
                               'numFiles': 3,
                               'numRows': 7300,
                               'rawDataSize': '-1',
                               'totalSize': 106278,
                               'transient_lastDdlTime': datetime.datetime(2021, 1, 14, 21, 23, 17)},
          'Table Type': 'EXTERNAL_TABLE'},
 'schema': [('id', 'int'),
            ('bool_col', 'boolean'),
            ('tinyint_col', 'tinyint'),
            ('smallint_col', 'smallint'),
            ('int_col', 'int'),
            ('bigint_col', 'bigint'),
            ('float_col', 'float'),
            ('double_col', 'double'),
            ('date_string_col', 'string'),
            ('string_col', 'string'),
            ('timestamp_col', 'timestamp'),
            ('year', 'int'),
            ('month', 'int')],
 'storage info': {'Bucket Columns': '[]',
                  'Compressed': False,
                  'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
                  'Num Buckets': 0,
                  'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
                  'SerDe Library': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe',
                  'Sort Columns': '[]'}}

>>> meta.location
'hdfs://impala:8020/__ibis/ibis-testing-data/parquet/functional_alltypes'

>>> meta.create_time
datetime.datetime(2021, 1, 14, 21, 23, 8)

The files function is also available to see all of the physical HDFS data files backing a table:

ImpalaTable

Bases: ir.Table

A physical table in the Impala-Hive metastore.

Functions

files()

Return results of SHOW FILES statement.

>>> ss = c.table('tpcds_parquet.store_sales')

>>> ss.files()[:5]
                                                path      size  \
0  hdfs://localhost:20500/test-warehouse/tpcds.st...  160.61KB
1  hdfs://localhost:20500/test-warehouse/tpcds.st...  123.88KB
2  hdfs://localhost:20500/test-warehouse/tpcds.st...  139.28KB
3  hdfs://localhost:20500/test-warehouse/tpcds.st...  139.60KB
4  hdfs://localhost:20500/test-warehouse/tpcds.st...   62.84KB

                 partition
0  ss_sold_date_sk=2451803
1  ss_sold_date_sk=2451819
2  ss_sold_date_sk=2451772
3  ss_sold_date_sk=2451789
4  ss_sold_date_sk=2451741

Modifying table metadata

For unpartitioned tables, you can use the alter method to change their location, file format, and other properties. For partitioned tables, use alter_partition to change partition-specific metadata.

ImpalaTable

Bases: ir.Table

A physical table in the Impala-Hive metastore.

Functions

alter(location=None, format=None, tbl_properties=None, serde_properties=None)

Change settings and parameters of the table.

Parameters:

Name Type Description Default
location

For partitioned tables, you may want the alter_partition function

None
format

Table format

None
tbl_properties

Table properties

None
serde_properties

Serialization/deserialization properties

None
alter_partition(spec, location=None, format=None, tbl_properties=None, serde_properties=None)

Change settings and parameters of an existing partition.

Parameters:

Name Type Description Default
spec

The partition keys for the partition being modified

required
location

Location of the partition

None
format

Table format

None
tbl_properties

Table properties

None
serde_properties

Serialization/deserialization properties

None

For example, if you wanted to "point" an existing table at a directory of CSV files, you could run the following command:

from getpass import getuser

csv_props = {
    'serialization.format': ',',
    'field.delim': ',',
}
data_dir = '/home/{}/my-csv-files'.format(getuser())

table.alter(location=data_dir, format='text', serde_properties=csv_props)

If the table is partitioned, you can modify only the properties of a particular partition:

table.alter_partition(
    {'year': 2007, 'month': 5},
    location=data_dir,
    format='text',
    serde_properties=csv_props
)
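To apply the same change to several partitions, the call can be wrapped in a loop. This sketch assumes that each partition's files live in a key=value subdirectory under a common base directory, which is a common layout but not something the text above guarantees. `partition_dir` and `point_partitions_at_csv` are hypothetical helpers, and `table` is expected to provide the alter_partition method shown above.

```python
import posixpath


def partition_dir(base_dir, spec):
    """Build a key=value directory path for a partition spec given as a dict."""
    parts = [f"{key}={value}" for key, value in spec.items()]
    return posixpath.join(base_dir, *parts)


def point_partitions_at_csv(table, base_dir, specs, serde_properties):
    """Call alter_partition for each spec, pointing it at its own directory."""
    for spec in specs:
        table.alter_partition(
            spec,
            location=partition_dir(base_dir, spec),
            format="text",
            serde_properties=serde_properties,
        )


print(partition_dir("/home/user/my-csv-files", {"year": 2007, "month": 5}))
```

HDFS paths always use forward slashes, which is why the sketch relies on posixpath rather than os.path.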

Table statistics

Computing table and partition statistics

ImpalaTable

Bases: ir.Table

A physical table in the Impala-Hive metastore.

Functions

compute_stats(incremental=False)

Invoke Impala COMPUTE STATS command on the table.

Impala-backed physical tables have a method compute_stats that computes table, column, and partition-level statistics to assist with query planning and optimization. It is standard practice to invoke this after creating a table or loading new data:

table.compute_stats()

If you are using a recent version of Impala, you can also access the COMPUTE INCREMENTAL STATS DDL command:

table.compute_stats(incremental=True)
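One possible policy is to use incremental statistics only for partitioned tables. The helper below is a sketch of that policy, not a recommendation from the Ibis authors; `update_stats` is a hypothetical name, and `table` is expected to expose the is_partitioned property and the compute_stats method described above.

```python
def update_stats(table):
    """Run COMPUTE STATS, using the incremental form for partitioned tables.

    Returns True if the incremental form was requested.
    """
    incremental = bool(table.is_partitioned)
    table.compute_stats(incremental=incremental)
    return incremental


# Example call shape (needs a live ImpalaTable):
# update_stats(db.table('new_table'))
```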

Seeing table and column statistics

ImpalaTable

Bases: ir.Table

A physical table in the Impala-Hive metastore.

Functions

column_stats()

Return results of SHOW COLUMN STATS.

Returns:

Type Description
DataFrame

Column statistics

stats()

Return results of SHOW TABLE STATS.

If not partitioned, contains only one row.

Returns:

Type Description
DataFrame

Table statistics

The column_stats and stats functions return the results of SHOW COLUMN STATS and SHOW TABLE STATS, respectively, and their output depends on the last COMPUTE STATS call.

>>> ss = c.table('tpcds_parquet.store_sales')
>>> ss.compute_stats(incremental=True)
>>> stats = ss.stats()
>>> stats[:5]
  ss_sold_date_sk  #Rows  #Files     Size Bytes Cached Cache Replication  \
0         2450829   1071       1  78.34KB   NOT CACHED        NOT CACHED
1         2450846    839       1  61.83KB   NOT CACHED        NOT CACHED
2         2450860    747       1  54.86KB   NOT CACHED        NOT CACHED
3         2450874    922       1  66.74KB   NOT CACHED        NOT CACHED
4         2450888    856       1  63.33KB   NOT CACHED        NOT CACHED

    Format Incremental stats  \
0  PARQUET              true
1  PARQUET              true
2  PARQUET              true
3  PARQUET              true
4  PARQUET              true

                                            Location
0  hdfs://localhost:20500/test-warehouse/tpcds.st...
1  hdfs://localhost:20500/test-warehouse/tpcds.st...
2  hdfs://localhost:20500/test-warehouse/tpcds.st...
3  hdfs://localhost:20500/test-warehouse/tpcds.st...
4  hdfs://localhost:20500/test-warehouse/tpcds.st...

>>> cstats = ss.column_stats()
>>> cstats
                   Column          Type  #Distinct Values  #Nulls  Max Size  Avg Size
0         ss_sold_time_sk        BIGINT             13879      -1       NaN         8
1              ss_item_sk        BIGINT             17925      -1       NaN         8
2          ss_customer_sk        BIGINT             15207      -1       NaN         8
3             ss_cdemo_sk        BIGINT             16968      -1       NaN         8
4             ss_hdemo_sk        BIGINT              6220      -1       NaN         8
5              ss_addr_sk        BIGINT             14077      -1       NaN         8
6             ss_store_sk        BIGINT                 6      -1       NaN         8
7             ss_promo_sk        BIGINT               298      -1       NaN         8
8        ss_ticket_number           INT             15006      -1       NaN         4
9             ss_quantity           INT                99      -1       NaN         4
10      ss_wholesale_cost  DECIMAL(7,2)             10196      -1       NaN         4
11          ss_list_price  DECIMAL(7,2)             19393      -1       NaN         4
12         ss_sales_price  DECIMAL(7,2)             15594      -1       NaN         4
13    ss_ext_discount_amt  DECIMAL(7,2)             29772      -1       NaN         4
14     ss_ext_sales_price  DECIMAL(7,2)            102758      -1       NaN         4
15  ss_ext_wholesale_cost  DECIMAL(7,2)            125448      -1       NaN         4
16      ss_ext_list_price  DECIMAL(7,2)            141419      -1       NaN         4
17             ss_ext_tax  DECIMAL(7,2)             33837      -1       NaN         4
18          ss_coupon_amt  DECIMAL(7,2)             29772      -1       NaN         4
19            ss_net_paid  DECIMAL(7,2)            109981      -1       NaN         4
20    ss_net_paid_inc_tax  DECIMAL(7,2)            132286      -1       NaN         4
21          ss_net_profit  DECIMAL(7,2)            122436      -1       NaN         4
22        ss_sold_date_sk        BIGINT               120       0       NaN         8

REFRESH and INVALIDATE METADATA

These DDL commands are available as table-level and client-level methods:

Backend

Bases: BaseSQLBackend

Functions

invalidate_metadata(name=None, database=None)

Issue an INVALIDATE METADATA command.

Optionally this applies to a specific table. See Impala documentation.

Parameters:

Name Type Description Default
name str | None

Table name. Can be fully qualified (with database)

None
database str | None

Database name

None

ImpalaTable

Bases: ir.Table

A physical table in the Impala-Hive metastore.

Functions

invalidate_metadata()
refresh()

You can invalidate the cached metadata for a single table or for all tables using invalidate_metadata, and similarly invoke REFRESH db_name.table_name using the refresh method.

client.invalidate_metadata()

table = db.table(table_name)
table.invalidate_metadata()

table.refresh()

These methods are often used in conjunction with the LOAD DATA commands and COMPUTE STATS. See the Impala documentation for full details.

Issuing LOAD DATA commands

The LOAD DATA statement physically moves a single data file or a directory of files into the correct location for a table or table partition. It is especially useful for partitioned tables because you do not have to construct the directory path for a partition by hand, which makes it simpler and less error-prone than moving files manually with low-level HDFS commands. It also handles file name conflicts so that no data is lost in such cases.

ImpalaTable

Bases: ir.Table

A physical table in the Impala-Hive metastore.

Functions

load_data(path, overwrite=False, partition=None)

Load data into an Impala table.

Parameters:

Name Type Description Default
path

Data to load

required
overwrite

Overwrite the existing data in the entire table or indicated partition

False
partition

If specified, the partition must already exist

None

To use this method, pass the path of a single file or a directory of files that you want to load. Afterward, you may want to refresh the table and update its statistics (see the Impala documentation):

table.load_data(path)
table.refresh()

Like the other methods with support for partitioned tables, you can load into a particular partition with the partition keyword argument:

part = [2007, 5]
table.load_data(path, partition=part)
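
For orientation, a partitioned load corresponds roughly to an Impala LOAD DATA statement with a PARTITION clause. The sketch below only builds such a statement as a string. `load_data_statement` is a hypothetical helper, not part of Ibis, and the table name, path, and partition columns `year` and `month` are assumptions made for illustration:

```python
def load_data_statement(table, path, partition=None, overwrite=False):
    """Build an Impala LOAD DATA statement as a string (illustration only)."""

    def literal(value):
        # Quote strings and leave numbers as they are.
        if isinstance(value, str):
            return "'" + value.replace("'", "\\'") + "'"
        return str(value)

    parts = [f"LOAD DATA INPATH {literal(path)}"]
    if overwrite:
        parts.append("OVERWRITE")
    parts.append(f"INTO TABLE {table}")
    if partition:
        # partition maps each partition column to its value (assumed layout).
        spec = ", ".join(f"{col}={literal(val)}" for col, val in partition.items())
        parts.append(f"PARTITION ({spec})")
    return " ".join(parts)


stmt = load_data_statement(
    "ibis_testing.my_table",
    "/tmp/new_files",
    partition={"year": 2007, "month": 5},
)
print(stmt)
```

In practice you would call table.load_data rather than writing the statement yourself; the sketch is only meant to show what the partition argument stands for.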

Parquet and other session options

Ibis gives you access to Impala session-level variables that affect query execution:

Backend

Bases: BaseSQLBackend

Functions

disable_codegen(disabled=True)

Turn off or on LLVM codegen in Impala query execution.

Parameters:

Name Type Description Default
disabled

To disable codegen, pass with no argument or True. To enable codegen, pass False.

True
get_options()

Return current query options for the Impala session.

set_options(options)
set_compression_codec(codec)

For example:

>>> client.get_options()
{'ABORT_ON_ERROR': '0',
 'APPX_COUNT_DISTINCT': '0',
 'BUFFER_POOL_LIMIT': '',
 'COMPRESSION_CODEC': '',
 'COMPUTE_STATS_MIN_SAMPLE_SIZE': '1073741824',
 'DEFAULT_JOIN_DISTRIBUTION_MODE': '0',
 'DEFAULT_SPILLABLE_BUFFER_SIZE': '2097152',
 'DISABLE_CODEGEN': '0',
 'DISABLE_CODEGEN_ROWS_THRESHOLD': '50000',
 'DISABLE_ROW_RUNTIME_FILTERING': '0',
 'DISABLE_STREAMING_PREAGGREGATIONS': '0',
 'DISABLE_UNSAFE_SPILLS': '0',
 'ENABLE_EXPR_REWRITES': '1',
 'EXEC_SINGLE_NODE_ROWS_THRESHOLD': '100',
 'EXEC_TIME_LIMIT_S': '0',
 'EXPLAIN_LEVEL': '1',
 'HBASE_CACHE_BLOCKS': '0',
 'HBASE_CACHING': '0',
 'IDLE_SESSION_TIMEOUT': '0',
 'MAX_ERRORS': '100',
 'MAX_NUM_RUNTIME_FILTERS': '10',
 'MAX_ROW_SIZE': '524288',
 'MEM_LIMIT': '0',
 'MIN_SPILLABLE_BUFFER_SIZE': '65536',
 'MT_DOP': '',
 'NUM_SCANNER_THREADS': '0',
 'OPTIMIZE_PARTITION_KEY_SCANS': '0',
 'PARQUET_ANNOTATE_STRINGS_UTF8': '0',
 'PARQUET_ARRAY_RESOLUTION': '2',
 'PARQUET_DICTIONARY_FILTERING': '1',
 'PARQUET_FALLBACK_SCHEMA_RESOLUTION': '0',
 'PARQUET_FILE_SIZE': '0',
 'PARQUET_READ_STATISTICS': '1',
 'PREFETCH_MODE': '1',
 'QUERY_TIMEOUT_S': '0',
 'REPLICA_PREFERENCE': '0',
 'REQUEST_POOL': '',
 'RUNTIME_BLOOM_FILTER_SIZE': '1048576',
 'RUNTIME_FILTER_MAX_SIZE': '16777216',
 'RUNTIME_FILTER_MIN_SIZE': '1048576',
 'RUNTIME_FILTER_MODE': '2',
 'RUNTIME_FILTER_WAIT_TIME_MS': '0',
 'S3_SKIP_INSERT_STAGING': '1',
 'SCHEDULE_RANDOM_REPLICA': '0',
 'SCRATCH_LIMIT': '-1',
 'SEQ_COMPRESSION_MODE': '',
 'SYNC_DDL': '0'}

To enable Snappy compression for Parquet files, you could do either of:

>>> client.set_options({'COMPRESSION_CODEC': 'snappy'})
>>> client.set_compression_codec('snappy')

>>> client.get_options()['COMPRESSION_CODEC']
'SNAPPY'
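
If you only want an option to apply to a few statements, one possible pattern is a small context manager built on get_options and set_options. This is a sketch, not an Ibis feature: `temporary_options` and `FakeClient` are hypothetical names, the fake client exists only so the example runs without a cluster, and whether writing back an empty string restores the Impala default is an assumption you should verify on your cluster:

```python
from contextlib import contextmanager


@contextmanager
def temporary_options(client, options):
    """Apply session options inside a with-block, then put the old values back."""
    current = client.get_options()
    # Remember only the options that are about to change.
    previous = {key: current.get(key, "") for key in options}
    client.set_options(options)
    try:
        yield client
    finally:
        client.set_options(previous)


class FakeClient:
    """Stand-in exposing get_options/set_options (hypothetical, for illustration)."""

    def __init__(self):
        self._options = {"COMPRESSION_CODEC": "", "MEM_LIMIT": "0"}

    def get_options(self):
        return dict(self._options)

    def set_options(self, options):
        self._options.update(options)


client = FakeClient()
with temporary_options(client, {"COMPRESSION_CODEC": "snappy"}):
    inside = client.get_options()["COMPRESSION_CODEC"]
after = client.get_options()["COMPRESSION_CODEC"]
```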

Ingesting data from pandas

Overall interoperability between the Hadoop / Spark ecosystems and pandas / the PyData stack is poor, but it will improve in time (this is a major part of the Ibis roadmap).

Ibis's Impala tools currently interoperate with pandas in these ways:

  • Ibis expressions return pandas objects (i.e. DataFrame or Series) for non-scalar expressions when calling their to_pandas method
  • The create_table and insert methods can accept pandas objects. This includes inserting into partitioned tables. It currently uses CSV as the ingest route.

For example:

>>> import pandas as pd

>>> data = pd.DataFrame({'foo': [1, 2, 3, 4], 'bar': ['a', 'b', 'c', 'd']})

>>> db.create_table('pandas_table', data)
>>> t = db.pandas_table
>>> t.to_pandas()
   foo bar
0    1   a
1    2   b
2    3   c
3    4   d

>>> t.drop()
>>> db.create_table('empty_for_insert', schema=t.schema())
>>> to_insert = db.empty_for_insert
>>> to_insert.insert(data)
>>> to_insert.to_pandas()
   foo bar
0    1   a
1    2   b
2    3   c
3    4   d

>>> to_insert.drop()

Uploading / downloading data from HDFS

If you've set up an HDFS connection, you can use the Ibis HDFS interface to look through your data and read and write files to and from HDFS:

>>> hdfs = con.hdfs
>>> hdfs.ls('/__ibis/ibis-testing-data')
['README.md',
 'avro',
 'awards_players.csv',
 'batting.csv',
 'csv',
 'diamonds.csv',
 'functional_alltypes.csv',
 'functional_alltypes.parquet',
 'geo.csv',
 'ibis_testing.db',
 'parquet',
 'struct_table.avro',
 'udf']
>>> hdfs.ls('/__ibis/ibis-testing-data/parquet')
['functional_alltypes',
 'tpch_customer',
 'tpch_lineitem',
 'tpch_nation',
 'tpch_orders',
 'tpch_part',
 'tpch_partsupp',
 'tpch_region',
 'tpch_supplier']

Suppose we wanted to download /__ibis/ibis-testing-data/parquet/functional_alltypes, which is a directory. We need only do:

$ rm -rf parquet_dir/
>>> hdfs.get('/__ibis/ibis-testing-data/parquet/functional_alltypes',
...          'parquet_dir',
...           recursive=True)
'/ibis/docs/source/tutorial/parquet_dir'

Now we have that directory locally:

$ ls parquet_dir/
9a41de519352ab07-4e76bc4d9fb5a789_1624886651_data.0.parq
9a41de519352ab07-4e76bc4d9fb5a78a_778826485_data.0.parq
9a41de519352ab07-4e76bc4d9fb5a78b_1277612014_data.0.parq

Files and directories can be written to HDFS just as easily using put:

>>> path = '/__ibis/dir-write-example'
>>> hdfs.rm(path, recursive=True)
>>> hdfs.put(path, 'parquet_dir', recursive=True)
>>> hdfs.ls('/__ibis/dir-write-example')
['9a41de519352ab07-4e76bc4d9fb5a789_1624886651_data.0.parq',
 '9a41de519352ab07-4e76bc4d9fb5a78a_778826485_data.0.parq',
 '9a41de519352ab07-4e76bc4d9fb5a78b_1277612014_data.0.parq']

Delete files and directories with rm:

>>> hdfs.rm('/__ibis/dir-write-example', recursive=True)
$ rm -rf parquet_dir/

Queries on Parquet, Avro, and Delimited files in HDFS

Ibis can easily create temporary or persistent Impala tables that reference data in the following formats:

  • Parquet (parquet_file)
  • Avro (avro_file)
  • Delimited text formats (CSV, TSV, etc.) (delimited_file)

Parquet is the easiest because the schema can be read from the data files:

>>> path = '/__ibis/ibis-testing-data/parquet/tpch_lineitem'
>>> lineitem = con.parquet_file(path)
>>> lineitem.limit(2)
   l_orderkey  l_partkey  l_suppkey  l_linenumber l_quantity l_extendedprice  \
0           1     155190       7706             1      17.00        21168.23
1           1      67310       7311             2      36.00        45983.16

  l_discount l_tax l_returnflag l_linestatus  l_shipdate l_commitdate  \
0       0.04  0.02            N            O  1996-03-13   1996-02-12
1       0.09  0.06            N            O  1996-04-12   1996-02-28

  l_receiptdate     l_shipinstruct l_shipmode  \
0    1996-03-22  DELIVER IN PERSON      TRUCK
1    1996-04-20   TAKE BACK RETURN       MAIL

                            l_comment
0             egular courts above the
1  ly final dependencies: slyly bold
>>> lineitem.l_extendedprice.sum()
Decimal('229577310901.20')

If you want to query a Parquet file and also create a table in Impala that remains after your session, you can pass more information to parquet_file:

>>> table = con.parquet_file(path, name='my_parquet_table',
...                          database='ibis_testing',
...                          persist=True)
>>> table.l_extendedprice.sum()
Decimal('229577310901.20')
>>> con.table('my_parquet_table').l_extendedprice.sum()
Decimal('229577310901.20')
>>> con.drop_table('my_parquet_table')

To query delimited files, you need to write down an Ibis schema. Ibis does not yet provide helper tools that infer the schema for you.

There are some CSV files in the test folder, so let's use those:

>>> hdfs.get('/__ibis/ibis-testing-data/csv', 'csv-files', recursive=True)
'/ibis/docs/source/tutorial/csv-files'
$ cat csv-files/0.csv
63IEbRheTh,0.679388707915,6
mG4hlqnjeG,2.80710565922,15
JTPdX9SZH5,-0.155126406372,55
2jcl6FypOl,1.03787834032,21
k3TbJLaadQ,-1.40190801103,23
rP5J4xvinM,-0.442092712869,22
WniUylixYt,-0.863748033806,27
znsDuKOB1n,-0.566029637098,47
4SRP9jlo1M,0.331460412318,88
KsfjPyDf5e,-0.578930506363,70
$ rm -rf csv-files/

The schema here is pretty simple (see ibis.schema for more):

>>> schema = ibis.schema([('foo', 'string'),
...                       ('bar', 'double'),
...                       ('baz', 'int32')])

>>> table = con.delimited_file('/__ibis/ibis-testing-data/csv',
...                            schema)
>>> table.limit(10)
          foo       bar  baz
0  63IEbRheTh  0.679389    6
1  mG4hlqnjeG  2.807106   15
2  JTPdX9SZH5 -0.155126   55
3  2jcl6FypOl  1.037878   21
4  k3TbJLaadQ -1.401908   23
5  rP5J4xvinM -0.442093   22
6  WniUylixYt -0.863748   27
7  znsDuKOB1n -0.566030   47
8  4SRP9jlo1M  0.331460   88
9  KsfjPyDf5e -0.578931   70
>>> table.bar.summary()
   count  nulls       min       max       sum    mean  approx_nunique
0    100      0 -1.401908  2.807106  8.479978  0.0848              10
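
Until schema inference is available, a rough guess can be made from a small sample of the file. The sketch below uses only the standard library. `infer_schema` is a hypothetical helper, not part of Ibis, and it makes simplifying assumptions: the sample has no header row, integers are reported as int64 to be conservative, and values such as `nan` or `1e5` count as doubles:

```python
import csv
import io


def infer_schema(sample_text, names):
    """Guess an Ibis type name per column from a headerless CSV sample."""
    rows = list(csv.reader(io.StringIO(sample_text)))

    def guess(values):
        # Try the narrowest type first and fall back to string.
        for type_name, cast in (("int64", int), ("double", float)):
            try:
                for value in values:
                    cast(value)
                return type_name
            except ValueError:
                continue
        return "string"

    columns = zip(*rows)  # transpose rows into columns
    return [(name, guess(column)) for name, column in zip(names, columns)]


sample = "63IEbRheTh,0.679388707915,6\nmG4hlqnjeG,2.80710565922,15\n"
pairs = infer_schema(sample, ["foo", "bar", "baz"])
print(pairs)
```

The resulting list of (name, type) pairs has the same shape as the argument given to ibis.schema above, so it can be reviewed and adjusted before use.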

For functions like parquet_file and delimited_file, an HDFS directory must be passed (we'll add support for S3 and other filesystems later) and the directory must contain files all having the same schema.

If you have Avro data, you can query it too, provided you have the full Avro schema:

>>> avro_schema = {
...     "fields": [
...         {"type": ["int", "null"], "name": "R_REGIONKEY"},
...         {"type": ["string", "null"], "name": "R_NAME"},
...         {"type": ["string", "null"], "name": "R_COMMENT"}],
...     "type": "record",
...     "name": "a"
... }

>>> path = '/__ibis/ibis-testing-data/avro/tpch.region'

>>> hdfs.mkdir(path, create_parents=True)
>>> table = con.avro_file(path, avro_schema)
>>> table
Empty DataFrame
Columns: [r_regionkey, r_name, r_comment]
Index: []
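
Writing the Avro schema dictionary by hand gets repetitive for wide tables. As a sketch, a record schema like the one above can be generated from a list of column names and Avro types. `nullable_record_schema` is a hypothetical helper, not part of Ibis, and it assumes every field should be nullable:

```python
def nullable_record_schema(record_name, columns):
    """Build an Avro record schema in which every field may be null."""
    return {
        "type": "record",
        "name": record_name,
        "fields": [
            {"name": name, "type": [avro_type, "null"]}
            for name, avro_type in columns
        ],
    }


avro_schema = nullable_record_schema(
    "a",
    [("R_REGIONKEY", "int"), ("R_NAME", "string"), ("R_COMMENT", "string")],
)
```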

Other helper functions for interacting with the database

We're adding a growing list of useful utility functions for interacting with an Impala cluster on the client object. The idea is that you should be able to do any database-admin-type work with Ibis and not have to switch over to the Impala SQL shell. If there are ways we can make this more pleasant, please let us know.

Here are some of the features, with examples below:

  • Listing and searching for available databases and tables
  • Creating and dropping databases
  • Getting table schemas

>>> con.list_databases(like='ibis*')
['ibis_testing', 'ibis_testing_tmp_db']
>>> con.list_tables(database='ibis_testing', like='tpch*')
['tpch_customer',
 'tpch_lineitem',
 'tpch_nation',
 'tpch_orders',
 'tpch_part',
 'tpch_partsupp',
 'tpch_region',
 'tpch_region_avro',
 'tpch_supplier']
>>> schema = con.get_schema('functional_alltypes')
>>> schema
ibis.Schema {
  id               int32
  bool_col         boolean
  tinyint_col      int8
  smallint_col     int16
  int_col          int32
  bigint_col       int64
  float_col        float32
  double_col       float64
  date_string_col  string
  string_col       string
  timestamp_col    timestamp
  year             int32
  month            int32
}

Databases can be created, too, and you can set the storage path in HDFS that you want for the data files:

>>> db = 'ibis_testing2'
>>> con.create_database(db, path='/__ibis/my-test-database', force=True)

>>> # you may or may not have to give the impala user write and execute permissions to '/__ibis/my-test-database'
>>> hdfs.chmod('/__ibis/my-test-database', 0o777)
>>> con.create_table('example_table', con.table('functional_alltypes'),
...                  database=db, force=True)

The data files should now be in the indicated location in HDFS:

>>> hdfs.ls('/__ibis/my-test-database')
['example_table']

To drop a database, including all tables in it, you can use drop_database with force=True:

>>> con.drop_database(db, force=True)

Faster queries on small data in Impala

Impala internally uses LLVM to compile parts of queries (known as "codegen") so that they run faster on large datasets. This compilation adds a certain amount of overhead to many kinds of queries, even on small datasets. You can disable LLVM code generation when using Ibis, which may significantly speed up queries on smaller datasets:

>>> from numpy.random import rand
>>> con.disable_codegen()
>>> t = con.table('ibis_testing.functional_alltypes')
>>> # Timed in an IPython session with the %timeit magic
>>> %timeit (t.double_col + rand()).sum().to_pandas()
27.7 ms ± 996 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
>>> # Turn codegen back on
>>> con.disable_codegen(False)
>>> %timeit (t.double_col + rand()).sum().to_pandas()
27 ms ± 1.62 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

It's important to remember that codegen adds a fixed overhead to each query, but it significantly speeds up queries on big data.
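
To decide whether disabling codegen pays off for a particular query, you can time it both ways. The following sketch uses only disable_codegen and the standard library. `time_with_codegen` and `FakeConnection` are hypothetical names, and the fake connection is there only so the example runs without a cluster:

```python
import time


def time_with_codegen(con, run_query, repeats=5):
    """Return the best wall-clock time in seconds with codegen off and on."""
    results = {}
    # The last iteration re-enables codegen, which is Impala's default.
    for label, disabled in (("codegen_off", True), ("codegen_on", False)):
        con.disable_codegen(disabled)
        timings = []
        for _ in range(repeats):
            start = time.perf_counter()
            run_query()
            timings.append(time.perf_counter() - start)
        results[label] = min(timings)
    return results


class FakeConnection:
    """Stand-in that records disable_codegen calls (hypothetical)."""

    def __init__(self):
        self.calls = []

    def disable_codegen(self, disabled=True):
        self.calls.append(disabled)


fake = FakeConnection()
timings = time_with_codegen(fake, lambda: sum(range(1000)), repeats=3)
```

With a real connection you would pass something like `lambda: (t.double_col + rand()).sum().to_pandas()` as run_query.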

User Defined functions (UDF)

Impala currently supports user-defined scalar functions (UDFs) and user-defined aggregate functions (UDAs) via a C++ extension API.

Initial support for using C++ UDFs in Ibis came in version 0.4.0.

Using scalar functions (UDFs)

Let's take an example to illustrate how to make a C++ UDF available to Ibis. Here is a function that computes an approximate equality between floating point values:

#include "impala_udf/udf.h"

#include <cmath>

using namespace impala_udf;

// Returns true when x and y differ by less than EPSILON.
// Returns NULL if either input is NULL.
BooleanVal FuzzyEquals(FunctionContext* ctx, const DoubleVal& x, const DoubleVal& y) {
  const double EPSILON = 0.000001;
  if (x.is_null || y.is_null) return BooleanVal::null();
  double delta = std::fabs(x.val - y.val);
  return BooleanVal(delta < EPSILON);
}
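
When checking results returned by the UDF, it can help to have the same logic available in Python. The sketch below mirrors the C++ function under the assumption that a NULL input is represented by None. `fuzzy_equals_reference` is a hypothetical name and is not part of Ibis:

```python
import math


def fuzzy_equals_reference(x, y, epsilon=0.000001):
    """Pure-Python counterpart of FuzzyEquals: None in, None out."""
    if x is None or y is None:
        return None
    return math.fabs(x - y) < epsilon


close = fuzzy_equals_reference(0.1 + 0.2, 0.3)
far = fuzzy_equals_reference(1.0, 1.1)
null = fuzzy_equals_reference(None, 1.0)
```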

You can compile this to either a shared library (a .so file) or to LLVM bitcode with clang (a .ll file). The compilation step is not covered here.

To make this function callable, we use ibis.impala.wrap_udf:

library = '/ibis/udfs/udftest.ll'
inputs = ['double', 'double']
output = 'boolean'
symbol = 'FuzzyEquals'
udf_db = 'ibis_testing'
udf_name = 'fuzzy_equals'

fuzzy_equals = ibis.impala.wrap_udf(
    library, inputs, output, symbol, name=udf_name
)

In a typical workflow, you set up a UDF in Impala once and then reuse it. So the first time you do this, you need to create the UDF in Impala:

client.create_function(fuzzy_equals, database=udf_db)

Now, we must register this function as a new Impala operation in Ibis. This must take place each time you load your Ibis session.

fuzzy_equals.register(fuzzy_equals.name, udf_db)

The object fuzzy_equals is callable and works with Ibis expressions:

>>> db = client.database('ibis_testing')

>>> t = db.functional_alltypes

>>> expr = fuzzy_equals(t.float_col, t.double_col / 10)

>>> expr.to_pandas()[:10]
0     True
1    False
2    False
3    False
4    False
5    False
6    False
7    False
8    False
9    False
Name: tmp, dtype: bool

Note that the call to register on the UDF object must happen each time you use Ibis. If you have many UDFs, consider creating a file with all of your wrapper declarations and user APIs, and loading it with your Ibis session to plug in all your own functions.
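
One way to organize such a file is to keep the UDF declarations as data and wrap and register them in a loop. This is a sketch under stated assumptions: `UDFSpec`, `UDF_SPECS`, and `load_udfs` are hypothetical names, the wrapping function is passed in as an argument, and the fake objects at the bottom exist only so the example runs without Ibis or a cluster:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class UDFSpec:
    library: str
    inputs: tuple
    output: str
    symbol: str
    name: str


# One entry per UDF; this one repeats the fuzzy_equals declaration.
UDF_SPECS = [
    UDFSpec("/ibis/udfs/udftest.ll", ("double", "double"), "boolean",
            "FuzzyEquals", "fuzzy_equals"),
]


def load_udfs(wrap_udf, specs, database):
    """Wrap and register every declared UDF; return them keyed by name."""
    loaded = {}
    for spec in specs:
        func = wrap_udf(spec.library, list(spec.inputs), spec.output,
                        spec.symbol, name=spec.name)
        func.register(spec.name, database)
        loaded[spec.name] = func
    return loaded


class _FakeUDF:
    """Stand-in for a wrapped UDF object (hypothetical)."""

    def __init__(self, name):
        self.name = name
        self.registered = []

    def register(self, name, database):
        self.registered.append((name, database))


def _fake_wrap_udf(library, inputs, output, symbol, name=None):
    return _FakeUDF(name)


udfs = load_udfs(_fake_wrap_udf, UDF_SPECS, "ibis_testing")
```

In a real session you would pass ibis.impala.wrap_udf as the first argument instead of the stand-in.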

Working with secure clusters (Kerberos)

Ibis is compatible with Hadoop clusters that are secured with Kerberos (as well as SSL and LDAP). Note that to enable this support, you'll also need to install the kerberos package.

$ pip install kerberos

Just like the Impala shell and ODBC/JDBC connectors, Ibis connects to Impala through the HiveServer2 interface (using the impyla client). Therefore, the connection semantics are similar to the other access methods for working with secure clusters.

Specifically, after authenticating yourself against Kerberos (e.g., by issuing the appropriate kinit command), pass auth_mechanism='GSSAPI' or auth_mechanism='LDAP' to ibis.impala.connect(...) when creating the connection, and set kerberos_service_name, user, and password if necessary. This method also takes arguments to configure SSL (use_ssl, ca_cert). See the documentation for the Impala shell for more details.
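
As a sketch of how these arguments fit together, the helper below assembles keyword arguments for ibis.impala.connect using the parameter names from the connection parameters table. `secure_connect_kwargs` is a hypothetical helper, the host name is a placeholder, and the rule that LDAP requires both user and password is an assumption of this example rather than documented Ibis behavior:

```python
VALID_AUTH = ("NOSASL", "PLAIN", "GSSAPI", "LDAP")


def secure_connect_kwargs(host, auth_mechanism, *, port=21050, user=None,
                          password=None, kerberos_service_name="impala",
                          use_ssl=False, ca_cert=None):
    """Assemble keyword arguments for a secured Impala connection."""
    if auth_mechanism not in VALID_AUTH:
        raise ValueError(f"unknown auth_mechanism: {auth_mechanism!r}")
    if auth_mechanism == "LDAP" and (user is None or password is None):
        # Assumption of this sketch: LDAP needs explicit credentials.
        raise ValueError("LDAP authentication needs user and password")

    kwargs = {
        "host": host,
        "port": port,
        "auth_mechanism": auth_mechanism,
        "use_ssl": use_ssl,
        "ca_cert": ca_cert,
    }
    if auth_mechanism == "GSSAPI":
        kwargs["kerberos_service_name"] = kerberos_service_name
    if user is not None:
        kwargs["user"] = user
    if password is not None:
        kwargs["password"] = password
    return kwargs


kerberos_kwargs = secure_connect_kwargs(
    "impala.example.com", "GSSAPI", use_ssl=True
)
```

The result would then be passed on as `ibis.impala.connect(**kerberos_kwargs)` after running kinit.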

Ibis also includes functionality that communicates directly with HDFS, using the WebHDFS REST API. When calling ibis.impala.hdfs_connect(...), also pass auth_mechanism='GSSAPI' or auth_mechanism='LDAP', and make sure that you connect to the correct port, which is likely to be an SSL-secured WebHDFS port. You can pass verify=False to skip SSL certificate verification (which may be helpful in testing). Ibis assumes https when connecting to a Kerberized cluster. Because some Ibis commands create HDFS directories as well as new Impala databases and/or tables, your user needs the necessary privileges.

Default Configuration Values for CDH Components

Cloudera CDH ships with HDFS, Impala, Hive and many other components. Sometimes it's not obvious what default configuration values these tools are using or should be using.

Check out this link to see the default configuration values for every component of CDH.


Last update: August 1, 2023