Skip to main content

Installation

Install the required packages (aiohttp is only needed for the Async Implementation section):
pip install requests aiohttp

Basic Setup

import requests
import os
from typing import Dict, Optional, BinaryIO

# The API key is read from the environment. os.getenv returns None when the
# variable is unset, so the Authorization header built in api_request would
# then read "Bearer None".
API_KEY = os.getenv('NEOSPEECH_API_KEY')
# Root URL; every endpoint path used in the examples is appended to it.
BASE_URL = 'https://api.neospeech.io/v1'

def api_request(
    endpoint: str,
    method: str = 'GET',
    json_data: Optional[Dict] = None,
    timeout: float = 60.0,
):
    """Send an authenticated request to the NeoSpeech API.

    Args:
        endpoint: Path appended to BASE_URL, e.g. '/balance'.
        method: HTTP method name.
        json_data: Optional payload sent as the JSON request body.
        timeout: Seconds to wait for the server to respond. Without a
            timeout, requests waits indefinitely on a stalled connection.

    Returns:
        The requests.Response object when the status is not an error.

    Raises:
        Exception: If the API answers with an error status. The message is
            the 'message' field of the JSON error body when there is one,
            otherwise 'HTTP <status code>'.
    """
    response = requests.request(
        method=method,
        url=f'{BASE_URL}{endpoint}',
        headers={
            'Authorization': f'Bearer {API_KEY}',
            'Content-Type': 'application/json'
        },
        json=json_data,
        timeout=timeout
    )

    if not response.ok:
        fallback = f'HTTP {response.status_code}'
        try:
            error = response.json()
        except ValueError:
            # The error body is not JSON (e.g. an HTML page from a proxy);
            # report the status code instead of a JSON decode error.
            error = None
        message = error.get('message', fallback) if isinstance(error, dict) else fallback
        raise Exception(message)

    return response

Generate Speech

Basic Example

def generate_speech(text: str, voice: str = 'lyra', model: str = 'aurora-3.5') -> bytes:
    """Synthesize `text` and return the body of the API response as bytes.

    Args:
        text: Text sent as the 'input' field.
        voice: Value of the 'voice' field.
        model: Value of the 'model' field.

    Returns:
        The raw content of the POST /audio/speech response.
    """
    payload = {'input': text, 'voice': voice, 'model': model}
    return api_request('/audio/speech', method='POST', json_data=payload).content

# Usage: override the default model and keep the resulting bytes in `audio`
audio = generate_speech('Hello, world!', voice='lyra', model='aurora-4')

Advanced Example with Options

def generate_speech_advanced(
    input_text: str,
    voice: str = 'lyra',
    model: str = 'aurora-3.5',
    pitch: str = '+0%',
    style: str = 'calm',
    style_degree: str = '1.5',
    lang: str = 'en-US'
) -> Dict:
    """Synthesize speech with extra voice options and report response metadata.

    All arguments are forwarded unchanged in the JSON body of
    POST /audio/speech (style_degree is sent under the key 'styleDegree').

    Returns:
        A dict with 'audio' (the response body bytes) and 'metadata'
        (selected response headers; a value is None when the header is
        missing).
    """
    payload = {
        'input': input_text,
        'voice': voice,
        'model': model,
        'pitch': pitch,
        'style': style,
        'styleDegree': style_degree,
        'lang': lang
    }
    response = api_request('/audio/speech', method='POST', json_data=payload)

    # Metadata key -> response header it is read from
    header_for = {
        'character_count': 'X-Character-Count',
        'model_used': 'X-Model-Used',
        'voice_used': 'X-Voice-Used',
        'content_length': 'Content-Length'
    }
    metadata = {key: response.headers.get(header) for key, header in header_for.items()}

    return {'audio': response.content, 'metadata': metadata}

# Usage: `lang` is not passed, so its default ('en-US') applies
result = generate_speech_advanced(
    'Welcome to our professional service!',
    model='aurora-4',
    voice='lyra',
    style='cheerful',
    style_degree='1.8',
    pitch='+10%'
)

print(f"Generated audio: {result['metadata']}")

Save Audio to File

