Skip to content

Instantly share code, notes, and snippets.

@Apfelin
Last active April 27, 2024 11:31
Show Gist options
  • Save Apfelin/c9cbb7988a9d8e55d77b06473b72dd57 to your computer and use it in GitHub Desktop.
Save Apfelin/c9cbb7988a9d8e55d77b06473b72dd57 to your computer and use it in GitHub Desktop.
Discord speech-to-text bot, made with discord.py by Rapptz and voice additions by imayhaveborkedit. Uses wit.ai for speech recognition
import asyncio
import time
from threading import Thread

import discord
import speech_recognition as sr
# bot token and wit.ai api key
# both are empty placeholders and must be filled in before running:
# TOKEN is handed to client.run(), WIT_AI_KEY to recognize_wit()
TOKEN = ""
WIT_AI_KEY = ""
# we need a sink for the listen function, so we just define our own
# extremely simple: just appends data to a byte array buffer
class BufSink(discord.reader.AudioSink):
    """Audio sink that accumulates every received audio chunk in one buffer.

    The buffer is filled through ``write`` and drained by whoever consumes
    the audio, via ``freshen``. Both operations mutate the same bytearray
    object in place, so a write that happens while the consumer is trimming
    the buffer can neither be lost nor resurrect data that was already
    consumed (the object is never swapped for a copy).
    """

    def __init__(self):
        # byte array to store stuff
        self.bytearr_buf = bytearray()
        # bytes per sample handed to speech_recognition (2 bytes = 16 bit)
        self.sample_width = 2
        # per the original author's notes the incoming audio is 48000Hz
        # stereo, while speech_recognition wants mono; the interleaved
        # stereo stream is therefore declared as mono at twice the rate
        self.sample_rate = 96000
        # bytes per second of buffered audio (96000 * 2 = 192000)
        # we need this to know what slices we can take from the buffer
        self.bytes_ps = self.sample_rate * self.sample_width

    def write(self, data):
        """Append the payload of one received audio packet to the buffer.

        ``data`` is expected to expose the raw bytes as ``data.data``.
        """
        # extend() grows the existing object instead of rebinding the
        # attribute, so it cannot race with freshen()
        self.bytearr_buf.extend(data.data)

    def freshen(self, idx):
        """Drop the first ``idx`` bytes, i.e. the part that was just read.

        This keeps the buffer from growing without bound.
        """
        # delete in place: building a trimmed copy and rebinding the
        # attribute could discard bytes written in between
        del self.bytearr_buf[:idx]
# global var - needed to stop the thread
# the bot's command handler sets it to True and poster() polls it to know
# when to leave its loop
close_flag = False
# client bot class
class Deffy(discord.Client):
    """Discord client that listens to a user in voice and posts what it hears.

    Commands are recognised as case-insensitive message prefixes:

    * ``$here``  - join (or move to) the author's voice channel, direct the
      output to the channel the command was sent in and start listening.
    * ``$leave`` - disconnect from voice and stop the worker thread.
    * ``$close`` - disconnect, stop the worker thread and shut the bot down.
    """

    def __init__(self):
        super().__init__()
        # save the channel we need to post to
        self.target_channel = None
        # the worker thread object (None while no worker is running)
        self.post_thread = None
        # buffer to hold info
        self.buffer = BufSink()

    async def on_ready(self):
        """Print some sanity messages on start-up."""
        print()
        print("Logged in as")
        print(self.user.name)
        print(self.user.id)
        print("----------")
        print("Discord.py version")
        print(discord.__version__)
        print("----------")
        print()

    async def _disconnect_all(self):
        """Disconnect every active voice client."""
        # in theory there's only one, but could be extended for more
        for vc in self.voice_clients:
            await vc.disconnect()

    async def _stop_poster(self):
        """Ask the worker thread to stop and wait until it has finished.

        Does nothing when no worker was ever started, so ``$close`` and
        ``$leave`` are safe before the first ``$here``.
        """
        global close_flag
        if self.post_thread is None:
            return
        close_flag = True
        # join in an executor thread: the worker may be in the middle of a
        # recognition request and joining directly would freeze the event loop
        await self.loop.run_in_executor(None, self.post_thread.join)
        # reset the state so that a later $here can start a fresh worker
        self.post_thread = None
        close_flag = False

    def _ensure_poster(self):
        """Start the voice-analysis thread if it doesn't exist already."""
        if self.post_thread is not None:
            return
        # daemon thread: it must not keep the process alive once the
        # client itself has stopped
        self.post_thread = Thread(
            target=poster,
            args=(self, self.buffer, self.target_channel),
            daemon=True,
        )
        self.post_thread.start()

    async def on_message(self, message):
        """Handle the ``$close``, ``$leave`` and ``$here`` commands.

        Returns ``False`` for the bot's own messages and ``None`` otherwise.
        """
        # don't respond to ourselves
        if message.author == self.user:
            return False

        command = message.content.lower()

        # handle closing
        if command.startswith("$close"):
            # send a message to ack the command
            await message.channel.send("Got it, shutting down...")
            # the polite thing to do is close any active voice connections
            await self._disconnect_all()
            await self._stop_poster()
            # shut down the bot, then quit the program
            await self.close()
            # same effect as quit(), without depending on the site module
            raise SystemExit

        # handle disconnecting
        if command.startswith("$leave"):
            if self.voice_clients:
                await self._disconnect_all()
                await self._stop_poster()
            else:
                await message.channel.send("Sorry, you're not in a voice channel.")

        # handle summoning
        if command.startswith("$here"):
            # read the voice state once, so it can't change between awaits
            voice_state = message.author.voice
            # if the user is not connected to a voice channel, but tries to
            # summon, just send a message and exit
            if voice_state is None:
                await message.channel.send("Sorry, you're not in a voice channel.")
                return

            # store the channel we need to post our output to
            self.target_channel = message.channel
            # ack the command and inform the user
            await message.channel.send(
                "Got it, moving to voice channel " + voice_state.channel.name
                + " and directing output to " + self.target_channel.name + ".")

            if self.voice_clients:
                # reuse the existing voice connection instead of creating
                # another one
                await self.voice_clients[0].move_to(voice_state.channel)
            else:
                # no active connection yet, create a new voice client
                await voice_state.channel.connect()

            self._ensure_poster()
            # start listening - user filter just listens to a certain user
            self.voice_clients[0].listen(
                discord.reader.UserFilter(self.buffer, message.author))
