Hey, I’m fairly new to TouchDesigner and have run into a problem with my project:
I’m currently working on a project where I need TouchDesigner to react to certain predefined spoken sentiments, so I’m trying to implement a Google Cloud Speech script in TouchDesigner.
The code works outside of TD, with microphone handling done by PyAudio.
I presume I have to do the microphone handling through an Audio Device In CHOP when putting it together in TouchDesigner. Currently, when I run the script, I get no output when speaking into the mic, but I also don’t get any error messages, so I assume the script somehow isn’t receiving the audio input.
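From reading the docs, I think the raw samples would have to come out of the CHOP something like this (just a sketch with the operator/channel names from my network, not what the script below actually does):

# Sketch: an Execute DAT that grabs the current Audio Device In buffer every
# frame and converts it to 16-bit PCM bytes (what LINEAR16 expects).
# 'audiodevin1' / 'chan1' are the names in my network.
import numpy as np

def onFrameStart(frame):
    chop = op('audiodevin1')
    samples = chop['chan1'].vals  # float samples, roughly -1..1
    pcm = (np.clip(samples, -1.0, 1.0) * 32767).astype(np.int16).tobytes()
    print(len(pcm), 'bytes this frame')  # just checking in the textport
    # not sure whether reading the whole buffer every frame double-reads samples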
For now I just want the output in the textport.
Can someone help me out? Here’s my Python code:
from __future__ import division
import re
import sys
from google.cloud import speech
from google.cloud.speech import enums
from google.cloud.speech import types
from six.moves import queue
import os
credential_path = "/Path/to/googlecredentials.json"
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credential_path
# Audio recording parameters
RATE = 16000
CHUNK = int(RATE / 10) # 100ms
touchmic = op('audiodevin1')['chan1']  # the Audio Device In channel I’m trying to use in place of the PyAudio mic

class MicrophoneStream(object):
    def __init__(self, rate, chunk):
        self._rate = rate
        self._chunk = chunk
        self._buff = queue.Queue()  # thread-safe buffer of audio data
        self.closed = True

    def __enter__(self):
        self._audio_interface = touchmic
        self._audio_stream = self._audio_interface.open(
            format=mono,  # was pyaudio.paInt16 in the PyAudio version
            channels=1, rate=self._rate,
            input=True, frames_per_buffer=self._chunk,
            stream_callback=self._fill_buffer,
        )
        self.closed = False
        return self

    def __exit__(self, type, value, traceback):
        self._audio_stream.stop_stream()
        self._audio_stream.close()
        self.closed = True
        self._buff.put(None)  # signal the generator to terminate
        self._audio_interface.terminate()

    def _fill_buffer(self, in_data, frame_count, time_info, status_flags):
        self._buff.put(in_data)
        return None, Continue  # was pyaudio.paContinue in the PyAudio version

    def generator(self):
        while not self.closed:
            # block until there is at least one chunk of data
            chunk = self._buff.get()
            if chunk is None:
                return
            data = [chunk]
            # then consume whatever else is already buffered
            while True:
                try:
                    chunk = self._buff.get(block=False)
                    if chunk is None:
                        return
                    data.append(chunk)
                except queue.Empty:
                    break
            yield b''.join(data)

def listen_print_loop(responses):
    num_chars_printed = 0
    for response in responses:
        if not response.results:
            continue
        result = response.results[0]
        if not result.alternatives:
            continue
        transcript = result.alternatives[0].transcript
        overwrite_chars = ' ' * (num_chars_printed - len(transcript))
        if not result.is_final:
            sys.stdout.write(transcript + overwrite_chars + '\r')
            sys.stdout.flush()
            num_chars_printed = len(transcript)
        else:
            print(transcript + overwrite_chars)
            if re.search(r'\b(exit|quit)\b', transcript, re.I):
                print('Exiting..')
                break
            num_chars_printed = 0

def main():
    language_code = 'en-US'  # a BCP-47 language tag
    client = speech.SpeechClient()
    config = types.RecognitionConfig(
        encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=RATE,
        language_code=language_code)
    streaming_config = types.StreamingRecognitionConfig(
        config=config,
        interim_results=True)

    with MicrophoneStream(RATE, CHUNK) as stream:
        audio_generator = stream.generator()
        requests = (types.StreamingRecognizeRequest(audio_content=content)
                    for content in audio_generator)
        responses = client.streaming_recognize(streaming_config, requests)
        listen_print_loop(responses)


if __name__ == '__main__':
    main()