def save_audio_to_file(
    text: str,
    filename: str,
    voice: str = 'lyra',
    model: str = 'aurora-3.5'
):
    """Synthesize `text` and write the resulting bytes to `filename`.

    The file is opened in binary write mode, so an existing file with the
    same name is overwritten. A confirmation line is printed afterwards.
    """
    audio_bytes = generate_speech(text, voice, model)

    with open(filename, 'wb') as out_file:
        out_file.write(audio_bytes)

    print(f'Audio saved to {filename}')

# Usage
save_audio_to_file(
    text='This is a test message.',
    filename='output.mp3',
    voice='lyra',
    model='aurora-4'
)

Stream Speech

Basic Streaming

def stream_speech(text: str, voice: str = 'lyra', model: str = 'aurora-3.5') -> bytes:
    """Request speech from /audio/stream and return the collected bytes.

    The response body is read in pieces of up to 8192 bytes; the size of
    each non-empty piece is printed as it is collected.

    NOTE(review): api_request does not pass stream=True to requests, so the
    body has presumably been downloaded in full before iteration starts.
    Confirm against the requests documentation.
    """
    payload = {'input': text, 'voice': voice, 'model': model}
    response = api_request('/audio/stream', method='POST', json_data=payload)

    collected = bytearray()
    for piece in response.iter_content(chunk_size=8192):
        if not piece:
            continue
        collected += piece
        print(f'Received chunk: {len(piece)} bytes')

    return bytes(collected)

# Usage
streamed_audio = stream_speech('Streaming test message', voice='kai', model='turbo-3')

Stream with Progress Callback

from typing import Callable

def stream_speech_with_progress(
    text: str,
    voice: str,
    model: str,
    on_progress: Optional[Callable] = None
) -> bytes:
    """Collect the /audio/stream response, reporting progress per chunk.

    Args:
        text: Text sent as the 'input' field.
        voice: Value of the 'voice' field.
        model: Value of the 'model' field.
        on_progress: Optional callable invoked after every non-empty chunk
            with a dict holding 'received_bytes' (running total), 'chunks'
            (number of chunks so far) and 'latest_chunk_size'.

    Returns:
        All received chunks concatenated into one bytes object.
    """
    payload = {'input': text, 'voice': voice, 'model': model}
    response = api_request('/audio/stream', method='POST', json_data=payload)

    collected = bytearray()
    chunk_count = 0

    for piece in response.iter_content(chunk_size=8192):
        if not piece:
            continue

        collected.extend(piece)
        chunk_count += 1

        if on_progress:
            on_progress({
                'received_bytes': len(collected),
                'chunks': chunk_count,
                'latest_chunk_size': len(piece)
            })

    return bytes(collected)

# Usage
def progress_callback(progress):
    """Print the running byte and chunk totals from a progress dict."""
    print(f"Progress: {progress['received_bytes']} bytes, {progress['chunks']} chunks")

audio = stream_speech_with_progress(
    text='Long text content here...',
    voice='lyra',
    model='aurora-3.5',
    on_progress=progress_callback
)

Check Balance

def get_balance() -> Dict:
    """Fetch /balance and return the 'data' member of its JSON body."""
    return api_request('/balance').json()['data']

# Usage
# The keys read below ('remaining_credits', 'plan_type', 'billing_cycle')
# must be present in the returned dict, otherwise a KeyError is raised.
balance = get_balance()
# ':,' formats the number with thousands separators
print(f"Remaining credits: {balance['remaining_credits']:,}")
print(f"Plan: {balance['plan_type']}")
print(f"Days remaining: {balance['billing_cycle']['days_remaining']}")

List Voices

Get All Voices

def list_voices(filters: Optional[Dict] = None) -> Dict:
    """List available voices, optionally narrowed by query filters.

    Args:
        filters: Mapping of query parameter names to values, e.g.
            {'gender': 'female', 'limit': 10}. None or an empty dict sends
            no query string.

    Returns:
        The 'data' member of the /voices/list JSON response.
    """
    # Imported locally so this snippet works without editing the file's imports.
    from urllib.parse import urlencode

    endpoint = '/voices/list'
    if filters:
        # urlencode percent-escapes keys and values, so a search term that
        # contains spaces, '&' or '=' cannot corrupt the query string.
        endpoint = f'{endpoint}?{urlencode(filters)}'

    return api_request(endpoint).json()['data']