# thread that handles message posting and voice analysis
def poster(bot, buffer, target_channel):
    """Worker loop: cut 5-second clips from the buffer and post their text.

    Runs until the module-level ``close_flag`` becomes true.

    Args:
        bot: the client; its ``loop`` is used to schedule the send and, when
            it has a ``target_channel`` set, that channel receives the text.
        buffer: sink holding the audio (``bytearr_buf``, ``bytes_ps``,
            ``sample_rate``, ``sample_width``, ``freshen``).
        target_channel: fallback channel to post recognised text to.
    """
    # instantiate our recognizer object
    recog = sr.Recognizer()
    # 5 seconds worth of data
    clip_bytes = buffer.bytes_ps * 5
    # one sample for each of the two channels - assumes interleaved stereo,
    # as described by the sink's own comments
    frame_bytes = buffer.sample_width * 2

    while not close_flag:
        # useless to try anything before a full clip is in the buffer;
        # sleep instead of spinning so the thread doesn't hog the CPU
        if len(buffer.bytearr_buf) < clip_bytes:
            time.sleep(0.1)
            continue

        clip = buffer.bytearr_buf[:clip_bytes]
        voiced = clip.lstrip(b"\x00")

        # a clip of all 0s carries nothing to recognise, just drop it
        if not voiced:
            buffer.freshen(clip_bytes)
            continue

        # trim leading zeroes, should be more accurate; only whole frames
        # are cut, otherwise the 16-bit samples would get misaligned
        lead = len(clip) - len(voiced)
        lead -= lead % frame_bytes
        if lead:
            buffer.freshen(lead)
            # go around again, so we wait for a full clip after trimming
            continue

        # create the AudioData object needed by the speech_recognition lib
        audio = sr.AudioData(bytes(clip), buffer.sample_rate,
                             buffer.sample_width)

        # reset for every clip: a failed request must neither leave msg
        # undefined nor re-post the text of the previous clip
        msg = None
        try:
            msg = recog.recognize_wit(audio, key=WIT_AI_KEY)
        except sr.UnknownValueError:
            print("ERROR: Couldn't understand.")
        except sr.RequestError as e:
            print("ERROR: Could not request results from Wit.ai service; {0}".format(e))

        # if we send something unintelligible,
        # we'll get a message, but it'll be empty
        if msg:
            # prefer the bot's current channel, so a later $here that
            # redirects the output takes effect for a running thread
            channel = getattr(bot, "target_channel", None) or target_channel
            # send the message to the async routine
            asyncio.run_coroutine_threadsafe(channel.send(msg), bot.loop)

        # cut the part we just read from the buffer
        buffer.freshen(clip_bytes)
# create the bot and start it with the token defined at the top of the file
client = Deffy()
client.run(TOKEN)
@iHazzu
Copy link

iHazzu commented May 4, 2023

AttributeError: module 'discord' has no attribute 'reader'

I get this error. Do you know how to fix it?

To get voice support, you should use discord.py[voice] instead of discord.py.
installing discord.py

@CheevieB
Copy link

Same issue here that iHazzu lists. Using anaconda in an anaconda created environment. Using python 3.10. Installed in the activated environment with pip install -U discord.py[voice].

@StaffanJOlsson
Copy link

Same here using a virtual environment using discord.py[voice]

@Eichhorn10
Copy link

I guess the version of discord.py used here is outdated, because, for example, the class discord.reader.AudioSink no longer exists. You can check this in the discord.py docs: https://discordpy.readthedocs.io/en/stable/api.html

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment