29. Implementing network protocols
You now know how to write simple agents. But real-world network protocols demand more: advertising services, looking up other agents, providing parameters that are computed on demand, encoding/decoding complex PDUs, generating random variates, and describing behaviors as finite state machines (FSMs). In this chapter, we illustrate how to do all of these, using a few examples.
In Chapter 20, we looked at the MAC service in detail. In the next few sections, we develop three simple MAC agents (MySimplestMac, MySimpleThrottledMac and MySimpleHandshakeMac) to illustrate how network protocols and services are implemented by agents. The MAC agents are intentionally kept simple and not optimized for performance, as we wish to illustrate the key aspects of MAC agent development without getting lost in the details of optimal protocols.
29.1. Simple MAC without handshake
To illustrate how a MAC agent might work, let us start with a simple MAC agent that grants every reservation request as soon as it is made:
import org.arl.fjage.*
import org.arl.unet.*
import org.arl.unet.mac.*

class MySimplestMac extends UnetAgent {

  @Override
  void setup() {
    register Services.MAC // advertise that the agent provides a MAC service
  }

  @Override
  Message processRequest(Message msg) {
    if (msg instanceof ReservationReq) {

      // check requested duration
      if (msg.duration <= 0) return new RefuseRsp(msg, 'Bad reservation duration')

      // prepare START reservation notification
      ReservationStatusNtf ntf1 = new ReservationStatusNtf(
        recipient: msg.sender,
        inReplyTo: msg.msgID,
        to: msg.to,
        status: ReservationStatus.START)

      // prepare END reservation notification
      ReservationStatusNtf ntf2 = new ReservationStatusNtf(
        recipient: msg.sender,
        inReplyTo: msg.msgID,
        to: msg.to,
        status: ReservationStatus.END)

      // send START reservation notification immediately
      add new OneShotBehavior({
        send ntf1
      })

      // wait for reservation duration, and then send END reservation notification
      add new WakerBehavior(Math.round(1000*msg.duration), {
        send ntf2
      })

      // return a reservation response, which defaults to an AGREE performative
      return new ReservationRsp(msg)

    }
    return null
  }

}
Note a number of interesting features of the code above:
- The setup() method is used to advertise the service provided by this agent.
- We provide basic error checking, and refuse a request that is invalid, providing a descriptive reason.
- We prepare the AGREE response as well as the START and END status notification messages, all at once. We send out the START notification immediately (using a OneShotBehavior), use a WakerBehavior to schedule the END notification to be sent out at an appropriate time, and then simply return the AGREE response. The use of the OneShotBehavior ensures that the START notification is sent after the AGREE response, and not before.
- We return null if we don’t understand the request, allowing the superclass to respond with a NOT_UNDERSTOOD message.
While the above code implements a working MAC agent, it must also respond to ReservationCancelReq, ReservationAcceptReq and TxAckReq messages, and provide channelBusy, reservationPayloadSize, ackPayloadSize, maxReservationDuration and recommendedReservationDuration parameters in order to comply with the MAC service specification (Chapter 20). We add this functionality trivially, by responding to the messages with RefuseRsp (a message with a REFUSE performative and a descriptive reason), and returning default values for all the parameters. The resulting complete source code is shown below:
import org.arl.fjage.*
import org.arl.fjage.param.Parameter
import org.arl.unet.*
import org.arl.unet.mac.*

class MySimplestMac extends UnetAgent {

  @Override
  void setup() {
    register Services.MAC
  }

  @Override
  Message processRequest(Message msg) {
    switch (msg) {
      case ReservationReq:
        if (msg.duration <= 0) return new RefuseRsp(msg, 'Bad reservation duration')
        ReservationStatusNtf ntf1 = new ReservationStatusNtf(
          recipient: msg.sender,
          inReplyTo: msg.msgID,
          to: msg.to,
          status: ReservationStatus.START)
        ReservationStatusNtf ntf2 = new ReservationStatusNtf(
          recipient: msg.sender,
          inReplyTo: msg.msgID,
          to: msg.to,
          status: ReservationStatus.END)
        add new OneShotBehavior({
          send ntf1
        })
        add new WakerBehavior(Math.round(1000*msg.duration), {
          send ntf2
        })
        return new ReservationRsp(msg)
      case ReservationCancelReq:
      case ReservationAcceptReq: // respond to other requests defined
      case TxAckReq:             // by the MAC service with a RefuseRsp
        return new RefuseRsp(msg, 'Not supported')
    }
    return null
  }

  // expose parameters defined by the MAC service, with just default values

  @Override
  List<Parameter> getParameterList() {
    return allOf(MacParam) // advertise the list of parameters
  }

  final boolean channelBusy = false    // parameters are marked as 'final'
  final int reservationPayloadSize = 0 // to ensure that they are read-only
  final int ackPayloadSize = 0
  final float maxReservationDuration = Float.POSITIVE_INFINITY
  final Float recommendedReservationDuration = null

}
Now we have a fully-compliant, but very simple, MAC agent!
29.2. Testing our simple MAC
The MySimplestMac agent from the previous section is available in the samples folder of your Unet simulator. To test it, fire up the 2-node network simulator and connect to node A:
> ps
remote: org.arl.unet.remote.RemoteControl - IDLE
state: org.arl.unet.state.StateManager - IDLE
rdp: org.arl.unet.net.RouteDiscoveryProtocol - IDLE
ranging: org.arl.unet.phy.Ranging - IDLE
uwlink: org.arl.unet.link.ECLink - IDLE
node: org.arl.unet.nodeinfo.NodeInfo - IDLE
websh: org.arl.fjage.shell.ShellAgent - RUNNING
simulator: org.arl.unet.sim.SimulationAgent - IDLE
phy: org.arl.unet.sim.HalfDuplexModem - IDLE
bbmon: org.arl.unet.bb.BasebandSignalMonitor - IDLE
arp: org.arl.unet.addr.AddressResolution - IDLE
transport: org.arl.unet.transport.SWTransport - IDLE
router: org.arl.unet.net.Router - IDLE
mac: org.arl.unet.mac.CSMA - IDLE
We see that the org.arl.unet.mac.CSMA agent is the current mac. To use our MySimplestMac agent, you first need to kill the org.arl.unet.mac.CSMA agent, and then load the MySimplestMac agent:
> container.kill mac
true
> container.add 'mac', new MySimplestMac()
mac
> ps
remote: org.arl.unet.remote.RemoteControl - IDLE
state: org.arl.unet.state.StateManager - IDLE
rdp: org.arl.unet.net.RouteDiscoveryProtocol - IDLE
ranging: org.arl.unet.phy.Ranging - IDLE
uwlink: org.arl.unet.link.ECLink - IDLE
node: org.arl.unet.nodeinfo.NodeInfo - IDLE
websh: org.arl.fjage.shell.ShellAgent - RUNNING
simulator: org.arl.unet.sim.SimulationAgent - IDLE
phy: org.arl.unet.sim.HalfDuplexModem - IDLE
bbmon: org.arl.unet.bb.BasebandSignalMonitor - IDLE
arp: org.arl.unet.addr.AddressResolution - IDLE
transport: org.arl.unet.transport.SWTransport - IDLE
router: org.arl.unet.net.Router - IDLE
mac: MySimplestMac - IDLE
> mac
« MySimplestMac »
[org.arl.unet.mac.MacParam]
ackPayloadSize ⤇ 0
channelBusy ⤇ false
maxReservationDuration ⤇ Infinity
recommendedReservationDuration ⤇ null
reservationPayloadSize ⤇ 0
It’s loaded and working!
Now, you can ask for a reservation and see if it responds correctly:
> mac << new ReservationReq(to: 31, duration: 3.seconds)
ReservationRsp:AGREE
mac >> ReservationStatusNtf:INFORM[to:31 status:START]
mac >> ReservationStatusNtf:INFORM[to:31 status:END]
Indeed it does! The START notification arrives immediately after the AGREE response, and the END notification arrives about 3 seconds later.
29.3. Simple MAC with throttling
While the above simple MAC would work well when the traffic offered to it is random, it will perform poorly if the network is fully loaded. All nodes would constantly try to access the channel and collide, and the throughput would plummet. To address this concern, one may add an exponentially distributed random backoff (giving Poisson arrivals, to match the assumption of Aloha) for every request, to introduce randomness. The backoff could be chosen to offer a normalized network load of approximately 0.5, since this generates the highest throughput for pure (unslotted) Aloha.
Here’s the updated code with some bells and whistles:
import org.arl.fjage.*
import org.arl.fjage.param.Parameter
import org.arl.unet.*
import org.arl.unet.phy.*
import org.arl.unet.mac.*

class MySimpleThrottledMac extends UnetAgent {

  private final static double TARGET_LOAD = 0.5
  private final static int MAX_QUEUE_LEN = 16

  // (1)
  private AgentID phy
  boolean busy = false // is a reservation currently ongoing?
  Long t0 = null       // time of last reservation start, or null
  Long t1 = null       // time of last reservation end, or null
  int waiting = 0

  @Override
  void setup() {
    register Services.MAC
  }

  @Override
  void startup() {
    phy = agentForService(Services.PHYSICAL) // (2)
  }

  @Override
  Message processRequest(Message msg) {
    switch (msg) {
      case ReservationReq:
        if (msg.duration <= 0) return new RefuseRsp(msg, 'Bad reservation duration')
        if (waiting >= MAX_QUEUE_LEN) return new RefuseRsp(msg, 'Queue full')
        ReservationStatusNtf ntf1 = new ReservationStatusNtf(
          recipient: msg.sender,
          inReplyTo: msg.msgID,
          to: msg.to,
          status: ReservationStatus.START)
        ReservationStatusNtf ntf2 = new ReservationStatusNtf(
          recipient: msg.sender,
          inReplyTo: msg.msgID,
          to: msg.to,
          status: ReservationStatus.END)
        // grant the request after a random backoff (3)
        AgentLocalRandom rnd = AgentLocalRandom.current()            // (4)
        double backoff = rnd.nextExp(TARGET_LOAD/msg.duration/nodes) // (5)
        long t = currentTimeMillis()
        if (t0 == null || t0 < t) t0 = t
        t0 += Math.round(1000*backoff) // schedule packet with a random backoff
        if (t0 < t1) t0 = t1           // after the last scheduled packet (6)
        long duration = Math.round(1000*msg.duration)
        t1 = t0 + duration
        waiting++
        add new WakerBehavior(t0-t, {  // (7)
          send ntf1
          busy = true
          waiting--
          add new WakerBehavior(duration, {
            send ntf2
            busy = false
          })
        })
        return new ReservationRsp(msg)
      case ReservationCancelReq:
      case ReservationAcceptReq:
      case TxAckReq:
        return new RefuseRsp(msg, 'Not supported')
    }
    return null
  }

  // expose parameters defined by the MAC service, and one additional parameter

  @Override
  List<Parameter> getParameterList() {
    return allOf(MacParam, Param)
  }

  enum Param implements Parameter {
    nodes // (8)
  }

  int nodes = 6 // number of nodes in network, to be set by user

  final int reservationPayloadSize = 0
  final int ackPayloadSize = 0
  final float maxReservationDuration = Float.POSITIVE_INFINITY

  boolean getChannelBusy() { // (9)
    return busy
  }

  float getRecommendedReservationDuration() { // (10)
    return get(phy, Physical.DATA, PhysicalChannelParam.frameDuration)
  }

}
(1) We define a few attributes to keep track of channel state and reservation queue.
(2) We look up other agents in startup(), after they have had a chance to advertise their services during the setup phase.
(3) Requests are no longer granted immediately, but after a random backoff instead.
(4) Random numbers are generated using the AgentLocalRandom utility. This utility ensures repeatable results during discrete event simulation, aiding with debugging, and so is the preferred way of generating random variates.
(5) The nextExp() function generates an exponentially distributed random number with a specified rate parameter. The rate parameter is computed such that the average backoff introduced helps to achieve the specified target load.
(6) In Groovy, a comparison such as t0 < t1 is permitted when t1 is null (its initial value), and evaluates to false.
(7) Note that we no longer send the START notification immediately. Instead we schedule it after a backoff, and then schedule the END notification after the reservation duration from the START.
(8) We implement one user-configurable parameter, nodes, and advertise it.
(9) Parameter channelBusy is no longer always false, since we now keep track of reservations. It returns busy, which is true only during the time between a reservation START and END.
(10) Parameter recommendedReservationDuration is now determined based on the frame duration of the PHYSICAL service, assuming that most reservations are for transmitting one frame. A client is free to choose a longer reservation time, if it wishes to transmit many frames in one go (as it should for efficient use of the channel).
A copy of this code is available in the samples folder of your Unet simulator. We encourage you to test it out, in the same way as we tested MySimplestMac in Section 29.2. You’ll find that the START notification no longer arrives immediately after the AGREE response, but arrives a few seconds later, after a random backoff.
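To get a feel for the size of the backoff computed in the listing above, here is a minimal sketch in plain Groovy that does not depend on UnetStack. It assumes that nextExp(rate) draws from an exponential distribution with mean 1/rate, and imitates it with inverse-transform sampling over java.util.Random. The helper names meanBackoff and sampleExp are hypothetical and exist only for this illustration.

```groovy
// Hypothetical helpers for illustration only; not part of UnetStack.

// Mean of the exponential backoff (seconds) for a given reservation
// duration (seconds), number of nodes and target normalized load.
double meanBackoff(double duration, int nodes, double targetLoad) {
  double rate = targetLoad / duration / nodes  // same expression as passed to nextExp()
  return 1.0d / rate
}

// Draw one exponential variate with the given rate (inverse-transform sampling).
double sampleExp(Random rnd, double rate) {
  return -Math.log(1.0d - rnd.nextDouble()) / rate
}

double mean = meanBackoff(0.7d, 6, 0.5d)       // 0.7 s reservations, 6 nodes, load 0.5
println "mean backoff: ${mean} s"

// the average of many samples should be close to the analytical mean
Random rnd = new Random(42)
int n = 100000
double sum = 0
n.times { sum += sampleExp(rnd, 1.0d / mean) }
println "empirical mean: ${sum / n} s"
```

With these assumptions the mean backoff equals duration × nodes / TARGET_LOAD, so each node offers a load of TARGET_LOAD / nodes on average and the nodes together offer roughly the target load.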
29.4. Simple MAC with handshake
While the MAC agents we have developed so far are fully functional, they are simple, and do not involve any signaling for channel reservation. Many MAC protocols such as MACA and FAMA involve a handshake using RTS and CTS PDUs. To illustrate how more complex protocols are developed using UnetStack, we implement a simple RTS-CTS 2-way handshake-based MAC agent next.
Many communication protocols are best described using an FSM. We illustrate the FSM for our simple handshake-based MAC agent in Figure 10.
When the channel is free, the agent is in an IDLE state. If the agent receives a ReservationReq, it switches to the RTS state and sends an RTS PDU to the intended destination node. If it receives a CTS PDU back, then it switches to a TX state and urges the client to transmit data via a ReservationStatusNtf with a START status. After the reservation period is over, the agent switches back to the IDLE state. If no CTS PDU is received in the RTS state for a while, the agent times out and returns to the IDLE state after informing the client of a reservation FAILURE.
If the agent receives an RTS PDU in the IDLE state, it switches to the RX state and responds with a CTS PDU. The node initiating the handshake may then transmit data for the reservation duration. After the duration (plus some allowance for 2-way propagation delay), the agent switches back to the IDLE state. If the agent overhears (snoops) RTS or CTS PDUs destined for other nodes, it switches to a BACKOFF state for a while. While in this state, it does not initiate or respond to RTS PDUs. After the backoff period, it switches back to the IDLE state.
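The transitions described in the two paragraphs above can be summarized as a lookup table. The following is a minimal sketch in plain Groovy, independent of UnetStack; the event labels REQ (a ReservationReq arrives) and TIMEOUT (a timer expires) are hypothetical names introduced here, and the helper nextState is likewise only illustrative.

```groovy
// Transition table for the handshake FSM as described in the text.
// REQ and TIMEOUT are hypothetical labels used only in this sketch.
def transitions = [
  IDLE:    [REQ: 'RTS', RX_RTS: 'RX', SNOOP_RTS: 'BACKOFF', SNOOP_CTS: 'BACKOFF'],
  RTS:     [RX_CTS: 'TX', TIMEOUT: 'IDLE'],  // timeout: client is told of a FAILURE
  TX:      [TIMEOUT: 'IDLE'],                // reservation period is over
  RX:      [TIMEOUT: 'IDLE'],                // duration plus propagation allowance
  BACKOFF: [TIMEOUT: 'IDLE']                 // RTS PDUs are not answered while backing off
]

// Look up the next state; events not listed for a state leave it unchanged.
def nextState = { String state, String event ->
  transitions[state][event] ?: state
}

// walk through a successful reservation on the initiating node
def state = 'IDLE'
['REQ', 'RX_CTS', 'TIMEOUT'].each { ev ->
  def to = nextState(state, ev)
  println "${state} --${ev}--> ${to}"
  state = to
}
```

A table like this is handy for checking that every state has a way back to IDLE before writing the agent itself.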
Our RTS and CTS PDUs are identified by a protocol number. Since we are implementing a MAC protocol, we choose to tag our PDUs using the protocol number reserved for MAC agents (Protocol.MAC). We also define some timeouts and delays that we will need to use:
int PROTOCOL = Protocol.MAC
float RTS_BACKOFF = 2.seconds
float CTS_TIMEOUT = 5.seconds
float BACKOFF_RANDOM = 5.seconds
float MAX_PROP_DELAY = 2.seconds
Communication protocols often use complicated PDU formats. UnetStack provides a PDU class to help encode/decode PDUs. Although the RTS and CTS PDUs have a pretty simple format, the PDU class is still useful in defining the format clearly:
int RTS_PDU = 0x01
int CTS_PDU = 0x02
PDU pdu = PDU.withFormat {
  uint8('type')      // RTS_PDU/CTS_PDU
  uint16('duration') // ms
}
Here we have defined a PDU with two fields: type (8 bits) and duration (16 bits). The type may be either RTS_PDU or CTS_PDU, while the duration specifies the reservation duration in milliseconds. We will later use this pdu object to encode and decode these PDUs.
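To make the resulting 3-byte layout concrete, here is a minimal sketch in plain Groovy that packs and unpacks the same two fields with java.nio.ByteBuffer. It is not the PDU class itself: the functions encodePdu and decodePdu are hypothetical stand-ins, and big-endian byte order is an assumption made for this illustration.

```groovy
import java.nio.ByteBuffer
import java.nio.ByteOrder

// Hypothetical stand-ins for pdu.encode()/pdu.decode(); big-endian layout is assumed.
byte[] encodePdu(int type, int durationMs) {
  ByteBuffer buf = ByteBuffer.allocate(3).order(ByteOrder.BIG_ENDIAN)
  buf.put((byte) type)             // uint8 'type'
  buf.putShort((short) durationMs) // uint16 'duration' in ms
  return buf.array()
}

Map decodePdu(byte[] bytes) {
  ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN)
  int type = buf.get() & 0xff              // mask to read the byte as unsigned
  int durationMs = buf.getShort() & 0xffff // mask to read the short as unsigned
  return [type: type, duration: durationMs]
}

byte[] rts = encodePdu(0x01, 3000)         // RTS for a 3-second reservation
println rts.toList().collect { String.format('%02x', it & 0xff) }.join(' ')
println decodePdu(rts)
println decodePdu(encodePdu(0x02, 65535))  // largest duration that fits in a uint16
```

Since the duration field is an unsigned 16-bit count of milliseconds, the longest reservation that can be signaled this way is 65535 ms, that is, 65.535 seconds.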
Now comes the heart of our MAC protocol implementation: the FSM shown in Figure 10. First we define the FSM states and the events that the FSM reacts to:
enum State {
  IDLE, RTS, TX, RX, BACKOFF
}

enum Event {
  RX_RTS, RX_CTS, SNOOP_RTS, SNOOP_CTS
}
Next we use the FSMBuilder utility class to construct an FSMBehavior from a concise declarative representation of the FSM.
The FSM states are defined using the state(…) declarations. The actions to take when entering/exiting a state are defined in the onEnter/onExit clauses. The behavior of the FSM in response to events is defined using the onEvent(…) clauses. Timers that operate in a state are defined using the after(…) clauses. Finally, actions to take continuously while in a state are defined using the action clause.
It should be easy to see the direct mapping between the FSM diagram and the FSM code below:
int MAX_RETRY = 3
int MAX_QUEUE_LEN = 16

Queue<ReservationReq> queue = new ArrayDeque<ReservationReq>(MAX_QUEUE_LEN)

FSMBehavior fsm = FSMBuilder.build {

  int retryCount = 0
  float backoff = 0
  def rxInfo
  def rnd = AgentLocalRandom.current()

  state(State.IDLE) {
    action {
      if (!queue.isEmpty()) {
        // add random backoff for each reservation to allow other nodes
        // a chance to reserve, especially in case of a heavily loaded network
        after(rnd.nextDouble(0, BACKOFF_RANDOM)) {
          setNextState(State.RTS)
        }
      }
      block()
    }
    onEvent(Event.RX_RTS) { info ->
      rxInfo = info
      setNextState(State.RX)
    }
    onEvent(Event.SNOOP_RTS) {
      backoff = RTS_BACKOFF
      setNextState(State.BACKOFF)
    }
    onEvent(Event.SNOOP_CTS) { info ->
      backoff = info.duration + 2*MAX_PROP_DELAY
      setNextState(State.BACKOFF)
    }
  }

  state(State.RTS) {
    onEnter {
      Message msg = queue.peek()
      def bytes = pdu.encode(
        type: RTS_PDU,
        duration: Math.ceil(msg.duration*1000))
      phy << new TxFrameReq(
        to: msg.to,
        type: Physical.CONTROL,
        protocol: PROTOCOL,
        data: bytes)
      after(CTS_TIMEOUT) {
        if (++retryCount >= MAX_RETRY) {
          sendReservationStatusNtf(queue.poll(), ReservationStatus.FAILURE)
          retryCount = 0
        }
        setNextState(State.IDLE)
      }
    }
    onEvent(Event.RX_CTS) {
      setNextState(State.TX)
    }
  }

  state(State.TX) {
    onEnter {
      ReservationReq msg = queue.poll()
      retryCount = 0
      sendReservationStatusNtf(msg, ReservationStatus.START)
      after(msg.duration) {
        sendReservationStatusNtf(msg, ReservationStatus.END)
        setNextState(State.IDLE)
      }
    }
  }

  state(State.RX) {
    onEnter {
      def bytes = pdu.encode(
        type: CTS_PDU,
        duration: Math.round(rxInfo.duration*1000))
      phy << new TxFrameReq(
        to: rxInfo.from,
        type: Physical.CONTROL,
        protocol: PROTOCOL,
        data: bytes)
      after(rxInfo.duration + 2*MAX_PROP_DELAY) {
        setNextState(State.IDLE)
      }
      rxInfo = null
    }
  }

  state(State.BACKOFF) {
    onEnter {
      after(backoff) {
        setNextState(State.IDLE)
      }
    }
    onEvent(Event.SNOOP_RTS) {
      backoff = RTS_BACKOFF
      reenterState()
    }
    onEvent(Event.SNOOP_CTS) { info ->
      backoff = info.duration + 2*MAX_PROP_DELAY
      reenterState()
    }
  }

}
Do note that the above FSM includes a few details that were missing from the FSM diagram. Firstly, we implement a random backoff before switching to the RTS state to minimize contention. Secondly, we implement a retryCount counter to track the number of times a single ReservationReq has been tried. Once it reaches MAX_RETRY, we discard the request and inform the client of a reservation FAILURE. Thirdly, we have a backoff variable that allows different backoff times for different occasions. The variable is set each time, just before the state is changed to State.BACKOFF or before the backoff state is re-entered.
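As a rough worked example of what the retry logic implies for a client, the sketch below estimates when a FAILURE would be reported if the destination never replies. It is plain Groovy arithmetic using the constants from the listings above, and it assumes that the agent is otherwise undisturbed (no snooped traffic, no incoming RTS) and that transmission and processing times are negligible.

```groovy
// Values taken from the listings above, in seconds.
int MAX_RETRY = 3
double CTS_TIMEOUT = 5.0d
double BACKOFF_RANDOM = 5.0d

// Each attempt waits a random backoff in [0, BACKOFF_RANDOM) in the IDLE state,
// then waits CTS_TIMEOUT in the RTS state before giving up on that attempt.
double earliestFailure = MAX_RETRY * CTS_TIMEOUT
double latestFailure = MAX_RETRY * (BACKOFF_RANDOM + CTS_TIMEOUT)

println "FAILURE is reported between ${earliestFailure} s and ${latestFailure} s after the request"
```

Under these assumptions, a client should expect to wait somewhere between 15 and 30 seconds before learning that a reservation has failed.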
The FSM uses a simple utility method to send out ReservationStatusNtf notifications:
void sendReservationStatusNtf(ReservationReq msg, ReservationStatus status) {
  send new ReservationStatusNtf(
    recipient: msg.sender,
    inReplyTo: msg.msgID,
    to: msg.to,
    from: addr,
    status: status)
}
Now the hard work is done. We initialize our agent by registering the MAC service, looking up and subscribing to the PHYSICAL service (to transmit and receive PDUs), looking up our own address using the NODE_INFO service, and starting the fsm behavior:
AgentID phy
int addr

void setup() {
  register Services.MAC
}

void startup() {
  phy = agentForService(Services.PHYSICAL)
  subscribe(phy)
  subscribe(topic(phy, Physical.SNOOP))
  add new OneShotBehavior({
    def nodeInfo = agentForService(Services.NODE_INFO)
    addr = get(nodeInfo, NodeInfoParam.address)
  })
  add(fsm)
}
Note that we subscribe to topic(phy, Physical.SNOOP) in addition to phy. This allows us to snoop RTS/CTS PDUs destined for other nodes. Also note that the address lookup is performed in a OneShotBehavior to avoid blocking the agent while the node information agent is starting up.
Just like in the earlier MAC implementation, we have to respond to various requests defined by the MAC service specifications:
Message processRequest(Message msg) {
  switch (msg) {
    case ReservationReq:
      if (msg.to == Address.BROADCAST || msg.to == addr)
        return new RefuseRsp(msg, 'Reservation must have a destination node')
      if (msg.duration <= 0 || msg.duration > maxReservationDuration)
        return new RefuseRsp(msg, 'Bad reservation duration')
      if (queue.size() >= MAX_QUEUE_LEN)
        return new RefuseRsp(msg, 'Queue full')
      queue.add(msg)
      fsm.restart() // tell fsm to check queue, as it may block if empty
      return new ReservationRsp(msg)
    case ReservationCancelReq:
    case ReservationAcceptReq:
    case TxAckReq:
      return new RefuseRsp(msg, 'Not supported')
  }
  return null
}
If we get a ReservationReq, we validate the attributes, add the request to our queue and return a ReservationRsp. For other requests that we do not support, we simply refuse them.
If we receive PDUs from the physical agent, they come as RxFrameNtf messages via the processMessage() method. For all PDUs with a protocol number that we use, we decode them. We trigger appropriate FSM events in response to RTS and CTS PDUs: RX_RTS and RX_CTS events for PDUs destined to us, and SNOOP_RTS and SNOOP_CTS events for PDUs that we overhear:
void processMessage(Message msg) {
  if (msg instanceof RxFrameNtf && msg.protocol == PROTOCOL) {
    def rx = pdu.decode(msg.data)
    def info = [from: msg.from, to: msg.to, duration: rx.duration/1000.0]
    if (rx.type == RTS_PDU)
      fsm.trigger(info.to == addr ? Event.RX_RTS : Event.SNOOP_RTS, info)
    else if (rx.type == CTS_PDU)
      fsm.trigger(info.to == addr ? Event.RX_CTS : Event.SNOOP_CTS, info)
  }
}
Finally, we expose the parameters required by the MAC service specification:
List<Parameter> getParameterList() { // publish list of all exposed parameters
  return allOf(MacParam)
}

final int reservationPayloadSize = 0        // read-only
final int ackPayloadSize = 0                // read-only
final float maxReservationDuration = 65.535 // read-only

boolean getChannelBusy() { // considered busy if fsm is not IDLE
  return fsm.currentState.name != State.IDLE
}

float getRecommendedReservationDuration() { // recommended duration: one DATA packet
  return get(phy, Physical.DATA, PhysicalChannelParam.frameDuration)
}
We are done! You can find the full listing of the MySimpleHandshakeMac agent in Appendix D (and also in the samples folder of your Unet simulator).
29.5. Testing our simple MAC with handshake
Let’s try out this MAC. The steps are similar to Section 29.2, but since the handshake requires the MAC to be running on all nodes, you will have to fire up the 2-node network and replace the default CSMA MAC with MySimpleHandshakeMac on both nodes (node A and node B):
> container.kill mac
true
> container.add 'mac', new MySimpleHandshakeMac();
> mac
« MySimpleHandshakeMac »
[org.arl.unet.mac.MacParam]
ackPayloadSize ⤇ 0
maxReservationDuration ⤇ 65.535
recommendedReservationDuration ⤇ 0.7
reservationPayloadSize ⤇ 0
Since the handshaking involves exchange of PDUs between nodes, it is instructive to see the PDUs being exchanged by subscribing to phy. You can make a reservation request on node A:
> subscribe phy
> mac << new ReservationReq(to: 31, duration: 3.seconds)
ReservationRsp:AGREE
phy >> TxFrameStartNtf:INFORM[type:CONTROL txTime:3631928985 txDuration:950]
phy >> RxFrameStartNtf:INFORM[type:CONTROL rxTime:3634151681]
phy >> RxFrameNtf:INFORM[type:CONTROL from:31 to:232 protocol:4 rxTime:3634151681 (3 bytes)]
mac >> ReservationStatusNtf:INFORM[to:31 from:232 status:START]
mac >> ReservationStatusNtf:INFORM[to:31 from:232 status:END]
We see that an RTS is transmitted (TxFrameStartNtf), then a CTS is received from node B (RxFrameStartNtf and RxFrameNtf). The reservation starts as soon as the CTS is received, and it ends 3 seconds later. Exactly as we wanted!