#!/usr/bin/python3
import re
import os
import sys
from time import sleep
import _thread as thread
from array import array
import pyaudio
import statistics as stat
from ._settings import Settings
from ._processor import SignalProcessor
[docs]class Listener():
"""Describes methods which are called by user for the initialisation of the PyAudio module to stream microphone input.
:param config: An object of :class:`Settings` which is used for configuring the module, defaults to None
:type config: class: :class:`Settings`
:param bool calibrate: If the flag is set, the chunk size is calibrated, defaults to True
:var config: Store the `Settings` object
:vartype config: class: `Settings`
:var input: Store the `PyAudio` object
:vartype input: class: `pyaudio.PyAudio`
:var stream: Open a new input audio stream and store the stream object
:vartype stream: class: `pyaudio.Stream`
:var int claps: Store current value of claps counted and initially set to `0`
:var lock: Store the thread lock
:vartype lock: thread.lock
:var processor: Store the `Processor` object
:vartype processor: class: `SignalProcessor`
"""
def __init__(self, config=None, calibrate=True):
"""Constructor method"""
self.config = config or Settings()
"""If the :attr:`config` parameter is ``None``, an object of :class:`Settings` is assigned"""
self.claps = 0
"""**default:** ``0``
This class property contains the current count of claps detected"""
self.lock = thread.allocate_lock()
self.device = Device(self.config, calibrate)
"""Stores an :class:`Device` object which initialise :obj:`PyAudio` with calibration and manages the audio interface"""
self.processor = SignalProcessor(method=self.config.method)
"""Initialised with an :class:`SignalProcessor` object using the signal processing method found inside :attr:`config`"""
self.confirm()
[docs] def clapWait(self, clap):
"""Start waiting for a small duration of time recursively until no more new claps are detected
:param int clap: Number of claps found at the time of wait initialised
"""
sleep(self.config.wait)
if self.claps > clap:
self.clapWait(self.claps)
[docs] def listenClaps(self, threadName):
"""This method runs on a child thread with :attr:`lock` when :attr:`claps` equals ``1`` and reset the class property :attr:`claps` to ``0`` when execution is finished
:param str threadName: Name of the child thread started
"""
with self.lock:
print("Waiting for claps...")
self.clapWait(self.claps)
action = 'on' + str(self.claps) + 'Claps'
if action in self.config.actions:
getattr(self.config, action)()
print("You clapped", self.claps, "times.\n")
self.claps = 0
def __printInfo(self):
print("Default Device\t\t:", self.device.defaultDevice['name'])
print("Channels\t\t:", self.config.channels)
print("Chunk size\t\t:", self.config.chunk_size, "bytes")
print("Rate\t\t\t:", self.config.rate, "Hz")
print("Clap wait\t\t:", self.config.wait, "sec")
print("Algorithm selected\t:", self.config.method.name)
print("Threshold Value\t\t:", self.config.method.value, "\n")
[docs] def confirm(self):
while True:
self.__printInfo()
ask = input('Confirm(c) / Recalibrate(r) / Manual(m) / Quit(q)?[c/r/m/q](default is c): ')
if re.search(r'^[Cc]$|^$', ask):
break
elif re.search(r'^[Rr]$', ask):
print()
self.device.calibrateBufferSize()
elif re.search(r'^[Mm]$', ask):
t_value = input('Threshold value: ')
self.config.method.value = int(t_value)
elif re.search(r'^[Qq]$', ask):
sys.exit(0)
else:
print("Invalid input. Try again.\n")
[docs] def start(self):
"""When this method is called, the listener start reading binary data from stream and sreach for claps inside the chunks of data using :class:`SignalProcessor` until :attr:`Settings.exit` flag is ``True``
:raises: ``KeyboardInterrupt``: If **Control + C** is pressed on keyboard
"""
try:
self.device.openStream()
print("\nClap detection started")
while not self.config.exit:
try:
data = self.device.readData()
except (OSError, IOError):
data = None
if self.processor.findClap(data):
self.claps += 1
if self.claps == 1 and not self.lock.locked():
thread.start_new_thread(
self.listenClaps, ("ListenClaps",))
except(KeyboardInterrupt, SystemExit):
pass
self.stop()
[docs] def stop(self):
"""When this method is called, the listener stop listening by closing the stream safely and terminating the connection"""
print("\rExiting")
self.device.closeStream()
del self.device
[docs]class Device:
"""Describes methods which are called by user for the initialisation of the PyAudio module to stream microphone input.
:param config: An object of :class:`Settings` which is used for configuring the module, defaults to None
:type config: class: :class:`Settings`
:param bool calibrate: If the flag is set, the chunk size is calibrated, defaults to True
:var config: Store the :class:`Settings` object
:vartype config: class: `Settings`
:var input: Store the `PyAudio` object
:vartype input: class: `PyAudio`
"""
def __init__(self, config, calibrate):
"""Constructor method"""
self.config = config
self.input = pyaudio.PyAudio()
self.maxSamples = []
os.system('clear')
self.__setInputDevice()
self.calibrateBufferSize(calibrate)
# sys.exit(0)
def __del__(self):
"""Terminate input connection from microphone when class object is deconstructed"""
self.input.terminate()
def __setInputDevice(self):
"""Test for host api and input device support. If the supported host api and input device are available, default input device information is set"""
if self.input.get_host_api_count() < 1:
print("No supported PortAudio Host APIs are found in your system")
sys.exit(1)
if self.input.get_device_count() < 1:
print("No input audio device is found in your system")
sys.exit(1)
self.defaultDevice = self.input.get_default_input_device_info()
self.config.channels = int(self.defaultDevice['maxInputChannels'])
self.config.rate = int(self.defaultDevice['defaultSampleRate'])
[docs] def readData(self):
"""Reads a single chunk of binary data from stream"""
return self.stream.read(self.config.chunk_size)
[docs] def openStream(self):
"""Open as binary stream to receive audio signals from microphone"""
self.stream = self.input.open(format=pyaudio.paInt16,
channels=self.config.channels,
rate=self.config.rate,
input=True,
frames_per_buffer=self.config.chunk_size)
[docs] def closeStream(self):
"""Close the audio stream"""
self.stream.stop_stream()
self.stream.close()
[docs] def calibrateBufferSize(self, enabled=True):
"""Calibrate the chunk size to a size which the user's system can process without any errors
:param bool enabled: If :obj:`True`, calibration is done. Otherwise, it is not calibrated
"""
if enabled:
calibrated = False
totalSamples = 400
newChunkSize = self.config.chunk_size
self.__printProgress(0, totalSamples)
while not calibrated:
try:
self.openStream()
for count in range(totalSamples):
data = self.stream.read(newChunkSize)
byte_stream = array('b', [0]) if data == None else data
maximum = max(array('h', byte_stream))
self.maxSamples.append(maximum)
self.__printProgress(count+1, totalSamples)
calibrated = True
except OSError as e:
if re.search(r'.+Input overflowed$', str(e)):
newChunkSize = int(newChunkSize / 2)
self.config.chunk_size = newChunkSize
self.closeStream()
self.setThreshold()
[docs] def setThreshold(self):
"""Set the threashold value to the most closest value where a clap is detected. (Not accurate)"""
maximum = max(self.maxSamples)
median = stat.median_high(self.maxSamples)
inter_value = median * 3.141592653589793
value = stat.mean([inter_value,maximum])
self.config.method.value = int(value)
def __printProgress(self, iteration, total):
"""Print progressbar"""
terminalSize = re.match("^[^0-9]*columns=([0-9]+), .*$",
str(os.get_terminal_size()))
length = int(int(terminalSize.group(1)) - 24) if terminalSize else 75
percent = ("{0:.1f}").format(100 * (iteration / float(total)))
filledLength = int(length * iteration // total)
bar = '█' * filledLength + '_' * (length - filledLength)
if iteration != total:
sys.stdout.write(f'Calibrating: [{bar}] {percent}%\r')
else:
sys.stdout.write(f'Calibrating: [{bar}] {percent}%\nCalibration complete\n\n')
sys.stdout.flush()