denormalized
Denormalized is a single-node stream processing engine written in Rust and powered by Apache DataFusion 🚀
- Install denormalized
pip install denormalized
- Start the custom Docker image that contains an instance of Kafka along with a script that emits some sample data to Kafka
docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest
```python
import json
import pprint as pp

from denormalized import Context, col
from denormalized.datafusion import functions as f

# Sample event used to describe the expected message format on the topic.
sample_event = {
    "occurred_at_ms": 100,
    "sensor_name": "foo",
    "reading": 0.0,
}

def print_batch(rb):
    pp.pprint(rb.to_pydict())

ds = Context().from_topic(
    "temperature",
    json.dumps(sample_event),
    "localhost:9092",
    "occurred_at_ms",
)

ds.window(
    [col("sensor_name")],
    [
        f.count(col("reading"), distinct=False, filter=None).alias("count"),
        f.min(col("reading")).alias("min"),
        f.max(col("reading")).alias("max"),
    ],
    1000,
    None,
).sink(print_batch)
```
Head on over to the [examples folder](https://github.com/probably-nothing-labs/denormalized/tree/main/py-denormalized/python/examples) to see more examples that demonstrate additional functionality, including stream joins and user-defined (aggregate) functions.
1""" 2[Denormalized](https://www.denormalized.io/) is a single node stream processing engine written in Rust and powered by Apache DataFusion 🚀 3 41. Install denormalized `pip install denormalized` 52. Start the custom docker image that contains an instance of kafka along with with a script that emits some sample data to kafka `docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest` 6 7```python 8sample_event = { 9 "occurred_at_ms": 100, 10 "sensor_name": "foo", 11 "reading": 0.0, 12} 13 14def print_batch(rb): 15 pp.pprint(rb.to_pydict()) 16 17ds = Context().from_topic( 18 "temperature", 19 json.dumps(sample_event), 20 "localhost:9092", 21 "occurred_at_ms", 22) 23 24ds.window( 25 [col("sensor_name")], 26 [ 27 f.count(col("reading"), distinct=False, filter=None).alias("count"), 28 f.min(col("reading")).alias("min"), 29 f.max(col("reading")).alias("max"), 30 ], 31 1000, 32 None, 33).sink(print_batch) 34``` 35 36 37Head on over to the [examples folder](https://github.com/probably-nothing-labs/denormalized/tree/main/py-denormalized/python/examples) to see more examples that demonstrate additional functionality including stream joins and user defined (aggregate) functions. 38 39""" 40 41from .context import Context 42from .data_stream import DataStream 43from .datafusion import col, column 44from .datafusion import functions as Functions 45from .datafusion import lit, literal, udaf, udf 46from .datafusion.expr import Expr 47 48__all__ = [ 49 "Context", 50 "DataStream", 51 "col", 52 "column", 53 "Expr", 54 "Functions", 55 "lit", 56 "literal", 57 "udaf", 58 "udf", 59] 60 61__docformat__ = "google" 62 63try: 64 from .feast_data_stream import FeastDataStream 65 66 __all__.append("FeastDataStream") 67except ImportError: 68 pass
A context manager for handling data stream operations.
This class provides functionality to create and manage data streams from various sources like Kafka topics.
```python
def __init__(self) -> None:
    self.ctx = PyContext()
```
Initializes a new Context instance with PyContext.
```python
def from_topic(
    self,
    topic: str,
    sample_json: str,
    bootstrap_servers: str,
    timestamp_column: str | None = None,
    group_id: str = "default_group",
) -> DataStream:
    py_ds = self.ctx.from_topic(
        topic,
        sample_json,
        bootstrap_servers,
        group_id,
        timestamp_column,
    )
    ds = DataStream(py_ds)
    return ds
```
Creates a new DataStream from a Kafka topic.
Arguments:
- topic: The name of the Kafka topic to consume from.
- sample_json: A sample JSON string representing the expected message format.
- bootstrap_servers: Comma-separated list of Kafka broker addresses.
- timestamp_column: Optional column name containing message timestamps.
- group_id: Kafka consumer group ID, defaults to "default_group".
Returns:
DataStream: A new DataStream instance connected to the specified topic.
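For reference, a minimal call mirroring the quickstart above; the topic name, broker address, and sample event are placeholders:

```python
import json

from denormalized import Context

# A sample message that defines the expected shape of records on the topic.
sample_event = {"occurred_at_ms": 100, "sensor_name": "foo", "reading": 0.0}

ds = Context().from_topic(
    "temperature",             # Kafka topic to consume from
    json.dumps(sample_event),  # sample JSON describing the message format
    "localhost:9092",          # bootstrap servers
    "occurred_at_ms",          # column carrying the event timestamp
)
```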
Represents a stream of data that can be manipulated using various operations.
This class provides a high-level interface for stream processing operations, including filtering, joining, windowing, and sinking data to various destinations. It wraps a Rust-implemented PyDataStream object.
Attributes:
- ds (PyDataStream): The underlying Rust-side DataStream implementation
```python
def __init__(self, ds: PyDataStream) -> None:
    self.ds = ds
```
Initialize a new DataStream object.
Arguments:
- ds: The underlying PyDataStream object from the Rust implementation
```python
def schema(self) -> pa.Schema:
    return self.ds.schema()
```
Get the schema of the DataStream.
Returns:
pa.Schema: The PyArrow schema describing the structure of the data
```python
def select(self, expr_list: list[Expr]) -> "DataStream":
    return DataStream(self.ds.select(to_internal_exprs(expr_list)))
```
Select specific columns or expressions from the DataStream.
Arguments:
- expr_list: List of expressions defining the columns or computations to select
Returns:
DataStream: A new DataStream containing only the selected expressions
Example:
>>> ds.select([col("name"), col("age") + 1])
```python
def filter(self, predicate: Expr) -> "DataStream":
    return DataStream(self.ds.filter(to_internal_expr(predicate)))
```
Filter the DataStream based on a predicate.
Arguments:
- predicate: Boolean expression used to filter rows
Returns:
DataStream: A new DataStream containing only rows that satisfy the predicate
Example:
>>> ds.filter(col("age") > 18)
```python
def with_column(self, name: str, predicate: Expr) -> "DataStream":
    return DataStream(self.ds.with_column(name, to_internal_expr(predicate)))
```
Add a new column to the DataStream.
Arguments:
- name: Name of the new column
- predicate: Expression defining the values for the new column
Returns:
DataStream: A new DataStream with the additional column
Example:
>>> ds.with_column("adult", col("age") >= 18)
```python
def drop_columns(self, columns: list[str]) -> "DataStream":
    return DataStream(self.ds.drop_columns(columns))
```
Drops specified columns from the DataStream.
Arguments:
- columns: List of column names to remove
Returns:
DataStream: A new DataStream without the specified columns
```python
def join_on(
    self, right: "DataStream", join_type: str, on_exprs: list[Expr]
) -> "DataStream":
    return DataStream(self.ds.join_on(right.ds, join_type, on_exprs))
```
Join this DataStream with another one based on join expressions.
Arguments:
- right: The right DataStream to join with
- join_type: Type of join ('inner', 'left', 'right', 'full')
- on_exprs: List of expressions defining the join conditions
Returns:
DataStream: A new DataStream resulting from the join operation
Example:
>>> left.join_on(right, "inner", [col("id") == col("right.id")])
```python
def join(
    self,
    right: "DataStream",
    join_type: str,
    left_cols: list[str],
    right_cols: list[str],
    filter: Expr | None = None,
) -> "DataStream":
    return DataStream(
        self.ds.join(right.ds, join_type, left_cols, right_cols, filter)
    )
```
Join this DataStream with another one based on column names.
Arguments:
- right: The right DataStream to join with
- join_type: Type of join ('inner', 'left', 'right', 'full')
- left_cols: Column names from the left DataStream to join on
- right_cols: Column names from the right DataStream to join on
- filter: Optional additional join filter expression
Returns:
DataStream: A new DataStream resulting from the join operation
Example:
>>> left.join(right, "inner", ["id"], ["right_id"])
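As a fuller sketch of the column-name variant with two streams (topic names, sample events, and key columns here are hypothetical):

```python
import json

from denormalized import Context

order_sample = {"occurred_at_ms": 100, "user_id": 1, "amount": 10.0}
user_sample = {"occurred_at_ms": 100, "id": 1, "name": "foo"}

ctx = Context()
orders = ctx.from_topic("orders", json.dumps(order_sample), "localhost:9092", "occurred_at_ms")
users = ctx.from_topic("users", json.dumps(user_sample), "localhost:9092", "occurred_at_ms")

# Inner-join the two streams where orders.user_id == users.id.
joined = orders.join(users, "inner", ["user_id"], ["id"])
```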
```python
def window(
    self,
    group_exprs: list[Expr],
    aggr_exprs: list[Expr],
    window_length_millis: int,
    slide_millis: int | None = None,
) -> "DataStream":
    return DataStream(
        self.ds.window(
            to_internal_exprs(group_exprs),
            to_internal_exprs(aggr_exprs),
            window_length_millis,
            slide_millis,
        )
    )
```
Apply a windowing operation to the DataStream.
If `slide_millis` is `None`, a tumbling window is created; otherwise, a sliding window is created.
Arguments:
- group_exprs: List of expressions to group by
- aggr_exprs: List of aggregation expressions to apply
- window_length_millis: Length of the window in milliseconds
- slide_millis: Optional slide interval in milliseconds (defaults to None)
Returns:
DataStream: A new DataStream with the windowing operation applied
Example:
>>> ds.window([col("user_id")], [sum(col("value"))], 60000) # 1-minute window
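For example, supplying a slide interval turns the quickstart's tumbling window into a sliding one; this is a sketch that reuses the `ds` stream and column names from the quickstart:

```python
from denormalized import col
from denormalized.datafusion import functions as f

# 60-second window recomputed every 10 seconds (sliding window).
windowed = ds.window(
    [col("sensor_name")],                  # group keys
    [f.avg(col("reading")).alias("avg")],  # aggregate expressions
    60_000,                                # window length in milliseconds
    10_000,                                # slide interval in milliseconds
)
```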
```python
def print_stream(self) -> None:
    self.ds.print_stream()
```
Print the contents of the DataStream to stdout.
```python
def print_schema(self) -> "DataStream":
    return DataStream(self.ds.print_schema())
```
Print the schema of the DataStream to stdout.
Returns:
DataStream: Self for method chaining
```python
def print_plan(self) -> "DataStream":
    return DataStream(self.ds.print_plan())
```
Print the logical execution plan of the DataStream to stdout.
Returns:
DataStream: Self for method chaining
```python
def print_physical_plan(self) -> "DataStream":
    return DataStream(self.ds.print_physical_plan())
```
Print the physical execution plan of the DataStream to stdout.
Returns:
DataStream: Self for method chaining
```python
def sink_kafka(self, bootstrap_servers: str, topic: str) -> None:
    self.ds.sink_kafka(bootstrap_servers, topic)
```
Sink the DataStream to a Kafka topic.
Arguments:
- bootstrap_servers: Comma-separated list of Kafka broker addresses
- topic: Name of the Kafka topic to write to
Raises:
- ConnectionError: If unable to connect to Kafka brokers
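A short sketch, assuming `ds` is a stream built as in the quickstart and that the output topic name is a placeholder:

```python
# Write each result batch back out to another Kafka topic.
ds.sink_kafka("localhost:9092", "sensor_aggregates")
```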
```python
def sink(self, func: Callable[[pa.RecordBatch], None]) -> None:
    self.ds.sink_python(func)
```
Sink the DataStream to a Python callback function.
Arguments:
- func: Callback function that receives PyArrow RecordBatches
Example:
>>> ds.sink(lambda batch: print(f"Received batch with {len(batch)} rows"))
col: Create a column expression.
column: Create a column expression.
Expression object.
Expressions are one of the core concepts in DataFusion. See :ref:`Expressions` in the online documentation for more information.
```python
def __init__(self, expr: expr_internal.Expr) -> None:
    self.expr = expr
```
This constructor should not be called by the end user.
```python
def to_variant(self) -> Any:
    return self.expr.to_variant()
```
Convert this expression into a python object if possible.
```python
@deprecated(
    "display_name() is deprecated. Use :py:meth:`~Expr.schema_name` instead"
)
def display_name(self) -> str:
    return self.schema_name()
```
Returns the name of this expression as it should appear in a schema.
This name will not include any CAST expressions.
```python
def schema_name(self) -> str:
    return self.expr.schema_name()
```
Returns the name of this expression as it should appear in a schema.
This name will not include any CAST expressions.
```python
def canonical_name(self) -> str:
    return self.expr.canonical_name()
```
Returns a complete string representation of this expression.
```python
def variant_name(self) -> str:
    return self.expr.variant_name()
```
Returns the name of the Expr variant.
Ex: `IsNotNull`, `Literal`, `BinaryExpr`, etc.
```python
@staticmethod
def literal(value: Any) -> Expr:
    if not isinstance(value, pa.Scalar):
        value = pa.scalar(value)
    return Expr(expr_internal.Expr.literal(value))
```
Creates a new expression representing a scalar value.
`value` must be a valid PyArrow scalar value or easily castable to one.
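The `lit`/`literal` helpers exported at the package top level wrap this; a small sketch:

```python
from denormalized import col, lit

# Compare a column against a literal threshold.
is_hot = col("reading") > lit(100.0)
```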
```python
@staticmethod
def column(value: str) -> Expr:
    return Expr(expr_internal.Expr.column(value))
```
Creates a new expression representing a column.
```python
def alias(self, name: str) -> Expr:
    return Expr(self.expr.alias(name))
```
Assign a name to the expression.
```python
def sort(self, ascending: bool = True, nulls_first: bool = True) -> SortExpr:
    return SortExpr(self.expr, ascending=ascending, nulls_first=nulls_first)
```
Creates a sort `Expr` from an existing `Expr`.
Arguments:
- ascending: If true, sort in ascending order.
- nulls_first: Return null values first.
```python
def is_null(self) -> Expr:
    return Expr(self.expr.is_null())
```
Returns `True` if this expression is null.
```python
def is_not_null(self) -> Expr:
    return Expr(self.expr.is_not_null())
```
Returns `True` if this expression is not null.
```python
def cast(
    self, to: pa.DataType[Any] | Type[float] | Type[int] | Type[str] | Type[bool]
) -> Expr:
    if not isinstance(to, pa.DataType):
        try:
            to = self._to_pyarrow_types[to]
        except KeyError:
            raise TypeError(
                "Expected instance of pyarrow.DataType or builtins.type"
            )

    return Expr(self.expr.cast(to))
```
Cast to a new data type.
```python
def between(self, low: Any, high: Any, negated: bool = False) -> Expr:
    if not isinstance(low, Expr):
        low = Expr.literal(low)

    if not isinstance(high, Expr):
        high = Expr.literal(high)

    return Expr(self.expr.between(low.expr, high.expr, negated=negated))
```
Returns `True` if this expression falls within a given range.
Arguments:
- low: lower bound of the range (inclusive).
- high: higher bound of the range (inclusive).
- negated: if True, negate the check (NOT BETWEEN).
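For example, with the `reading` column from the quickstart:

```python
from denormalized import col

# True where 0.0 <= reading <= 100.0 (bounds inclusive).
in_range = col("reading").between(0.0, 100.0)

# Negated form: true where reading falls outside the range.
out_of_range = col("reading").between(0.0, 100.0, negated=True)
```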
```python
def rex_type(self) -> RexType:
    return self.expr.rex_type()
```
Return the Rex Type of this expression.
A Rex (Row Expression) specifies a single row of data. That specification could include user-defined functions or types. RexType identifies the row as one of the possible valid `RexType` variants.
```python
def types(self) -> DataTypeMap:
    return self.expr.types()
```
Return the `DataTypeMap`.
Returns:
DataTypeMap which represents the PythonType, Arrow DataType, and SqlType Enum which this expression represents.
```python
def python_value(self) -> Any:
    return self.expr.python_value()
```
Extracts the Expr value into a PyObject.
This is only valid for literal expressions.
Returns:
Python object representing literal value of the expression.
```python
def rex_call_operands(self) -> list[Expr]:
    return [Expr(e) for e in self.expr.rex_call_operands()]
```
Return the operands of the expression based on its variant type.
Row expressions, Rex(s), operate on the concept of operands. Different variants of Expressions, Expr(s), store those operands in different data structures. This function examines the Expr variant and returns the operands to the calling logic.
```python
def rex_call_operator(self) -> str:
    return self.expr.rex_call_operator()
```
Extracts the operator associated with a row expression type call.
```python
def column_name(self, plan: LogicalPlan) -> str:
    return self.expr.column_name(plan)
```
Compute the output column name based on the provided logical plan.
```python
def order_by(self, *exprs: Expr | SortExpr) -> ExprFuncBuilder:
    return ExprFuncBuilder(self.expr.order_by([sort_or_default(e) for e in exprs]))
```
Set the ordering for a window or aggregate function.
This function will create an `ExprFuncBuilder` that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when `build()` is called.
```python
def filter(self, filter: Expr) -> ExprFuncBuilder:
    return ExprFuncBuilder(self.expr.filter(filter.expr))
```
Filter an aggregate function.
This function will create an `ExprFuncBuilder` that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when `build()` is called.
```python
def distinct(self) -> ExprFuncBuilder:
    return ExprFuncBuilder(self.expr.distinct())
```
Only evaluate distinct values for an aggregate function.
This function will create an `ExprFuncBuilder` that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when `build()` is called.
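A sketch of how these builder methods compose, assuming the usual flow where `build()` turns the `ExprFuncBuilder` back into an `Expr`; the aggregate functions and column names are illustrative:

```python
from denormalized import col
from denormalized.datafusion import functions as f

# Sum only positive readings; filter() returns an ExprFuncBuilder,
# and build() finalizes it back into an Expr usable as an aggregate.
positive_sum = (
    f.sum(col("reading"))
    .filter(col("reading") > 0.0)
    .build()
    .alias("positive_sum")
)

# Count distinct sensor names via the distinct() builder method.
distinct_sensors = f.count(col("sensor_name")).distinct().build().alias("sensors")
```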
```python
def null_treatment(self, null_treatment: NullTreatment) -> ExprFuncBuilder:
    return ExprFuncBuilder(self.expr.null_treatment(null_treatment.value))
```
Set the treatment for `null` values for a window or aggregate function.
This function will create an `ExprFuncBuilder` that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when `build()` is called.
```python
def partition_by(self, *partition_by: Expr) -> ExprFuncBuilder:
    return ExprFuncBuilder(
        self.expr.partition_by(list(e.expr for e in partition_by))
    )
```
Set the partitioning for a window function.
This function will create an `ExprFuncBuilder` that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when `build()` is called.
```python
def window_frame(self, window_frame: WindowFrame) -> ExprFuncBuilder:
    return ExprFuncBuilder(self.expr.window_frame(window_frame.window_frame))
```
Set the frame for a window function.
This function will create an `ExprFuncBuilder` that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when `build()` is called.
lit: Create a literal expression.
literal: Create a literal expression.
```python
@staticmethod
def udaf(
    accum: _A,
    input_types: list[pyarrow.DataType],
    return_type: _R,
    state_type: list[pyarrow.DataType],
    volatility: Volatility | str,
    name: str | None = None,
) -> AggregateUDF:
    if not issubclass(accum, Accumulator):
        raise TypeError(
            "`accum` must implement the abstract base class Accumulator"
        )
    if name is None:
        name = accum.__qualname__.lower()
    if isinstance(input_types, pyarrow.lib.DataType):
        input_types = [input_types]
    return AggregateUDF(
        name=name,
        accumulator=accum,
        input_types=input_types,
        return_type=return_type,
        state_type=state_type,
        volatility=volatility,
    )
```
Create a new User Defined Aggregate Function.
The accumulator function must be callable and implement `Accumulator`.
Arguments:
- accum: The accumulator python function.
- input_types: The data types of the arguments to `accum`.
- return_type: The data type of the return value.
- state_type: The data types of the intermediate accumulation.
- volatility: See `Volatility` for allowed values.
- name: A descriptive name for the function.
Returns:
A user defined aggregate function, which can be used in either data aggregation or window function calls.
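A minimal sketch of defining and registering an aggregate. It assumes the standard DataFusion `Accumulator` interface (`update`/`merge`/`state`/`evaluate`) is importable from `denormalized.datafusion`; the class and column types are illustrative:

```python
import pyarrow as pa
import pyarrow.compute as pc

from denormalized import udaf
from denormalized.datafusion import Accumulator  # assumed re-export of the DataFusion ABC

class SumReadings(Accumulator):
    """Accumulates a running sum of float64 values."""

    def __init__(self):
        self._sum = pa.scalar(0.0)

    def update(self, values: pa.Array) -> None:
        # Called with a batch of input values.
        self._sum = pa.scalar(self._sum.as_py() + pc.sum(values).as_py())

    def merge(self, states: pa.Array) -> None:
        # Called with partial states produced by other partitions.
        self._sum = pa.scalar(self._sum.as_py() + pc.sum(states).as_py())

    def state(self) -> pa.Array:
        return pa.array([self._sum.as_py()])

    def evaluate(self) -> pa.Scalar:
        return self._sum

sum_readings = udaf(
    SumReadings,
    pa.float64(),     # input type(s)
    pa.float64(),     # return type
    [pa.float64()],   # intermediate state type(s)
    volatility="immutable",
)
```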
```python
@staticmethod
def udf(
    func: Callable[..., _R],
    input_types: list[pyarrow.DataType],
    return_type: _R,
    volatility: Volatility | str,
    name: str | None = None,
) -> ScalarUDF:
    if not callable(func):
        raise TypeError("`func` argument must be callable")
    if name is None:
        name = func.__qualname__.lower()
    return ScalarUDF(
        name=name,
        func=func,
        input_types=input_types,
        return_type=return_type,
        volatility=volatility,
    )
```
Create a new User Defined Function.
Arguments:
- func: A callable python function.
- input_types: The data types of the arguments to `func`. This list must be of the same length as the number of arguments.
- return_type: The data type of the return value from the python function.
- volatility: See `Volatility` for allowed values.
- name: A descriptive name for the function.
Returns:
A user-defined scalar function, which can be used in expressions (e.g., within `select` or `with_column`).
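A minimal scalar-UDF sketch; the conversion function and column names are illustrative:

```python
import pyarrow as pa
import pyarrow.compute as pc

from denormalized import col, udf

def to_fahrenheit(celsius: pa.Array) -> pa.Array:
    # Vectorized over a whole Arrow array per batch.
    return pc.add(pc.multiply(celsius, 1.8), 32.0)

fahrenheit = udf(
    to_fahrenheit,
    [pa.float64()],  # one argument of type float64
    pa.float64(),    # return type
    "immutable",     # volatility
)

# Usable like any other expression, e.g.:
# ds.with_column("reading_f", fahrenheit(col("reading")))
```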
A DataStream subclass with additional Feast-specific functionality.
This class extends DataStream to provide integration with Feast feature stores and related functionality.
You must install with the extra 'feast': `pip install denormalized[feast]`
```python
def __init__(self, stream: Union[PyDataStream, DataStream]) -> None:
    if isinstance(stream, DataStream):
        super().__init__(stream.ds)
    else:
        super().__init__(stream)
```
Initialize a FeastDataStream from either a PyDataStream or DataStream.
Arguments:
- stream: Either a PyDataStream object or a DataStream object
```python
def get_feast_schema(self) -> list[Field]:
    return [
        Field(
            name=s.name, dtype=from_value_type(pa_to_feast_value_type(str(s.type)))
        )
        for s in self.schema()
    ]
```
Get the Feast schema for this DataStream.
Returns:
list[Field]: List of Feast Field objects representing the schema
```python
def write_feast_feature(
    self, feature_store: FeatureStore, source_name: str
) -> None:
    def _sink_to_feast(rb: pa.RecordBatch):
        if len(rb):
            df = rb.to_pandas()
            try:
                feature_store.push(source_name, df, to=PushMode.ONLINE)
            except Exception as e:
                logger.error(
                    f"Failed to push to Feast feature store: {e}", exc_info=True
                )

    self.ds.sink_python(_sink_to_feast)
```
Write the DataStream to a Feast feature store.
Arguments:
- feature_store: The target Feast feature store
- source_name: Name of the feature source in Feast
Note:
Any exceptions during the push operation will be logged but not raised.
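A rough sketch of pushing a stream into Feast, assuming a Feast repository is already configured with a push source named "sensor_stats" and the `feast` extra is installed; all names are placeholders:

```python
from feast import FeatureStore

from denormalized import FeastDataStream

store = FeatureStore(repo_path=".")  # existing Feast repository

# Wrap any DataStream (here, a stream built as in the quickstart).
feast_ds = FeastDataStream(ds)
feast_ds.write_feast_feature(store, "sensor_stats")
```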