from __future__ import annotations
import collections.abc
import logging
import typing
import weakref
import warnings
import os
import sys
import ctypes as ct
from . import windows
from . import utils
from . import exceptions
from .lowlevel import constants
from .lowlevel import structs
# Following imports are needed for backward compatibility
# Module-wide logger used by every TWAIN call wrapper below.
logger = logging.getLogger("twain")

# TWAIN condition code (TWCC_*) -> exception class associated with it.
_exc_mapping = {
    # general / resource conditions
    constants.TWCC_BUMMER: exceptions.GeneralFailure,
    constants.TWCC_LOWMEMORY: MemoryError,
    constants.TWCC_NODS: exceptions.NoDataSourceError,
    constants.TWCC_OPERATIONERROR: exceptions.OperationError,
    # protocol / argument conditions
    constants.TWCC_BADCAP: exceptions.BadCapability,
    constants.TWCC_BADPROTOCOL: exceptions.BadProtocol,
    constants.TWCC_BADVALUE: ValueError,
    constants.TWCC_SEQERROR: exceptions.SequenceError,
    constants.TWCC_BADDEST: exceptions.BadDestination,
    # capability conditions
    constants.TWCC_CAPUNSUPPORTED: exceptions.CapUnsupported,
    constants.TWCC_CAPBADOPERATION: exceptions.CapBadOperation,
    constants.TWCC_CAPSEQERROR: exceptions.CapSeqError,
    # file system and device conditions
    constants.TWCC_DENIED: exceptions.DeniedError,
    constants.TWCC_FILEEXISTS: FileExistsError,
    constants.TWCC_FILENOTFOUND: FileNotFoundError,
    constants.TWCC_NOTEMPTY: exceptions.NotEmptyError,
    constants.TWCC_PAPERJAM: exceptions.PaperJam,
    constants.TWCC_PAPERDOUBLEFEED: exceptions.PaperDoubleFeedError,
    constants.TWCC_FILEWRITEERROR: exceptions.FileWriteError,
    constants.TWCC_CHECKDEVICEONLINE: exceptions.CheckDeviceOnlineError,
    constants.TWCC_MAXCONNECTIONS: exceptions.MaxConnectionsError,
}

# Lower-case file extension -> TWAIN file format constant (TWFF_*).
_ext_to_type = {
    ".bmp": constants.TWFF_BMP,
    ".jpg": constants.TWFF_JFIF,
    ".jpeg": constants.TWFF_JFIF,
    ".png": constants.TWFF_PNG,
    ".tiff": constants.TWFF_TIFF,
    ".tif": constants.TWFF_TIFF,
}
# TWAIN item type (TWTY_*) -> ctypes type used to read/write a value of that
# type inside a capability container.
_mapping = {
    constants.TWTY_INT8: ct.c_int8,
    constants.TWTY_UINT8: ct.c_uint8,
    constants.TWTY_INT16: ct.c_int16,
    constants.TWTY_UINT16: ct.c_uint16,
    constants.TWTY_UINT32: ct.c_uint32,
    constants.TWTY_INT32: ct.c_int32,
    # TW_BOOL is transported as a 16 bit unsigned integer
    constants.TWTY_BOOL: ct.c_uint16,
    constants.TWTY_FIX32: structs.TW_FIX32,
    constants.TWTY_FRAME: structs.TW_FRAME,
    # String buffer sizes follow the TWAIN header typedefs
    # (TW_STR32[34], TW_STR64[66], TW_STR128[130], TW_STR255[256]).
    constants.TWTY_STR32: ct.c_char * 34,
    constants.TWTY_STR64: ct.c_char * 66,
    constants.TWTY_STR128: ct.c_char * 130,
    # was 255, which is one byte shorter than the TW_STR255 buffer
    constants.TWTY_STR255: ct.c_char * 256,
}
# Corresponding states are defined in TWAIN spec 2.11 paragraph

# Data Source Manager states:
#   "closed" - TWAIN state 2
#   "open"   - TWAIN state 3
_DsmStates = typing.Literal["closed", "open"]

# Data Source states:
#   "closed"  - TWAIN state 2
#   "open"    - TWAIN state 4
#   "enabled" - TWAIN state 5
#   "ready"   - TWAIN state 6
_SourceStates = typing.Literal["closed", "open", "enabled", "ready"]
def _is_good_type(type_id: int) -> bool:
    """Return True when ``type_id`` is a TWTY_* item type listed in ``_mapping``.

    Uses direct dictionary membership instead of materialising the key list on
    every call.
    """
    return type_id in _mapping
def _struct2dict(
    struct: ct.Structure, decode: collections.abc.Callable[[bytes], str]
) -> dict[str, typing.Any]:
    """Convert a ctypes structure into a plain dictionary keyed by field name.

    :param struct: structure instance whose ``_fields_`` are walked
    :param decode: callable used to turn ``bytes`` field values into ``str``
    :return: dictionary with one entry per field; array-like values become
        lists, nested structures become nested dictionaries and ``bytes``
        values are passed through ``decode``
    """
    result: dict[str, typing.Any] = {}
    # ``_fields_`` entries are (name, type) pairs, or (name, type, bits)
    # triples for bit fields, so only the first element is unpacked by name.
    for field, *_ in struct._fields_:
        value = getattr(struct, field)
        if hasattr(value, "_length_") and hasattr(value, "_type_"):
            # Probably an array
            value = list(value)
        elif hasattr(value, "_fields_"):
            # Probably another struct
            value = _struct2dict(value, decode)
        if isinstance(value, bytes):
            value = decode(value)
        result[field] = value
    return result
class _IImage(typing.Protocol):
    """Structural interface of the image objects handed to acquisition callbacks."""

    def close(self):
        """Release the image; concrete classes define what is released."""

    def save(self, filepath: str):
        """Write the image to ``filepath``."""
if sys.platform == "win32":

    def _twain1_alloc(size: int) -> ct.c_void_p:
        """Allocate ``size`` zero-initialised bytes through ``windows.GlobalAlloc``."""
        return windows.GlobalAlloc(windows.GMEM_ZEROINIT, size)

    # TWAIN 1 memory management maps directly onto the Win32 global heap wrappers.
    _twain1_free = windows.GlobalFree
    _twain1_lock = windows.GlobalLock
    _twain1_unlock = windows.GlobalUnlock

    class _Image(_IImage):
        """Image held in a memory handle, released through the supplied callbacks.

        :param handle: memory handle of the image
        :param free: callable that releases ``handle``
        :param lock: callable that returns an address for ``handle``
        :param unlock: callable that undoes ``lock``
        """

        def __init__(
            self,
            handle,
            free: collections.abc.Callable[[typing.Any], None],
            lock: collections.abc.Callable[[typing.Any], ct.c_void_p],
            unlock: collections.abc.Callable[[typing.Any], None],
        ):
            self._handle = handle
            self._free = free
            self._lock = lock
            self._unlock = unlock

        def __del__(self):
            self.close()

        def close(self):
            """Releases memory of image. Calling it more than once is a no-op."""
            # getattr: __del__ may run on an instance whose __init__ never ran
            handle = getattr(self, "_handle", None)
            if handle is None:
                return
            # Forget the handle first so a failing free is never retried
            self._handle = None
            self._free(handle)

        def save(self, filepath: str):
            """Saves in-memory image to BMP file"""
            # calling GlobalSize may not work on TWAIN2 sources as they may use memory allocator that is not compatible
            # with GlobalSize
            # in this case would need to change code to determine image size from image contents
            size = windows.GlobalSize(self._handle)
            ptr = self._lock(self._handle)
            try:
                dib_bytes = (ct.c_char * size).from_address(ptr)
                bmp = windows.convert_dib_to_bmp(dib_bytes)
                with open(filepath, "wb") as f:
                    f.write(bmp)
            finally:
                self._unlock(self._handle)

else:
    # Mac
    from ctypes import util as _ct_util

    # ctypes has no ``libc`` attribute; load the C runtime explicitly and
    # declare pointer-sized signatures so addresses are not truncated.
    _libc = ct.CDLL(_ct_util.find_library("c"))
    _libc.malloc.argtypes = [ct.c_size_t]
    _libc.malloc.restype = ct.c_void_p
    _libc.free.argtypes = [ct.c_void_p]
    _libc.free.restype = None

    def _twain1_alloc(size: int) -> ct.c_void_p:
        """Allocate ``size`` bytes with the C runtime ``malloc``."""
        return _libc.malloc(size)

    def _twain1_lock(handle):
        """Handles are plain addresses here, so locking returns the handle itself."""
        return handle

    def _twain1_unlock(handle):
        """Nothing to unlock for ``malloc`` memory."""
        pass

    def _twain1_free(handle):
        """Release memory obtained from :func:`_twain1_alloc`."""
        return _libc.free(handle)
