9 from datalayer.clib_provider_node
import (C_DLR_PROVIDER_NODE_CALLBACK,
10 C_DLR_PROVIDER_NODE_CALLBACKDATA,
11 C_DLR_PROVIDER_NODE_CALLBACKS,
12 C_DLR_PROVIDER_NODE_FUNCTION,
13 C_DLR_PROVIDER_NODE_FUNCTION_DATA,
14 C_DLR_VARIANT, address_c_char_p,
18 NodeCallback = typing.Callable[[Result, typing.Optional[Variant]],
None]
19 NodeFunction = typing.Callable[[userData_c_void_p, str, NodeCallback],
None]
20 NodeFunctionData = typing.Callable[[
21 userData_c_void_p, str, Variant, NodeCallback],
None]
35 self.
_ptr: typing.Optional[ctypes._CFuncPtr] =
None
52 Provider Node callbacks interface
54 __slots__ = [
'on_create',
'on_remove',
'on_browse',
55 'on_read',
'on_write',
'on_metadata']
58 on_create: NodeFunctionData,
59 on_remove: NodeFunction,
60 on_browse: NodeFunction,
61 on_read: NodeFunctionData,
62 on_write: NodeFunctionData,
63 on_metadata: NodeFunction):
65 init ProviderNodeCallbacks
77 Provider node interface for providing data to the system
79 Hint: see python context manager for instance handling
81 __slots__ = [
'__ptrs',
'__c_cbs',
'__closed',
'__provider_node']
83 def __init__(self, cbs: ProviderNodeCallbacks, userdata: userData_c_void_p =
None):
87 self.__ptrs: typing.List[_CallbackPtr] = []
88 self.
__c_cbs = C_DLR_PROVIDER_NODE_CALLBACKS(
98 self.
__provider_node = datalayer.clib.libcomm_datalayer.DLR_providerNodeCreate(
103 use the python context manager
107 def __exit__(self, exc_type, exc_val, exc_tb):
109 use the python context manager
121 closes the node instance
126 datalayer.clib.libcomm_datalayer.DLR_providerNodeDelete(
133 handle value of ProviderNode
137 def __create_callback(self,
138 c_cb: C_DLR_PROVIDER_NODE_CALLBACK,
139 c_cbdata: C_DLR_PROVIDER_NODE_CALLBACKDATA) -> NodeCallback:
143 def cb(result: Result, data: typing.Optional[Variant]):
146 c_cb(c_cbdata, result.value,
None)
148 c_cb(c_cbdata, result.value, data.get_handle())
151 def __create_function(self, func: NodeFunction):
153 create callback managment
156 self.__ptrs.append(cb_ptr)
158 def _func(c_userdata: userData_c_void_p,
159 c_address: address_c_char_p,
160 c_cb: C_DLR_PROVIDER_NODE_CALLBACK,
161 c_cbdata: C_DLR_PROVIDER_NODE_CALLBACKDATA) -> C_DLR_RESULT:
163 datalayer calls this function
165 address = c_address.decode(
'utf-8')
167 func(c_userdata, address, cb)
168 return Result.OK.value
169 cb_ptr.set_ptr(C_DLR_PROVIDER_NODE_FUNCTION(_func))
170 return cb_ptr.get_ptr()
172 def __create_function_data(self, func: NodeFunctionData):
174 create callback managment
177 self.__ptrs.append(cb_ptr)
179 def _func(c_userdata: userData_c_void_p,
180 c_address: address_c_char_p,
181 c_data: C_DLR_VARIANT,
182 c_cb: C_DLR_PROVIDER_NODE_CALLBACK,
183 c_cbdata: C_DLR_PROVIDER_NODE_CALLBACKDATA) -> C_DLR_RESULT:
185 datalayer calls this function
187 address = c_address.decode(
'utf-8')
190 func(c_userdata, address, data, cb)
191 return Result.OK.value
192 cb_ptr.set_ptr(C_DLR_PROVIDER_NODE_FUNCTION_DATA(_func))
193 return cb_ptr.get_ptr()
195 def _test_function(self, func: NodeFunction):
201 def _test_function_data(self, func: NodeFunctionData):