
Integrating Large Language Models in Production Applications

FastApply Team

In this practical guide, you will learn how to create a highly scalable model deployment solution with built-in LLMs for your applications. In our examples, we will use Hugging Face’s GPT-2 model, but you can easily plug in any other model, including GPT-4, Claude, and others. Whether you are designing a new application with AI capabilities or improving an existing AI system, this guide will walk you step by step through building a robust LLM integration.

Understanding LLM Integration Fundamentals

Before we start writing code, let’s figure out what it takes to build a production LLM integration. API calls are not the only thing you need to consider when building a production-ready LLM integration; you also need to account for reliability, cost, and stability. Your production application must handle issues such as service outages, rate limits, and variability in response time while keeping costs under control. Here’s what we’ll build together:

  • A robust API client that gracefully handles failures
  • A smart caching system to optimize costs and speed
  • A proper prompt management system
  • Comprehensive error handling and monitoring
  • A complete content moderation system as our example project

Prerequisites

Before we start coding, make sure you have:

  • Python 3.8 or later installed
  • A Redis server installed and running (locally or remotely)
  • A Hugging Face account and API key
  • Basic familiarity with Python and REST APIs

Want to follow along? The complete code is available in our GitHub repository.

Setting Up your Development Environment

Let’s start by getting your development environment ready. We’ll create a clean project structure and install all the necessary packages.

First, let’s create your project directory and set up a Python virtual environment. Open your terminal and run:

mkdir llm_integration && cd llm_integration
python3 -m venv env
source env/bin/activate

Now let’s set up your project dependencies. Create a new requirements.txt file with these essential packages:

transformers==4.36.0
huggingface-hub==0.19.4
redis==4.6.0
pydantic==2.5.0
pydantic-settings==2.1.0
tenacity==8.2.3
python-dotenv==1.0.0
fastapi==0.104.1
uvicorn==0.24.0
torch==2.1.0
numpy==1.24.3

Let’s break down why we need each of these packages:

  • transformers: Hugging Face’s library that we will use to interface with the GPT-2 model
  • huggingface-hub: Handles model loading and versioning
  • redis: For implementing request caching
  • pydantic: Used for data validation and settings management
  • tenacity: Provides retry functionality for increased reliability
  • python-dotenv: For loading environment variables
  • fastapi: Builds your API endpoints with minimal code
  • uvicorn: Runs your FastAPI application efficiently
  • torch: For running transformer models and handling machine learning operations
  • numpy: Used for numerical computing

Install all the packages with the command:

pip install -r requirements.txt

Let’s organize your project with a clean structure. Create these directories and files in your project directory:

llm_integration/
├── core/
│   ├── llm_client.py      # your main LLM interaction code
│   ├── prompt_manager.py  # Handles prompt templates
│   └── response_handler.py # Processes LLM responses
├── cache/
│   └── redis_manager.py   # Manages your caching system
├── config/
│   └── settings.py        # Configuration management
├── api/
│   └── routes.py          # API endpoints
├── utils/
│   ├── monitoring.py      # Usage tracking
│   └── rate_limiter.py    # Rate limiting logic
├── requirements.txt
├── main.py
└── usage_logs.json       # Usage log data

Building the LLM Client

Let’s start with the LLM client, the core component of your application. This is where we’ll interact with the GPT-2 model (or any other LLM you prefer). Add the following code snippets to your core/llm_client.py file:

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from tenacity import retry, stop_after_attempt, wait_exponential
from typing import Dict, Optional
import logging

class LLMClient:
    def __init__(self, model_name: str = "gpt2", timeout: int = 30):
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                device_map="auto",
                torch_dtype=torch.float16
            )
        except Exception as e:
            logging.error(f"Error loading model: {str(e)}")
            # Fallback to a simpler model if the specified one fails
            self.tokenizer = AutoTokenizer.from_pretrained("gpt2")
            self.model = AutoModelForCausalLM.from_pretrained("gpt2")
            
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)

In this first part of the LLMClient class, we’re setting up the foundation:

  • We are using AutoModelForCausalLM and AutoTokenizer from the transformers library to load your model
  • The device_map="auto" parameter automatically handles GPU/CPU allocation
  • We’re using torch.float16 to optimize memory usage while maintaining good performance

Now let’s add the method that talks to your model:

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True
    )
    async def complete(self, 
                      prompt: str, 
                      temperature: float = 0.7,
                      max_tokens: Optional[int] = None) -> Dict:
        """Get completion from the model with automatic retries"""
        try:
            inputs = self.tokenizer(prompt, return_tensors="pt").to(
                self.model.device
            )
            
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_tokens or 100,
                    temperature=temperature,
                    do_sample=True
                )
            
            # Note: outputs[0] contains the prompt tokens followed by the newly
            # generated tokens, so the decoded text includes the original prompt
            response_text = self.tokenizer.decode(
                outputs[0], 
                skip_special_tokens=True
            )
            
            # Calculate token usage for monitoring
            input_tokens = len(inputs.input_ids[0])
            output_tokens = len(outputs[0]) - input_tokens
            
            return {
                'content': response_text,
                'usage': {
                    'prompt_tokens': input_tokens,
                    'completion_tokens': output_tokens,
                    'total_tokens': input_tokens + output_tokens
                },
                'model': "gpt2"
            }
            
        except Exception as e:
            self.logger.error(f"Error in LLM completion: {str(e)}")
            raise

Let’s break down what’s happening in this completion method:

  • The @retry decorator automatically retries the call (up to three times, with exponential backoff) when temporary failures occur
  • The torch.no_grad() context manager saves memory by disabling gradient calculations
  • Token usage is tracked for both input and output, which is essential for cost monitoring
  • The method returns a structured dictionary with the response text and usage statistics
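
If you want to try the client on its own before wiring up the API, a minimal sketch (assuming the project layout above) could look like this; the prompt text is just an example:

import asyncio
from core.llm_client import LLMClient

async def main():
    client = LLMClient(model_name="gpt2")
    result = await client.complete(
        prompt="Analyze if this message is appropriate: Hello there!",
        temperature=0.7,
        max_tokens=50
    )
    # The returned dict has 'content', 'usage', and 'model' keys
    print(result["content"])
    print(result["usage"])

if __name__ == "__main__":
    asyncio.run(main())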

Creating your LLM Response Handler

Next, we need a response handler to parse and structure the LLM’s raw output. Add the following code to your core/response_handler.py file:

from typing import Dict
import logging

class ResponseHandler:
    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def parse_moderation_response(self, raw_response: str) -> Dict:
        """Parse and structure the raw LLM response for moderation"""
        try:
            # Default response structure
            structured_response = {
                "is_appropriate": True,
                "confidence_score": 0.0,
                "reason": None
            }
            
            # Simple keyword-based analysis
            lower_response = raw_response.lower()
            
            # Check for inappropriate content signals
            if any(word in lower_response for word in ['inappropriate', 'unsafe', 'offensive', 'harmful']):
                structured_response["is_appropriate"] = False
                structured_response["confidence_score"] = 0.9
                # Extract reason if present
                if "because" in lower_response:
                    reason_start = lower_response.find("because")
                    structured_response["reason"] = raw_response[reason_start:].split('.')[0].strip()
            else:
                structured_response["confidence_score"] = 0.95
            
            return structured_response
            
        except Exception as e:
            self.logger.error(f"Error parsing response: {str(e)}")
            return {
                "is_appropriate": True,
                "confidence_score": 0.5,
                "reason": "Failed to parse response"
            }

    def format_response(self, raw_response: Dict) -> Dict:
        """Format the final response with parsed content and usage stats"""
        try:
            return {
                "content": self.parse_moderation_response(raw_response["content"]),
                "usage": raw_response["usage"],
                "model": raw_response["model"]
            }
        except Exception as e:
            self.logger.error(f"Error formatting response: {str(e)}")
            raise
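
To get a feel for how the parser behaves, here is a small illustrative check; the input strings are made up and stand in for raw model output:

from core.response_handler import ResponseHandler

handler = ResponseHandler()

# A response that trips the keyword check
print(handler.parse_moderation_response(
    "This message is inappropriate because it contains offensive language."
))
# {'is_appropriate': False, 'confidence_score': 0.9, 'reason': 'because it contains offensive language'}

# A response with no flagged keywords
print(handler.parse_moderation_response("This message looks fine."))
# {'is_appropriate': True, 'confidence_score': 0.95, 'reason': None}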

Adding a Robust Caching System

Now let’s create your caching system to improve the application performance and reduce costs. Add the following code snippets to your cache/redis_manager.py file:

import redis
from typing import Optional, Any
import json
import hashlib

class CacheManager:
    def __init__(self, redis_url: str, ttl: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl
        
    def _generate_key(self, prompt: str, params: dict) -> str:
        """Generate a unique cache key"""
        cache_data = {
            'prompt': prompt,
            'params': params
        }
        serialized = json.dumps(cache_data, sort_keys=True)
        return hashlib.sha256(serialized.encode()).hexdigest()
        
    async def get_cached_response(self, 
                                prompt: str, 
                                params: dict) -> Optional[dict]:
        """Retrieve cached LLM response"""
        key = self._generate_key(prompt, params)
        cached = self.redis.get(key)
        return json.loads(cached) if cached else None
        
    async def cache_response(self, 
                           prompt: str, 
                           params: dict, 
                           response: dict) -> None:
        """Cache LLM response"""
        key = self._generate_key(prompt, params)
        self.redis.setex(
            key,
            self.ttl,
            json.dumps(response)
        )

In the above code snippets, we created a CacheManager class that handles all caching operations with the following:

  • The _generate_key method, which creates unique cache keys based on prompts and parameters
  • get_cached_response which checks if we have a cached response for a given prompt
  • cache_response that stores successful responses for future use
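
As a quick illustration of the caching flow (this sketch assumes a local Redis instance at the default URL):

import asyncio
from cache.redis_manager import CacheManager

async def main():
    cache = CacheManager(redis_url="redis://localhost:6379", ttl=3600)
    params = {"temperature": 0.3}

    # First lookup misses, so we store an example response
    if await cache.get_cached_response("example prompt", params) is None:
        await cache.cache_response(
            "example prompt",
            params,
            {"content": "example cached result", "usage": {"total_tokens": 42}}
        )

    # The same prompt and params now hit the cache until the TTL expires
    print(await cache.get_cached_response("example prompt", params))

asyncio.run(main())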

Creating a Smart Prompt Manager

Let’s create your prompt manager that will manage the prompts for your LLM model. Add the following code to your core/prompt_manager.py:

from typing import Dict, Optional
from pydantic import BaseModel
import json
import os

class Prompt(BaseModel):
    template: str
    version: str
    description: Optional[str] = None
    
class PromptManager:
    def __init__(self, prompts_dir: str = "prompts"):
        self.prompts_dir = prompts_dir
        self.prompts: Dict[str, Prompt] = {}
        self._load_prompts()
        
    def _load_prompts(self):
        """Load prompt templates from JSON files"""
        if not os.path.exists(self.prompts_dir):
            os.makedirs(self.prompts_dir)
            
        for filename in os.listdir(self.prompts_dir):
            if filename.endswith('.json'):
                with open(os.path.join(self.prompts_dir, filename)) as f:
                    prompt_data = json.load(f)
                    prompt_name = filename.replace('.json', '')
                    self.prompts[prompt_name] = Prompt(**prompt_data)
    
    def get_prompt(self, name: str, **kwargs) -> str:
        """Get a formatted prompt template"""
        if name not in self.prompts:
            raise KeyError(f"Prompt template '{name}' not found")
            
        prompt = self.prompts[name]
        return prompt.template.format(**kwargs)

Then create a sample prompt template for content moderation in your prompts/content_moderation.json file:

{
    "template": "Analyze if this message is appropriate or not. Respond with whether it is appropriate or inappropriate, and explain why:\n\nMessage: {content}\n\nAnalysis:",
    "version": "1.0",
    "description": "Template for content moderation tasks"
}

Now your prompt manager can load prompt templates from JSON files and return formatted prompts.
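
For example, rendering the content moderation prompt looks like this (assuming the prompts/ directory shown above):

from core.prompt_manager import PromptManager

manager = PromptManager(prompts_dir="prompts")

# Fills the {content} placeholder in content_moderation.json
prompt = manager.get_prompt(
    "content_moderation",
    content="This is a test message that needs moderation."
)
print(prompt)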

Setting Up a Configuration Manager

To keep all your LLM configurations in one place and easily reuse them across your application, let’s create configuration settings. Add the code below to your config/settings.py file:

from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Optional

class Settings(BaseSettings):
    huggingface_api_key: str
    redis_url: str = "redis://localhost:6379"
    cache_ttl: int = 3600
    llm_model_name: str = "gpt2"
    request_timeout: int = 30
    max_retries: int = 3
    rate_limit_requests: int = 100
    rate_limit_period: int = 60
    
    model_config = SettingsConfigDict(
        env_file=".env",
        protected_namespaces=()
    )

settings = Settings()

Implementing Rate Limiting

Next, let’s implement rate limiting to control how users access your application’s resources. To do that, add the following code to your utils/rate_limiter.py file:

import time
from collections import deque
from typing import Deque, Tuple

class RateLimiter:
    def __init__(self, requests: int, period: int):
        self.requests = requests
        self.period = period
        self.timestamps: Deque[float] = deque()
    
    async def check_rate_limit(self) -> bool:
        """Check if request is within rate limits"""
        now = time.time()
        
        while self.timestamps and now - self.timestamps[0] > self.period:
            self.timestamps.popleft()
        
        if len(self.timestamps) >= self.requests:
            return False
            
        self.timestamps.append(now)
        return True

In the RateLimiter class, we implemented a reusable check_rate_limit method that any route can call to enforce rate limiting; you simply pass the number of requests allowed and the period (in seconds) when constructing the limiter. Note that this simple implementation applies one shared limit across all clients rather than a separate limit per user.
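
Here is a minimal sketch of the limiter in isolation; the limits are deliberately small so you can see a rejection:

import asyncio
from utils.rate_limiter import RateLimiter

async def main():
    # Allow at most 2 requests per 60-second window
    limiter = RateLimiter(requests=2, period=60)

    for i in range(3):
        allowed = await limiter.check_rate_limit()
        print(f"Request {i + 1}: {'allowed' if allowed else 'rejected'}")
    # Expected output: the first two requests are allowed, the third is rejected

asyncio.run(main())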

Creating your API Endpoints

Now let’s create your API endpoints in the api/routes.py file to integrate your LLM in your application:

from functools import lru_cache
from fastapi import APIRouter, HTTPException, Depends
from typing import Dict
from core.llm_client import LLMClient
from core.prompt_manager import PromptManager
from cache.redis_manager import CacheManager
from config.settings import settings
from utils.monitoring import UsageMonitor
from utils.rate_limiter import RateLimiter
from core.response_handler import ResponseHandler

router = APIRouter()
rate_limiter = RateLimiter(
    requests=settings.rate_limit_requests,
    period=settings.rate_limit_period
)

usage_monitor = UsageMonitor()

# Dependency injection functions
@lru_cache()
def get_llm_client() -> LLMClient:
    """Provide LLM client instance"""
    return LLMClient(
        model_name=settings.llm_model_name,
        timeout=settings.request_timeout
    )

@lru_cache()
def get_response_handler() -> ResponseHandler:
    """Provide response handler instance"""
    return ResponseHandler()

@lru_cache()
def get_cache_manager() -> CacheManager:
    """Provide cache manager instance"""
    return CacheManager(
        redis_url=settings.redis_url,
        ttl=settings.cache_ttl
    )

@lru_cache()
def get_prompt_manager() -> PromptManager:
    """Provide prompt manager instance"""
    return PromptManager()

@router.post("/moderate")
async def moderate_content(
    content: Dict[str, str],
    llm_client: LLMClient = Depends(get_llm_client),
    cache_manager: CacheManager = Depends(get_cache_manager),
    prompt_manager: PromptManager = Depends(get_prompt_manager),
    response_handler: ResponseHandler = Depends(get_response_handler)
):
    """Moderate content using LLM"""
    if not await rate_limiter.check_rate_limit():
        raise HTTPException(
            status_code=429, 
            detail="Rate limit exceeded"
        )
        
    try:
        prompt = prompt_manager.get_prompt(
            "content_moderation",
            content=content["text"]
        )
        
        # Check cache first
        cached_response = await cache_manager.get_cached_response(
            prompt,
            {'temperature': 0.3}
        )
        if cached_response:
            return cached_response
            
        # Get LLM response
        raw_response = await llm_client.complete(
            prompt=prompt,
            temperature=0.3
        )
        
        # Process the response
        formatted_response = response_handler.format_response(raw_response)
        
        # Cache the processed response
        await cache_manager.cache_response(
            prompt,
            {'temperature': 0.3},
            formatted_response
        )
        
        return formatted_response
        
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing request: {str(e)}"
        )

Here we defined a /moderate endpoint on the APIRouter, which is responsible for organizing API routes. The @lru_cache decorator is applied to the dependency injection functions (get_llm_client, get_response_handler, get_cache_manager, and get_prompt_manager) so that single instances of LLMClient, ResponseHandler, CacheManager, and PromptManager are reused across requests. The moderate_content function, decorated with @router.post, defines a POST route for content moderation and uses FastAPI’s Depends mechanism to inject these dependencies. Inside the function, the RateLimiter instance, configured with the rate limit settings from settings, enforces request limits.

Finally, let’s update your main.py to bring everything together:

from fastapi import FastAPI
from api.routes import router
import uvicorn
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

app = FastAPI(title="LLM Integration API")
app.include_router(router, prefix="/api/v1")

if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True
    )

In the above code, we created a FastAPI app and included the router from api.routes under the /api/v1 prefix. We also enabled logging to display informational messages with timestamps. The app runs on localhost:8000 using Uvicorn, with hot-reloading enabled.

Running your Application

Now that we have all the components in place, let’s get the application up and running. First, create a .env file in your project root directory and add your HUGGINGFACE_API_KEY and REDIS_URL:

HUGGINGFACE_API_KEY=your_api_key_here
REDIS_URL=redis://localhost:6379

Then ensure Redis is running on your machine. On most Unix-based systems, you can start it with the command:

redis-server

Now you can start your application:

python main.py
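
If you prefer, the same server can also be started with the uvicorn CLI, which is equivalent to the __main__ block above:

uvicorn main:app --host 0.0.0.0 --port 8000 --reload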

Your FastAPI server will start on http://localhost:8000. The automatic API documentation will be available at http://localhost:8000/docs - this is super helpful for testing your endpoints!

(Screenshot: the automatic FastAPI API docs at http://localhost:8000/docs)

Testing your Content Moderation API

Let’s test your newly created API with a real request. Open a new terminal and run this curl command:

curl -X POST http://localhost:8000/api/v1/moderate \
  -H "Content-Type: application/json" \
  -d '{"text": "This is a test message that needs moderation."}'

You should see a response like this on your terminal:

(Screenshot: the moderation response in the terminal)
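
The exact wording will vary with the model output, but based on the response handler above, the JSON body has this shape (the values here are illustrative):

{
  "content": {
    "is_appropriate": true,
    "confidence_score": 0.95,
    "reason": null
  },
  "usage": {
    "prompt_tokens": 34,
    "completion_tokens": 48,
    "total_tokens": 82
  },
  "model": "gpt2"
}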

Adding Monitoring and Analytics

Now let’s add some monitoring features to track how your application is performing and how many resources are being used. Add the following code to your utils/monitoring.py file:

from datetime import datetime
import logging
from typing import Dict
import json

class UsageMonitor:
    def __init__(self, log_file: str = "usage_logs.json"):
        self.log_file = log_file
        self.logger = logging.getLogger(__name__)
        
    async def log_request(self, 
                         endpoint: str, 
                         response: Dict, 
                         duration_ms: float):
        """Log API request details"""
        try:
            log_entry = {
                "timestamp": datetime.now().isoformat(),
                "endpoint": endpoint,
                "tokens_used": response["usage"]["total_tokens"],
                "duration_ms": duration_ms,
                "model": response["model"]
            }
            
            with open(self.log_file, "a") as f:
                f.write(json.dumps(log_entry) + "\n")
                
        except Exception as e:
            self.logger.error(f"Error logging usage: {str(e)}")

The UsageMonitor class performs the following operations:

  • Tracking every API request with timestamps
  • Recording token usage for cost monitoring
  • Measuring response times
  • Storing everything in a structured log file (replace this with a database before you deploy your application to production)
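
Each request is appended as one JSON line to usage_logs.json; an illustrative entry looks like this:

{"timestamp": "2024-01-15T10:30:00.000000", "endpoint": "/moderate", "tokens_used": 82, "duration_ms": 412.5, "model": "gpt2"}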

Next, add a new method to calculate usage statistics:

    async def get_usage_statistics(self) -> Dict:
        """Calculate usage statistics"""
        try:
            total_tokens = 0
            total_requests = 0
            total_duration = 0
            
            with open(self.log_file, "r") as f:
                for line in f:
                    entry = json.loads(line)
                    total_tokens += entry["tokens_used"]
                    total_requests += 1
                    total_duration += entry["duration_ms"]
            
            return {
                "total_tokens": total_tokens,
                "total_requests": total_requests,
                "average_latency": total_duration / total_requests if total_requests > 0 else 0,
                "timestamp": datetime.now().isoformat()
            }
            
        except Exception as e:
            self.logger.error(f"Error calculating statistics: {str(e)}")
            return {}

Update api/routes.py to add the monitoring features from the UsageMonitor class:

#...
from utils.monitoring import UsageMonitor
import time

#...
usage_monitor = UsageMonitor()

@router.get("/stats")
async def get_statistics():
    """Get usage statistics"""
    return await usage_monitor.get_usage_statistics()


@router.post("/moderate")
async def moderate_content(
    content: Dict[str, str],
    llm_client: LLMClient = Depends(get_llm_client),
    cache_manager: CacheManager = Depends(get_cache_manager),
    prompt_manager: PromptManager = Depends(get_prompt_manager),
    response_handler: ResponseHandler = Depends(get_response_handler)
):
    """Moderate content using LLM"""
    start_time = time.time()
    if not await rate_limiter.check_rate_limit():
        raise HTTPException(
            status_code=429, 
            detail="Rate limit exceeded"
        )
        
    try:
        #...

        # Add monitoring
        duration_ms = (time.time() - start_time) * 1000
        await usage_monitor.log_request(
            "/moderate", 
            formatted_response, 
            duration_ms
        )
        
        return formatted_response
        
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing request: {str(e)}"
        )  

Now, test your /stats endpoint by running this curl command:

curl -X GET http://localhost:8000/api/v1/stats

The above command will show you the stats of your requests on the /moderate endpoint as shown in the screenshot below:

(Screenshot: usage statistics returned by the /stats endpoint)
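
Based on get_usage_statistics, the response has this shape (the numbers are illustrative):

{
  "total_tokens": 164,
  "total_requests": 2,
  "average_latency": 398.7,
  "timestamp": "2024-01-15T10:35:00.000000"
}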

Conclusion

Throughout this tutorial, you have learned how to integrate a large language model into production applications. You implemented features like API clients, caching, prompt management, and error handling. As an example of these concepts, you developed a content moderation system.

Now that you have a solid foundation, you could enhance your system with:

  • Streaming responses for real-time applications
  • A/B testing of prompts for iterative improvement
  • A web-based interface to manage prompts
  • Custom model fine-tuning
  • Integration with third-party monitoring services

Remember that the examples used the GPT-2 model, but you can adapt this system to work with any LLM provider. Choose the model that meets your requirements and fits your budget.

Please don’t hesitate to contact me if you have questions or if you want to tell me what you are building with this system.

Happy coding! 🚀

#llm-integration

#ai-deployment

#language-models

#production-ai

#enterprise-llm

#ai-architecture

#model-optimization

#llm-performance

FastApply Team


AI/ML Engineers