[docs]
class Source:
"""
This object represents connection to Data Source.
An instance of this class can be created by calling
:meth:`SourceManager.open_source`
"""
def __init__(self, sm: SourceManager, ds_id: structs.TW_IDENTITY):
    """Bind the source to its manager and pick memory/encoding helpers.

    Sources whose ``SupportedGroups`` carry ``DF_DS2`` reuse the manager's
    allocator and codec callables; all others use the module level TWAIN 1
    helpers and the file system encoding.
    """
    self._sm, self._id = sm, ds_id
    self._state: _SourceStates = "open"
    self._version2 = bool(ds_id.SupportedGroups & constants.DF_DS2)
    if not self._version2:
        self._alloc, self._free = _twain1_alloc, _twain1_free
        self._lock, self._unlock = _twain1_lock, _twain1_unlock
        self._encoding = sys.getfilesystemencoding()
        # the lambdas look the encoding up on every call
        self._encode = lambda s: s.encode(self._encoding)
        self._decode = lambda s: s.decode(self._encoding)
        return
    self._alloc, self._free = sm._alloc, sm._free
    self._lock, self._unlock = sm._lock, sm._unlock
    self._encode, self._decode = sm._encode, sm._decode
def __del__(self):
    """Close the source when the object is garbage collected."""
    self.close()


def __enter__(self):
    """Enter a ``with`` block; the source itself is the managed object."""
    return self


def __exit__(self, exc_type, exc_val, exc_tb):
    """Close the source on leaving the ``with`` block; exceptions propagate."""
    self.close()
[docs]
def close(self):
    """This method is used to close the data source object.

    It gives finer control over this connection than relying on garbage
    collection. Pending transfers are cancelled, an enabled source is
    disabled, and an open source is closed through the source manager.
    Calling it again after the manager reference was dropped does nothing.
    """
    if self._sm is None:
        # A previous close() already released the manager. This also covers
        # the case where disabling failed and the state never left
        # "enabled": without this guard a second close() (e.g. from
        # __del__) would dereference None.
        return
    if self._state == "ready":
        self._end_all_xfers()
    if self._state == "enabled":
        try:
            self._disable()
        except exceptions.TwainError:
            logger.warning("Failed to disable data source during cleanup")
    if self._state == "open":
        self._sm._close_ds(self._id)
        self._state = "closed"
    self._sm = None
def _call(
    self,
    dg: int,
    dat: int,
    msg: int,
    buf,
    expected_returns=(constants.TWRC_SUCCESS,),
) -> int:
    """Forward a TWAIN triplet to the source manager on behalf of this source.

    The source identity is supplied as the first argument; every other
    argument is passed through unchanged and the manager's result is returned.
    """
    manager = self._sm
    return manager._call(self._id, dg, dat, msg, buf, expected_returns)
def _get_capability(self, cap: int, current: int):
    """Query capability ``cap`` with message ``current`` and unpack the container.

    :param cap: capability identifier
    :param current: DAT_CAPABILITY message to send (e.g. MSG_GET)
    :return: depends on the container type returned by the source:

        * TWON_ONEVALUE - ``(item_type, value)``
        * TWON_RANGE - dict with MinValue, MaxValue, StepSize, DefaultValue
          and CurrentValue
        * TWON_ENUMERATION - ``(item_type, (current_index, default_index, values))``
        * TWON_ARRAY - ``(item_type, values)``
    :raises exceptions.CapabilityFormatNotSupported: for an unknown container
        or item type

    The container memory is unlocked and freed before returning, so every
    ctypes structure or char array read from it is copied first; otherwise
    the returned objects would alias released memory.
    """

    def detach(item):
        # Structures and arrays obtained through a pointer share the
        # container's memory; plain Python values are already copies.
        if isinstance(item, (ct.Structure, ct.Array)):
            return type(item).from_buffer_copy(item)
        return item

    twCapability = structs.TW_CAPABILITY(cap, constants.TWON_DONTCARE16, 0)
    logger.info("Calling DAT_CAPABILITY/%d", current)
    self._call(
        constants.DG_CONTROL,
        constants.DAT_CAPABILITY,
        current,
        ct.byref(twCapability),
    )
    try:
        ptr = self._lock(twCapability.hContainer)
        try:
            if twCapability.ConType == constants.TWON_ONEVALUE:
                # layout: 16 bit item type followed by the item at offset 2
                type_id = int(ct.cast(ptr, ct.POINTER(ct.c_uint16))[0])
                if not _is_good_type(type_id):
                    msg = f"Capability Code = {cap}, Format Code = {twCapability.ConType}, Item Type = {type_id}"
                    raise exceptions.CapabilityFormatNotSupported(msg)
                ctype = _mapping[type_id]
                val = ct.cast(ptr + 2, ct.POINTER(ctype))[0]  # type: ignore # needs fixing
                if type_id == constants.TWTY_BOOL:
                    val = bool(val)
                elif type_id == constants.TWTY_FIX32:
                    val = structs.fix2float(val)
                elif type_id == constants.TWTY_FRAME:
                    val = structs.frame2tuple(val)
                else:
                    # integers pass through; string buffers are copied
                    val = detach(val)
                return type_id, val
            elif twCapability.ConType == constants.TWON_RANGE:
                rng = ct.cast(ptr, ct.POINTER(structs.TW_RANGE)).contents
                return {
                    "MinValue": rng.MinValue,
                    "MaxValue": rng.MaxValue,
                    "StepSize": rng.StepSize,
                    "DefaultValue": rng.DefaultValue,
                    "CurrentValue": rng.CurrentValue,
                }
            elif twCapability.ConType == constants.TWON_ENUMERATION:
                enum = ct.cast(ptr, ct.POINTER(structs.TW_ENUMERATION)).contents
                if not _is_good_type(enum.ItemType):
                    msg = f"Capability Code = {cap}, Format Code = {twCapability.ConType}, Item Type = {enum.ItemType}"
                    raise exceptions.CapabilityFormatNotSupported(msg)
                ctype = _mapping[enum.ItemType]
                item_p = ct.cast(
                    ptr + ct.sizeof(structs.TW_ENUMERATION),
                    ct.POINTER(ctype),  # type: ignore # needs fixing
                )
                values = [detach(el) for el in item_p[0 : enum.NumItems]]
                return enum.ItemType, (enum.CurrentIndex, enum.DefaultIndex, values)
            elif twCapability.ConType == constants.TWON_ARRAY:
                arr = ct.cast(ptr, ct.POINTER(structs.TW_ARRAY)).contents
                if not _is_good_type(arr.ItemType):
                    msg = f"Capability Code = {cap}, Format Code = {twCapability.ConType}, Item Type = {arr.ItemType}"
                    raise exceptions.CapabilityFormatNotSupported(msg)
                ctype = _mapping[arr.ItemType]
                item_p = ct.cast(
                    ptr + ct.sizeof(structs.TW_ARRAY),
                    ct.POINTER(ctype),  # type: ignore # needs fixing
                )
                return arr.ItemType, [detach(el) for el in item_p[0 : arr.NumItems]]
            else:
                msg = (
                    f"Capability Code = {cap}, Format Code = {twCapability.ConType}"
                )
                raise exceptions.CapabilityFormatNotSupported(msg)
        finally:
            self._unlock(twCapability.hContainer)
    finally:
        self._free(twCapability.hContainer)
[docs]
def get_capability(self, cap: int):
    """Query capability ``cap`` from the source using MSG_GET.

    The result shape depends on the container the source answers with:

    * one value - ``(item_type, value)``
    * range - dictionary with MinValue, MaxValue, StepSize, DefaultValue
      and CurrentValue
    * enumeration - ``(item_type, (current_index, default_index, values))``
    * array - ``(item_type, values)``

    An exception is raised when the request fails or the container format
    is not supported.
    """
    message = constants.MSG_GET
    return self._get_capability(cap, message)
[docs]
def get_capability_current(self, cap: int):
    """Query the current value of capability ``cap`` using MSG_GETCURRENT.

    The result shape depends on the container the source answers with:

    * one value - ``(item_type, value)``
    * range - dictionary with MinValue, MaxValue, StepSize, DefaultValue
      and CurrentValue
    * enumeration - ``(item_type, (current_index, default_index, values))``
    * array - ``(item_type, values)``

    An exception is raised when the request fails or the container format
    is not supported.
    """
    message = constants.MSG_GETCURRENT
    return self._get_capability(cap, message)
[docs]
def get_capability_default(self, cap: int):
    """Query the default value of capability ``cap`` using MSG_GETDEFAULT.

    The result shape depends on the container the source answers with:

    * one value - ``(item_type, value)``
    * range - dictionary with MinValue, MaxValue, StepSize, DefaultValue
      and CurrentValue
    * enumeration - ``(item_type, (current_index, default_index, values))``
    * array - ``(item_type, values)``

    An exception is raised when the request fails or the container format
    is not supported.
    """
    message = constants.MSG_GETDEFAULT
    return self._get_capability(cap, message)
@property
def name(self) -> str:
    """Decoded product name of the source.

    This can be used later for connecting to the same source.
    """
    raw_name = self._id.ProductName
    return self._decode(raw_name)
