# -*- coding: utf-8 -*-
"""
DBus Removable Mass Storage Detection

(C) 2014 Martin Süfke
License: GPLv3 or newer: http://www.gnu.org/licenses/gpl.html

Partly based on
http://stackoverflow.com/questions/23244245/listing-details-of-usb-drives-using-python-and-udisk2

Importing this module has side effects: it installs the GLib main loop as
the default D-Bus main loop, connects to the system bus and creates an
ObjectManager proxy for the UDisks2 service (module globals ``bus`` and
``om``).
"""
import logging
from pprint import pformat

import dbus
import dbus.mainloop.glib
from dbus.mainloop.glib import DBusGMainLoop

if not logging.root.handlers:
    # SPE compatible logging format; only applied when the application has
    # not configured the root logger itself.
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(levelno)2d:%(funcName)s:%(message)s:File "%(filename)s", line %(lineno)d',
    )
logger = logging.getLogger(__name__)

# The main loop must be registered as default before the bus is created.
dbus.mainloop.glib.threads_init()
DBusGMainLoop(set_as_default=True)

_UDISKS2_SERVICE = 'org.freedesktop.UDisks2'
_IFACE_DRIVE = dbus.String(u'org.freedesktop.UDisks2.Drive')
_IFACE_BLOCK = dbus.String(u'org.freedesktop.UDisks2.Block')
_IFACE_FILESYSTEM = dbus.String(u'org.freedesktop.UDisks2.Filesystem')
_PROP_ID = dbus.String(u'Id')
_PROP_DRIVE = dbus.String(u'Drive')
_PROP_REMOVABLE = dbus.String(u'Removable')

bus = dbus.SystemBus()
ud_manager_obj = bus.get_object(_UDISKS2_SERVICE, '/org/freedesktop/UDisks2')
om = dbus.Interface(ud_manager_obj, 'org.freedesktop.DBus.ObjectManager')


def GetDriveId(DriveDict):
    """
    Return the ``Id`` property of a drive.

    Input:  the interface dict of one drive object, i.e. one value of the
            mapping returned by a GetManagedObjects() call.
    Output: DriveDict['org.freedesktop.UDisks2.Drive']['Id'], or a
            diagnostic string (not an exception) when that entry is missing.
    """
    try:
        return DriveDict[_IFACE_DRIVE][_PROP_ID]
    except (LookupError, TypeError):
        # Best effort: this is used for log output, so report instead of raising.
        return "no [org.freedesktop.UDisks2.Drive][Id] in "+pformat(DriveDict)


def GetDrive(BlockDict):
    """
    Return the ``Drive`` property of a block device.

    Input:  the interface dict of one block device object, i.e. one value of
            the mapping returned by a GetManagedObjects() call.
    Output: BlockDict['org.freedesktop.UDisks2.Block']['Drive'], or a
            diagnostic string (not an exception) when that entry is missing.
            ScanDevices() uses this value as a key into its drive mapping.
    """
    try:
        return BlockDict[_IFACE_BLOCK][_PROP_DRIVE]
    except (LookupError, TypeError):
        # Best effort: this is used for log output, so report instead of raising.
        return "no [org.freedesktop.UDisks2.Block][Drive] in "+pformat(BlockDict)


def _register_with_storage(AllStorage, drives, block_devices):
    """Hand the selected drives, block devices and filesystems to AllStorage."""
    for path, interfaces in drives.items():
        AllStorage.AddDrive(path, interfaces.get(_IFACE_DRIVE),
                            EjectByUDisks, PowerOffByUDisks)
    for path, interfaces in block_devices.items():
        AllStorage.AddBlockDevice(path, interfaces.get(_IFACE_BLOCK))
        fs_props = interfaces.get(_IFACE_FILESYSTEM)
        if fs_props is not None:
            AllStorage.AddFileSystem(path, fs_props, MountByUDisks, UnmountByUDisks)


def _log_scan_results(drives, block_devices):
    """Write the selected drives and block devices to the debug log."""
    # TODO: (re)move this section; output from storage AllStore
    logger.debug(""" - ScanRemovableDevices() Results - """)
    for interfaces in drives.values():
        logger.debug("Drive: %s", GetDriveId(interfaces))
    for path, interfaces in block_devices.items():
        # Log only the last path component next to the owning drive's Id.
        logger.debug("Blockdev: %s %s",
                     path.rsplit('/', 1)[-1],
                     GetDriveId(drives[GetDrive(interfaces)]))
    logger.debug(""" - ScanRemovableDevices() Results End- """)


