
Neuron Streaming Step

This example shows a clean split:

  • Neuron streams the agent response chunk by chunk
  • Queuety persists each chunk durably through StreamingStep

The UI can show a live draft while the workflow still survives worker crashes and retries.

When this pattern is useful

Use it when:

  • a draft or response is long enough that you want to stream it live
  • you do not want to lose partial output if the worker dies
  • the final artifact should still land in workflow state after the stream completes

The Neuron agent

namespace App\Neuron;

use NeuronAI\Agent\Agent;
use NeuronAI\Providers\AIProviderInterface;
use NeuronAI\Providers\OpenAI\Responses\OpenAIResponses;
use NeuronAI\SystemPrompt;

final class DraftMemoAgent extends Agent
{
    protected function provider(): AIProviderInterface
    {
        return new OpenAIResponses(
            key: $_ENV['OPENAI_API_KEY'],
            model: $_ENV['OPENAI_MODEL'],
        );
    }

    public function instructions(): string
    {
        return (string) new SystemPrompt(
            background: [
                'You write crisp internal strategy memos.',
                'Prefer short sections and operationally useful language.',
            ],
        );
    }
}

The Queuety streaming step

namespace App\Workflow\Steps;

use App\Neuron\DraftMemoAgent;
use NeuronAI\Chat\Messages\UserMessage;
use Queuety\Contracts\StreamingStep;
use Queuety\Heartbeat;

final class StreamMemoDraftStep implements StreamingStep
{
    public function stream(array $state, array $existing_chunks = []): \Generator
    {
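        // Chunks saved by an earlier attempt are kept as-is. This example
        // does not call the model again on a retry.
        if (! empty($existing_chunks)) {
            return;
        }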

        $handler = (new DraftMemoAgent())->stream(
            new UserMessage(sprintf(
                "Write a strategy memo about %s for account %d.",
                $state['topic'],
                $state['account_id'],
            ))
        );

        // Count chunks explicitly rather than relying on the iterator keys.
        $count = 0;

        foreach ($handler->events() as $chunk) {
            Heartbeat::beat(['chunks' => ++$count]);

            // This example assumes the agent has no tools attached, so
            // Neuron will stream text/reasoning chunks only.
            yield $chunk->content;
        }
    }

    public function on_complete(array $chunks, array $state): array
    {
        return [
            'draft_markdown' => implode('', $chunks),
        ];
    }

    public function config(): array
    {
        return [
            'max_attempts' => 3,
        ];
    }
}
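
The step above returns immediately when saved chunks exist, which keeps the example short. If a retry should instead pick the draft up where it stopped, one option is to feed the saved text back to the model. The following is a minimal sketch under that assumption: `build_memo_prompt` is a hypothetical helper, not part of Queuety or Neuron, and whether the model continues cleanly from a partial draft is not guaranteed.

```php
<?php

/**
 * Builds the prompt for one attempt. With no saved chunks it is the original
 * request; otherwise it asks the model to continue the partial draft.
 *
 * Hypothetical helper for illustration only.
 *
 * @param list<string> $existingChunks chunks persisted by earlier attempts
 */
function build_memo_prompt(string $topic, int $accountId, array $existingChunks = []): string
{
    $request = sprintf('Write a strategy memo about %s for account %d.', $topic, $accountId);

    if ($existingChunks === []) {
        return $request;
    }

    return $request
        . "\n\nA partial draft already exists. Continue exactly where it stops "
        . "and do not repeat any of it:\n\n"
        . implode('', $existingChunks);
}

// First attempt: nothing saved yet.
$fresh = build_memo_prompt('pricing changes', 10);

// Retry: two chunks were persisted before the worker died.
$resume = build_memo_prompt('pricing changes', 10, ['# Memo', "\n\nPricing "]);
```

Inside `stream()`, the result would replace the `sprintf(...)` argument passed to `UserMessage`, and the early `return` would be dropped so the step keeps yielding new chunks after the saved ones.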

Use it in a workflow

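use App\Workflow\Steps\StreamMemoDraftStep;
use Queuety\Queuety;

// BuildDraftBriefStep and StoreMemoStep are ordinary steps defined elsewhere in the app.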

Queuety::workflow('memo_drafting')
    ->then(BuildDraftBriefStep::class)
    ->then(StreamMemoDraftStep::class)
    ->then(StoreMemoStep::class)
    ->dispatch([
        'account_id' => 10,
        'topic' => 'pricing changes for enterprise accounts',
    ]);
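
To make the data flow concrete, here is a rough sketch of how state might move through the three steps. It assumes each step returns an array that is merged into the accumulated workflow state, as the streaming step's `on_complete()` result is. The three closures and the `brief` and `stored_bytes` keys are hypothetical stand-ins, not the real step classes, and the loop only imitates what the workflow engine does.

```php
<?php

// Hypothetical stand-ins for the three steps. Each takes the accumulated
// workflow state and returns the keys it wants merged into that state.
$buildBrief = fn (array $state): array => [
    'brief' => sprintf('Memo on %s for account %d', $state['topic'], $state['account_id']),
];

$streamDraft = fn (array $state): array => [
    // In the real step this value comes from on_complete().
    'draft_markdown' => '# ' . $state['brief'],
];

$storeMemo = fn (array $state): array => [
    'stored_bytes' => strlen($state['draft_markdown']),
];

// Same payload as the dispatch() call above.
$state = [
    'account_id' => 10,
    'topic' => 'pricing changes for enterprise accounts',
];

// Run the steps in order, merging each result into the shared state.
foreach ([$buildBrief, $streamDraft, $storeMemo] as $step) {
    $state = array_merge($state, $step($state));
}
```

The point is that `StoreMemoStep` never talks to Neuron. It only reads `draft_markdown` from state once the stream has completed.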

Why use Queuety instead of Neuron stream adapters alone

Neuron's stream adapters are useful for frontend protocols and transports.

Queuety adds something different:

  • every yielded chunk is persisted
  • retries can resume with the previously saved chunks
  • the completed draft is merged back into workflow state
  • the stream becomes one durable step in a larger orchestration

That makes this pattern a good fit when the stream is part of a multi-step business process, not just a request/response UI.
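
The persistence and retry behaviour listed above can be imitated with plain PHP generators. This is only a sketch of the idea, not Queuety's implementation: `InMemoryChunkStore`, `run_attempt`, and the `memo-step` id are hypothetical names, and an array stands in for durable storage.

```php
<?php

// Hypothetical in-memory stand-in for durable chunk storage.
final class InMemoryChunkStore
{
    /** @var array<string, list<string>> */
    private array $chunks = [];

    public function append(string $stepId, string $chunk): void
    {
        $this->chunks[$stepId][] = $chunk;
    }

    /** @return list<string> */
    public function all(string $stepId): array
    {
        return $this->chunks[$stepId] ?? [];
    }
}

/**
 * Runs one attempt: hands the already-saved chunks to the producer and
 * persists every newly yielded chunk before asking for the next one.
 */
function run_attempt(InMemoryChunkStore $store, string $stepId, callable $producer): void
{
    foreach ($producer($store->all($stepId)) as $chunk) {
        $store->append($stepId, $chunk);
    }
}

// A producer that skips what is already saved and crashes once mid-stream.
$crashed = false;
$producer = function (array $existing) use (&$crashed): \Generator {
    $parts = ['# Memo', "\n\n", 'Pricing ', 'changes.'];
    foreach (array_slice($parts, count($existing)) as $i => $part) {
        if (! $crashed && $i === 2) {
            $crashed = true;
            throw new \RuntimeException('simulated worker crash');
        }
        yield $part;
    }
};

$store = new InMemoryChunkStore();

try {
    run_attempt($store, 'memo-step', $producer); // dies after two chunks
} catch (\RuntimeException $e) {
    // A real worker would be restarted by the queue; here we simply retry.
}

$savedAfterCrash = $store->all('memo-step');
run_attempt($store, 'memo-step', $producer); // retry resumes from saved chunks

// Mirrors on_complete(): join the chunks and merge the result into state.
$state = array_merge(
    ['account_id' => 10],
    ['draft_markdown' => implode('', $store->all('memo-step'))],
);
```

Because each chunk is stored before the next one is requested, the crash loses nothing that was already yielded, and the retry only has to produce the remainder.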

One caveat

If your Neuron agent uses tools while streaming, handle the streamed chunk types explicitly. Neuron's current streaming docs describe TextChunk, ReasoningChunk, ToolCallChunk, and ToolResultChunk. For a background drafting workflow, the simplest approach is usually to keep the streaming agent text-only.
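
If tools are attached anyway, the step can filter events by type before yielding. The sketch below declares small stand-in classes with the same short names so that it runs on its own. Their constructors and properties are assumptions made for illustration, and a real step would import Neuron's chunk classes instead. `text_only` is a hypothetical helper.

```php
<?php

// Stand-in chunk classes for illustration only. Their shape is assumed.
final class TextChunk
{
    public function __construct(public string $content) {}
}

final class ReasoningChunk
{
    public function __construct(public string $content) {}
}

final class ToolCallChunk
{
    public function __construct(public string $toolName) {}
}

final class ToolResultChunk
{
    public function __construct(public string $toolName) {}
}

/**
 * Yields only the text of TextChunk events and skips everything else, so
 * reasoning and tool traffic never end up in the persisted draft.
 *
 * @param iterable<object> $events
 */
function text_only(iterable $events): \Generator
{
    foreach ($events as $event) {
        if ($event instanceof TextChunk) {
            yield $event->content;
        }
    }
}

$events = [
    new ReasoningChunk('Considering the pricing tiers...'),
    new TextChunk('# Pricing memo'),
    new ToolCallChunk('lookup_account'),
    new ToolResultChunk('lookup_account'),
    new TextChunk("\n\nEnterprise pricing changes in Q3."),
];

// Pass false so the generator's keys are discarded instead of preserved.
$draft = implode('', iterator_to_array(text_only($events), false));
```

In `stream()`, the same `instanceof` check would sit inside the `foreach` over `$handler->events()`, directly before the `yield`.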
