Examples
Neuron Streaming Step
This example shows a clean split:
- Neuron streams the agent response chunk by chunk
- Queuety persists each chunk durably through StreamingStep
That means the UI can receive a live draft, while the workflow still survives worker crashes or retries.
When this pattern is useful
Use it when:
- a draft or response is long enough that you want to stream it live
- you do not want to lose partial output if the worker dies
- the final artifact should still land in workflow state after the stream completes
The Neuron agent
namespace App\Neuron;

use NeuronAI\Agent\Agent;
use NeuronAI\Providers\AIProviderInterface;
use NeuronAI\Providers\OpenAI\Responses\OpenAIResponses;
use NeuronAI\SystemPrompt;

final class DraftMemoAgent extends Agent
{
    protected function provider(): AIProviderInterface
    {
        return new OpenAIResponses(
            key: $_ENV['OPENAI_API_KEY'],
            model: $_ENV['OPENAI_MODEL'],
        );
    }

    public function instructions(): string
    {
        return (string) new SystemPrompt(
            background: [
                'You write crisp internal strategy memos.',
                'Prefer short sections and operationally useful language.',
            ],
        );
    }
}

The Queuety streaming step
namespace App\Workflow\Steps;

use App\Neuron\DraftMemoAgent;
use NeuronAI\Chat\Messages\UserMessage;
use Queuety\Contracts\StreamingStep;
use Queuety\Heartbeat;

final class StreamMemoDraftStep implements StreamingStep
{
    public function stream(array $state, array $existing_chunks = []): \Generator
    {
        // A previous attempt already persisted chunks. This example does not
        // call the model again on retry; it yields nothing new.
        if (! empty($existing_chunks)) {
            return;
        }

        $handler = (new DraftMemoAgent())->stream(
            new UserMessage(sprintf(
                'Write a strategy memo about %s for account %d.',
                $state['topic'],
                $state['account_id'],
            ))
        );

        foreach ($handler->events() as $i => $chunk) {
            // Record a heartbeat carrying the running chunk count.
            Heartbeat::beat(['chunks' => $i + 1]);

            // This example assumes the agent has no tools attached, so
            // Neuron will stream text/reasoning chunks only.
            yield $chunk->content;
        }
    }

    // Join the persisted chunks into the final draft, which is merged
    // back into workflow state.
    public function on_complete(array $chunks, array $state): array
    {
        return [
            'draft_markdown' => implode('', $chunks),
        ];
    }

    public function config(): array
    {
        return [
            'max_attempts' => 3,
        ];
    }
}

Use it in a workflow
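use App\Workflow\Steps\StreamMemoDraftStep;
use Queuety\Queuety;

// BuildDraftBriefStep and StoreMemoStep are not shown here; import them
// from wherever they live in your application.
Queuety::workflow('memo_drafting')
    ->then(BuildDraftBriefStep::class)
    ->then(StreamMemoDraftStep::class)
    ->then(StoreMemoStep::class)
    ->dispatch([
        'account_id' => 10,
        'topic' => 'pricing changes for enterprise accounts',
    ]);

Why use Queuety instead of Neuron stream adapters alone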
Neuron's stream adapters are useful for frontend protocols and transports.
Queuety adds something different:
- every yielded chunk is persisted
- retries can resume with the previously saved chunks
- the completed draft is merged back into workflow state
- the stream becomes one durable step in a larger orchestration
That makes this pattern a good fit when the stream is part of a multi-step business process, not just a request/response UI.
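The persistence and resume behavior in the list above can be pictured with a toy in-memory model. This is only a sketch, not Queuety's implementation: `run_attempt()`, `$store`, and the simulated crash are hypothetical names invented for illustration, and the chunk source is a fixed word list so that a retry can skip what was already saved. A real model stream is generally not replayable this way.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: runs one attempt and "persists" each chunk as soon
 * as it is yielded, so a failure mid-stream keeps the earlier chunks.
 */
function run_attempt(callable $stream, array &$store): bool
{
    try {
        foreach ($stream($store) as $chunk) {
            $store[] = $chunk; // saved before the next chunk is requested
        }
        return true;
    } catch (RuntimeException $e) {
        return false; // attempt failed; $store still holds the saved chunks
    }
}

$words = ['Pricing ', 'changes ', 'for ', 'enterprise ', 'accounts.'];
$attempt = 0;

// Stand-in for a streaming step: receives the chunks saved so far and
// continues after them. The first attempt crashes before its third chunk.
$stream = function (array $existing) use ($words, &$attempt): Generator {
    $attempt++;
    foreach (array_slice($words, count($existing)) as $offset => $word) {
        if ($attempt === 1 && $offset === 2) {
            throw new RuntimeException('simulated worker crash');
        }
        yield $word;
    }
};

$store = [];
$first = run_attempt($stream, $store);  // fails after saving two chunks
$second = run_attempt($stream, $store); // resumes from the third chunk
$draft = implode('', $store);
```

After the second attempt, `$store` holds all five chunks in order and `$draft` is the joined text, which mirrors what `on_complete()` does with the persisted chunks in the step shown earlier.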
One caveat
If your Neuron agent uses tools while streaming, handle the streamed chunk types explicitly. Neuron's current streaming docs describe TextChunk, ReasoningChunk, ToolCallChunk, and ToolResultChunk. For a background drafting workflow, the simplest approach is usually to keep the streaming agent text-only.
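As a rough illustration of handling chunk types explicitly, the sketch below filters a mixed stream down to draft text. The four classes are local stand-ins that only borrow the names from Neuron's streaming docs. Their constructors and properties (`content`, `tool`, `result`) are assumptions made so the example runs on its own, and `draft_text()` is a hypothetical helper. In a real step you would import Neuron's own chunk classes and check their actual shape.

```php
<?php
declare(strict_types=1);

// Local stand-ins for illustration only; not Neuron's real classes.
final class TextChunk { public function __construct(public string $content) {} }
final class ReasoningChunk { public function __construct(public string $content) {} }
final class ToolCallChunk { public function __construct(public string $tool) {} }
final class ToolResultChunk { public function __construct(public string $result) {} }

/** Hypothetical helper: yield only the text that belongs in the draft. */
function draft_text(iterable $events): Generator
{
    foreach ($events as $chunk) {
        if ($chunk instanceof TextChunk) {
            yield $chunk->content;
        }
        // Reasoning, tool-call, and tool-result chunks are skipped here.
        // That is a choice made for this sketch, not a Neuron requirement.
    }
}

$events = [
    new ReasoningChunk('Outline first.'),
    new TextChunk('## Summary'),
    new ToolCallChunk('lookup_account'),
    new ToolResultChunk('{"tier":"enterprise"}'),
    new TextChunk("\nRaise list price."),
];

$draft = implode('', iterator_to_array(draft_text($events), false));
```

Inside a streaming step, the same `instanceof` check would sit in the `foreach` loop before the `yield`, so that only draft text is persisted as chunks.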