My Part¶
My task is composed of creating an application that connects the microcontroller to the phone with bluetooth and transmitting sensor readings to the application.
The bluetooth communication¶
With the help of my friend Ali, we have successfully done data transfer and receive using an ESP32 as a sample Code:
########## DIGITAL MANUFACTURING ##########
# PIKACHU Project
# Authors: Miguel Angel Guzman
# Kadriye Nur Bakirci
###########################################
########## IMPORT REQUIRED LIBRARIES ##########
import bluetooth
from ble_uart_peripheral import BLEUART
from machine import Pin, ADC
import time
# Set up ADC for reading sensor value
sensor_pin = 34 # GPIO pin connected to the sensor
adc = ADC(Pin(sensor_pin)) # Initialize ADC on the sensor pin
adc.atten(ADC.ATTN_0DB) # Set attenuation to 0 dB (adjust if needed)
# Define a threshold for detecting if the sensor is connected
DISCONNECT_THRESHOLD = 10 # This is an example value; adjust based on your sensor
# Create BLE object
ble = bluetooth.BLE()
# Open UART session for BLE
uart = BLEUART(ble)
# Define ISR for UART input on BLE connection
def on_rx(data):
# Read UART string, AppInventor sends raw bytes
uart_in = data # read the message received from the Smartphone via Bluetooth
try:
message = uart_in.decode('utf-8') # decode bytes to string
except Exception as e:
print("Error decoding message:", e)
return
print("UART IN: ", message) # display the message received from the Smartphone on the Thonny console
# Example: Process message commands if needed
if message == 'READ_SENSOR':
send_sensor_value()
def send_sensor_value():
# Read the sensor value
sensor_value = adc.read() # Read the analog value from the sensor
# Check if the sensor value indicates a disconnection
if sensor_value < DISCONNECT_THRESHOLD:
# Send "NOT AVAILABLE" if the sensor value is below the threshold
uart.write('SENSOR_VALUE:NOT AVAILABLE\n')
else:
# Send the sensor value over UART
uart.write(f'SENSOR_VALUE:{sensor_value}\n') # Send the data in a readable format
# Map ISR to UART read interrupt
uart.irq(handler=on_rx)
# Main loop
while True:
# Periodically read and send sensor data
time.sleep(5) # Adjust the interval as needed
send_sensor_value()
The result:
Creating the application using MIT app inventor¶
MIT App Inventor is a visual programming environment that allows individuals, even those without extensive programming experience, to create mobile applications for Android devices. It’s particularly well-suited for creating simple and prototype-level applications that involve interactions with hardware devices like the ESP32
Here are the steps to create the application:
-
Create a new project in MIT App Inventor
-
Add the BluetoothClient and BluetoothLE components to your project
-
Declare the required permissions for your android device in the AndroidManifest.xml, for my app I need ACCESS_FINE_LOCATION, BLUETOOTH_SCAN, BLUETOOTH_CONNECT
-
Put the necessary blocks to connect the smartphone to your device
-
Design a simple interface with a label to replace the text with sensor data
-
Use programming blocks to send Bluetooth commands to the device when its connected to the smartphone
-
Load the MicroPython code onto your device.
-
Install the MIT App Inventor app on your smartphone and install the app you created.
-
Pair and connect your smartphone with the device through Bluetooth.
-
Open the app and connect the smartphone to your device and see the value changing This is how the application looks like Below is the block construction for MIT app inventor
We encountered issues with the Bluetooth connection due to differences in UUID configuration between the ESP32 and the smartwatch. Fortunately, we were able to resolve the problem.
This is the code used to connect through bluetooth and get sensor values:
from machine import Pin, SPI, SoftI2C
from utime import ticks_diff, ticks_us, ticks_ms, sleep
from ubluetooth import BLE, UUID, FLAG_READ, FLAG_WRITE, FLAG_NOTIFY
import gc9a01py as gc9a01
from max30102 import MAX30102, MAX30105_PULSE_AMP_MEDIUM
from fonts.romfonts import vga2_bold_16x32 as font4
error = True
# Global variables for BLE characteristic handles and connection status
char_handles = {}
connected = False
ble = BLE()
class HeartRateMonitor:
"""A simple heart rate monitor that uses a moving window to smooth the signal and find peaks."""
def __init__(self, sample_rate=100, window_size=10, smoothing_window=5):
self.sample_rate = sample_rate
self.window_size = window_size
self.smoothing_window = smoothing_window
self.samples = []
self.timestamps = []
self.filtered_samples = []
def add_sample(self, sample):
"""Add a new sample to the monitor."""
timestamp = ticks_ms()
self.samples.append(sample)
self.timestamps.append(timestamp)
# Apply smoothing
if len(self.samples) >= self.smoothing_window:
smoothed_sample = (
sum(self.samples[-self.smoothing_window:]) / self.smoothing_window
)
self.filtered_samples.append(smoothed_sample)
else:
self.filtered_samples.append(sample)
# Maintain the size of samples and timestamps
if len(self.samples) > self.window_size:
self.samples.pop(0)
self.timestamps.pop(0)
self.filtered_samples.pop(0)
def find_peaks(self):
"""Find peaks in the filtered samples."""
peaks = []
if len(self.filtered_samples) < 3:
return peaks
# Calculate dynamic threshold based on the min and max of the recent window of filtered samples
recent_samples = self.filtered_samples[-self.window_size:]
min_val = min(recent_samples)
max_val = max(recent_samples)
threshold = (min_val + (max_val - min_val) * 0.5)
for i in range(1, len(self.filtered_samples) - 1):
if (
self.filtered_samples[i] > threshold
and self.filtered_samples[i - 1] < self.filtered_samples[i]
and self.filtered_samples[i] > self.filtered_samples[i + 1]
):
peak_time = self.timestamps[i]
peaks.append((peak_time, self.filtered_samples[i]))
return peaks
def calculate_heart_rate(self):
"""Calculate the heart rate in beats per minute (BPM)."""
peaks = self.find_peaks()
if len(peaks) < 2:
return None # Not enough peaks to calculate heart rate
# Calculate the average interval between peaks in milliseconds
intervals = []
for i in range(1, len(peaks)):
interval = ticks_diff(peaks[i][0], peaks[i - 1][0])
intervals.append(interval)
average_interval = sum(intervals) / len(intervals)
# Convert intervals to heart rate in beats per minute (BPM)
heart_rate = 60000 / average_interval
return heart_rate
def setup_display():
"""Initialize the GC9A01 display."""
spi = SPI(2, baudrate=80000000, polarity=0, sck=Pin(10), mosi=Pin(11))
tft = gc9a01.GC9A01(
spi, dc=Pin(8, Pin.OUT), cs=Pin(9, Pin.OUT),
reset=Pin(14, Pin.OUT), backlight=Pin(2, Pin.OUT),
rotation=0
)
return tft
def update_display(tft, heart_rate):
"""Update the display with the current heart rate."""
tft.fill(gc9a01.BLACK)
line = tft.height // 2
col = tft.width // 3
if heart_rate is not None:
display_text = "HR: {:.0f} BPM".format(heart_rate)
else:
display_text = "No HR Data"
tft.text(font4, display_text, col, line, gc9a01.WHITE, gc9a01.RED)
if error:
tft.text(font4, "warning", col, 0, gc9a01.WHITE, gc9a01.RED)
# Callback function to handle BLE events
def bt_callback(event, data):
global connected
if event == 1: # Connection event
print("Connected")
connected = True
elif event == 2: # Disconnection event
print("Disconnected")
connected = False
start_advertising()
# Function to set up BLE service and multiple characteristics
def setup_ble_service():
global char_handles
ble.active(True)
ble.irq(bt_callback)
# Define a service UUID
service_uuid = UUID('19b10000-e8f2-537e-4f6c-d104768a1214')
# Define characteristics UUIDs and properties
char_uuids = [
UUID('19b10001-e8f2-537e-4f6c-d104768a1214'),
UUID('19b10002-e8f2-537e-4f6c-d104768a1214'),
UUID('19b10003-e8f2-537e-4f6c-d104768a1214')
]
char_properties = FLAG_READ | FLAG_WRITE | FLAG_NOTIFY
# Create the characteristics
chars = [(char_uuid, char_properties) for char_uuid in char_uuids]
# Register the service with its characteristics
service = (service_uuid, chars)
handles = ble.gatts_register_services((service,))
# Store the characteristic handles
char_handles['char1'] = handles[0][0]
char_handles['char2'] = handles[0][1]
char_handles['char3'] = handles[0][2]
# Function to start BLE advertising
def start_advertising():
adv_name = 'ESP32-BLE'
adv_data = bytearray(b'\x02\x01\x06' + bytes([len(adv_name) + 1, 0x09]) + adv_name.encode())
ble.gap_advertise(100, adv_data)
print("Advertising as", adv_name)
# Function to send data through a specified characteristic
def send_data(char_key, data):
global char_handles, connected
if connected and char_key in char_handles:
try:
handle = char_handles[char_key]
data_bytes = data.encode('utf-8')
ble.gatts_notify(0, handle, data_bytes)
print(f"Sent data to {char_key}: {data}")
except OSError as e:
print(f"Failed to send data: {e}")
def main():
# Setup Bluetooth and start advertising
setup_ble_service()
start_advertising()
# Initialize I2C and sensor
i2c = SoftI2C(sda=Pin(15), scl=Pin(16), freq=400000)
sensor = MAX30102(i2c=i2c)
if sensor.i2c_address not in i2c.scan() or not sensor.check_part_id():
print("Sensor not found or not recognized.")
return
sensor.setup_sensor()
sensor.set_sample_rate(400)
sensor.set_fifo_average(8)
sensor.set_active_leds_amplitude(MAX30105_PULSE_AMP_MEDIUM)
# Initialize heart rate monitor and display
hr_monitor = HeartRateMonitor(sample_rate=50, window_size=150)
tft = setup_display()
ref_time = ticks_ms()
while True:
sensor.check()
if sensor.available():
ir_reading = sensor.pop_ir_from_storage()
hr_monitor.add_sample(ir_reading)
if ticks_diff(ticks_ms(), ref_time) / 1000 > 2:
heart_rate = hr_monitor.calculate_heart_rate()
update_display(tft, heart_rate)
if heart_rate:
send_data('char3', f"HR: {int(heart_rate)} BPM") # Send to char1
ref_time = ticks_ms()
if __name__ == "__main__":
main()
Final Product¶
In the end after Design is printed using 3D printer, and the strap was created using moulding and casting, and the application is finished, we assemble the final product and install the app and test if both the sensor and the app work.
-
Final Product assembled
-
test the sensor and see if its working
- check the app and see if sensor data is being transmitted