CoolMasterのBACnetでreadPropertyMultipleについてWiresharkを通して調べてみた

はじめに

CoolMasterのBACnetのデータポイントから複数のデータポイントをreadさせたいとき、通常のやり方だとデータポイントを1つ取得するというのを複数回実行させて、1度の通信で1つ取得するということになる。しかし、データポイントを大量に取得させるためには、通信回数が多くなるので何かと不都合が出てきそうな気がする。

「readPropetyMultiple」というreadの方式で取得させれば複数のデータポイントを1度に取得できるということを学んだので、その違いについてWiresharkを活用してまとめてみた。

やりたいこと

電源のOn/OFF、設定温度、風量、運転モード

事務所のエアコンから上記のデータポイントから確認して、これら4点の情報をデータポイントから取得させる。singleで取得させた場合とmulti取得させた場合の通信の違いをWiresharkを通して確認する。

実行環境の構築

bacpypesを使用して実行しているため、Visual Studio Codeなどにbacpypesをインストールする必要があります。下記コマンドからbacpypesのインストールを実行してください。

pip install bacpypes

Wiresharkでフィルタの設定

bacnet && ip.addr == 192.168.1.26

フィルタの欄に上記のように入力を済ませておきます。フィルタで使用しているIPアドレスは通信させているPC側のIPアドレスです。この段階ではまだ何も出力されておらず、真っ白ですね。

coolmaster_bacnet-singleread.py

最初はsinglereadである、こちらのプログラムを実行させます。

import threading
import time
import warnings

from bacpypes.app import BIPSimpleApplication
from bacpypes.apdu import ReadPropertyRequest
from bacpypes.pdu import Address
from bacpypes.core import run, enable_sleeping, stop
from bacpypes.iocb import IOCB
from bacpypes.primitivedata import ObjectIdentifier, Real, Unsigned, Enumerated
from bacpypes.local.device import LocalDeviceObject

# --- BACnet 設定 ---
BACNET_DEVICE_IP = "192.168.1.100"
LOCAL_BACNET_IP = "192.168.1.26"
BACPYPES_PORT = 47807

ON_OFF_OBJECT_ID = "binaryValue:256"
SETPOINT_OBJECT_ID = "analogValue:256"
FAN_SPEED_OBJECT_ID = "multiStateValue:256"
MODE_OBJECT_ID = "multiStateValue:257"

# --- ローカルデバイス設定 ---
this_device = LocalDeviceObject(
    objectName="MyBACnetClient",
    objectIdentifier=599,
    maxApduLengthAccepted=1024,
    segmentationSupported="segmentedBoth",
    vendorIdentifier=15
)

warnings.filterwarnings("ignore", message="no signal handlers for child threads")

# --- BACnet クライアント定義 ---
class BACnetClient(BIPSimpleApplication):
    def __init__(self, local_address, device):
        super().__init__(device, local_address)

    def read_property(self, object_id_str, expected_type, label):
        request = ReadPropertyRequest(
            objectIdentifier=ObjectIdentifier(object_id_str),
            propertyIdentifier="presentValue"
        )
        request.pduDestination = Address(BACNET_DEVICE_IP)
        iocb = IOCB(request)
        self.request_io(iocb)

        def on_response(iocb):
            if iocb.ioError:
                print(f"[ERROR] {label} 読み取り失敗: {iocb.ioError}")
            else:
                try:
                    value = iocb.ioResponse.propertyValue.cast_out(expected_type)

                    if label == "電源 (on/off)":
                        status_str = "on" if int(value) else "off"
                        print(f"[INFO] {label}: {value} ({status_str})")

                    elif label == "温度 SetPoint":
                        print(f"[INFO] {label}: {value}°C")

                    elif label == "風量 fan speed":
                        speed_map = {
                            1: "弱", 2: "中", 3: "強", 4: "自動"
                        }
                        speed_str = speed_map.get(int(value), "不明")
                        print(f"[INFO] {label}: {value} ({speed_str})")

                    elif label == "運転モード mode":
                        mode_map = {
                            1: "冷房", 2: "暖房", 3: "冷暖自動", 4: "ドライ"
                        }
                        mode_str = mode_map.get(int(value), "不明")
                        print(f"[INFO] {label}: {value} ({mode_str})")

                    else:
                        print(f"[INFO] {label}: {value}")

                except Exception as e:
                    print(f"[ERROR] {label} 型変換失敗: {e}")

        iocb.add_callback(on_response)

    def read_all_properties(self):
        self.read_property(ON_OFF_OBJECT_ID, Enumerated, "電源 (on/off)")
        self.read_property(SETPOINT_OBJECT_ID, Real, "温度 SetPoint")
        self.read_property(FAN_SPEED_OBJECT_ID, Unsigned, "風量 fan speed")
        self.read_property(MODE_OBJECT_ID, Unsigned, "運転モード mode")

# --- BACnet 起動処理 ---
client = BACnetClient(Address(f"{LOCAL_BACNET_IP}:{BACPYPES_PORT}"), this_device)

def start_bacnet_loop():
    enable_sleeping()
    print("[INFO] BACnet イベントループ開始...")
    run()

thread = threading.Thread(target=start_bacnet_loop, daemon=True)
thread.start()

# --- メイン処理 ---
try:
    time.sleep(2)  # 初期化待ち
    print("[INFO] CoolMaster の状態を読み取り中...")
    client.read_all_properties()

    while True:
        time.sleep(1)

except KeyboardInterrupt:
    print("\n[INFO] ユーザーが停止しました。BACnetを終了します。")
    stop()

coolmaster_bacnet-singlereadの実行結果

