PyObjCとからCoreBluetoothを呼び出しWx2Beaconのデータを取得する。
この記事で紹介すること
この記事ではMacOSXのBluetoothライブラリである、CoreBluethoothをPythonから呼び出し、WxBeacon2(OMRON 2JCIE-BL01)から環境データを取得する方法について解説しています。 CoreBlutoothはObjectiveCのライブラリのため、PythonとObjectiveCのブリッジライブラリのPyObjCを利用しています。 以下の順番で説明します。
- WxBeacon2の概要+BLEの基礎
- PyObjeCのインストール
- WxBeacon2の読み出しプログラム
WxBeacon2
前回の記事PythonからBLEを制御するライブラリの調査でも記載しましたが、WxBeacon2はウェザーニュースから販売されているBLEインタフェースの環境センサです。OMRONの2JCIE-BL01とほぼ同等の製品になります。外観は以下のようになっており、WxBeacon2にはウェザーニュースさんのロゴがプリントされているのが特徴です。
センサとして取得できる項目は以下の通りです、
センサ名 | 単位 |
---|---|
温度センサ | ℃ |
湿度センサ | % |
気圧センサ | hPa |
照度センサ | lum |
紫外線センサ | ? |
騒音センサ(マイク) | dB |
またこれ以外の値としてバッテリーの値が取得できます。 WxBeacon2はBLEにおいて、ペリフェラルとして動作します。またはBeaconのブロードキャスターとしても動作します。今回はWxBeacon2をBLEペリフェラルとして接続します。 話は脱線しますが、BLEの通信について少し記載しておきます。
BLEの通信について
BLEの通信ではセントラルとペリフェラルという2つの役割があります。BLEを使うユースケースの例としてスマートフォンと活動量計を考えます。たいていの場合、スマートフォンがセントラル、活動量計がペリフェラルとなります。
セントラル
親機のようなイメージ。ペリフェラルを発見後接続要求を送り、データをやりとりを行う。また、ペリフェラルへNotificationの依頼を送れる。
ペリフェラル
アドバタイズと呼ばれるデータを送信しラントラルに発見させます。 ペリフェラルの機能は大分類のサービスと小分類のキャラクタリスティックで定義されています。さらに個々のキャラクタリスティックのデータに対してRead, Write, Notifyのどの操作が可能か定義されてます。キャラクタリスティックはデータが格納されているアドレスと考えるとすんなりわかります。
セントラルとペリフェラルの通信の流れ
通信は以下のような流れになります。
- ペリフェラルがアドバタイズと呼ばれるデータをブロードキャストする。
- ペリフェラルのアドバタイズを受信したセントラルが接続を要求する。
- ペリフェラルに接続したセントラルがペリフェラルのサービスとキャラクタリスティックを検索
- 目的のキャラクタリスティックにRead Write Notifyなどの処理を行う。
PyObjCのインストール
PyObjCはPythonからOSXのObjectiveCのAPIへアクセスするためのブリッジです。
インストール方法
pipからインストールできます。私の環境OSX 10.11.4 El Capitanではlibffiを入れ、パスを通す必要がありました。
$brew install pkg-config libffi $export PKG_CONFIG_PATH=/usr/local/Cellar/libffi/3.0.13/lib/pkgconfig/ $pip install pyobjc
ソースコード
PyObjCを利用してCoreBluetoothを呼び出し、WxBeacon2からデータを取得するまでのコードを解説します。以下にソースコードを置いています。
最初に必要なモジュールをインポートします。
#!/usr/bin/env python # encoding: utf-8 import struct from Foundation import * from PyObjCTools import AppHelper
from Foundation import *
にてCoreBluetoothを含むObjectiveCのライブラリを読み込みます。from PyObjeCTools import AppHelper
でアプリケーションを実行する
ヘルパー関数を読み込みます。
次にアプリケーションのメイン関数部分です。
if "__main__" == __name__: central_manager = CBCentralManager.alloc()#(1) central_manager.initWithDelegate_queue_options_(BleClass(), None, None)#(2) AppHelper.runConsoleEventLoop()#(3)
- (1) CBCentralManagerのインスタンスを取得
- (2) (1)で取得したインスタンスを初期化する。その際、コールバックを設定しているBleClassを設定
- (3) アプリケーションを実行すうスレッドを起動する。
wx2_service = CBUUID.UUIDWithString_(u'0C4C3000-7700-46F4-AA96-D5E974E32A54') wx2_characteristic_data = CBUUID.UUIDWithString_(u'0C4C3001-7700-46F4-AA96-D5E974E32A54') class BleClass(object): def centralManagerDidUpdateState_(self, manager): self.manager = manager manager.scanForPeripheralsWithServices_options_(None,None) #(1) def centralManager_didDiscoverPeripheral_advertisementData_RSSI_(self, manager, peripheral, data, rss): #(2) self.peripheral = peripheral if '8A783AEE-4277-4C3F-8382-ABFA4F6DB8B6' in repr(peripheral.UUID): print 'DeviceName' + peripheral.name() manager.connectPeripheral_options_(peripheral, None) manager.stopScan() def centralManager_didConnectPeripheral_(self, manager, peripheral): #(3) print repr(peripheral.UUID()) peripheral.setDelegate_(self) self.peripheral.discoverServices_([wx2_service]) def peripheral_didDiscoverServices_(self, peripheral, services): #(4) self.service = self.peripheral.services()[0] self.peripheral.discoverCharacteristics_forService_([wx2_characteristic_data], self.service) def peripheral_didDiscoverCharacteristicsForService_error_(self, peripheral, service, error): #(5) for characteristic in self.service.characteristics(): if characteristic.properties() == 18: peripheral.readValueForCharacteristic_(characteristic) break def peripheral_didWriteValueForCharacteristic_error_(self, peripheral, characteristic, error): print 'In error handler' print 'ERROR:' + repr(error) def peripheral_didUpdateNotificationStateForCharacteristic_error_(self, peripheral, characteristic, error): print "Notification handler" def peripheral_didUpdateValueForCharacteristic_error_(self, peripheral, characteristic, error):#(6) print repr(characteristic.value().bytes().tobytes()) value = characteristic.value().bytes().tobytes() temp = decode_value(value[1:3],0.01) print 'temprature:' + str(temp) humid = decode_value(value[3:5],0.01) print 'humidity:' + str(humid) lum = decode_value(value[5:7]) print 'lumix:' + str(lum) uvi = decode_value(value[9:7], 0.01) print 'UV index:' + str(uvi) atom = decode_value(value[9:11], 0.1) print 'Atom:' + str(atom) noise = decode_value(value[11:13], 0.01) print 'Noise:' + str(noise) disco = decode_value(value[13:15], 0.01) print 'Disco:' + str(disco) heat = decode_value(value[15:17], 0.01) print 'Heat:' + str(heat) batt = decode_value(value[17:19],0.001) print 'Battery:' + str(batt)
- (1) アプリケーション起動後、Bluetoothモジュールの初期化が完了したらペリフェラルのアドバタイズをスキャンするモードになります。
- (2) ペリフェラルが見つかったらUUIDと比較を行い、目的のペリフェラルか調べます。接続処理とスキャンを停止しています。
- (3) ペリフェラルとの接続が呼ばれたらこのメソッドが呼ばれます。続いてサービスの検索を実行しています。
- (4) サービスが見つかったら続けて、キャラクタリスティックの検索を行います。
- (5) 検索したキャラクタリスティックが見つかったらデータの取得要求を行います。
- (6) データを受信したら呼ばれるメソッドです。データは必ずバイト列で送られてくるので、データを分解し各センサの値に変換します。
以下のメソッドでWxBeacon2のデータをデコードします。
#Decoding sensor value from Wx2Beancon Data format. def decode_value(value, multi=1.0): if(len(value) != 2): return None lsb,msb = struct.unpack('BB',value) result = ((msb << 8) + lsb) * multi return result
まとめ
今回はCoreBluetoothをPythonから呼び出し、BLEからデータを読み出してみました。CoreBluetooth自体がわかりやすいAPIになっているため、比較的に簡単に実装することができました。 BLEの基礎やCoreBluetoothの詳細については以下の書籍をお勧めします。今回の実装でも一部参考にさせていただきました。
- 作者: 堤修一,松村礼央
- 出版社/メーカー: ソシム
- 発売日: 2015/03/23
- メディア: 単行本
- この商品を含むブログを見る
その他参考
WxBeaconのデータのパース方法
PyObjCとCoreBluetooth