
Neuron Streaming Step

This example shows a clean split. Neuron streams the agent response chunk by chunk, while Queuety persists each chunk durably through StreamingStep. That means the UI can receive a live draft while the workflow still survives worker crashes or retries.

When this pattern is useful

Use it when a draft or response is long enough that you want to stream it live, when you do not want to lose partial output if the worker dies, and when the final artifact should still land in workflow state after the stream completes.

The Neuron agent

namespace App\Neuron;

use NeuronAI\Agent\Agent;
use NeuronAI\Providers\AIProviderInterface;
use NeuronAI\Providers\OpenAI\Responses\OpenAIResponses;
use NeuronAI\SystemPrompt;

final class DraftMemoAgent extends Agent
{
    protected function provider(): AIProviderInterface
    {
        return new OpenAIResponses(
            key: $_ENV['OPENAI_API_KEY'],
            model: $_ENV['OPENAI_MODEL'],
        );
    }

    public function instructions(): string
    {
        return (string) new SystemPrompt(
            background: [
                'You write crisp internal strategy memos.',
                'Prefer short sections and operationally useful language.',
            ],
        );
    }
}
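Before wiring the agent into a workflow, it can help to stream it on its own. This is a minimal sketch, assuming the same handler and chunk shape the Queuety step relies on (a stream() handler whose events() yields chunks exposing a content property); the prompt text is illustrative.

```php
<?php

use App\Neuron\DraftMemoAgent;
use NeuronAI\Chat\Messages\UserMessage;

// Stream a draft straight to stdout, outside any workflow. The handler
// and $chunk->content shape mirror what StreamMemoDraftStep uses.
$handler = (new DraftMemoAgent())->stream(
    new UserMessage('Write a strategy memo about onboarding churn.')
);

foreach ($handler->events() as $chunk) {
    echo $chunk->content;
}
```

Once this works, the workflow step amounts to replacing echo with yield plus a heartbeat.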

The Queuety streaming step

namespace App\Workflow\Steps;

use App\Neuron\DraftMemoAgent;
use NeuronAI\Chat\Messages\UserMessage;
use Queuety\Contracts\StreamingStep;
use Queuety\Heartbeat;

final class StreamMemoDraftStep implements StreamingStep
{
    public function stream(array $state, array $existing_chunks = []): \Generator
    {
        // A retried attempt receives any chunks a previous attempt already
        // persisted. Returning early keeps that saved output instead of
        // re-generating (and duplicating) the draft.
        if (! empty($existing_chunks)) {
            return;
        }

        $handler = (new DraftMemoAgent())->stream(
            new UserMessage(sprintf(
                "Write a strategy memo about %s for account %d.",
                $state['topic'],
                $state['account_id'],
            ))
        );

        foreach ($handler->events() as $i => $chunk) {
            // Report progress so a long stream is not mistaken for a
            // stalled worker.
            Heartbeat::beat(['chunks' => $i + 1]);

            // This example assumes the agent has no tools attached, so
            // Neuron will stream text/reasoning chunks only.
            yield $chunk->content;
        }
    }

    public function on_complete(array $chunks, array $state): array
    {
        // Runs once the stream has finished; the returned array is
        // merged into workflow state for downstream steps.
        return [
            'draft_markdown' => implode('', $chunks),
        ];
    }

    public function config(): array
    {
        return [
            // Up to three attempts; a retry receives the chunks persisted
            // by earlier attempts via $existing_chunks.
            'max_attempts' => 3,
        ];
    }
}

Use it in a workflow

use Queuety\Queuety;

Queuety::workflow('memo_drafting')
    ->then(BuildDraftBriefStep::class)
    ->then(StreamMemoDraftStep::class)
    ->then(StoreMemoStep::class)
    ->dispatch([
        'account_id' => 10,
        'topic' => 'pricing changes for enterprise accounts',
    ]);
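BuildDraftBriefStep and StoreMemoStep are left to the application. As one illustration, StoreMemoStep could read the draft_markdown key that StreamMemoDraftStep merged into state. The Queuety\Contracts\Step interface and its handle() signature here are assumptions for the sketch, not part of the example above.

```php
<?php

namespace App\Workflow\Steps;

use Queuety\Contracts\Step; // hypothetical: an ordinary (non-streaming) step contract

final class StoreMemoStep implements Step
{
    // Hypothetical signature: receives the merged workflow state and
    // returns additional state to merge.
    public function handle(array $state): array
    {
        // 'draft_markdown' was merged into state by
        // StreamMemoDraftStep::on_complete().
        $path = sprintf('memos/account-%d.md', $state['account_id']);
        file_put_contents($path, $state['draft_markdown']);

        return ['memo_path' => $path];
    }
}
```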

Why use Queuety instead of Neuron stream adapters alone

Neuron's stream adapters are useful for frontend protocols and transports. Queuety adds something different: every yielded chunk is persisted, retries can resume with the previously saved chunks, the completed draft is merged back into workflow state, and the stream becomes one durable step in a larger orchestration.

That makes this pattern a good fit when the stream is part of a multi-step business process, not just a request/response UI.

One caveat

If your Neuron agent uses tools while streaming, handle the streamed chunk types explicitly. Neuron's current streaming docs describe TextChunk, ReasoningChunk, ToolCallChunk, and ToolResultChunk. For a background drafting workflow, the simplest approach is usually to keep the streaming agent text-only.
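If you do attach tools, one hedged sketch is to filter inside the streaming loop so only prose is persisted. The chunk class names below come from Neuron's streaming docs; import them from your installed Neuron version, since their namespaces are not pinned here.

```php
<?php
// Replacement body for the loop in StreamMemoDraftStep::stream() when the
// agent has tools attached. TextChunk, ToolCallChunk, and ToolResultChunk
// are the class names from Neuron's streaming docs; add the matching
// use statements for your installed version.

foreach ($handler->events() as $i => $chunk) {
    Heartbeat::beat(['chunks' => $i + 1]);

    // Tool traffic is not part of the draft text; skip it (or log it).
    if ($chunk instanceof ToolCallChunk || $chunk instanceof ToolResultChunk) {
        continue;
    }

    // Only text chunks land in the persisted draft.
    if ($chunk instanceof TextChunk) {
        yield $chunk->content;
    }
}
```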
