1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """
21 The proton.endpoints module
22 """
23
24 from __future__ import absolute_import
25
26 import weakref
27
28 from cproton import PN_CONFIGURATION, PN_COORDINATOR, PN_DELIVERIES, PN_DIST_MODE_COPY, PN_DIST_MODE_MOVE, \
29 PN_DIST_MODE_UNSPECIFIED, PN_EOS, PN_EXPIRE_NEVER, PN_EXPIRE_WITH_CONNECTION, PN_EXPIRE_WITH_LINK, \
30 PN_EXPIRE_WITH_SESSION, PN_LOCAL_ACTIVE, PN_LOCAL_CLOSED, PN_LOCAL_UNINIT, PN_NONDURABLE, PN_RCV_FIRST, \
31 PN_RCV_SECOND, PN_REMOTE_ACTIVE, PN_REMOTE_CLOSED, PN_REMOTE_UNINIT, PN_SND_MIXED, PN_SND_SETTLED, PN_SND_UNSETTLED, \
32 PN_SOURCE, PN_TARGET, PN_UNSPECIFIED, pn_connection, pn_connection_attachments, pn_connection_close, \
33 pn_connection_collect, pn_connection_condition, pn_connection_desired_capabilities, pn_connection_error, \
34 pn_connection_get_container, pn_connection_get_hostname, pn_connection_get_user, pn_connection_offered_capabilities, \
35 pn_connection_open, pn_connection_properties, pn_connection_release, pn_connection_remote_condition, \
36 pn_connection_remote_container, pn_connection_remote_desired_capabilities, pn_connection_remote_hostname, \
37 pn_connection_remote_offered_capabilities, pn_connection_remote_properties, pn_connection_set_container, \
38 pn_connection_set_hostname, pn_connection_set_password, pn_connection_set_user, pn_connection_state, \
39 pn_connection_transport, pn_delivery, pn_error_code, pn_error_text, pn_link_advance, pn_link_attachments, \
40 pn_link_available, pn_link_close, pn_link_condition, pn_link_credit, pn_link_current, pn_link_detach, pn_link_drain, \
41 pn_link_drained, pn_link_draining, pn_link_error, pn_link_flow, pn_link_free, pn_link_get_drain, pn_link_head, \
42 pn_link_is_receiver, pn_link_is_sender, pn_link_max_message_size, pn_link_name, pn_link_next, pn_link_offered, \
43 pn_link_open, pn_link_queued, pn_link_rcv_settle_mode, pn_link_recv, pn_link_remote_condition, \
44 pn_link_remote_max_message_size, pn_link_remote_rcv_settle_mode, pn_link_remote_snd_settle_mode, \
45 pn_link_remote_source, pn_link_remote_target, pn_link_send, pn_link_session, pn_link_set_drain, \
46 pn_link_set_max_message_size, pn_link_set_rcv_settle_mode, pn_link_set_snd_settle_mode, pn_link_snd_settle_mode, \
47 pn_link_source, pn_link_state, pn_link_target, pn_link_unsettled, pn_receiver, pn_sender, pn_session, \
48 pn_session_attachments, pn_session_close, pn_session_condition, pn_session_connection, pn_session_free, \
49 pn_session_get_incoming_capacity, pn_session_get_outgoing_window, pn_session_head, pn_session_incoming_bytes, \
50 pn_session_next, pn_session_open, pn_session_outgoing_bytes, pn_session_remote_condition, \
51 pn_session_set_incoming_capacity, pn_session_set_outgoing_window, pn_session_state, pn_terminus_capabilities, \
52 pn_terminus_copy, pn_terminus_filter, pn_terminus_get_address, pn_terminus_get_distribution_mode, \
53 pn_terminus_get_durability, pn_terminus_get_expiry_policy, pn_terminus_get_timeout, pn_terminus_get_type, \
54 pn_terminus_is_dynamic, pn_terminus_outcomes, pn_terminus_properties, pn_terminus_set_address, \
55 pn_terminus_set_distribution_mode, pn_terminus_set_durability, pn_terminus_set_dynamic, \
56 pn_terminus_set_expiry_policy, pn_terminus_set_timeout, pn_terminus_set_type, pn_work_head
57
58 from ._common import unicode2utf8, utf82unicode
59 from ._condition import cond2obj, obj2cond
60 from ._data import Data, dat2obj, obj2dat
61 from ._delivery import Delivery
62 from ._exceptions import ConnectionException, EXCEPTIONS, LinkException, SessionException
63 from ._transport import Transport
64 from ._wrapper import Wrapper
108
111 """
112 A representation of an AMQP connection
113 """
114
115 @staticmethod
117 if impl is None:
118 return None
119 else:
120 return Connection(impl)
121
122 - def __init__(self, impl=pn_connection):
124
126 Endpoint._init(self)
127 self.offered_capabilities = None
128 self.desired_capabilities = None
129 self.properties = None
130 self.url = None
131 self._acceptor = None
132
134 return pn_connection_attachments(self._impl)
135
136 @property
139
140 @property
143
150
152 return pn_connection_condition(self._impl)
153
155 return pn_connection_remote_condition(self._impl)
156
158 if collector is None:
159 pn_connection_collect(self._impl, None)
160 else:
161 pn_connection_collect(self._impl, collector._impl)
162 self._collector = weakref.ref(collector)
163
165 return utf82unicode(pn_connection_get_container(self._impl))
166
169
170 container = property(_get_container, _set_container)
171
173 return utf82unicode(pn_connection_get_hostname(self._impl))
174
177
178 hostname = property(_get_hostname, _set_hostname,
179 doc="""
180 Set the name of the host (either fully qualified or relative) to which this
181 connection is connecting to. This information may be used by the remote
182 peer to determine the correct back-end service to connect the client to.
183 This value will be sent in the Open performative, and will be used by SSL
184 and SASL layers to identify the peer.
185 """)
186
189
192
193 user = property(_get_user, _set_user)
194
197
200
201 password = property(_get_password, _set_password)
202
203 @property
205 """The container identifier specified by the remote peer for this connection."""
206 return pn_connection_remote_container(self._impl)
207
208 @property
210 """The hostname specified by the remote peer for this connection."""
211 return pn_connection_remote_hostname(self._impl)
212
213 @property
215 """The capabilities offered by the remote peer for this connection."""
216 return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
217
218 @property
220 """The capabilities desired by the remote peer for this connection."""
221 return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
222
223 @property
225 """The properties specified by the remote peer for this connection."""
226 return dat2obj(pn_connection_remote_properties(self._impl))
227
228 @property
230 return self.url and str(self.url)
231
233 """
234 Opens the connection.
235
236 In more detail, this moves the local state of the connection to
237 the ACTIVE state and triggers an open frame to be sent to the
238 peer. A connection is fully active once both peers have opened it.
239 """
240 obj2dat(self.offered_capabilities,
241 pn_connection_offered_capabilities(self._impl))
242 obj2dat(self.desired_capabilities,
243 pn_connection_desired_capabilities(self._impl))
244 obj2dat(self.properties, pn_connection_properties(self._impl))
245 pn_connection_open(self._impl)
246
248 """
249 Closes the connection.
250
251 In more detail, this moves the local state of the connection to
252 the CLOSED state and triggers a close frame to be sent to the
253 peer. A connection is fully closed once both peers have closed it.
254 """
255 self._update_cond()
256 pn_connection_close(self._impl)
257 if hasattr(self, '_session_policy'):
258
259 del self._session_policy
260
261 @property
263 """
264 The state of the connection as a bit field. The state has a local
265 and a remote component. Each of these can be in one of three
266 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
267 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
268 REMOTE_ACTIVE and REMOTE_CLOSED.
269 """
270 return pn_connection_state(self._impl)
271
273 """
274 Returns a new session on this connection.
275 """
276 ssn = pn_session(self._impl)
277 if ssn is None:
278 raise (SessionException("Session allocation failed."))
279 else:
280 return Session(ssn)
281
283 return Session.wrap(pn_session_head(self._impl, mask))
284
286 return Link.wrap(pn_link_head(self._impl, mask))
287
288 @property
291
292 @property
294 return pn_error_code(pn_connection_error(self._impl))
295
297 pn_connection_release(self._impl)
298
299
300 -class Session(Wrapper, Endpoint):
301
302 @staticmethod
304 if impl is None:
305 return None
306 else:
307 return Session(impl)
308
311
313 return pn_session_attachments(self._impl)
314
316 return pn_session_condition(self._impl)
317
319 return pn_session_remote_condition(self._impl)
320
322 return pn_session_get_incoming_capacity(self._impl)
323
325 pn_session_set_incoming_capacity(self._impl, capacity)
326
327 incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
328
330 return pn_session_get_outgoing_window(self._impl)
331
333 pn_session_set_outgoing_window(self._impl, window)
334
335 outgoing_window = property(_get_outgoing_window, _set_outgoing_window)
336
337 @property
339 return pn_session_outgoing_bytes(self._impl)
340
341 @property
343 return pn_session_incoming_bytes(self._impl)
344
346 pn_session_open(self._impl)
347
349 self._update_cond()
350 pn_session_close(self._impl)
351
def next(self, mask):
    """
    Return the wrapped result of ``pn_session_next`` for this session.

    ``mask`` is forwarded unchanged to ``pn_session_next`` together with
    this session's underlying handle. The raw result is passed through
    ``Session.wrap``.
    """
    following = pn_session_next(self._impl, mask)
    return Session.wrap(following)
