下面代码使用MS4525DO-DS3AI001DP传感器,在ESP32 MicroPython V1.13上实测工作正常。
代码: 全选
class MS4525DO:
"""
I2C driver for MS4525DO pressure sensor
"""
# Specific Model of MS4525DO
# e.g. for model 3AI 001D
# Where:
# '3' means 3.3v power supply
# 'A' means Interface Type A
# 'I' means I2C address is 'I'
# '001' means the pressure range is 1 psi
# 'D' means the pressure type is differential
_INTERFACE_TYPE = ('A', 'B')
_I2C_ADDRESS = {
'I': 0x28,
'J': 0x36,
'K': 0x46,
'0': 0x48,
'1': 0x49,
'2': 0x4A,
'3': 0x4B,
'4': 0x4C,
'5': 0x4D,
'6': 0x4E,
'7': 0x4F,
'8': 0x50,
'9': 0x51,
}
_PRESSURE_RANGE = {
'001': 1,
'002': 2,
'005': 5,
'015': 15,
'030': 30,
'050': 50,
'100': 100,
'150': 150,
}
# Pressure transfer function per Datasheet
def _PSI_TYPE_A(self, output_decimal):
return (output_decimal - 0.1 * 16383) * (self._pres_max - self._pres_min) / (0.8 * 16383) + self._pres_min
def _PSI_TYPE_B(self, output_decimal):
return (output_decimal - 0.05 * 16383) * (self._pres_max - self._pres_min) / (0.9 * 16383) + self._pres_min
# Vacuum series pressure transfer function per Datasheet
def _PSI_TYPE_A_VAC(self, output_decimal):
return -1 * ((output_decimal - 0.1 * 16383) * 15 / (0.8 * 16383) - self._pres_max)
def _PSI_TYPE_B_VAC(self, output_decimal):
return -1 * ((output_decimal - 0.05 * 16383) * 15 / (0.9 * 16383) - self._pres_max)
# Temperature transfer function per Datasheet
@staticmethod
def _TEMP_CELSIUS(output_decimal):
return output_decimal * (150 - (-50)) / 2047 + (-50)
def __init__(self, i2c_obj, address=None, interface_type='A', pressure_range='001', pressure_type='D'):
"""
:param i2c_obj: I2C class; I2C bus
:param address: str or int or None; Sensor I2C address, pls refer to the datasheet
:param interface_type: str; Sensor interface type, pls refer to the datasheet
:param pressure_range: str or int; Sensor pressure range, pls refer to the datasheet
"""
self._i2c = i2c_obj
self._addr = self._get_i2c_addr(address)
self._pres_range = self._get_pres_range(pressure_range)
self._pres_min, self._pres_max, self._is_vacuum = self._get_pres_min_max(pressure_type)
self._pres_func = self._get_pres_func(interface_type)
self._data_buf = bytearray(4) # buffer used to store raw data readings
def _get_i2c_addr(self, addr):
if addr:
if isinstance(addr, str):
if addr.upper() in self._I2C_ADDRESS:
return self._I2C_ADDRESS.get(addr.upper())
elif isinstance(addr, int):
if str(addr) in self._I2C_ADDRESS:
return self._I2C_ADDRESS.get(str(addr))
if addr in self._I2C_ADDRESS.values():
return addr
raise Exception('Error: Incorrect I2C address.')
else:
_scanned_addrs = self._i2c.scan()
_possible_addr = set(_scanned_addrs).intersection(set(self._I2C_ADDRESS.values()))
if len(_possible_addr) == 0:
raise Exception('Error: MS4525DO device not found.')
elif len(_possible_addr) == 1:
return _possible_addr.pop()
else:
raise Exception('Error: Multiple devices found. Please specify the address manually.')
def _get_pres_func(self, interface_type):
if isinstance(interface_type, str):
intf_type = interface_type.upper()
if intf_type in self._INTERFACE_TYPE:
func = '_PSI_TYPE_' + intf_type
if self._is_vacuum:
func = func + '_VAC'
if hasattr(self, func) and callable(getattr(self, func)):
return getattr(self, func)
raise Exception('Error: Incorrect interface type.')
def _get_pres_range(self, pressure_range):
if isinstance(pressure_range, str):
if pressure_range in self._PRESSURE_RANGE:
return self._PRESSURE_RANGE.get(pressure_range)
elif isinstance(pressure_range, int):
if pressure_range in self._PRESSURE_RANGE.values():
return pressure_range
else:
raise Exception('Error: Incorrect pressure range setting.')
def _get_pres_min_max(self, pressure_type):
if isinstance(pressure_type, str):
pres_type = pressure_type.upper()
if pres_type == 'A':
return 0, self._pres_range, False
elif pres_type == 'D':
return -1 * self._pres_range, self._pres_range, False
elif pres_type == 'G':
return 0, self._pres_range, False
elif pres_type == 'V':
return -15, 0, True
elif pres_type == 'C':
return -15, self._pres_range, False
else:
raise Exception('Error: Incorrect pressure type.')
def read_raw(self):
"""
Read raw decimal output
:return: tuple(int, int); raw pressure decimal output, raw temperature decimal output
"""
self._i2c.readfrom_into(self._addr, self._data_buf)
psi_h = self._data_buf[0]
psi_l = self._data_buf[1]
temp_h = self._data_buf[2]
temp_l = self._data_buf[3]
raw_psi_output_decimal = psi_h << 8 | psi_l
raw_temp_output_decimal = temp_h << 3 | temp_l >> 5
return raw_psi_output_decimal, raw_temp_output_decimal
def read(self):
"""
Read pressure in PSI & temperature in Celsius
:return: tuple(float, float); pressure reading in PSI, temperature reading in Celsius
"""
raw_psi, raw_temp = self.read_raw()
temp = self._TEMP_CELSIUS(raw_temp)
psi = self._pres_func(raw_psi)
return psi, temp
def read_psi(self):
"""
Read pressure in PSI
:return: float;
"""
psi, _ = self.read()
return psi
def read_pa(self):
"""
Read pressure in Pa
:return: float;
"""
return self.read_psi() * 6894.75729
def read_hpa(self):
"""
Read pressure in hPa
:return: float;
"""
return self.read_pa() / 100
def read_temp_c(self):
"""
Read temperature in Celsius
:return: float;
"""
_, temp = self.read()
return temp
def read_temp_f(self):
"""
Read temperature in Fahrenheit
:return: float
"""
return self.read_temp_c() * 1.8 + 32