Installation
Install required packages:pip install requests
Basic Setup
import requests
import os
from typing import Dict, Optional, BinaryIO
API_KEY = os.getenv('NEOSPEECH_API_KEY')
BASE_URL = 'https://api.neospeech.io/v1'
def api_request(endpoint: str, method: str = 'GET', json_data: Optional[Dict] = None):
"""Helper function for API requests"""
url = f'{BASE_URL}{endpoint}'
headers = {
'Authorization': f'Bearer {API_KEY}',
'Content-Type': 'application/json'
}
response = requests.request(
method=method,
url=url,
headers=headers,
json=json_data
)
if not response.ok:
error = response.json()
raise Exception(error.get('message', f'HTTP {response.status_code}'))
return response
Generate Speech
Basic Example
def generate_speech(text: str, voice: str = 'lyra', model: str = 'aurora-3.5') -> bytes:
"""Generate speech from text"""
response = api_request('/audio/speech', method='POST', json_data={
'input': text,
'voice': voice,
'model': model
})
return response.content
# Usage
audio = generate_speech('Hello, world!', 'lyra', 'aurora-4')
Advanced Example with Options
def generate_speech_advanced(
input_text: str,
voice: str = 'lyra',
model: str = 'aurora-3.5',
pitch: str = '+0%',
style: str = 'calm',
style_degree: str = '1.5',
lang: str = 'en-US'
) -> Dict:
"""Generate speech with advanced options"""
response = api_request('/audio/speech', method='POST', json_data={
'input': input_text,
'voice': voice,
'model': model,
'pitch': pitch,
'style': style,
'styleDegree': style_degree,
'lang': lang
})
# Get metadata from headers
metadata = {
'character_count': response.headers.get('X-Character-Count'),
'model_used': response.headers.get('X-Model-Used'),
'voice_used': response.headers.get('X-Voice-Used'),
'content_length': response.headers.get('Content-Length')
}
return {
'audio': response.content,
'metadata': metadata
}
# Usage
result = generate_speech_advanced(
input_text='Welcome to our professional service!',
voice='lyra',
model='aurora-4',
pitch='+10%',
style='cheerful',
style_degree='1.8'
)
print(f"Generated audio: {result['metadata']}")
Save Audio to File
def save_audio_to_file(
text: str,
filename: str,
voice: str = 'lyra',
model: str = 'aurora-3.5'
):
"""Generate speech and save to file"""
audio = generate_speech(text, voice, model)
with open(filename, 'wb') as f:
f.write(audio)
print(f'Audio saved to {filename}')
# Usage
save_audio_to_file(
'This is a test message.',
'output.mp3',
'lyra',
'aurora-4'
)
Stream Speech
Basic Streaming
def stream_speech(text: str, voice: str = 'lyra', model: str = 'aurora-3.5') -> bytes:
"""Stream speech generation"""
response = api_request('/audio/stream', method='POST', json_data={
'input': text,
'voice': voice,
'model': model
})
chunks = []
for chunk in response.iter_content(chunk_size=8192):
if chunk:
chunks.append(chunk)
print(f'Received chunk: {len(chunk)} bytes')
return b''.join(chunks)
# Usage
streamed_audio = stream_speech('Streaming test message', 'kai', 'turbo-3')
Stream with Progress Callback
from typing import Callable
def stream_speech_with_progress(
text: str,
voice: str,
model: str,
on_progress: Optional[Callable] = None
) -> bytes:
"""Stream speech with progress callback"""
response = api_request('/audio/stream', method='POST', json_data={
'input': text,
'voice': voice,
'model': model
})
chunks = []
received_bytes = 0
for chunk in response.iter_content(chunk_size=8192):
if chunk:
chunks.append(chunk)
received_bytes += len(chunk)
if on_progress:
on_progress({
'received_bytes': received_bytes,
'chunks': len(chunks),
'latest_chunk_size': len(chunk)
})
return b''.join(chunks)
# Usage
def progress_callback(progress):
print(f"Progress: {progress['received_bytes']} bytes, {progress['chunks']} chunks")
audio = stream_speech_with_progress(
'Long text content here...',
'lyra',
'aurora-3.5',
on_progress=progress_callback
)
Check Balance
def get_balance() -> Dict:
"""Get account balance information"""
response = api_request('/balance')
result = response.json()
return result['data']
# Usage
balance = get_balance()
print(f"Remaining credits: {balance['remaining_credits']:,}")
print(f"Plan: {balance['plan_type']}")
print(f"Days remaining: {balance['billing_cycle']['days_remaining']}")
List Voices
Get All Voices
def list_voices(filters: Optional[Dict] = None) -> Dict:
"""List available voices with optional filters"""
params = filters or {}
query_string = '&'.join([f'{k}={v}' for k, v in params.items()])
endpoint = f"/voices/list{'?' + query_string if query_string else ''}"
response = api_request(endpoint)
result = response.json()
return result['data']
# Usage
all_voices = list_voices()
print(f"Found {all_voices['pagination']['total']} voices")
Filter Voices
# Get female US English voices
female_voices = list_voices({
'gender': 'female',
'locale': 'en-US'
})
# Search for professional voices
professional_voices = list_voices({
'search': 'professional'
})
# Paginated results
page1 = list_voices({'limit': 10, 'offset': 0})
page2 = list_voices({'limit': 10, 'offset': 10})
Get All Voices with Pagination
def get_all_voices() -> list:
"""Get all voices with pagination"""
all_voices = []
offset = 0
limit = 50
while True:
result = list_voices({'limit': limit, 'offset': offset})
all_voices.extend(result['voices'])
print(f"Retrieved {len(all_voices)} of {result['pagination']['total']}")
if len(all_voices) >= result['pagination']['total']:
break
offset += limit
return all_voices
List Models
def list_models() -> Dict:
"""List available models"""
response = api_request('/models/list')
result = response.json()
return result['data']
# Usage
models = list_models()
for model in models['models']:
print(f"{model['name']}: {model['quality']} quality, {model['avg_latency_ms']}ms latency")
Error Handling
Basic Error Handling
def generate_speech_with_error_handling(text: str, voice: str, model: str) -> bytes:
"""Generate speech with error handling"""
try:
response = api_request('/audio/speech', method='POST', json_data={
'input': text,
'voice': voice,
'model': model
})
return response.content
except Exception as error:
print(f'Speech generation failed: {error}')
# Handle specific error types
error_msg = str(error)
if 'Rate limit' in error_msg:
print('Rate limited. Please wait and retry.')
elif 'Authorization' in error_msg:
print('Check your API key.')
raise
Custom Error Class
class NeoSpeechError(Exception):
"""Custom exception for NeoSpeech API errors"""
def __init__(self, message: str, code: str, status: int, retryable: bool):
super().__init__(message)
self.code = code
self.status = status
self.retryable = retryable
def api_request_with_errors(endpoint: str, method: str = 'GET', json_data: Optional[Dict] = None):
"""API request with custom error handling"""
url = f'{BASE_URL}{endpoint}'
headers = {
'Authorization': f'Bearer {API_KEY}',
'Content-Type': 'application/json'
}
response = requests.request(
method=method,
url=url,
headers=headers,
json=json_data
)
if not response.ok:
error = response.json()
raise NeoSpeechError(
error.get('message', f'HTTP {response.status_code}'),
error.get('error_code'),
response.status_code,
error.get('retryable', False)
)
return response
# Usage
try:
audio = generate_speech('test', 'lyra', 'aurora-4')
except NeoSpeechError as error:
print(f"Error [{error.code}]: {error}")
print(f"Retryable: {error.retryable}")
Retry with Exponential Backoff
import time
def generate_speech_with_retry(
text: str,
voice: str,
model: str,
max_retries: int = 3
) -> bytes:
"""Generate speech with retry logic"""
for attempt in range(max_retries):
try:
return generate_speech(text, voice, model)
except Exception as error:
is_last_attempt = attempt == max_retries - 1
# Check if error is retryable
error_msg = str(error)
is_retryable = '500' in error_msg or 'timeout' in error_msg.lower()
if not is_retryable or is_last_attempt:
raise
# Exponential backoff
delay = 2 ** attempt
print(f"Attempt {attempt + 1} failed. Retrying in {delay}s...")
time.sleep(delay)
Rate Limiting
Request Queue
import threading
import queue
import time
from typing import Callable, Any
class RequestQueue:
"""Queue for managing concurrent and rate-limited requests"""
def __init__(self, max_concurrent: int = 18, requests_per_minute: int = 60):
self.max_concurrent = max_concurrent
self.requests_per_minute = requests_per_minute
self.queue = queue.Queue()
self.active = 0
self.request_times = []
self.lock = threading.Lock()
def add(self, request_fn: Callable) -> Any:
"""Add a request to the queue"""
result_queue = queue.Queue()
def wrapped_request():
try:
result = request_fn()
result_queue.put(('success', result))
except Exception as e:
result_queue.put(('error', e))
self.queue.put(wrapped_request)
self._process_queue()
status, result = result_queue.get()
if status == 'error':
raise result
return result
def _process_queue(self):
"""Process queued requests"""
with self.lock:
if self.active >= self.max_concurrent:
return
# Check rate limit
now = time.time()
self.request_times = [t for t in self.request_times if now - t < 60]
if len(self.request_times) >= self.requests_per_minute:
oldest_request = min(self.request_times)
delay = 60 - (now - oldest_request)
threading.Timer(delay, self._process_queue).start()
return
if self.queue.empty():
return
request_fn = self.queue.get()
self.active += 1
self.request_times.append(now)
def execute():
try:
request_fn()
finally:
with self.lock:
self.active -= 1
self._process_queue()
threading.Thread(target=execute).start()
# Usage
request_queue = RequestQueue(18, 60)
def queued_generate_speech(text: str, voice: str, model: str) -> bytes:
return request_queue.add(
lambda: generate_speech(text, voice, model)
)
# Process multiple requests safely
texts = ['text1', 'text2', 'text3']
audios = [queued_generate_speech(text, 'lyra', 'aurora-3.5') for text in texts]
Complete SDK Example
class NeoSpeechClient:
"""Complete SDK client for NeoSpeech API"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = 'https://api.neospeech.io/v1'
def _request(self, endpoint: str, method: str = 'GET', json_data: Optional[Dict] = None):
"""Make API request"""
url = f'{self.base_url}{endpoint}'
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
response = requests.request(
method=method,
url=url,
headers=headers,
json=json_data
)
if not response.ok:
error = response.json()
raise Exception(error.get('message', f'HTTP {response.status_code}'))
return response
def generate_speech(self, options: Dict) -> bytes:
"""Generate speech from text"""
response = self._request('/audio/speech', method='POST', json_data=options)
return response.content
def stream_speech(self, options: Dict) -> bytes:
"""Stream speech generation"""
response = self._request('/audio/stream', method='POST', json_data=options)
chunks = []
for chunk in response.iter_content(chunk_size=8192):
if chunk:
chunks.append(chunk)
return b''.join(chunks)
def get_balance(self) -> Dict:
"""Get account balance"""
response = self._request('/balance')
result = response.json()
return result['data']
def list_voices(self, filters: Optional[Dict] = None) -> Dict:
"""List available voices"""
params = filters or {}
query_string = '&'.join([f'{k}={v}' for k, v in params.items()])
endpoint = f"/voices/list{'?' + query_string if query_string else ''}"
response = self._request(endpoint)
result = response.json()
return result['data']
def list_models(self) -> Dict:
"""List available models"""
response = self._request('/models/list')
result = response.json()
return result['data']
# Usage
client = NeoSpeechClient(os.getenv('NEOSPEECH_API_KEY'))
# Generate speech
audio = client.generate_speech({
'input': 'Hello, world!',
'voice': 'lyra',
'model': 'aurora-4'
})
# Check balance
balance = client.get_balance()
print(f"Credits: {balance['remaining_credits']}")
# List voices
voices = client.list_voices({'gender': 'female'})
print(f"Found {len(voices['voices'])} female voices")
Async Implementation
import aiohttp
import asyncio
class AsyncNeoSpeechClient:
"""Async SDK client for NeoSpeech API"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = 'https://api.neospeech.io/v1'
async def _request(self, endpoint: str, method: str = 'GET', json_data: Optional[Dict] = None):
"""Make async API request"""
url = f'{self.base_url}{endpoint}'
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
async with aiohttp.ClientSession() as session:
async with session.request(
method=method,
url=url,
headers=headers,
json=json_data
) as response:
if response.status != 200:
error = await response.json()
raise Exception(error.get('message', f'HTTP {response.status}'))
return response
async def generate_speech(self, options: Dict) -> bytes:
"""Generate speech asynchronously"""
response = await self._request('/audio/speech', method='POST', json_data=options)
return await response.read()
async def get_balance(self) -> Dict:
"""Get account balance asynchronously"""
response = await self._request('/balance')
result = await response.json()
return result['data']
async def list_voices(self, filters: Optional[Dict] = None) -> Dict:
"""List voices asynchronously"""
params = filters or {}
query_string = '&'.join([f'{k}={v}' for k, v in params.items()])
endpoint = f"/voices/list{'?' + query_string if query_string else ''}"
response = await self._request(endpoint)
result = await response.json()
return result['data']
# Usage
async def main():
client = AsyncNeoSpeechClient(os.getenv('NEOSPEECH_API_KEY'))
# Generate multiple speeches concurrently
tasks = [
client.generate_speech({
'input': f'Message {i}',
'voice': 'lyra',
'model': 'aurora-3.5'
})
for i in range(10)
]
audios = await asyncio.gather(*tasks)
print(f"Generated {len(audios)} audio files")
asyncio.run(main())
Related Resources
JavaScript Examples
JavaScript/Node.js examples
cURL Examples
Command-line examples
API Reference
Complete API documentation
Best Practices
Optimization tips

