10 import comm.datalayer.NotifyInfo
11 import comm.datalayer.SubscriptionProperties
13 from datalayer.clib_variant
import C_DLR_VARIANT
17 class NotifyType(enum.Enum):
32 __slots__ = [
'__data',
'__info']
34 def __init__(self, data: C_DLR_VARIANT, info: C_DLR_VARIANT):
36 @param[in] data of the notify item
37 @param[in] info containing notify_info.fbs
41 if i.get_type() != VariantType.FLATBUFFERS:
43 b = i.get_flatbuffers()
44 self.
__info = comm.datalayer.NotifyInfo.NotifyInfo.GetRootAsNotifyInfo(
49 data of the notify item
61 return self.
__info.Node().decode(
"utf-8")
73 uint64; // Filetime: Contains a 64-bit value representing the number of 100-nanosecond intervals since January 1, 1601 (UTC).
77 return self.
__info.Timestamp()
# FILETIME value of 1970-01-01 00:00:00 (UTC), i.e. the Unix epoch.
EPOCH_AS_FILETIME = 116444736000000000
# Number of 100-nanosecond FILETIME ticks in one second.
HUNDREDS_OF_NANOSECONDS = 10000000


def to_datetime(filetime: int) -> datetime.datetime:
    """Converts a Microsoft filetime number to a Python datetime.

    The new datetime object is time zone-naive but is equivalent to
    tzinfo=utc. The conversion is done with integer arithmetic, so it does
    not lose precision for large tick counts and it also works for values
    before 1970 on every platform.

    @param[in] filetime Number of 100-nanosecond intervals since
                        January 1, 1601 (UTC)
    @returns <datetime.datetime> naive datetime in UTC, rounded to the
             nearest microsecond (ties are rounded to the even microsecond)
    @raises OverflowError if the value is outside the range of datetime

    >>> to_datetime(116444736000000000)
    datetime.datetime(1970, 1, 1, 0, 0)
    >>> to_datetime(128930364000000000)
    datetime.datetime(2009, 7, 25, 23, 0)
    """
    # datetime only resolves microseconds, a filetime tick is 100 ns.
    ticks_per_microsecond = HUNDREDS_OF_NANOSECONDS // 1000000
    microseconds, remainder = divmod(
        filetime - EPOCH_AS_FILETIME, ticks_per_microsecond)
    # Round the sub-microsecond remainder to the nearest microsecond;
    # an exact half goes to the even neighbour.
    doubled = 2 * remainder
    if doubled > ticks_per_microsecond or (
            doubled == ticks_per_microsecond and microseconds % 2):
        microseconds += 1
    # Adding a timedelta to the epoch avoids the platform timestamp
    # functions, which may reject negative (pre-1970) values.
    return datetime.datetime(1970, 1, 1) + datetime.timedelta(
        microseconds=microseconds)
# Type of the subscription notify callback: it receives the notify status,
# the list of notify items and the user data, and returns nothing.
ResponseNotifyCallback = typing.Callable[
    [Result, typing.List[NotifyItem], datalayer.clib.userData_c_void_p],
    None,
]
101 ResponseNotifyCallback
102 This callback delivers a vector with the updated nodes of a subscription.
103 It is usually called in the interval given by the publishInterval which has been set by the creation of the subscription.
104 The callback may not contain all nodes of the subscription, i.e. when a node did not change.
105 The callback may contain a node multiple times, i.e. when the node did change multiple times since the last callback.
106 The sorting order of the items in the vector is undefined.
107 @param[in] status Notify status
108 @param[in] items Notify data
109 @param[in] userdata Same userdata as in create subscription given
110 If Result != OK, the list of NotifyItem is None
127 notification of the close
132 def create_properties(ident: str, publish_interval: int = 1000,
133 keepalive_interval: int = 60000, error_interval: int = 10000) -> Variant:
136 @returns <Variant> Variant that describe ruleset of subscription
138 builder = flatbuffers.Builder(1024)
140 idl = builder.CreateString(ident)
141 comm.datalayer.SubscriptionProperties.SubscriptionPropertiesStart(
143 comm.datalayer.SubscriptionProperties.SubscriptionPropertiesAddId(
145 comm.datalayer.SubscriptionProperties.SubscriptionPropertiesAddPublishInterval(
146 builder, publish_interval)
147 comm.datalayer.SubscriptionProperties.SubscriptionPropertiesAddKeepaliveInterval(
148 builder, keepalive_interval)
149 comm.datalayer.SubscriptionProperties.SubscriptionPropertiesAddErrorInterval(
150 builder, error_interval)
151 prop = comm.datalayer.SubscriptionProperties.SubscriptionPropertiesEnd(
155 v.set_flatbuffers(builder.Output())
159 def get_id(prop: Variant) -> str:
161 if prop.get_type() != VariantType.FLATBUFFERS:
163 b = prop.get_flatbuffers()
164 p = comm.datalayer.SubscriptionProperties.SubscriptionProperties.GetRootAsSubscriptionProperties(
166 return p.Id().decode(
"utf-8")