State - Arrow Dataframe
States are a key concept in SDF. A state either holds a single value or is divided into multiple partitions based on a key. Each partition's state can be one of several types.
One of these types is arrow-row. For example, the following snippet defines a state, count_per_word, that stores each row in an Arrow dataframe.
states:
  count-per-word:
    type: keyed-state
    properties:
      key:
        type: string
      value:
        type: arrow-row
        properties:
          count:
            type: u32
Here we define a state, count_per_word, to track word frequency. Each key has one row with a single column, count, that stores how many times the word has been seen.
For example, suppose we receive the following words: apple, orange, banana, orange, grape, orange, banana.
These are mapped to an Arrow dataframe as follows:
| _key | count |
|---|---|
| apple | 1 |
| orange | 3 |
| banana | 2 |
| grape | 1 |
To update the state, use the update-state operator as shown below:
update-state:
  run: |
    fn count_word(_word: String) -> Result<()> {
        // Row for the key currently being processed
        let mut state = count_per_word();
        state.count += 1;
        // Write the modified row back to the state
        state.update()?;
        Ok(())
    }
Note that the state value is accessed through the count_per_word state function, which is automatically injected by the SDF builder. When this function is called from the update-state operator, it returns only the value of the partition state for the current key.
In this example, count_per_word represents a single row of the dataframe. If the operator sees apple, that is the first row in the dataframe above.
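The per-key view described above can be pictured with a small stand-alone model. This is only a sketch in plain Rust, not SDF's implementation: KeyedState, CountRow, and row_for are hypothetical names, and the choice to start a new key at a count of zero is an assumption of the model.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for one arrow-row value: a row with a single `count` column.
#[derive(Debug, Default, Clone, PartialEq)]
struct CountRow {
    count: u32,
}

/// Hypothetical stand-in for the keyed state: one row per key (partition).
#[derive(Debug, Default)]
struct KeyedState {
    rows: HashMap<String, CountRow>,
}

impl KeyedState {
    /// Returns the row for `key`, creating a zeroed row on first access
    /// (an assumption of this model, not documented SDF behavior).
    fn row_for(&mut self, key: &str) -> &mut CountRow {
        self.rows.entry(key.to_string()).or_default()
    }
}

/// Mirrors count_word: only the row for the current key is touched.
fn count_word(state: &mut KeyedState, word: &str) {
    state.row_for(word).count += 1;
}
```

Calling count_word once per incoming word leaves one row per distinct word, and each call reads and writes only the row that belongs to that word's key.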
However, aggregate operators such as flush can access the entire state and aggregate across all partitions. In this case, the count_per_word state function returns the entire DataFrame, not an individual row. You can then perform DataFrame operations using the SQL API. The snippet below uses SQL to get the three most frequent words.
flush:
  run: |
    fn aggregate_wordcount() -> Result<TopWords> {
        // Inside flush, the state function returns the whole dataframe
        let word_counts = count_per_word();
        let top3 = word_counts.sql("select * from count_per_word order by count desc limit 3")?;
        let rows = top3.rows()?;
        let mut top_words = vec![];
        // Column handles used to read values from each row
        let key = top3.key()?;
        let count = top3.col("count")?;
        while rows.next() {
            let word = rows.str(&key)?;
            let count = rows.u32(&count)?;
            let word_count = WordCount { word, count };
            top_words.push(word_count);
        }
        Ok(top_words)
    }
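To make the effect of the query concrete, here is a plain-Rust sketch of what "order by count desc limit 3" computes over (key, count) rows. It does not use SDF or a SQL engine. The WordCount struct is redefined here only so the sketch is self-contained, top_n is a hypothetical helper, and keeping input order for equal counts is a choice of this sketch (a SQL engine makes no such promise without an explicit tie-breaker).

```rust
/// Hypothetical output record, mirroring the WordCount used above.
#[derive(Debug, Clone, PartialEq)]
struct WordCount {
    word: String,
    count: u32,
}

/// Plain-Rust equivalent of `order by count desc limit n`.
fn top_n(rows: &[(&str, u32)], n: usize) -> Vec<WordCount> {
    let mut sorted: Vec<WordCount> = rows
        .iter()
        .map(|(word, count)| WordCount {
            word: word.to_string(),
            count: *count,
        })
        .collect();
    // Descending by count; sort_by is stable, so ties keep their input order.
    sorted.sort_by(|a, b| b.count.cmp(&a.count));
    // Keep only the first n rows.
    sorted.truncate(n);
    sorted
}
```

Because the result is again a list of rows, the same kind of step can be applied to it repeatedly, which parallels chaining SQL operations on a dataframe.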
SQL API
For any state that is a dataframe, you can use the SQL API to perform dataframe operations. SDF uses Polars SQL to execute them.
The result of a SQL operation is always a dataframe, so you can chain multiple SQL operations to get the desired result.
The SQL is executed in the context of the dataframe, and the table name is the name of the state, as illustrated below:
let top3 = word_counts.sql("select * from count_per_word order by count desc limit 3")?;
Row API
Once you have a dataframe, you can use the Row API to access the values of individual rows. To get the rows, call the rows function on the dataframe.
let rows = top3.rows()?;
You can think of the rows object as an iterator over the individual rows. The functions below are available on it. To access row values, you must first get the columns, as shown below:
let count = top3.col("count")?;
To get the key column, use the helper function key:
let key = top3.key()?;
Use the next function to advance to the next row. It returns true if it moved to a row, and false if there are no more rows.
while rows.next() {
    ...
}
As long as next returns true, you can access the current row's values by passing a column. Each column has an associated type, so you can only read values that are compatible with the column type; otherwise an error is returned.
For example, to get a string value, use the str function. To get a u32 value, use the u32 function.
let word = rows.str(&key)?;
let count = rows.u32(&count)?;
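The cursor behavior described in this section can be modeled in a few lines of plain Rust. This is a hedged sketch for intuition only: Rows, Col, and Value are hypothetical types written for this example, not SDF's actual types, and the error messages are invented. It shows the two rules stated above: next must return true before values are read, and a typed accessor fails when the column holds a different type.

```rust
/// Hypothetical typed cell value.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Str(String),
    U32(u32),
}

/// Hypothetical column handle: the column's position within a row.
#[derive(Debug, Clone, Copy)]
struct Col(usize);

/// Hypothetical cursor over rows; it starts positioned before the first row.
struct Rows {
    data: Vec<Vec<Value>>,
    pos: Option<usize>,
}

impl Rows {
    fn new(data: Vec<Vec<Value>>) -> Self {
        Rows { data, pos: None }
    }

    /// Advances to the next row; returns false when no rows remain.
    fn next(&mut self) -> bool {
        let next = self.pos.map_or(0, |p| p + 1);
        if next < self.data.len() {
            self.pos = Some(next);
            true
        } else {
            false
        }
    }

    /// Looks up the cell at `col` in the current row.
    fn current(&self, col: &Col) -> Result<&Value, String> {
        let row = self.pos.ok_or("next() has not returned true yet")?;
        self.data[row]
            .get(col.0)
            .ok_or_else(|| "no such column".to_string())
    }

    /// Reads a string cell; errors if the column holds another type.
    fn str(&self, col: &Col) -> Result<String, String> {
        match self.current(col)? {
            Value::Str(s) => Ok(s.clone()),
            other => Err(format!("expected string, found {:?}", other)),
        }
    }

    /// Reads a u32 cell; errors if the column holds another type.
    fn u32(&self, col: &Col) -> Result<u32, String> {
        match self.current(col)? {
            Value::U32(n) => Ok(*n),
            other => Err(format!("expected u32, found {:?}", other)),
        }
    }
}
```

In this model, calling u32 on the key column returns an error instead of a value, which is the same kind of type check the text describes for column access.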