From: AceVest Date: Mon, 15 Aug 2016 20:54:13 +0000 (+0800) Subject: ... X-Git-Url: http://zhaoyanbai.com/repos/%22http:/www.isc.org/icons/Bv9ARM.ch11.html?a=commitdiff_plain;h=1e216532cfa827ff977cf47d904a30a236bb911e;p=acecode.git ... --- diff --git a/diying/AlertMiBand.py b/diying/AlertMiBand.py new file mode 100755 index 0000000..d341e30 --- /dev/null +++ b/diying/AlertMiBand.py @@ -0,0 +1,414 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# ------------------------------------------------------------------------ +# File Name: AlertMiBand.py +# Author: Zhao Yanbai +# 2016-08-07 19:15:05 Sunday CST +# Description: none +# ------------------------------------------------------------------------ + +import os +import re +import sys +import time +import thread +import random +import logging +import binascii +import subprocess +import datetime +from bluepy.btle import Scanner, DefaultDelegate +import bluepy.btle as btle +import mibanda + + + +SELF_MI_MAC = "C8:0F:10:6A:B1:F0" + +def stdoutencode(data): + retVal = None + + try: + data = data or "" + + # Reference: http://bugs.python.org/issue1602 + if IS_WIN: + output = data.encode(sys.stdout.encoding, "replace") + + if '?' in output and '?' not in data: + warnMsg = "cannot properly display Unicode characters " + warnMsg += "inside Windows OS command prompt " + warnMsg += "(http://bugs.python.org/issue1602). All " + warnMsg += "unhandled occurances will result in " + warnMsg += "replacement with '?' character. Please, find " + warnMsg += "proper character representation inside " + warnMsg += "corresponding output files. " + singleTimeWarnMessage(warnMsg) + + retVal = output + else: + retVal = data.encode(sys.stdout.encoding) + except: + retVal = data.encode(UNICODE_ENCODING) if isinstance(data, unicode) else data + + return retVal + +class ColorizingStreamHandler(logging.StreamHandler): + # color names to indices + color_map = { + 'black': 0, + 'red': 1, + 'green': 2, + 'yellow': 3, + 'blue': 4, + 'magenta': 5, + 'cyan': 6, + 'white': 7, + } + + # levels to (background, foreground, bold/intense) + level_map = { + logging.DEBUG: (None, 'blue', False), + logging.INFO: (None, 'green', False), + logging.WARNING: (None, 'yellow', False), + logging.ERROR: (None, 'red', False), + logging.CRITICAL: ('red', 'white', False) + } + csi = '\x1b[' + reset = '\x1b[0m' + disable_coloring = False + + @property + def is_tty(self): + isatty = getattr(self.stream, 'isatty', None) + return isatty and isatty() and not self.disable_coloring + + def emit(self, record): + try: + message = stdoutencode(self.format(record)) + stream = self.stream + + if not self.is_tty: + if message and message[0] == "\r": + message = message[1:] + stream.write(message) + else: + self.output_colorized(message) + stream.write(getattr(self, 'terminator', '\n')) + + self.flush() + except (KeyboardInterrupt, SystemExit): + raise + except IOError: + pass + except: + self.handleError(record) + + if not subprocess.mswindows: + def output_colorized(self, message): + self.stream.write(message) + else: + ansi_esc = re.compile(r'\x1b\[((?:\d+)(?:;(?:\d+))*)m') + + nt_color_map = { + 0: 0x00, # black + 1: 0x04, # red + 2: 0x02, # green + 3: 0x06, # yellow + 4: 0x01, # blue + 5: 0x05, # magenta + 6: 0x03, # cyan + 7: 0x07, # white + } + + def output_colorized(self, message): + parts = self.ansi_esc.split(message) + write = self.stream.write + h = None + fd = getattr(self.stream, 'fileno', None) + + if fd is not None: + fd = fd() + + if fd in (1, 2): # stdout or stderr + h = ctypes.windll.kernel32.GetStdHandle(-10 - fd) + + while parts: + text = parts.pop(0) + + if text: + write(text) + + if parts: + params = parts.pop(0) + + if h is not None: + params = [int(p) for p in params.split(';')] + color = 0 + + for p in params: + if 40 <= p <= 47: + color |= self.nt_color_map[p - 40] << 4 + elif 30 <= p <= 37: + color |= self.nt_color_map[p - 30] + elif p == 1: + color |= 0x08 # foreground intensity on + elif p == 0: # reset to default color + color = 0x07 + else: + pass # error condition ignored + + ctypes.windll.kernel32.SetConsoleTextAttribute(h, color) + + def colorize(self, message, record): + if record.levelno in self.level_map and self.is_tty: + bg, fg, bold = self.level_map[record.levelno] + params = [] + + if bg in self.color_map: + params.append(str(self.color_map[bg] + 40)) + + if fg in self.color_map: + params.append(str(self.color_map[fg] + 30)) + + if bold: + params.append('1') + + if params and message: + if message.lstrip() != message: + prefix = re.search(r"\s+", message).group(0) + message = message[len(prefix):] + else: + prefix = "" + + message = "%s%s" % (prefix, ''.join((self.csi, ';'.join(params), + 'm', message, self.reset))) + + return message + + def format(self, record): + message = logging.StreamHandler.format(self, record) + return self.colorize(message, record) + + +class CUSTOM_LOGGING : + IMPT = 2 + INFO = 3 + ERR = 1 + + +logging.addLevelName(CUSTOM_LOGGING.IMPT, "IMPT") +logging.addLevelName(CUSTOM_LOGGING.INFO, "INFO") +logging.addLevelName(CUSTOM_LOGGING.ERR, "ERR") + +LOGGER = logging.getLogger("AlertMiBandLog") +LOGGER_HANDLER = None +try: + disableColor = False + + for argument in sys.argv: + if "disable-col" in argument: + disableColor = True + break + + if disableColor: + LOGGER_HANDLER = logging.StreamHandler(sys.stdout) + else: + LOGGER_HANDLER = ColorizingStreamHandler(sys.stdout) + LOGGER_HANDLER.level_map[logging.getLevelName("IMPT")] = (None, "cyan", False) + LOGGER_HANDLER.level_map[logging.getLevelName("INFO")] = (None, "magenta", False) + LOGGER_HANDLER.level_map[logging.getLevelName("ERR")] = ("magenta", None, False) +except ImportError: + LOGGER_HANDLER = logging.StreamHandler(sys.stdout) + + +LOGGER_HANDLER = logging.StreamHandler(sys.stdout) +#LOGGER_HANDLER = logging.FileHandler('/root/AlertMiBand.{0}.log'.format(datetime.date.today().strftime('%Y%m%d'))) +FORMATTER = logging.Formatter("\r[%(asctime)s] [%(levelname)s] %(message)s", "%H:%M:%S") + +LOGGER_HANDLER.setFormatter(FORMATTER) +LOGGER.addHandler(LOGGER_HANDLER) +LOGGER.setLevel(logging.INFO) + +logger = LOGGER + +class ScanDelegate(DefaultDelegate): + def __init__(self): + DefaultDelegate.__init__(self) + + def handleDiscovery(self, dev, isNewDev, isNewData): + if isNewDev: + logger.info("Discovered device {0}".format(dev.addr)) + elif isNewData: + logger.info("Received new data from {0} ".format(dev.addr)) + + +alert_lock = thread.allocate_lock() + + +def GetNewDevInfo(addr) : + addr = str(addr) + alert_lock.acquire() + try : + device = mibanda.BandDevice(addr, "MI") + device.connect() + logger.info("Name: {0} Address:{1} Steps: {2}".format(device.getName(), device.getAddress(), device.getSteps())) + bi = device.getBatteryInfo() + di = device.getDeviceInfo() + logger.info("BatteryInfo. Status:{0} Level:{1} Charge Counter:{2} Last Charge:{3} DeviceInfo {4}".format(bi.status, bi.level, bi.charge_counter, bi.last_charged, di.firmware_version)) + device.setUserInfo(27182818+random.randint(0, 100), True, 25, 123, 45, 0, "Ace") + t= datetime.datetime.now() + datetime.timedelta(seconds=100) + device.setAlarm(enable=1, number=0, when=t, smart=0, repeat=0) + + alert_lock.release() + except Exception, e: + logger.info("Failed to Get New Device Info {0} Error {1}".format(addr, e)) + alert_lock.release() + return False + + + return True + +def AlertDevice(addr, level, msg) : + if level < 0 : + level = 0 + if level > 2 : + level = 2 + + alert_lock.acquire() + logger.info("To Alert Device {0} level {1} [ {2} ]".format(addr, level, msg)) + try : + p = btle.Peripheral(addr, btle.ADDR_TYPE_PUBLIC, 0) + Characteristics = p.getCharacteristics() + for c in Characteristics : + if c.uuid.getCommonName() == 'Alert Level' : + c.write(binascii.a2b_hex("{0:02}".format(level))) + + p.disconnect() + logger.info("Alerted Device {0} [ {1} ]".format(addr, msg)) + alert_lock.release() + except Exception, e: + logger.info("Failed to Alert Device {0} Error {1} [ {2} ]".format(addr, e, msg)) + alert_lock.release() + return False + + + return True + +def NotifySelf(level, msg="") : + AlertDevice(SELF_MI_MAC, level, msg) + +lock = thread.allocate_lock() +num = 0 +mi_macs = set() + +def ThreadFunc(name) : + global num + global logger + global mi_macs + while True : + time.sleep(1) + + lock.acquire() + alert_macs = list(mi_macs) + lock.release() + + if len(alert_macs) == 0 : + logger.info("{0}".format(str(mi_macs))) + logger.info("Device Set Empty") + continue + + logger.info(">ALERT THREAD TODO LIST {0}".format(str(alert_macs))) + + for m in alert_macs : + if not AlertDevice(m, 2, "Hack") : + lock.acquire() + #mi_macs.remove(m) + lock.release() + + + print "AlertThread Exit" + sys.exit(0) + + +def ScanMiBandThread(d) : + global mi_macs + global lock + while True : + try : + scanner = Scanner().withDelegate(ScanDelegate()) + devices = scanner.scan(3.0) + except Exception, e : + logger.info("Scan Failed {0}".format(e)) + time.sleep(1) + continue + + for dev in devices: + devAddr = dev.addr.upper(); + + if devAddr == SELF_MI_MAC : + print ">>>>>>>>>>>>>>>>>>> {0} ".format(devAddr) + continue + + if devAddr != '88:0F:10:50:BE:3E' : + #continue + pass + + if devAddr not in mi_macs : + logger.info("Discoveryed New Device {0} ({1}), RSSI={2} dB".format(devAddr, dev.addrType, dev.rssi)) + + bIsMiBand = False + for (adtype, desc, value) in dev.getScanData(): + logger.info(" {0} = {1} ".format(desc, value)) + if 'Complete Local Name' == desc and 'MI' in value : + bIsMiBand = True + + if not bIsMiBand : + logger.info("Not MiBand Device") + continue + + NotifySelf(1, "Notify New Device {0}".format(devAddr)) + + lock.acquire() + print("------------------------- Add New Device {0}".format(devAddr)) + mi_macs.add(devAddr) + lock.release() + time.sleep(5) + + + else : + logger.info("Skip Old Device {0} ({1}), RSSI={2} dB".format(devAddr, dev.addrType, dev.rssi)) + + time.sleep(1) + +def main() : + global logger + global mi_macs + + logger.info("Alert Tool is Running") + thread.start_new_thread(ScanMiBandThread, ('ThreadA', )) + thread.start_new_thread(ThreadFunc, ('ThreadB', )) + + + mi_infos = { } + + i = 0 + while True : + i += 1 + if i == 1 : + NotifySelf(2, "Program Started") + + time.sleep(1) + lock.acquire() + macs = list(mi_macs) + lock.release() + + print("MI INFO THREAD {0}".format(str(mi_infos))) + for m in macs : + if mi_infos.get(m, None) == None : + #if GetNewDevInfo('88:0F:10:50:BE:3E') : + if GetNewDevInfo(m) : + mi_infos[m] = 'Handled' + +if __name__ == "__main__" : + main()