@property
def identity(self) -> dict:
    """Identity of the source as a dictionary.

    Contains the fields of the identity structure, merged with the fields
    of its nested ``Version`` structure (version fields win on a name clash).
    """
    identity_fields = _struct2dict(self._id, self._decode)
    version_fields = _struct2dict(self._id.Version, self._decode)
    return {**identity_fields, **version_fields}
[docs]
def set_capability(
    self,
    cap: int,
    type_id: int,
    value: int | float | str | tuple[float, float, float, float],
):
    """Set the value of a capability in the source using a TW_ONEVALUE container.

    :param cap: Capability Identifier (lowlevel.CAP_* or lowlevel.ICAP_*)
    :param type_id: value type (lowlevel.TWTY_*)
    :param value: new value; int for integer/bool types, str for string
        types, float for TWTY_FIX32 and a 4-tuple for TWTY_FRAME
    :raises exceptions.CapabilityFormatNotSupported: if ``type_id`` is unknown
    :raises ValueError: if a string value does not fit the TWAIN string
        buffer together with its terminating NUL
    :raises exceptions.CheckStatus: if the source answers TWRC_CHECKSTATUS
    """
    if not _is_good_type(type_id):
        raise exceptions.CapabilityFormatNotSupported(
            f"Capability Code = {cap}, Format Code = {type_id}"
        )
    ctype = _mapping[type_id]
    if type_id in (
        constants.TWTY_INT8,
        constants.TWTY_INT16,
        constants.TWTY_INT32,
        constants.TWTY_UINT8,
        constants.TWTY_UINT16,
        constants.TWTY_UINT32,
        constants.TWTY_BOOL,
    ):
        cval = ctype(value)  # type: ignore # needs fixing
    elif type_id in (
        constants.TWTY_STR32,
        constants.TWTY_STR64,
        constants.TWTY_STR128,
        constants.TWTY_STR255,
    ):
        encoded = self._encode(value)
        if len(encoded) >= ct.sizeof(ctype):  # type: ignore # needs fixing
            raise ValueError(
                f"string of {len(encoded)} bytes does not fit into TWAIN type {type_id}"
            )
        # A c_char array treats positional arguments as individual elements,
        # so ``ctype(encoded)`` fails for anything longer than one byte;
        # assigning ``.value`` copies the whole byte string instead.
        cval = ctype()  # type: ignore # needs fixing
        cval.value = encoded
    elif type_id == constants.TWTY_FIX32:
        cval = structs.float2fix(value)  # type: ignore # needs fixing
    elif type_id == constants.TWTY_FRAME:
        cval = structs.tuple2frame(value)  # type: ignore # needs fixing
    else:
        # Not reachable while the branches above cover every key of _mapping;
        # raise explicitly because asserts vanish under ``python -O``.
        raise exceptions.CapabilityFormatNotSupported(
            f"Capability Code = {cap}, Format Code = {type_id}"
        )
    handle = self._alloc(ct.sizeof(structs.TW_ONEVALUE) + ct.sizeof(ctype))  # type: ignore # needs fixing
    try:
        ptr = self._lock(handle)
        try:
            # layout: 16 bit item type followed by the item at offset 2
            ct.cast(ptr, ct.POINTER(ct.c_uint16))[0] = type_id
            ct.cast(ptr + 2, ct.POINTER(ctype))[0] = cval  # type: ignore # needs fixing
        finally:
            self._unlock(handle)
        capability = structs.TW_CAPABILITY(cap, constants.TWON_ONEVALUE, handle)
        logger.info("Calling DAT_CAPABILITY/MSG_SET")
        rv = self._call(
            constants.DG_CONTROL,
            constants.DAT_CAPABILITY,
            constants.MSG_SET,
            ct.byref(capability),
            # same accepted results as set_image_layout
            (constants.TWRC_SUCCESS, constants.TWRC_CHECKSTATUS),
        )
    finally:
        self._free(handle)
    if rv == constants.TWRC_CHECKSTATUS:
        raise exceptions.CheckStatus
[docs]
def reset_capability(self, cap: int):
    """Reset a capability to the source default (DAT_CAPABILITY/MSG_RESET).

    :param cap: Capability Identifier (lowlevel.CAP_* or lowlevel.ICAP_*).
    """
    container = structs.TW_CAPABILITY(Cap=cap)
    logger.info("Calling DAT_CAPABILITY/MSG_RESET")
    triplet = (
        constants.DG_CONTROL,
        constants.DAT_CAPABILITY,
        constants.MSG_RESET,
    )
    self._call(*triplet, ct.byref(container))
[docs]
def set_image_layout(
    self,
    frame: tuple[float, float, float, float],
    document_number: int = 1,
    page_number: int = 1,
    frame_number: int = 1,
):
    """Send an image layout to the source (DAT_IMAGELAYOUT/MSG_SET).

    :param frame: frame coordinates as a 4-tuple
    :param document_number: document number stored in the layout
    :param page_number: page number stored in the layout
    :param frame_number: frame number stored in the layout
    :raises exceptions.CheckStatus: if the source answers TWRC_CHECKSTATUS
    """
    layout_fields = {
        "Frame": structs.tuple2frame(frame),
        "DocumentNumber": document_number,
        "PageNumber": page_number,
        "FrameNumber": frame_number,
    }
    layout = structs.TW_IMAGELAYOUT(**layout_fields)
    logger.info("Calling DAT_IMAGELAYOUT/MSG_SET")
    accepted = (constants.TWRC_SUCCESS, constants.TWRC_CHECKSTATUS)
    result = self._call(
        constants.DG_IMAGE,
        constants.DAT_IMAGELAYOUT,
        constants.MSG_SET,
        ct.byref(layout),
        accepted,
    )
    if result == constants.TWRC_CHECKSTATUS:
        raise exceptions.CheckStatus
[docs]
def get_image_layout(
    self
) -> tuple[tuple[float, float, float, float], int, int, int]:
    """Ask the source for its image layout (DAT_IMAGELAYOUT/MSG_GET).

    Returns ``(frame, document_number, page_number, frame_number)`` where
    ``frame`` is the frame structure converted to a tuple.

    Valid states 4 through 6
    """
    layout = structs.TW_IMAGELAYOUT()
    logger.info("Calling DAT_IMAGELAYOUT/MSG_GET")
    triplet = (constants.DG_IMAGE, constants.DAT_IMAGELAYOUT, constants.MSG_GET)
    self._call(*triplet, ct.byref(layout))
    frame = structs.frame2tuple(layout.Frame)
    return frame, layout.DocumentNumber, layout.PageNumber, layout.FrameNumber
[docs]
def get_image_layout_default(
    self
) -> tuple[tuple[float, float, float, float], int, int, int]:
    """Ask the source for its default image layout (DAT_IMAGELAYOUT/MSG_GETDEFAULT).

    Returns ``(frame, document_number, page_number, frame_number)`` where
    ``frame`` is the frame structure converted to a tuple.

    Valid states 4 through 6
    """
    layout = structs.TW_IMAGELAYOUT()
    logger.info("Calling DAT_IMAGELAYOUT/MSG_GETDEFAULT")
    triplet = (
        constants.DG_IMAGE,
        constants.DAT_IMAGELAYOUT,
        constants.MSG_GETDEFAULT,
    )
    self._call(*triplet, ct.byref(layout))
    frame = structs.frame2tuple(layout.Frame)
    return frame, layout.DocumentNumber, layout.PageNumber, layout.FrameNumber
[docs]
def reset_image_layout(self):
    """Reset the image layout to its default settings (DAT_IMAGELAYOUT/MSG_RESET)."""
    layout = structs.TW_IMAGELAYOUT()
    logger.info("Calling DAT_IMAGELAYOUT/MSG_RESET")
    triplet = (constants.DG_IMAGE, constants.DAT_IMAGELAYOUT, constants.MSG_RESET)
    self._call(*triplet, ct.byref(layout))
def _enable(self, show_ui: bool, modal_ui: bool, hparent):
    """Ask the source to begin acquisition (DAT_USERINTERFACE/MSG_ENABLEDS).

    On success the source state becomes "enabled".

    :param show_ui: stored in the ``ShowUI`` field of the request
    :param modal_ui: stored in the ``ModalUI`` field of the request
    :param hparent: stored in the ``hParent`` field of the request
    """
    ui = structs.TW_USERINTERFACE(ShowUI=show_ui, ModalUI=modal_ui, hParent=hparent)
    logger.info("starting scan")
    # message name fixed: it used to be logged as "MSG_ENALEDS"
    logger.info("Calling DAT_USERINTERFACE/MSG_ENABLEDS")
    self._call(
        constants.DG_CONTROL,
        constants.DAT_USERINTERFACE,
        constants.MSG_ENABLEDS,
        ct.byref(ui),
    )
    self._state = "enabled"
