Commit 364915c8 authored by moebius/ALUG's avatar moebius/ALUG Committed by Christopher Spinrath

Mass Storage Cloner for writing many USB sticks - 1st version

parent 042ff5ad
Mass Storage Cloner
(C)2014-2015 Martin Suefke
License: GPLv3 or newer, the GNU Public Licence, see http://www.gnu.org/licenses/gpl.html
Use this tool on a linux machine to write any image file to a target device selected in a GUI.
Prerequisites:
python 2.6 + Tk
xterm
pv
dd
Select the source file by editing 'write.sh'
Start the program with 'python gui.py'
First release 2015-03-26, version history in 'gui.py'
This diff is collapsed.
This diff is collapsed.
#!/bin/bash
#
# Mass Storage Cloner - blkwrite.sh
# (C)2015 Martin Suefke
# License: GPLv3 or newer
#
# writes source file $1 to block device $2
#
# DO NOT Customize this file. Try to customize write.sh first
#
set -e # break if error
function onerror() {
echo
echo "Trapped error while copying >$SRC< to >$TGT< Result code is \"$res\"".
echo -n "Press Enter to Exit this script ..."
read -n1
trap EXIT
exit 1
}
trap onerror ERR EXIT
echo "Writing >$1< to >$2<"
if [ $# -lt 2 ]
then
echo "Need 2 parameters: source target"
exit 1
fi
SRC="$1"
TGT="$2"
if [ ! -b "$TGT" ]
then
echo "need a blockdevice as parameter 2"
exit 2
fi
if [ "x${TGT%%[0-9]*}" == "x$TGT" ]
then
# have a "master" block device
echo "removing partitions from $TGT with partx"
partx -d "$TGT" || true
fi
echo "Start copy operation"
# live version
pv -trabe -B8m $SRC | dd bs=8M iflag=fullblock of="$TGT" oflag=sync
# debug version
#pv -trabe -B8m $SRC | dd bs=8M iflag=fullblock of=/dev/null oflag=sync
res="$?"
echo "Result code $res"
if [ "x${TGT%%[0-9]*}" == "x$TGT" ]
then
# have a "master" block device
echo "adding partitions for $TGT with partx"
partx -a "$TGT"
fi
if [ x$res == x0 ]
then
trap EXIT
else
onerror
fi
#end;
# -*- coding: utf-8 -*-
"""
DBus Removable Mass Storage Detection
(C) 2014 Martin Süfke
License: GPLv3 or newer: http://www.gnu.org/licenses/gpl.html
partly based on http://stackoverflow.com/questions/23244245/listing-details-of-usb-drives-using-python-and-udisk2
"""
import logging
if len(logging.root.handlers) == 0:
# SPE compatible logging format
logging.basicConfig(level=logging.DEBUG,format='%(levelno)2d:%(funcName)s:%(message)s:File "%(filename)s", line %(lineno)d')
logger = logging.getLogger(__name__)
from pprint import pformat#, pprint
#from StorageObjects import Storage
import dbus
from dbus.mainloop.glib import DBusGMainLoop
dbus.mainloop.glib.threads_init()
DBusGMainLoop(set_as_default=True)
#dbus.set_default_main_loop()
bus = dbus.SystemBus()
ud_manager_obj = bus.get_object('org.freedesktop.UDisks2', '/org/freedesktop/UDisks2')
om = dbus.Interface(ud_manager_obj, 'org.freedesktop.DBus.ObjectManager')
def GetDriveId(DriveDict):
""" Input a Drive dict (from key = value) returned from a GetManagedObjects() call
Output: UDisks2.Drive.Id
"""
try:
return DriveDict[
dbus.String(u'org.freedesktop.UDisks2.Drive')][
dbus.String(u'Id')
]
except:
return "no [org.freedesktop.UDisks2.Drive][Id] in "+pformat(DriveDict)
def GetDrive(BlockDict):
""" Input a BlockDev dict (from key = value) returned from a GetManagedObjects() call
Output: UDisks2.Drive dict
"""
try:
return BlockDict[dbus.String(u'org.freedesktop.UDisks2.Block')][dbus.String(u'Drive')]
except:
return "no [org.freedesktop.UDisks2.Block][Drive] in "+pformat(BlockDict)
def ScanDevices(AllStorage = None, RequireRemovable = False):
global om
udisks_path_drives={}
udisks_path_drives_removable={}
udisks_path_blockdev={}
ManagedObjects=om.GetManagedObjects()
for k,v in ManagedObjects.iteritems():
#print ""
#print "Key:",k
d=dict(v)
#print "dict:", isinstance(v, dict)
udisks_is_blockdev = v.has_key(dbus.String(u'org.freedesktop.UDisks2.Block'))
udisks_is_drive = v.has_key(dbus.String(u'org.freedesktop.UDisks2.Drive'))
#print "Block?", udisks_is_blockdev
if udisks_is_blockdev:
udisks_path_blockdev[k]=v
#pprint(d)
elif udisks_is_drive:
udisks_path_drives[k]=v
if RequireRemovable == False or v.\
get(dbus.String(u'org.freedesktop.UDisks2.Drive'), False).\
get(dbus.String(u'Removable'),False):
udisks_path_drives_removable[k]=v
#pprint(d)
# Removing all blockdevs that have no "link" to any of the drives selected
udisks_path_blockdev_removable={}
for b,d in udisks_path_blockdev.iteritems():
blockdrive = d[dbus.String(u'org.freedesktop.UDisks2.Block')][dbus.String(u'Drive')]
d['U2.Block.Drive'] = blockdrive
if blockdrive in udisks_path_drives_removable:
# See if there is filesystem(s) on it
filesystem = d.get(dbus.String(u'org.freedesktop.UDisks2.Filesystem'))
d['U2.Filesystem'] = filesystem
blockdevices = udisks_path_drives_removable[blockdrive].get('U2.BlockDevices', dict())
blockdevices[b] = d
udisks_path_drives_removable[blockdrive]['U2.BlockDevices'] = blockdevices
udisks_path_blockdev_removable[b] = d
if AllStorage is not None:
for k,d in udisks_path_drives_removable.iteritems():
AllStorage.AddDrive(k,d.get(dbus.String(u'org.freedesktop.UDisks2.Drive')), EjectByUDisks, PowerOffByUDisks)
for k,d in udisks_path_blockdev_removable.iteritems():
AllStorage.AddBlockDevice(k,d.get(dbus.String(u'org.freedesktop.UDisks2.Block')))
FsDict = d.get(dbus.String(u'org.freedesktop.UDisks2.Filesystem'))
if FsDict is not None:
AllStorage.AddFileSystem(k,FsDict,MountByUDisks,UnmountByUDisks)
# Output the scan results for the log
# TODO: (re)move this section; output from storage AllStore
logger.debug("""
- ScanRemovableDevices() Results -
"""
)
for k,d in udisks_path_drives_removable.iteritems():
logger.debug("Drive: %s", GetDriveId(d))
#, "removable:", udisks_path_drives[d].\
# get(dbus.String(u'org.freedesktop.UDisks2.Drive'), False).\
# get(dbus.String(u'Removable'),False)
# print
for k,d in udisks_path_blockdev_removable.iteritems():
logger.debug("Blockdev: %s %s", k.rsplit('/',1)[1], GetDriveId(udisks_path_drives_removable[GetDrive(d)]))
logger.debug("""
- ScanRemovableDevices() Results End-
"""
)
def EjectByUDisks(UDisk2PathToDrive):
""" Call with a UDisks2 path to a drive """
global bus
logger.debug("Eject %s", UDisk2PathToDrive)
dbus_obj = bus.get_object('org.freedesktop.UDisks2', UDisk2PathToDrive)
fs_intf = dbus.Interface(dbus_obj, 'org.freedesktop.UDisks2.Drive')
logger.info(fs_intf.Eject({}))
def PowerOffByUDisks(UDisk2PathToDrive):
""" Call with a UDisks2 path to a drive """
global bus
logger.debug("PowerOff %s", UDisk2PathToDrive)
dbus_obj = bus.get_object('org.freedesktop.UDisks2', UDisk2PathToDrive)
fs_intf = dbus.Interface(dbus_obj, 'org.freedesktop.UDisks2.Drive')
logger.info(fs_intf.PowerOff({}))
def MountByUDisks(UDisk2PathToBlockDevice):
""" Call with a UDisks2 path to a block device """
global bus
logger.debug("Mount %s", UDisk2PathToBlockDevice)
dbus_obj = bus.get_object('org.freedesktop.UDisks2', UDisk2PathToBlockDevice)
fs_intf = dbus.Interface(dbus_obj, 'org.freedesktop.UDisks2.Filesystem')
logger.info(fs_intf.Mount({}))
def UnmountByUDisks(UDisk2PathToBlockDevice):
""" Call with a UDisks2 path to a block device """
global bus
logger.debug("Unmount %s", UDisk2PathToBlockDevice)
dbus_obj = bus.get_object('org.freedesktop.UDisks2', UDisk2PathToBlockDevice)
fs_intf = dbus.Interface(dbus_obj, 'org.freedesktop.UDisks2.Filesystem')
logger.info(fs_intf.Unmount({}))
#def UDisks2Signal_SignalReceived(*args, **kwargs):
# print "UDisks2 Signal Received: {0!s} {1!s}\n".format(args,kwargs)
def InstallChangeReceiver(
PropertiesChangedHandler=None,
InterfaceAddedHander=None,
InterfaceRemovedHandler=None,
):
logger.debug(
"""Object Manager
Path: %s
Intf: %s """,
om.object_path,
om.dbus_interface
)
# bus.add_signal_receiver(handler_function=UDisks2Signal_SignalReceived,
# interface_keyword='interface',
# member_keyword='member',
# sender_keyword='sender',
# destination_keyword='destination',
# path_keyword='path',
# message_keyword='message',
# )
if callable(PropertiesChangedHandler):
bus.add_signal_receiver(handler_function=PropertiesChangedHandler,
dbus_interface=dbus.String(u'org.freedesktop.DBus.Properties'),
path_keyword='path',
byte_arrays=True,
)
if callable(InterfaceAddedHander):
om.connect_to_signal(signal_name="InterfacesAdded",
handler_function=InterfaceAddedHander,
# path_keyword='path',
byte_arrays=True,
)
if callable(InterfaceRemovedHandler):
om.connect_to_signal(signal_name="InterfacesRemoved",
handler_function=InterfaceRemovedHandler,
# path_keyword='path',
byte_arrays=True,
)
logger.debug("succcess")
if __name__ == "__main__":
# logging.basicConfig(level=logging.WARN)
# raise Exception("{0} cannot run as the main python program".format(__file__))
logger.warn("Running {0} as a program. Will scan for devices and exit.")
ScanDevices()
#end;
\ No newline at end of file
This diff is collapsed.
#! /usr/bin/python
# -*- coding: UTF-8 -*-
"""
Visual with Variable TK components.
These elements extend the widgets in Tkinter to include their variables
in one pythonic object.
(C) 2012-2014 Martin Süfke
License: GPLv3 or newer: http://www.gnu.org/licenses/gpl.html
"""
try:
from ttk import BooleanVar, IntVar, DoubleVar, StringVar
from ttk import Checkbutton, Label, Scale, Entry, OptionMenu, Text, Frame
except:
from Tkinter import BooleanVar, IntVar, DoubleVar, StringVar
from Tkinter import Checkbutton, Label, Scale, Entry, OptionMenu, Text, Frame
#from Tkconstants import *
import types
import io
def _setifunset(d,n,v):
if not n in d:
d[n]=v
class Vget():
@property
def value(self):
return self.variable.get()
def get(self):
return self.variable.get()
class Vset():
#TODO: make a setter property on ""value" ?
def set(self,val):
self.variable.set(val)
class Vvalue(Vget):
@property
def value(self):
return self.variable.get()
@value.setter
def value_setter(self,newvalue):
return self.variable.set(newvalue)
class VsetFmt(Vset):
def set(self,msg,*arg):
#msg = msg
#if len(arg)>0:
# self.variable.set(msg % arg)
#else:
Vset.set(self,msg%arg)
class VCheckbutton(Checkbutton,Vget,Vset,Vvalue,Vvalue):
"""Checkbutton with Boolean value"""
def __init__(self, *arg, **kw):
# this is an inherited __init__ using "classic" style without super()
self.variable=BooleanVar(value=kw.pop("value",False))
_setifunset(kw,"offvalue",False)
_setifunset(kw,"onvalue",True)
_setifunset(kw,"variable",self.variable)
Checkbutton.__init__(self, *arg, **kw)
class VDLabel(Label,Vget,Vset,Vvalue):
"""Label with DoubleVar() Value"""
def __init__(self, *arg, **kw):
# this is an inherited __init__ using "classic" style without super()
self.variable=DoubleVar(value=kw.pop("value",0.0))
_setifunset(kw,"textvariable",self.variable)
Label.__init__(self, *arg, **kw)
class VSLabel(Label,Vget,VsetFmt):
"""Label with StringVar() Value"""
def __init__(self, *arg, **kw):
# this is an inherited __init__ using "classic" style without super()
self.variable=StringVar(value=kw.pop("value",""))
_setifunset(kw,"textvariable",self.variable)
Label.__init__(self, *arg, **kw)
class VIScale(Scale,Vget,Vset,Vvalue):
"""Scale with IntVar() Value"""
def __init__(self, *arg, **kw):
# this is an inherited __init__ using "classic" style without super()
self.variable=IntVar(value=kw.pop("value",0))
_setifunset(kw,"variable",self.variable)
Scale.__init__(self, *arg, **kw)
class VSEntry(Entry,Vget,VsetFmt):
"""Entry with StringVar() Value"""
def __init__(self, *arg, **kw):
# this is an inherited __init__ using "classic" style without super()
self.variable=StringVar(value=kw.pop("value",0))
_setifunset(kw,"textvariable",self.variable)
Entry.__init__(self, *arg, **kw)
class VIOptionMenu(OptionMenu,Vget,Vset,Vvalue):
"""OptionMenu with IntVar() Value"""
def __init__(self, master, *arg, **kw):
# this is an inherited __init__ using "classic" style without super()
value=kw.pop("value",0)
self.variable=IntVar(value=value)
var=kw.pop("variable",self.variable)
OptionMenu.__init__(self, master, var, value, *arg, **kw) # this is a mess in TKinter
class VSOptionMenu(OptionMenu,Vget,Vset,Vvalue):
"""OptionMenu with StringVar() Value"""
def __init__(self, master, *arg, **kw):
# this is an inherited __init__ using "classic" style without super()
value=kw.pop("value",0)
self.variable=StringVar(value=value)
var=kw.pop("variable",self.variable)
OptionMenu.__init__(self, master, var, value, *arg, **kw) # this is a mess in TKinter
class VText(Text):
"""A Text widget with a more spohisticated interface"""
def __init__(self, *arg, **kw):
Text.__init__(self, *arg, **kw)
def FromList(self,list):
"""Put all text from list into Text Widget"""
self.delete('1.0','end')
for line in list:
self.insert('end',line+"\n")
def FromString(self,input):
"""convert input string with line breaks (cr, lf, cr/lf) into Text Widget"""
sio=io.StringIO(unicode(input))
self.delete('1.0','end')
line=sio.readline()
while ""<line:
self.insert('end',line)
line=sio.readline()
sio.close()
class VGridFrame(Frame):
"""A Frame widget with a more spohisticated grid layout interface"""
nextCol="col"
nextRow="row"
def __init__(self, *arg, **kw):
self.set_next(kw.pop('next', self.nextCol)) # put in colum or row
Frame.__init__(self, *arg, **kw)
self._col=0 # next position to grid into
self._row=0
self._sticky=None # no stickyness
def colconfig(self,**kw):
"""Apply columnconfigure() to the current column.
Usually used before grid_next()"""
self.columnconfigure(self._col,**kw)
def rowconfig(self,**kw):
"""Apply rowconfigure() to the current row.
Usually used before grid_next()"""
self.rowconfigure(self._row,**kw)
def set_next(self,next):
"""set next step direction to 'col' or 'row'"""
if next:
if next == self.nextCol:
self._next=self.nextCol
elif next == self.nextRow:
self._next=self.nextRow
else:
assert False, "Need next='%s' or next='%s' but got %s"%(self.nextCol,self.nextRow,next)
def grid_next(self,widget_or_list,**kw):
"""apply widget.grid() with increasing colum= or row= values
grid_next( (A, B, ..), opt) is a shortcurt for
grid_next( A, opt)
grid_next( B, opt)
grid_next( .., opt)
"""
def _grid(widget):
widget.grid(column=self._col, row=self._row, **kw)
if self._next == self.nextCol:
self._col+=colinc
elif self._next == self.nextRow:
self._row+=rowinc
self._col=kw.pop('column', self._col)
self._row=kw.pop('row', self._row)
colinc=kw.get('columnspan',1)
rowinc=kw.get('rowspan',1)
self.set_next(kw.pop('next',None))
try:
sticky=kw.pop('sticky')
self._sticky=sticky # save stickyness for next time
except KeyError:
sticky=self._sticky # get last value (may be None if sticky unset)
if sticky:
kw['sticky']=sticky
if isinstance( widget_or_list, (types.TupleType,types.ListType) ):
for w in widget_or_list:
_grid(w)
else:
_grid(widget_or_list) # is a widget
def step(self,**kw):
""" Step a column or a row forward, resetting column or row
usage:
step(column=0) resets to column 0, advancing row
step(row=2) resets to row 2, advancing column
step(column=3, row=4) sets to coulmn 3 and row 4"""
col=kw.get("column",None)
row=kw.get("row",None)
if col is not None:
self._col=col
self._row=row or self._row+1
elif row is not None:
self._row=row
self._col=col or self._col+1
else:
assert False, "You need either 'column=...' or 'row=...' in step() %r"%(kw)
## check not running as main
assert '__main__' != __name__ , "You should not run this, but [import] it"
#end;
#!/bin/bash
#
# Mass Storage Cloner - write.sh
# (C)2015 Martin Suefke
# License: GPLv3 or newer
#
# This is exected whenever the user clicks "write" on any blockdev
# The blockdev is passed as $1
#
# Customize this file as needed.
#
xterm -e "sudo bash ./blkwrite.sh ~/LIP-Image-WS2014-10-09-01.img $1"
#end;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment