PySpark
https://spark.apache.org/docs/latest/api/python
Install
Install Ibis and dependencies for the PySpark backend:
Install with the pyspark extra:
pip install 'ibis-framework[pyspark]'
And connect:
import ibis
con = ibis.pyspark.connect()

Adjust connection parameters as needed.
Install for PySpark:
conda install -c conda-forge ibis-pyspark
And connect:
import ibis
con = ibis.pyspark.connect()

Adjust connection parameters as needed.
Install for PySpark:
mamba install -c conda-forge ibis-pyspark
And connect:
import ibis
con = ibis.pyspark.connect()

Adjust connection parameters as needed.
Connect
ibis.pyspark.connect
con = ibis.pyspark.connect(session=session)
ibis.pyspark.connect is a thin wrapper around ibis.backends.pyspark.Backend.do_connect.
The pyspark backend does not create SparkSession objects (unless you connect using a URL); you must create a SparkSession and pass that to ibis.pyspark.connect.
Connection Parameters
do_connect
do_connect(self, session)
Create a PySpark Backend for use with Ibis.
Parameters
Name | Type | Description | Default |
---|---|---|---|
session | pyspark.sql.SparkSession | A SparkSession instance | required |
Examples
>>> import ibis
>>> from pyspark.sql import SparkSession
>>> session = SparkSession.builder.getOrCreate()
>>> ibis.pyspark.connect(session)
<ibis.backends.pyspark.Backend at 0x...>
ibis.connect
URL format
In addition to ibis.pyspark.connect, you can also connect to PySpark by passing a properly-formatted PySpark connection URL to ibis.connect:
= ibis.connect(f"pyspark://{warehouse-dir}?spark.app.name=CountingSheep&spark.master=local[2]") con
pyspark.Backend
add_operation
add_operation(self, operation)
Add a translation function to the backend for a specific operation.
Operations are defined in ibis.expr.operations, and a translation function receives the translator object and an expression as parameters, and returns a value depending on the backend.
close
close(self)
Close Spark connection and drop any temporary objects.
compile
compile(self, expr, timecontext=None, params=None, *args, **kwargs)
Compile an ibis expression to a PySpark DataFrame object.
compute_stats
compute_stats(self, name, database=None, noscan=False)
Issue a COMPUTE STATISTICS command for a given table.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Table name | required |
database | str \| None | Database name | None |
noscan | bool | If True, collect only basic statistics for the table (number of rows, size in bytes). | False |
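As a minimal sketch (the table name is illustrative), a full scan and a metadata-only scan might look like:
>>> con.compute_stats("my_table")
>>> con.compute_stats("my_table", noscan=True)  # basic statistics only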
connect
connect(self, *args, **kwargs)
Connect to the database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
*args | | Mandatory connection parameters, see the docstring of do_connect for details. | () |
**kwargs | | Extra connection parameters, see the docstring of do_connect for details. | {} |
Notes
This creates a new backend instance with saved args and kwargs, then calls reconnect and finally returns the newly created and connected backend instance.
Returns
Type | Description |
---|---|
ibis.backends.base.BaseBackend | An instance of the backend |
create_database
create_database(self, name, path=None, force=False)
Create a new Spark database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Database name | required |
path | str \| pathlib.Path \| None | Path where to store the database data; otherwise uses Spark default | None |
force | bool | Whether to append IF NOT EXISTS to the database creation SQL | False |
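For example, a hypothetical database backed by a custom storage path could be created like this (the name and path are illustrative):
>>> con.create_database("analytics", path="/tmp/analytics_db", force=True)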
create_table
create_table(self, name, obj=None, *, schema=None, database=None, temp=None, overwrite=False, format='parquet')
Create a new table in Spark.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the new table. | required |
obj | ibis.Table \| pandas.DataFrame \| pyarrow.Table \| None | If passed, creates table from SELECT statement results | None |
schema | ibis.Schema \| None | Mutually exclusive with obj, creates an empty table with a schema | None |
database | str \| None | Database name | None |
temp | bool \| None | Whether the new table is temporary | None |
overwrite | bool | If True, overwrite existing data | False |
format | str | Format of the table on disk | 'parquet' |
Returns
Type | Description |
---|---|
Table | The newly created table. |
Examples
>>> con.create_table("new_table_name", table_expr) # quartodoc: +SKIP
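Because schema and obj are mutually exclusive, an empty table can instead be created from a schema alone; a minimal sketch with illustrative names:
>>> sch = ibis.schema({"id": "int64", "name": "string"})
>>> con.create_table("empty_table", schema=sch, format="parquet")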
create_view
create_view(self, name, obj, *, database=None, overwrite=False)
Create a Spark view from a table expression.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | View name | required |
obj | ibis.Table | Expression to use for the view | required |
database | str \| None | Database name | None |
overwrite | bool | Replace an existing view of the same name if it exists | False |
Returns
Type | Description |
---|---|
Table | The created view |
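For instance, assuming expr is an existing table expression (illustrative), a view can be created or replaced like so:
>>> con.create_view("recent_orders", expr, overwrite=True)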
database
database(self, name=None)
Return a Database object for the name database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str \| None | Name of the database to return the object for. | None |
Returns
Type | Description |
---|---|
ibis.backends.base.Database | A database object for the specified database. |
drop_database
drop_database(self, name, force=False)
Drop a Spark database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Database name | required |
force | bool | If False, Spark throws exception if database is not empty or database does not exist | False |
drop_table
drop_table(self, name, *, database=None, force=False)
Drop a table.
drop_table_or_view
drop_table_or_view(self, name, *, database=None, force=False)
Drop a Spark table or view.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Table or view name | required |
database | str \| None | Database name | None |
force | bool | Database may throw exception if table does not exist | False |
Examples
>>> table = "my_table"
>>> db = "operations"
>>> con.drop_table_or_view(table, database=db, force=True) # quartodoc: +SKIP
drop_view
drop_view(self, name, *, database=None, force=False)
Drop a view.
execute
execute(self, expr, **kwargs)
Execute an expression.
fetch_from_cursor
fetch_from_cursor(self, cursor, schema)
Fetch data from cursor.
get_schema
get_schema(self, table_name, database=None)
Return a Schema object for the indicated table and database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
table_name | str | Table name. May be fully qualified | required |
database | str \| None | Spark does not have a database argument for its table() method, so this must be None | None |
Returns
Type | Description |
---|---|
Schema | An ibis schema |
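A minimal sketch, assuming a table named my_table exists in the current database:
>>> sch = con.get_schema("my_table")
>>> sch.names  # column names from the ibis schema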
has_operation
has_operation(cls, operation)
insert
insert(self, table_name, obj=None, database=None, overwrite=False, values=None, validate=True)
Insert data into an existing table.
Examples
>>> table = "my_table"
>>> con.insert(table, table_expr) # quartodoc: +SKIP
Completely overwrite contents
>>> con.insert(table, table_expr, overwrite=True) # quartodoc: +SKIP
list_databases
list_databases(self, like=None)
List existing databases in the current connection.
Parameters
Name | Type | Description | Default |
---|---|---|---|
like | str \| None | A pattern in Python’s regex format to filter returned database names. | None |
Returns
Type | Description |
---|---|
list[str] | The database names that exist in the current connection, that match the like pattern if provided. |
list_tables
list_tables(self, like=None, database=None)
Return the list of table names in the current database.
For some backends, the tables may be files in a directory, or other equivalent entities in a SQL database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
like | str \| None | A pattern in Python’s regex format. | None |
database | str \| None | The database from which to list tables. If not provided, the current database is used. | None |
Returns
Type | Description |
---|---|
list[str] | The list of table names that match the pattern like. |
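For example, filtering table names with a regex pattern (the pattern and database name are illustrative):
>>> con.list_tables(like="sales.*", database="analytics")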
raw_sql
raw_sql(self, query)
Execute a query string and return the cursor used for execution.
.sql instead
If your query is a SELECT statement, you can use the backend .sql method to avoid having to manually release the cursor returned from this method.
You do not need to call .close() on the cursor when running DDL or DML statements like CREATE, INSERT or DROP, only when using SELECT statements.
To release a cursor, call the close method on the returned cursor object:
= con.raw_sql("SELECT ...")
cursor cursor.close()
Or you can use a context manager:
with con.raw_sql("SELECT ...") as cursor:
...
Parameters
Name | Type | Description | Default |
---|---|---|---|
query | str | SQL query string | required |
Examples
>>> con = ibis.connect("duckdb://")
>>> with con.raw_sql("SELECT 1") as cursor:
...     result = cursor.fetchall()
>>> result
[(1,)]
>>> cursor.closed
True
read_csv
read_csv(self, source_list, table_name=None, **kwargs)
Register a CSV file as a table in the current database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
source_list | str \| list[str] \| tuple[str] | The data source(s). May be a path to a file or directory of CSV files, or an iterable of CSV files. | required |
table_name | str \| None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
kwargs | typing.Any | Additional keyword arguments passed to the PySpark loading function: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html | {} |
Returns
Type | Description |
---|---|
ibis.Table | The just-registered table |
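A minimal sketch with an illustrative path; header and inferSchema are standard PySpark CSV reader options forwarded through kwargs:
>>> t = con.read_csv("/data/raw/orders.csv", table_name="orders_csv", header=True, inferSchema=True)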
read_delta
read_delta(self, source, table_name=None, **kwargs)
Register a Delta Lake table as a table in the current database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
source | str \| pathlib.Path | The path to the Delta Lake table. | required |
table_name | str \| None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
kwargs | typing.Any | Additional keyword arguments passed to PySpark: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.load.html | {} |
Returns
Type | Description |
---|---|
ibis.Table | The just-registered table |
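For example, registering a Delta Lake table from a local path (path and table name are illustrative):
>>> events = con.read_delta("/data/delta/events", table_name="events")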
read_json
read_json(self, source_list, table_name=None, **kwargs)
Register a JSON file as a table in the current database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
source_list | str \| collections.abc.Sequence[str] | The data source(s). May be a path to a file or directory of JSON files, or an iterable of JSON files. | required |
table_name | str \| None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
kwargs | typing.Any | Additional keyword arguments passed to the PySpark loading function: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.json.html | {} |
Returns
Type | Description |
---|---|
ibis.Table | The just-registered table |
read_parquet
read_parquet(self, source, table_name=None, **kwargs)
Register a parquet file as a table in the current database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
source | str \| pathlib.Path | The data source. May be a path to a file or directory of parquet files. | required |
table_name | str \| None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
kwargs | typing.Any | Additional keyword arguments passed to PySpark: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.parquet.html | {} |
Returns
Type | Description |
---|---|
ibis.Table | The just-registered table |
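A minimal sketch with an illustrative path:
>>> t = con.read_parquet("/data/curated/orders.parquet", table_name="orders")
>>> t.head()  # build a lazy expression over the registered table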
reconnect
reconnect(self)
Reconnect to the database already configured with connect.
register
register(self, source, table_name=None, **kwargs)
Register a data source as a table in the current database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
source | str \| pathlib.Path \| typing.Any | The data source(s). May be a path to a file or directory of parquet/csv files, or an iterable of CSV files. | required |
table_name | str \| None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
**kwargs | typing.Any | Additional keyword arguments passed to PySpark loading functions for CSV or parquet. | {} |
Returns
Type | Description |
---|---|
ibis.Table | The just-registered table |
register_options
register_options(cls)
Register custom backend options.
rename_table
rename_table(self, old_name, new_name)
Rename an existing table.
Parameters
Name | Type | Description | Default |
---|---|---|---|
old_name | str | The old name of the table. | required |
new_name | str | The new name of the table. | required |
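For example (both table names are illustrative):
>>> con.rename_table("orders_staging", "orders")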
sql
sql(self, query, schema=None, dialect=None)
Convert a SQL query to an Ibis table expression.
Parameters
Name | Type | Description | Default |
---|---|---|---|
query | str | SQL string | required |
schema | ibis.Schema \| None | The expected schema for this query. If not provided, will be inferred automatically if possible. | None |
dialect | str \| None | Optional string indicating the dialect of query. The default value of None will use the backend’s native dialect. | None |
Returns
Type | Description |
---|---|
Table | Table expression |
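A minimal sketch, assuming a table named orders exists in the current database:
>>> t = con.sql("SELECT order_id, amount FROM orders")
>>> expr = t.filter(t.amount > 0).count()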
table
table(self, name, database=None)
Return a table expression from a table or view in the database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Table name | required |
database | str \| None | Database in which the table resides | None |
Returns
Type | Description |
---|---|
Table | Table named name from database |
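For example, fetching a table from a specific database (the names are illustrative):
>>> orders = con.table("orders", database="sales")
>>> orders.columns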
to_csv
to_csv(self, expr, path, *, params=None, **kwargs)
Write the results of executing the given expression to a CSV file.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ibis.Table | The ibis expression to execute and persist to CSV. | required |
path | str \| pathlib.Path | The data source. A string or Path to the CSV file. | required |
params | collections.abc.Mapping[ibis.Scalar, typing.Any] \| None | Mapping of scalar parameter expressions to value. | None |
kwargs | typing.Any | Additional keyword arguments passed to pyarrow.csv.CSVWriter | {} |
to_delta
to_delta(self, expr, path, **kwargs)
Write the results of executing the given expression to a Delta Lake table.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ibis.Table | The ibis expression to execute and persist to a Delta Lake table. | required |
path | str \| pathlib.Path | The data source. A string or Path to the Delta Lake table. | required |
**kwargs | typing.Any | PySpark Delta Lake table write arguments: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.save.html | {} |
to_pandas
to_pandas(self, expr, *, params=None, limit=None, **kwargs)
Execute an Ibis expression and return a pandas DataFrame, Series, or scalar.
This method is a wrapper around execute.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ibis.Expr | Ibis expression to execute. | required |
params | collections.abc.Mapping[ibis.Scalar, typing.Any] \| None | Mapping of scalar parameter expressions to value. | None |
limit | int \| str \| None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. | None |
kwargs | typing.Any | Keyword arguments | {} |
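A minimal sketch, assuming t is an existing table expression:
>>> df = con.to_pandas(t, limit=100)  # returns a pandas DataFrame with at most 100 rows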
to_pandas_batches
to_pandas_batches(self, expr, *, params=None, limit=None, chunk_size=1000000, **kwargs)
Execute an Ibis expression and return an iterator of pandas DataFrames.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ibis.Expr | Ibis expression to execute. | required |
params | collections.abc.Mapping[ibis.Scalar, typing.Any] \| None | Mapping of scalar parameter expressions to value. | None |
limit | int \| str \| None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. | None |
chunk_size | int | Maximum number of rows in each returned DataFrame batch. This may have no effect depending on the backend. | 1000000 |
kwargs | typing.Any | Keyword arguments | {} |
Returns
Type | Description |
---|---|
collections.abc.Iterator[pandas.DataFrame] | An iterator of pandas DataFrames. |
to_parquet
to_parquet(self, expr, path, *, params=None, **kwargs)
Write the results of executing the given expression to a parquet file.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ibis.Table | The ibis expression to execute and persist to parquet. | required |
path | str \| pathlib.Path | The data source. A string or Path to the parquet file. | required |
params | collections.abc.Mapping[ibis.Scalar, typing.Any] \| None | Mapping of scalar parameter expressions to value. | None |
**kwargs | typing.Any | Additional keyword arguments passed to pyarrow.parquet.ParquetWriter | {} |
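For example, persisting a limited expression to a local parquet file (the path is illustrative):
>>> con.to_parquet(t.limit(1000), "/tmp/sample.parquet")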
to_pyarrow
to_pyarrow(self, expr, params=None, limit=None, **kwargs)
Execute an expression and return the results as a pyarrow table.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ibis.Expr | Ibis expression to export to pyarrow | required |
params | collections.abc.Mapping[ibis.Scalar, typing.Any] \| None | Mapping of scalar parameter expressions to value. | None |
limit | int \| str \| None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. | None |
kwargs | typing.Any | Keyword arguments | {} |
Returns
Type | Description |
---|---|
Table | A pyarrow table holding the results of the executed expression. |
to_pyarrow_batches
to_pyarrow_batches(self, expr, *, params=None, limit=None, chunk_size=1000000, **kwargs)
Execute expression and return an iterator of pyarrow record batches.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ibis.Expr | Ibis expression to export to pyarrow | required |
limit | int \| str \| None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. | None |
params | collections.abc.Mapping[ibis.Scalar, typing.Any] \| None | Mapping of scalar parameter expressions to value. | None |
chunk_size | int | Maximum number of rows in each returned record batch. | 1000000 |
Returns
Type | Description |
---|---|
RecordBatchReader | Collection of pyarrow RecordBatches. |
to_torch
to_torch(self, expr, *, params=None, limit=None, **kwargs)
Execute an expression and return results as a dictionary of torch tensors.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ibis.Expr | Ibis expression to execute. | required |
params | collections.abc.Mapping[ibis.Scalar, typing.Any] \| None | Parameters to substitute into the expression. | None |
limit | int \| str \| None | An integer to effect a specific row limit. A value of None means no limit. | None |
kwargs | typing.Any | Keyword arguments passed into the backend’s to_torch implementation. | {} |
Returns
Type | Description |
---|---|
dict[str, torch.Tensor] | A dictionary of torch tensors, keyed by column name. |
truncate_table
truncate_table(self, name, database=None)
Delete all rows from an existing table.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Table name | required |
database | str \| None | Database name | None |
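For example (the table and database names are illustrative):
>>> con.truncate_table("events_staging", database="analytics")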