# Usage
# Called without filters, so no query string is sent.
all_voices = list_voices()
# Reads the total from the 'pagination' object of the returned data.
print(f"Found {all_voices['pagination']['total']} voices")

Filter Voices

# Get female US English voices
female_voices = list_voices(dict(gender='female', locale='en-US'))

# Search for professional voices
professional_voices = list_voices(dict(search='professional'))

# Paginated results
page1 = list_voices(dict(limit=10, offset=0))
page2 = list_voices(dict(limit=10, offset=10))

Get All Voices with Pagination

def get_all_voices() -> list:
    """Collect every voice by requesting pages of 50 until all are fetched.

    Progress is printed after each page.

    Returns:
        A list with the 'voices' entries of all pages, in the order received.
    """
    all_voices = []
    offset = 0
    limit = 50

    while True:
        result = list_voices({'limit': limit, 'offset': offset})
        page = result['voices']
        total = result['pagination']['total']
        all_voices.extend(page)

        print(f"Retrieved {len(all_voices)} of {total}")

        # An empty page means nothing more can be fetched. Without this
        # check, a reported total larger than the voices actually returned
        # would keep the loop running forever.
        if not page or len(all_voices) >= total:
            break

        offset += limit

    return all_voices

List Models

def list_models() -> Dict:
    """Fetch /models/list and return the 'data' member of its JSON body."""
    return api_request('/models/list').json()['data']

# Usage
models = list_models()
# Each entry under 'models' is read for its 'name', 'quality' and
# 'avg_latency_ms' keys.
for model in models['models']:
    print(f"{model['name']}: {model['quality']} quality, {model['avg_latency_ms']}ms latency")

Error Handling

Basic Error Handling

def generate_speech_with_error_handling(text: str, voice: str, model: str) -> bytes:
    """Synthesize speech, printing a diagnostic before re-raising any failure.

    Returns:
        The body of the /audio/speech response.

    Raises:
        Exception: Whatever api_request raised. It is re-raised unchanged
            after the failure, and a hint for recognised messages, has been
            printed.
    """
    payload = {'input': text, 'voice': voice, 'model': model}

    try:
        return api_request('/audio/speech', method='POST', json_data=payload).content
    except Exception as error:
        print(f'Speech generation failed: {error}')

        # (substring of the error message, hint to print); first match wins
        hints = (
            ('Rate limit', 'Rate limited. Please wait and retry.'),
            ('Authorization', 'Check your API key.'),
        )
        error_msg = str(error)
        for marker, hint in hints:
            if marker in error_msg:
                print(hint)
                break

        raise

Custom Error Class

class NeoSpeechError(Exception):
    """Exception carrying the details of a failed NeoSpeech API call.

    Besides the message (available through str()), each instance stores the
    `code`, `status` and `retryable` values it was constructed with.
    """

    def __init__(self, message: str, code: str, status: int, retryable: bool):
        self.code, self.status, self.retryable = code, status, retryable
        super().__init__(message)

def api_request_with_errors(
    endpoint: str,
    method: str = 'GET',
    json_data: Optional[Dict] = None,
    timeout: float = 60.0,
):
    """Send an authenticated API request, raising NeoSpeechError on failure.

    Args:
        endpoint: Path appended to BASE_URL.
        method: HTTP method name.
        json_data: Optional payload sent as the JSON request body.
        timeout: Seconds to wait for the server to respond. Without a
            timeout, requests waits indefinitely on a stalled connection.

    Returns:
        The requests.Response object when the status is not an error.

    Raises:
        NeoSpeechError: If the API answers with an error status. Message,
            code and retryable flag come from the JSON error body
            ('message', 'error_code', 'retryable'). If the body is not a
            JSON object, the message is 'HTTP <status code>', the code is
            None and retryable is False.
    """
    response = requests.request(
        method=method,
        url=f'{BASE_URL}{endpoint}',
        headers={
            'Authorization': f'Bearer {API_KEY}',
            'Content-Type': 'application/json'
        },
        json=json_data,
        timeout=timeout
    )

    if not response.ok:
        try:
            error = response.json()
        except ValueError:
            # The error body is not JSON (e.g. an HTML page from a proxy).
            error = None
        if not isinstance(error, dict):
            error = {}

        raise NeoSpeechError(
            error.get('message', f'HTTP {response.status_code}'),
            error.get('error_code'),
            response.status_code,
            error.get('retryable', False)
        )

    return response

