ctrlX Data Layer API for Python  1.6.0
The ctrlX Data Layer API allows access to the ctrlX Data Layer with Python
subscription_async.py
1 """
2 Class Async Subscription
3 """
4 import ctypes
5 import time
6 import typing
7 import weakref
8 
9 import datalayer
11 from datalayer.clib import C_DLR_RESULT, userData_c_void_p
12 from datalayer.clib_client import (C_DLR_CLIENT_NOTIFY_RESPONSE,
13  C_DLR_CLIENT_RESPONSE, C_NotifyItem)
14 from datalayer.clib_variant import C_DLR_VARIANT
15 from datalayer.variant import Result, Variant, VariantRef
16 
17 
19  """
20  SubscriptionAsync
21  """
22  __slots__ = ['__ptr_notify', '__ptr_resp',
23  '__closed', '__client', '__id', '__on_cb']
24 
25  def __init__(self, client: datalayer.client.Client):
26  """
27  @param[in] client Reference to the client
28  """
31  self.__closed = False
32  self.__client = weakref.ref(client)
33  self.__id = ""
34  self.__on_cb = False
35 
36  def __enter__(self):
37  """
38  use the python context manager
39  """
40  return self
41 
42  def __exit__(self, exc_type, exc_val, exc_tb):
43  """
44  use the python context manager
45  """
46  self.close()
47 
48  def on_close(self):
49  """ on_close """
50  self.close()
51 
52  def id(self) -> str:
53  """
54  Subscription ID
55  """
56  return self.__id
57 
58  def close(self):
59  """
60  closes the client instance
61  """
62  if self.__closed:
63  return
64  self.__closed = True
65  self.__ptr_notify = None
66  self.__ptr_resp = None
67  self.__client = None
68 
69  # def __unsubscribe_all(self):
70  # def __cb(result: Result, data: typing.Optional[Variant], userdata: datalayer.clib.userData_c_void_p):
71  # pass
72  # self.unsubscribe_all(__cb)
73 
74  def __create_sub_notify_callback(self, cb: datalayer.subscription.ResponseNotifyCallback):
75  """
76  callback management
77  """
79  self.__ptr_notify = cb_ptr
80 
81  def _cb(status: C_DLR_RESULT, items: ctypes.POINTER(C_NotifyItem), count: ctypes.c_uint32, userdata: ctypes.c_void_p):
82  """
83  datalayer calls this function
84  """
85  r = Result(status)
86  if r == Result.OK:
87  notify_items = []
88  for x in range(0, count):
90  items[x].data, items[x].info)
91  notify_items.append(n)
92  cb(r, notify_items, userdata)
93  del notify_items
94  return
95  cb(r, None, userdata)
96 
97  cb_ptr.set_ptr(C_DLR_CLIENT_NOTIFY_RESPONSE(_cb))
98  return cb_ptr.get_ptr()
99 
100  def _test_notify_callback(self, cb: datalayer.subscription.ResponseNotifyCallback):
101  """
102  internal use
103  """
104  return self.__create_sub_notify_callback(cb)
105 
106  def __create_response_callback(self, cb: datalayer.client.ResponseCallback):
107  """
108  callback management
109  """
110  self.__on_cb = False
112  self.__ptr_resp = cb_ptr
113 
114  def _cb(status: C_DLR_RESULT, data: C_DLR_VARIANT, userdata: ctypes.c_void_p):
115  """
116  datalayer calls this function
117  """
118  r = Result(status)
119  if r == Result.OK:
120  v = VariantRef(data)
121  cb(r, v, userdata)
122  self.__on_cb = True
123  return
124  cb(r, None, userdata)
125  self.__on_cb = True
126 
127  cb_ptr.set_ptr(C_DLR_CLIENT_RESPONSE(_cb))
128  return cb_ptr.get_ptr()
129 
130  def _test_response_callback(self, cb: datalayer.client.ResponseCallback):
131  """
132  internal use
133  """
134  return self.__create_response_callback(cb)
135 
136  def _create(self, prop: Variant, cnb: datalayer.subscription.ResponseNotifyCallback, cb: datalayer.client.ResponseCallback, userdata: userData_c_void_p = None) -> Result:
137  """
138  Setup a subscription
139  @param[in] ruleset Variant that describe ruleset of subscription as subscription.fbs
140  @param[in] publishCallback Callback to call when new data is available
141  @param[in] callback Callback to be called when subscription is created
142  @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request and subscription
143  @param[in] token Security access &token for authentication as JWT payload
144  @result <Result> status of function call
145  """
146  r = Result(datalayer.clib.libcomm_datalayer.DLR_clientCreateSubscriptionAsync(
147  self.__client().get_handle(),
148  prop.get_handle(),
151  userdata,
152  self.__client().get_token()))
153  if r == Result.OK:
155  return r
156 
157  def subscribe(self, address: str, cb: datalayer.client.ResponseCallback, userdata: userData_c_void_p = None) -> Result:
158  """
159  Setup a subscription to a node
160  @param[in] address Address of the node to add a subscription to
161  @param[in] callback Callback to called when data is subscribed
162  @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
163  @result <Result> status of function call
164  """
165  b_id = self.id().encode('utf-8')
166  b_address = address.encode('utf-8')
167  return Result(datalayer.clib.libcomm_datalayer.DLR_clientSubscribeAsync(
168  self.__client().get_handle(),
169  b_id,
170  b_address,
172  userdata))
173 
174  def unsubscribe(self, address: str, cb: datalayer.client.ResponseCallback, userdata: userData_c_void_p = None) -> Result:
175  """
176  Removes a node from a subscription id
177  @param[in] address Address of a node, that should be removed to the given subscription.
178  @param[in] callback Callback to called when data is subscribed
179  @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
180  @result <Result> status of function call
181  """
182  b_id = self.id().encode('utf-8')
183  b_address = address.encode('utf-8')
184  return Result(datalayer.clib.libcomm_datalayer.DLR_clientUnsubscribeAsync(
185  self.__client().get_handle(),
186  b_id,
187  b_address,
189  userdata))
190 
191  def subscribe_multi(self, address: typing.List[str], cb: datalayer.client.ResponseCallback, userdata: userData_c_void_p = None) -> Result:
192  """
193  Setup a subscription to multiple nodes
194  @param[in] address Set of addresses of nodes, that should be removed to the given subscription.
195  @param[in] count Count of addresses.
196  @param[in] callback Callback to called when data is subscribed
197  @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
198  @result <Result> status of function call
199  """
200  b_id = self.id().encode('utf-8')
201  b_address = (ctypes.c_char_p * len(address))(*
202  [d.encode('utf-8') for d in address])
203  return Result(datalayer.clib.libcomm_datalayer.DLR_clientSubscribeMultiAsync(
204  self.__client().get_handle(),
205  b_id,
206  b_address,
207  len(address),
209  userdata))
210 
211  def unsubscribe_multi(self, address: typing.List[str], cb: datalayer.client.ResponseCallback, userdata: userData_c_void_p = None) -> Result:
212  """
213  Removes a set of nodes from a subscription id
214  @param[in] address Address of a node, that should be removed to the given subscription.
215  @param[in] callback Callback to called when data is subscribed
216  @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
217  @result <Result> status of function call
218  """
219  b_id = self.id().encode('utf-8')
220  b_address = (ctypes.c_char_p * len(address))(*
221  [d.encode('utf-8') for d in address])
222  return Result(datalayer.clib.libcomm_datalayer.DLR_clientUnsubscribeMultiAsync(
223  self.__client().get_handle(),
224  b_id,
225  b_address,
226  len(address),
228  userdata))
229 
230  def unsubscribe_all(self, cb: datalayer.client.ResponseCallback, userdata: userData_c_void_p = None) -> Result:
231  """
232  Removes all subscriptions from a subscription id
233  @param[in] callback Callback to called when data is subscribed
234  @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
235  @result <Result> status of function call
236  """
237  if self.__client is None:
238  return
239  self.__client()._unregister_sync(self)
240  b_id = self.id().encode('utf-8')
241  return Result(datalayer.clib.libcomm_datalayer.DLR_clientUnsubscribeAllAsync(
242  self.__client().get_handle(),
243  b_id,
245  userdata))
246 
247  def wait_on_response_cb(self, wait: int = 5) -> bool:
248  """ wait_on_response_cb """
249  if wait <= 0:
250  wait = 5
251  n = 0
252  while not self.__on_cb and n < wait:
253  n = n + 1
254  time.sleep(1)
255  return self.__on_cb
datalayer.subscription_async.SubscriptionAsync.unsubscribe_all
Result unsubscribe_all(self, datalayer.client.ResponseCallback cb, userData_c_void_p userdata=None)
Removes all subscriptions from a subscription id.
Definition: subscription_async.py:250
datalayer.subscription_async.SubscriptionAsync.__on_cb
__on_cb
datalayer calls this function
Definition: subscription_async.py:34
datalayer.subscription_async.SubscriptionAsync.__client
__client
Definition: subscription_async.py:32
datalayer.subscription_async.SubscriptionAsync.__exit__
def __exit__(self, exc_type, exc_val, exc_tb)
use the python context manager
Definition: subscription_async.py:45
datalayer.subscription.Subscription
Subscription.
Definition: subscription.py:119
datalayer.variant.Result
Definition: variant.py:19
datalayer.subscription_async.SubscriptionAsync.unsubscribe_multi
Result unsubscribe_multi(self, typing.List[str] address, datalayer.client.ResponseCallback cb, userData_c_void_p userdata=None)
Removes a set of nodes from a subscription id.
Definition: subscription_async.py:232
datalayer.client._CallbackPtr
Callback wrapper.
Definition: client.py:25
datalayer.subscription_async.SubscriptionAsync.__create_sub_notify_callback
def __create_sub_notify_callback(self, datalayer.subscription.ResponseNotifyCallback cb)
callback management
Definition: subscription_async.py:79
datalayer.variant
Definition: variant.py:1
datalayer.subscription_async.SubscriptionAsync.subscribe
Result subscribe(self, str address, datalayer.client.ResponseCallback cb, userData_c_void_p userdata=None)
Setup a subscription to a node.
Definition: subscription_async.py:178
datalayer.subscription.Subscription.id
str id(self)
Subscription ID.
Definition: subscription.py:124
datalayer.subscription_async.SubscriptionAsync.subscribe_multi
Result subscribe_multi(self, typing.List[str] address, datalayer.client.ResponseCallback cb, userData_c_void_p userdata=None)
Setup a subscription to multiple nodes.
Definition: subscription_async.py:213
datalayer.subscription_async.SubscriptionAsync.__closed
__closed
Definition: subscription_async.py:31
datalayer.clib
Definition: clib.py:1
datalayer.subscription_async.SubscriptionAsync.__id
__id
Definition: subscription_async.py:33
datalayer.subscription.NotifyItem
NotifyItem.
Definition: subscription.py:30
datalayer.subscription
Definition: subscription.py:1
datalayer.subscription_async.SubscriptionAsync.__ptr_notify
__ptr_notify
Definition: subscription_async.py:65
datalayer.subscription_async.SubscriptionAsync
SubscriptionAsync.
Definition: subscription_async.py:21
datalayer.subscription_async.SubscriptionAsync.unsubscribe
Result unsubscribe(self, str address, datalayer.client.ResponseCallback cb, userData_c_void_p userdata=None)
Removes a node from a subscription id.
Definition: subscription_async.py:195
datalayer.subscription_async.SubscriptionAsync.__init__
def __init__(self, datalayer.client.Client client)
Definition: subscription_async.py:28
datalayer.subscription_async.SubscriptionAsync.on_close
def on_close(self)
on_close
Definition: subscription_async.py:49
datalayer.subscription_async.SubscriptionAsync.__ptr_resp
__ptr_resp
Definition: subscription_async.py:66
datalayer.subscription_async.SubscriptionAsync.close
def close(self)
closes the client instance
Definition: subscription_async.py:61
datalayer.subscription_async.SubscriptionAsync.__enter__
def __enter__(self)
use the python context manager
Definition: subscription_async.py:39
datalayer.subscription_async.SubscriptionAsync.__create_response_callback
def __create_response_callback(self, datalayer.client.ResponseCallback cb)
callback management
Definition: subscription_async.py:117
datalayer.subscription_async.SubscriptionAsync.id
str id(self)
Subscription ID.
Definition: subscription_async.py:55
datalayer.subscription.get_id
str get_id(Variant prop)
get_id
Definition: subscription.py:162
datalayer.variant.VariantRef
Definition: variant.py:732
datalayer.client.Client
Client interface for accessing data from the system.
Definition: client.py:61
datalayer.subscription_async.SubscriptionAsync.wait_on_response_cb
bool wait_on_response_cb(self, int wait=5)
wait_on_response_cb
Definition: subscription_async.py:262