Hành trình xây dựng chatbot AI trên Zalo API
Chia sẻ kinh nghiệm tích hợp AI với Zalo API để tạo chatbot thông minh
Hành trình xây dựng chatbot AI trên Zalo API
Trong bài viết này, tôi sẽ chia sẻ kinh nghiệm của mình khi xây dựng một chatbot AI tích hợp với Zalo API. Đây là một dự án thú vị và đầy thách thức, giúp tôi học hỏi được nhiều kiến thức về AI và API integration.
Tại sao chọn Zalo API?
Zalo là một trong những ứng dụng nhắn tin phổ biến nhất tại Việt Nam với hơn 100 triệu người dùng. Việc tích hợp chatbot với Zalo API mang lại nhiều lợi ích:
Zalo có lượng người dùng lớn tại Việt Nam
Documentation rõ ràng và community hỗ trợ tốt
Kiến trúc hệ thống
Sơ đồ tổng quan
User (Zalo) → Zalo API → Webhook → Flask App → AI Model → Response
Các thành phần chính
- Zalo Webhook: Nhận tin nhắn từ người dùng
- Flask Application: Xử lý logic và tích hợp AI
- AI Model: Xử lý và trả lời tin nhắn
- Database: Lưu trữ dữ liệu cuộc trò chuyện
Triển khai từng bước
1. Thiết lập Zalo App
# Cấu hình Zalo App
ZALO_CONFIG = {
'app_id': 'your_app_id',
'app_secret': 'your_app_secret',
'webhook_url': 'https://yourdomain.com/webhook',
'access_token': 'your_access_token'
}
2. Tạo Flask Application
from flask import Flask, request, jsonify
import requests
import json
app = Flask(__name__)
@app.route('/webhook', methods=['POST'])
def webhook():
try:
data = request.get_json()
# Xác thực webhook
if not verify_webhook(data):
return jsonify({'error': 'Invalid webhook'}), 400
# Xử lý tin nhắn
message = data.get('message', {})
user_id = data.get('sender', {}).get('id')
# Gửi đến AI model
response = process_with_ai(message.get('text', ''))
# Gửi phản hồi về Zalo
send_message_to_zalo(user_id, response)
return jsonify({'status': 'success'})
except Exception as e:
print(f"Error: {e}")
return jsonify({'error': str(e)}), 500
def verify_webhook(data):
"""Xác thực webhook từ Zalo"""
# Implementation xác thực
return True
def process_with_ai(text):
"""Xử lý tin nhắn với AI model"""
# Gọi AI model (OpenAI, local model, etc.)
return ai_model.generate_response(text)
def send_message_to_zalo(user_id, message):
"""Gửi tin nhắn về Zalo"""
url = f"https://openapi.zalo.me/v2.0/oa/message"
headers = {
'Content-Type': 'application/json',
'access_token': ZALO_CONFIG['access_token']
}
payload = {
'recipient': {'user_id': user_id},
'message': {'text': message}
}
response = requests.post(url, headers=headers, json=payload)
return response.json()
3. Tích hợp AI Model
import openai
from transformers import pipeline
class AIModel:
def __init__(self):
# Sử dụng OpenAI API
openai.api_key = "your_openai_api_key"
# Hoặc sử dụng local model
self.local_model = pipeline(
"text-generation",
model="microsoft/DialoGPT-medium"
)
def generate_response(self, user_input):
"""Tạo phản hồi từ AI model"""
# Sử dụng OpenAI
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "Bạn là một chatbot thân thiện và hữu ích."},
{"role": "user", "content": user_input}
],
max_tokens=150,
temperature=0.7
)
return response.choices[0].message.content
except Exception as e:
# Fallback về local model
return self.generate_local_response(user_input)
def generate_local_response(self, user_input):
"""Sử dụng local model khi OpenAI không khả dụng"""
response = self.local_model(
user_input,
max_length=100,
num_return_sequences=1,
temperature=0.7
)
return response[0]['generated_text']
4. Xử lý các loại tin nhắn
def handle_message(message_data):
"""Xử lý các loại tin nhắn khác nhau"""
message_type = message_data.get('type')
if message_type == 'text':
return handle_text_message(message_data)
elif message_type == 'image':
return handle_image_message(message_data)
elif message_type == 'location':
return handle_location_message(message_data)
elif message_type == 'sticker':
return handle_sticker_message(message_data)
else:
return "Xin lỗi, tôi chưa thể xử lý loại tin nhắn này."
def handle_text_message(message_data):
"""Xử lý tin nhắn văn bản"""
text = message_data.get('text', '')
# Xử lý các lệnh đặc biệt
if text.startswith('/help'):
return get_help_message()
elif text.startswith('/weather'):
return get_weather_info(text)
elif text.startswith('/joke'):
return get_random_joke()
else:
# Xử lý với AI model
return ai_model.generate_response(text)
def handle_image_message(message_data):
"""Xử lý tin nhắn hình ảnh"""
# Tải hình ảnh từ Zalo
image_url = message_data.get('url')
# Xử lý với AI vision model
description = analyze_image(image_url)
return f"Tôi thấy trong hình ảnh: {description}"
def analyze_image(image_url):
"""Phân tích hình ảnh với AI"""
# Implementation với OpenAI Vision hoặc local model
return "Một hình ảnh thú vị"
5. Quản lý trạng thái cuộc trò chuyện
from datetime import datetime, timedelta
import sqlite3
class ConversationManager:
def __init__(self):
self.db_path = 'conversations.db'
self.init_database()
def init_database(self):
"""Khởi tạo database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS conversations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT,
message TEXT,
response TEXT,
timestamp DATETIME
)
''')
conn.commit()
conn.close()
def save_conversation(self, user_id, message, response):
"""Lưu cuộc trò chuyện"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO conversations (user_id, message, response, timestamp)
VALUES (?, ?, ?, ?)
''', (user_id, message, response, datetime.now()))
conn.commit()
conn.close()
def get_conversation_history(self, user_id, limit=10):
"""Lấy lịch sử cuộc trò chuyện"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT message, response, timestamp
FROM conversations
WHERE user_id = ?
ORDER BY timestamp DESC
LIMIT ?
''', (user_id, limit))
history = cursor.fetchall()
conn.close()
return history
6. Xử lý lỗi và logging
import logging
from functools import wraps
# Cấu hình logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('chatbot.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def error_handler(func):
"""Decorator để xử lý lỗi"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logger.error(f"Error in {func.__name__}: {str(e)}")
return "Xin lỗi, đã có lỗi xảy ra. Vui lòng thử lại sau."
return wrapper
@error_handler
def process_message_safely(message):
"""Xử lý tin nhắn một cách an toàn"""
return ai_model.generate_response(message)
Tối ưu hóa và cải thiện
1. Caching responses
import redis
import hashlib
import json
class ResponseCache:
def __init__(self):
self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_cache_key(self, message):
"""Tạo cache key từ tin nhắn"""
return hashlib.md5(message.encode()).hexdigest()
def get_cached_response(self, message):
"""Lấy phản hồi từ cache"""
key = self.get_cache_key(message)
cached = self.redis_client.get(key)
if cached:
return json.loads(cached)
return None
def cache_response(self, message, response):
"""Lưu phản hồi vào cache"""
key = self.get_cache_key(message)
self.redis_client.setex(key, 3600, json.dumps(response)) # Cache 1 giờ
2. Rate limiting
from collections import defaultdict
from datetime import datetime, timedelta
class RateLimiter:
def __init__(self):
self.user_requests = defaultdict(list)
def is_rate_limited(self, user_id, max_requests=10, window_minutes=1):
"""Kiểm tra rate limit"""
now = datetime.now()
window_start = now - timedelta(minutes=window_minutes)
# Lọc requests trong window
user_requests = [
req_time for req_time in self.user_requests[user_id]
if req_time > window_start
]
self.user_requests[user_id] = user_requests
return len(user_requests) >= max_requests
def add_request(self, user_id):
"""Thêm request mới"""
self.user_requests[user_id].append(datetime.now())
3. Analytics và monitoring
class Analytics:
def __init__(self):
self.stats = {
'total_messages': 0,
'unique_users': set(),
'response_times': [],
'error_count': 0
}
def track_message(self, user_id, response_time, has_error=False):
"""Theo dõi tin nhắn"""
self.stats['total_messages'] += 1
self.stats['unique_users'].add(user_id)
self.stats['response_times'].append(response_time)
if has_error:
self.stats['error_count'] += 1
def get_stats(self):
"""Lấy thống kê"""
return {
'total_messages': self.stats['total_messages'],
'unique_users': len(self.stats['unique_users']),
'avg_response_time': sum(self.stats['response_times']) / len(self.stats['response_times']) if self.stats['response_times'] else 0,
'error_rate': self.stats['error_count'] / self.stats['total_messages'] if self.stats['total_messages'] > 0 else 0
}
Kết quả và bài học
Thành công đạt được
- Tích hợp thành công với Zalo API
- Xử lý được nhiều loại tin nhắn (text, image, location)
- AI model hoạt động ổn định với fallback mechanism
- Database lưu trữ lịch sử cuộc trò chuyện
- Error handling và logging đầy đủ
Những thách thức gặp phải
- Rate limiting: Zalo có giới hạn số lượng request
- Webhook reliability: Cần xử lý webhook timeout và retry
- AI model latency: Response time có thể chậm
- Scalability: Cần tối ưu khi có nhiều user đồng thời
Bài học kinh nghiệm
- Luôn có fallback mechanism khi AI model không khả dụng
- Implement proper error handling và logging
- Cache responses để giảm latency
- Monitor performance và user behavior
- Test thoroughly trước khi deploy
Code hoàn chỉnh
# main.py
from flask import Flask, request, jsonify
import requests
import json
import logging
from datetime import datetime
from conversation_manager import ConversationManager
from ai_model import AIModel
from analytics import Analytics
app = Flask(__name__)
# Initialize components
conversation_manager = ConversationManager()
ai_model = AIModel()
analytics = Analytics()
@app.route('/webhook', methods=['POST'])
def webhook():
start_time = datetime.now()
try:
data = request.get_json()
user_id = data.get('sender', {}).get('id')
message = data.get('message', {})
# Process message
response = handle_message(message)
# Send response to Zalo
send_message_to_zalo(user_id, response)
# Save conversation
conversation_manager.save_conversation(
user_id,
message.get('text', ''),
response
)
# Track analytics
response_time = (datetime.now() - start_time).total_seconds()
analytics.track_message(user_id, response_time)
return jsonify({'status': 'success'})
except Exception as e:
logging.error(f"Webhook error: {str(e)}")
analytics.track_message(user_id, 0, has_error=True)
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)
Kết luận
Việc xây dựng chatbot AI tích hợp với Zalo API là một dự án thú vị và bổ ích. Nó giúp tôi hiểu sâu hơn về:
- API integration và webhook handling
- AI model deployment và optimization
- Error handling và monitoring
- Scalability và performance tuning
Dự án này đã mở ra nhiều cơ hội mới và giúp tôi phát triển kỹ năng trong lĩnh vực AI và chatbot development.
Chúc bạn thành công trong việc xây dựng chatbot AI! 🤖