# Usage
# NeoSpeechError is raised only by api_request_with_errors; generate_speech
# goes through api_request, which raises a plain Exception. The request must
# therefore be made with api_request_with_errors for the except clause to match.
try:
    response = api_request_with_errors('/audio/speech', method='POST', json_data={
        'input': 'test',
        'voice': 'lyra',
        'model': 'aurora-4'
    })
    audio = response.content
except NeoSpeechError as error:
    print(f"Error [{error.code}]: {error}")
    print(f"Retryable: {error.retryable}")

Retry with Exponential Backoff

import time

def generate_speech_with_retry(
    text: str,
    voice: str,
    model: str,
    max_retries: int = 3
) -> bytes:
    """Generate speech, retrying transient failures with exponential backoff.

    Args:
        text: Text to synthesize.
        voice: Voice identifier.
        model: Model identifier.
        max_retries: Total number of attempts (must be at least 1).

    Returns:
        The audio bytes returned by generate_speech.

    Raises:
        ValueError: If max_retries is smaller than 1. Previously the loop
            body never ran and None was returned silently.
        Exception: The last error, when it is not retryable or all attempts
            have been used.
    """
    if max_retries < 1:
        raise ValueError('max_retries must be at least 1')

    for attempt in range(max_retries):
        try:
            return generate_speech(text, voice, model)
        except Exception as error:
            is_last_attempt = attempt == max_retries - 1

            # An error is retryable when it says so itself (the `retryable`
            # attribute set by NeoSpeechError) or when its message contains
            # '500' or 'timeout'.
            error_msg = str(error)
            is_retryable = (
                bool(getattr(error, 'retryable', False))
                or '500' in error_msg
                or 'timeout' in error_msg.lower()
            )

            if not is_retryable or is_last_attempt:
                raise

            # Exponential backoff: 1s, 2s, 4s, ...
            delay = 2 ** attempt
            print(f"Attempt {attempt + 1} failed. Retrying in {delay}s...")
            time.sleep(delay)

Rate Limiting

Request Queue

import threading
import queue
import time
from typing import Callable, Any

class RequestQueue:
    """Queue that limits concurrency and requests per minute.

    Each submitted callable runs on its own worker thread. At most
    `max_concurrent` callables run at once, and at most
    `requests_per_minute` are started within any 60-second window.
    """

    def __init__(self, max_concurrent: int = 18, requests_per_minute: int = 60):
        """Create an empty queue.

        Raises:
            ValueError: If either limit is smaller than 1. A zero limit
                would make add() block forever or fail inside the
                rate-limit check.
        """
        if max_concurrent < 1 or requests_per_minute < 1:
            raise ValueError('max_concurrent and requests_per_minute must be at least 1')

        self.max_concurrent = max_concurrent
        self.requests_per_minute = requests_per_minute
        self.queue = queue.Queue()    # callables waiting to be started
        self.active = 0               # callables currently running
        self.request_times = []       # start times within the last 60 seconds
        self.lock = threading.Lock()  # guards active, request_times and dequeuing

    def add(self, request_fn: Callable) -> Any:
        """Queue `request_fn`, wait for it to run, and return its result.

        The calling thread blocks until the callable has finished. An
        exception raised by the callable is re-raised in the calling thread.
        """
        result_queue = queue.Queue()

        def wrapped_request():
            try:
                result = request_fn()
                result_queue.put(('success', result))
            except Exception as e:
                result_queue.put(('error', e))

        self.queue.put(wrapped_request)
        self._process_queue()

        status, result = result_queue.get()
        if status == 'error':
            raise result
        return result

    def _process_queue(self):
        """Start the next queued request if the limits allow it."""
        with self.lock:
            # Check for pending work first. Otherwise an idle queue whose
            # rate window is full would arm a timer of up to 60 seconds with
            # nothing to run, keeping the interpreter alive for no reason.
            if self.queue.empty():
                return

            if self.active >= self.max_concurrent:
                # A running request calls _process_queue when it finishes.
                return

            # Check rate limit: keep only the starts from the last 60 seconds
            now = time.time()
            self.request_times = [t for t in self.request_times if now - t < 60]

            if len(self.request_times) >= self.requests_per_minute:
                # Try again once the oldest start leaves the 60-second window.
                oldest_request = min(self.request_times)
                delay = 60 - (now - oldest_request)
                threading.Timer(delay, self._process_queue).start()
                return

            # Dequeuing only happens under self.lock, so the queue is still
            # non-empty here.
            request_fn = self.queue.get()
            self.active += 1
            self.request_times.append(now)

            def execute():
                try:
                    request_fn()
                finally:
                    with self.lock:
                        self.active -= 1
                    # Hand the freed slot to the next waiting request.
                    self._process_queue()

            threading.Thread(target=execute).start()

# Usage
request_queue = RequestQueue(max_concurrent=18, requests_per_minute=60)

def queued_generate_speech(text: str, voice: str, model: str) -> bytes:
    """Run generate_speech through the shared queue and return its result."""
    def job():
        return generate_speech(text, voice, model)

    return request_queue.add(job)

# Process multiple requests safely
# (add() waits for each result, so these calls run one after another)
texts = ['text1', 'text2', 'text3']
audios = [queued_generate_speech(text, 'lyra', 'aurora-3.5') for text in texts]

Complete SDK Example

class NeoSpeechClient:
    """Complete SDK client for NeoSpeech API.

    Wraps the speech, streaming, balance, voice and model endpoints behind
    methods that share one authenticated request helper.
    """

    def __init__(self, api_key: str, timeout: float = 60.0):
        """Store the credentials and connection settings.

        Args:
            api_key: Key sent as a Bearer token with every request.
            timeout: Seconds to wait for the server to respond. Without a
                timeout, requests waits indefinitely on a stalled connection.
        """
        self.api_key = api_key
        self.base_url = 'https://api.neospeech.io/v1'
        self.timeout = timeout

    def _request(
        self,
        endpoint: str,
        method: str = 'GET',
        json_data: Optional[Dict] = None,
        params: Optional[Dict] = None,
        stream: bool = False,
    ):
        """Make an API request and return the requests.Response.

        Args:
            endpoint: Path appended to the base URL.
            method: HTTP method name.
            json_data: Optional payload sent as the JSON request body.
            params: Optional query parameters; requests URL-encodes them.
            stream: When True the body is not downloaded up front, so the
                caller can read it incrementally with iter_content.

        Raises:
            Exception: If the API answers with an error status. The message
                is the 'message' field of the JSON error body when there is
                one, otherwise 'HTTP <status code>'.
        """
        response = requests.request(
            method=method,
            url=f'{self.base_url}{endpoint}',
            headers={
                'Authorization': f'Bearer {self.api_key}',
                'Content-Type': 'application/json'
            },
            json=json_data,
            params=params,
            stream=stream,
            timeout=self.timeout
        )

        if not response.ok:
            fallback = f'HTTP {response.status_code}'
            try:
                error = response.json()
            except ValueError:
                # The error body is not JSON (e.g. an HTML page from a proxy).
                error = None
            message = error.get('message', fallback) if isinstance(error, dict) else fallback
            raise Exception(message)

        return response

    def generate_speech(self, options: Dict) -> bytes:
        """Generate speech; `options` is sent as the JSON body of /audio/speech."""
        response = self._request('/audio/speech', method='POST', json_data=options)
        return response.content

    def stream_speech(self, options: Dict) -> bytes:
        """Read the /audio/stream response in chunks and return the joined bytes."""
        # stream=True defers the download so iter_content reads chunks as
        # they arrive; the with block releases the connection afterwards.
        with self._request('/audio/stream', method='POST', json_data=options, stream=True) as response:
            return b''.join(
                chunk for chunk in response.iter_content(chunk_size=8192) if chunk
            )

    def get_balance(self) -> Dict:
        """Return the 'data' member of the /balance JSON response."""
        return self._request('/balance').json()['data']

    def list_voices(self, filters: Optional[Dict] = None) -> Dict:
        """Return the 'data' member of /voices/list, optionally filtered.

        `filters` maps query parameter names to values. They are passed as
        `params` so special characters in values are URL-encoded.
        """
        response = self._request('/voices/list', params=filters or None)
        return response.json()['data']

    def list_models(self) -> Dict:
        """Return the 'data' member of the /models/list JSON response."""
        return self._request('/models/list').json()['data']

