2 Class Async Subscription
12 from datalayer.clib_client
import (C_DLR_CLIENT_NOTIFY_RESPONSE,
13 C_DLR_CLIENT_RESPONSE, C_NotifyItem)
14 from datalayer.clib_variant
import C_DLR_VARIANT
22 __slots__ = [
'__ptr_notify',
'__ptr_resp',
23 '__closed',
'__client',
'__id',
'__on_cb']
27 @param[in] client Reference to the client
38 use the python context manager
42 def __exit__(self, exc_type, exc_val, exc_tb):
44 use the python context manager
60 closes the client instance
74 def __create_sub_notify_callback(self, cb: datalayer.subscription.ResponseNotifyCallback):
81 def _cb(status: C_DLR_RESULT, items: ctypes.POINTER(C_NotifyItem), count: ctypes.c_uint32, userdata: ctypes.c_void_p):
83 datalayer calls this function
88 for x
in range(0, count):
90 items[x].data, items[x].info)
91 notify_items.append(n)
92 cb(r, notify_items, userdata)
97 cb_ptr.set_ptr(C_DLR_CLIENT_NOTIFY_RESPONSE(_cb))
98 return cb_ptr.get_ptr()
100 def _test_notify_callback(self, cb: datalayer.subscription.ResponseNotifyCallback):
106 def __create_response_callback(self, cb: datalayer.client.ResponseCallback):
114 def _cb(status: C_DLR_RESULT, data: C_DLR_VARIANT, userdata: ctypes.c_void_p):
116 datalayer calls this function
124 cb(r,
None, userdata)
127 cb_ptr.set_ptr(C_DLR_CLIENT_RESPONSE(_cb))
128 return cb_ptr.get_ptr()
130 def _test_response_callback(self, cb: datalayer.client.ResponseCallback):
136 def _create(self, prop: Variant, cnb: datalayer.subscription.ResponseNotifyCallback, cb: datalayer.client.ResponseCallback, userdata: userData_c_void_p =
None) -> Result:
139 @param[in] ruleset Variant that describe ruleset of subscription as subscription.fbs
140 @param[in] publishCallback Callback to call when new data is available
141 @param[in] callback Callback to be called when subscription is created
142 @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request and subscription
143 @param[in] token Security access &token for authentication as JWT payload
144 @result <Result> status of function call
146 r =
Result(datalayer.clib.libcomm_datalayer.DLR_clientCreateSubscriptionAsync(
157 def subscribe(self, address: str, cb: datalayer.client.ResponseCallback, userdata: userData_c_void_p =
None) -> Result:
159 Setup a subscription to a node
160 @param[in] address Address of the node to add a subscription to
161 @param[in] callback Callback to called when data is subscribed
162 @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
163 @result <Result> status of function call
165 b_id = self.
id().encode(
'utf-8')
166 b_address = address.encode(
'utf-8')
167 return Result(datalayer.clib.libcomm_datalayer.DLR_clientSubscribeAsync(
174 def unsubscribe(self, address: str, cb: datalayer.client.ResponseCallback, userdata: userData_c_void_p =
None) -> Result:
176 Removes a node from a subscription id
177 @param[in] address Address of a node, that should be removed to the given subscription.
178 @param[in] callback Callback to called when data is subscribed
179 @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
180 @result <Result> status of function call
182 b_id = self.
id().encode(
'utf-8')
183 b_address = address.encode(
'utf-8')
184 return Result(datalayer.clib.libcomm_datalayer.DLR_clientUnsubscribeAsync(
191 def subscribe_multi(self, address: typing.List[str], cb: datalayer.client.ResponseCallback, userdata: userData_c_void_p =
None) -> Result:
193 Setup a subscription to multiple nodes
194 @param[in] address Set of addresses of nodes, that should be removed to the given subscription.
195 @param[in] count Count of addresses.
196 @param[in] callback Callback to called when data is subscribed
197 @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
198 @result <Result> status of function call
200 b_id = self.
id().encode(
'utf-8')
201 b_address = (ctypes.c_char_p * len(address))(*
202 [d.encode(
'utf-8')
for d
in address])
203 return Result(datalayer.clib.libcomm_datalayer.DLR_clientSubscribeMultiAsync(
211 def unsubscribe_multi(self, address: typing.List[str], cb: datalayer.client.ResponseCallback, userdata: userData_c_void_p =
None) -> Result:
213 Removes a set of nodes from a subscription id
214 @param[in] address Address of a node, that should be removed to the given subscription.
215 @param[in] callback Callback to called when data is subscribed
216 @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
217 @result <Result> status of function call
219 b_id = self.
id().encode(
'utf-8')
220 b_address = (ctypes.c_char_p * len(address))(*
221 [d.encode(
'utf-8')
for d
in address])
222 return Result(datalayer.clib.libcomm_datalayer.DLR_clientUnsubscribeMultiAsync(
230 def unsubscribe_all(self, cb: datalayer.client.ResponseCallback, userdata: userData_c_void_p =
None) -> Result:
232 Removes all subscriptions from a subscription id
233 @param[in] callback Callback to called when data is subscribed
234 @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
235 @result <Result> status of function call
239 self.
__client()._unregister_sync(self)
240 b_id = self.
id().encode(
'utf-8')
241 return Result(datalayer.clib.libcomm_datalayer.DLR_clientUnsubscribeAllAsync(
248 """ wait_on_response_cb """
252 while not self.
__on_cb and n < wait: