PySpark

https://spark.apache.org/docs/latest/api/python

Install

Install Ibis and dependencies for the PySpark backend:

Install with the pyspark extra:

pip install 'ibis-framework[pyspark]'

And connect:

import ibis

con = ibis.pyspark.connect()
1
Adjust connection parameters as needed.

Install for PySpark:

conda install -c conda-forge ibis-pyspark

And connect:

import ibis

con = ibis.pyspark.connect()
1
Adjust connection parameters as needed.

Install for PySpark:

mamba install -c conda-forge ibis-pyspark

And connect:

import ibis

con = ibis.pyspark.connect()
1
Adjust connection parameters as needed.

Connect

ibis.pyspark.connect

con = ibis.pyspark.connect(session=session)
Note

ibis.pyspark.connect is a thin wrapper around ibis.backends.pyspark.Backend.do_connect.

Note

The pyspark backend does not create SparkSession objects (unless you connect using a URL); you must create a SparkSession and pass that to ibis.pyspark.connect.

Connection Parameters

do_connect

do_connect(self, session)

Create a PySpark Backend for use with Ibis.

Parameters
Name Type Description Default
session pyspark.sql.SparkSession A SparkSession instance required
Examples
>>> import ibis
>>> from pyspark.sql import SparkSession
>>> session = SparkSession.builder.getOrCreate()
>>> ibis.pyspark.connect(session)
<ibis.backends.pyspark.Backend at 0x...>

ibis.connect URL format

In addition to ibis.pyspark.connect, you can also connect to PySpark by passing a properly-formatted PySpark connection URL to ibis.connect:

con = ibis.connect(f"pyspark://{warehouse-dir}?spark.app.name=CountingSheep&spark.master=local[2]")

pyspark.Backend

add_operation

add_operation(self, operation)

Add a translation function to the backend for a specific operation.

Operations are defined in ibis.expr.operations, and a translation function receives the translator object and an expression as parameters, and returns a value depending on the backend.

close

close(self)

Close Spark connection and drop any temporary objects.

compile

compile(self, expr, timecontext=None, params=None, *args, **kwargs)

Compile an ibis expression to a PySpark DataFrame object.

compute_stats

compute_stats(self, name, database=None, noscan=False)

Issue a COMPUTE STATISTICS command for a given table.

Parameters

Name Type Description Default
name str Table name required
database str | None Database name None
noscan bool If True, collect only basic statistics for the table (number of rows, size in bytes). False

connect

connect(self, *args, **kwargs)

Connect to the database.

Parameters

Name Type Description Default
*args Mandatory connection parameters, see the docstring of do_connect for details. ()
**kwargs Extra connection parameters, see the docstring of do_connect for details. {}

Notes

This creates a new backend instance with saved args and kwargs, then calls reconnect and finally returns the newly created and connected backend instance.

Returns

Type Description
ibis.backends.base.BaseBackend An instance of the backend

create_database

create_database(self, name, path=None, force=False)

Create a new Spark database.

Parameters

Name Type Description Default
name str Database name required
path str | pathlib.Path | None Path where to store the database data; otherwise uses Spark default None
force bool Whether to append IF NOT EXISTS to the database creation SQL False

create_table

create_table(self, name, obj=None, *, schema=None, database=None, temp=None, overwrite=False, format='parquet')

Create a new table in Spark.

Parameters

Name Type Description Default
name str Name of the new table. required
obj ibis.ibis.Table | pandas.pandas.DataFrame | pyarrow.pyarrow.Table | None If passed, creates table from SELECT statement results None
schema ibis.ibis.Schema | None Mutually exclusive with obj, creates an empty table with a schema None
database str | None Database name None
temp bool | None Whether the new table is temporary None
overwrite bool If True, overwrite existing data False
format str Format of the table on disk 'parquet'

Returns

Type Description
Table The newly created table.

Examples

>>> con.create_table("new_table_name", table_expr)  # quartodoc: +SKIP

create_view

create_view(self, name, obj, *, database=None, overwrite=False)

Create a Spark view from a table expression.

Parameters

Name Type Description Default
name str View name required
obj ibis.ibis.Table Expression to use for the view required
database str | None Database name None
overwrite bool Replace an existing view of the same name if it exists False

Returns

Type Description
Table The created view

database

database(self, name=None)

Return a Database object for the name database.

Parameters

Name Type Description Default
name str | None Name of the database to return the object for. None

Returns

Type Description
ibis.backends.base.Database A database object for the specified database.

drop_database

drop_database(self, name, force=False)

Drop a Spark database.

Parameters

Name Type Description Default
name str Database name required
force bool If False, Spark throws exception if database is not empty or database does not exist False

drop_table

drop_table(self, name, *, database=None, force=False)

Drop a table.

drop_table_or_view

drop_table_or_view(self, name, *, database=None, force=False)

Drop a Spark table or view.

Parameters

Name Type Description Default
name str Table or view name required
database str | None Database name None
force bool Database may throw exception if table does not exist False

Examples

>>> table = "my_table"
>>> db = "operations"
>>> con.drop_table_or_view(table, db, force=True)  # quartodoc: +SKIP

drop_view

drop_view(self, name, *, database=None, force=False)

Drop a view.

execute

execute(self, expr, **kwargs)

Execute an expression.

fetch_from_cursor

fetch_from_cursor(self, cursor, schema)

Fetch data from cursor.

get_schema

get_schema(self, table_name, database=None)

Return a Schema object for the indicated table and database.

Parameters

Name Type Description Default
table_name str Table name. May be fully qualified required
database str | None Spark does not have a database argument for its table() method, so this must be None None

Returns

Type Description
Schema An ibis schema

has_operation

has_operation(cls, operation)

insert

insert(self, table_name, obj=None, database=None, overwrite=False, values=None, validate=True)

Insert data into an existing table.

Examples

>>> table = "my_table"
>>> con.insert(table, table_expr)  # quartodoc: +SKIP

Completely overwrite contents

>>> con.insert(table, table_expr, overwrite=True)  # quartodoc: +SKIP

list_databases

list_databases(self, like=None)

List existing databases in the current connection.

Parameters

Name Type Description Default
like str | None A pattern in Python’s regex format to filter returned database names. None

Returns

Type Description
list[str] The database names that exist in the current connection, that match the like pattern if provided.

list_tables

list_tables(self, like=None, database=None)

Return the list of table names in the current database.

For some backends, the tables may be files in a directory, or other equivalent entities in a SQL database.

Parameters

Name Type Description Default
like str | None A pattern in Python’s regex format. None
database str | None The database from which to list tables. If not provided, the current database is used. None

Returns

Type Description
list[str] The list of the table names that match the pattern like.

raw_sql

raw_sql(self, query)

Execute a query string and return the cursor used for execution.

Consider using .sql instead

If your query is a SELECT statement you can use the backend .sql method to avoid having to manually release the cursor returned from this method.

The cursor returned from this method must be manually released

You do not need to call .close() on the cursor when running DDL or DML statements like CREATE, INSERT or DROP, only when using SELECT statements.

To release a cursor, call the close method on the returned cursor object.

You can close the cursor by explicitly calling its close method:

cursor = con.raw_sql("SELECT ...")
cursor.close()

Or you can use a context manager:

with con.raw_sql("SELECT ...") as cursor:
    ...

Parameters

Name Type Description Default
query str SQL query string required

Examples

>>> con = ibis.connect("duckdb://")
>>> with con.raw_sql("SELECT 1") as cursor:
...     result = cursor.fetchall()
>>> result
[(1,)]
>>> cursor.closed
True

read_csv

read_csv(self, source_list, table_name=None, **kwargs)

Register a CSV file as a table in the current database.

Parameters

Name Type Description Default
source_list str | list[str] | tuple[str] The data source(s). May be a path to a file or directory of CSV files, or an iterable of CSV files. required
table_name str | None An optional name to use for the created table. This defaults to a sequentially generated name. None
kwargs typing.Any Additional keyword arguments passed to PySpark loading function. https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html {}

Returns

Type Description
ibis.ibis.Table The just-registered table

read_delta

read_delta(self, source, table_name=None, **kwargs)

Register a Delta Lake table as a table in the current database.

Parameters

Name Type Description Default
source str | pathlib.Path The path to the Delta Lake table. required
table_name str | None An optional name to use for the created table. This defaults to a sequentially generated name. None
kwargs typing.Any Additional keyword arguments passed to PySpark. https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.load.html {}

Returns

Type Description
ibis.ibis.Table The just-registered table

read_json

read_json(self, source_list, table_name=None, **kwargs)

Register a JSON file as a table in the current database.

Parameters

Name Type Description Default
source_list str | collections.abc.Sequence[str] The data source(s). May be a path to a file or directory of JSON files, or an iterable of JSON files. required
table_name str | None An optional name to use for the created table. This defaults to a sequentially generated name. None
kwargs typing.Any Additional keyword arguments passed to PySpark loading function. https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.json.html {}

Returns

Type Description
ibis.ibis.Table The just-registered table

read_parquet

read_parquet(self, source, table_name=None, **kwargs)

Register a parquet file as a table in the current database.

Parameters

Name Type Description Default
source str | pathlib.Path The data source. May be a path to a file or directory of parquet files. required
table_name str | None An optional name to use for the created table. This defaults to a sequentially generated name. None
kwargs typing.Any Additional keyword arguments passed to PySpark. https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.parquet.html {}

Returns

Type Description
ibis.ibis.Table The just-registered table

reconnect

reconnect(self)

Reconnect to the database already configured with connect.

register

register(self, source, table_name=None, **kwargs)

Register a data source as a table in the current database.

Parameters

Name Type Description Default
source str | pathlib.Path | typing.Any The data source(s). May be a path to a file or directory of parquet/csv files, or an iterable of CSV files. required
table_name str | None An optional name to use for the created table. This defaults to a sequentially generated name. None
**kwargs typing.Any Additional keyword arguments passed to PySpark loading functions for CSV or parquet. {}

Returns

Type Description
ibis.ibis.Table The just-registered table