def _disable(self):
    """Send DAT_USERINTERFACE/MSG_DISABLEDS and return the state to "open"."""
    request = structs.TW_USERINTERFACE()
    logger.info("Calling DAT_USERINTERFACE/MSG_DISABLEDS")
    triplet = (
        constants.DG_CONTROL,
        constants.DAT_USERINTERFACE,
        constants.MSG_DISABLEDS,
    )
    self._call(*triplet, ct.byref(request))
    self._state = "open"
def _process_event(self, msg: structs.MSG) -> tuple[int, int]:
    """Offer a window message to the data source (DAT_EVENT/MSG_PROCESSEVENT).

    The TWAIN interface requires that window events are made available to
    both the application and the source, which runs in the same process;
    the event loop calls this method to pass them on.

    Returns ``(return_code, twain_message)``. When the source reports
    MSG_XFERREADY the state becomes "ready".
    """
    msg_address = ct.cast(ct.byref(msg), ct.c_void_p)
    tw_event = structs.TW_EVENT(msg_address, 0)
    logger.info("Calling DAT_EVENT/MSG_PROCESSEVENT")
    accepted = (constants.TWRC_DSEVENT, constants.TWRC_NOTDSEVENT)
    return_code = self._call(
        constants.DG_CONTROL,
        constants.DAT_EVENT,
        constants.MSG_PROCESSEVENT,
        ct.byref(tw_event),
        accepted,
    )
    logger.debug("handling event result %d", return_code)
    if tw_event.TWMessage == constants.MSG_XFERREADY:
        logger.info("transfer is ready")
        self._state = "ready"
    return return_code, tw_event.TWMessage
def _modal_loop(
    self, event_callback: collections.abc.Callable[[int], None] | None
) -> None:
    """Pump window messages until the source reports it is done.

    Every message is first offered to the data source. The loop ends on
    MSG_XFERREADY or MSG_CLOSEDSREQ, when WM_QUIT is received, or when
    ``GetMessage`` reports a failure.

    :param event_callback: optional callable invoked with the TWAIN message
        of every processed event
    """
    logger.info("entering modal loop")
    done = False
    msg = structs.MSG()
    while not done:
        # Get message from Windows message queue
        status = windows.GetMessage(ct.byref(msg), 0, 0, 0)  # type: ignore # needs fixing
        if status == -1:
            # GetMessage signals failure with -1; msg is not valid then
            logger.error("GetMessage failed, leaving modal loop")
            break
        if not status:
            # Got WM_QUIT message indicating that application is exiting
            break
        # send windows event to data source
        rc, event = self._process_event(msg)
        if rc not in (constants.TWRC_NOTDSEVENT, constants.TWRC_DSEVENT):
            logger.error("got unexpected process event result %d", rc)
        if event_callback:
            event_callback(event)
        if event in (constants.MSG_XFERREADY, constants.MSG_CLOSEDSREQ):
            done = True
        if rc == constants.TWRC_NOTDSEVENT:
            # this event is not for data source, so process it via normal
            # Windows mechanisms
            windows.TranslateMessage(ct.byref(msg))  # type: ignore # needs fixing
            windows.DispatchMessage(ct.byref(msg))  # type: ignore # needs fixing
    logger.info("exited modal loop")
def _acquire(
    self,
    xfer_ready_callback: collections.abc.Callable[[], int],
    show_ui: bool = True,
    modal: bool = False,
) -> None:
    """Enable the source, run the modal loop and disable the source again.

    ``xfer_ready_callback`` is called repeatedly once MSG_XFERREADY arrives,
    for as long as it returns a truthy value. If it raises ``CancelAll``
    all pending transfers are cancelled.
    """

    def on_event(event: int) -> None:
        if event != constants.MSG_XFERREADY:
            return
        logger.info("got MSG_XFERREADY message")
        try:
            # keep transferring while the callback reports more images
            while xfer_ready_callback():
                pass
        except exceptions.CancelAll:
            self._end_all_xfers()

    # enable data source
    self._enable(show_ui=show_ui, modal_ui=modal, hparent=self._sm._hwnd)
    try:
        # Enter modal loop
        self._modal_loop(on_event)
    finally:
        # disable data source UI
        self._disable()
@property
def file_xfer_params(self) -> tuple[str, int]:
    """Tuple of (file name, format) where format is one of TWFF_*.

    Reading sends DAT_SETUPFILEXFER/MSG_GET, assigning sends
    DAT_SETUPFILEXFER/MSG_SET. This property is used by
    :meth:`xfer_image_by_file`.

    Valid states: 4, 5, 6
    """
    setup = structs.TW_SETUPFILEXFER()
    logger.info("Calling DAT_SETUPFILEXFER/MSG_GET")
    triplet = (
        constants.DG_CONTROL,
        constants.DAT_SETUPFILEXFER,
        constants.MSG_GET,
    )
    self._call(*triplet, ct.byref(setup))
    file_name = self._decode(setup.FileName)
    return file_name, setup.Format


@file_xfer_params.setter
def file_xfer_params(self, params: tuple[str, int]) -> None:
    path, fmt = params
    encoded_path = self._encode(path)
    setup = structs.TW_SETUPFILEXFER(encoded_path, fmt, 0)
    logger.info("Calling DAT_SETUPFILEXFER/MSG_SET")
    triplet = (
        constants.DG_CONTROL,
        constants.DAT_SETUPFILEXFER,
        constants.MSG_SET,
    )
    self._call(*triplet, ct.byref(setup))
@property
def image_info(self) -> dict:
    """Image information reported by the source (DAT_IMAGEINFO/MSG_GET).

    Normally the application learns through the message loop that an image
    is ready for transfer. Toolkits such as wxPython make the message loop
    hard to reach, so this property can be polled instead: once the image
    information is available, the image is ready for transfer.

    Resolutions are converted to floats and ``BitsPerSample`` to a list.

    Valid states: 6, 7
    """
    info = structs.TW_IMAGEINFO()
    logger.info("Calling DAT_IMAGEINFO/MSG_GET")
    triplet = (constants.DG_IMAGE, constants.DAT_IMAGEINFO, constants.MSG_GET)
    self._call(*triplet, ct.byref(info))
    result = {
        "XResolution": structs.fix2float(info.XResolution),
        "YResolution": structs.fix2float(info.YResolution),
    }
    result["ImageWidth"] = info.ImageWidth
    result["ImageLength"] = info.ImageLength
    result["SamplesPerPixel"] = info.SamplesPerPixel
    result["BitsPerSample"] = list(info.BitsPerSample)
    result["BitsPerPixel"] = info.BitsPerPixel
    result["Planar"] = info.Planar
    result["PixelType"] = info.PixelType
    result["Compression"] = info.Compression
    return result
def _get_native_image(self) -> ct.c_void_p | None:
    """Transfer an image via memory (DAT_IMAGENATIVEXFER/MSG_GET).

    Should only be called when an image is ready for transfer.
    Returns the image handle, or None if the transfer was cancelled.
    """
    image_handle = ct.c_void_p()
    logger.info("Calling DAT_IMAGENATIVEXFER")
    accepted = (constants.TWRC_XFERDONE, constants.TWRC_CANCEL)
    outcome = self._call(
        constants.DG_IMAGE,
        constants.DAT_IMAGENATIVEXFER,
        constants.MSG_GET,
        ct.byref(image_handle),
        accepted,
    )
    if outcome == constants.TWRC_XFERDONE:
        return image_handle
    elif outcome == constants.TWRC_CANCEL:
        return None
    else:
        raise RuntimeError(
            f"Unexpected result returned from DAT_IMAGENATIVEXFER: {outcome}"
        )
def _get_file_image(self) -> int:
    """Run DAT_IMAGEFILEXFER/MSG_GET and return its result code.

    Both TWRC_XFERDONE and TWRC_CANCEL are accepted results.
    """
    logger.info("Calling DAT_IMAGEFILEXFER")
    accepted = (constants.TWRC_XFERDONE, constants.TWRC_CANCEL)
    result_code = self._call(
        constants.DG_IMAGE,
        constants.DAT_IMAGEFILEXFER,
        constants.MSG_GET,
        None,
        accepted,
    )
    return result_code
def _get_file_audio(self) -> int:
    """Run DAT_AUDIOFILEXFER/MSG_GET and return its result code.

    Both TWRC_XFERDONE and TWRC_CANCEL are accepted results.
    """
    logger.info("Calling DAT_AUDIOFILEXFER")
    accepted = (constants.TWRC_XFERDONE, constants.TWRC_CANCEL)
    result_code = self._call(
        constants.DG_AUDIO,
        constants.DAT_AUDIOFILEXFER,
        constants.MSG_GET,
        None,
        accepted,
    )
    return result_code
def _end_xfer(self) -> int:
    """Finish the current transfer (DAT_PENDINGXFERS/MSG_ENDXFER).

    Should be called after every image transfer, successful or cancelled.
    When nothing is pending the state goes back to "enabled".

    Returns information about additional transfers:

    * 0 - no more transfers available
    * -1 - more images available but how many is unknown
    * >0 - indicates how many more images are available
    """
    pending = structs.TW_PENDINGXFERS()
    logger.info("Calling DAT_PENDINGXFERS/MSG_ENDXFER")
    triplet = (
        constants.DG_CONTROL,
        constants.DAT_PENDINGXFERS,
        constants.MSG_ENDXFER,
    )
    self._call(*triplet, ct.byref(pending))
    remaining = pending.Count
    if remaining == 0:
        self._state = "enabled"
    return remaining
