AI Insights
Featured

Hành trình xây dựng chatbot AI trên Zalo API

Chia sẻ kinh nghiệm tích hợp AI với Zalo API để tạo chatbot thông minh

20/1/2024
12 phút
AI
Chatbot
Zalo API
Python

Hành trình xây dựng chatbot AI trên Zalo API

Trong bài viết này, tôi sẽ chia sẻ kinh nghiệm của mình khi xây dựng một chatbot AI tích hợp với Zalo API. Đây là một dự án thú vị và đầy thách thức, giúp tôi học hỏi được nhiều kiến thức về AI và API integration.

Tại sao chọn Zalo API?

Zalo là một trong những ứng dụng nhắn tin phổ biến nhất tại Việt Nam với hơn 100 triệu người dùng. Việc tích hợp chatbot với Zalo API mang lại nhiều lợi ích:

👥
Tiếp cận đông đảo người dùng

Zalo có lượng người dùng lớn tại Việt Nam

API mạnh mẽ
Zalo cung cấp API đầy đủ tính năng
🔧
Tích hợp dễ dàng

Documentation rõ ràng và community hỗ trợ tốt

💰
Miễn phí
API cơ bản hoàn toàn miễn phí

Kiến trúc hệ thống

Sơ đồ tổng quan

Kiến trúc hệ thống chatbot

User (Zalo) → Zalo API → Webhook → Flask App → AI Model → Response

Các thành phần chính

  1. Zalo Webhook: Nhận tin nhắn từ người dùng
  2. Flask Application: Xử lý logic và tích hợp AI
  3. AI Model: Xử lý và trả lời tin nhắn
  4. Database: Lưu trữ dữ liệu cuộc trò chuyện

Triển khai từng bước

1. Thiết lập Zalo App

Zalo Developer Console

# Cấu hình Zalo App
ZALO_CONFIG = {
    'app_id': 'your_app_id',
    'app_secret': 'your_app_secret',
    'webhook_url': 'https://yourdomain.com/webhook',
    'access_token': 'your_access_token'
}

2. Tạo Flask Application

from flask import Flask, request, jsonify
import requests
import json

app = Flask(__name__)

@app.route('/webhook', methods=['POST'])
def webhook():
    try:
        data = request.get_json()

        # Xác thực webhook
        if not verify_webhook(data):
            return jsonify({'error': 'Invalid webhook'}), 400

        # Xử lý tin nhắn
        message = data.get('message', {})
        user_id = data.get('sender', {}).get('id')

        # Gửi đến AI model
        response = process_with_ai(message.get('text', ''))

        # Gửi phản hồi về Zalo
        send_message_to_zalo(user_id, response)

        return jsonify({'status': 'success'})

    except Exception as e:
        print(f"Error: {e}")
        return jsonify({'error': str(e)}), 500

def verify_webhook(data):
    """Xác thực webhook từ Zalo"""
    # Implementation xác thực
    return True

def process_with_ai(text):
    """Xử lý tin nhắn với AI model"""
    # Gọi AI model (OpenAI, local model, etc.)
    return ai_model.generate_response(text)

def send_message_to_zalo(user_id, message):
    """Gửi tin nhắn về Zalo"""
    url = f"https://openapi.zalo.me/v2.0/oa/message"

    headers = {
        'Content-Type': 'application/json',
        'access_token': ZALO_CONFIG['access_token']
    }

    payload = {
        'recipient': {'user_id': user_id},
        'message': {'text': message}
    }

    response = requests.post(url, headers=headers, json=payload)
    return response.json()

3. Tích hợp AI Model

AI Model Integration

import openai
from transformers import pipeline

class AIModel:
    def __init__(self):
        # Sử dụng OpenAI API
        openai.api_key = "your_openai_api_key"

        # Hoặc sử dụng local model
        self.local_model = pipeline(
            "text-generation",
            model="microsoft/DialoGPT-medium"
        )

    def generate_response(self, user_input):
        """Tạo phản hồi từ AI model"""

        # Sử dụng OpenAI
        try:
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "Bạn là một chatbot thân thiện và hữu ích."},
                    {"role": "user", "content": user_input}
                ],
                max_tokens=150,
                temperature=0.7
            )
            return response.choices[0].message.content

        except Exception as e:
            # Fallback về local model
            return self.generate_local_response(user_input)

    def generate_local_response(self, user_input):
        """Sử dụng local model khi OpenAI không khả dụng"""
        response = self.local_model(
            user_input,
            max_length=100,
            num_return_sequences=1,
            temperature=0.7
        )
        return response[0]['generated_text']