354
355 @property
357 return pn_session_state(self._impl)
358
359 @property
362
363 @property
366
369
372
374 pn_session_free(self._impl)
375
376
377 -class Link(Wrapper, Endpoint):
378 """
379 A representation of an AMQP link, of which there are two concrete
380 implementations, Sender and Receiver.
381 """
382
383 SND_UNSETTLED = PN_SND_UNSETTLED
384 SND_SETTLED = PN_SND_SETTLED
385 SND_MIXED = PN_SND_MIXED
386
387 RCV_FIRST = PN_RCV_FIRST
388 RCV_SECOND = PN_RCV_SECOND
389
390 @staticmethod
392 if impl is None: return None
393 if pn_link_is_sender(impl):
394 return Sender(impl)
395 else:
396 return Receiver(impl)
397
400
402 return pn_link_attachments(self._impl)
403
405 if err < 0:
406 exc = EXCEPTIONS.get(err, LinkException)
407 raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl))))
408 else:
409 return err
410
412 return pn_link_condition(self._impl)
413
415 return pn_link_remote_condition(self._impl)
416
418 """
419 Opens the link.
420
421 In more detail, this moves the local state of the link to the
422 ACTIVE state and triggers an attach frame to be sent to the
423 peer. A link is fully active once both peers have attached it.
424 """
425 pn_link_open(self._impl)
426
428 """
429 Closes the link.
430
431 In more detail, this moves the local state of the link to the
432 CLOSED state and triggers a detach frame (with the closed flag
433 set) to be sent to the peer. A link is fully closed once both
434 peers have detached it.
435 """
436 self._update_cond()
437 pn_link_close(self._impl)
438
439 @property
441 """
442 The state of the link as a bit field. The state has a local
443 and a remote component. Each of these can be in one of three
444 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
445 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
446 REMOTE_ACTIVE and REMOTE_CLOSED.
447 """
448 return pn_link_state(self._impl)
449
450 @property
452 """The source of the link as described by the local peer."""
453 return Terminus(pn_link_source(self._impl))
454
455 @property
457 """The target of the link as described by the local peer."""
458 return Terminus(pn_link_target(self._impl))
459
460 @property
462 """The source of the link as described by the remote peer."""
463 return Terminus(pn_link_remote_source(self._impl))
464
465 @property
467 """The target of the link as described by the remote peer."""
468 return Terminus(pn_link_remote_target(self._impl))
469
470 @property
473
474 @property
476 """The connection on which this link was attached."""
477 return self.session.connection
478
479 @property
482
485
486 @property
489
491 return pn_link_advance(self._impl)
492
493 @property
495 return pn_link_unsettled(self._impl)
496
497 @property
499 """The amount of outstanding credit on this link."""
500 return pn_link_credit(self._impl)
501
502 @property
504 return pn_link_available(self._impl)
505
506 @property
508 return pn_link_queued(self._impl)
509
def next(self, mask):
    """
    Return the wrapped result of ``pn_link_next`` for this link.

    ``mask`` is forwarded unchanged to ``pn_link_next`` together with this
    link's underlying handle. The raw result is passed through
    ``Link.wrap``.
    """
    following = pn_link_next(self._impl, mask)
    return Link.wrap(following)
