
Streaming Steps

Streaming steps let a workflow produce output incrementally. Each yielded chunk is written to the database immediately, so if the step fails mid-stream the already-persisted chunks survive and are available on retry.

This is useful for LLM streaming responses, large file processing, and any scenario where partial progress should not be lost.

The StreamingStep interface

A streaming step implements Queuety\Contracts\StreamingStep:

namespace Queuety\Contracts;

interface StreamingStep {
    public function stream( array $state, array $existing_chunks = [] ): \Generator;
    public function on_complete( array $chunks, array $state ): array;
    public function config(): array;
}

• stream( $state, $existing_chunks ): generator that yields string chunks. Each yield triggers an immediate DB write.
• on_complete( $chunks, $state ): called after the stream finishes. Receives all chunks (previous + new) and returns data to merge into workflow state.
• config(): optional configuration. Supports the needs_wordpress and max_attempts keys.

How chunks are persisted

Every time your generator yields a value, Queuety calls ChunkStore::append_chunk() to write it to the queuety_chunks table. There is no batching or buffering. One yield equals one INSERT.

The chunk row stores the job_id, workflow_id, step_index, chunk_index (zero-based), and the string content.
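
The one-yield-one-write behavior can be sketched in miniature; here the array $rows stands in for the queuety_chunks table and the foreach body for the worker's call to ChunkStore::append_chunk() (both stand-ins are illustrative, not the real implementation):

```php
<?php
// A generator standing in for a streaming step's stream() method.
$stream = ( function (): \Generator {
    yield 'alpha';
    yield 'beta';
    yield 'gamma';
} )();

// In-memory stand-in for the queuety_chunks table.
$rows = [];

// One yield, one write: each chunk becomes its own row immediately.
foreach ( $stream as $chunk ) {
    $rows[] = [
        'job_id'      => 42,             // illustrative job id
        'chunk_index' => count( $rows ), // zero-based, in yield order
        'content'     => $chunk,
    ];
}

echo implode( ',', array_column( $rows, 'content' ) ); // alpha,beta,gamma
```

Because each row is written as soon as it is yielded, a crash between yields loses at most the chunk currently being produced.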

Resumability

When a streaming step is retried after a failure, the worker loads all previously persisted chunks and passes them as the $existing_chunks parameter. Your implementation decides how to resume:

  • Skip already-processed items by checking the count of existing chunks
  • Offset into a paginated API using the chunk count as the page offset
  • Restart from scratch by ignoring $existing_chunks entirely (chunks from the previous attempt are still in the DB and will be prepended)
public function stream( array $state, array $existing_chunks = [] ): \Generator {
    $offset = count( $existing_chunks );

    $pages = $this->client->paginate( $state['query'], offset: $offset );

    foreach ( $pages as $page ) {
        yield json_encode( $page );
    }
}
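
A matching on_complete() for this offset pattern simply decodes everything it receives; chunks persisted by earlier attempts are already included in $chunks (a sketch, with an illustrative pages state key):

```php
public function on_complete( array $chunks, array $state ): array {
    // $chunks holds every persisted chunk, across all attempts,
    // ordered by chunk index.
    return [
        'pages' => array_map(
            static fn ( string $chunk ): array => json_decode( $chunk, true ),
            $chunks
        ),
    ];
}
```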

Heartbeats during streaming

Long-running streams should send heartbeats to prevent the stale job detector from reclaiming the job. Call Heartbeat::beat() inside your generator loop:

use Queuety\Heartbeat;

public function stream( array $state, array $existing_chunks = [] ): \Generator {
    foreach ( $this->client->stream( $state['prompt'] ) as $i => $chunk ) {
        Heartbeat::beat( [ 'chunks' => $i + 1 ] );
        yield $chunk;
    }
}

See Heartbeats for details on the Heartbeat API.

Chunk cleanup

When the stream completes successfully and on_complete() returns, the worker calls ChunkStore::clear_chunks() to delete all chunks for the job. The merged result from on_complete() is the durable record. Chunks are temporary by design.

If the step fails and is retried, chunks are not deleted. They are passed back via $existing_chunks on the next attempt.
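
The retry behavior can be illustrated with a self-contained sketch, again using an array as a stand-in for the chunk table (the four-item list and the forced failure are purely illustrative):

```php
<?php
$rows = []; // survives across attempts, like the queuety_chunks table

// A resumable stream: skip items already covered by persisted chunks.
$stream = function ( array $existing_chunks ): \Generator {
    $items = [ 'a', 'b', 'c', 'd' ];
    for ( $i = count( $existing_chunks ); $i < count( $items ); $i++ ) {
        yield $items[ $i ];
    }
};

// Attempt 1: fails after two chunks. The two rows are NOT deleted.
try {
    foreach ( $stream( $rows ) as $chunk ) {
        $rows[] = $chunk;
        if ( 2 === count( $rows ) ) {
            throw new RuntimeException( 'mid-stream failure' );
        }
    }
} catch ( RuntimeException $e ) {
    // The worker reschedules the job; persisted chunks survive.
}

// Attempt 2: existing chunks are passed back and the stream resumes.
foreach ( $stream( $rows ) as $chunk ) {
    $rows[] = $chunk;
}

echo implode( ',', $rows ); // a,b,c,d
```

Each item is persisted exactly once across both attempts; only after the second attempt succeeds would the worker call clear_chunks().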

Full example: LLM streaming

use Queuety\Contracts\StreamingStep;
use Queuety\Heartbeat;

class StreamLLMResponse implements StreamingStep {

    public function stream( array $state, array $existing_chunks = [] ): \Generator {
        // If we have existing chunks from a previous attempt, we can skip
        // re-requesting. The on_complete method will concatenate everything.
        if ( ! empty( $existing_chunks ) ) {
            return;
        }

        $response = OpenAI::chat()->createStreamed( [
            'model'    => 'gpt-4',
            'messages' => $state['messages'],
        ] );

        foreach ( $response as $i => $chunk ) {
            $text = $chunk->choices[0]->delta->content ?? '';
            if ( '' !== $text ) {
                Heartbeat::beat( [ 'tokens' => $i ] );
                yield $text;
            }
        }
    }

    public function on_complete( array $chunks, array $state ): array {
        return [
            'llm_response' => implode( '', $chunks ),
        ];
    }

    public function config(): array {
        return [ 'max_attempts' => 5 ];
    }
}

Use it in a workflow like any other step:

use Queuety\Queuety;

Queuety::workflow( 'ai_chat' )
    ->then( BuildPromptHandler::class )
    ->then( StreamLLMResponse::class )
    ->then( SaveResponseHandler::class )
    ->dispatch( [ 'user_input' => $input ] );

Full example: file processing

use Queuety\Contracts\StreamingStep;
use Queuety\Heartbeat;

class ProcessCSVRows implements StreamingStep {

    public function stream( array $state, array $existing_chunks = [] ): \Generator {
        $file   = fopen( $state['csv_path'], 'r' );
        $offset = count( $existing_chunks );
        $row    = 0;

        // Skip the header.
        fgetcsv( $file );

        while ( ( $line = fgetcsv( $file ) ) !== false ) {
            $row++;

            // Skip rows already processed in a previous attempt.
            if ( $row <= $offset ) {
                continue;
            }

            $result = $this->process_row( $line );
            Heartbeat::beat( [ 'row' => $row ] );
            yield json_encode( $result );
        }

        fclose( $file );
    }

    public function on_complete( array $chunks, array $state ): array {
        $results = array_map( 'json_decode', $chunks );
        return [
            'processed_count' => count( $results ),
            'results'         => $results,
        ];
    }

    public function config(): array {
        return [];
    }

    private function process_row( array $line ): array {
        // Your row processing logic here.
        return [ 'id' => $line[0], 'status' => 'ok' ];
    }
}
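
As with the LLM example, the step slots into a workflow like any other; this sketch assumes a hypothetical NotifyImportComplete follow-up handler:

```php
use Queuety\Queuety;

Queuety::workflow( 'csv_import' )
    ->then( ProcessCSVRows::class )
    ->then( NotifyImportComplete::class ) // hypothetical follow-up step
    ->dispatch( [ 'csv_path' => $path ] );
```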

ChunkStore API

The ChunkStore class manages chunk persistence. Access it via Queuety::chunk_store().

$store = Queuety::chunk_store();

• get_chunks( int $job_id ): returns string[]. Fetches all chunks for a job, ordered by chunk index.
• append_chunk( int $job_id, int $chunk_index, string $content, ?int $workflow_id, ?int $step_index ): returns void. Appends a single chunk.
• clear_chunks( int $job_id ): returns void. Deletes all chunks for a job.
• chunk_count( int $job_id ): returns int. Counts the number of chunks for a job.
• get_accumulated( int $job_id ): returns string. Concatenates all chunks into one string.

You normally do not need to call ChunkStore directly. The worker handles persistence and cleanup automatically when processing a StreamingStep. The API is available for advanced use cases such as inspecting in-progress chunks or manual cleanup.
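
For instance, a status screen could surface the partial output of an in-flight stream. This sketch uses only the methods listed above; $job_id is assumed to come from your own job tracking:

```php
use Queuety\Queuety;

$store = Queuety::chunk_store();

// Show partial output while the streaming step is still running.
if ( $store->chunk_count( $job_id ) > 0 ) {
    echo $store->get_accumulated( $job_id );
}
```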
