# dbusdetect.py
# -*- coding: utf-8 -*-

"""
 DBus Removable Mass Storage Detection
 (C) 2014 Martin Süfke
 License: GPLv3 or newer: http://www.gnu.org/licenses/gpl.html
 partly based on http://stackoverflow.com/questions/23244245/listing-details-of-usb-drives-using-python-and-udisk2
"""

import logging

# Configure the root logger only when it has no handlers yet -- presumably so
# that an application which already set up logging keeps its own configuration.
if len(logging.root.handlers) == 0:
# SPE compatible logging format
  logging.basicConfig(level=logging.DEBUG,format='%(levelno)2d:%(funcName)s:%(message)s:File "%(filename)s", line %(lineno)d')
logger = logging.getLogger(__name__)

from pprint import pformat#, pprint
#from StorageObjects import Storage

import dbus
from dbus.mainloop.glib import DBusGMainLoop
# Import-time side effects: the GLib main loop is made the default D-Bus main
# loop before the system bus connection below is created.
dbus.mainloop.glib.threads_init()
DBusGMainLoop(set_as_default=True)
#dbus.set_default_main_loop()
# Module-level system bus connection and the ObjectManager interface of the
# UDisks2 root object; the functions in this module use both as globals.
bus = dbus.SystemBus()
ud_manager_obj = bus.get_object('org.freedesktop.UDisks2', '/org/freedesktop/UDisks2')
om = dbus.Interface(ud_manager_obj, 'org.freedesktop.DBus.ObjectManager')

def GetDriveId(DriveDict):
  """ Input a Drive dict (from key = value) returned from a GetManagedObjects() call
      Output: UDisks2.Drive.Id
              If DriveDict has no [org.freedesktop.UDisks2.Drive][Id] entry, or is
              not a mapping at all, a "no ... in <DriveDict>" message string is
              returned instead of raising.
  """
  try:
    # Plain strings hash and compare equal to the dbus.String keys of the
    # dictionaries delivered by D-Bus, so no dbus.String wrapper is needed.
    return DriveDict[u'org.freedesktop.UDisks2.Drive'][u'Id']
  except (LookupError, TypeError):
    # Only lookup failures are turned into the message; anything else
    # (KeyboardInterrupt, programming errors, ...) must not be swallowed.
    return "no [org.freedesktop.UDisks2.Drive][Id] in "+pformat(DriveDict)


def GetDrive(BlockDict):
  """ Input a BlockDev dict (from key = value) returned from a GetManagedObjects() call
      Output: the value of the block device's UDisks2.Block 'Drive' property
              (ScanDevices uses it as the key of the drive in its drive dicts).
              If BlockDict has no [org.freedesktop.UDisks2.Block][Drive] entry, or
              is not a mapping at all, a "no ... in <BlockDict>" message string is
              returned instead of raising.
  """
  try:
    # Plain strings hash and compare equal to the dbus.String keys of the
    # dictionaries delivered by D-Bus, so no dbus.String wrapper is needed.
    return BlockDict[u'org.freedesktop.UDisks2.Block'][u'Drive']
  except (LookupError, TypeError):
    # Only lookup failures are turned into the message; anything else
    # (KeyboardInterrupt, programming errors, ...) must not be swallowed.
    return "no [org.freedesktop.UDisks2.Block][Drive] in "+pformat(BlockDict)

def ScanDevices(AllStorage = None, RequireRemovable = False):
  """ Scan the UDisks2 managed objects for drives and their block devices.

      Input:
        AllStorage: optional storage registry. When it is not None, every
          selected drive is passed to AllStorage.AddDrive(), every block
          device linked to a selected drive to AllStorage.AddBlockDevice()
          and, if the block device carries a Filesystem interface, to
          AllStorage.AddFileSystem().
        RequireRemovable: when true, only drives whose 'Removable' property
          is true are selected; when false, all drives are selected.
      Output: None. The selected drives and block devices are written to the
        debug log.
  """
  BlockIntf = u'org.freedesktop.UDisks2.Block'
  DriveIntf = u'org.freedesktop.UDisks2.Drive'
  FilesystemIntf = u'org.freedesktop.UDisks2.Filesystem'

  udisks_path_drives_removable={}
  udisks_path_blockdev={}

  # Sort the managed objects into block devices and (selected) drives.
  # .items() and the "in" operator work on Python 2 and Python 3 alike.
  for k,v in om.GetManagedObjects().items():
    if BlockIntf in v:
      udisks_path_blockdev[k]=v
    elif DriveIntf in v:
      # The Drive interface is known to be present here, so index it directly.
      if not RequireRemovable or v[DriveIntf].get(u'Removable', False):
        udisks_path_drives_removable[k]=v

  # Removing all blockdevs that have no "link" to any of the drives selected
  udisks_path_blockdev_removable={}
  for b,d in udisks_path_blockdev.items():
    blockdrive = d[BlockIntf][u'Drive']
    d['U2.Block.Drive'] = blockdrive
    if blockdrive in udisks_path_drives_removable:
      # See if there is filesystem(s) on it
      d['U2.Filesystem'] = d.get(FilesystemIntf)
      # Attach the block device to its drive's 'U2.BlockDevices' dict
      udisks_path_drives_removable[blockdrive].setdefault('U2.BlockDevices', {})[b] = d
      udisks_path_blockdev_removable[b] = d

  if AllStorage is not None:
    for k,d in udisks_path_drives_removable.items():
      AllStorage.AddDrive(k, d.get(DriveIntf), EjectByUDisks, PowerOffByUDisks)

    for k,d in udisks_path_blockdev_removable.items():
      AllStorage.AddBlockDevice(k, d.get(BlockIntf))
      FsDict = d.get(FilesystemIntf)
      if FsDict is not None:
        AllStorage.AddFileSystem(k, FsDict, MountByUDisks, UnmountByUDisks)

  # Output the scan results for the log
  # TODO: (re)move this section; output from storage AllStore
  logger.debug("""
    - ScanRemovableDevices() Results -
    """
  )
  for d in udisks_path_drives_removable.values():
    logger.debug("Drive: %s", GetDriveId(d))

  for k,d in udisks_path_blockdev_removable.items():
    # 'U2.Block.Drive' was stored above and is a key of the selected drives
    logger.debug("Blockdev: %s %s", k.rsplit('/',1)[1], GetDriveId(udisks_path_drives_removable[d['U2.Block.Drive']]))

  logger.debug("""
    - ScanRemovableDevices() Results End-
    """
  )