4. Xử lý các loại tin nhắn

def handle_message(message_data):
    """Xử lý các loại tin nhắn khác nhau"""

    message_type = message_data.get('type')

    if message_type == 'text':
        return handle_text_message(message_data)
    elif message_type == 'image':
        return handle_image_message(message_data)
    elif message_type == 'location':
        return handle_location_message(message_data)
    elif message_type == 'sticker':
        return handle_sticker_message(message_data)
    else:
        return "Xin lỗi, tôi chưa thể xử lý loại tin nhắn này."

def handle_text_message(message_data):
    """Xử lý tin nhắn văn bản"""
    text = message_data.get('text', '')

    # Xử lý các lệnh đặc biệt
    if text.startswith('/help'):
        return get_help_message()
    elif text.startswith('/weather'):
        return get_weather_info(text)
    elif text.startswith('/joke'):
        return get_random_joke()
    else:
        # Xử lý với AI model
        return ai_model.generate_response(text)

def handle_image_message(message_data):
    """Xử lý tin nhắn hình ảnh"""
    # Tải hình ảnh từ Zalo
    image_url = message_data.get('url')

    # Xử lý với AI vision model
    description = analyze_image(image_url)

    return f"Tôi thấy trong hình ảnh: {description}"

def analyze_image(image_url):
    """Phân tích hình ảnh với AI"""
    # Implementation với OpenAI Vision hoặc local model
    return "Một hình ảnh thú vị"

5. Quản lý trạng thái cuộc trò chuyện

Database Schema

from datetime import datetime, timedelta
import sqlite3

class ConversationManager:
    def __init__(self):
        self.db_path = 'conversations.db'
        self.init_database()

    def init_database(self):
        """Khởi tạo database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS conversations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT,
                message TEXT,
                response TEXT,
                timestamp DATETIME
            )
        ''')

        conn.commit()
        conn.close()

    def save_conversation(self, user_id, message, response):
        """Lưu cuộc trò chuyện"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            INSERT INTO conversations (user_id, message, response, timestamp)
            VALUES (?, ?, ?, ?)
        ''', (user_id, message, response, datetime.now()))

        conn.commit()
        conn.close()

    def get_conversation_history(self, user_id, limit=10):
        """Lấy lịch sử cuộc trò chuyện"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            SELECT message, response, timestamp
            FROM conversations
            WHERE user_id = ?
            ORDER BY timestamp DESC
            LIMIT ?
        ''', (user_id, limit))

        history = cursor.fetchall()
        conn.close()

        return history

6. Xử lý lỗi và logging

import logging
from functools import wraps

# Cấu hình logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('chatbot.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

