知道了The ONE消息创建(详情见博文《消息创建过程》)和转发(详情见博文《消息转发过程》),消息接收就相对简单了。
1. 接收消息
在ActiveRouter.update
会检查消息是否传输完毕,相关源代码如下:
//ActiveRouter.java
public void update() {
super.update();
if (con.isMessageTransferred()) { //消息是否传输完毕,即是否过了(1.0*m.getSize())/this.speed没
if (con.getMessage() != null) {
transferDone(con);
con.finalizeTransfer(); //完成消息传输
}
}
}
2.完成传输
2.1 con.finalizeTransfer
//Connection.java
public void finalizeTransfer() {
assert this.msgOnFly != null : "Nothing to finalize in " + this;
assert msgFromNode != null : "msgFromNode is not set";
this.bytesTransferred += msgOnFly.getSize();
getOtherNode(msgFromNode).messageTransferred(this.msgOnFly.getId(), msgFromNode); //处理该传输完成的消息
clearMsgOnFly();
}
2.2 messageTransferred
messageTransferred
完成消息接收,将接收到消息根据情况放入相应的缓冲区,其调用关系是:DTNHost.messageTransferred --> ActiveRouter.messageTransferred --> MessageRouter.messageTransferred
。相关源代码如下:
//DTNHost.java
public void messageTransferred(String id, DTNHost from) {
this.router.messageTransferred(id, from);
}
//ActiveRouter.java
public Message messageTransferred(String id, DTNHost from) {
Message m = super.messageTransferred(id, from);
/*** 相对于MessageRouter.messageTransferred, 增加了对response消息的处理 ***/
if (m.getTo() == getHost() && m.getResponseSize() > 0) {
Message res = new Message(this.getHost(),m.getFrom(), RESPONSE_PREFIX+m.getId(), m.getResponseSize()); //generate a response message
this.createNewMessage(res);
this.getMessage(RESPONSE_PREFIX+m.getId()).setRequest(m);
}
return m;
}
//MessageRouter.java
public Message messageTransferred(String id, DTNHost from) {
Message incoming = removeFromIncomingBuffer(id, from); //将消息从incomingMessages删除
boolean isFinalRecipient; //消息传递到目的节点
isFinalRecipient = aMessage.getTo() == this.host;
boolean isFirstDelivery; //消息传递到目的节点,且第一次
isFirstDelivery = isFinalRecipient && !isDeliveredMessage(aMessage);
incoming.setReceiveTime(SimClock.getTime()); //设置消息接收时间
/*** 将消息交给应用层处理(如果有的话), 有些应用会丢弃该消息 ***/
Message outgoing = incoming;
for (Application app : getApplications(incoming.getAppID())) {
outgoing = app.handle(outgoing, this.host);
if (outgoing == null) break; // Some app wanted to drop the message
}
Message aMessage = (outgoing==null)?(incoming):(outgoing);
/*** 将消息根据实际情况放入相应的缓冲区 ***/
if (!isFinalRecipient && outgoing!=null) {
addToMessages(aMessage, false); //incomingMessages --> messages
} else if (isFirstDelivery) {
this.deliveredMessages.put(id, aMessage); //incomingMessages --> deliveredMessages
} else if (outgoing == null) {
this.blacklistedMessages.put(id, null); // Blacklist messages that an app wants to drop.
}
return aMessage;
}
//MessageRouter.java Adds a message to the message buffer and informs message listeners
protected void addToMessages(Message m, boolean newMessage) {
this.messages.put(m.getId(), m); //放到消息缓冲区messages中
if (newMessage) {
for (MessageListener ml : this.mListeners) {
ml.newMessage(m);
}
}
}
至此,消息接收完毕。
注:对于消息外部事件MessageRelayEvent
,当处理该事件processEvent
时,会调用messageTransferred
完成上述的过程。