def EjectByUDisks(UDisk2PathToDrive):
  """ Call Eject() on the org.freedesktop.UDisks2.Drive interface of a drive.
      Input: UDisks2 object path of the drive
      The value returned by the D-Bus call is logged at info level.
  """
  logger.debug("Eject %s", UDisk2PathToDrive)
  drive_intf = dbus.Interface(
    bus.get_object('org.freedesktop.UDisks2', UDisk2PathToDrive),
    'org.freedesktop.UDisks2.Drive',
  )
  # Empty options dict
  eject_result = drive_intf.Eject({})
  logger.info(eject_result)

def PowerOffByUDisks(UDisk2PathToDrive):
  """ Call PowerOff() on the org.freedesktop.UDisks2.Drive interface of a drive.
      Input: UDisks2 object path of the drive
      The value returned by the D-Bus call is logged at info level.
  """
  logger.debug("PowerOff %s", UDisk2PathToDrive)
  drive_intf = dbus.Interface(
    bus.get_object('org.freedesktop.UDisks2', UDisk2PathToDrive),
    'org.freedesktop.UDisks2.Drive',
  )
  # Empty options dict
  poweroff_result = drive_intf.PowerOff({})
  logger.info(poweroff_result)

def MountByUDisks(UDisk2PathToBlockDevice):
  """ Call Mount() on the org.freedesktop.UDisks2.Filesystem interface of a block device.
      Input: UDisks2 object path of the block device
      The value returned by the D-Bus call is logged at info level.
  """
  logger.debug("Mount %s", UDisk2PathToBlockDevice)
  filesystem_intf = dbus.Interface(
    bus.get_object('org.freedesktop.UDisks2', UDisk2PathToBlockDevice),
    'org.freedesktop.UDisks2.Filesystem',
  )
  # Empty options dict
  mount_result = filesystem_intf.Mount({})
  logger.info(mount_result)

def UnmountByUDisks(UDisk2PathToBlockDevice):
  """ Call Unmount() on the org.freedesktop.UDisks2.Filesystem interface of a block device.
      Input: UDisks2 object path of the block device
      The value returned by the D-Bus call is logged at info level.
  """
  logger.debug("Unmount %s", UDisk2PathToBlockDevice)
  filesystem_intf = dbus.Interface(
    bus.get_object('org.freedesktop.UDisks2', UDisk2PathToBlockDevice),
    'org.freedesktop.UDisks2.Filesystem',
  )
  # Empty options dict
  unmount_result = filesystem_intf.Unmount({})
  logger.info(unmount_result)

#def UDisks2Signal_SignalReceived(*args, **kwargs):
#  print "UDisks2 Signal Received: {0!s} {1!s}\n".format(args,kwargs)

def InstallChangeReceiver(
      PropertiesChangedHandler=None,
      InterfaceAddedHander=None,
      InterfaceRemovedHandler=None,
      ):
  """ Register D-Bus signal handlers; every argument that is not callable is skipped.

      Input:
        PropertiesChangedHandler: registered on the bus for signals of the
          org.freedesktop.DBus.Properties interface; the object path is
          passed to it as keyword argument 'path'.
        InterfaceAddedHander: connected to the ObjectManager signal
          "InterfacesAdded" (parameter name spelling kept as is, callers
          may pass it by keyword).
        InterfaceRemovedHandler: connected to the ObjectManager signal
          "InterfacesRemoved".
      Output: None
  """
  logger.debug(
    """Object Manager
  Path: %s 
  Intf: %s """,
    om.object_path, 
    om.dbus_interface
    )

  if callable(PropertiesChangedHandler):
    bus.add_signal_receiver(
      handler_function=PropertiesChangedHandler,
      dbus_interface=dbus.String(u'org.freedesktop.DBus.Properties'),
      path_keyword='path',
      byte_arrays=True,
    )

  # Both ObjectManager signals are connected the same way, in this order.
  object_manager_signals = (
    ("InterfacesAdded", InterfaceAddedHander),
    ("InterfacesRemoved", InterfaceRemovedHandler),
  )
  for signal, handler in object_manager_signals:
    if callable(handler):
      om.connect_to_signal(
        signal_name=signal,
        handler_function=handler,
        byte_arrays=True,
      )

  logger.debug("succcess")

if __name__ == "__main__":
  # Running this module directly performs a single device scan, whose results
  # go to the debug log, and then exits.
  # The file name is passed as a lazy %-style logging argument; the former
  # "{0}" placeholder was never filled in and was logged literally.
  logger.warning("Running %s as a program. Will scan for devices and exit.", __file__)
  ScanDevices()

#end;