def error_handler(func):
    """Decorator để xử lý lỗi"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logger.error(f"Error in {func.__name__}: {str(e)}")
            return "Xin lỗi, đã có lỗi xảy ra. Vui lòng thử lại sau."
    return wrapper

@error_handler
def process_message_safely(message):
    """Xử lý tin nhắn một cách an toàn"""
    return ai_model.generate_response(message)

Tối ưu hóa và cải thiện

Performance Optimization

1. Caching responses

import redis
import hashlib
import json

class ResponseCache:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)

    def get_cache_key(self, message):
        """Tạo cache key từ tin nhắn"""
        return hashlib.md5(message.encode()).hexdigest()

    def get_cached_response(self, message):
        """Lấy phản hồi từ cache"""
        key = self.get_cache_key(message)
        cached = self.redis_client.get(key)

        if cached:
            return json.loads(cached)
        return None

    def cache_response(self, message, response):
        """Lưu phản hồi vào cache"""
        key = self.get_cache_key(message)
        self.redis_client.setex(key, 3600, json.dumps(response))  # Cache 1 giờ

2. Rate limiting

from collections import defaultdict
from datetime import datetime, timedelta

class RateLimiter:
    def __init__(self):
        self.user_requests = defaultdict(list)

    def is_rate_limited(self, user_id, max_requests=10, window_minutes=1):
        """Kiểm tra rate limit"""
        now = datetime.now()
        window_start = now - timedelta(minutes=window_minutes)

        # Lọc requests trong window
        user_requests = [
            req_time for req_time in self.user_requests[user_id]
            if req_time > window_start
        ]

        self.user_requests[user_id] = user_requests

        return len(user_requests) >= max_requests

    def add_request(self, user_id):
        """Thêm request mới"""
        self.user_requests[user_id].append(datetime.now())

3. Analytics và monitoring

class Analytics:
    def __init__(self):
        self.stats = {
            'total_messages': 0,
            'unique_users': set(),
            'response_times': [],
            'error_count': 0
        }

    def track_message(self, user_id, response_time, has_error=False):
        """Theo dõi tin nhắn"""
        self.stats['total_messages'] += 1
        self.stats['unique_users'].add(user_id)
        self.stats['response_times'].append(response_time)

        if has_error:
            self.stats['error_count'] += 1

    def get_stats(self):
        """Lấy thống kê"""
        return {
            'total_messages': self.stats['total_messages'],
            'unique_users': len(self.stats['unique_users']),
            'avg_response_time': sum(self.stats['response_times']) / len(self.stats['response_times']) if self.stats['response_times'] else 0,
            'error_rate': self.stats['error_count'] / self.stats['total_messages'] if self.stats['total_messages'] > 0 else 0
        }

Kết quả và bài học

Success Metrics

Thành công đạt được

  • Tích hợp thành công với Zalo API
  • Xử lý được nhiều loại tin nhắn (text, image, location)
  • AI model hoạt động ổn định với fallback mechanism
  • Database lưu trữ lịch sử cuộc trò chuyện
  • Error handling và logging đầy đủ

Những thách thức gặp phải

  1. Rate limiting: Zalo có giới hạn số lượng request
  2. Webhook reliability: Cần xử lý webhook timeout và retry
  3. AI model latency: Response time có thể chậm
  4. Scalability: Cần tối ưu khi có nhiều user đồng thời

Bài học kinh nghiệm

  • Luôn có fallback mechanism khi AI model không khả dụng
  • Implement proper error handling và logging
  • Cache responses để giảm latency
  • Monitor performance và user behavior
  • Test thoroughly trước khi deploy

Code hoàn chỉnh

# main.py
from flask import Flask, request, jsonify
import requests
import json
import logging
from datetime import datetime
from conversation_manager import ConversationManager
from ai_model import AIModel
from analytics import Analytics

app = Flask(__name__)

# Initialize components
conversation_manager = ConversationManager()
ai_model = AIModel()
analytics = Analytics()

@app.route('/webhook', methods=['POST'])
def webhook():
    start_time = datetime.now()

    try:
        data = request.get_json()
        user_id = data.get('sender', {}).get('id')
        message = data.get('message', {})

        # Process message
        response = handle_message(message)

        # Send response to Zalo
        send_message_to_zalo(user_id, response)

        # Save conversation
        conversation_manager.save_conversation(
            user_id,
            message.get('text', ''),
            response
        )

        # Track analytics
        response_time = (datetime.now() - start_time).total_seconds()
        analytics.track_message(user_id, response_time)

        return jsonify({'status': 'success'})

    except Exception as e:
        logging.error(f"Webhook error: {str(e)}")
        analytics.track_message(user_id, 0, has_error=True)
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)

Kết luận

Việc xây dựng chatbot AI tích hợp với Zalo API là một dự án thú vị và bổ ích. Nó giúp tôi hiểu sâu hơn về:

  • API integration và webhook handling
  • AI model deployment và optimization
  • Error handling và monitoring
  • Scalability và performance tuning

Dự án này đã mở ra nhiều cơ hội mới và giúp tôi phát triển kỹ năng trong lĩnh vực AI và chatbot development.


Chúc bạn thành công trong việc xây dựng chatbot AI! 🤖

My photo

Tân Đoàn

AI Automation • Python Dev

Trực tuyến
Portfolio Website
Hồ Chí Minh, VN
Tân Đoàn
Data Scientist với niềm đam mê chia sẻ kiến thức về AI, Machine Learning và Python.
Tìm hiểu thêm