def ScanDevices(AllStorage=None, RequireRemovable=False):
    """
    Query UDisks2 for all managed objects and select drives and their
    block devices.

    AllStorage:       optional object providing AddDrive(), AddBlockDevice()
                      and AddFileSystem(); when given, every selected drive,
                      block device and filesystem is registered with it
                      together with the matching eject/power-off/mount/unmount
                      callbacks of this module. (Presumably a
                      StorageObjects.Storage instance -- confirm with caller.)
    RequireRemovable: when true, only drives whose ``Removable`` property is
                      true are selected; otherwise all drives are selected.

    Side effects: the interface dicts returned by GetManagedObjects() are
    annotated in place with the keys 'U2.Block.Drive', 'U2.Filesystem'
    (block devices) and 'U2.BlockDevices' (drives). The result is written to
    the debug log. Returns None.
    """
    drives = {}
    all_block_devices = {}
    for path, interfaces in om.GetManagedObjects().items():
        if _IFACE_BLOCK in interfaces:
            all_block_devices[path] = interfaces
        elif _IFACE_DRIVE in interfaces:
            if (not RequireRemovable
                    or interfaces[_IFACE_DRIVE].get(_PROP_REMOVABLE, False)):
                drives[path] = interfaces

    # Keep only the block devices that link to one of the selected drives.
    block_devices = {}
    for path, interfaces in all_block_devices.items():
        drive_path = interfaces[_IFACE_BLOCK][_PROP_DRIVE]
        interfaces['U2.Block.Drive'] = drive_path
        if drive_path not in drives:
            continue
        # Record the filesystem interface (None when there is none).
        interfaces['U2.Filesystem'] = interfaces.get(_IFACE_FILESYSTEM)
        drives[drive_path].setdefault('U2.BlockDevices', {})[path] = interfaces
        block_devices[path] = interfaces

    if AllStorage is not None:
        _register_with_storage(AllStorage, drives, block_devices)

    _log_scan_results(drives, block_devices)


def _udisks_interface(object_path, interface_name):
    """Return a dbus.Interface proxy for a UDisks2 object on the system bus."""
    proxy = bus.get_object(_UDISKS2_SERVICE, object_path)
    return dbus.Interface(proxy, interface_name)


def EjectByUDisks(UDisk2PathToDrive):
    """
    Call with a UDisks2 path to a drive.
    Invokes Drive.Eject with empty options and logs the call's result.
    """
    logger.debug("Eject %s", UDisk2PathToDrive)
    drive = _udisks_interface(UDisk2PathToDrive, 'org.freedesktop.UDisks2.Drive')
    logger.info(drive.Eject({}))


def PowerOffByUDisks(UDisk2PathToDrive):
    """
    Call with a UDisks2 path to a drive.
    Invokes Drive.PowerOff with empty options and logs the call's result.
    """
    logger.debug("PowerOff %s", UDisk2PathToDrive)
    drive = _udisks_interface(UDisk2PathToDrive, 'org.freedesktop.UDisks2.Drive')
    logger.info(drive.PowerOff({}))


def MountByUDisks(UDisk2PathToBlockDevice):
    """
    Call with a UDisks2 path to a block device.
    Invokes Filesystem.Mount with empty options and logs the call's result.
    """
    logger.debug("Mount %s", UDisk2PathToBlockDevice)
    filesystem = _udisks_interface(UDisk2PathToBlockDevice,
                                   'org.freedesktop.UDisks2.Filesystem')
    logger.info(filesystem.Mount({}))


def UnmountByUDisks(UDisk2PathToBlockDevice):
    """
    Call with a UDisks2 path to a block device.
    Invokes Filesystem.Unmount with empty options and logs the call's result.
    """
    logger.debug("Unmount %s", UDisk2PathToBlockDevice)
    filesystem = _udisks_interface(UDisk2PathToBlockDevice,
                                   'org.freedesktop.UDisks2.Filesystem')
    logger.info(filesystem.Unmount({}))


def InstallChangeReceiver(
        PropertiesChangedHandler=None,
        InterfaceAddedHander=None,
        InterfaceRemovedHandler=None,
        ):
    """
    Register D-Bus signal handlers; every argument that is not callable is
    silently skipped.

    PropertiesChangedHandler: receives signals of the interface
                              org.freedesktop.DBus.Properties on the system
                              bus; the emitting object path is passed as the
                              keyword argument ``path``. No sender or signal
                              name filter is installed here.
    InterfaceAddedHander:     connected to the ObjectManager signal
                              InterfacesAdded. (The parameter name is
                              misspelt but kept for compatibility.)
    InterfaceRemovedHandler:  connected to the ObjectManager signal
                              InterfacesRemoved.
    """
    logger.debug("""Object Manager Path: %s Intf: %s """,
                 om.object_path, om.dbus_interface)

    if callable(PropertiesChangedHandler):
        bus.add_signal_receiver(
            handler_function=PropertiesChangedHandler,
            dbus_interface=dbus.String(u'org.freedesktop.DBus.Properties'),
            path_keyword='path',
            byte_arrays=True,
        )
    if callable(InterfaceAddedHander):
        om.connect_to_signal(
            signal_name="InterfacesAdded",
            handler_function=InterfaceAddedHander,
            byte_arrays=True,
        )
    if callable(InterfaceRemovedHandler):
        om.connect_to_signal(
            signal_name="InterfacesRemoved",
            handler_function=InterfaceRemovedHandler,
            byte_arrays=True,
        )
    logger.debug("succcess")


if __name__ == "__main__":
    logger.warning("Running %s as a program. Will scan for devices and exit.",
                   __file__)
    ScanDevices()