
My Part

My task was to create an application that connects the phone to the microcontroller over Bluetooth and displays the sensor readings the microcontroller transmits.

The Bluetooth communication

With the help of my friend Ali, we successfully sent and received data over Bluetooth using an ESP32. This is the sample code:

########## DIGITAL MANUFACTURING ##########
# PIKACHU Project
# Authors: Miguel Angel Guzman
#          Kadriye Nur Bakirci
###########################################

########## IMPORT REQUIRED LIBRARIES ##########

import bluetooth
from ble_uart_peripheral import BLEUART
from machine import Pin, ADC
import time

# Set up ADC for reading sensor value
sensor_pin = 34  # GPIO pin connected to the sensor
adc = ADC(Pin(sensor_pin))  # Initialize ADC on the sensor pin
adc.atten(ADC.ATTN_0DB)  # Set attenuation to 0 dB (adjust if needed)

# Define a threshold for detecting if the sensor is connected
DISCONNECT_THRESHOLD = 10  # This is an example value; adjust based on your sensor

# Create BLE object
ble = bluetooth.BLE()

# Open UART session for BLE
uart = BLEUART(ble)

# Define ISR for UART input on BLE connection
def on_rx(data):
    # Read UART string, AppInventor sends raw bytes
    uart_in = data  # read the message received from the Smartphone via Bluetooth
    try:
        message = uart_in.decode('utf-8')  # decode bytes to string
    except Exception as e:
        print("Error decoding message:", e)
        return

    print("UART IN: ", message)  # display the message received from the Smartphone on the Thonny console

    # Example: Process message commands if needed
    # strip() tolerates a trailing newline or spaces sent by the app
    if message.strip() == 'READ_SENSOR':
        send_sensor_value()

def send_sensor_value():
    # Read the sensor value
    sensor_value = adc.read()  # Read the analog value from the sensor

    # Check if the sensor value indicates a disconnection
    if sensor_value < DISCONNECT_THRESHOLD:
        # Send "NOT AVAILABLE" if the sensor value is below the threshold
        uart.write('SENSOR_VALUE:NOT AVAILABLE\n')
    else:
        # Send the sensor value over UART
        uart.write(f'SENSOR_VALUE:{sensor_value}\n')  # Send the data in a readable format

# Map ISR to UART read interrupt
uart.irq(handler=on_rx)

# Main loop
while True:
    # Periodically read and send sensor data
    time.sleep(5)  # Adjust the interval as needed
    send_sensor_value()
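
On the receiving side, every line the ESP32 sends follows the `SENSOR_VALUE:<payload>` format used above. As a minimal sketch of how a client could interpret such a line (written in plain Python for illustration; the App Inventor app does the equivalent with blocks, and the name `parse_sensor_line` is hypothetical, not part of the project code):

```python
PREFIX = "SENSOR_VALUE:"  # same prefix the ESP32 code writes


def parse_sensor_line(line):
    """Return the reading as an int, or None if it is unavailable or malformed."""
    text = line.strip()  # drop the trailing newline added by the sender
    if not text.startswith(PREFIX):
        return None
    payload = text[len(PREFIX):]
    if payload == "NOT AVAILABLE":  # sensor reported as disconnected
        return None
    try:
        return int(payload)
    except ValueError:
        return None


print(parse_sensor_line("SENSOR_VALUE:1234\n"))
print(parse_sensor_line("SENSOR_VALUE:NOT AVAILABLE\n"))
```

Returning `None` for both the "NOT AVAILABLE" case and malformed input keeps the display logic simple: the label either shows a number or a fallback text.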

The result:

Creating the application using MIT App Inventor

MIT App Inventor is a visual programming environment that allows individuals, even those without extensive programming experience, to create mobile applications for Android devices. It is particularly well suited to simple, prototype-level applications that interact with hardware devices like the ESP32.

Here are the steps to create the application:

  • Create a new project in MIT App Inventor

  • Add the BluetoothClient and BluetoothLE components to your project

  • Declare the permissions the app needs on your Android device in the AndroidManifest.xml. For my app these are ACCESS_FINE_LOCATION, BLUETOOTH_SCAN and BLUETOOTH_CONNECT

  • Put the necessary blocks to connect the smartphone to your device

  • Design a simple interface with a label whose text is replaced by the sensor data

  • Use programming blocks to send Bluetooth commands to the device when it is connected to the smartphone

  • Load the MicroPython code onto your device.

  • Install the MIT App Inventor app on your smartphone and install the app you created.

  • Pair and connect your smartphone with the device through Bluetooth.

  • Open the app, connect the smartphone to your device, and watch the value change.

This is how the application looks:

Below is the block construction in MIT App Inventor:

We encountered issues with the Bluetooth connection due to differences in UUID configuration between the ESP32 and the smartwatch. Fortunately, we were able to resolve the problem.
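
The exact mismatch is not recorded here, but a common cause is that the two sides write the same UUID differently (upper versus lower case, or a 16-bit short form versus the full 128-bit form). The following is a small sketch, under that assumption, of how two UUID strings can be normalized and compared before debugging further. The helper names `normalize_uuid` and `uuids_match` are hypothetical; the base UUID suffix is the one defined by the Bluetooth specification.

```python
import uuid

# Bluetooth base UUID: 16-bit short forms expand to 0000xxxx-0000-1000-8000-00805f9b34fb
BASE_UUID_SUFFIX = "-0000-1000-8000-00805f9b34fb"


def normalize_uuid(value):
    """Return the canonical lowercase 128-bit form of a UUID string."""
    text = value.strip().lower()
    if len(text) == 4:  # 16-bit short form such as "180d"
        text = "0000" + text + BASE_UUID_SUFFIX
    return str(uuid.UUID(text))  # raises ValueError if the string is not a valid UUID


def uuids_match(a, b):
    """True if both strings denote the same UUID regardless of case or short form."""
    return normalize_uuid(a) == normalize_uuid(b)


# Service UUID from the ESP32 code, written once in upper case and once in lower case
print(uuids_match("19B10000-E8F2-537E-4F6C-D104768A1214",
                  "19b10000-e8f2-537e-4f6c-d104768a1214"))
```

Comparing the service UUID and each characteristic UUID this way on both sides makes it quick to spot a single wrong digit, such as `19b10001` entered where `19b10003` was intended.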

This is the code used to connect through Bluetooth and get sensor values:

from machine import Pin, SPI, SoftI2C
from utime import ticks_diff, ticks_us, ticks_ms, sleep
from ubluetooth import BLE, UUID, FLAG_READ, FLAG_WRITE, FLAG_NOTIFY
import gc9a01py as gc9a01
from max30102 import MAX30102, MAX30105_PULSE_AMP_MEDIUM
from fonts.romfonts import vga2_bold_16x32 as font4

error = True

# Global variables for BLE characteristic handles and connection status
char_handles = {}
connected = False
ble = BLE()

class HeartRateMonitor:
    """A simple heart rate monitor that uses a moving window to smooth the signal and find peaks."""

    def __init__(self, sample_rate=100, window_size=10, smoothing_window=5):
        self.sample_rate = sample_rate
        self.window_size = window_size
        self.smoothing_window = smoothing_window
        self.samples = []
        self.timestamps = []
        self.filtered_samples = []

    def add_sample(self, sample):
        """Add a new sample to the monitor."""
        timestamp = ticks_ms()
        self.samples.append(sample)
        self.timestamps.append(timestamp)

        # Apply smoothing
        if len(self.samples) >= self.smoothing_window:
            smoothed_sample = (
                sum(self.samples[-self.smoothing_window:]) / self.smoothing_window
            )
            self.filtered_samples.append(smoothed_sample)
        else:
            self.filtered_samples.append(sample)

        # Maintain the size of samples and timestamps
        if len(self.samples) > self.window_size:
            self.samples.pop(0)
            self.timestamps.pop(0)
            self.filtered_samples.pop(0)

    def find_peaks(self):
        """Find peaks in the filtered samples."""
        peaks = []

        if len(self.filtered_samples) < 3:
            return peaks

        # Calculate dynamic threshold based on the min and max of the recent window of filtered samples
        recent_samples = self.filtered_samples[-self.window_size:]
        min_val = min(recent_samples)
        max_val = max(recent_samples)
        threshold = (min_val + (max_val - min_val) * 0.5)

        for i in range(1, len(self.filtered_samples) - 1):
            if (
                self.filtered_samples[i] > threshold
                and self.filtered_samples[i - 1] < self.filtered_samples[i]
                and self.filtered_samples[i] > self.filtered_samples[i + 1]
            ):
                peak_time = self.timestamps[i]
                peaks.append((peak_time, self.filtered_samples[i]))

        return peaks

    def calculate_heart_rate(self):
        """Calculate the heart rate in beats per minute (BPM)."""
        peaks = self.find_peaks()

        if len(peaks) < 2:
            return None  # Not enough peaks to calculate heart rate

        # Calculate the average interval between peaks in milliseconds
        intervals = []
        for i in range(1, len(peaks)):
            interval = ticks_diff(peaks[i][0], peaks[i - 1][0])
            intervals.append(interval)

        average_interval = sum(intervals) / len(intervals)

        # Convert intervals to heart rate in beats per minute (BPM)
        heart_rate = 60000 / average_interval

        return heart_rate

def setup_display():
    """Initialize the GC9A01 display."""
    spi = SPI(2, baudrate=80000000, polarity=0, sck=Pin(10), mosi=Pin(11))
    tft = gc9a01.GC9A01(
        spi, dc=Pin(8, Pin.OUT), cs=Pin(9, Pin.OUT),
        reset=Pin(14, Pin.OUT), backlight=Pin(2, Pin.OUT),
        rotation=0
    )
    return tft

def update_display(tft, heart_rate):
    """Update the display with the current heart rate."""
    tft.fill(gc9a01.BLACK)
    line = tft.height // 2
    col = tft.width // 3
    if heart_rate is not None:
        display_text = "HR: {:.0f} BPM".format(heart_rate)
    else:
        display_text = "No HR Data"
    tft.text(font4, display_text, col, line, gc9a01.WHITE, gc9a01.RED)
    if error:
        tft.text(font4, "warning", col, 0, gc9a01.WHITE, gc9a01.RED)

# Callback function to handle BLE events
def bt_callback(event, data):
    global connected
    if event == 1:  # Connection event
        print("Connected")
        connected = True
    elif event == 2:  # Disconnection event
        print("Disconnected")
        connected = False
        start_advertising()

# Function to set up BLE service and multiple characteristics
def setup_ble_service():
    global char_handles
    ble.active(True)
    ble.irq(bt_callback)

    # Define a service UUID
    service_uuid = UUID('19b10000-e8f2-537e-4f6c-d104768a1214')

    # Define characteristics UUIDs and properties
    char_uuids = [
        UUID('19b10001-e8f2-537e-4f6c-d104768a1214'),
        UUID('19b10002-e8f2-537e-4f6c-d104768a1214'),
        UUID('19b10003-e8f2-537e-4f6c-d104768a1214')
    ]
    char_properties = FLAG_READ | FLAG_WRITE | FLAG_NOTIFY

    # Create the characteristics
    chars = [(char_uuid, char_properties) for char_uuid in char_uuids]

    # Register the service with its characteristics
    service = (service_uuid, chars)
    handles = ble.gatts_register_services((service,))

    # Store the characteristic handles
    char_handles['char1'] = handles[0][0]
    char_handles['char2'] = handles[0][1]
    char_handles['char3'] = handles[0][2]

# Function to start BLE advertising
def start_advertising():
    adv_name = 'ESP32-BLE'
    adv_data = bytearray(b'\x02\x01\x06' + bytes([len(adv_name) + 1, 0x09]) + adv_name.encode())
    ble.gap_advertise(100, adv_data)
    print("Advertising as", adv_name)

# Function to send data through a specified characteristic
def send_data(char_key, data):
    global char_handles, connected
    if connected and char_key in char_handles:
        try:
            handle = char_handles[char_key]
            data_bytes = data.encode('utf-8')
            ble.gatts_notify(0, handle, data_bytes)
            print(f"Sent data to {char_key}: {data}")
        except OSError as e:
            print(f"Failed to send data: {e}")

def main():
    # Setup Bluetooth and start advertising
    setup_ble_service()
    start_advertising()

    # Initialize I2C and sensor
    i2c = SoftI2C(sda=Pin(15), scl=Pin(16), freq=400000)
    sensor = MAX30102(i2c=i2c)

    if sensor.i2c_address not in i2c.scan() or not sensor.check_part_id():
        print("Sensor not found or not recognized.")
        return

    sensor.setup_sensor()
    sensor.set_sample_rate(400)
    sensor.set_fifo_average(8)
    sensor.set_active_leds_amplitude(MAX30105_PULSE_AMP_MEDIUM)

    # Initialize heart rate monitor and display
    hr_monitor = HeartRateMonitor(sample_rate=50, window_size=150)
    tft = setup_display()

    ref_time = ticks_ms()

    while True:
        sensor.check()

        if sensor.available():
            ir_reading = sensor.pop_ir_from_storage()
            hr_monitor.add_sample(ir_reading)

        if ticks_diff(ticks_ms(), ref_time) / 1000 > 2:
            heart_rate = hr_monitor.calculate_heart_rate()
            update_display(tft, heart_rate)
            if heart_rate:
                send_data('char3', f"HR: {int(heart_rate)} BPM")  # Send via char3
            ref_time = ticks_ms()

if __name__ == "__main__":
    main()
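
To make the arithmetic in `calculate_heart_rate` concrete, here is a minimal sketch that runs on a desktop Python interpreter. It assumes the peak timestamps are already known and uses plain subtraction where the device code uses `ticks_diff`; the function name `bpm_from_peak_times` is hypothetical.

```python
def bpm_from_peak_times(peak_times_ms):
    """Convert a list of peak timestamps (milliseconds) to beats per minute."""
    if len(peak_times_ms) < 2:
        return None  # at least two peaks are needed to measure an interval

    # Interval between each pair of consecutive peaks
    intervals = [b - a for a, b in zip(peak_times_ms, peak_times_ms[1:])]
    average_interval = sum(intervals) / len(intervals)

    # 60000 ms per minute divided by ms per beat gives beats per minute
    return 60000 / average_interval


# Peaks 800 ms apart correspond to 75 beats per minute
print(bpm_from_peak_times([0, 800, 1600, 2400]))  # 75.0
```

On the ESP32 the subtraction should stay as `ticks_diff`, since `ticks_ms` wraps around and plain subtraction can give a wrong interval across the wrap.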

Final Product

In the end, after the design was 3D printed, the strap was made by moulding and casting, and the application was finished, we assembled the final product, installed the app, and tested whether both the sensor and the app worked.

  • Final product assembled

  • Test the sensor and see if it is working

  • Check the app and see if sensor data is being transmitted


Last update: September 8, 2024