def _end_all_xfers(self) -> None:
    """Cancel all outstanding transfers (DAT_PENDINGXFERS/MSG_RESET).

    Afterwards the state is "enabled".
    """
    pending = structs.TW_PENDINGXFERS()
    logger.info("Calling DAT_PENDINGXFERS/MSG_RESET")
    triplet = (
        constants.DG_CONTROL,
        constants.DAT_PENDINGXFERS,
        constants.MSG_RESET,
    )
    self._call(*triplet, ct.byref(pending))
    self._state = "enabled"
[docs]
def request_acquire(self, show_ui: bool, modal_ui: bool) -> None:
    """Ask the source to begin acquisition.

    Transitions Source to state 5.

    :param show_ui: passed on as the show-UI flag
    :param modal_ui: passed on as the modal-UI flag

    Valid states: 4
    """
    parent_window = self._sm._hwnd
    self._enable(show_ui, modal_ui, parent_window)
[docs]
def modal_loop(self) -> None:
    """Run the modal event loop with the source manager's callback.

    Should be called after :meth:`request_acquire`; it returns once the
    loop has finished.

    Valid states: 5
    """
    manager_callback = self._sm._cb
    self._modal_loop(manager_callback)
[docs]
def hide_ui(self) -> None:
    """Ask the source to hide its user interface.

    Transitions Source to state 4 if successful.

    Valid states: 5
    """
    disable = self._disable
    disable()
[docs]
def xfer_image_natively(self) -> tuple[typing.Any, int]:
    """Perform a 'Native' form transfer of the image.

    On success returns ``(image_handle, remaining)`` where ``remaining`` is
    the count of images still pending in the source. If that count is zero
    the Source transitions to state 5, otherwise it stays in state 6 and
    :meth:`xfer_image_natively` should be called again.

    Raises ``DSTransferCancelled`` when the transfer was cancelled.

    Valid states: 6
    """
    native_handle = self._get_native_image()
    # the transfer has to be ended even when it was cancelled
    remaining = self._end_xfer()
    if native_handle:
        return native_handle.value, remaining
    # no handle means the transfer was cancelled
    raise exceptions.DSTransferCancelled
[docs]
def xfer_image_by_file(self) -> int:
    """Perform a file based transfer of the image.

    On success the image is written to the file configured through
    :meth:`file_xfer_params` and the number of pending transfers is
    returned. If that number is zero the Source transitions to state 5,
    otherwise it stays in state 6 and another transfer should be started.

    Raises ``DSTransferCancelled`` when the transfer was cancelled.

    Valid states: 6
    """
    result_code = self._get_file_image()
    # the transfer is ended before the result code is inspected
    remaining = self._end_xfer()
    cancelled = result_code == constants.TWRC_CANCEL
    if cancelled:
        raise exceptions.DSTransferCancelled
    return remaining
[docs]
def acquire_file(
    self,
    before: collections.abc.Callable[[dict], str],
    after: collections.abc.Callable[[int], None] = lambda more: None,
    show_ui: bool = True,
    modal: bool = False,
) -> None:
    """Acquires one or more images as files. Call returns when acquisition complete.

    The source always writes a BMP file. When the requested path has another
    extension, the BMP goes to a temporary file that is converted with PIL
    and removed afterwards, including when the transfer is cancelled or the
    conversion fails.

    :param before: Callback called before each acquired file, it should return
                   full file path to where image should be saved. It can also throw CancelAll to cancel acquisition
    :keyword after: Callback called after each acquired file, it receives number of
                   images remaining. It can throw CancelAll to cancel remaining
                   acquisition
    :keyword show_ui: If True source's UI will be presented to user
    :keyword modal:   If True source's UI will be modal
    """
    _, (_, _, mechs) = self.get_capability(constants.ICAP_XFERMECH)
    if constants.TWSX_FILE not in mechs:
        raise Exception("File transfer is not supported")

    def callback():
        filepath = before(self.image_info)
        ext = os.path.splitext(filepath)[1].lower()
        needs_conversion = ext != ".bmp"
        if needs_conversion:
            import tempfile

            handle, bmppath = tempfile.mkstemp(".bmp")
            os.close(handle)
        else:
            bmppath = filepath
        try:
            self.file_xfer_params = bmppath, constants.TWFF_BMP
            rv = self._get_file_image()
            more = self._end_xfer()
            if rv == constants.TWRC_CANCEL:
                raise exceptions.DSTransferCancelled
            if needs_conversion:
                try:
                    import Image  # type: ignore # needs fixing
                except ImportError:
                    from PIL import Image
                # close the source image before the temporary file is removed
                with Image.open(bmppath) as img:
                    img.save(filepath)
        finally:
            if needs_conversion:
                # best-effort cleanup; must not mask an in-flight exception
                try:
                    os.remove(bmppath)
                except OSError:
                    logger.warning("Failed to remove temporary file %s", bmppath)
        after(more)
        return more

    self.set_capability(
        constants.ICAP_XFERMECH, constants.TWTY_UINT16, constants.TWSX_FILE
    )
    self._acquire(callback, show_ui, modal)
[docs]
def acquire_natively(
    self,
    after: collections.abc.Callable[[_IImage, int], None],
    before: collections.abc.Callable[[dict], None] = lambda img_info: None,
    show_ui: bool = True,
    modal: bool = False,
) -> None:
    """Acquires one or more images via memory. Call returns when acquisition complete.

    The transfer mechanism is switched to native and the method then runs
    its own event loop until acquisition is over.

    :keyword after: Callback called after each acquired file, it receives an image object and
                    number of images remaining. It can throw CancelAll to cancel remaining
                    acquisition
    :keyword before: Callback called before each acquired file. It can throw CancelAll
                     to cancel acquisition
    :keyword show_ui: If True source's UI will be presented to user
    :keyword modal:   If True source's UI will be modal
    """

    def transfer_one() -> int:
        before(self.image_info)
        native_handle, remaining = self.xfer_image_natively()
        after(
            _Image(
                handle=native_handle,
                free=self._free,
                lock=self._lock,
                unlock=self._unlock,
            ),
            remaining,
        )
        return remaining

    mechanism = (
        constants.ICAP_XFERMECH,
        constants.TWTY_UINT16,
        constants.TWSX_NATIVE,
    )
    self.set_capability(*mechanism)
    self._acquire(xfer_ready_callback=transfer_one, show_ui=show_ui, modal=modal)
def is_twain2(self) -> bool:
    """Return the flag computed at construction from ``DF_DS2`` in the identity."""
    version2_flag = self._version2
    return version2_flag
# backward compatible aliases
# Deprecated CamelCase aliases. Each one emits a DeprecationWarning and then
# delegates to the snake_case replacement. ``stacklevel=2`` attributes the
# warning to the caller's line instead of this module.
def destroy(self) -> None:
    """Deprecated alias of :meth:`close`."""
    warnings.warn(
        "destroy is deprecated, use close instead",
        DeprecationWarning,
        stacklevel=2,
    )
    self.close()


