Overview
Learn how to integrate AI capabilities into your Sushify Next.js application using the Vercel AI SDK
Sushify Next.js ships with built-in AI integration powered by the Vercel AI SDK, giving you easy access to AI models for text generation, image creation, and audio processing.
Architecture
This project uses oRPC (a type-safe RPC framework) instead of traditional Next.js API routes. All AI functionality is implemented as oRPC procedures that provide:
- Type Safety: End-to-end type inference
- Streaming Support: Native streaming for AI responses
- Authentication: Built-in user authentication and authorization
- OpenAPI: Automatic API documentation generation
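For orientation, here is a minimal sketch of what an oRPC procedure looks like in this setup (the route path, input, and greeting logic are illustrative only; the real AI procedures are shown later in this guide):
import { type } from "@orpc/server";
import { protectedProcedure } from "../../../orpc/procedures";

// Illustrative procedure: typed input, authenticated context, OpenAPI metadata
export const greet = protectedProcedure
  .route({ method: "POST", path: "/greet", tags: ["Example"], summary: "Greet the current user" })
  .input(type<{ name: string }>())
  .handler(async ({ input, context }) => {
    // context.user is provided by the authentication middleware
    return { greeting: `Hello ${input.name}!`, userId: context.user.id };
  });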
Features
- Multiple AI Providers: Support for OpenAI, Anthropic, and other providers
- Streaming Responses: Real-time streaming of AI-generated content via oRPC
- React Hooks: Vercel AI SDK hooks integrated with oRPC transport
- Type-Safe: Full TypeScript support with oRPC procedures
- Chat Management: Database-backed chat history and multi-user support
- Organization Support: AI chats can be scoped to organizations
- Text Generation: Generate text using GPT-4 and other models
- Image Generation: Create images with DALL-E
- Audio Processing: Speech-to-text with Whisper
Setup
1. Environment Variables
Configure your AI provider credentials in .env.local:
# OpenAI Configuration (Default)
OPENAI_API_KEY="sk-..."
# Anthropic (Optional)
ANTHROPIC_API_KEY="sk-ant-..."

2. Available Models
The AI package exports pre-configured models:
import { openai } from "@ai-sdk/openai";
export const textModel = openai("gpt-4o-mini");
export const imageModel = openai("dall-e-3");
export const audioModel = openai("whisper-1");

Text Generation
Using the useChat Hook with oRPC
Sushify uses oRPC for API management, including AI endpoints. The useChat hook from Vercel AI SDK is integrated with oRPC through a custom transport layer.
Understanding the Transport Layer
The useChat hook requires a transport object that defines how to communicate with the backend. Here's how it works:
"use client";
import { useChat } from "@ai-sdk/react";
import { eventIteratorToStream } from "@orpc/client";
import { orpcClient } from "@shared/lib/orpc-client";
import { useState } from "react";
export function ChatInterface({ chatId }: { chatId: string }) {
const [input, setInput] = useState("");
const { messages, setMessages, status, sendMessage } = useChat({
// Unique ID for this chat session
id: chatId ?? "new",
// Custom transport layer for oRPC integration
transport: {
/**
* sendMessages: Called when user sends a message
*
* Flow:
* 1. Receives all messages from the useChat hook
* 2. Calls the oRPC endpoint with chatId and messages
* 3. oRPC endpoint returns an async iterator (streaming response)
* 4. eventIteratorToStream converts the iterator to a ReadableStream
* 5. useChat consumes the stream and updates messages in real-time
*/
async sendMessages(options) {
if (!chatId) {
throw new Error("Chat ID is required");
}
// Call oRPC endpoint: POST /api/rpc/ai/chats/{chatId}/messages
// Returns an async iterator of UI message events
return eventIteratorToStream(
await orpcClient.ai.chats.messages.add(
{
chatId, // Which chat to add message to
messages: options.messages, // All messages in the conversation
},
{
signal: options.abortSignal // Allow cancellation
}
)
);
},
/**
* reconnectToStream: Called if stream connection is lost
*
* Not implemented in this project as we don't support reconnection.
* Each message creates a new stream.
*/
reconnectToStream() {
throw new Error("Unsupported");
},
},
});
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
const text = input.trim();
setInput("");
try {
// sendMessage adds user message and triggers AI response
await sendMessage({ text });
} catch (error) {
console.error("Failed to send message", error);
setInput(text); // Restore input on error
}
};

// ... render the message list and input form here (see the complete example below)
}

How the Streaming Works
Here's the complete flow when a user sends a message:
1. User sends message
↓
2. useChat hook calls transport.sendMessages()
↓
3. oRPC client makes POST request to /api/rpc/ai/chats/{chatId}/messages
↓
4. Server procedure calls OpenAI with streamText()
↓
5. Server returns async iterator (oRPC format)
↓
6. eventIteratorToStream converts iterator → ReadableStream
↓
7. useChat consumes stream, updates UI in real-time
↓
8. Server onFinish callback saves complete conversation to database

Key Concepts
eventIteratorToStream
- Converts oRPC's async iterator format into a Web ReadableStream
- Required because useChat expects a stream, but oRPC returns iterators
- Handles streaming chunks as they arrive from the AI model
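In isolation, the conversion looks like this (a sketch using the same client calls as the transport above; the UIMessage import location may differ in your setup):
import { eventIteratorToStream } from "@orpc/client";
import { orpcClient } from "@shared/lib/orpc-client";
import type { UIMessage } from "ai";

// Given a chat id and the current conversation, produce the stream useChat consumes
async function toStream(chatId: string, messages: UIMessage[]) {
  // The oRPC procedure returns an async iterator of UI message events...
  const iterator = await orpcClient.ai.chats.messages.add({ chatId, messages });
  // ...which eventIteratorToStream turns into a Web ReadableStream
  return eventIteratorToStream(iterator);
}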
Message Format
interface UIMessage {
id: string;
role: "user" | "assistant" | "system";
parts: Array<{
type: "text" | "image" | "file";
text?: string;
// ... other part types for multi-modal content
}>;
}

Status States
"idle"- No active request"submitted"- Request sent, waiting for first chunk"streaming"- Actively receiving streamed response"error"- Request failed
Why Send All Messages?
The transport.sendMessages() method receives ALL messages in the conversation, not just the newest one (an example payload is sketched after this list), because:
- AI models need full context to generate coherent responses
- Each request is stateless - the server doesn't maintain conversation state
- Conversation history is stored in the database, not in memory
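For illustration, by the third user turn the transport receives something like this (shape only, with ids and text shortened):
const messages = [
  { id: "1", role: "user", parts: [{ type: "text", text: "What is oRPC?" }] },
  { id: "2", role: "assistant", parts: [{ type: "text", text: "A type-safe RPC framework..." }] },
  { id: "3", role: "user", parts: [{ type: "text", text: "And how does streaming work?" }] },
];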
Creating an AI Procedure (oRPC)
Instead of API routes, create oRPC procedures for AI functionality. Here's the actual server-side implementation:
import { streamToEventIterator } from "@orpc/client";
import { ORPCError, type } from "@orpc/server";
// UIMessage is the AI SDK's UI message type (assumed to be re-exported by @repo/ai)
import { streamText, textModel, convertToModelMessages, type UIMessage } from "@repo/ai";
import { getAiChatById, updateAiChat } from "@repo/database";
import { protectedProcedure } from "../../../orpc/procedures";
export const addMessageToChat = protectedProcedure
.route({
method: "POST",
path: "/ai/chats/{chatId}/messages",
tags: ["AI"],
summary: "Add message to chat",
})
.input(type<{ chatId: string; messages: UIMessage[] }>())
.handler(async ({ input, context }) => {
const { chatId, messages } = input;
const user = context.user;
// 1. Verify chat exists and user has access
const chat = await getAiChatById(chatId);
if (!chat || chat.userId !== user.id) {
throw new ORPCError("FORBIDDEN");
}
// 2. Stream AI response from OpenAI
const response = streamText({
model: textModel, // gpt-4o-mini
messages: convertToModelMessages(messages), // Convert to OpenAI format
async onFinish({ text }) {
// 3. Save complete conversation to database when streaming finishes
await updateAiChat({
id: chatId,
messages: [
...messages, // All previous messages
{
role: "assistant",
parts: [{ type: "text", text }], // AI's complete response
},
],
});
},
});
// 4. Convert Vercel AI SDK stream to oRPC event iterator
// This allows oRPC to transport the stream to the client
return streamToEventIterator(response.toUIMessageStream());
});

Server-Side Flow Explained
Client Request
↓
[protectedProcedure middleware]
↓ Validates user authentication
↓ Provides user context
↓
[handler function]
↓
1. Verify chat ownership
↓ getAiChatById(chatId)
↓ Check user.id === chat.userId
↓
2. Call OpenAI API
↓ streamText() with gpt-4o-mini
↓ Returns ReadableStream
↓
3. Convert to UI message stream
↓ response.toUIMessageStream()
↓ Chunks: { type: "text-delta", text: "Hello" }
↓
4. Convert to oRPC format
↓ streamToEventIterator()
↓ Async iterator of events
↓
5. Stream to client
↓ oRPC transports iterator
↓ Client receives chunks in real-time
↓
6. On completion (onFinish callback)
↓ Save full conversation to database
↓ updateAiChat() with all messages

Key Functions
convertToModelMessages()
// Converts UI message format to OpenAI's expected format
// From: { role: "user", parts: [{ type: "text", text: "Hello" }] }
// To: { role: "user", content: "Hello" }

streamToEventIterator()
// Converts ReadableStream → AsyncIterator
// Required for oRPC to transport streams
// Each chunk is an event like { type: "text-delta", text: "word" }

onFinish()
// Callback executed when AI finishes generating response
// Perfect place to save data, log analytics, etc.
// Runs after streaming completes

Managing AI Chats
Create and manage AI chat sessions using oRPC:
import { orpcClient } from "@shared/lib/orpc-client";
// Create a new chat
const newChat = await orpcClient.ai.chats.create({
title: "My AI Chat",
organizationId: "org_123", // Optional: for organization-level chats
});
// List all chats
const { chats } = await orpcClient.ai.chats.list({
organizationId: "org_123", // Optional
});
// Get a specific chat
const { chat } = await orpcClient.ai.chats.find({
id: "chat_123",
});
// Update chat title
await orpcClient.ai.chats.update({
id: "chat_123",
title: "Updated Title",
});
// Delete a chat
await orpcClient.ai.chats.delete({
id: "chat_123",
});

Complete Chat Component Example
Here's a complete example based on the actual project implementation:
"use client";
import { useChat } from "@ai-sdk/react";
import { eventIteratorToStream } from "@orpc/client";
import { orpcClient } from "@shared/lib/orpc-client";
import { orpc } from "@shared/lib/orpc-query-utils";
import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query";
import { useState, useEffect } from "react";
export function AiChat({ organizationId }: { organizationId?: string }) {
const queryClient = useQueryClient();
const [input, setInput] = useState("");
const [chatId, setChatId] = useState<string | null>(null);
// Fetch all chats
const { data } = useQuery(
orpc.ai.chats.list.queryOptions({
input: { organizationId },
})
);
// Fetch current chat
const currentChatQuery = useQuery(
orpc.ai.chats.find.queryOptions({
input: chatId ? { id: chatId } : null,
})
);
// Create chat mutation
const createChatMutation = useMutation(
orpc.ai.chats.create.mutationOptions()
);
const chats = data?.chats ?? [];
const currentChat = currentChatQuery.data?.chat;
// useChat hook with oRPC transport
const { messages, setMessages, status, sendMessage } = useChat({
id: chatId ?? "new",
transport: {
async sendMessages(options) {
if (!chatId) throw new Error("Chat ID required");
return eventIteratorToStream(
await orpcClient.ai.chats.messages.add(
{
chatId,
messages: options.messages,
},
{ signal: options.abortSignal }
)
);
},
reconnectToStream() {
throw new Error("Unsupported");
},
},
});
// Load messages when chat changes
useEffect(() => {
if (currentChat?.messages) {
setMessages(currentChat.messages);
}
}, [currentChat?.messages]);
// Create new chat
const createNewChat = async () => {
const newChat = await createChatMutation.mutateAsync({
organizationId,
});
await queryClient.invalidateQueries({
queryKey: orpc.ai.chats.list.queryKey({ input: { organizationId } }),
});
setChatId(newChat.chat.id);
};
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
const text = input.trim();
setInput("");
try {
await sendMessage({ text });
} catch (error) {
console.error("Failed to send message", error);
setInput(text);
}
};
return (
<div className="flex h-screen">
{/* Sidebar with chat list */}
<aside className="w-64 border-r p-4">
<button onClick={createNewChat} className="w-full mb-4">
+ New Chat
</button>
{chats.map((chat) => (
<button
key={chat.id}
onClick={() => setChatId(chat.id)}
className={chatId === chat.id ? "font-bold" : ""}
>
{chat.title || "Untitled"}
</button>
))}
</aside>
{/* Main chat area */}
<main className="flex-1 flex flex-col">
<div className="flex-1 overflow-y-auto p-4">
{messages.map((message) => (
<div
key={message.id}
className={message.role === "user" ? "text-right" : "text-left"}
>
{message.parts?.map((part, i) =>
part.type === "text" ? <p key={i}>{part.text}</p> : null
)}
</div>
))}
{(status === "streaming" || status === "submitted") && (
<div>Thinking...</div>
)}
</div>
<form onSubmit={handleSubmit} className="p-4 border-t">
<input
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="Type your message..."
disabled={!chatId}
className="w-full"
/>
<button type="submit" disabled={!chatId}>
Send
</button>
</form>
</main>
</div>
);
}

Server-Side Text Generation
Generate Text
Generate text on the server without streaming:
import { generateText, textModel } from "@repo/ai";
const result = await generateText({
model: textModel,
prompt: "Write a haiku about programming",
});
console.log(result.text);

Stream Text
Stream text responses for better UX:
import { streamText, textModel } from "@repo/ai";
const result = await streamText({
model: textModel,
prompt: "Write a short story about AI",
});
// Stream to client
return result.toDataStreamResponse();

Structured Output
Generate structured data with schema validation:
import { generateObject, textModel } from "@repo/ai";
import { z } from "zod";
const result = await generateObject({
model: textModel,
schema: z.object({
recipe: z.object({
name: z.string(),
ingredients: z.array(z.string()),
steps: z.array(z.string()),
}),
}),
prompt: "Generate a recipe for chocolate chip cookies",
});
console.log(result.object.recipe);

Image Generation
Generate Images
Create images using DALL-E:
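// app/api/generate-image/route.ts (assumed path - must match the fetch("/api/generate-image") call in the client component below)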
import OpenAI from "openai";
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
});
export async function POST(req: Request) {
const { prompt } = await req.json();
const response = await openai.images.generate({
model: "dall-e-3",
prompt,
n: 1,
size: "1024x1024",
});
return Response.json({ url: response.data[0].url });
}

Client Component
"use client";
import { useState } from "react";
export function ImageGenerator() {
const [prompt, setPrompt] = useState("");
const [imageUrl, setImageUrl] = useState("");
const [loading, setLoading] = useState(false);
const generateImage = async () => {
setLoading(true);
try {
const response = await fetch("/api/generate-image", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ prompt }),
});
const data = await response.json();
setImageUrl(data.url);
} finally {
setLoading(false);
}
};
return (
<div>
<input
value={prompt}
onChange={(e) => setPrompt(e.target.value)}
placeholder="Describe an image..."
/>
<button onClick={generateImage} disabled={loading}>
{loading ? "Generating..." : "Generate Image"}
</button>
{imageUrl && <img src={imageUrl} alt="Generated" />}
</div>
);
}

Audio Processing
Speech to Text
Transcribe audio using Whisper:
import OpenAI from "openai";
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
});
export async function POST(req: Request) {
const formData = await req.formData();
const audio = formData.get("audio") as File;
const transcription = await openai.audio.transcriptions.create({
file: audio,
model: "whisper-1",
});
return Response.json({ text: transcription.text });
}

Custom AI Providers
Using Anthropic (Claude)
Configure Anthropic as an alternative provider:
import { anthropic } from "@ai-sdk/anthropic";
export const claudeModel = anthropic("claude-3-5-sonnet-20241022");

Then use it in a route handler:

import { streamText } from "@repo/ai";
import { claudeModel } from "@repo/ai/anthropic";
export async function POST(req: Request) {
const { messages } = await req.json();
const result = await streamText({
model: claudeModel,
messages,
});
return result.toDataStreamResponse();
}

Advanced Features
Function Calling
Enable AI to call functions:
import { generateText, textModel } from "@repo/ai";
import { z } from "zod";
const result = await generateText({
model: textModel,
prompt: "What's the weather in San Francisco?",
tools: {
getWeather: {
description: "Get the weather for a location",
parameters: z.object({
location: z.string(),
}),
execute: async ({ location }) => {
// Call weather API
return { temperature: 72, condition: "sunny" };
},
},
},
});

Multi-Modal Input
Process images with text:
import { generateText, textModel } from "@repo/ai";
const result = await generateText({
model: textModel,
messages: [
{
role: "user",
content: [
{ type: "text", text: "What's in this image?" },
{ type: "image", image: imageUrl },
],
},
],
});

Rate Limiting
Implement rate limiting for AI endpoints:
import { Ratelimit } from "@upstash/ratelimit";
import { Redis } from "@upstash/redis";
const ratelimit = new Ratelimit({
redis: Redis.fromEnv(),
limiter: Ratelimit.slidingWindow(10, "1 h"),
});
export async function POST(req: Request) {
const identifier = req.headers.get("x-forwarded-for") ?? "anonymous";
const { success } = await ratelimit.limit(identifier);
if (!success) {
return new Response("Rate limit exceeded", { status: 429 });
}
// Continue with AI processing...
}

Best Practices
- Rate Limiting: Always implement rate limiting for AI endpoints
- Error Handling: Gracefully handle API errors and timeouts (see the sketch after this list)
- Streaming: Use streaming for better user experience
- Cost Management: Monitor API usage and set budgets
- Caching: Cache responses when appropriate
- Type Safety: Use TypeScript and schema validation
- Security: Never expose API keys to the client
- Content Filtering: Implement content moderation for user inputs
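As a sketch of the error-handling and timeout advice above (the 30-second timeout and the helper name are arbitrary):
import { generateText, textModel } from "@repo/ai";

export async function safeGenerate(prompt: string) {
  try {
    const { text } = await generateText({
      model: textModel,
      prompt,
      // Abort the request instead of letting it hang indefinitely
      abortSignal: AbortSignal.timeout(30_000),
    });
    return { ok: true as const, text };
  } catch (error) {
    console.error("AI request failed", error);
    return { ok: false as const, message: "Generation failed, please try again." };
  }
}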
API Reference
useChat()
const {
messages,
setMessages,
sendMessage,
status,
error,
stop,
} = useChat(options);

useCompletion()
const {
completion,
input,
handleInputChange,
handleSubmit,
isLoading,
error,
} = useCompletion(options);

generateText()
const result = await generateText({
model: textModel,
prompt: string,
system?: string,
messages?: Message[],
tools?: Tools,
maxTokens?: number,
temperature?: number,
});

streamText()
const result = await streamText({
model: textModel,
prompt: string,
system?: string,
messages?: Message[],
});
return result.toDataStreamResponse();