512
513 @property
515 """Returns the name of the link"""
516 return utf82unicode(pn_link_name(self._impl))
517
518 @property
520 """Returns true if this link is a sender."""
521 return pn_link_is_sender(self._impl)
522
523 @property
525 """Returns true if this link is a receiver."""
526 return pn_link_is_receiver(self._impl)
527
528 @property
530 return pn_link_remote_snd_settle_mode(self._impl)
531
532 @property
534 return pn_link_remote_rcv_settle_mode(self._impl)
535
537 return pn_link_snd_settle_mode(self._impl)
538
540 pn_link_set_snd_settle_mode(self._impl, mode)
541
542 snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode)
543
545 return pn_link_rcv_settle_mode(self._impl)
546
548 pn_link_set_rcv_settle_mode(self._impl, mode)
549
550 rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)
551
553 return pn_link_get_drain(self._impl)
554
556 pn_link_set_drain(self._impl, bool(b))
557
558 drain_mode = property(_get_drain, _set_drain)
559
561 return pn_link_drained(self._impl)
562
563 @property
565 return pn_link_remote_max_message_size(self._impl)
566
568 return pn_link_max_message_size(self._impl)
569
571 pn_link_set_max_message_size(self._impl, mode)
572
573 max_message_size = property(_get_max_message_size, _set_max_message_size)
574
576 return pn_link_detach(self._impl)
577
579 pn_link_free(self._impl)
580
583 """
584 A link over which messages are sent.
585 """
586
588 pn_link_offered(self._impl, n)
589
591 """
592 Send specified data as part of the current delivery
593
594 @type data: binary
595 @param data: data to send
596 """
597 return self._check(pn_link_send(self._impl, data))
598
def send(self, obj, tag=None):
    """
    Send the given object over this sender.

    When ``obj`` exposes a ``send`` attribute it is called as
    ``obj.send(self, tag=tag)`` and its result is returned. For a
    Message this sends the message over this link, creating a new
    delivery for the purpose.

    Any other object is handed to ``self.stream`` and that result is
    returned; ``tag`` is not used on this path.
    """
    # Objects that cannot send themselves are streamed as raw data.
    if not hasattr(obj, 'send'):
        return self.stream(obj)
    return obj.send(self, tag=tag)
