※本記事は2025年7月時点の情報に基づいて執筆されています。内容の正確性には万全を期していますが、最新情報は各公式サイトをご確認ください。
どうも!ゲーマーなら分かると思いますが、Discordでゲーム中にBGMを共有したり、ボイスチャットで一緒に音楽を聴いたりするのって最高ですよね!でも既存の音楽Botは機能制限があったり、突然サービス終了したり…
そんな悩みを解決するのが自作Discord音楽Botです。この記事では、24時間安定稼働する音楽Botの作り方を、初心者でも分かるよう詳しく解説します。
Discord音楽Bot基礎知識
Discord Bot開発の前提条件
必要な知識
- Python基礎(変数、関数、クラス)
- 基本的なLinuxコマンド
- Docker基礎知識
必要なアカウント
- Discord Developer Account
- YouTube Data API v3
- Spotify API(オプション)
2025年Discord音楽Bot事情
主要な変更点
- YouTube Music APIの制限強化
- Spotify APIの商用利用ポリシー変更
- Discord Slash Commandsの推奨
合法的な音楽ソース
- YouTube(個人利用範囲)
- SoundCloud
- Spotify(プレビュー再生のみ)
- ラジオストリーム
Discord Bot作成手順
STEP1: Discord Developer Portal設定
1. Botアプリケーション作成
1. https://discord.com/developers/applications にアクセス
2. 「New Application」クリック
3. Bot名を入力(例:MyMusicBot)
4. 「Create」をクリック
2. Bot設定
1. 左メニュー「Bot」をクリック
2. 「Add Bot」→「Yes,do it!」
3. TOKEN をコピー(重要:他人に見せない)
4. Privileged Gateway Intents を有効化
- PRESENCE INTENT ✓
- SERVER MEMBERS INTENT ✓
- MESSAGE CONTENT INTENT ✓
#### 3. OAuth2設定(サーバー招待用)
- 左メニュー「OAuth2」→「URL Generator」
- SCOPES:
- bot ✓
- applications.commands ✓
- BOT PERMISSIONS:
- Connect ✓
- Speak ✓
- Use Voice Activity ✓
- Send Messages ✓
- Use Slash Commands ✓
- 生成されたURLでBotをサーバーに招待
### STEP2: Python音楽Bot開発
#### 1. 必要ライブラリのインストール
```bash
# 仮想環境作成
python -m venv discord_bot
source discord_bot/bin/activate # Linux/Mac
# discord_bot\Scripts\activate # Windows
# 必要ライブラリインストール
pip install discord.py[voice] youtube-dl yt-dlp asyncio aiohttp
2. プロジェクト構造
discord_music_bot/
├── bot.py
├── music_player.py
├── queue_manager.py
├── config.py
├── requirements.txt
├── Dockerfile
├── docker-compose.yml
└── .env
3. 設定ファイル(config.py)
Copyimport os
from dotenv import load_dotenv
load_dotenv()
class Config:
# Discord設定
DISCORD_TOKEN = os.getenv('DISCORD_TOKEN')
COMMAND_PREFIX = os.getenv('COMMAND_PREFIX', '!')
# YouTube設定
YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY')
# FFmpeg設定
FFMPEG_OPTIONS = {
'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
'options': '-vn -filter:a "volume=0.25"'
}
# ytdl設定
YTDL_FORMAT_OPTIONS = {
'format': 'bestaudio/best',
'extractaudio': True,
'audioformat': 'mp3',
'outtmpl': '%(extractor)s-%(id)s-%(title)s.%(ext)s',
'restrictfilenames': True,
'noplaylist': True,
'nocheckcertificate': True,
'ignoreerrors': False,
'logtostderr': False,
'quiet': True,
'no_warnings': True,
'default_search': 'auto',
'source_address': '0.0.0.0'
}
4. 音楽プレイヤー(music_player.py)
Copyimport discord
from discord.ext import commands
import yt_dlp as youtube_dl
import asyncio
import os
from config import Config
class YTDLSource(discord.PCMVolumeTransformer):
def __init__(self, source, *, data, volume=0.5):
super().__init__(source, volume)
self.data = data
self.title = data.get('title')
self.url = data.get('url')
self.duration = data.get('duration')
self.thumbnail = data.get('thumbnail')
@classmethod
async def from_url(cls, url, *, loop=None, stream=False):
loop = loop or asyncio.get_event_loop()
ytdl = youtube_dl.YoutubeDL(Config.YTDL_FORMAT_OPTIONS)
data = await loop.run_in_executor(None, lambda: ytdl.extract_info(url, download=not stream))
if 'entries' in data:
data = data['entries'][0]
filename = data['url'] if stream else ytdl.prepare_filename(data)
return cls(discord.FFmpegPCMAudio(filename, **Config.FFMPEG_OPTIONS), data=data)
class MusicPlayer:
def __init__(self, ctx):
self.bot = ctx.bot
self.ctx = ctx
self.queue = asyncio.Queue()
self.next = asyncio.Event()
self.current = None
self.volume = 0.5
self.np = None
ctx.bot.loop.create_task(self.player_loop())
async def player_loop(self):
"""メイン音楽再生ループ"""
await self.bot.wait_until_ready()
while not self.bot.is_closed():
self.next.clear()
try:
# キューから次の曲を取得(10分でタイムアウト)
source = await asyncio.wait_for(self.queue.get(), timeout=600.0)
except asyncio.TimeoutError:
# タイムアウト時はボイスチャンネルから退出
return self.destroy()
if not isinstance(source, YTDLSource):
try:
source = await YTDLSource.from_url(source['url'], loop=self.bot.loop, stream=True)
except Exception as e:
await self.ctx.send(f'エラーが発生しました: {str(e)}')
continue
source.volume = self.volume
self.current = source
# 現在の再生情報を送信
embed = discord.Embed(
title="🎵 再生中",
description=f"[{source.title}]({source.url})",
color=0x00ff00
)
if source.thumbnail:
embed.set_thumbnail(url=source.thumbnail)
self.np = await self.ctx.send(embed=embed)
# 音楽再生開始
self.ctx.voice_client.play(source, after=lambda _: self.bot.loop.call_soon_threadsafe(self.next.set))
await self.next.wait()
# 再生完了後のクリーンアップ
source.cleanup()
self.current = None
def destroy(self):
"""プレイヤーの破棄"""
return self.bot.loop.create_task(self._cog.cleanup(self.ctx.guild))
5. キュー管理(queue_manager.py)
Copyimport asyncio
from collections import deque
class QueueManager:
def __init__(self):
self.queue = deque()
self.history = deque(maxlen=50) # 最大50曲の履歴
self.loop = False
self.shuffle = False
def add_song(self, song):
"""曲をキューに追加"""
self.queue.append(song)
def get_next_song(self):
"""次の曲を取得"""
if not self.queue:
return None
if self.shuffle:
import random
index = random.randint(0, len(self.queue) - 1)
song = self.queue[index]
del self.queue[index]
else:
song = self.queue.popleft()
self.history.append(song)
return song
def skip_song(self):
"""現在の曲をスキップ"""
return self.get_next_song()
def clear_queue(self):
"""キューをクリア"""
self.queue.clear()
def get_queue_list(self, limit=10):
"""キューの一覧を取得"""
return list(self.queue)[:limit]
def remove_song(self, index):
"""指定したインデックスの曲を削除"""
if 0 <= index < len(self.queue):
return self.queue.pop(index)
return None
6. メインBot(bot.py)
Copyimport discord
from discord.ext import commands
import asyncio
import logging
from music_player import MusicPlayer, YTDLSource
from queue_manager import QueueManager
from config import Config
# ログ設定
logging.basicConfig(level=logging.INFO)
# Intents設定
intents = discord.Intents.default()
intents.message_content = True
intents.voice_states = True
class MusicBot(commands.Bot):
def __init__(self):
super().__init__(
command_prefix=Config.COMMAND_PREFIX,
intents=intents,
help_command=None
)
self.music_players = {}
self.queue_managers = {}
async def on_ready(self):
print(f'{self.user} がログインしました!')
await self.change_presence(
activity=discord.Activity(
type=discord.ActivityType.listening,
name="🎵 音楽 | !help"
)
)
async def on_command_error(self, ctx, error):
if isinstance(error, commands.CommandNotFound):
return
elif isinstance(error, commands.MissingRequiredArgument):
await ctx.send("❌ 引数が不足しています。")
else:
await ctx.send(f"❌ エラーが発生しました: {str(error)}")
bot = MusicBot()
@bot.command(name='join', aliases=['connect'])
async def join(ctx):
"""ボイスチャンネルに参加"""
if ctx.author.voice is None:
return await ctx.send("❌ ボイスチャンネルに参加してください。")
channel = ctx.author.voice.channel
if ctx.voice_client is not None:
return await ctx.voice_client.move_to(channel)
await channel.connect()
await ctx.send(f"✅ {channel.name} に参加しました!")
@bot.command(name='leave', aliases=['disconnect'])
async def leave(ctx):
"""ボイスチャンネルから退出"""
if ctx.voice_client is None:
return await ctx.send("❌ ボイスチャンネルに参加していません。")
# プレイヤーのクリーンアップ
if ctx.guild.id in bot.music_players:
del bot.music_players[ctx.guild.id]
if ctx.guild.id in bot.queue_managers:
del bot.queue_managers[ctx.guild.id]
await ctx.voice_client.disconnect()
await ctx.send("👋 ボイスチャンネルから退出しました。")
@bot.command(name='play', aliases=['p'])
async def play(ctx, *, search: str):
"""音楽を再生"""
if ctx.author.voice is None:
return await ctx.send("❌ ボイスチャンネルに参加してください。")
if ctx.voice_client is None:
await ctx.author.voice.channel.connect()
# キューマネージャーの初期化
if ctx.guild.id not in bot.queue_managers:
bot.queue_managers[ctx.guild.id] = QueueManager()
queue_manager = bot.queue_managers[ctx.guild.id]
async with ctx.typing():
try:
# 検索またはURL処理
if not search.startswith('http'):
search = f"ytsearch:{search}"
source = await YTDLSource.from_url(search, loop=bot.loop, stream=True)
# キューに追加
queue_manager.add_song({
'url': search,
'title': source.title,
'duration': source.duration,
'thumbnail': source.thumbnail,
'requester': ctx.author
})
# プレイヤーが存在しない場合は作成
if ctx.guild.id not in bot.music_players:
bot.music_players[ctx.guild.id] = MusicPlayer(ctx)
embed = discord.Embed(
title="✅ キューに追加しました",
description=f"[{source.title}]({source.url})",
color=0x00ff00
)
embed.add_field(name="リクエスト者", value=ctx.author.mention, inline=True)
embed.add_field(name="再生時間", value=f"{source.duration//60}:{source.duration%60:02d}", inline=True)
if source.thumbnail:
embed.set_thumbnail(url=source.thumbnail)
await ctx.send(embed=embed)
except Exception as e:
await ctx.send(f"❌ エラーが発生しました: {str(e)}")
@bot.command(name='skip', aliases=['s'])
async def skip(ctx):
"""現在の曲をスキップ"""
if ctx.voice_client is None or not ctx.voice_client.is_playing():
return await ctx.send("❌ 現在音楽を再生していません。")
ctx.voice_client.stop()
await ctx.send("⏭️ 曲をスキップしました。")
@bot.command(name='stop')
async def stop(ctx):
"""音楽を停止してキューをクリア"""
if ctx.voice_client is None:
return await ctx.send("❌ ボイスチャンネルに参加していません。")
if ctx.guild.id in bot.queue_managers:
bot.queue_managers[ctx.guild.id].clear_queue()
ctx.voice_client.stop()
await ctx.send("⏹️ 音楽を停止しました。")
@bot.command(name='queue', aliases=['q'])
async def queue(ctx):
"""現在のキューを表示"""
if ctx.guild.id not in bot.queue_managers:
return await ctx.send("❌ キューが空です。")
queue_manager = bot.queue_managers[ctx.guild.id]
queue_list = queue_manager.get_queue_list()
if not queue_list:
return await ctx.send("❌ キューが空です。")
embed = discord.Embed(
title="🎵 再生キュー",
color=0x0099ff
)
for i, song in enumerate(queue_list, 1):
embed.add_field(
name=f"{i}. {song['title'][:50]}",
value=f"リクエスト者: {song['requester'].mention}",
inline=False
)
await ctx.send(embed=embed)
@bot.command(name='volume', aliases=['vol'])
async def volume(ctx, volume: int = None):
"""音量を設定(0-100)"""
if ctx.voice_client is None:
return await ctx.send("❌ ボイスチャンネルに参加していません。")
if volume is None:
current_volume = int(ctx.voice_client.source.volume * 100) if ctx.voice_client.source else 50
return await ctx.send(f"🔊 現在の音量: {current_volume}%")
if 0 <= volume <= 100:
if ctx.voice_client.source:
ctx.voice_client.source.volume = volume / 100.0
await ctx.send(f"🔊 音量を {volume}% に設定しました。")
else:
await ctx.send("❌ 音量は0-100の範囲で設定してください。")
@bot.command(name='now', aliases=['np'])
async def now_playing(ctx):
"""現在再生中の曲を表示"""
if ctx.voice_client is None or not ctx.voice_client.is_playing():
return await ctx.send("❌ 現在音楽を再生していません。")
player = bot.music_players.get(ctx.guild.id)
if player and player.current:
source = player.current
embed = discord.Embed(
title="🎵 現在再生中",
description=f"[{source.title}]({source.url})",
color=0x00ff00
)
if source.thumbnail:
embed.set_thumbnail(url=source.thumbnail)
await ctx.send(embed=embed)
@bot.command(name='help', aliases=['h'])
async def help_command(ctx):
"""ヘルプを表示"""
embed = discord.Embed(
title="🎵 音楽Bot コマンド一覧",
description="利用可能なコマンドの一覧です",
color=0x0099ff
)
commands_list = [
("!play <曲名/URL>", "音楽を再生・キューに追加"),
("!skip", "現在の曲をスキップ"),
("!stop", "音楽を停止してキューをクリア"),
("!queue", "再生キューを表示"),
("!volume <0-100>", "音量を設定"),
("!now", "現在再生中の曲を表示"),
("!join", "ボイスチャンネルに参加"),
("!leave", "ボイスチャンネルから退出"),
]
for command, description in commands_list:
embed.add_field(name=command, value=description, inline=False)
await ctx.send(embed=embed)
# Botを起動
if __name__ == "__main__":
bot.run(Config.DISCORD_TOKEN)
STEP3: Docker環境構築
1. Dockerfile
CopyFROM python:3.11-slim
# 必要なパッケージのインストール
RUN apt-get update && apt-get install -y \
ffmpeg \
git \
&& rm -rf /var/lib/apt/lists/*
# 作業ディレクトリ設定
WORKDIR /app
# 依存関係ファイルをコピー
COPY requirements.txt .
# Pythonパッケージインストール
RUN pip install --no-cache-dir -r requirements.txt
# アプリケーションファイルをコピー
COPY . .
# 実行ユーザーを非rootに変更
RUN useradd -m -s /bin/bash botuser
USER botuser
# コンテナ起動時のコマンド
CMD ["python", "bot.py"]
2. requirements.txt
Copydiscord.py[voice]==2.3.2
yt-dlp==2024.6.1
PyNaCl==1.5.0
python-dotenv==1.0.0
aiohttp==3.9.5
asyncio-throttle==1.0.2
3. docker-compose.yml
Copyversion: '3.8'
services:
discord-bot:
build: .
container_name: discord_music_bot
restart: unless-stopped
environment:
- DISCORD_TOKEN=${DISCORD_TOKEN}
- YOUTUBE_API_KEY=${YOUTUBE_API_KEY}
- COMMAND_PREFIX=${COMMAND_PREFIX}
volumes:
- ./logs:/app/logs
- ./data:/app/data
networks:
- bot-network
healthcheck:
test: ["CMD", "python", "-c", "import requests; requests.get('http://localhost:8080/health', timeout=5)"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
# オプション: 監視用Webダッシュボード
web-dashboard:
image: nginx:alpine
container_name: bot_dashboard
ports:
- "8080:80"
volumes:
- ./dashboard:/usr/share/nginx/html
networks:
- bot-network
depends_on:
- discord-bot
networks:
bot-network:
driver: bridge
volumes:
bot-data:
bot-logs:
4. 環境変数設定(.env)
# Discord設定
DISCORD_TOKEN=YOUR_DISCORD_BOT_TOKEN_HERE
COMMAND_PREFIX=!
# YouTube API(オプション)
YOUTUBE_API_KEY=YOUR_YOUTUBE_API_KEY_HERE
# その他設定
PYTHONUNBUFFERED=1
TZ=Asia/Tokyo
STEP4: VPSデプロイと運用
1. サーバー初期設定
Copy# システムアップデート
sudo apt update && sudo apt upgrade -y
# Docker インストール
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
# Docker Compose インストール
sudo curl -L "https://github.com/docker/compose/releases/latest/download/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
# ユーザーをdockerグループに追加
sudo usermod -aG docker $USER
2. アプリケーションデプロイ
Copy# プロジェクトディレクトリ作成
mkdir ~/discord-bot && cd ~/discord-bot
# ソースコードの配置(Git clone または SCP/SFTP)
git clone https://your-repository/discord-music-bot.git .
# 環境変数設定
cp .env.example .env
nano .env # トークンなどを設定
# コンテナ起動
docker-compose up -d
# ログ確認
docker-compose logs -f discord-bot
3. 自動起動設定
Copy# systemdサービスファイル作成
sudo nano /etc/systemd/system/discord-bot.service
Copy[Unit]
Description=Discord Music Bot
Requires=docker.service
After=docker.service
[Service]
Type=oneshot
RemainAfterExit=yes
WorkingDirectory=/home/username/discord-bot
ExecStart=/usr/local/bin/docker-compose up -d
ExecStop=/usr/local/bin/docker-compose down
TimeoutStartSec=0
[Install]
WantedBy=multi-user.target
Copy# サービス有効化
sudo systemctl daemon-reload
sudo systemctl enable discord-bot
sudo systemctl start discord-bot
4. 監視・ログ管理
Copy# ログローテーション設定
sudo nano /etc/logrotate.d/discord-bot
/home/username/discord-bot/logs/*.log {
daily
missingok
rotate 14
compress
delaycompress
notifempty
create 644 username username
postrotate
docker-compose restart discord-bot
endscript
}
高度な機能実装
1. プレイリスト機能
Copyimport json
import os
class PlaylistManager:
def __init__(self, data_dir="data/playlists"):
self.data_dir = data_dir
os.makedirs(data_dir, exist_ok=True)
def save_playlist(self, user_id, playlist_name, songs):
"""プレイリストを保存"""
playlist_data = {
'name': playlist_name,
'songs': songs,
'created_by': user_id,
'created_at': datetime.now().isoformat()
}
filename = f"{self.data_dir}/{user_id}_{playlist_name}.json"
with open(filename, 'w', encoding='utf-8') as f:
json.dump(playlist_data, f, ensure_ascii=False, indent=2)
def load_playlist(self, user_id, playlist_name):
"""プレイリストを読み込み"""
filename = f"{self.data_dir}/{user_id}_{playlist_name}.json"
if os.path.exists(filename):
with open(filename, 'r', encoding='utf-8') as f:
return json.load(f)
return None
def list_playlists(self, user_id):
"""ユーザーのプレイリスト一覧"""
playlists = []
for filename in os.listdir(self.data_dir):
if filename.startswith(f"{user_id}_"):
playlist_name = filename.replace(f"{user_id}_", "").replace(".json", "")
playlists.append(playlist_name)
return playlists
# Botコマンドに追加
@bot.command(name='playlist')
async def playlist(ctx, action=None, name=None, *, songs=None):
"""プレイリスト管理"""
playlist_manager = PlaylistManager()
if action == "save":
# 現在のキューをプレイリストとして保存
if ctx.guild.id in bot.queue_managers:
queue_manager = bot.queue_managers[ctx.guild.id]
current_queue = queue_manager.get_queue_list()
playlist_manager.save_playlist(ctx.author.id, name, current_queue)
await ctx.send(f"✅ プレイリスト '{name}' を保存しました。")
elif action == "load":
# プレイリストを読み込んでキューに追加
playlist_data = playlist_manager.load_playlist(ctx.author.id, name)
if playlist_data:
if ctx.guild.id not in bot.queue_managers:
bot.queue_managers[ctx.guild.id] = QueueManager()
queue_manager = bot.queue_managers[ctx.guild.id]
for song in playlist_data['songs']:
queue_manager.add_song(song)
await ctx.send(f"✅ プレイリスト '{name}' を読み込みました。({len(playlist_data['songs'])}曲)")
elif action == "list":
# プレイリスト一覧表示
playlists = playlist_manager.list_playlists(ctx.author.id)
if playlists:
embed = discord.Embed(title="📋 あなたのプレイリスト", color=0x0099ff)
for i, playlist_name in enumerate(playlists, 1):
embed.add_field(name=f"{i}. {playlist_name}", value="🎵", inline=True)
await ctx.send(embed=embed)
else:
await ctx.send("❌ プレイリストがありません。")
2. 音楽フィルター・エフェクト
Copyclass AudioFilters:
@staticmethod
def get_filter_options(filter_type):
"""オーディオフィルターのFFmpegオプション"""
filters = {
'bass': '-af "bass=g=10"',
'treble': '-af "treble=g=5"',
'nightcore': '-af "asetrate=44100*1.25,aresample=44100"',
'vaporwave': '-af "asetrate=44100*0.8,aresample=44100"',
'echo': '-af "aecho=0.8:0.9:1000:0.3"',
'reverb': '-af "reverb"',
'karaoke': '-af "pan=mono|c0=0.5*c0+-0.5*c1"'
}
return filters.get(filter_type, '')
# フィルター適用コマンド
@bot.command(name='filter')
async def audio_filter(ctx, filter_type=None):
"""オーディオフィルターを適用"""
if filter_type is None:
available_filters = list(AudioFilters.get_filter_options.__defaults__[0].keys())
await ctx.send(f"利用可能なフィルター: {', '.join(available_filters)}")
return
if ctx.voice_client and ctx.voice_client.is_playing():
# 現在の再生を停止して、フィルター付きで再開
ctx.voice_client.stop()
await ctx.send(f"🎛️ {filter_type} フィルターを適用しました。")
else:
await ctx.send("❌ 現在音楽を再生していません。")
3. 統計・分析機能
Copyimport sqlite3
from datetime import datetime, timedelta
class BotStatistics:
def __init__(self, db_path="data/bot_stats.db"):
self.db_path = db_path
self.init_database()
def init_database(self):
"""データベース初期化"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS song_plays (
id INTEGER PRIMARY KEY AUTOINCREMENT,
guild_id INTEGER,
user_id INTEGER,
song_title TEXT,
song_url TEXT,
played_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS daily_stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date DATE,
guild_id INTEGER,
total_plays INTEGER,
unique_users INTEGER,
total_duration INTEGER
)
''')
conn.commit()
conn.close()
def log_song_play(self, guild_id, user_id, song_title, song_url):
"""楽曲再生ログ"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO song_plays (guild_id, user_id, song_title, song_url)
VALUES (?, ?, ?, ?)
''', (guild_id, user_id, song_title, song_url))
conn.commit()
conn.close()
def get_popular_songs(self, guild_id, days=7, limit=10):
"""人気楽曲ランキング"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT song_title, COUNT(*) as play_count
FROM song_plays
WHERE guild_id = ? AND played_at >= date('now', '-{} days')
GROUP BY song_title
ORDER BY play_count DESC
LIMIT ?
'''.format(days), (guild_id, limit))
results = cursor.fetchall()
conn.close
def get_user_stats(self, guild_id, user_id, days=30):
"""ユーザー統計"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT COUNT(*) as total_plays, COUNT(DISTINCT song_title) as unique_songs
FROM song_plays
WHERE guild_id = ? AND user_id = ? AND played_at >= date('now', '-{} days')
'''.format(days), (guild_id, user_id))
result = cursor.fetchone()
conn.close()
return result
統計コマンド
@bot.command(name=’stats’) async def statistics(ctx, stat_type=None): “””統計情報を表示””” stats = BotStatistics()
if stat_type == "popular":
popular
if period == "week":
days = 7
title = "📊 今週の統計"
elif period == "month":
days = 30
title = "📊 今月の統計"
else:
days = 1
title = "📊 今日の統計"
popular_songs = stats.get_popular_songs(ctx.guild.id, days, 5)
embed = discord.Embed(title=title, color=0x0099ff)
if popular_songs:
ranking = "\n".join([
f"{i+1}. {song[0][:50]} ({song[1]}回)"
for i, song in enumerate(popular_songs)
])
embed.add_field(name="🎵 人気楽曲ランキング", value=ranking, inline=False)
else:
embed.add_field(name="📈 統計", value="データがありません", inline=False)
await ctx.send(embed=embed)
## 法的注意点とベストプラクティス
### 1. 著作権・利用規約の遵守
**YouTube利用時の注意点**
- 個人利用の範囲内での使用
- 商用利用は禁止
- ダウンロード機能の公開禁止
- 利用規約の定期確認
**合法的な音楽ソース**
```python
LEGAL_SOURCES = {
'youtube': {
'allowed': True,
'conditions': ['個人利用のみ', 'ダウンロード禁止'],
'api_required': True
},
'soundcloud': {
'allowed': True,
'conditions': ['API利用規約遵守'],
'api_required': True
},
'spotify': {
'allowed': True,
'conditions': ['プレビューのみ', '30秒制限'],
'api_required': True
},
'radio_streams': {
'allowed': True,
'conditions': ['ストリーム再配信'],
'api_required': False
}
}
2. プライバシー保護
Copyimport hashlib
class PrivacyManager:
@staticmethod
def hash_user_id(user_id):
"""ユーザーIDをハッシュ化"""
return hashlib.sha256(str(user_id).encode()).hexdigest()
@staticmethod
def anonymize_logs(log_data):
"""ログデータの匿名化"""
anonymized = log_data.copy()
if 'user_id' in anonymized:
anonymized['user_id'] = PrivacyManager.hash_user_id(anonymized['user_id'])
return anonymized
3. レート制限・負荷対策
Copyimport asyncio
from collections import defaultdict
import time
class RateLimiter:
def __init__(self):
self.requests = defaultdict(list)
self.limits = {
'play': (5, 60), # 5回/分
'skip': (10, 60), # 10回/分
'queue': (3, 30) # 3回/30秒
}
async def check_rate_limit(self, user_id, command_type):
"""レート制限チェック"""
now = time.time()
limit_count, limit_window = self.limits.get(command_type, (10, 60))
# 古いリクエストを削除
self.requests[user_id] = [
req_time for req_time in self.requests[user_id]
if now - req_time < limit_window
]
# 制限チェック
if len(self.requests[user_id]) >= limit_count:
return False
# リクエスト記録
self.requests[user_id].append(now)
return True
# レート制限デコレータ
def rate_limit(command_type):
def decorator(func):
async def wrapper(ctx, *args, **kwargs):
limiter = getattr(bot, 'rate_limiter', RateLimiter())
if not await limiter.check_rate_limit(ctx.author.id, command_type):
await ctx.send("❌ コマンドの実行頻度が高すぎます。しばらく待ってから再試行してください。")
return
return await func(ctx, *args, **kwargs)
return wrapper
return decorator
# 使用例
@bot.command(name='play')
@rate_limit('play')
async def play(ctx, *, search: str):
# プレイコマンドの実装
pass
パフォーマンス最適化
1. メモリ使用量最適化
Copyimport gc
import psutil
import asyncio
class MemoryManager:
def __init__(self, max_memory_mb=512):
self.max_memory_mb = max_memory_mb
self.cleanup_interval = 300 # 5分間隔
async def monitor_memory(self):
"""メモリ使用量監視"""
while True:
process = psutil.Process()
memory_usage = process.memory_info().rss / 1024 / 1024 # MB
if memory_usage > self.max_memory_mb:
await self.cleanup_memory()
await asyncio.sleep(self.cleanup_interval)
async def cleanup_memory(self):
"""メモリクリーンアップ"""
# 不要なキャッシュをクリア
gc.collect()
# 古いプレイヤーオブジェクトを削除
for guild_id in list(bot.music_players.keys()):
player = bot.music_players[guild_id]
if not player.ctx.voice_client or not player.ctx.voice_client.is_connected():
del bot.music_players[guild_id]
# Botクラスに追加
@bot.event
async def on_ready():
# メモリ監視開始
memory_manager = MemoryManager()
bot.loop.create_task(memory_manager.monitor_memory())
2. 接続プール管理
Copyimport aiohttp
import asyncio
class HTTPManager:
def __init__(self):
self.session = None
self.connector_limit = 100
self.timeout = aiohttp.ClientTimeout(total=30)
async def get_session(self):
"""HTTPセッション取得"""
if self.session is None or self.session.closed:
connector = aiohttp.TCPConnector(
limit=self.connector_limit,
force_close=True,
enable_cleanup_closed=True
)
self.session = aiohttp.ClientSession(
connector=connector,
timeout=self.timeout
)
return self.session
async def close(self):
"""セッションクローズ"""
if self.session and not self.session.closed:
await self.session.close()
# Botクラスに統合
bot.http_manager = HTTPManager()
@bot.event
async def on_disconnect():
await bot.http_manager.close()
3. データベース最適化
Copyimport sqlite3
import asyncio
import aiosqlite
class AsyncDatabase:
def __init__(self, db_path="data/bot.db"):
self.db_path = db_path
self.connection_pool = {}
async def get_connection(self):
"""非同期データベース接続取得"""
thread_id = asyncio.current_task()
if thread_id not in self.connection_pool:
self.connection_pool[thread_id] = await aiosqlite.connect(self.db_path)
return self.connection_pool[thread_id]
async def execute_query(self, query, params=None):
"""クエリ実行"""
conn = await self.get_connection()
cursor = await conn.cursor()
if params:
await cursor.execute(query, params)
else:
await cursor.execute(query)
result = await cursor.fetchall()
await conn.commit()
return result
async def close_all_connections(self):
"""全接続クローズ"""
for conn in self.connection_pool.values():
await conn.close()
self.connection_pool.clear()
監視・アラート機能
1. ヘルスチェック
Copyfrom flask import Flask, jsonify
import threading
# ヘルスチェック用Web API
health_app = Flask(__name__)
@health_app.route('/health')
def health_check():
"""ヘルスチェックエンドポイント"""
status = {
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'guilds_connected': len(bot.guilds),
'voice_connections': len([vc for vc in bot.voice_clients if vc.is_connected()]),
'uptime': str(datetime.now() - bot.start_time) if hasattr(bot, 'start_time') else 'unknown'
}
return jsonify(status)
@health_app.route('/metrics')
def metrics():
"""メトリクス情報"""
process = psutil.Process()
metrics_data = {
'memory_usage_mb': process.memory_info().rss / 1024 / 1024,
'cpu_percent': process.cpu_percent(),
'threads': process.num_threads(),
'open_files': len(process.open_files()),
'active_players': len(bot.music_players),
'queue_sizes': {str(gid): len(qm.queue) for gid, qm in bot.queue_managers.items()}
}
return jsonify(metrics_data)
def run_health_server():
"""ヘルスチェックサーバー起動"""
health_app.run(host='0.0.0.0', port=8080, debug=False)
# Bot起動時にヘルスチェックサーバーも起動
@bot.event
async def on_ready():
bot.start_time = datetime.now()
health_thread = threading.Thread(target=run_health_server, daemon=True)
health_thread.start()
2. ログ監視・アラート
Copyimport logging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class AlertManager:
def __init__(self, smtp_config=None):
self.smtp_config = smtp_config or {}
self.alert_threshold = {
'error_count': 10, # 10分間に10回エラーでアラート
'memory_usage': 80, # メモリ使用率80%でアラート
'response_time': 5.0 # 応答時間5秒超でアラート
}
self.error_count = 0
self.last_alert_time = {}
async def send_alert(self, alert_type, message):
"""アラート送信"""
now = datetime.now()
alert_key = f"{alert_type}_{now.strftime('%Y%m%d%H')}"
# 同じアラートを1時間に1回までに制限
if alert_key in self.last_alert_time:
return
self.last_alert_time[alert_key] = now
# Discord Webhookでアラート送信
if hasattr(bot, 'alert_webhook'):
embed = discord.Embed(
title=f"🚨 {alert_type.upper()} Alert",
description=message,
color=0xff0000,
timestamp=now
)
try:
async with aiohttp.ClientSession() as session:
webhook = discord.Webhook.from_url(
bot.alert_webhook,
session=session
)
await webhook.send(embed=embed)
except Exception as e:
logging.error(f"Alert webhook failed: {e}")
def check_error_threshold(self):
"""エラー閾値チェック"""
self.error_count += 1
if self.error_count >= self.alert_threshold['error_count']:
asyncio.create_task(
self.send_alert('ERROR_THRESHOLD',
f"Error count exceeded: {self.error_count}")
)
self.error_count = 0
# カスタムログハンドラー
class AlertLogHandler(logging.Handler):
def __init__(self, alert_manager):
super().__init__()
self.alert_manager = alert_manager
def emit(self, record):
if record.levelno >= logging.ERROR:
self.alert_manager.check_error_threshold()
# ログ設定にアラートハンドラーを追加
alert_manager = AlertManager()
alert_handler = AlertLogHandler(alert_manager)
logging.getLogger().addHandler(alert_handler)
おすすめVPS・専用サーバー(Discord Bot用)
Discord音楽Botの24時間安定運用には、適切なサーバー選択が重要です。用途・規模別のおすすめをご紹介します。
🥇 小〜中規模サーバー:ConoHa VPS
特徴
- 時間単位課金で開発コスト最適化
- 高速SSD標準搭載
- Docker対応済み
- 24時間サポート
Discord Bot向け推奨スペック
- CPU: 2vCPU
- メモリ: 2GB
- SSD: 100GB
- 料金: 1,012円/月
メリット ✅ 初期費用無料
✅ 簡単操作のコントロールパネル
✅ 自動バックアップ対応
✅ 時間単位の従量課金
🥈 安定性重視:エックスサーバー VPS
特徴
- 国内最大級の運用実績
- 高い稼働率(99.99%以上)
- 24時間365日サポート
- 自動バックアップ標準
Discord Bot向け推奨スペック
- CPU: 3vCPU
- メモリ: 2GB
- SSD: 50GB
- 料金: 1,150円/月
メリット ✅ 老舗サーバー会社の信頼性
✅ 充実したサポート体制
✅ 安定したネットワーク
✅ 管理ツールが使いやすい
🥉 大規模運用:KAGOYA CLOUD VPS
特徴
- 柔軟なスペック変更
- 高性能CPUオプション
- 複数データセンター対応
- 法人向けサポート
Discord Bot向け推奨スペック
- CPU: 4vCPU
- メモリ: 4GB
- SSD: 200GB
- 料金: 3,520円/月
メリット ✅ スペック変更が柔軟
✅ 高性能オプション豊富
✅ 企業レベルのサポート
✅ バックアップオプション充実
🎮 ゲーマー向け:さくらVPS
特徴
- ゲーマーコミュニティに人気
- 低遅延ネットワーク
- リーズナブルな価格
- 豊富なOS選択肢
Discord Bot向け推奨スペック
- CPU: 2vCPU
- メモリ: 2GB
- SSD: 100GB
- 料金: 1,738円/月
メリット ✅ ゲーマー向け最適化
✅ 低遅延接続
✅ コストパフォーマンス良好
✅ 豊富なテンプレート
トラブルシューティング
よくある問題と解決法
Q1: Botがボイスチャンネルに接続できない
原因と解決法
Copy# 1. 権限不足
# Bot設定でVoice permissions を確認
# 2. FFmpegがインストールされていない
# Dockerfileに追加
RUN apt-get install -y ffmpeg
# 3. PyNaClライブラリの問題
pip install --upgrade PyNaCl
# 4. 接続コードの修正
@bot.command()
async def join(ctx):
if ctx.author.voice:
channel = ctx.author.voice.channel
try:
await channel.connect()
except discord.ClientException:
await ctx.send("既に接続済みです")
except discord.opus.OpusNotLoaded:
await ctx.send("音声コーデックの読み込みに失敗しました")
Q2: 音楽の音質が悪い・途切れる
解決法
Copy# 1. FFmpegオプションの最適化
FFMPEG_OPTIONS = {
'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
'options': '-vn -filter:a "volume=0.25" -bufsize 64k'
}
# 2. ビットレート調整
YTDL_FORMAT_OPTIONS = {
'format': 'bestaudio[abr<=128]/best[height<=480]',
'audioformat': 'mp3',
'audioquality': 0, # 最高品質
}
# 3. サーバーリソース確認
# CPU使用率とメモリ使用量をチェック
Q3: Dockerコンテナが頻繁に停止する
解決法
Copy# docker-compose.yml の最適化
version: '3.8'
services:
discord-bot:
build: .
restart: unless-stopped
deploy:
resources:
limits:
memory: 1G
reservations:
memory: 512M
healthcheck:
test: ["CMD", "python", "-c", "import discord; print('OK')"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
Q4: YouTube APIの制限に引っかかる
解決法
Copy# 1. 複数APIキーのローテーション
API_KEYS = [
'KEY1', 'KEY2', 'KEY3'
]
current_key_index = 0
def get_youtube_api_key():
global current_key_index
key = API_KEYS[current_key_index]
current_key_index = (current_key_index + 1) % len(API_KEYS)
return key
# 2. キャッシュ機能の実装
import pickle
import os
class YouTubeCache:
def __init__(self, cache_file='youtube_cache.pkl'):
self.cache_file = cache_file
self.cache = self.load_cache()
def load_cache(self):
if os.path.exists(self.cache_file):
with open(self.cache_file, 'rb') as f:
return pickle.load(f)
return {}
def save_cache(self):
with open(self.cache_file, 'wb') as f:
pickle.dump(self.cache, f)
def get(self, search_query):
return self.cache.get(search_query)
def set(self, search_query, result):
self.cache[search_query] = result
self.save_cache()
2025年Discord Bot開発トレンド
1. Slash Commands(スラッシュコマンド)の標準化
Copy# 従来のテキストコマンドからSlash Commandsへ移行
from discord.ext import commands
from discord import app_commands
@bot.tree.command(name="play", description="音楽を再生します")
async def play_slash(interaction: discord.Interaction, search: str):
"""スラッシュコマンド版 play"""
ctx = await bot.get_context(interaction)
await play(ctx, search=search)
await interaction.response.send_message(f"🎵 再生開始: {search}")
# Slash Commandsの同期
@bot.event
async def on_ready():
try:
synced = await bot.tree.sync()
print(f"Synced {len(synced)} command(s)")
except Exception as e:
print(f"Failed to sync commands: {e}")
2. AI機能統合
Copyimport openai
class AIFeatures:
def __init__(self, api_key):
openai.api_key = api_key
async def generate_playlist(self, mood, genre):
"""AIによるプレイリスト生成"""
prompt = f"Create a playlist of 10 songs for {mood} mood in {genre} genre. Format: 'Artist - Song Title'"
response = await openai.ChatCompletion.acreate(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
max_tokens=500
)
return response.choices[0].message.content
@bot.command(name='aiplaylist')
async def ai_playlist(ctx, mood: str, genre: str):
"""AI生成プレイリスト"""
ai = AIFeatures(os.getenv('OPENAI_API_KEY'))
playlist = await ai.generate_playlist(mood, genre)
embed = discord.Embed(
title=f"🤖 AI生成プレイリスト ({mood} {genre})",
description=playlist,
color=0x00ff00
)
await ctx.send(embed=embed)
3. 分散アーキテクチャ
Copy# Redis を使った分散キューシステム
import redis
import json
class DistributedQueue:
def __init__(self, redis_url):
self.redis_client = redis.from_url(redis_url)
self.queue_key = "music_queue"
async def add_song(self, guild_id, song_data):
"""分散キューに楽曲追加"""
queue_key = f"{self.queue_key}:{guild_id}"
await self.redis_client.lpush(queue_key, json.dumps(song_data))
async def get_next_song(self, guild_id):
"""次の楽曲を取得"""
queue_key = f"{self.queue_key}:{guild_id}"
song_data = await self.redis_client.rpop(queue_key)
return json.loads(song_data) if song_data else None
まとめ
Discord音楽Botの開発から24時間運用まで、包括的に解説しました。
重要ポイント
- Docker化で安定運用と簡単デプロイ
- 適切なVPS選択でコストと性能のバランス
- 法的コンプライアンスの遵守が必須
- 監視・アラートで問題の早期発見
ゲーマーの皆さんなら、MMORPGでギルド運営をするときの責任感で、Botサーバーも丁寧に管理できるはず。まずは小規模から始めて、徐々にスケールアップしていきましょう!
免責事項
※本記事は2025年7月時点の情報に基づいています。Discord APIの仕様変更や各種サービスの利用規約変更にご注意ください。音楽の著作権に関する法的責任は利用者にあります。