Chapter 4: Communication (3rd Edition)
Communication
• Inter-process communication is at the heart of all distributed
systems
• Communication in distributed systems has traditionally been based on low-level message passing as offered by the underlying network
• Expressing communication through message passing is harder than using primitives based on shared memory, as available for nondistributed systems
• Modern distributed systems often consist of thousands or even millions of processes scattered across a network with unreliable communication, such as the Internet
• Unless the primitive communication facilities of computer networks are replaced by something else, development of large-scale distributed applications is extremely difficult
Outline
• Foundations: Communication Protocols
• Remote Procedure Call (RPC)
• Message-Oriented Communication (including Message-Oriented Middleware, MOM)
• Multicast Communication
Communication: Foundations Layered Protocols
The OSI reference model (figure): seven layers, each talking to its peer through its own protocol: application (7), presentation (6), session (5), transport (4), network (3), data link (2), and physical (1), on top of the network.
Drawbacks
► Focus on message-passing only
► Often unneeded or unwanted functionality
► Violates access transparency
Low-level layers
Recap
► Physical layer: contains the specification and implementation of
bits, and their transmission between sender and receiver
► Data link layer: prescribes the transmission of a series of bits into
a frame to allow for error and flow control
► Network layer: describes how packets in a network of computers
are to be routed.
Observation
For many distributed systems, the lowest-level interface is that of the
network layer.
Transport Layer
Important
The transport layer provides the actual communication facilities for
most distributed systems.
Middleware layer
Observation
Middleware was invented to provide common services and protocols that
can be used by many different applications:
► A rich set of communication protocols
► (Un)marshaling of data, necessary for integrated systems
► Naming protocols, to allow easy sharing of resources
► Security protocols for secure communication
► Scaling mechanisms, such as for replication and caching
Note
What remains are truly application-specific protocols... such as?
Middleware protocols (figure): an adapted layering scheme with an application layer (application protocol), a middleware layer (middleware protocol), an operating system layer, and a hardware layer (physical/link-level protocol), connected through the network.
Communication: Foundations Types of Communication
Types of communication
Distinguish...
► Transient versus persistent communication
(Figure: client and server over time; the request passes through a storage facility and causes a transmission interrupt at the server, which later returns a reply.)
Places for synchronization
► At request submission
► At request delivery
► After request processing
Client/Server
Some observations
Client/Server computing is generally based on a model of transient
synchronous communication:
► Client and server have to be active at time of communication
► Client issues request and blocks until it receives reply
► Server essentially waits only for incoming requests, and
subsequently processes them
Messaging
Message-oriented middleware
Aims at high-level persistent asynchronous communication:
► Processes send each other messages, which are queued
► Sender need not wait for immediate reply, but can do other things
► Middleware often ensures fault tolerance
Communication: Remote procedure call Basic RPC operation
Observations
► Application developers are familiar with the simple procedure model
► Well-engineered procedures operate in isolation (black box)
► There is no fundamental reason not to execute procedures on a separate machine
(Figure: the client calls a remote procedure and waits for the result while the server executes the call.)
1. Client procedure calls client stub.
2. Stub builds message; calls local OS.
3. OS sends message to remote OS.
4. Remote OS gives message to stub.
5. Server stub unpacks parameter(s) and calls the server.
6. Server does local call; returns result to stub.
7. Stub builds message; calls OS.
8. OS sends message to client’s OS.
9. Client’s OS gives message to stub.
10. Client stub unpacks result; returns to client.
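The ten steps above can be sketched in a few lines of Python. This is a minimal, hypothetical illustration, not DCE or any real RPC framework: the procedure `add`, the stubs `client_stub_add` and `server_stub`, and the JSON message format are all assumptions, and a local `socket.socketpair()` stands in for the two operating systems and the network.

```python
import json
import socket
import threading

def add(a, b):
    """The (hypothetical) procedure that actually runs on the server."""
    return a + b

PROCEDURES = {"add": add}  # procedures the server stub can dispatch to

def server_stub(conn):
    # Steps 4-7: receive the request, unpack the parameters, do the local
    # call, and pack the result into a reply message.
    # Simplification: assumes a whole message arrives in a single recv().
    request = json.loads(conn.recv(4096).decode())
    result = PROCEDURES[request["proc"]](*request["args"])
    conn.sendall(json.dumps({"result": result}).encode())

def client_stub_add(conn, a, b):
    # Steps 1-3: the caller sees an ordinary procedure; the stub marshals
    # the procedure name and parameters into a message and sends it.
    conn.sendall(json.dumps({"proc": "add", "args": [a, b]}).encode())
    # Steps 9-10: wait for the reply, unpack the result, and return it.
    reply = json.loads(conn.recv(4096).decode())
    return reply["result"]

if __name__ == "__main__":
    client_end, server_end = socket.socketpair()  # stands in for the network
    server = threading.Thread(target=server_stub, args=(server_end,))
    server.start()
    print(client_stub_add(client_end, 3, 4))  # prints 7
    server.join()
```

The caller of `client_stub_add` never sees messages or sockets, which is exactly the illusion of a local call that RPC aims for.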
Communication: Remote procedure call Parameter passing
Conclusion
Client and server need to properly interpret messages, transforming
them into machine-dependent representations.
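One concrete reason for this conclusion is byte ordering. The sketch below, using only Python's standard `struct` module, encodes the same integer in big-endian and little-endian form and shows what happens when a receiver decodes with the wrong assumption; the value 1025 is an arbitrary choice.

```python
import struct

value = 1025  # 0x00000401

big = struct.pack(">i", value)     # big-endian ("network byte order")
little = struct.pack("<i", value)  # little-endian, as used by x86 CPUs

print(big.hex())     # 00000401
print(little.hex())  # 01040000

# A receiver that decodes with the wrong byte order gets a different number.
misread = struct.unpack("<i", big)[0]
print(misread)       # 17039360

# Agreeing on one representation (here: big-endian) fixes the problem.
assert struct.unpack(">i", big)[0] == value
```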
Some assumptions
► Copy in/copy out semantics: while the procedure is executed, nothing
can be assumed about parameter values.
► All data that is to be operated on is passed by parameters. This
excludes passing references to (global) data.
Conclusion
Full access transparency cannot be realized.
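Aliasing makes the conclusion concrete. In this hypothetical sketch, `call_copy_in_copy_out` simulates a remote call by deep-copying every argument before the call and writing the copies back afterwards. When the same list is passed twice, a local call by reference and the simulated remote call give different results, so the caller can tell the two apart.

```python
import copy

def increment_both(x, y):
    # Increments the single element of both list parameters in place.
    x[0] += 1
    y[0] += 1

def call_copy_in_copy_out(proc, *args):
    # Copy in: the callee only sees private copies of the arguments,
    # as it would after they have been marshaled into a message.
    copies = [copy.deepcopy(arg) for arg in args]
    proc(*copies)
    # Copy out: overwrite the caller's lists with the final values.
    for original, result in zip(args, copies):
        original[:] = result

a = [0]
increment_both(a, a)                          # local call by reference
print(a)                                      # [2]

b = [0]
call_copy_in_copy_out(increment_both, b, b)   # simulated remote call
print(b)                                      # [1]
```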
Asynchronous RPCs
Essence
Try to get rid of the strict request-reply behavior, but let the client
continue without waiting for an answer from the server.
Asynchronous RPC (figure): the client sends a request, the server accepts it, and the client continues instead of waiting for the result.
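The client-side behavior of an asynchronous RPC can be mimicked locally with a future. This is only a sketch under the assumption that a thread pool may stand in for the RPC runtime; `remote_square` is a hypothetical procedure, and no network is involved.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def remote_square(x):
    # Stand-in for a remote procedure; the sleep models network and server delay.
    time.sleep(0.1)
    return x * x

with ThreadPoolExecutor(max_workers=1) as rpc_runtime:
    future = rpc_runtime.submit(remote_square, 12)  # issue the request, return at once
    other_work = sum(range(10))                     # the client keeps working meanwhile
    result = future.result()                        # accept the result later (blocks only if not done)

print(other_work, result)  # 45 144
```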
Communication: Remote procedure call Variations on RPC
Multicast RPC
Essence
Sending an RPC request to a group of servers.
(Figure: a client calls remote procedures on several servers, which report their results through callbacks to the client.)
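A multicast RPC can be sketched by sending the same call to every member of a group in parallel and handling each reply as it completes. The servers here are hypothetical local functions built by `make_server`, and the thread pool is an assumption standing in for real network calls.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def make_server(name, offset):
    # Builds a hypothetical server; each one answers the same call slightly differently.
    def lookup(key):
        return name, len(key) + offset
    return lookup

servers = [make_server("s1", 0), make_server("s2", 1), make_server("s3", 2)]

def multicast_rpc(group, *args):
    # Send the same request to every server in the group in parallel and
    # handle each reply as soon as it completes.
    replies = {}
    with ThreadPoolExecutor(max_workers=len(group)) as pool:
        futures = [pool.submit(server, *args) for server in group]
        for done in as_completed(futures):
            name, value = done.result()
            replies[name] = value
    return replies

print(multicast_rpc(servers, "key"))
```

Replies arrive in completion order, so the client must not rely on any particular order among servers.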
Communication: Remote procedure call Example: DCE RPC
RPC in practice
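(Figure: the DCE development path. Uuidgen and an interface definition file feed the IDL compiler; the generated headers are #included in the client and server code, which are linked with the runtime library into a client binary and a server binary.)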
Issues
(1) Client must locate server machine, and (2) locate the server.
(Figure: client-to-server binding. The server machine registers its service with the directory server on a directory machine; the client machine then looks up the server there.)
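The two binding issues can be sketched with two lookup tables. This is a hypothetical, in-memory illustration and not the DCE API: `directory`, `endpoint_tables`, `register`, and `bind` are invented names, and it assumes as a simplification that each server machine keeps a table of endpoints (ports) for the services it runs.

```python
# Hypothetical, in-memory stand-ins; this is not the DCE API.
directory = {}        # directory server: service name -> server machine
endpoint_tables = {}  # per machine: service name -> endpoint (port)

def register(machine, service, port):
    # Server side: record the endpoint locally, then register the service
    # (and the machine offering it) with the directory server.
    endpoint_tables.setdefault(machine, {})[service] = port
    directory[service] = machine

def bind(service):
    machine = directory[service]              # (1) locate the server machine
    port = endpoint_tables[machine][service]  # (2) locate the server on that machine
    return machine, port

register("server-a", "time", 9000)
print(bind("time"))  # ('server-a', 9000)
```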
Message-oriented communication
Communication: Message-oriented communication Simple transient messaging with sockets
(Figure: the server calls socket, bind, listen, accept, receive, send, and close, in that order.)
Server
from socket import *
HOST, PORT = "localhost", 50007   # assumed address (not in the original); must match the client
s = socket(AF_INET, SOCK_STREAM)
s.bind((HOST, PORT))
s.listen(1)
(conn, addr) = s.accept()         # returns new socket and address of client
while True:                       # forever
    data = conn.recv(1024)        # receive data from client
    if not data: break            # stop if client stopped
    conn.sendall(data + b"*")     # return sent data plus an "*"
conn.close()                      # close the connection
s.close()
Client
from socket import *
HOST, PORT = "localhost", 50007   # assumed address (not in the original); must match the server
s = socket(AF_INET, SOCK_STREAM)
s.connect((HOST, PORT))           # connect to server (block until accepted)
s.sendall(b"Hello, world")        # send some data
data = s.recv(1024)               # receive the response
print(data)                       # print the result
s.close()                         # close the connection
Communication: Message-oriented communication Advanced transient messaging
Observation
Sockets are rather low level and programming mistakes are easily
made. However, the way that they are used is often the same (such as
in a client-server setting).
Alternative: ZeroMQ
Provides a higher level of expression by pairing sockets: one for
sending messages at process P and a corresponding one at process
Q for receiving messages. All communication is asynchronous.
Three patterns
► Request-reply
► Publish-subscribe
► Pipeline
Request-reply
Server
import zmq
HOST, PORT1, PORT2 = "127.0.0.1", "5555", "5556"  # assumed addresses (not in the original)
context = zmq.Context()

p1 = "tcp://" + HOST + ":" + PORT1    # how and where to connect
p2 = "tcp://" + HOST + ":" + PORT2    # how and where to connect
s = context.socket(zmq.REP)           # create reply socket

s.bind(p1)                            # bind socket to address
s.bind(p2)                            # bind socket to address
while True:
    message = s.recv()                # wait for incoming message (bytes)
    if b"STOP" not in message:        # if not to stop...
        s.send(message + b"*")        # append "*" to message
    else:                             # else...
        break                         # break out of loop and end
Request-reply
Client
import zmq
HOST, PORT = "127.0.0.1", "5555"      # assumed; must be one of the server's ports
context = zmq.Context()

php = "tcp://" + HOST + ":" + PORT    # how and where to connect
s = context.socket(zmq.REQ)           # create socket

s.connect(php)                        # connect; ZeroMQ sets up the connection in the background
s.send(b"Hello World")                # send message
message = s.recv()                    # block until response
s.send(b"STOP")                       # tell server to stop
print(message)                        # print result
Publish-subscribe
Server
import zmq, time
HOST, PORT = "127.0.0.1", "5557"      # assumed address (not in the original)

context = zmq.Context()
s = context.socket(zmq.PUB)           # create a publisher socket
p = "tcp://" + HOST + ":" + PORT      # how and where to communicate
s.bind(p)                             # bind socket to the address
while True:
    time.sleep(5)                     # wait every 5 seconds
    s.send_string("TIME " + time.asctime())  # publish the current time
Client
import zmq
HOST, PORT = "127.0.0.1", "5557"      # assumed; must match the publisher

context = zmq.Context()
s = context.socket(zmq.SUB)           # create a subscriber socket
p = "tcp://" + HOST + ":" + PORT      # how and where to communicate
s.connect(p)                          # connect to the server
s.setsockopt(zmq.SUBSCRIBE, b"TIME")  # subscribe to TIME messages

for i in range(5):                    # five iterations
    message = s.recv_string()         # receive a message
    print(message)
Pipeline
Source
import zmq, time, pickle, sys, random
SRC1, SRC2 = "127.0.0.1", "127.0.0.1"     # assumed task-source hosts (not in the original)
PORT1, PORT2 = "5558", "5559"             # assumed task-source ports

context = zmq.Context()
me = str(sys.argv[1])
s = context.socket(zmq.PUSH)              # create a push socket
src = SRC1 if me == "1" else SRC2         # check task source host
prt = PORT1 if me == "1" else PORT2       # check task source port
p = "tcp://" + src + ":" + prt            # how and where to connect
s.bind(p)                                 # bind socket to address

for i in range(100):                      # generate 100 workloads
    workload = random.randint(1, 100)     # compute workload
    s.send(pickle.dumps((me, workload)))  # send workload to worker
Pipeline
Worker
import zmq, time, pickle, sys
SRC1, SRC2 = "127.0.0.1", "127.0.0.1"     # assumed; must match the task sources
PORT1, PORT2 = "5558", "5559"

context = zmq.Context()
me = str(sys.argv[1])
r = context.socket(zmq.PULL)              # create a pull socket
p1 = "tcp://" + SRC1 + ":" + PORT1        # address first task source
p2 = "tcp://" + SRC2 + ":" + PORT2        # address second task source
r.connect(p1)                             # connect to task source 1
r.connect(p2)                             # connect to task source 2

while True:
    work = pickle.loads(r.recv())         # receive work from a source (unpickle trusted data only)
    time.sleep(work[1] * 0.01)            # pretend to work
Message-oriented middleware
Essence
Asynchronous persistent communication through support of
middleware-level queues. Queues correspond to buffers at
communication servers.
Operations
Operation  Description
put        Append a message to a specified queue
get        Block until the specified queue is nonempty, and remove the first message
poll       Check a specified queue for messages, and remove the first. Never block
notify     Install a handler to be called when a message is put into the specified queue
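The four operations can be sketched as a small thread-safe class. `MessageQueue` is a hypothetical, in-memory illustration: it shows the blocking, nonblocking, and callback semantics from the table, but it deliberately omits persistence, which a real message-queuing system would provide.

```python
import threading
from collections import deque

class MessageQueue:
    """Hypothetical in-memory queue offering put/get/poll/notify (no persistence)."""

    def __init__(self):
        self._messages = deque()
        self._cond = threading.Condition()
        self._handlers = []

    def put(self, message):
        # Append a message and wake up a reader blocked in get().
        with self._cond:
            self._messages.append(message)
            self._cond.notify()
            handlers = list(self._handlers)
        for handler in handlers:  # call installed handlers outside the lock
            handler(message)

    def get(self):
        # Block until the queue is nonempty, then remove the first message.
        with self._cond:
            while not self._messages:
                self._cond.wait()
            return self._messages.popleft()

    def poll(self):
        # Remove and return the first message, or None if empty; never blocks.
        with self._cond:
            return self._messages.popleft() if self._messages else None

    def notify(self, handler):
        # Install a handler that is called whenever a message is put.
        with self._cond:
            self._handlers.append(handler)

q = MessageQueue()
seen = []
q.notify(seen.append)
q.put("m1")
q.put("m2")
print(q.get(), q.poll(), q.poll())  # m1 m2 None
print(seen)                         # ['m1', 'm2']
```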
Communication: Message-oriented communication Message-oriented persistent communication
General model
Queue managers
Queues are managed by queue managers. An application can put
messages only into a local queue. Getting a message is possible by
extracting it from a local queue only ⇒ queue managers need to route
messages.
Routing (figure): the source queue manager looks up the contact (network) address of the destination queue manager, using the logical, queue-level address (name) of the destination queue.
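Routing between queue managers can be sketched as follows. `QueueManager` and the shared `directory` dictionary are hypothetical: looking up the responsible manager object stands in for resolving a queue name to the contact address of its queue manager, and no real network transfer takes place.

```python
from collections import deque

class QueueManager:
    """Hypothetical queue manager: owns local queues and routes other messages."""

    def __init__(self, name, directory):
        self.name = name
        self.directory = directory  # logical queue name -> responsible queue manager
        self.queues = {}

    def create_queue(self, qname):
        self.queues[qname] = deque()
        self.directory[qname] = self

    def put(self, qname, message):
        if qname in self.queues:
            self.queues[qname].append(message)  # destination queue is local
        else:
            # Look up the manager of the destination queue (standing in for
            # resolving its contact address) and hand the message over.
            self.directory[qname].put(qname, message)

    def get(self, qname):
        # Applications can only take messages from a local queue.
        return self.queues[qname].popleft()

directory = {}
qm_a = QueueManager("QMA", directory)
qm_b = QueueManager("QMB", directory)
qm_b.create_queue("orders")

qm_a.put("orders", "order #1")  # the sender only talks to its local manager QMA
print(qm_b.get("orders"))       # order #1
```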
Message broker
Observation
Message queuing systems assume a common messaging protocol: all
applications agree on message format (i.e., structure and data
representation)
Message brokers (figure): applications on different machines each access a queuing layer through their own interface; every machine runs its own local OS.
Communication: Message-oriented communication Example: IBM’s WebSphere message-queuing system
IBM’s WebSphere MQ
Basic concepts
► Application-specific messages are put into, and removed from
queues
► Queues reside under the regime of a queue manager
► Processes can put messages only in local queues, or through an
RPC mechanism
Message transfer
► Messages are transferred between queues
► Message transfer between queues at different processes requires a
channel
► At each endpoint of a channel is a message channel agent (MCA)
► Message channel agents are responsible for:
► Setting up channels using lower-level network
communication facilities (e.g., TCP/IP)
► (Un)wrapping messages from/in transport-level packets
► Sending/receiving packets
Schematic overview
(Figure: a sending client and a receiving client each use the MQ interface of their own queue manager; the sending side consults a routing table and puts the message in a send queue, from which it reaches the receiving client’s receive queue.)
Channels
Attribute          Description
Transport type     Determines the transport protocol to be used
FIFO delivery      Indicates that messages are to be delivered in the order they are sent
Message length     Maximum length of a single message
Setup retry count  Specifies maximum number of retries to start up the remote MCA
Delivery retries   Maximum times MCA will try to put received message into queue
Routing
By using logical names, in combination with name resolution to local
queues, it is possible to put a message in a remote queue
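(Figure: message transfer between queue managers such as QMA, QMC, and QMD; an alias table maps local aliases such as LA1 and LA2 to queue managers, and messages leave through send queues such as SQ1.)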
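Name resolution at a queue manager can be sketched as two table lookups. The table contents below are hypothetical and chosen for illustration only; they are not the entries of the original figure. A local alias is first replaced by a queue-manager name, and a routing table then selects the send queue leading to that manager.

```python
# Hypothetical tables at one queue manager; the entries are illustrative only.
alias_table = {"LA1": "QMC", "LA2": "QMD"}    # local alias -> destination queue manager
routing_table = {"QMC": "SQ1", "QMD": "SQ2"}  # destination queue manager -> local send queue

def resolve(destination):
    # Step 1: replace a local alias by the name of the destination queue manager
    # (a name that is not an alias is used as-is).
    qmgr = alias_table.get(destination, destination)
    # Step 2: the routing table tells which send queue leads to that manager.
    return qmgr, routing_table[qmgr]

print(resolve("LA1"))  # ('QMC', 'SQ1')
print(resolve("QMD"))  # ('QMD', 'SQ2')
```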
Communication: Multicast communication Application-level tree-based multicasting
Multicast Communication
• An important topic in communication in distributed systems is the support for sending data to multiple receivers
• This topic has traditionally belonged to the domain of network protocols, where numerous proposals for network-level and transport-level solutions have been implemented and evaluated
• A major issue in all solutions was setting up the communication paths for information dissemination
• In practice, this involved a huge management effort, in many cases requiring human intervention
• With the advent of peer-to-peer technology, and notably structured overlay management, it became easier to set up communication paths
• As peer-to-peer solutions are typically deployed at the application layer, various application-level multicasting techniques have been introduced
• Gossip-based information dissemination provides simple (yet often less efficient) ways for multicasting
Application-level multicasting
Essence
Organize nodes of a distributed system into an overlay network and
use that network to disseminate data:
► Oftentimes a tree, leading to unique paths
► Alternatively, also mesh networks, requiring a form of routing
Basic approach
1. Initiator generates a multicast identifier mid .
2. Lookup succ(mid ), the node responsible for mid .
3. Request is routed to succ(mid ), which will become the root.
4. If P wants to join, it sends a join request to the root.
5. When request arrives at Q:
► Q has not seen a join request before ⇒ it becomes
forwarder; P becomes child of Q. Join request continues to
be forwarded.
► Q knows about tree ⇒ P becomes child of Q. No need to
forward join request anymore.
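The join procedure can be sketched as follows, under two stated assumptions: `succ` follows the Chord convention (the first node identifier at or after the key on a ring of 2**bits identifiers), and the overlay route of each join request is given as a list of hops ending at the root. All names and identifiers are hypothetical.

```python
def succ(key, node_ids, bits=8):
    """Chord-style successor: the first node id >= key on a ring of 2**bits ids."""
    key %= 2 ** bits
    ring = sorted(node_ids)
    for node in ring:
        if node >= key:
            return node
    return ring[0]  # wrap around the ring

def build_tree(root, join_paths):
    """Apply join requests; each path lists P, the overlay hops, and finally the root."""
    parent = {}       # child -> parent in the multicast tree
    in_tree = {root}  # nodes that already know about the tree
    for path in join_paths:
        if path[0] in in_tree:
            continue                 # P is already part of the tree
        for child, q in zip(path, path[1:]):
            parent[child] = q        # the requester becomes a child of Q
            if q in in_tree:
                break                # Q knows the tree: stop forwarding
            in_tree.add(q)           # Q becomes a forwarder and forwards the request
        in_tree.add(path[0])
    return parent

nodes = [5, 7, 10, 80, 120, 200]
root = succ(130, nodes)              # mid = 130 is handled by node 200
tree = build_tree(root, [[5, 80, 120, root], [7, 80, 120, root]])
print(root, tree)  # 200 {5: 80, 80: 120, 120: 200, 7: 80}
```

The second join stops at node 80, which already became a forwarder for the first join, so the tree grows without routing every request to the root.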
► Link stress: How often does an ALM message cross the same physical link? Example: a message from A to D needs to cross (Ra, Rb) twice.
► Stretch: Ratio in delay between the ALM-level path and the network-level path. Example: messages from B to C follow a path of length 73 at the ALM level, but 47 at the network level ⇒ stretch = 73/47.
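Both metrics are easy to compute once the physical links behind each overlay hop are known. The stretch example reuses the numbers 73 and 47 from the text; the topology for link stress is a hypothetical one in which the overlay path A → B → D runs A-Ra-Rb-B and then B-Rb-Ra-D.

```python
from collections import Counter

def stretch(alm_delay, network_delay):
    # Delay along the overlay (ALM) path divided by the delay along the
    # direct network-level path between the same two nodes.
    return alm_delay / network_delay

def link_stress(overlay_hops):
    # overlay_hops: for each overlay hop, the physical links it traverses.
    # Links are undirected, so (Ra, Rb) and (Rb, Ra) count as the same link.
    counts = Counter()
    for hop in overlay_hops:
        counts.update(tuple(sorted(link)) for link in hop)
    return counts

print(round(stretch(73, 47), 2))  # 1.55

# Hypothetical topology: overlay path A -> B -> D.
stress = link_stress([[("A", "Ra"), ("Ra", "Rb"), ("Rb", "B")],
                      [("B", "Rb"), ("Rb", "Ra"), ("Ra", "D")]])
print(stress[("Ra", "Rb")])  # 2
```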
Communication: Multicast communication Flooding-based multicasting
Flooding
Essence
P simply sends a message m to each of its neighbors. Each neighbor will forward that message, except to P, and only if it had not seen m before.
(Figure: the size of a random overlay as a function of the number of nodes, 100 to 1000.)
Variation
Let Q forward a message with a certain probability $p_{flood}$, possibly
even dependent on its own number of neighbors (i.e., node degree) or
the degree of its neighbors.
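Flooding and its probabilistic variant can be simulated on a small overlay given as an adjacency list. In this sketch (names and graph are hypothetical) the source always forwards, while any other node Q, after receiving m for the first time, forwards it with probability `p_flood`; the function counts every message sent, duplicates included.

```python
import random
from collections import deque

def flood(graph, source, p_flood=1.0, seed=None):
    """Flood message m from source; return (nodes reached, messages sent)."""
    rng = random.Random(seed)
    seen = {source}
    messages = 0
    pending = deque([(source, None)])  # (node, node it received m from)
    while pending:
        node, sender = pending.popleft()
        if node != source and rng.random() >= p_flood:
            continue                   # Q decides not to forward
        for nb in graph[node]:
            if nb == sender:
                continue               # never send m back to where it came from
            messages += 1
            if nb not in seen:         # only a first receipt leads to forwarding
                seen.add(nb)
                pending.append((nb, node))
    return seen, messages

ring = {0: [1, 3], 1: [0, 2], 2: [1, 3], 3: [2, 0]}
print(flood(ring, 0))               # ({0, 1, 2, 3}, 5)
print(flood(ring, 0, p_flood=0.0))  # ({0, 1, 3}, 2)
```

Lowering `p_flood` reduces the number of messages at the risk of not reaching every node.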
Communication: Multicast communication Gossip-based data dissemination
Epidemic protocols
Anti-entropy
Principal operations
► A node P selects another node Q from the system at random.
► Pull: P only pulls in new updates from Q
► Push: P only pushes its own updates to Q
► Push-pull: P and Q send updates to each other
Observation
For push-pull it takes O(log(N)) rounds to disseminate updates to all
N nodes (round = when every node has taken the initiative to start an
exchange).
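The three variants can be compared with a small simulation. It is a sketch under stated assumptions: in each round every node contacts one uniformly random other node, and updates received during a round only start spreading in the next round (the `snapshot`). The function name and parameters are hypothetical.

```python
import random

def anti_entropy_rounds(n, mode, seed=0):
    """Rounds until a single update from node 0 reaches all n nodes."""
    rng = random.Random(seed)
    updated = [False] * n
    updated[0] = True
    rounds = 0
    while not all(updated):
        rounds += 1
        snapshot = updated[:]      # state at the start of the round
        for p in range(n):
            q = rng.randrange(n - 1)
            if q >= p:
                q += 1             # pick a random node other than p
            if mode in ("push", "push-pull") and snapshot[p]:
                updated[q] = True  # P pushes its update to Q
            if mode in ("pull", "push-pull") and snapshot[q]:
                updated[p] = True  # P pulls the update from Q
    return rounds

for mode in ("push", "pull", "push-pull"):
    print(mode, anti_entropy_rounds(1000, mode, seed=1))
```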
Anti-entropy: analysis
Basics
Consider a single source, propagating its update. Let $p_i$ be the
probability that a node has not received the update after the $i$-th round.
(Figure: anti-entropy performance for N = 10,000; probability that a node has not yet been updated (0 to 1.0) versus round (0 to 25), for push, pull, and push-pull.)
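How $p_i$ evolves per round can be derived under the assumptions that every node contacts one uniformly random node per round, that $N$ is large, and that contacts are independent (these assumptions are ours, not stated in the slides):

```latex
% Pull: a stale node stays stale only if the node it contacts is stale too.
p_{i+1}^{\text{pull}} = p_i^{2}
% Push: a stale node stays stale if none of the N(1 - p_i) updated nodes picks it.
p_{i+1}^{\text{push}} = p_i \left(1 - \tfrac{1}{N}\right)^{N(1 - p_i)} \approx p_i \, e^{-1}
% Push-pull: both the pull and every push must fail.
p_{i+1}^{\text{push-pull}} = p_i^{2} \left(1 - \tfrac{1}{N}\right)^{N(1 - p_i)} \approx p_i^{2} \, e^{-1}
```

The approximations hold once $p_i$ is small. Squaring shrinks $p_i$ much faster than dividing it by $e$, which is why pulling becomes the more effective operation once most nodes have been updated.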
Rumor spreading
Basic model
A server S having an update to report contacts other servers. If a
server is contacted to which the update has already propagated, S
stops contacting other servers with probability $p_{stop}$.
Observation
If s is the fraction of ignorant servers (i.e., which are unaware of the
update), it can be shown that with many servers $s = e^{-(1/p_{stop} + 1)(1 - s)}$.
Formal analysis
Notations
Let s denote the fraction of nodes that have not yet been updated (i.e.,
susceptible); i the fraction of updated (infected) and active nodes; and r
the fraction of updated nodes that gave up (removed).
(1) $\frac{ds}{dt} = -s \cdot i$
(2) $\frac{di}{dt} = s \cdot i - p_{stop} \cdot (1 - s) \cdot i$
$\Rightarrow \frac{di}{ds} = -(1 + p_{stop}) + \frac{p_{stop}}{s}$
$\Rightarrow i(s) = -(1 + p_{stop}) \cdot s + p_{stop} \cdot \ln(s) + C$
Wrapup
$i(1) = 0 \Rightarrow C = 1 + p_{stop} \Rightarrow i(s) = (1 + p_{stop}) \cdot (1 - s) + p_{stop} \cdot \ln(s)$.
We are looking for the case $i(s) = 0$, which leads to $s = e^{-(1/p_{stop} + 1)(1 - s)}$.
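The condition $i(s) = 0$ gives an implicit equation for s that can be solved numerically. This sketch (the function name is hypothetical) uses fixed-point iteration started at s = 0, which avoids the trivial solution s = 1 and converges to the small root, i.e., the fraction of servers that never learn of the update.

```python
import math

def ignorant_fraction(p_stop, iterations=100):
    """Solve s = exp(-(1/p_stop + 1) * (1 - s)) by fixed-point iteration from s = 0."""
    k = 1.0 / p_stop + 1.0
    s = 0.0
    for _ in range(iterations):
        s = math.exp(-k * (1.0 - s))
    return s

for p in (1.0, 0.5, 0.2):
    print(p, round(ignorant_fraction(p), 4))
```

For $p_{stop} = 0.2$ this yields s ≈ 0.0025, so roughly a quarter of a percent of the servers is never updated, which is why rumor spreading alone does not guarantee that all servers receive the update.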
Rumor spreading
Note
If we really have to ensure that all servers are eventually updated,
rumor spreading alone is not enough
Deleting values
Fundamental problem
We cannot remove an old value from a server and expect the removal
to propagate. Instead, mere removal will be undone in due time using
epidemic algorithms
Solution
Removal has to be registered as a special update by inserting a death
certificate
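The difference between plain removal and a death certificate can be shown with timestamped replicas. This is a hypothetical sketch: each replica maps a key to `(timestamp, value)`, `None` marks a death certificate, and an anti-entropy exchange keeps the entry with the newest timestamp per key.

```python
# Hypothetical replica state: key -> (timestamp, value); value None is a death certificate.

def merge(local, remote):
    """Anti-entropy exchange (pull): keep, per key, the entry with the newest timestamp."""
    for key, (ts, value) in remote.items():
        if key not in local or local[key][0] < ts:
            local[key] = (ts, value)

def delete(replica, key, ts):
    # Register the removal as a special update instead of dropping the key.
    replica[key] = (ts, None)

# Plain removal is undone by the next exchange:
a, b = {"x": (1, "old")}, {"x": (1, "old")}
del a["x"]
merge(a, b)
print(a)  # {'x': (1, 'old')}

# A death certificate wins, because it is the newer update:
a, b = {"x": (1, "old")}, {"x": (1, "old")}
delete(a, "x", ts=2)
merge(b, a)
merge(a, b)
print(a, b)  # {'x': (2, None)} {'x': (2, None)}
```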
Note
It is necessary that a removal actually reaches all servers.