613
615 if not hasattr(self, 'tag_generator'):
616 def simple_tags():
617 count = 1
618 while True:
619 yield str(count)
620 count += 1
621
622 self.tag_generator = simple_tags()
623 return next(self.tag_generator)
624
627 """
628 A link over which messages are received.
629 """
630
632 """Increases the credit issued to the remote sender by the specified number of messages."""
633 pn_link_flow(self._impl, n)
634
def recv(self, limit):
    """
    Receive data from this link via ``pn_link_recv``.

    ``limit`` is forwarded unchanged to ``pn_link_recv``, which yields a
    status code and the received data.

    Returns ``None`` when the status equals ``PN_EOS``. Otherwise the
    status is passed to ``self._check`` and the received data is returned.
    """
    status, payload = pn_link_recv(self._impl, limit)
    if status != PN_EOS:
        # Validate the status before handing the data back to the caller.
        self._check(status)
        return payload
    return None
642
644 pn_link_drain(self._impl, n)
645
647 return pn_link_draining(self._impl)
648
651 UNSPECIFIED = PN_UNSPECIFIED
652 SOURCE = PN_SOURCE
653 TARGET = PN_TARGET
654 COORDINATOR = PN_COORDINATOR
655
656 NONDURABLE = PN_NONDURABLE
657 CONFIGURATION = PN_CONFIGURATION
658 DELIVERIES = PN_DELIVERIES
659
660 DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
661 DIST_MODE_COPY = PN_DIST_MODE_COPY
662 DIST_MODE_MOVE = PN_DIST_MODE_MOVE
663
664 EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK
665 EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION
666 EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION
667 EXPIRE_NEVER = PN_EXPIRE_NEVER
668
671
678
680 return pn_terminus_get_type(self._impl)
681
683 self._check(pn_terminus_set_type(self._impl, type))
684
685 type = property(_get_type, _set_type)
686
688 """The address that identifies the source or target node"""
689 return utf82unicode(pn_terminus_get_address(self._impl))
690
693
694 address = property(_get_address, _set_address)
695
697 return pn_terminus_get_durability(self._impl)
698
700 self._check(pn_terminus_set_durability(self._impl, seconds))
701
702 durability = property(_get_durability, _set_durability)
703
705 return pn_terminus_get_expiry_policy(self._impl)
706
708 self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
709
710 expiry_policy = property(_get_expiry_policy, _set_expiry_policy)
711
713 return pn_terminus_get_timeout(self._impl)
714
716 self._check(pn_terminus_set_timeout(self._impl, seconds))
717
718 timeout = property(_get_timeout, _set_timeout)
719
721 """Indicates whether the source or target node was dynamically
722 created"""
723 return pn_terminus_is_dynamic(self._impl)
724
726 self._check(pn_terminus_set_dynamic(self._impl, dynamic))
727
728 dynamic = property(_is_dynamic, _set_dynamic)
729
731 return pn_terminus_get_distribution_mode(self._impl)
732
734 self._check(pn_terminus_set_distribution_mode(self._impl, mode))
735
736 distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
737
738 @property
740 """Properties of a dynamic source or target."""
741 return Data(pn_terminus_properties(self._impl))
742
743 @property
745 """Capabilities of the source or target."""
746 return Data(pn_terminus_capabilities(self._impl))
747
748 @property
750 return Data(pn_terminus_outcomes(self._impl))
751
752 @property
754 """A filter on a source allows the set of messages transferred over
755 the link to be restricted"""
756 return Data(pn_terminus_filter(self._impl))
757
def copy(self, src):
    """
    Copy ``src`` into this terminus using ``pn_terminus_copy``.

    The underlying handles of this terminus and of ``src`` are passed to
    ``pn_terminus_copy``, and its result code is handed to ``self._check``.
    Nothing is returned.
    """
    result = pn_terminus_copy(self._impl, src._impl)
    self._check(result)
760