分享一个MS4525DO气压传感器的驱动

MicroPython相关代码、库、工具
回复
dukeduck
帖子: 28
注册时间: 2020年 4月 25日 16:19

分享一个MS4525DO气压传感器的驱动

#1

帖子 dukeduck » 2021年 1月 5日 18:32

最近用到空速计,其核心是一个差分式气压传感器,第一次买的是MPXV7002DP(对应Arduplane里的APM2.5),输出为模拟量的电压信号。后来在TB上转了转,发现上述模拟量的空速计普遍缺货,现在大多都用I2C数字输出的MS4525DO-DS5AI001DP(对应PIX4),搜了下没找到Micropython的驱动,于是查了datasheet,参考了C++的驱动,写了个MPY版的驱动。这是我第一次写驱动,多指教。

下面代码使用MS4525DO-DS3AI001DP传感器,在ESP32 MicroPython V1.13上实测工作正常。

代码: 全选

class MS4525DO:
    """
    I2C driver for MS4525DO pressure sensor
    """

    # Specific Model of MS4525DO
    # e.g. for model 3AI 001D
    # Where:
    # '3' means 3.3v power supply
    # 'A' means Interface Type A
    # 'I' means I2C address is 'I'
    # '001' means the pressure range is 1 psi
    # 'D' means the pressure type is differential
    _INTERFACE_TYPE = ('A', 'B')
    _I2C_ADDRESS = {
        'I': 0x28,
        'J': 0x36,
        'K': 0x46,
        '0': 0x48,
        '1': 0x49,
        '2': 0x4A,
        '3': 0x4B,
        '4': 0x4C,
        '5': 0x4D,
        '6': 0x4E,
        '7': 0x4F,
        '8': 0x50,
        '9': 0x51,
    }
    _PRESSURE_RANGE = {
        '001': 1,
        '002': 2,
        '005': 5,
        '015': 15,
        '030': 30,
        '050': 50,
        '100': 100,
        '150': 150,
    }

    # Pressure transfer function per Datasheet
    def _PSI_TYPE_A(self, output_decimal):
        return (output_decimal - 0.1 * 16383) * (self._pres_max - self._pres_min) / (0.8 * 16383) + self._pres_min

    def _PSI_TYPE_B(self, output_decimal):
        return (output_decimal - 0.05 * 16383) * (self._pres_max - self._pres_min) / (0.9 * 16383) + self._pres_min

    # Vacuum series pressure transfer function per Datasheet
    def _PSI_TYPE_A_VAC(self, output_decimal):
        return -1 * ((output_decimal - 0.1 * 16383) * 15 / (0.8 * 16383) - self._pres_max)

    def _PSI_TYPE_B_VAC(self, output_decimal):
        return -1 * ((output_decimal - 0.05 * 16383) * 15 / (0.9 * 16383) - self._pres_max)

    # Temperature transfer function per Datasheet
    @staticmethod
    def _TEMP_CELSIUS(output_decimal):
        return output_decimal * (150 - (-50)) / 2047 + (-50)

    def __init__(self, i2c_obj, address=None, interface_type='A', pressure_range='001', pressure_type='D'):
        """
        :param i2c_obj: I2C class; I2C bus
        :param address: str or int or None; Sensor I2C address, pls refer to the datasheet
        :param interface_type: str; Sensor interface type, pls refer to the datasheet
        :param pressure_range: str or int; Sensor pressure range, pls refer to the datasheet
        """
        self._i2c = i2c_obj
        self._addr = self._get_i2c_addr(address)
        self._pres_range = self._get_pres_range(pressure_range)
        self._pres_min, self._pres_max, self._is_vacuum = self._get_pres_min_max(pressure_type)
        self._pres_func = self._get_pres_func(interface_type)
        self._data_buf = bytearray(4)  # buffer used to store raw data readings

    def _get_i2c_addr(self, addr):
        if addr:
            if isinstance(addr, str):
                if addr.upper() in self._I2C_ADDRESS:
                    return self._I2C_ADDRESS.get(addr.upper())
            elif isinstance(addr, int):
                if str(addr) in self._I2C_ADDRESS:
                    return self._I2C_ADDRESS.get(str(addr))
                if addr in self._I2C_ADDRESS.values():
                    return addr
            raise Exception('Error: Incorrect I2C address.')
        else:
            _scanned_addrs = self._i2c.scan()
            _possible_addr = set(_scanned_addrs).intersection(set(self._I2C_ADDRESS.values()))
            if len(_possible_addr) == 0:
                raise Exception('Error: MS4525DO device not found.')
            elif len(_possible_addr) == 1:
                return _possible_addr.pop()
            else:
                raise Exception('Error: Multiple devices found.  Please specify the address manually.')

    def _get_pres_func(self, interface_type):
        if isinstance(interface_type, str):
            intf_type = interface_type.upper()
            if intf_type in self._INTERFACE_TYPE:
                func = '_PSI_TYPE_' + intf_type
                if self._is_vacuum:
                    func = func + '_VAC'
                if hasattr(self, func) and callable(getattr(self, func)):
                    return getattr(self, func)
        raise Exception('Error: Incorrect interface type.')

    def _get_pres_range(self, pressure_range):
        if isinstance(pressure_range, str):
            if pressure_range in self._PRESSURE_RANGE:
                return self._PRESSURE_RANGE.get(pressure_range)
        elif isinstance(pressure_range, int):
            if pressure_range in self._PRESSURE_RANGE.values():
                return pressure_range
        else:
            raise Exception('Error: Incorrect pressure range setting.')

    def _get_pres_min_max(self, pressure_type):
        if isinstance(pressure_type, str):
            pres_type = pressure_type.upper()
            if pres_type == 'A':
                return 0, self._pres_range, False
            elif pres_type == 'D':
                return -1 * self._pres_range, self._pres_range, False
            elif pres_type == 'G':
                return 0, self._pres_range, False
            elif pres_type == 'V':
                return -15, 0, True
            elif pres_type == 'C':
                return -15, self._pres_range, False
            else:
                raise Exception('Error: Incorrect pressure type.')

    def read_raw(self):
        """
        Read raw decimal output
        :return: tuple(int, int); raw pressure decimal output, raw temperature decimal output
        """
        self._i2c.readfrom_into(self._addr, self._data_buf)
        psi_h = self._data_buf[0]
        psi_l = self._data_buf[1]
        temp_h = self._data_buf[2]
        temp_l = self._data_buf[3]
        raw_psi_output_decimal = psi_h << 8 | psi_l
        raw_temp_output_decimal = temp_h << 3 | temp_l >> 5
        return raw_psi_output_decimal, raw_temp_output_decimal

    def read(self):
        """
        Read pressure in PSI & temperature in Celsius
        :return: tuple(float, float); pressure reading in PSI, temperature reading in Celsius
        """
        raw_psi, raw_temp = self.read_raw()
        temp = self._TEMP_CELSIUS(raw_temp)
        psi = self._pres_func(raw_psi)
        return psi, temp

    def read_psi(self):
        """
        Read pressure in PSI
        :return: float;
        """
        psi, _ = self.read()
        return psi

    def read_pa(self):
        """
        Read pressure in Pa
        :return: float;
        """
        return self.read_psi() * 6894.75729

    def read_hpa(self):
        """
        Read pressure in hPa
        :return: float;
        """
        return self.read_pa() / 100

    def read_temp_c(self):
        """
        Read temperature in Celsius
        :return: float;
        """
        _, temp = self.read()
        return temp

    def read_temp_f(self):
        """
        Read temperature in Fahrenheit
        :return: float
        """
        return self.read_temp_c() * 1.8 + 32
 

头像
shaoziyang
帖子: 1739
注册时间: 2019年 10月 21日 13:48

Re: 分享一个MS4525DO气压传感器的驱动

#2

帖子 shaoziyang » 2021年 1月 5日 21:31

:DING
 

回复

  • 随机主题
    回复总数
    阅读次数
    最新文章