# Usage
# The key comes from the environment; os.getenv returns None when the
# variable is not set.
client = NeoSpeechClient(os.getenv('NEOSPEECH_API_KEY'))

# Generate speech
# The dict is sent unchanged as the JSON body of the request.
audio = client.generate_speech({
    'input': 'Hello, world!',
    'voice': 'lyra',
    'model': 'aurora-4'
})

# Check balance
balance = client.get_balance()
print(f"Credits: {balance['remaining_credits']}")

# List voices
# Counts the entries under 'voices' in the returned data.
voices = client.list_voices({'gender': 'female'})
print(f"Found {len(voices['voices'])} female voices")

Async Implementation

import aiohttp
import asyncio

class AsyncNeoSpeechClient:
    """Async SDK client for NeoSpeech API, built on aiohttp."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = 'https://api.neospeech.io/v1'

    async def _request(
        self,
        endpoint: str,
        method: str = 'GET',
        json_data: Optional[Dict] = None,
        params: Optional[Dict] = None,
        as_json: bool = False,
    ):
        """Make an async API request and return its fully read body.

        A new ClientSession is opened for each call. The body is read
        inside the `async with` blocks because leaving them releases the
        response and closes the session; a response object returned from
        here could no longer be read by the caller.

        Args:
            endpoint: Path appended to the base URL.
            method: HTTP method name.
            json_data: Optional payload sent as the JSON request body.
            params: Optional query parameters; aiohttp URL-encodes them.
            as_json: Return the decoded JSON body instead of raw bytes.

        Returns:
            bytes, or the decoded JSON value when as_json is True.

        Raises:
            Exception: If the API answers with an error status. The message
                is the 'message' field of the JSON error body when there is
                one, otherwise 'HTTP <status>'.
        """
        url = f'{self.base_url}{endpoint}'
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }

        async with aiohttp.ClientSession() as session:
            async with session.request(
                method=method,
                url=url,
                headers=headers,
                json=json_data,
                params=params
            ) as response:
                # `ok` accepts every non-error status, matching the sync
                # client, instead of treating e.g. 201 or 204 as failures.
                if not response.ok:
                    fallback = f'HTTP {response.status}'
                    try:
                        # content_type=None skips the Content-Type check so
                        # only a body that is not JSON raises here.
                        error = await response.json(content_type=None)
                    except ValueError:
                        error = None
                    message = error.get('message', fallback) if isinstance(error, dict) else fallback
                    raise Exception(message)

                if as_json:
                    return await response.json()
                return await response.read()

    async def generate_speech(self, options: Dict) -> bytes:
        """Generate speech asynchronously; `options` is the JSON request body."""
        return await self._request('/audio/speech', method='POST', json_data=options)

    async def get_balance(self) -> Dict:
        """Return the 'data' member of the /balance JSON response."""
        result = await self._request('/balance', as_json=True)
        return result['data']

    async def list_voices(self, filters: Optional[Dict] = None) -> Dict:
        """Return the 'data' member of /voices/list, optionally filtered.

        `filters` maps query parameter names to values. They are passed as
        `params` so special characters in values are URL-encoded.
        """
        result = await self._request('/voices/list', params=filters or None, as_json=True)
        return result['data']

# Usage
async def main():
    """Generate ten short messages concurrently and report how many came back."""
    client = AsyncNeoSpeechClient(os.getenv('NEOSPEECH_API_KEY'))

    # Generate multiple speeches concurrently
    payloads = (
        {'input': f'Message {i}', 'voice': 'lyra', 'model': 'aurora-3.5'}
        for i in range(10)
    )
    audios = await asyncio.gather(
        *(client.generate_speech(payload) for payload in payloads)
    )
    print(f"Generated {len(audios)} audio files")

asyncio.run(main())

JavaScript Examples

JavaScript/Node.js examples

cURL Examples

Command-line examples

API Reference

Complete API documentation

Best Practices

Optimization tips