denormalized

Denormalized is a single node stream processing engine written in Rust and powered by Apache DataFusion 🚀

  1. Install denormalized: `pip install denormalized`
  2. Start the custom Docker image that contains an instance of Kafka along with a script that emits some sample data to Kafka: `docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest`
# Imports needed by the snippet below
import json
import pprint as pp

from denormalized import Context, col
from denormalized.datafusion import functions as f

sample_event = {
    "occurred_at_ms": 100,
    "sensor_name": "foo",
    "reading": 0.0,
}

def print_batch(rb):
    pp.pprint(rb.to_pydict())

ds = Context().from_topic(
    "temperature",
    json.dumps(sample_event),
    "localhost:9092",
    "occurred_at_ms",
)

ds.window(
    [col("sensor_name")],
    [
        f.count(col("reading"), distinct=False, filter=None).alias("count"),
        f.min(col("reading")).alias("min"),
        f.max(col("reading")).alias("max"),
    ],
    1000,
    None,
).sink(print_batch)

Head on over to the examples folder to see more examples that demonstrate additional functionality including stream joins and user defined (aggregate) functions.
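For instance, a stream join might look roughly like the sketch below (reusing `sample_event` and `print_batch` from the snippet above). It assumes a second Kafka topic named `humidity` that shares the `sensor_name` field; the topic, its fields, and the join keys are illustrative and not part of the shipped image.

```python
import json
from denormalized import Context

# Hypothetical second stream with a shared sensor_name key.
humidity_sample = {"occurred_at_ms": 100, "sensor_name": "foo", "humidity": 0.5}

ctx = Context()
temperature = ctx.from_topic(
    "temperature", json.dumps(sample_event), "localhost:9092", "occurred_at_ms"
)
humidity = ctx.from_topic(
    "humidity", json.dumps(humidity_sample), "localhost:9092", "occurred_at_ms"
)

# Inner join on the shared sensor name column.
joined = temperature.join(humidity, "inner", ["sensor_name"], ["sensor_name"])
joined.sink(print_batch)
```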

 1"""
 2[Denormalized](https://www.denormalized.io/) is a single node stream processing engine written in Rust and powered by Apache DataFusion 🚀
 3
 41. Install denormalized `pip install denormalized`
 52. Start the custom Docker image that contains an instance of Kafka along with a script that emits some sample data to Kafka `docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest`
 6
 7```python
 8sample_event = {
 9    "occurred_at_ms": 100,
10    "sensor_name": "foo",
11    "reading": 0.0,
12}
13
14def print_batch(rb):
15    pp.pprint(rb.to_pydict())
16
17ds = Context().from_topic(
18    "temperature",
19    json.dumps(sample_event),
20    "localhost:9092",
21    "occurred_at_ms",
22)
23
24ds.window(
25    [col("sensor_name")],
26    [
27        f.count(col("reading"), distinct=False, filter=None).alias("count"),
28        f.min(col("reading")).alias("min"),
29        f.max(col("reading")).alias("max"),
30    ],
31    1000,
32    None,
33).sink(print_batch)
34```
35
36
37Head on over to the [examples folder](https://github.com/probably-nothing-labs/denormalized/tree/main/py-denormalized/python/examples) to see more examples that demonstrate additional functionality including stream joins and user defined (aggregate) functions.
38
39"""
40
41from .context import Context
42from .data_stream import DataStream
43from .datafusion import col, column
44from .datafusion import functions as Functions
45from .datafusion import lit, literal, udaf, udf
46from .datafusion.expr import Expr
47
48__all__ = [
49    "Context",
50    "DataStream",
51    "col",
52    "column",
53    "Expr",
54    "Functions",
55    "lit",
56    "literal",
57    "udaf",
58    "udf",
59]
60
61__docformat__ = "google"
62
63try:
64    from .feast_data_stream import FeastDataStream
65
66    __all__.append("FeastDataStream")
67except ImportError:
68    pass
class Context:
 6class Context:
 7    """A context manager for handling data stream operations.
 8
 9    This class provides functionality to create and manage data streams
10    from various sources like Kafka topics.
11    """
12
13    def __init__(self) -> None:
14        """Initializes a new Context instance with PyContext."""
15        self.ctx = PyContext()
16
17    def __repr__(self):
18        """Returns the string representation of the PyContext object.
19
20        Returns:
21            str: String representation of the underlying PyContext.
22        """
23        return self.ctx.__repr__()
24
25    def __str__(self):
26        """Returns the string representation of the PyContext object.
27
28        Returns:
29            str: String representation of the underlying PyContext.
30        """
31        return self.ctx.__str__()
32
33    def from_topic(
34        self,
35        topic: str,
36        sample_json: str,
37        bootstrap_servers: str,
38        timestamp_column: str | None = None,
39        group_id: str = "default_group",
40    ) -> DataStream:
41        """Creates a new DataStream from a Kafka topic.
42
43        Args:
44            topic: The name of the Kafka topic to consume from.
45            sample_json: A sample JSON string representing the expected message format.
46            bootstrap_servers: Comma-separated list of Kafka broker addresses.
47            timestamp_column: Optional column name containing message timestamps.
48            group_id: Kafka consumer group ID, defaults to "default_group".
49
50        Returns:
51            DataStream: A new DataStream instance connected to the specified topic.
52        """
53        py_ds = self.ctx.from_topic(
54            topic,
55            sample_json,
56            bootstrap_servers,
57            group_id,
58            timestamp_column,
59        )
60        ds = DataStream(py_ds)
61        return ds

A context manager for handling data stream operations.

This class provides functionality to create and manage data streams from various sources like Kafka topics.

Context()
13    def __init__(self) -> None:
14        """Initializes a new Context instance with PyContext."""
15        self.ctx = PyContext()

Initializes a new Context instance with PyContext.

ctx
def from_topic( self, topic: str, sample_json: str, bootstrap_servers: str, timestamp_column: str | None = None, group_id: str = 'default_group') -> DataStream:
33    def from_topic(
34        self,
35        topic: str,
36        sample_json: str,
37        bootstrap_servers: str,
38        timestamp_column: str | None = None,
39        group_id: str = "default_group",
40    ) -> DataStream:
41        """Creates a new DataStream from a Kafka topic.
42
43        Args:
44            topic: The name of the Kafka topic to consume from.
45            sample_json: A sample JSON string representing the expected message format.
46            bootstrap_servers: Comma-separated list of Kafka broker addresses.
47            timestamp_column: Optional column name containing message timestamps.
48            group_id: Kafka consumer group ID, defaults to "default_group".
49
50        Returns:
51            DataStream: A new DataStream instance connected to the specified topic.
52        """
53        py_ds = self.ctx.from_topic(
54            topic,
55            sample_json,
56            bootstrap_servers,
57            group_id,
58            timestamp_column,
59        )
60        ds = DataStream(py_ds)
61        return ds

Creates a new DataStream from a Kafka topic.

Arguments:
  • topic: The name of the Kafka topic to consume from.
  • sample_json: A sample JSON string representing the expected message format.
  • bootstrap_servers: Comma-separated list of Kafka broker addresses.
  • timestamp_column: Optional column name containing message timestamps.
  • group_id: Kafka consumer group ID, defaults to "default_group".
Returns:

DataStream: A new DataStream instance connected to the specified topic.
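As a minimal sketch (reusing the quickstart's `sample_event`; the consumer group name below is hypothetical), the optional parameters can also be passed by keyword:

```python
import json
from denormalized import Context

ds = Context().from_topic(
    "temperature",
    json.dumps(sample_event),
    "localhost:9092",
    timestamp_column="occurred_at_ms",
    group_id="my_consumer_group",
)
```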

class DataStream:
 10class DataStream:
 11    """Represents a stream of data that can be manipulated using various operations.
 12
 13    This class provides a high-level interface for stream processing operations, including
 14    filtering, joining, windowing, and sinking data to various destinations. It wraps
 15    a Rust-implemented PyDataStream object.
 16
 17    Attributes:
 18        ds (PyDataStream): The underlying Rust-side DataStream implementation
 19    """
 20
 21    def __init__(self, ds: PyDataStream) -> None:
 22        """Initialize a new DataStream object.
 23
 24        Args:
 25            ds: The underlying PyDataStream object from the Rust implementation
 26        """
 27        self.ds = ds
 28
 29    def __repr__(self):
 30        """Return a string representation of the DataStream object.
 31
 32        Returns:
 33            str: A string representation of the DataStream
 34        """
 35        return self.ds.__repr__()
 36
 37    def __str__(self):
 38        """Return a string description of the DataStream object.
 39
 40        Returns:
 41            str: A human-readable description of the DataStream
 42        """
 43        return self.ds.__str__()
 44
 45    def schema(self) -> pa.Schema:
 46        """Get the schema of the DataStream.
 47
 48        Returns:
 49            pa.Schema: The PyArrow schema describing the structure of the data
 50        """
 51        return self.ds.schema()
 52
 53    def select(self, expr_list: list[Expr]) -> "DataStream":
 54        """Select specific columns or expressions from the DataStream.
 55
 56        Args:
 57            expr_list: List of expressions defining the columns or computations to select
 58
 59        Returns:
 60            DataStream: A new DataStream containing only the selected expressions
 61
 62        Example:
 63            >>> ds.select([col("name"), col("age") + 1])
 64        """
 65        return DataStream(self.ds.select(to_internal_exprs(expr_list)))
 66
 67    def filter(self, predicate: Expr) -> "DataStream":
 68        """Filter the DataStream based on a predicate.
 69
 70        Args:
 71            predicate: Boolean expression used to filter rows
 72
 73        Returns:
 74            DataStream: A new DataStream containing only rows that satisfy the predicate
 75
 76        Example:
 77            >>> ds.filter(col("age") > 18)
 78        """
 79        return DataStream(self.ds.filter(to_internal_expr(predicate)))
 80
 81    def with_column(self, name: str, predicate: Expr) -> "DataStream":
 82        """Add a new column to the DataStream.
 83
 84        Args:
 85            name: Name of the new column
 86            predicate: Expression defining the values for the new column
 87
 88        Returns:
 89            DataStream: A new DataStream with the additional column
 90
 91        Example:
 92            >>> ds.with_column("adult", col("age") >= 18)
 93        """
 94        return DataStream(self.ds.with_column(name, to_internal_expr(predicate)))
 95
 96    def drop_columns(self, columns: list[str]) -> "DataStream":
 97        """Drops specified columns from the DataStream.
 98
 99        Args:
100            columns: List of column names to remove
101
102        Returns:
103            DataStream: A new DataStream without the specified columns
104        """
105        return DataStream(self.ds.drop_columns(columns))
106
107    def join_on(
108        self, right: "DataStream", join_type: str, on_exprs: list[Expr]
109    ) -> "DataStream":
110        """Join this DataStream with another one based on join expressions.
111
112        Args:
113            right: The right DataStream to join with
114            join_type: Type of join ('inner', 'left', 'right', 'full')
115            on_exprs: List of expressions defining the join conditions
116
117        Returns:
118            DataStream: A new DataStream resulting from the join operation
119
120        Example:
121            >>> left.join_on(right, "inner", [col("id") == col("right.id")])
122        """
123        return DataStream(self.ds.join_on(right.ds, join_type, on_exprs))
124
125    def join(
126        self,
127        right: "DataStream",
128        join_type: str,
129        left_cols: list[str],
130        right_cols: list[str],
131        filter: Expr | None = None,
132    ) -> "DataStream":
133        """Join this DataStream with another one based on column names.
134
135        Args:
136            right: The right DataStream to join with
137            join_type: Type of join ('inner', 'left', 'right', 'full')
138            left_cols: Column names from the left DataStream to join on
139            right_cols: Column names from the right DataStream to join on
140            filter: Optional additional join filter expression
141
142        Returns:
143            DataStream: A new DataStream resulting from the join operation
144
145        Example:
146            >>> left.join(right, "inner", ["id"], ["right_id"])
147        """
148        return DataStream(
149            self.ds.join(right.ds, join_type, left_cols, right_cols, filter)
150        )
151
152    def window(
153        self,
154        group_exprs: list[Expr],
155        aggr_exprs: list[Expr],
156        window_length_millis: int,
157        slide_millis: int | None = None,
158    ) -> "DataStream":
159        """Apply a windowing operation to the DataStream.
160
161        If `slide_millis` is `None`, a tumbling window will be created; otherwise, a sliding window will be created.
162
163        Args:
164            group_exprs: List of expressions to group by
165            aggr_exprs: List of aggregation expressions to apply
166            window_length_millis: Length of the window in milliseconds
167            slide_millis: Optional slide interval in milliseconds (defaults to None)
168
169        Returns:
170            DataStream: A new DataStream with the windowing operation applied
171
172        Example:
173            >>> ds.window([col("user_id")], [sum(col("value"))], 60000)  # 1-minute window
174        """
175        return DataStream(
176            self.ds.window(
177                to_internal_exprs(group_exprs),
178                to_internal_exprs(aggr_exprs),
179                window_length_millis,
180                slide_millis,
181            )
182        )
183
184    def print_stream(self) -> None:
185        """Print the contents of the DataStream to stdout."""
186        self.ds.print_stream()
187
188    def print_schema(self) -> "DataStream":
189        """Print the schema of the DataStream to stdout.
190
191        Returns:
192            DataStream: Self for method chaining
193        """
194        return DataStream(self.ds.print_schema())
195
196    def print_plan(self) -> "DataStream":
197        """Print the logical execution plan of the DataStream to stdout.
198
199        Returns:
200            DataStream: Self for method chaining
201        """
202        return DataStream(self.ds.print_plan())
203
204    def print_physical_plan(self) -> "DataStream":
205        """Print the physical execution plan of the DataStream to stdout.
206
207        Returns:
208            DataStream: Self for method chaining
209        """
210        return DataStream(self.ds.print_physical_plan())
211
212    def sink_kafka(self, bootstrap_servers: str, topic: str) -> None:
213        """Sink the DataStream to a Kafka topic.
214
215        Args:
216            bootstrap_servers: Comma-separated list of Kafka broker addresses
217            topic: Name of the Kafka topic to write to
218
219        Raises:
220            ConnectionError: If unable to connect to Kafka brokers
221        """
222        self.ds.sink_kafka(bootstrap_servers, topic)
223
224    def sink(self, func: Callable[[pa.RecordBatch], None]) -> None:
225        """Sink the DataStream to a Python callback function.
226
227        Args:
228            func: Callback function that receives PyArrow RecordBatches
229
230        Example:
231            >>> ds.sink(lambda batch: print(f"Received batch with {len(batch)} rows"))
232        """
233        self.ds.sink_python(func)

Represents a stream of data that can be manipulated using various operations.

This class provides a high-level interface for stream processing operations, including filtering, joining, windowing, and sinking data to various destinations. It wraps a Rust-implemented PyDataStream object.

Attributes:
  • ds (PyDataStream): The underlying Rust-side DataStream implementation
DataStream(ds: denormalized.PyDataStream)
21    def __init__(self, ds: PyDataStream) -> None:
22        """Initialize a new DataStream object.
23
24        Args:
25            ds: The underlying PyDataStream object from the Rust implementation
26        """
27        self.ds = ds

Initialize a new DataStream object.

Arguments:
  • ds: The underlying PyDataStream object from the Rust implementation
ds
def schema(self) -> pyarrow.lib.Schema:
45    def schema(self) -> pa.Schema:
46        """Get the schema of the DataStream.
47
48        Returns:
49            pa.Schema: The PyArrow schema describing the structure of the data
50        """
51        return self.ds.schema()

Get the schema of the DataStream.

Returns:

pa.Schema: The PyArrow schema describing the structure of the data
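For example, with the quickstart's temperature stream:

```python
# Print the PyArrow schema inferred from the sample JSON message.
print(ds.schema())
```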

def select( self, expr_list: list[Expr]) -> DataStream:
53    def select(self, expr_list: list[Expr]) -> "DataStream":
54        """Select specific columns or expressions from the DataStream.
55
56        Args:
57            expr_list: List of expressions defining the columns or computations to select
58
59        Returns:
60            DataStream: A new DataStream containing only the selected expressions
61
62        Example:
63            >>> ds.select([col("name"), col("age") + 1])
64        """
65        return DataStream(self.ds.select(to_internal_exprs(expr_list)))

Select specific columns or expressions from the DataStream.

Arguments:
  • expr_list: List of expressions defining the columns or computations to select
Returns:

DataStream: A new DataStream containing only the selected expressions

Example:
>>> ds.select([col("name"), col("age") + 1])
def filter( self, predicate: Expr) -> DataStream:
67    def filter(self, predicate: Expr) -> "DataStream":
68        """Filter the DataStream based on a predicate.
69
70        Args:
71            predicate: Boolean expression used to filter rows
72
73        Returns:
74            DataStream: A new DataStream containing only rows that satisfy the predicate
75
76        Example:
77            >>> ds.filter(col("age") > 18)
78        """
79        return DataStream(self.ds.filter(to_internal_expr(predicate)))

Filter the DataStream based on a predicate.

Arguments:
  • predicate: Boolean expression used to filter rows
Returns:

DataStream: A new DataStream containing only rows that satisfy the predicate

Example:
>>> ds.filter(col("age") > 18)
def with_column( self, name: str, predicate: Expr) -> DataStream:
81    def with_column(self, name: str, predicate: Expr) -> "DataStream":
82        """Add a new column to the DataStream.
83
84        Args:
85            name: Name of the new column
86            predicate: Expression defining the values for the new column
87
88        Returns:
89            DataStream: A new DataStream with the additional column
90
91        Example:
92            >>> ds.with_column("adult", col("age") >= 18)
93        """
94        return DataStream(self.ds.with_column(name, to_internal_expr(predicate)))

Add a new column to the DataStream.

Arguments:
  • name: Name of the new column
  • predicate: Expression defining the values for the new column
Returns:

DataStream: A new DataStream with the additional column

Example:
>>> ds.with_column("adult", col("age") >= 18)
def drop_columns(self, columns: list[str]) -> DataStream:
 96    def drop_columns(self, columns: list[str]) -> "DataStream":
 97        """Drops specified columns from the DataStream.
 98
 99        Args:
100            columns: List of column names to remove
101
102        Returns:
103            DataStream: A new DataStream without the specified columns
104        """
105        return DataStream(self.ds.drop_columns(columns))

Drops specified columns from the DataStream.

Arguments:
  • columns: List of column names to remove
Returns:

DataStream: A new DataStream without the specified columns
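A minimal sketch, assuming the quickstart's temperature stream:

```python
# Drop the raw reading column once it is no longer needed downstream.
slim_ds = ds.drop_columns(["reading"])
```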

def join_on( self, right: DataStream, join_type: str, on_exprs: list[Expr]) -> DataStream:
107    def join_on(
108        self, right: "DataStream", join_type: str, on_exprs: list[Expr]
109    ) -> "DataStream":
110        """Join this DataStream with another one based on join expressions.
111
112        Args:
113            right: The right DataStream to join with
114            join_type: Type of join ('inner', 'left', 'right', 'full')
115            on_exprs: List of expressions defining the join conditions
116
117        Returns:
118            DataStream: A new DataStream resulting from the join operation
119
120        Example:
121            >>> left.join_on(right, "inner", [col("id") == col("right.id")])
122        """
123        return DataStream(self.ds.join_on(right.ds, join_type, on_exprs))

Join this DataStream with another one based on join expressions.

Arguments:
  • right: The right DataStream to join with
  • join_type: Type of join ('inner', 'left', 'right', 'full')
  • on_exprs: List of expressions defining the join conditions
Returns:

DataStream: A new DataStream resulting from the join operation

Example:
>>> left.join_on(right, "inner", [col("id") == col("right.id")])
def join( self, right: DataStream, join_type: str, left_cols: list[str], right_cols: list[str], filter: Expr | None = None) -> DataStream:
125    def join(
126        self,
127        right: "DataStream",
128        join_type: str,
129        left_cols: list[str],
130        right_cols: list[str],
131        filter: Expr | None = None,
132    ) -> "DataStream":
133        """Join this DataStream with another one based on column names.
134
135        Args:
136            right: The right DataStream to join with
137            join_type: Type of join ('inner', 'left', 'right', 'full')
138            left_cols: Column names from the left DataStream to join on
139            right_cols: Column names from the right DataStream to join on
140            filter: Optional additional join filter expression
141
142        Returns:
143            DataStream: A new DataStream resulting from the join operation
144
145        Example:
146            >>> left.join(right, "inner", ["id"], ["right_id"])
147        """
148        return DataStream(
149            self.ds.join(right.ds, join_type, left_cols, right_cols, filter)
150        )

Join this DataStream with another one based on column names.

Arguments:
  • right: The right DataStream to join with
  • join_type: Type of join ('inner', 'left', 'right', 'full')
  • left_cols: Column names from the left DataStream to join on
  • right_cols: Column names from the right DataStream to join on
  • filter: Optional additional join filter expression
Returns:

DataStream: A new DataStream resulting from the join operation

Example:
>>> left.join(right, "inner", ["id"], ["right_id"])
def window( self, group_exprs: list[Expr], aggr_exprs: list[Expr], window_length_millis: int, slide_millis: int | None = None) -> DataStream:
152    def window(
153        self,
154        group_exprs: list[Expr],
155        aggr_exprs: list[Expr],
156        window_length_millis: int,
157        slide_millis: int | None = None,
158    ) -> "DataStream":
159        """Apply a windowing operation to the DataStream.
160
161        If `slide_millis` is `None`, a tumbling window will be created; otherwise, a sliding window will be created.
162
163        Args:
164            group_exprs: List of expressions to group by
165            aggr_exprs: List of aggregation expressions to apply
166            window_length_millis: Length of the window in milliseconds
167            slide_millis: Optional slide interval in milliseconds (defaults to None)
168
169        Returns:
170            DataStream: A new DataStream with the windowing operation applied
171
172        Example:
173            >>> ds.window([col("user_id")], [sum(col("value"))], 60000)  # 1-minute window
174        """
175        return DataStream(
176            self.ds.window(
177                to_internal_exprs(group_exprs),
178                to_internal_exprs(aggr_exprs),
179                window_length_millis,
180                slide_millis,
181            )
182        )

Apply a windowing operation to the DataStream.

If slide_millis is None, a tumbling window will be created; otherwise, a sliding window will be created.

Arguments:
  • group_exprs: List of expressions to group by
  • aggr_exprs: List of aggregation expressions to apply
  • window_length_millis: Length of the window in milliseconds
  • slide_millis: Optional slide interval in milliseconds (defaults to None)
Returns:

DataStream: A new DataStream with the windowing operation applied

Example:
>>> ds.window([col("user_id")], [sum(col("value"))], 60000)  # 1-minute window
def print_stream(self) -> None:
184    def print_stream(self) -> None:
185        """Print the contents of the DataStream to stdout."""
186        self.ds.print_stream()

Print the contents of the DataStream to stdout.

def print_schema(self) -> DataStream:
188    def print_schema(self) -> "DataStream":
189        """Print the schema of the DataStream to stdout.
190
191        Returns:
192            DataStream: Self for method chaining
193        """
194        return DataStream(self.ds.print_schema())

Print the schema of the DataStream to stdout.

Returns:

DataStream: Self for method chaining

def print_plan(self) -> DataStream:
196    def print_plan(self) -> "DataStream":
197        """Print the logical execution plan of the DataStream to stdout.
198
199        Returns:
200            DataStream: Self for method chaining
201        """
202        return DataStream(self.ds.print_plan())

Print the logical execution plan of the DataStream to stdout.

Returns:

DataStream: Self for method chaining

def print_physical_plan(self) -> DataStream:
204    def print_physical_plan(self) -> "DataStream":
205        """Print the physical execution plan of the DataStream to stdout.
206
207        Returns:
208            DataStream: Self for method chaining
209        """
210        return DataStream(self.ds.print_physical_plan())

Print the physical execution plan of the DataStream to stdout.

Returns:

DataStream: Self for method chaining

def sink_kafka(self, bootstrap_servers: str, topic: str) -> None:
212    def sink_kafka(self, bootstrap_servers: str, topic: str) -> None:
213        """Sink the DataStream to a Kafka topic.
214
215        Args:
216            bootstrap_servers: Comma-separated list of Kafka broker addresses
217            topic: Name of the Kafka topic to write to
218
219        Raises:
220            ConnectionError: If unable to connect to Kafka brokers
221        """
222        self.ds.sink_kafka(bootstrap_servers, topic)

Sink the DataStream to a Kafka topic.

Arguments:
  • bootstrap_servers: Comma-separated list of Kafka broker addresses
  • topic: Name of the Kafka topic to write to
Raises:
  • ConnectionError: If unable to connect to Kafka brokers
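For example, to write results back to Kafka instead of printing them (the output topic name here is illustrative):

```python
# Stream the results to a hypothetical output topic on the local broker.
ds.sink_kafka("localhost:9092", "sensor_aggregates")
```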
def sink(self, func: Callable[[pyarrow.lib.RecordBatch], NoneType]) -> None:
224    def sink(self, func: Callable[[pa.RecordBatch], None]) -> None:
225        """Sink the DataStream to a Python callback function.
226
227        Args:
228            func: Callback function that receives PyArrow RecordBatches
229
230        Example:
231            >>> ds.sink(lambda batch: print(f"Received batch with {len(batch)} rows"))
232        """
233        self.ds.sink_python(func)

Sink the DataStream to a Python callback function.

Arguments:
  • func: Callback function that receives PyArrow RecordBatches
Example:
>>> ds.sink(lambda batch: print(f"Received batch with {len(batch)} rows"))
def col(value: str):
96def col(value: str):
97    """Create a column expression."""
98    return Expr.column(value)

Create a column expression.

def column(value: str):
91def column(value: str):
92    """Create a column expression."""
93    return Expr.column(value)

Create a column expression.

class Expr:
184class Expr:
185    """Expression object.
186
187    Expressions are one of the core concepts in DataFusion. See
188    :ref:`Expressions` in the online documentation for more information.
189    """
190
191    def __init__(self, expr: expr_internal.Expr) -> None:
192        """This constructor should not be called by the end user."""
193        self.expr = expr
194
195    def to_variant(self) -> Any:
196        """Convert this expression into a python object if possible."""
197        return self.expr.to_variant()
198
199    @deprecated(
200        "display_name() is deprecated. Use :py:meth:`~Expr.schema_name` instead"
201    )
202    def display_name(self) -> str:
203        """Returns the name of this expression as it should appear in a schema.
204
205        This name will not include any CAST expressions.
206        """
207        return self.schema_name()
208
209    def schema_name(self) -> str:
210        """Returns the name of this expression as it should appear in a schema.
211
212        This name will not include any CAST expressions.
213        """
214        return self.expr.schema_name()
215
216    def canonical_name(self) -> str:
217        """Returns a complete string representation of this expression."""
218        return self.expr.canonical_name()
219
220    def variant_name(self) -> str:
221        """Returns the name of the Expr variant.
222
223        Ex: ``IsNotNull``, ``Literal``, ``BinaryExpr``, etc
224        """
225        return self.expr.variant_name()
226
227    def __richcmp__(self, other: Expr, op: int) -> Expr:
228        """Comparison operator."""
229        return Expr(self.expr.__richcmp__(other, op))
230
231    def __repr__(self) -> str:
232        """Generate a string representation of this expression."""
233        return self.expr.__repr__()
234
235    def __add__(self, rhs: Any) -> Expr:
236        """Addition operator.
237
238        Accepts either an expression or any valid PyArrow scalar literal value.
239        """
240        if not isinstance(rhs, Expr):
241            rhs = Expr.literal(rhs)
242        return Expr(self.expr.__add__(rhs.expr))
243
244    def __sub__(self, rhs: Any) -> Expr:
245        """Subtraction operator.
246
247        Accepts either an expression or any valid PyArrow scalar literal value.
248        """
249        if not isinstance(rhs, Expr):
250            rhs = Expr.literal(rhs)
251        return Expr(self.expr.__sub__(rhs.expr))
252
253    def __truediv__(self, rhs: Any) -> Expr:
254        """Division operator.
255
256        Accepts either an expression or any valid PyArrow scalar literal value.
257        """
258        if not isinstance(rhs, Expr):
259            rhs = Expr.literal(rhs)
260        return Expr(self.expr.__truediv__(rhs.expr))
261
262    def __mul__(self, rhs: Any) -> Expr:
263        """Multiplication operator.
264
265        Accepts either an expression or any valid PyArrow scalar literal value.
266        """
267        if not isinstance(rhs, Expr):
268            rhs = Expr.literal(rhs)
269        return Expr(self.expr.__mul__(rhs.expr))
270
271    def __mod__(self, rhs: Any) -> Expr:
272        """Modulo operator (%).
273
274        Accepts either an expression or any valid PyArrow scalar literal value.
275        """
276        if not isinstance(rhs, Expr):
277            rhs = Expr.literal(rhs)
278        return Expr(self.expr.__mod__(rhs.expr))
279
280    def __and__(self, rhs: Expr) -> Expr:
281        """Logical AND."""
282        if not isinstance(rhs, Expr):
283            rhs = Expr.literal(rhs)
284        return Expr(self.expr.__and__(rhs.expr))
285
286    def __or__(self, rhs: Expr) -> Expr:
287        """Logical OR."""
288        if not isinstance(rhs, Expr):
289            rhs = Expr.literal(rhs)
290        return Expr(self.expr.__or__(rhs.expr))
291
292    def __invert__(self) -> Expr:
293        """Binary not (~)."""
294        return Expr(self.expr.__invert__())
295
296    def __getitem__(self, key: str | int) -> Expr:
297        """Retrieve sub-object.
298
299        If ``key`` is a string, returns the subfield of the struct.
300        If ``key`` is an integer, retrieves the element in the array. Note that the
301        element index begins at ``0``, unlike `array_element` which begins at ``1``.
302        """
303        if isinstance(key, int):
304            return Expr(
305                functions_internal.array_element(self.expr, Expr.literal(key + 1).expr)
306            )
307        return Expr(self.expr.__getitem__(key))
308
309    def __eq__(self, rhs: Any) -> Expr:
310        """Equal to.
311
312        Accepts either an expression or any valid PyArrow scalar literal value.
313        """
314        if not isinstance(rhs, Expr):
315            rhs = Expr.literal(rhs)
316        return Expr(self.expr.__eq__(rhs.expr))
317
318    def __ne__(self, rhs: Any) -> Expr:
319        """Not equal to.
320
321        Accepts either an expression or any valid PyArrow scalar literal value.
322        """
323        if not isinstance(rhs, Expr):
324            rhs = Expr.literal(rhs)
325        return Expr(self.expr.__ne__(rhs.expr))
326
327    def __ge__(self, rhs: Any) -> Expr:
328        """Greater than or equal to.
329
330        Accepts either an expression or any valid PyArrow scalar literal value.
331        """
332        if not isinstance(rhs, Expr):
333            rhs = Expr.literal(rhs)
334        return Expr(self.expr.__ge__(rhs.expr))
335
336    def __gt__(self, rhs: Any) -> Expr:
337        """Greater than.
338
339        Accepts either an expression or any valid PyArrow scalar literal value.
340        """
341        if not isinstance(rhs, Expr):
342            rhs = Expr.literal(rhs)
343        return Expr(self.expr.__gt__(rhs.expr))
344
345    def __le__(self, rhs: Any) -> Expr:
346        """Less than or equal to.
347
348        Accepts either an expression or any valid PyArrow scalar literal value.
349        """
350        if not isinstance(rhs, Expr):
351            rhs = Expr.literal(rhs)
352        return Expr(self.expr.__le__(rhs.expr))
353
354    def __lt__(self, rhs: Any) -> Expr:
355        """Less than.
356
357        Accepts either an expression or any valid PyArrow scalar literal value.
358        """
359        if not isinstance(rhs, Expr):
360            rhs = Expr.literal(rhs)
361        return Expr(self.expr.__lt__(rhs.expr))
362
363    __radd__ = __add__
364    __rand__ = __and__
365    __rmod__ = __mod__
366    __rmul__ = __mul__
367    __ror__ = __or__
368    __rsub__ = __sub__
369    __rtruediv__ = __truediv__
370
371    @staticmethod
372    def literal(value: Any) -> Expr:
373        """Creates a new expression representing a scalar value.
374
375        ``value`` must be a valid PyArrow scalar value or easily castable to one.
376        """
377        if not isinstance(value, pa.Scalar):
378            value = pa.scalar(value)
379        return Expr(expr_internal.Expr.literal(value))
380
381    @staticmethod
382    def column(value: str) -> Expr:
383        """Creates a new expression representing a column."""
384        return Expr(expr_internal.Expr.column(value))
385
386    def alias(self, name: str) -> Expr:
387        """Assign a name to the expression."""
388        return Expr(self.expr.alias(name))
389
390    def sort(self, ascending: bool = True, nulls_first: bool = True) -> SortExpr:
391        """Creates a sort :py:class:`Expr` from an existing :py:class:`Expr`.
392
393        Args:
394            ascending: If true, sort in ascending order.
395            nulls_first: Return null values first.
396        """
397        return SortExpr(self.expr, ascending=ascending, nulls_first=nulls_first)
398
399    def is_null(self) -> Expr:
400        """Returns ``True`` if this expression is null."""
401        return Expr(self.expr.is_null())
402
403    def is_not_null(self) -> Expr:
404        """Returns ``True`` if this expression is not null."""
405        return Expr(self.expr.is_not_null())
406
407    _to_pyarrow_types = {
408        float: pa.float64(),
409        int: pa.int64(),
410        str: pa.string(),
411        bool: pa.bool_(),
412    }
413
414    def cast(
415        self, to: pa.DataType[Any] | Type[float] | Type[int] | Type[str] | Type[bool]
416    ) -> Expr:
417        """Cast to a new data type."""
418        if not isinstance(to, pa.DataType):
419            try:
420                to = self._to_pyarrow_types[to]
421            except KeyError:
422                raise TypeError(
423                    "Expected instance of pyarrow.DataType or builtins.type"
424                )
425
426        return Expr(self.expr.cast(to))
427
428    def between(self, low: Any, high: Any, negated: bool = False) -> Expr:
429        """Returns ``True`` if this expression is within a given range.
430
431        Args:
432            low: lower bound of the range (inclusive).
433            high: higher bound of the range (inclusive).
434            negated: if ``True``, negate the check so values outside the range match.
435        """
436        if not isinstance(low, Expr):
437            low = Expr.literal(low)
438
439        if not isinstance(high, Expr):
440            high = Expr.literal(high)
441
442        return Expr(self.expr.between(low.expr, high.expr, negated=negated))
443
444    def rex_type(self) -> RexType:
445        """Return the Rex Type of this expression.
446
447        A Rex (Row Expression) specifies a single row of data. That specification
448        could include user defined functions or types. RexType identifies the
449        row as one of the possible valid ``RexType``.
450        """
451        return self.expr.rex_type()
452
453    def types(self) -> DataTypeMap:
454        """Return the ``DataTypeMap``.
455
456        Returns:
457            DataTypeMap which represents the PythonType, Arrow DataType, and
458            SqlType Enum which this expression represents.
459        """
460        return self.expr.types()
461
462    def python_value(self) -> Any:
463        """Extracts the Expr value into a PyObject.
464
465        This is only valid for literal expressions.
466
467        Returns:
468            Python object representing literal value of the expression.
469        """
470        return self.expr.python_value()
471
472    def rex_call_operands(self) -> list[Expr]:
473        """Return the operands of the expression based on its variant type.
474
475        Row expressions, Rex(s), operate on the concept of operands. Different
476        variants of Expressions, Expr(s), store those operands in different
477        data structures. This function examines the Expr variant and returns
478        the operands to the calling logic.
479        """
480        return [Expr(e) for e in self.expr.rex_call_operands()]
481
482    def rex_call_operator(self) -> str:
483        """Extracts the operator associated with a row expression type call."""
484        return self.expr.rex_call_operator()
485
486    def column_name(self, plan: LogicalPlan) -> str:
487        """Compute the output column name based on the provided logical plan."""
488        return self.expr.column_name(plan)
489
490    def order_by(self, *exprs: Expr | SortExpr) -> ExprFuncBuilder:
491        """Set the ordering for a window or aggregate function.
492
493        This function will create an :py:class:`ExprFuncBuilder` that can be used to
494        set parameters for either window or aggregate functions. If used on any other
495        type of expression, an error will be generated when ``build()`` is called.
496        """
497        return ExprFuncBuilder(self.expr.order_by([sort_or_default(e) for e in exprs]))
498
499    def filter(self, filter: Expr) -> ExprFuncBuilder:
500        """Filter an aggregate function.
501
502        This function will create an :py:class:`ExprFuncBuilder` that can be used to
503        set parameters for either window or aggregate functions. If used on any other
504        type of expression, an error will be generated when ``build()`` is called.
505        """
506        return ExprFuncBuilder(self.expr.filter(filter.expr))
507
508    def distinct(self) -> ExprFuncBuilder:
509        """Only evaluate distinct values for an aggregate function.
510
511        This function will create an :py:class:`ExprFuncBuilder` that can be used to
512        set parameters for either window or aggregate functions. If used on any other
513        type of expression, an error will be generated when ``build()`` is called.
514        """
515        return ExprFuncBuilder(self.expr.distinct())
516
517    def null_treatment(self, null_treatment: NullTreatment) -> ExprFuncBuilder:
518        """Set the treatment for ``null`` values for a window or aggregate function.
519
520        This function will create an :py:class:`ExprFuncBuilder` that can be used to
521        set parameters for either window or aggregate functions. If used on any other
522        type of expression, an error will be generated when ``build()`` is called.
523        """
524        return ExprFuncBuilder(self.expr.null_treatment(null_treatment.value))
525
526    def partition_by(self, *partition_by: Expr) -> ExprFuncBuilder:
527        """Set the partitioning for a window function.
528
529        This function will create an :py:class:`ExprFuncBuilder` that can be used to
530        set parameters for either window or aggregate functions. If used on any other
531        type of expression, an error will be generated when ``build()`` is called.
532        """
533        return ExprFuncBuilder(
534            self.expr.partition_by(list(e.expr for e in partition_by))
535        )
536
537    def window_frame(self, window_frame: WindowFrame) -> ExprFuncBuilder:
538        """Set the frame for a window function.
539
540        This function will create an :py:class:`ExprFuncBuilder` that can be used to
541        set parameters for either window or aggregate functions. If used on any other
542        type of expression, an error will be generated when ``build()`` is called.
543        """
544        return ExprFuncBuilder(self.expr.window_frame(window_frame.window_frame))

Expression object.

Expressions are one of the core concepts in DataFusion. See :ref:Expressions in the online documentation for more information.

Expr(expr: datafusion.expr.Expr)
191    def __init__(self, expr: expr_internal.Expr) -> None:
192        """This constructor should not be called by the end user."""
193        self.expr = expr

This constructor should not be called by the end user.

expr
def to_variant(self) -> Any:
195    def to_variant(self) -> Any:
196        """Convert this expression into a python object if possible."""
197        return self.expr.to_variant()

Convert this expression into a python object if possible.

@deprecated('display_name() is deprecated. Use :py:meth:`~Expr.schema_name` instead')
def display_name(self) -> str:
199    @deprecated(
200        "display_name() is deprecated. Use :py:meth:`~Expr.schema_name` instead"
201    )
202    def display_name(self) -> str:
203        """Returns the name of this expression as it should appear in a schema.
204
205        This name will not include any CAST expressions.
206        """
207        return self.schema_name()

Returns the name of this expression as it should appear in a schema.

This name will not include any CAST expressions.

def schema_name(self) -> str:
209    def schema_name(self) -> str:
210        """Returns the name of this expression as it should appear in a schema.
211
212        This name will not include any CAST expressions.
213        """
214        return self.expr.schema_name()

Returns the name of this expression as it should appear in a schema.

This name will not include any CAST expressions.

def canonical_name(self) -> str:
216    def canonical_name(self) -> str:
217        """Returns a complete string representation of this expression."""
218        return self.expr.canonical_name()

Returns a complete string representation of this expression.

def variant_name(self) -> str:
220    def variant_name(self) -> str:
221        """Returns the name of the Expr variant.
222
223        Ex: ``IsNotNull``, ``Literal``, ``BinaryExpr``, etc
224        """
225        return self.expr.variant_name()

Returns the name of the Expr variant.

Ex: IsNotNull, Literal, BinaryExpr, etc

@staticmethod
def literal(value: Any) -> Expr:
371    @staticmethod
372    def literal(value: Any) -> Expr:
373        """Creates a new expression representing a scalar value.
374
375        ``value`` must be a valid PyArrow scalar value or easily castable to one.
376        """
377        if not isinstance(value, pa.Scalar):
378            value = pa.scalar(value)
379        return Expr(expr_internal.Expr.literal(value))

Creates a new expression representing a scalar value.

value must be a valid PyArrow scalar value or easily castable to one.

@staticmethod
def column(value: str) -> Expr:
381    @staticmethod
382    def column(value: str) -> Expr:
383        """Creates a new expression representing a column."""
384        return Expr(expr_internal.Expr.column(value))

Creates a new expression representing a column.

def alias(self, name: str) -> Expr:
386    def alias(self, name: str) -> Expr:
387        """Assign a name to the expression."""
388        return Expr(self.expr.alias(name))

Assign a name to the expression.

def sort( self, ascending: bool = True, nulls_first: bool = True) -> denormalized.datafusion.expr.SortExpr:
390    def sort(self, ascending: bool = True, nulls_first: bool = True) -> SortExpr:
391        """Creates a sort :py:class:`Expr` from an existing :py:class:`Expr`.
392
393        Args:
394            ascending: If true, sort in ascending order.
395            nulls_first: Return null values first.
396        """
397        return SortExpr(self.expr, ascending=ascending, nulls_first=nulls_first)

Creates a sort Expr from an existing Expr.

Arguments:
  • ascending: If true, sort in ascending order.
  • nulls_first: Return null values first.
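A small sketch of building a sort expression (the column name comes from the quickstart sample):

```python
from denormalized import col

# Highest readings first, nulls at the end.
by_reading_desc = col("reading").sort(ascending=False, nulls_first=False)
```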
def is_null(self) -> Expr:
399    def is_null(self) -> Expr:
400        """Returns ``True`` if this expression is null."""
401        return Expr(self.expr.is_null())

Returns True if this expression is null.

def is_not_null(self) -> Expr:
403    def is_not_null(self) -> Expr:
404        """Returns ``True`` if this expression is not null."""
405        return Expr(self.expr.is_not_null())

Returns True if this expression is not null.

def cast( self, to: 'pa.DataType[Any] | Type[float] | Type[int] | Type[str] | Type[bool]') -> Expr:
414    def cast(
415        self, to: pa.DataType[Any] | Type[float] | Type[int] | Type[str] | Type[bool]
416    ) -> Expr:
417        """Cast to a new data type."""
418        if not isinstance(to, pa.DataType):
419            try:
420                to = self._to_pyarrow_types[to]
421            except KeyError:
422                raise TypeError(
423                    "Expected instance of pyarrow.DataType or builtins.type"
424                )
425
426        return Expr(self.expr.cast(to))

Cast to a new data type.
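Both PyArrow data types and the mapped Python builtins (`float`, `int`, `str`, `bool`) are accepted; a brief sketch:

```python
import pyarrow as pa
from denormalized import col

as_int64 = col("reading").cast(int)             # builtin type mapped to pa.int64()
as_float32 = col("reading").cast(pa.float32())  # explicit PyArrow type
```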

def between( self, low: Any, high: Any, negated: bool = False) -> Expr:
428    def between(self, low: Any, high: Any, negated: bool = False) -> Expr:
429        """Returns ``True`` if this expression is within a given range.
430
431        Args:
432            low: lower bound of the range (inclusive).
433            high: higher bound of the range (inclusive).
434            negated: if ``True``, negate the check so values outside the range match.
435        """
436        if not isinstance(low, Expr):
437            low = Expr.literal(low)
438
439        if not isinstance(high, Expr):
440            high = Expr.literal(high)
441
442        return Expr(self.expr.between(low.expr, high.expr, negated=negated))

Returns True if this expression is within a given range.

Arguments:
  • low: lower bound of the range (inclusive).
  • high: higher bound of the range (inclusive).
  • negated: if True, negate the check so values outside the range match
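A brief sketch using the quickstart's reading column:

```python
from denormalized import col

# Inclusive range check, and its negation.
in_range = col("reading").between(0.0, 100.0)
out_of_range = col("reading").between(0.0, 100.0, negated=True)
```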
def rex_type(self) -> datafusion.common.RexType:
444    def rex_type(self) -> RexType:
445        """Return the Rex Type of this expression.
446
447        A Rex (Row Expression) specifies a single row of data. That specification
448        could include user defined functions or types. RexType identifies the
449        row as one of the possible valid ``RexType``.
450        """
451        return self.expr.rex_type()

Return the Rex Type of this expression.

A Rex (Row Expression) specifies a single row of data. That specification could include user defined functions or types. RexType identifies the row as one of the possible valid RexType.

def types(self) -> datafusion.common.DataTypeMap:
453    def types(self) -> DataTypeMap:
454        """Return the ``DataTypeMap``.
455
456        Returns:
457            DataTypeMap which represents the PythonType, Arrow DataType, and
458            SqlType Enum which this expression represents.
459        """
460        return self.expr.types()

Return the DataTypeMap.

Returns:

DataTypeMap which represents the PythonType, Arrow DataType, and SqlType Enum which this expression represents.

def python_value(self) -> Any:
462    def python_value(self) -> Any:
463        """Extracts the Expr value into a PyObject.
464
465        This is only valid for literal expressions.
466
467        Returns:
468            Python object representing literal value of the expression.
469        """
470        return self.expr.python_value()

Extracts the Expr value into a PyObject.

This is only valid for literal expressions.

Returns:

Python object representing literal value of the expression.

def rex_call_operands(self) -> list[Expr]:
472    def rex_call_operands(self) -> list[Expr]:
473        """Return the operands of the expression based on its variant type.
474
475        Row expressions, Rex(s), operate on the concept of operands. Different
476        variants of Expressions, Expr(s), store those operands in different
477        data structures. This function examines the Expr variant and returns
478        the operands to the calling logic.
479        """
480        return [Expr(e) for e in self.expr.rex_call_operands()]

Return the operands of the expression based on its variant type.

Row expressions, Rex(s), operate on the concept of operands. Different variants of Expressions, Expr(s), store those operands in different data structures. This function examines the Expr variant and returns the operands to the calling logic.

def rex_call_operator(self) -> str:
482    def rex_call_operator(self) -> str:
483        """Extracts the operator associated with a row expression type call."""
484        return self.expr.rex_call_operator()

Extracts the operator associated with a row expression type call.

def column_name(self, plan: datafusion.LogicalPlan) -> str:
486    def column_name(self, plan: LogicalPlan) -> str:
487        """Compute the output column name based on the provided logical plan."""
488        return self.expr.column_name(plan)

Compute the output column name based on the provided logical plan.

def order_by( self, *exprs: Expr | denormalized.datafusion.expr.SortExpr) -> denormalized.datafusion.expr.ExprFuncBuilder:
490    def order_by(self, *exprs: Expr | SortExpr) -> ExprFuncBuilder:
491        """Set the ordering for a window or aggregate function.
492
493        This function will create an :py:class:`ExprFuncBuilder` that can be used to
494        set parameters for either window or aggregate functions. If used on any other
495        type of expression, an error will be generated when ``build()`` is called.
496        """
497        return ExprFuncBuilder(self.expr.order_by([sort_or_default(e) for e in exprs]))

Set the ordering for a window or aggregate function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.

def filter( self, filter: Expr) -> denormalized.datafusion.expr.ExprFuncBuilder:
499    def filter(self, filter: Expr) -> ExprFuncBuilder:
500        """Filter an aggregate function.
501
502        This function will create an :py:class:`ExprFuncBuilder` that can be used to
503        set parameters for either window or aggregate functions. If used on any other
504        type of expression, an error will be generated when ``build()`` is called.
505        """
506        return ExprFuncBuilder(self.expr.filter(filter.expr))

Filter an aggregate function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.

def distinct(self) -> denormalized.datafusion.expr.ExprFuncBuilder:
508    def distinct(self) -> ExprFuncBuilder:
509        """Only evaluate distinct values for an aggregate function.
510
511        This function will create an :py:class:`ExprFuncBuilder` that can be used to
512        set parameters for either window or aggregate functions. If used on any other
513        type of expression, an error will be generated when ``build()`` is called.
514        """
515        return ExprFuncBuilder(self.expr.distinct())

Only evaluate distinct values for an aggregate function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.

def null_treatment( self, null_treatment: denormalized.datafusion.common.NullTreatment) -> denormalized.datafusion.expr.ExprFuncBuilder:
517    def null_treatment(self, null_treatment: NullTreatment) -> ExprFuncBuilder:
518        """Set the treatment for ``null`` values for a window or aggregate function.
519
520        This function will create an :py:class:`ExprFuncBuilder` that can be used to
521        set parameters for either window or aggregate functions. If used on any other
522        type of expression, an error will be generated when ``build()`` is called.
523        """
524        return ExprFuncBuilder(self.expr.null_treatment(null_treatment.value))

Set the treatment for null values for a window or aggregate function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.

def partition_by( self, *partition_by: Expr) -> denormalized.datafusion.expr.ExprFuncBuilder:
526    def partition_by(self, *partition_by: Expr) -> ExprFuncBuilder:
527        """Set the partitioning for a window function.
528
529        This function will create an :py:class:`ExprFuncBuilder` that can be used to
530        set parameters for either window or aggregate functions. If used on any other
531        type of expression, an error will be generated when ``build()`` is called.
532        """
533        return ExprFuncBuilder(
534            self.expr.partition_by(list(e.expr for e in partition_by))
535        )

Set the partitioning for a window function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.

def window_frame( self, window_frame: denormalized.datafusion.expr.WindowFrame) -> denormalized.datafusion.expr.ExprFuncBuilder:
537    def window_frame(self, window_frame: WindowFrame) -> ExprFuncBuilder:
538        """Set the frame for a window function.
539
540        This function will create an :py:class:`ExprFuncBuilder` that can be used to
541        set parameters for either window or aggregate functions. If used on any other
542        type of expression, an error will be generated when ``build()`` is called.
543        """
544        return ExprFuncBuilder(self.expr.window_frame(window_frame.window_frame))

Set the frame for a window function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.

def lit(value):
106def lit(value):
107    """Create a literal expression."""
108    return Expr.literal(value)

Create a literal expression.

def literal(value):
101def literal(value):
102    """Create a literal expression."""
103    return Expr.literal(value)

Create a literal expression.

@staticmethod
def udaf( accum: '_A', input_types: list[pyarrow.lib.DataType], return_type: ~_R, state_type: list[pyarrow.lib.DataType], volatility: denormalized.datafusion.udf.Volatility | str, name: str | None = None) -> denormalized.datafusion.udf.AggregateUDF:
209    @staticmethod
210    def udaf(
211        accum: _A,
212        input_types: list[pyarrow.DataType],
213        return_type: _R,
214        state_type: list[pyarrow.DataType],
215        volatility: Volatility | str,
216        name: str | None = None,
217    ) -> AggregateUDF:
218        """Create a new User Defined Aggregate Function.
219
220        The accumulator function must be callable and implement :py:class:`Accumulator`.
221
222        Args:
223            accum: The accumulator python function.
224            input_types: The data types of the arguments to ``accum``.
225            return_type: The data type of the return value.
226            state_type: The data types of the intermediate accumulation.
227            volatility: See :py:class:`Volatility` for allowed values.
228            name: A descriptive name for the function.
229
230        Returns:
231            A user defined aggregate function, which can be used in either data
232            aggregation or window function calls.
233        """
234        if not issubclass(accum, Accumulator):
235            raise TypeError(
236                "`accum` must implement the abstract base class Accumulator"
237            )
238        if name is None:
239            name = accum.__qualname__.lower()
240        if isinstance(input_types, pyarrow.lib.DataType):
241            input_types = [input_types]
242        return AggregateUDF(
243            name=name,
244            accumulator=accum,
245            input_types=input_types,
246            return_type=return_type,
247            state_type=state_type,
248            volatility=volatility,
249        )

Create a new User Defined Aggregate Function.

The accumulator function must be callable and implement Accumulator.

Arguments:
  • accum: The accumulator python function.
  • input_types: The data types of the arguments to accum.
  • return_type: The data type of the return value.
  • state_type: The data types of the intermediate accumulation.
  • volatility: See Volatility for allowed values.
  • name: A descriptive name for the function.
Returns:

A user defined aggregate function, which can be used in either data aggregation or window function calls.
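A minimal sketch of a summing accumulator, assuming the `Accumulator` interface (`state`/`update`/`merge`/`evaluate`) from upstream datafusion-python and that it is importable from `denormalized.datafusion.udf`; exact method signatures may differ between DataFusion releases:

```python
# Sketch only: a UDAF that sums a float column.
import pyarrow as pa
import pyarrow.compute as pc
from denormalized.datafusion import udaf
from denormalized.datafusion.udf import Accumulator  # assumed location

class SumReadings(Accumulator):
    def __init__(self):
        self._sum = 0.0

    def state(self) -> list[pa.Scalar]:
        # Intermediate state shared between partial aggregations.
        return [pa.scalar(self._sum)]

    def update(self, values: pa.Array) -> None:
        self._sum += pc.sum(values).as_py() or 0.0

    def merge(self, states: pa.Array) -> None:
        self._sum += pc.sum(states).as_py() or 0.0

    def evaluate(self) -> pa.Scalar:
        return pa.scalar(self._sum)

sum_readings = udaf(
    SumReadings,
    input_types=[pa.float64()],
    return_type=pa.float64(),
    state_type=[pa.float64()],
    volatility="immutable",
)
```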

@staticmethod
def udf( func: Callable[..., ~_R], input_types: list[pyarrow.lib.DataType], return_type: ~_R, volatility: denormalized.datafusion.udf.Volatility | str, name: str | None = None) -> denormalized.datafusion.udf.ScalarUDF:
111    @staticmethod
112    def udf(
113        func: Callable[..., _R],
114        input_types: list[pyarrow.DataType],
115        return_type: _R,
116        volatility: Volatility | str,
117        name: str | None = None,
118    ) -> ScalarUDF:
119        """Create a new User Defined Function.
120
121        Args:
122            func: A callable python function.
123            input_types: The data types of the arguments to ``func``. This list
124                must be of the same length as the number of arguments.
125            return_type: The data type of the return value from the python
126                function.
127            volatility: See ``Volatility`` for allowed values.
128            name: A descriptive name for the function.
129
130        Returns:
131            A user defined scalar function, which can be used in expression
132                calls such as select and filter.
133        """
134        if not callable(func):
135            raise TypeError("`func` argument must be callable")
136        if name is None:
137            name = func.__qualname__.lower()
138        return ScalarUDF(
139            name=name,
140            func=func,
141            input_types=input_types,
142            return_type=return_type,
143            volatility=volatility,
144        )

Create a new User Defined Function.

Arguments:
  • func: A callable python function.
  • input_types: The data types of the arguments to func. This list must be of the same length as the number of arguments.
  • return_type: The data type of the return value from the python function.
  • volatility: See Volatility for allowed values.
  • name: A descriptive name for the function.
Returns:

A user defined scalar function, which can be used in expression calls such as select and filter.
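A minimal sketch of a scalar UDF; the function receives pyarrow Arrays, and the column name and conversion below are illustrative:

```python
# Sketch only: convert Celsius readings to Fahrenheit with a scalar UDF.
import pyarrow as pa
import pyarrow.compute as pc
from denormalized.datafusion import col, udf

def to_fahrenheit(celsius: pa.Array) -> pa.Array:
    return pc.add(pc.multiply(celsius, 1.8), 32.0)

to_fahrenheit_udf = udf(
    to_fahrenheit,
    input_types=[pa.float64()],
    return_type=pa.float64(),
    volatility="immutable",
)

# ds is assumed to be an existing DataStream.
ds = ds.with_column("reading_f", to_fahrenheit_udf(col("reading")))
```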

class FeastDataStream(denormalized.DataStream):
 68class FeastDataStream(DataStream, metaclass=FeastDataStreamMeta):
 69    """A DataStream subclass with additional Feast-specific functionality.
 70
 71    This class extends DataStream to provide integration with Feast feature stores
 72    and related functionality.
 73
 74    You must install with the extra 'feast': `pip install denormalized[feast]`
 75    """
 76
 77    def __init__(self, stream: Union[PyDataStream, DataStream]) -> None:
 78        """Initialize a FeastDataStream from either a PyDataStream or DataStream.
 79
 80        Args:
 81            stream: Either a PyDataStream object or a DataStream object
 82        """
 83        if isinstance(stream, DataStream):
 84            super().__init__(stream.ds)
 85        else:
 86            super().__init__(stream)
 87
 88    def get_feast_schema(self) -> list[Field]:
 89        """Get the Feast schema for this DataStream.
 90
 91        Returns:
 92            list[Field]: List of Feast Field objects representing the schema
 93        """
 94        return [
 95            Field(
 96                name=s.name, dtype=from_value_type(pa_to_feast_value_type(str(s.type)))
 97            )
 98            for s in self.schema()
 99        ]
100
101    def write_feast_feature(
102        self, feature_store: FeatureStore, source_name: str
103    ) -> None:
104        """Write the DataStream to a Feast feature store.
105
106        Args:
107            feature_store: The target Feast feature store
108            source_name: Name of the feature source in Feast
109
110        Note:
111            Any exceptions during the push operation will be logged but not raised.
112        """
113
114        def _sink_to_feast(rb: pa.RecordBatch):
115            if len(rb):
116                df = rb.to_pandas()
117                try:
118                    feature_store.push(source_name, df, to=PushMode.ONLINE)
119                except Exception as e:
120                    logger.error(
121                        f"Failed to push to Feast feature store: {e}", exc_info=True
122                    )
123
124        self.ds.sink_python(_sink_to_feast)

A DataStream subclass with additional Feast-specific functionality.

This class extends DataStream to provide integration with Feast feature stores and related functionality.

You must install with the extra 'feast': pip install denormalized[feast]

FeastDataStream( stream: Union[denormalized.PyDataStream, DataStream])
77    def __init__(self, stream: Union[PyDataStream, DataStream]) -> None:
78        """Initialize a FeastDataStream from either a PyDataStream or DataStream.
79
80        Args:
81            stream: Either a PyDataStream object or a DataStream object
82        """
83        if isinstance(stream, DataStream):
84            super().__init__(stream.ds)
85        else:
86            super().__init__(stream)

Initialize a FeastDataStream from either a PyDataStream or DataStream.

Arguments:
  • stream: Either a PyDataStream object or a DataStream object
def get_feast_schema(self) -> list[feast.field.Field]:
88    def get_feast_schema(self) -> list[Field]:
89        """Get the Feast schema for this DataStream.
90
91        Returns:
92            list[Field]: List of Feast Field objects representing the schema
93        """
94        return [
95            Field(
96                name=s.name, dtype=from_value_type(pa_to_feast_value_type(str(s.type)))
97            )
98            for s in self.schema()
99        ]

Get the Feast schema for this DataStream.

Returns:

list[Field]: List of Feast Field objects representing the schema
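For instance, the derived fields can be inspected directly, or passed as the `schema=` argument of a Feast `FeatureView`; the stream below is assumed to already exist:

```python
# Sketch only: feast_ds is assumed to be an existing FeastDataStream.
for field in feast_ds.get_feast_schema():
    print(field.name, field.dtype)
```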

def write_feast_feature( self, feature_store: feast.feature_store.FeatureStore, source_name: str) -> None:
101    def write_feast_feature(
102        self, feature_store: FeatureStore, source_name: str
103    ) -> None:
104        """Write the DataStream to a Feast feature store.
105
106        Args:
107            feature_store: The target Feast feature store
108            source_name: Name of the feature source in Feast
109
110        Note:
111            Any exceptions during the push operation will be logged but not raised.
112        """
113
114        def _sink_to_feast(rb: pa.RecordBatch):
115            if len(rb):
116                df = rb.to_pandas()
117                try:
118                    feature_store.push(source_name, df, to=PushMode.ONLINE)
119                except Exception as e:
120                    logger.error(
121                        f"Failed to push to Feast feature store: {e}", exc_info=True
122                    )
123
124        self.ds.sink_python(_sink_to_feast)

Write the DataStream to a Feast feature store.

Arguments:
  • feature_store: The target Feast feature store
  • source_name: Name of the feature source in Feast
Note:

Any exceptions during the push operation will be logged but not raised.
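A hedged end-to-end sketch: the Feast repo path and push source name are illustrative, and the input stream is assumed to already exist:

```python
# Sketch only: push windowed aggregates into a Feast online store.
from feast import FeatureStore
from denormalized import FeastDataStream
from denormalized.datafusion import col, functions as f

store = FeatureStore(repo_path="./feature_repo")  # assumed repo location

# ds is assumed to be an existing DataStream; wrap it for Feast integration.
feast_ds = FeastDataStream(ds)

feast_ds.window(
    [col("sensor_name")],
    [f.avg(col("reading")).alias("avg_reading")],
    1000,  # 1-second tumbling window
    None,
).write_feast_feature(store, "sensor_stats_push")  # push source name is illustrative
```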

def select(self, *args, **kwargs) -> FeastDataStream:
50                    def wrapper(self, *args, **kwargs):
51                        result = getattr(
52                            super(cast(type, self.__class__), self), method_name
53                        )(*args, **kwargs)
54                        return self.__class__(result)

Select specific columns or expressions from the DataStream.

Arguments:
  • expr_list: List of expressions defining the columns or computations to select
Returns:

DataStream: A new DataStream containing only the selected expressions

Example:
>>> ds.select([col("name"), col("age") + 1])
def filter(self, *args, **kwargs) -> FeastDataStream:
50                    def wrapper(self, *args, **kwargs):
51                        result = getattr(
52                            super(cast(type, self.__class__), self), method_name
53                        )(*args, **kwargs)
54                        return self.__class__(result)

Filter the DataStream based on a predicate.

Arguments:
  • predicate: Boolean expression used to filter rows
Returns:

DataStream: A new DataStream containing only rows that satisfy the predicate

Example:
>>> ds.filter(col("age") > 18)
def with_column(self, *args, **kwargs) -> FeastDataStream:
50                    def wrapper(self, *args, **kwargs):
51                        result = getattr(
52                            super(cast(type, self.__class__), self), method_name
53                        )(*args, **kwargs)
54                        return self.__class__(result)

Add a new column to the DataStream.

Arguments:
  • name: Name of the new column
  • predicate: Expression defining the values for the new column
Returns:

DataStream: A new DataStream with the additional column

Example:
>>> ds.with_column("adult", col("age") >= 18)
def drop_columns(self, *args, **kwargs) -> FeastDataStream:
50                    def wrapper(self, *args, **kwargs):
51                        result = getattr(
52                            super(cast(type, self.__class__), self), method_name
53                        )(*args, **kwargs)
54                        return self.__class__(result)

Drops specified columns from the DataStream.

Arguments:
  • columns: List of column names to remove
Returns:

DataStream: A new DataStream without the specified columns

def join_on(self, *args, **kwargs) -> FeastDataStream:
50                    def wrapper(self, *args, **kwargs):
51                        result = getattr(
52                            super(cast(type, self.__class__), self), method_name
53                        )(*args, **kwargs)
54                        return self.__class__(result)

Join this DataStream with another one based on join expressions.

Arguments:
  • right: The right DataStream to join with
  • join_type: Type of join ('inner', 'left', 'right', 'full')
  • on_exprs: List of expressions defining the join conditions
Returns:

DataStream: A new DataStream resulting from the join operation

Example:
>>> left.join_on(right, "inner", [col("id") == col("right.id")])
def join(self, *args, **kwargs) -> FeastDataStream:
50                    def wrapper(self, *args, **kwargs):
51                        result = getattr(
52                            super(cast(type, self.__class__), self), method_name
53                        )(*args, **kwargs)
54                        return self.__class__(result)

Join this DataStream with another one based on column names.

Arguments:
  • right: The right DataStream to join with
  • join_type: Type of join ('inner', 'left', 'right', 'full')
  • left_cols: Column names from the left DataStream to join on
  • right_cols: Column names from the right DataStream to join on
  • filter: Optional additional join filter expression
Returns:

DataStream: A new DataStream resulting from the join operation

Example:
>>> left.join(right, "inner", ["id"], ["right_id"])
def window(self, *args, **kwargs) -> FeastDataStream:
50                    def wrapper(self, *args, **kwargs):
51                        result = getattr(
52                            super(cast(type, self.__class__), self), method_name
53                        )(*args, **kwargs)
54                        return self.__class__(result)

Apply a windowing operation to the DataStream.

If slide_millis is None a tumbling window will be created otherwise a sliding window will be created.

Arguments:
  • group_exprs: List of expressions to group by
  • aggr_exprs: List of aggregation expressions to apply
  • window_length_millis: Length of the window in milliseconds
  • slide_millis: Optional slide interval in milliseconds (defaults to None)
Returns:

DataStream: A new DataStream with the windowing operation applied

Example:
>>> ds.window([col("user_id")], [f.sum(col("value"))], 60000)  # 1-minute window
def print_schema(self, *args, **kwargs) -> FeastDataStream:
50                    def wrapper(self, *args, **kwargs):
51                        result = getattr(
52                            super(cast(type, self.__class__), self), method_name
53                        )(*args, **kwargs)
54                        return self.__class__(result)

Print the schema of the DataStream to stdout.

Returns:

DataStream: Self for method chaining

def print_plan(self, *args, **kwargs) -> FeastDataStream:
50                    def wrapper(self, *args, **kwargs):
51                        result = getattr(
52                            super(cast(type, self.__class__), self), method_name
53                        )(*args, **kwargs)
54                        return self.__class__(result)

Print the logical execution plan of the DataStream to stdout.

Returns:

DataStream: Self for method chaining

def print_physical_plan(self, *args, **kwargs) -> FeastDataStream:
50                    def wrapper(self, *args, **kwargs):
51                        result = getattr(
52                            super(cast(type, self.__class__), self), method_name
53                        )(*args, **kwargs)
54                        return self.__class__(result)

Print the physical execution plan of the DataStream to stdout.

Returns:

DataStream: Self for method chaining