プログラムを実行すると、データポイントから4つの情報を取得した情報がターミナル上に出力されます。次にWireshark側での内容を確認します。

データポイントを4回見に行ったので、192.168.1.100(CoolMaster)と192.168.1.26(PC側)で8つの通信ログが出力されましたね。readPropertyはデータポイントの1項目しか確認することができないので、これはmultiReadで実行されていないことが分かります。

coolmaster_bacnet-multiread.py

次に、multireadに対応させたプログラムを実行してみます。

from bacpypes.app import BIPSimpleApplication
from bacpypes.object import get_datatype, DeviceObject
from bacpypes.apdu import ReadPropertyMultipleRequest, ReadAccessSpecification, PropertyReference
from bacpypes.pdu import Address
from bacpypes.primitivedata import ObjectIdentifier
from bacpypes.core import run, stop
from bacpypes.iocb import IOCB

# --- LocalDeviceObject を定義(2系では自作) ---
class MyLocalDeviceObject(DeviceObject):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

# --- アドレス設定 ---
LOCAL_ADDRESS = Address("192.168.1.26")
TARGET_ADDRESS = Address("192.168.1.100")

# --- 読み取り対象 ---
OBJECTS = {
    ("binaryValue", 256): "電源 (on/off)",
    ("analogValue", 256): "温度 SetPoint",
    ("multiStateValue", 256): "風量 fan speed",
    ("multiStateValue", 257): "運転モード mode"
}

# --- デバイス定義 ---
local_device = MyLocalDeviceObject(
    objectIdentifier=("device", 599),
    objectName="MyBACnetClient",
    vendorIdentifier=15,
    maxApduLengthAccepted=1024,
    segmentationSupported='noSegmentation'
)

# --- アプリケーション作成 ---
app = BIPSimpleApplication(local_device, LOCAL_ADDRESS)

# --- 読み取り要求作成 ---
request = ReadPropertyMultipleRequest(destination=TARGET_ADDRESS)
request.listOfReadAccessSpecs = []

for (obj_type, obj_inst), label in OBJECTS.items():
    obj_id = ObjectIdentifier((obj_type, obj_inst))
    prop_ref = PropertyReference(propertyIdentifier="presentValue")
    access_spec = ReadAccessSpecification(
        objectIdentifier=obj_id,
        listOfPropertyReferences=[prop_ref]
    )
    request.listOfReadAccessSpecs.append(access_spec)

# --- リクエスト送信 ---
iocb = IOCB(request)
app.request_io(iocb)

# --- 応答処理と自動停止 ---
def on_response(iocb):
    if iocb.ioResponse:
        apdu = iocb.ioResponse
        for result in apdu.listOfReadAccessResults:
            obj_id = result.objectIdentifier
            label = OBJECTS.get(obj_id, str(obj_id))

            for prop_result in result.listOfResults:
                if prop_result.readResult.propertyAccessError is not None:
                    print(f"[ERROR] {label}: {prop_result.readResult.propertyAccessError}")
                else:
                    datatype = get_datatype(obj_id[0], prop_result.propertyIdentifier)
                    value = prop_result.readResult.propertyValue.cast_out(datatype)

                    if label == "電源 (on/off)":
                        status_map = {
                            0: "off", 1: "on",
                            "active": "on", "inactive": "off"
                        }
                        status_str = status_map.get(value, str(value))
                        print(f"[INFO] {label}: {value} ({status_str})")

                    elif label == "温度 SetPoint":
                        print(f"[INFO] {label}: {value}°C")

                    elif label == "風量 fan speed":
                        speed_map = {
                            1: "弱", 2: "中", 3: "強", 4: "自動"
                        }
                        try:
                            speed_str = speed_map.get(int(value), "不明")
                        except Exception:
                            speed_str = str(value)
                        print(f"[INFO] {label}: {value} ({speed_str})")

                    elif label == "運転モード mode":
                        mode_map = {
                            1: "冷房", 2: "暖房", 3: "冷暖自動", 4: "ドライ"
                        }
                        try:
                            mode_str = mode_map.get(int(value), "不明")
                        except Exception:
                            mode_str = str(value)
                        print(f"[INFO] {label}: {value} ({mode_str})")

                    else:
                        print(f"[INFO] {label}: {value}")
    else:
        print("[ERROR] No response received.")

    stop()

iocb.add_callback(on_response)

# --- メイン ---
if __name__ == "__main__":
    run()

coolmaster_bacnet-multiread.pyの実行結果

下側の出力がmultireadで実行したときの出力結果です。ここまでは表示が同じで何も変わりませんね。では、通信内容をWiresharkで確認してみます。

Infoの部分にreadPropertyMultipleと出力されていて、singleで実行していたときより圧倒的に通信ログの量が少ないことが分かります。1つのデバイスから複数のデータポイントを取得するときは、マルチリードの方式が適しているということになりそうですね。

ただし、マルチリードが取得可能なデータ量にも限度があるかと思いますので、その辺りは「APDUのサイズ」やmaxApduLengthAcceptedなどの設定が影響しているそうなので、詳しいところについては別の機会にまた勉強してみようかと思います。

大量のデータポイントをマルチリードで取得させたいときの簡易的な対応策として、時間差でマルチリードを複数回に分けて実行させるという方法があるようです。

まとめ

シングルリードとマルチリードで通信される内容についてWiresharkを通してログの比較をするということをやってみました。シンプルに1項目のみreadさせたいときはシングルリード、1度にまとめて何項目かのデータポイントを取得させたいときは、マルチリードが適しているということが分かりました。

「どっちがいい?」ではなく、環境に適した内容で実行させてあげるのが良さそうですね。