# Streaming Steps
Streaming steps let a workflow produce output incrementally. Each yielded chunk is written to the database immediately, so if the step fails mid-stream the already-persisted chunks survive and are available on retry.
This is useful for LLM streaming responses, large file processing, and any scenario where partial progress should not be lost.
## The StreamingStep interface
A streaming step implements `Queuety\Contracts\StreamingStep`:

```php
namespace Queuety\Contracts;

interface StreamingStep {
	public function stream( array $state, array $existing_chunks = [] ): \Generator;
	public function on_complete( array $chunks, array $state ): array;
	public function config(): array;
}
```

| Method | Description |
|---|---|
| `stream( $state, $existing_chunks )` | Generator that yields string chunks. Each yield triggers an immediate DB write. |
| `on_complete( $chunks, $state )` | Called after the stream finishes. Receives all chunks (previous + new) and returns data to merge into workflow state. |
| `config()` | Optional configuration. Supports the `needs_wordpress` and `max_attempts` keys. |
## How chunks are persisted
Every time your generator yields a value, Queuety calls `ChunkStore::append_chunk()` to write it to the `queuety_chunks` table. There is no batching or buffering: one yield equals one INSERT.

The chunk row stores the `job_id`, `workflow_id`, `step_index`, `chunk_index` (zero-based), and the string content.
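To make the one-yield-one-INSERT behavior concrete, here is a standalone sketch in plain PHP, with no Queuety dependency. An array stands in for the `queuety_chunks` table, and `persist_stream` is a hypothetical stand-in for the worker's internal loop; only the shape of the behavior is taken from the docs above.

```php
// Standalone sketch: each yield becomes exactly one "row" — no batching.
// $table is an array standing in for the queuety_chunks table.
function persist_stream( \Generator $gen, array &$table, int $job_id ): void {
	foreach ( $gen as $i => $chunk ) {
		// One yield, one INSERT. Generator keys auto-increment from 0,
		// matching the zero-based chunk_index.
		$table[] = [
			'job_id'      => $job_id,
			'chunk_index' => $i,
			'content'     => $chunk,
		];
	}
}

$rows = [];
$gen  = ( function () {
	yield 'a';
	yield 'b';
} )();

persist_stream( $gen, $rows, 42 );
// $rows now holds two rows with chunk_index 0 and 1.
```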
## Resumability
When a streaming step is retried after a failure, the worker loads all previously persisted chunks and passes them as the `$existing_chunks` parameter. Your implementation decides how to resume:

- Skip already-processed items by checking the count of existing chunks
- Offset into a paginated API using the chunk count as the page offset
- Restart from scratch by ignoring `$existing_chunks` entirely (chunks from the previous attempt are still in the DB and will be prepended)
```php
public function stream( array $state, array $existing_chunks = [] ): \Generator {
	$offset = count( $existing_chunks );
	$pages  = $this->client->paginate( $state['query'], offset: $offset );

	foreach ( $pages as $page ) {
		yield json_encode( $page );
	}
}
```

## Heartbeats during streaming
Long-running streams should send heartbeats to prevent the stale job detector from reclaiming the job. Call `Heartbeat::beat()` inside your generator loop:

```php
use Queuety\Heartbeat;

public function stream( array $state, array $existing_chunks = [] ): \Generator {
	foreach ( $this->client->stream( $state['prompt'] ) as $i => $chunk ) {
		Heartbeat::beat( [ 'chunks' => $i + 1 ] );
		yield $chunk;
	}
}
```

See Heartbeats for details on the Heartbeat API.
## Chunk cleanup
When the stream completes successfully and `on_complete()` returns, the worker calls `ChunkStore::clear_chunks()` to delete all chunks for the job. The merged result from `on_complete()` is the durable record; chunks are temporary by design.

If the step fails and is retried, chunks are not deleted. They are passed back via `$existing_chunks` on the next attempt.
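The retry semantics can be simulated in plain PHP without the framework. In this sketch, `stream_items` is a hypothetical step body that resumes by chunk count, and `$persisted` plays the role of the chunks Queuety kept from the failed attempt:

```php
// Standalone sketch (no Queuety dependency) of resume-by-count semantics:
// chunks persisted before a failure come back as $existing_chunks, and the
// generator skips past that offset.
function stream_items( array $items, array $existing_chunks = [] ): \Generator {
	$offset = count( $existing_chunks );

	foreach ( array_slice( $items, $offset ) as $item ) {
		yield $item;
	}
}

$items = [ 'a', 'b', 'c', 'd' ];

// First attempt fails after persisting two chunks.
$persisted = [ 'a', 'b' ];

// Retry: only the remaining items are produced; the final chunk list passed
// to on_complete() would be $persisted followed by $resumed.
$resumed = iterator_to_array( stream_items( $items, $persisted ), false );
// $resumed is [ 'c', 'd' ].
```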
## Full example: LLM streaming
```php
use Queuety\Contracts\StreamingStep;
use Queuety\Heartbeat;

class StreamLLMResponse implements StreamingStep {
	public function stream( array $state, array $existing_chunks = [] ): \Generator {
		// If we have existing chunks from a previous attempt, we can skip
		// re-requesting. The on_complete() method will concatenate everything.
		if ( ! empty( $existing_chunks ) ) {
			return;
		}

		$response = OpenAI::chat()->createStreamed( [
			'model'    => 'gpt-4',
			'messages' => $state['messages'],
		] );

		foreach ( $response as $i => $chunk ) {
			$text = $chunk->choices[0]->delta->content ?? '';

			if ( '' !== $text ) {
				Heartbeat::beat( [ 'tokens' => $i ] );
				yield $text;
			}
		}
	}

	public function on_complete( array $chunks, array $state ): array {
		return [
			'llm_response' => implode( '', $chunks ),
		];
	}

	public function config(): array {
		return [ 'max_attempts' => 5 ];
	}
}
```

Use it in a workflow like any other step:
```php
use Queuety\Queuety;

Queuety::workflow( 'ai_chat' )
	->then( BuildPromptHandler::class )
	->then( StreamLLMResponse::class )
	->then( SaveResponseHandler::class )
	->dispatch( [ 'user_input' => $input ] );
```

## Full example: file processing
```php
use Queuety\Contracts\StreamingStep;
use Queuety\Heartbeat;

class ProcessCSVRows implements StreamingStep {
	public function stream( array $state, array $existing_chunks = [] ): \Generator {
		$file   = fopen( $state['csv_path'], 'r' );
		$offset = count( $existing_chunks );
		$row    = 0;

		// Skip the header.
		fgetcsv( $file );

		while ( false !== ( $line = fgetcsv( $file ) ) ) {
			$row++;

			// Skip rows already processed in a previous attempt.
			if ( $row <= $offset ) {
				continue;
			}

			$result = $this->process_row( $line );
			Heartbeat::beat( [ 'row' => $row ] );
			yield json_encode( $result );
		}

		fclose( $file );
	}

	public function on_complete( array $chunks, array $state ): array {
		$results = array_map( 'json_decode', $chunks );

		return [
			'processed_count' => count( $results ),
			'results'         => $results,
		];
	}

	public function config(): array {
		return [];
	}

	private function process_row( array $line ): array {
		// Your row processing logic here.
		return [ 'id' => $line[0], 'status' => 'ok' ];
	}
}
```

## ChunkStore API
The `ChunkStore` class manages chunk persistence. Access it via `Queuety::chunk_store()`.

```php
$store = Queuety::chunk_store();
```

| Method | Returns | Description |
|---|---|---|
| `get_chunks( int $job_id )` | `string[]` | Fetch all chunks for a job, ordered by chunk index |
| `append_chunk( int $job_id, int $chunk_index, string $content, ?int $workflow_id, ?int $step_index )` | `void` | Append a single chunk |
| `clear_chunks( int $job_id )` | `void` | Delete all chunks for a job |
| `chunk_count( int $job_id )` | `int` | Count the number of chunks for a job |
| `get_accumulated( int $job_id )` | `string` | Concatenate all chunks into one string |
You normally do not need to call ChunkStore directly. The worker handles persistence and cleanup automatically when processing a StreamingStep. The API is available for advanced use cases such as inspecting in-progress chunks or manual cleanup.
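As one such advanced use case, a sketch of inspecting the in-progress output of a streaming job, using only the `ChunkStore` methods documented above. How you obtain `$job_id` is up to your application; that lookup is assumed here.

```php
use Queuety\Queuety;

// Hypothetical admin-screen snippet: show partial output for an in-flight
// job (for example, an LLM response that is still streaming). $job_id is
// assumed to come from your own job lookup.
$store = Queuety::chunk_store();

if ( $store->chunk_count( $job_id ) > 0 ) {
	// get_accumulated() concatenates all persisted chunks in order.
	$partial = $store->get_accumulated( $job_id );
	echo $partial;
}
```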