def GetCapabilityCurrent(self, cap):
    """Deprecated alias of :meth:`get_capability_current`."""
    warnings.warn(
        "GetCapabilityCurrent is deprecated, use get_capability_current instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.get_capability_current(cap)


def GetCapabilityDefault(self, cap):
    """Deprecated alias of :meth:`get_capability_default`."""
    warnings.warn(
        "GetCapabilityDefault is deprecated, use get_capability_default instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.get_capability_default(cap)


def GetSourceName(self):
    """Deprecated alias of the :attr:`name` property."""
    warnings.warn(
        "GetSourceName is deprecated, use name property instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.name


def GetIdentity(self):
    """Deprecated alias of the :attr:`identity` property."""
    warnings.warn(
        "GetIdentity is deprecated, use identity property instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.identity


def GetCapability(self, cap):
    """Deprecated alias of :meth:`get_capability`."""
    warnings.warn(
        "GetCapability is deprecated, use get_capability instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.get_capability(cap)


def SetCapability(self, cap, type_id, value):
    """Deprecated alias of :meth:`set_capability`."""
    warnings.warn(
        "SetCapability is deprecated, use set_capability instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.set_capability(cap, type_id, value)


def ResetCapability(self, cap):
    """Deprecated alias of :meth:`reset_capability`."""
    warnings.warn(
        "ResetCapability is deprecated, use reset_capability instead",
        DeprecationWarning,
        stacklevel=2,
    )
    self.reset_capability(cap)


def SetImageLayout(self, frame, document_number=1, page_number=1, frame_number=1):
    """Deprecated alias of :meth:`set_image_layout`."""
    warnings.warn(
        "SetImageLayout is deprecated, use set_image_layout instead",
        DeprecationWarning,
        stacklevel=2,
    )
    self.set_image_layout(frame, document_number, page_number, frame_number)


def GetImageLayout(self):
    """Deprecated alias of :meth:`get_image_layout`."""
    warnings.warn(
        "GetImageLayout is deprecated, use get_image_layout instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.get_image_layout()


def GetDefaultImageLayout(self):
    """Deprecated alias of :meth:`get_image_layout_default`."""
    # message used to point at a non-existent "get_default_image_layout"
    warnings.warn(
        "GetDefaultImageLayout is deprecated, use get_image_layout_default instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.get_image_layout_default()


def ResetImageLayout(self):
    """Deprecated alias of :meth:`reset_image_layout`."""
    warnings.warn(
        "ResetImageLayout is deprecated, use reset_image_layout instead",
        DeprecationWarning,
        stacklevel=2,
    )
    self.reset_image_layout()


def RequestAcquire(self, show_ui, modal_ui):
    """Deprecated alias of :meth:`request_acquire`."""
    # message used to point at a non-existent "reset_acquire"
    warnings.warn(
        "RequestAcquire is deprecated, use request_acquire instead",
        DeprecationWarning,
        stacklevel=2,
    )
    self.request_acquire(show_ui, modal_ui)


def ModalLoop(self):
    """Deprecated alias of :meth:`modal_loop`."""
    warnings.warn(
        "ModalLoop is deprecated, use modal_loop instead",
        DeprecationWarning,
        stacklevel=2,
    )
    self.modal_loop()


def HideUI(self):
    """Deprecated alias of :meth:`hide_ui`."""
    warnings.warn(
        "HideUI is deprecated, use hide_ui instead",
        DeprecationWarning,
        stacklevel=2,
    )
    self.hide_ui()


def SetXferFileName(self, path, format):
    """Deprecated way of assigning the :attr:`file_xfer_params` property."""
    warnings.warn(
        "SetXferFileName is deprecated, use file_xfer_params instead",
        DeprecationWarning,
        stacklevel=2,
    )
    self.file_xfer_params = (path, format)


def GetXferFileName(self):
    """Deprecated way of reading the :attr:`file_xfer_params` property."""
    warnings.warn(
        "GetXferFileName is deprecated, use file_xfer_params property instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.file_xfer_params


def GetImageInfo(self):
    """Deprecated alias of the :attr:`image_info` property."""
    warnings.warn(
        "GetImageInfo is deprecated, use image_info property instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.image_info


def XferImageNatively(self):
    """Deprecated alias of :meth:`xfer_image_natively`."""
    warnings.warn(
        "XferImageNatively is deprecated, use xfer_image_natively instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.xfer_image_natively()


def XferImageByFile(self):
    """Deprecated alias of :meth:`xfer_image_by_file`."""
    warnings.warn(
        "XferImageByFile is deprecated, use xfer_image_by_file instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.xfer_image_by_file()
def _get_protocol_major_version(
    requested_protocol_major_version: None | typing.Literal[1, 2]
) -> typing.Literal[1, 2]:
    """Resolve the TWAIN protocol major version to use.

    :param requested_protocol_major_version: 1, 2 or None to use the
        platform default
    :return: the requested version when one is given, otherwise 1 on Windows
        and 2 elsewhere
    :raises ValueError: if the requested version is not None, 1 or 2
    """
    if requested_protocol_major_version not in [None, 1, 2]:
        raise ValueError("Invalid protocol version specified")
    # The previous ``1 or requested`` / ``2 or requested`` always evaluated
    # to the literal, so an explicit request was silently ignored.
    if requested_protocol_major_version is not None:
        return requested_protocol_major_version
    if utils.is_windows():
        # On Windows default to major version 1 since version 2 is not supported
        # by almost all scanners
        return 1
    return 2
def _get_dsm(
    dsm_name: str | None, protocol_major_version: typing.Literal[1, 2]
) -> ct.CDLL:
    """
    Loads TWAIN Data Source Manager DLL.

    If dsm_name parameter is set will load that exact DSM DLL, on every platform.
    If it is not set will use heuristic to determine which DSM to use based on the specified protocol version.
    For example on Windows it will use bundled twain_32.dll if version 1 was specified and current process is a 32 bit process,
    otherwise will use twaindsm.dll which does not come with Windows by default and needs to be installed separately.
    On other platforms the TWAIN framework path is used.

    :param dsm_name: Path to DSM DLL or null to allow automatic detection
    :param protocol_major_version: A hint for which TWAIN protocol major version is needed
    :raises exceptions.SMLoadFileFailed: if the automatically selected Windows DSM cannot be loaded
    """
    if sys.platform == "win32":
        if dsm_name:
            return ct.WinDLL(dsm_name)
        is64bit = sys.maxsize > 2**32
        if protocol_major_version == 1 and not is64bit:
            dsm_name = os.path.join(os.environ["WINDIR"], "twain_32.dll")
        else:
            dsm_name = "twaindsm.dll"
        try:
            logger.info("attempting to load dll: %s", dsm_name)
            return ct.WinDLL(dsm_name)
        except OSError as e:
            logger.error("load failed with error %s", e)
            # keep the original OSError attached as the cause
            raise exceptions.SMLoadFileFailed(e) from e
    # An explicit dsm_name used to be ignored here, contradicting the contract above.
    return ct.CDLL(dsm_name or "/System/Library/Frameworks/TWAIN.framework/TWAIN")
[docs]
class SourceManager:
    """Represents a Data Source Manager connection.

    This is the first class that you need to create.
    From an instance of this class you can list existing sources using the :func:`source_list` property.
    You can open a source using the :func:`open_source` method and then use methods on
    the returned object to perform scanning.
    When done, call the :func:`close` method to release resources. An instance can also be used
    in a ``with`` statement, in which case :func:`close` is called when the block exits.
    """
def __init__(
    self,
    parent_window=None,
    MajorNum: int = 1,
    MinorNum: int = 0,
    Language: int = constants.TWLG_USA,
    Country: int = constants.TWCY_USA,
    Info: str = "",
    ProductName: str = "TWAIN Python Interface",
    ProtocolMajor: typing.Literal[1, 2] | None = None,
    ProtocolMinor: int | None = None,
    SupportedGroups: int = constants.DG_IMAGE | constants.DG_CONTROL,
    Manufacturer: str = "pytwain",
    ProductFamily: str = "TWAIN Python Interface",
    dsm_name: str | None = None,
):
    """Constructor for a TWAIN Source Manager Object.

    Loads the Data Source Manager (DSM) library, builds the application
    identity and opens the DSM.

    :param parent_window: can contain Tk, Wx or Gtk window object or the windows
                          handle of the main window. Only used on Windows.
    :keyword dsm_name: Optional path or name of the TWAIN Data Source Manager DLL,
                       if not specified will try to locate one automatically.
    :keyword MajorNum: application major version, default = 1
    :keyword MinorNum: application minor version, default = 0
    :keyword Language: default = TWLG_USA
    :keyword Country: default = TWCY_USA
    :keyword Info: default = ''
    :keyword ProductName: default = 'TWAIN Python Interface'
    :keyword ProtocolMajor: 1, 2 or None; resolved by ``_get_protocol_major_version``
    :keyword ProtocolMinor: default = None, which is sent to the DSM as 0
    :keyword SupportedGroups: default = DG_IMAGE | DG_CONTROL (DF_APP2 is always added)
    :keyword Manufacturer: default = 'pytwain'
    :keyword ProductFamily: default = 'TWAIN Python Interface'
    :raises exceptions.SMGetProcAddressFailed: if the DSM library has no ``DSM_Entry``
    :raises exceptions.SMOpenFailed: if the DAT_ENTRYPOINT query fails
    """
    self._sources: weakref.WeakSet[Source] = weakref.WeakSet()
    self._cb: collections.abc.Callable[[int], None] | None = None
    # Set early so that __del__ is safe even if construction fails below.
    self._state: _DsmStates = "closed"
    self._parent_window = parent_window
    self._hwnd = 0
    if utils.is_windows():
        if hasattr(parent_window, "winfo_id"):
            # tk window
            self._hwnd = parent_window.winfo_id()
        elif hasattr(parent_window, "GetHandle"):
            # wx window
            self._hwnd = parent_window.GetHandle()
        elif hasattr(parent_window, "window") and hasattr(
            parent_window.window, "handle"
        ):
            # gtk window
            self._hwnd = parent_window.window.handle
        elif parent_window is None:
            self._hwnd = 0
        else:
            # anything else is treated as a raw window handle
            self._hwnd = int(parent_window)

    protocol_major = _get_protocol_major_version(ProtocolMajor)
    twain_dll = _get_dsm(dsm_name, protocol_major_version=protocol_major)
    try:
        self._entry = twain_dll["DSM_Entry"]
    except AttributeError as e:
        raise exceptions.SMGetProcAddressFailed(e) from e
    self._entry.restype = ct.c_uint16
    self._entry.argtypes = (
        ct.POINTER(structs.TW_IDENTITY),
        ct.POINTER(structs.TW_IDENTITY),
        ct.c_uint32,
        ct.c_uint16,
        ct.c_uint16,
        ct.c_void_p,
    )

    self._app_id = structs.TW_IDENTITY(
        Version=structs.TW_VERSION(
            MajorNum=MajorNum,
            MinorNum=MinorNum,
            Language=Language,
            Country=Country,
            Info=Info.encode("utf8"),
        ),
        ProtocolMajor=protocol_major,
        # Honour the caller's minor version; it used to be accepted but
        # ignored. ``None`` keeps the previous value of 0.
        ProtocolMinor=0 if ProtocolMinor is None else ProtocolMinor,
        SupportedGroups=SupportedGroups | constants.DF_APP2,
        Manufacturer=Manufacturer.encode("utf8"),
        ProductFamily=ProductFamily.encode("utf8"),
        ProductName=ProductName.encode("utf8"),
    )
    logger.info("Calling DAT_PARENT/MSG_OPENDSM")
    self._call(
        None,
        constants.DG_CONTROL,
        constants.DAT_PARENT,
        constants.MSG_OPENDSM,
        ct.byref(ct.c_void_p(self._hwnd)),
    )
    # The DSM is open from here on. Record it immediately so that a failure
    # in the remaining setup still lets __del__/close() send MSG_CLOSEDSM.
    self._state = "open"

    # NOTE(review): relies on the DSM setting DF_DSM2 in our identity during
    # MSG_OPENDSM when it is a TWAIN 2 DSM - confirm against the TWAIN spec.
    self._version2 = bool(self._app_id.SupportedGroups & constants.DF_DSM2)
    if self._version2:
        # TWAIN 2: memory management functions are supplied by the DSM.
        entrypoint = structs.TW_ENTRYPOINT(Size=ct.sizeof(structs.TW_ENTRYPOINT))
        rv = self._entry(
            self._app_id,
            None,
            constants.DG_CONTROL,
            constants.DAT_ENTRYPOINT,
            constants.MSG_GET,
            ct.byref(entrypoint),
        )
        if rv != constants.TWRC_SUCCESS:
            raise exceptions.SMOpenFailed(
                f"[{dsm_name}], return code {rv} from DG_CONTROL DAT_ENTRYPOINT MSG_GET"
            )
        self._alloc = entrypoint.DSM_MemAllocate
        self._free = entrypoint.DSM_MemFree
        self._lock = entrypoint.DSM_MemLock
        self._unlock = entrypoint.DSM_MemUnlock
        self._encode = lambda s: s.encode("utf8")
        self._decode = lambda s: s.decode("utf8")
    else:
        # TWAIN 1: use the module's own memory helpers and the file system
        # encoding for strings.
        self._alloc = _twain1_alloc
        self._free = _twain1_free
        self._lock = _twain1_lock
        self._unlock = _twain1_unlock
        self._encoding = sys.getfilesystemencoding()
        self._encode = lambda s: s.encode(self._encoding)
        self._decode = lambda s: s.decode(self._encoding)
    logger.info("DSM initialized")
def __del__(self) -> None:
    """Close the DSM on garbage collection if it is still open."""
    # ``_state`` is missing when __init__ never ran (e.g. the object was
    # created through ``__new__`` only); treat that as "closed" instead of
    # raising AttributeError from the finalizer.
    if getattr(self, "_state", "closed") == "open":
        self._close_dsm()
def __enter__(self) -> SourceManager:
    """Enter a ``with`` block; the manager itself is the context value."""
    manager = self
    return manager
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
    """Leave a ``with`` block by calling :func:`close`.

    Returns ``None``, so an exception raised inside the block is not suppressed.
    """
    self.close()
    return None
def _close_dsm(self) -> None:
    """Send DG_CONTROL / DAT_PARENT / MSG_CLOSEDSM to the DSM.

    The parent window handle stored in ``self._hwnd`` is passed by reference.
    Does not change ``self._state``; errors from ``_call`` propagate.
    """
    logger.info("Calling DAT_PARENT/MSG_CLOSEDSM")
    parent_handle = ct.c_void_p(self._hwnd)
    self._call(
        None,
        constants.DG_CONTROL,
        constants.DAT_PARENT,
        constants.MSG_CLOSEDSM,
        ct.byref(parent_handle),
    )
[docs]
def close(self) -> None:
    """Force the SourceManager to shut down.

    Closes every source still tracked by this manager and then, if the DSM
    is open, closes it and marks the manager as closed. Use this for finer
    control than waiting for garbage collection to drop the connections.
    """
    open_sources = self._sources
    while open_sources:
        source = open_sources.pop()
        source.close()
    if self._state != "open":
        return
    self._close_dsm()
    self._state = "closed"
def _call(
    self,
    dest_id,
    dg: int,
    dat: int,
    msg: int,
    buf: typing.Any,
    expected_returns: tuple[int, ...] = (),
) -> int:
    """Invoke the DSM entry point and translate failures into exceptions.

    :param dest_id: destination identity passed to the entry point, or None
    :param dg: data group
    :param dat: data argument type
    :param msg: message
    :param buf: data buffer passed through to the entry point
    :param expected_returns: return codes, besides TWRC_SUCCESS, that are
        returned to the caller instead of being treated as errors
    :return: the return code of the call
    :raises exceptions.TwainError: when the call failed and the follow-up
        status query failed as well
    :raises RuntimeError: for a return code that is neither expected nor
        TWRC_FAILURE
    :raises Exception: on TWRC_FAILURE, the exception mapped to the condition
        code in ``_exc_mapping``, or ``exceptions.UnknownError``
    """
    logger.debug("Calling TWAIN (%s,%d,%d,%d)", dest_id, dg, dat, msg)
    rv = self._entry(self._app_id, dest_id, dg, dat, msg, buf)
    logger.debug("TWAIN returned: %d", rv)
    if rv == constants.TWRC_SUCCESS or rv in expected_returns:
        return rv
    if rv != constants.TWRC_FAILURE:
        raise RuntimeError(f"Unexpected result: {rv}")

    # The call failed: ask for the condition code to pick the exception.
    status = structs.TW_STATUS()
    status_rv = self._entry(
        self._app_id,
        dest_id,
        constants.DG_CONTROL,
        constants.DAT_STATUS,
        constants.MSG_GET,
        ct.byref(status),
    )
    if status_rv != constants.TWRC_SUCCESS:
        logger.warning(
            "Getting additional error information returned non success code: %d",
            status_rv,
        )
        raise exceptions.TwainError()
    code = status.ConditionCode
    fallback = exceptions.UnknownError(f"ConditionCode = {code}")
    raise _exc_mapping.get(code, fallback)
def _user_select(self) -> structs.TW_IDENTITY | None:
    """Show the source selection dialog (DAT_IDENTITY / MSG_USERSELECT).

    :return: identity of the selected source, or None if the user cancelled
    :raises RuntimeError: if ``_call`` hands back a code other than
        TWRC_SUCCESS or TWRC_CANCEL
    """
    logger.info("starting source selection dialog")
    identity = structs.TW_IDENTITY()
    logger.info("Calling DAT_IDENTITY/MSG_USERSELECT")
    accepted = (constants.TWRC_SUCCESS, constants.TWRC_CANCEL)
    rv = self._call(
        None,
        constants.DG_CONTROL,
        constants.DAT_IDENTITY,
        constants.MSG_USERSELECT,
        ct.byref(identity),
        expected_returns=accepted,
    )
    if rv == constants.TWRC_CANCEL:
        logger.info("user cancelled selection")
        return None
    if rv == constants.TWRC_SUCCESS:
        logger.info("user selected source with id %s", identity.Id)
        return identity
    # This is unexpected since _call should only return values from expected_returns list
    raise RuntimeError(f"Got unexpected return value {rv} from _call method")
def _open_ds(self, ds_id: structs.TW_IDENTITY) -> None:
    """Open the data source described by *ds_id* (DAT_IDENTITY / MSG_OPENDS).

    The identity structure is passed to the DSM by reference.
    """
    logger.info("opening data source with id %s", ds_id.Id)
    logger.info("Calling DAT_IDENTITY/MSG_OPENDS")
    identity_ref = ct.byref(ds_id)
    self._call(
        None,
        constants.DG_CONTROL,
        constants.DAT_IDENTITY,
        constants.MSG_OPENDS,
        identity_ref,
    )
def _close_ds(self, ds_id: structs.TW_IDENTITY) -> None:
    """Close the data source described by *ds_id* (DAT_IDENTITY / MSG_CLOSEDS).

    The identity structure is passed to the DSM by reference.
    """
    logger.info("closing data source with id %s", ds_id.Id)
    logger.info("Calling DAT_IDENTITY/MSG_CLOSEDS")
    identity_ref = ct.byref(ds_id)
    self._call(
        None,
        constants.DG_CONTROL,
        constants.DAT_IDENTITY,
        constants.MSG_CLOSEDS,
        identity_ref,
    )
[docs]
def open_source(self, product_name: str | None = None) -> Source | None:
    """Open a TWAIN Source.

    When `product_name` is given, the source with that product name is opened.
    Otherwise the user is shown a source selection dialog and the source
    picked there is opened.

    Returns a :class:`Source` object for talking to the source, or `None` if the
    user cancelled the source selection dialog.

    :keyword product_name: source to be opened; if empty or `None` the user is
                           prompted to select a source
    """
    if not product_name:
        chosen = self._user_select()
        if not chosen:
            return None
        ds_id = chosen
    else:
        encoded_name = self._encode(product_name)
        ds_id = structs.TW_IDENTITY(ProductName=encoded_name)
    self._open_ds(ds_id)
    opened = Source(self, ds_id)
    # Track the source so that close() can shut it down later.
    self._sources.add(opened)
    return opened
@property
def identity(self) -> dict:
    """Identity of our application as a dictionary.

    Combines the fields of the application identity structure with the
    fields of its nested ``Version`` structure.
    """
    info = _struct2dict(self._app_id, self._decode)
    version_info = _struct2dict(self._app_id.Version, self._decode)
    info.update(version_info)
    return info
@property
def source_list(self) -> list[str]:
    """Names of the available sources.

    Walks the DSM's source list with MSG_GETFIRST / MSG_GETNEXT until
    TWRC_ENDOFLIST is returned and collects each decoded product name.
    An empty list is returned when the DSM reports no data sources.
    """
    found: list[str] = []
    ds_id = structs.TW_IDENTITY()
    try:
        logger.info("Calling DAT_IDENTITY/MSG_GETFIRST")
        rv = self._call(
            None,
            constants.DG_CONTROL,
            constants.DAT_IDENTITY,
            constants.MSG_GETFIRST,
            ct.byref(ds_id),
            (constants.TWRC_SUCCESS, constants.TWRC_ENDOFLIST),
        )
    except exceptions.NoDataSourceError:
        # there are no data sources
        return found
    while True:
        if rv == constants.TWRC_ENDOFLIST:
            break
        found.append(self._decode(ds_id.ProductName))
        logger.info("Calling DAT_IDENTITY/MSG_GETNEXT")
        rv = self._call(
            None,
            constants.DG_CONTROL,
            constants.DAT_IDENTITY,
            constants.MSG_GETNEXT,
            ct.byref(ds_id),
            (constants.TWRC_SUCCESS, constants.TWRC_ENDOFLIST),
        )
    return found
[docs]
def set_callback(self, cb: collections.abc.Callable[[int], None] | None):
    """Register the python function used for notifications such as
    "transfer is ready".

    The callable (or None) is stored on the manager, replacing any callback
    set earlier.
    """
    callback = cb
    self._cb = callback
def is_twain2(self) -> bool:
    """Return the stored ``_version2`` flag of this manager."""
    twain2 = self._version2
    return twain2
# backward compatible aliases
def destroy(self) -> None:
    """Deprecated alias of :func:`close`.

    Emits a :class:`DeprecationWarning` and then closes the manager.
    """
    # stacklevel=2 attributes the warning to the caller of the deprecated
    # name, so default warning filters can actually show it.
    warnings.warn(
        "destroy is deprecated, use close instead",
        DeprecationWarning,
        stacklevel=2,
    )
    self.close()
def SetCallback(self, cb):
    """Deprecated alias of :func:`set_callback`.

    Emits a :class:`DeprecationWarning` and forwards *cb* to ``set_callback``.
    """
    # stacklevel=2 attributes the warning to the caller of the deprecated
    # name, so default warning filters can actually show it.
    warnings.warn(
        "SetCallback is deprecated, use set_callback instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.set_callback(cb)
def OpenSource(self, product_name=None):
    """Deprecated alias of :func:`open_source`.

    Emits a :class:`DeprecationWarning` and returns the result of
    ``open_source(product_name)``.
    """
    # stacklevel=2 attributes the warning to the caller of the deprecated
    # name, so default warning filters can actually show it.
    warnings.warn(
        "OpenSource is deprecated, use open_source instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.open_source(product_name)
def GetIdentity(self):
    """Deprecated alias of the :func:`identity` property.

    Emits a :class:`DeprecationWarning` and returns ``self.identity``.
    """
    warnings.warn(
        "GetIdentity is deprecated, use identity property instead",
        DeprecationWarning,
        # Attribute the warning to the caller of the deprecated name, so
        # default warning filters can actually show it.
        stacklevel=2,
    )
    return self.identity
def GetSourceList(self):
    """Deprecated alias of the :func:`source_list` property.

    Emits a :class:`DeprecationWarning` and returns ``self.source_list``.
    """
    warnings.warn(
        "GetSourceList is deprecated, use source_list property instead",
        DeprecationWarning,
        # Attribute the warning to the caller of the deprecated name, so
        # default warning filters can actually show it.
        stacklevel=2,
    )
    return self.source_list
[docs]
def version() -> str:
    """Return the version string of this module."""
    # The value is maintained by hand and has to be bumped before each
    # release; ideally the build process would generate it.
    release = "2.3.0"
    return release
[docs]
def acquire(
    path: str,
    ds_name: str | None = None,
    dpi: float | None = None,
    pixel_type: typing.Literal["bw", "gray", "color"] | None = None,
    bpp: int | None = None,
    frame: tuple[float, float, float, float] | None = None,
    parent_window=None,
    show_ui: bool = False,
    dsm_name: str | None = None,
    modal: bool = False,
) -> dict | None:
    """Acquires single image into file

    :param path: Path where to save image
    :keyword ds_name: name of lowlevel data source, if not provided user will be presented with selection dialog
    :keyword dpi: resolution in dots per inch
    :keyword pixel_type: can be 'bw', 'gray', 'color'
    :keyword bpp: bits per pixel
    :keyword frame: tuple (left, top, right, bottom) scan area in inches
    :keyword parent_window: can be Tk, Wx, Gtk window object or Win32 window handle;
        if not given a temporary Tk root window is created and destroyed again
    :keyword show_ui: if True source's UI dialog will be presented to user
    :keyword dsm_name: optional name or path for TWAIN Data Source Manager DLL
    :keyword modal: whether to use modal dialog for scanner UI

    Returns a dictionary describing image, or None if scanning was cancelled by user
    or no image was transferred
    :raises KeyError: if `pixel_type` is not one of the supported names
    """
    twain_pixel_type = None
    if pixel_type:
        pixel_type_map = {
            "bw": constants.TWPT_BW,
            "gray": constants.TWPT_GRAY,
            "color": constants.TWPT_RGB,
        }
        twain_pixel_type = pixel_type_map[pixel_type]

    # Remember a window we create ourselves so it can be destroyed afterwards
    # instead of leaking a Tk root on every call.
    created_root = None
    if not parent_window:
        from tkinter import Tk

        created_root = parent_window = Tk()

    res: list[dict] = []

    def before(img_info: dict) -> str:
        # Record the image description and tell the source where to save.
        res.append(img_info)
        return path

    def after(more: int) -> None:
        # Only a single image is wanted; cancel any further pending images.
        if more:
            raise exceptions.CancelAll

    try:
        sm = SourceManager(parent_window, dsm_name=dsm_name)
        try:
            sd = sm.open_source(ds_name)
            if not sd:
                return None
            try:
                # Compare against None: TWPT_BW is 0 in the TWAIN
                # specification, so a truthiness test would skip "bw".
                if twain_pixel_type is not None:
                    sd.set_capability(
                        constants.ICAP_PIXELTYPE,
                        constants.TWTY_UINT16,
                        twain_pixel_type,
                    )
                sd.set_capability(
                    constants.ICAP_UNITS, constants.TWTY_UINT16, constants.TWUN_INCHES
                )
                if bpp:
                    sd.set_capability(
                        constants.ICAP_BITDEPTH, constants.TWTY_UINT16, bpp
                    )
                if dpi:
                    sd.set_capability(
                        constants.ICAP_XRESOLUTION, constants.TWTY_FIX32, dpi
                    )
                    sd.set_capability(
                        constants.ICAP_YRESOLUTION, constants.TWTY_FIX32, dpi
                    )
                if frame:
                    try:
                        sd.set_image_layout(frame)
                    except exceptions.CheckStatus:
                        # Deliberately best-effort, as before.
                        # NOTE(review): presumably the source adjusted the
                        # requested frame - confirm.
                        pass
                try:
                    sd.acquire_file(
                        before=before, after=after, show_ui=show_ui, modal=modal
                    )
                except exceptions.DSTransferCancelled:
                    return None
            finally:
                sd.close()
        finally:
            sm.close()
    finally:
        if created_root is not None:
            created_root.destroy()
    # Nothing may have been transferred; report that as None rather than
    # failing with IndexError.
    return res[0] if res else None
# import all constants for backward compatibility
from .lowlevel.constants import * # noqa
from .windows import * # noqa