博文《消息创建过程》讲述了MessageEventGenerator 如何创建消息。消息创建后,至于怎么发送(将哪些消息发送到哪些邻居),取决于路由策略。本文忽略路由策略,只介绍消息在The ONE是如何发送的。
1. 概述
1.1 消息流动
The ONE的消息缓冲区是messages,除此之外,还提供其他辅助缓冲区,如下,最后一个是供应用程序使用的,先不管它。
private HashMap<String, Message> incomingMessages; //正在发送的消息
private HashMap<String, Message> messages; //消息缓冲区,包括新创建的消息和接收到的消息
private HashMap<String, Message> deliveredMessages; //已成功投递的消息
private HashMap<String, Object> blacklistedMessages; //应用层删除的消息 The messages that Applications on this router have blacklisted
The ONE消息的转换如下图所示。新创建的消息放在messages
,正在传输的消息放在incomingMessages
;传输成功的消息若为目的节点则放在deliverredMessages
,否则放在messages
。
1.2 消息发送时机
MessageEventGenerator
创建的消息(确切的说,是由其产生消息创建事件,而后处理该事件,创建消息)放在节点的缓冲区(确切的说,是MessageRouter.java
的private HashMap<String, Message> messages)
。消息创建后,至于怎么发送(将哪些消息发送到哪些邻居),取决于路由策略。
自己写个路由MyRouter
,继承ActiveRouter
,ActiveRouter
又是继承MessageRouter
,其类图关系如下:
通常在自己的路由MyRouter
重写update()
方法,此时update
调用层次是这样的:MyRouter.update –> ActiveRouter.update –> MessageRouter.update
。在update
调用自己实现的tryOtherMessages
,在tryOtherMessages
通过相应的路由策略将要发送的消息以及发送到哪个节点(即消息-连接对)收集起来,最后调用tryMessagesForConnected
发送消息。主要代码如下:
private Tuple<Message, Connection> tryOtherMessages() {
List<Tuple<Message, Connection>> messages = new ArrayList<Tuple<Message, Connection>>();
messages.add(new Tuple<Message, Connection>(m,con)); //将要发送的<Message, Connection>收集起来
return tryMessagesForConnected(messages); // try to send messages
}
值得注意的是,tryMessagesForConnected
只处理一个消息的传送,而不是处理Tuple中的所有消息,因为一有消息发送,信道就被占用了。
//ActiveRouter.java
protected Tuple<Message, Connection> tryMessagesForConnected(List<Tuple<Message, Connection>> tuples) {
for (Tuple<Message, Connection> t : tuples) {
Message m = t.getKey();
Connection con = t.getValue();
if (startTransfer(m, con) == RCV_OK) { //REV_OK相当于链路空闲,可以使用
return t; //只要有一个连接在传送就返回了
}
}
}
2. 开始传输
ActiveRouter.java
的startTransfer
开始传输(类似于网络层),调用Connection.java
的startTransfer
(类似于链路层)。
2.1 ActiveRouter.startTransfer
//ActiveRouter.java
protected int startTransfer(Message m, Connection con) {
int retVal;
if (!con.isReadyForTransfer()) { //connection is up and there is no message being transferred
return TRY_LATER_BUSY;
}
if (!policy.acceptSending(getHost(), con.getOtherNode(getHost()), con, m)) { //默认情况,所有消息都接受。 支持三种策略:simple policy, Hop Count, ModuleCommunicationBus(MCB)
return MessageRouter.DENIED_POLICY;
}
retVal = con.startTransfer(getHost(), m); //相当于用物理层链路开始传输
if (retVal == RCV_OK) { // started transfer
addToSendingConnections(con); //add connection(s) that are currently used for sending
} else if (deleteDelivered && retVal == DENIED_OLD && m.getTo() == con.getOtherNode(this.getHost())) {
this.deleteMessage(m.getId(), false); //final recipient has already received the msg -> delete it
}
return retVal;
}
2.2 Connection.startTransfer
ActiveRouter.startTransfer
调用Connection.startTransfer
,Connection.startTransfer()
是抽象方法。Connection
有两个子类:CBRConnection
(A constant bit-rate connection)和VBRConnection
(The transmission speed is updated every round)。以CBRConnection
为例,其startTransfer
源码如下:
public int startTransfer(DTNHost from, Message m) {
//判断没有消息在该connection传输,no message being transferred
assert this.msgOnFly == null : "Already transferring " + this.msgOnFly + " from " + this.msgFromNode + " to " + this.getOtherNode(this.msgFromNode) + ". Can't " + "start transfer of " + m + " from " + from;
this.msgFromNode = from;
Message newMessage = m.replicate(); //复制消息
int retVal = getOtherNode(from).receiveMessage(newMessage, from); //Start receiving a message from another host
if (retVal == MessageRouter.RCV_OK) {
this.msgOnFly = newMessage;
this.transferDoneTime = SimClock.getTime() + (1.0*m.getSize()) / this.speed;
}
return retVal;
}
3. 开始接收消息
开始接收消息,把消息放到incomingMessages
,而不是直接放到messages
,因为传输需要时间(消息大小除以速度)。调用过程如下:DTNHost.receiveMessage –> ActiveRouter.receiveMessage –> MessageRouter.receiveMessage
,其源代码如下:
//DTNHost.java Start receiving a message from another host
public int (Message m, DTNHost from) {
int retVal = this.router.receiveMessage(m, from);
if (retVal == MessageRouter.RCV_OK) {
m.addNodeOnPath(this); // add this node on the messages path
}
return retVal;
}
//ActiveRouter.java重写了MessageRouter.java的receiveMessage,增加了判断
public int receiveMessage(Message m, DTNHost from) {
int recvCheck = checkReceiving(m, from); //router isn't transferring, doesn't have the message and has room for it
if (recvCheck != RCV_OK) {
return recvCheck;
}
return super.receiveMessage(m, from); //调用MessageRouter.java的receiveMessage
}
//MessageRouter.java Try to start receiving a message from another host.
public int receiveMessage(Message m, DTNHost from) {
Message newMessage = m.replicate();
this.putToIncomingBuffer(newMessage, from); //才开始接收,还没接收完,先放在incomingMessages
newMessage.addNodeOnPath(this.host);
for (MessageListener ml : this.mListeners) {
ml.messageTransferStarted(newMessage, from, getHost());
}
return RCV_OK; // superclass always accepts messages
}