register_options

register_options(cls)

Register custom backend options.

rename_table

rename_table(self, old_name, new_name)

Rename an existing table.

Parameters

Name Type Description Default
old_name str The old name of the table. required
new_name str The new name of the table. required

sql

sql(self, query, schema=None, dialect=None)

Convert a SQL query to an Ibis table expression.

Parameters

Name Type Description Default
query str SQL string required
schema ibis.ibis.Schema | None The expected schema for this query. If not provided, will be inferred automatically if possible. None
dialect str | None Optional string indicating the dialect of query. The default value of None will use the backend’s native dialect. None

Returns

Type Description
Table Table expression

table

table(self, name, database=None)

Return a table expression from a table or view in the database.

Parameters

Name Type Description Default
name str Table name required
database str | None Database in which the table resides None

Returns

Type Description
Table Table named name from database

to_csv

to_csv(self, expr, path, *, params=None, **kwargs)

Write the results of executing the given expression to a CSV file.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ibis.ibis.Table The ibis expression to execute and persist to CSV. required
path str | pathlib.Path The data source. A string or Path to the CSV file. required
params collections.abc.Mapping[ibis.ibis.Scalar, typing.Any] | None Mapping of scalar parameter expressions to value. None
kwargs typing.Any Additional keyword arguments passed to pyarrow.csv.CSVWriter {}
https required

to_delta

to_delta(self, expr, path, **kwargs)

Write the results of executing the given expression to a Delta Lake table.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ibis.ibis.Table The ibis expression to execute and persist to a Delta Lake table. required
path str | pathlib.Path The data source. A string or Path to the Delta Lake table. required
**kwargs typing.Any PySpark Delta Lake table write arguments. https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.save.html {}

to_pandas

to_pandas(self, expr, *, params=None, limit=None, **kwargs)

Execute an Ibis expression and return a pandas DataFrame, Series, or scalar.

Note

This method is a wrapper around execute.

Parameters

Name Type Description Default
expr ibis.ibis.Expr Ibis expression to execute. required
params collections.abc.Mapping[ibis.ibis.Scalar, typing.Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. None
kwargs typing.Any Keyword arguments {}

to_pandas_batches

to_pandas_batches(self, expr, *, params=None, limit=None, chunk_size=1000000, **kwargs)

Execute an Ibis expression and return an iterator of pandas DataFrames.

Parameters

Name Type Description Default
expr ibis.ibis.Expr Ibis expression to execute. required
params collections.abc.Mapping[ibis.ibis.Scalar, typing.Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. None
chunk_size int Maximum number of rows in each returned DataFrame batch. This may have no effect depending on the backend. 1000000
kwargs typing.Any Keyword arguments {}

Returns

Type Description
collections.abc.Iterator[pandas.pandas.DataFrame] An iterator of pandas DataFrames.

to_parquet

to_parquet(self, expr, path, *, params=None, **kwargs)

Write the results of executing the given expression to a parquet file.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ibis.ibis.Table The ibis expression to execute and persist to parquet. required
path str | pathlib.Path The data source. A string or Path to the parquet file. required
params collections.abc.Mapping[ibis.ibis.Scalar, typing.Any] | None Mapping of scalar parameter expressions to value. None
**kwargs typing.Any Additional keyword arguments passed to pyarrow.parquet.ParquetWriter {}
https required

to_pyarrow

to_pyarrow(self, expr, params=None, limit=None, **kwargs)

Execute expression and return results in as a pyarrow table.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ibis.ibis.Expr Ibis expression to export to pyarrow required
params collections.abc.Mapping[ibis.ibis.Scalar, typing.Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. None
kwargs typing.Any Keyword arguments {}

Returns

Type Description
Table A pyarrow table holding the results of the executed expression.

to_pyarrow_batches

to_pyarrow_batches(self, expr, *, params=None, limit=None, chunk_size=1000000, **kwargs)

Execute expression and return an iterator of pyarrow record batches.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ibis.ibis.Expr Ibis expression to export to pyarrow required
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. None
params collections.abc.Mapping[ibis.ibis.Scalar, typing.Any] | None Mapping of scalar parameter expressions to value. None
chunk_size int Maximum number of rows in each returned record batch. 1000000

Returns

Type Description
RecordBatchReader Collection of pyarrow RecordBatchs.

to_torch

to_torch(self, expr, *, params=None, limit=None, **kwargs)

Execute an expression and return results as a dictionary of torch tensors.

Parameters

Name Type Description Default
expr ibis.ibis.Expr Ibis expression to execute. required
params collections.abc.Mapping[ibis.ibis.Scalar, typing.Any] | None Parameters to substitute into the expression. None
limit int | str | None An integer to effect a specific row limit. A value of None means no limit. None
kwargs typing.Any Keyword arguments passed into the backend’s to_torch implementation. {}

Returns

Type Description
dict[str, torch.torch.Tensor] A dictionary of torch tensors, keyed by column name.

truncate_table

truncate_table(self, name, database=None)

Delete all rows from an existing table.

Parameters

Name Type Description Default
name str Table name required
database str | None Database name None
Back to top