Flink
https://nightlies.apache.org/flink/flink-docs-stable/

Install
Install Ibis and dependencies for the Flink backend alongside the apache-flink package:

```
pip install ibis-framework apache-flink
```

And connect:

```python
import ibis

con = ibis.flink.connect()  # (1)
```

1. Adjust connection parameters as needed; the Flink backend requires a TableEnvironment (see Connect below).
Connect
ibis.flink.connect
```python
con = ibis.flink.connect(table_env=table_env)
```

ibis.flink.connect is a thin wrapper around ibis.backends.flink.Backend.do_connect.
The flink backend does not create TableEnvironment objects; you must create a TableEnvironment and pass that to ibis.flink.connect.
Connection Parameters
do_connect
do_connect(self, table_env)
Create a Flink Backend for use with Ibis.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| table_env | pyflink.table.TableEnvironment | A table environment. | required |
Examples
```python
>>> import ibis
>>> from pyflink.table import EnvironmentSettings, TableEnvironment
>>> table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
>>> ibis.flink.connect(table_env)
<ibis.backends.flink.Backend at 0x...>
```

flink.Backend
add_operation
add_operation(self, operation)
Add a translation function to the backend for a specific operation.
Operations are defined in ibis.expr.operations, and a translation function receives the translator object and an expression as parameters, and returns a value depending on the backend.
compile
compile(self, expr, params=None, **kwargs)
Compile an expression.
connect
connect(self, *args, **kwargs)
Connect to the database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| *args | | Mandatory connection parameters; see the docstring of do_connect for details. | () |
| **kwargs | | Extra connection parameters; see the docstring of do_connect for details. | {} |
Notes
This creates a new backend instance with saved args and kwargs, then calls reconnect and finally returns the newly created and connected backend instance.
Returns
| Type | Description |
|---|---|
| ibis.backends.base.BaseBackend | An instance of the backend |
create_database
create_database(self, name, db_properties=None, catalog=None, force=False)
Create a new database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the new database. | required |
| db_properties | dict | Properties of the database. Accepts a dictionary of key-value pairs (key1=val1, key2=val2, …). | None |
| catalog | str | Name of the catalog in which the new database will be created. | None |
| force | bool | If False, an exception is raised if the database already exists. | False |
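As a quick sketch of the call above (the connection `con`, database name, and property keys are illustrative, not prescribed by the Flink API):

```python
# Hypothetical database properties; Flink passes these through to the catalog.
db_properties = {"comment": "staging area for raw events"}  # illustrative key

# Assuming `con` came from ibis.flink.connect(table_env):
# con.create_database("staging", db_properties=db_properties, force=True)
# force=True makes the call a no-op if "staging" already exists.
```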
create_table
create_table(self, name, obj=None, *, schema=None, database=None, catalog=None, tbl_properties=None, watermark=None, primary_key=None, temp=False, overwrite=False)
Create a new table in Flink.
In Flink, tables can be either virtual (VIEWS) or regular (TABLES). VIEWS can be created from an existing Table object, usually the result of a Table API or SQL query. TABLES describe external data, such as a file, database table, or message queue. In other words, TABLES refer explicitly to tables constructed directly from source/sink connectors.
When obj is in-memory (e.g., a pandas DataFrame), currently this function can create only a TEMPORARY VIEW. If obj is in-memory and temp is False, it will raise an error.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the new table. | required |
| obj | pandas.DataFrame \| pyarrow.Table \| ibis.Table \| None | An Ibis table expression, pandas DataFrame, or PyArrow Table used to extract the schema and the data for the new table. An optional schema can be used to override the inferred schema. | None |
| schema | ibis.Schema \| None | The schema for the new table. Required if obj is not provided. | None |
| database | str \| None | Name of the database where the table will be created, if not the default. | None |
| catalog | str \| None | Name of the catalog where the table will be created, if not the default. | None |
| tbl_properties | dict \| None | Table properties used to create a table source/sink. The properties are usually used to find and create the underlying connector. Accepts a dictionary of key-value pairs (key1=val1, key2=val2, …). | None |
| watermark | ibis.Watermark \| None | Watermark strategy for the table; only applicable to sources. | None |
| primary_key | str \| list[str] \| None | A single column or a list of columns to be marked as primary. Raises an error if the column(s) in primary_key are not a subset of the columns in schema. Primary keys must be non-nullable in Flink, and the columns indicated as primary key will be designated as non-nullable. | None |
| temp | bool | Whether the table is temporary. | False |
| overwrite | bool | Whether to clobber existing data. | False |
Returns
| Type | Description |
|---|---|
| pyflink.table.Table | The table that was created. |
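A sketch of a typical source-table definition. The connection `con`, topic, broker address, and column names are illustrative; the tbl_properties keys follow the standard Flink Kafka connector options:

```python
# Schema for the new table; column names are illustrative.
schema_fields = {
    "user_id": "int64",
    "amount": "float64",
    "event_time": "timestamp",
}

# Standard Flink Kafka connector options; topic and broker address are
# placeholders for a real deployment.
tbl_properties = {
    "connector": "kafka",
    "topic": "payments",
    "properties.bootstrap.servers": "localhost:9092",
    "properties.group.id": "ibis-demo",
    "format": "json",
}

# With `con` from ibis.flink.connect(table_env):
# t = con.create_table(
#     "payments",
#     schema=ibis.schema(schema_fields),
#     tbl_properties=tbl_properties,
# )
```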
create_view
create_view(self, name, obj, *, schema=None, database=None, catalog=None, force=False, temp=False, overwrite=False)
Create a new view from a dataframe or table.
When obj is in-memory (e.g., a pandas DataFrame), currently this function can create only a TEMPORARY VIEW. If obj is in-memory and temp is False, it will raise an error.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the new view. | required |
| obj | pandas.DataFrame \| ibis.Table | An Ibis table expression or pandas DataFrame that will be used to create the view. | required |
| schema | ibis.Schema \| None | The schema for the new view. | None |
| database | str \| None | Name of the database where the view will be created; if not provided, the default is used. | None |
| catalog | str \| None | Name of the catalog where the view will be created, if not the default. | None |
| force | bool | If False, an exception is raised if the view is already present. | False |
| temp | bool | Whether the view is temporary. | False |
| overwrite | bool | If True, remove the existing view and create a new one. | False |
Returns
| Type | Description |
|---|---|
| pyflink.table.Table | The view that was created. |
database
database(self, name=None)
Return a Database object for the name database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| None | Name of the database to return the object for. | None |
Returns
| Type | Description |
|---|---|
| ibis.backends.base.Database | A database object for the specified database. |
drop_database
drop_database(self, name, catalog=None, force=False)
Drop a database with name name.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Database to drop. | required |
| catalog | str | Name of the catalog from which the database will be dropped. | None |
| force | bool | If False, an exception is raised if the database does not exist. | False |
drop_table
drop_table(self, name, *, database=None, catalog=None, temp=False, force=False)
Drop a table.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the table to drop. | required |
| database | str \| None | Name of the database where the table exists, if not the default. | None |
| catalog | str \| None | Name of the catalog where the table exists, if not the default. | None |
| temp | bool | Whether the table is temporary. | False |
| force | bool | If False, an exception is raised if the table does not exist. | False |
drop_view
drop_view(self, name, *, database=None, catalog=None, temp=False, force=False)
Drop a view.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the view to drop. | required |
| database | str \| None | Name of the database where the view exists, if not the default. | None |
| catalog | str \| None | Name of the catalog where the view exists, if not the default. | None |
| temp | bool | Whether the view is temporary. | False |
| force | bool | If False, an exception is raised if the view does not exist. | False |
execute
execute(self, expr, **kwargs)
Execute an expression.
get_schema
get_schema(self, table_name, database=None, catalog=None)
Return a Schema object for the indicated table and database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | Table name. | required |
| database | str | Database name. | None |
| catalog | str | Catalog name. | None |
Returns
| Type | Description |
|---|---|
| ibis.Schema | Ibis schema. |
has_operation
has_operation(cls, operation)
Return whether the backend implements support for operation.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| operation | type[ibis.Value] | A class corresponding to an operation. | required |
Returns
| Type | Description |
|---|---|
| bool | Whether the backend implements the operation. |
Examples
```python
>>> import ibis
>>> import ibis.expr.operations as ops
>>> ibis.sqlite.has_operation(ops.ArrayIndex)
False
>>> ibis.postgres.has_operation(ops.ArrayIndex)
True
```

insert
insert(self, table_name, obj, database=None, catalog=None, overwrite=False)
Insert data into a table.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | The name of the table to insert data into. | required |
| obj | pyarrow.Table \| pandas.DataFrame \| ibis.Table \| list \| dict | The source data or expression to insert. | required |
| database | str \| None | Name of the attached database that the table is located in. | None |
| catalog | str \| None | Name of the attached catalog that the table is located in. | None |
| overwrite | bool | If True, replace existing contents of the table. | False |
Returns
| Type | Description |
|---|---|
| pyflink.table.table_result.TableResult | The table result. |
Raises
| Type | Description |
|---|---|
| ValueError | If the type of obj isn’t supported |
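A minimal sketch of inserting in-memory rows; the table and column names are illustrative:

```python
# Rows to insert, expressed as a list of dicts (one per row).
rows = [
    {"user_id": 1, "amount": 9.99},
    {"user_id": 2, "amount": 4.50},
]

# Assuming `con` is a Flink backend connection and "payments" exists:
# result = con.insert("payments", rows)                   # append
# result = con.insert("payments", rows, overwrite=True)   # replace contents
```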
list_databases
list_databases(self, like=None)
List existing databases in the current connection.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| like | str \| None | A pattern in Python's regex format to filter returned database names. | None |
Returns
| Type | Description |
|---|---|
| list[str] | The database names that exist in the current connection, that match the like pattern if provided. |
list_tables
list_tables(self, like=None, *, database=None, catalog=None, temp=False)
Return the list of table/view names.
Return the list of table/view names in the database and catalog. If database/catalog are not specified, their default values will be used. Temporary tables can only be listed for the default database and catalog, hence database and catalog are ignored if temp is True.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| like | str | A pattern in Python's regex format. | None |
| temp | bool | Whether to list temporary tables or permanent tables. | False |
| database | str | The database to list tables of, if not the current one. | None |
| catalog | str | The catalog to list tables of, if not the current one. | None |
Returns
| Type | Description |
|---|---|
| list[str] | The list of the table/view names that match the pattern like. |
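Since `like` uses Python regex semantics, you can preview roughly what a pattern would keep by filtering locally the same way (table names here are illustrative):

```python
import re

names = ["payments", "payments_raw", "users"]
pattern = "^payments"  # anchors and other regex syntax are honored
matches = [n for n in names if re.search(pattern, n)]
print(matches)  # ['payments', 'payments_raw']

# Roughly equivalent server-side:
# con.list_tables(like="^payments")
```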
list_views
list_views(self, like=None, temp=False)
Return the list of view names.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| like | str | A pattern in Python's regex format. | None |
| temp | bool | Whether to list temporary views or permanent views. | False |
Returns
| Type | Description |
|---|---|
| list[str] | The list of the view names that match the pattern like. |
raw_sql
raw_sql(self, query)
Execute a raw SQL query.
read_csv
read_csv(self, path, schema=None, table_name=None)
Register a csv file as a table in the current database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| pathlib.Path | The data source. | required |
| schema | ibis.Schema \| None | The schema for the new table. | None |
| table_name | str \| None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
Returns
| Type | Description |
|---|---|
| ibis.Table | The just-registered table. |
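A small end-to-end sketch: write a CSV with the standard library, then register it. The file contents and column names are illustrative, and the `read_csv` calls assume a connection `con` as set up earlier:

```python
import csv
import os
import tempfile

# Create a small CSV to register.
path = os.path.join(tempfile.mkdtemp(), "events.csv")
with open(path, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["user_id", "amount"])  # header row
    writer.writerow([1, 9.99])

# With a Flink backend connection `con`; the schema override is optional:
# t = con.read_csv(path, table_name="events")
# t = con.read_csv(path, schema=ibis.schema({"user_id": "int64", "amount": "float64"}))
```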
read_delta
read_delta(self, source, table_name=None, **kwargs)
Register a Delta Lake table in the current database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| source | str \| pathlib.Path | The data source. Must be a directory containing a Delta Lake table. | required |
| table_name | str \| None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
| **kwargs | typing.Any | Additional keyword arguments passed to the underlying backend or library. | {} |
Returns
| Type | Description |
|---|---|
| ibis.Table | The just-registered table. |
read_json
read_json(self, path, schema=None, table_name=None)
Register a json file as a table in the current database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| pathlib.Path | The data source. | required |
| schema | ibis.Schema \| None | The schema for the new table. | None |
| table_name | str \| None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
Returns
| Type | Description |
|---|---|
| ibis.Table | The just-registered table. |
read_parquet
read_parquet(self, path, schema=None, table_name=None)
Register a parquet file as a table in the current database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| pathlib.Path | The data source. | required |
| schema | ibis.Schema \| None | The schema for the new table. | None |
| table_name | str \| None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
Returns
| Type | Description |
|---|---|
| ibis.Table | The just-registered table. |
reconnect
reconnect(self)
Reconnect to the database already configured with connect.
register_options
register_options(cls)
Register custom backend options.
rename_table
rename_table(self, old_name, new_name, force=True)
Rename an existing table.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| old_name | str | The old name of the table. | required |
| new_name | str | The new name of the table. | required |
| force | bool | If False, an exception is raised if the table does not exist. | True |
table
table(self, name, database=None, catalog=None)
Return a table expression from a table or view in the database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Table name. | required |
| database | str \| None | Database in which the table resides. | None |
| catalog | str \| None | Catalog in which the table resides. | None |
Returns
| Type | Description |
|---|---|
| pyflink.table.Table | Table named name from database |
to_csv
to_csv(self, expr, path, *, params=None, **kwargs)
Write the results of executing the given expression to a CSV file.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ibis.Table | The ibis expression to execute and persist to CSV. | required |
| path | str \| pathlib.Path | The data source. A string or Path to the CSV file. | required |
| params | collections.abc.Mapping[ibis.Scalar, typing.Any] \| None | Mapping of scalar parameter expressions to value. | None |
| kwargs | typing.Any | Additional keyword arguments passed to pyarrow.csv.CSVWriter. | {} |
to_delta
to_delta(self, expr, path, *, params=None, **kwargs)
Write the results of executing the given expression to a Delta Lake table.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ibis.Table | The ibis expression to execute and persist to a Delta Lake table. | required |
| path | str \| pathlib.Path | The data source. A string or Path to the Delta Lake table. | required |
| params | collections.abc.Mapping[ibis.Scalar, typing.Any] \| None | Mapping of scalar parameter expressions to value. | None |
| kwargs | typing.Any | Additional keyword arguments passed to the deltalake.writer.write_deltalake method. | {} |
to_pandas
to_pandas(self, expr, *, params=None, limit=None, **kwargs)
Execute an Ibis expression and return a pandas DataFrame, Series, or scalar.
This method is a wrapper around execute.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ibis.Expr | Ibis expression to execute. | required |
| params | collections.abc.Mapping[ibis.Scalar, typing.Any] \| None | Mapping of scalar parameter expressions to value. | None |
| limit | int \| str \| None | An integer to effect a specific row limit. A value of None means "no limit". The default is in ibis/config.py. | None |
| kwargs | typing.Any | Keyword arguments. | {} |
to_pandas_batches
to_pandas_batches(self, expr, *, params=None, limit=None, chunk_size=1000000, **kwargs)
Execute an Ibis expression and return an iterator of pandas DataFrames.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ibis.Expr | Ibis expression to execute. | required |
| params | collections.abc.Mapping[ibis.Scalar, typing.Any] \| None | Mapping of scalar parameter expressions to value. | None |
| limit | int \| str \| None | An integer to effect a specific row limit. A value of None means "no limit". The default is in ibis/config.py. | None |
| chunk_size | int | Maximum number of rows in each returned DataFrame batch. This may have no effect depending on the backend. | 1000000 |
| kwargs | typing.Any | Keyword arguments. | {} |
Returns
| Type | Description |
|---|---|
| collections.abc.Iterator[pandas.DataFrame] | An iterator of pandas DataFrames. |
to_parquet
to_parquet(self, expr, path, *, params=None, **kwargs)
Write the results of executing the given expression to a parquet file.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ibis.Table | The ibis expression to execute and persist to parquet. | required |
| path | str \| pathlib.Path | The data source. A string or Path to the parquet file. | required |
| params | collections.abc.Mapping[ibis.Scalar, typing.Any] \| None | Mapping of scalar parameter expressions to value. | None |
| **kwargs | typing.Any | Additional keyword arguments passed to pyarrow.parquet.ParquetWriter. | {} |
to_pyarrow
to_pyarrow(self, expr, *, params=None, limit=None, **kwargs)
Execute expression and return results as a pyarrow table.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ibis.Expr | Ibis expression to export to pyarrow. | required |
| params | collections.abc.Mapping[ibis.Scalar, typing.Any] \| None | Mapping of scalar parameter expressions to value. | None |
| limit | int \| str \| None | An integer to effect a specific row limit. A value of None means "no limit". The default is in ibis/config.py. | None |
| kwargs | typing.Any | Keyword arguments. | {} |
Returns
| Type | Description |
|---|---|
| pyarrow.Table | A pyarrow table holding the results of the executed expression. |
to_pyarrow_batches
to_pyarrow_batches(self, expr, *, params=None, chunk_size=None, limit=None, **kwargs)
Execute expression and return a RecordBatchReader.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ibis.Expr | Ibis expression to export to pyarrow. | required |
| limit | int \| str \| None | An integer to effect a specific row limit. A value of None means "no limit". The default is in ibis/config.py. | None |
| params | collections.abc.Mapping[ibis.Scalar, typing.Any] \| None | Mapping of scalar parameter expressions to value. | None |
| chunk_size | int | Maximum number of rows in each returned record batch. | None |
| kwargs | typing.Any | Keyword arguments. | {} |
Returns
| Type | Description |
|---|---|
| pyarrow.RecordBatchReader | The results as a RecordBatchReader. |
to_torch
to_torch(self, expr, *, params=None, limit=None, **kwargs)
Execute an expression and return results as a dictionary of torch tensors.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ibis.Expr | Ibis expression to execute. | required |
| params | collections.abc.Mapping[ibis.Scalar, typing.Any] \| None | Parameters to substitute into the expression. | None |
| limit | int \| str \| None | An integer to effect a specific row limit. A value of None means no limit. | None |
| kwargs | typing.Any | Keyword arguments passed into the backend's to_torch implementation. | {} |
Returns
| Type | Description |
|---|---|
| dict[str, torch.Tensor] | A dictionary of torch tensors, keyed by column name. |