Source code for blackwidow.network.flow

from blackwidow.network.packet import AckPacket, DataPacket
from blackwidow.network.rate_graph import Rate_Graph
from event import Event

# Retransmission-timer parameters, set to the values given in RFC 6298
# (https://tools.ietf.org/html/rfc6298).
K = 4.0        # multiplier applied to RTTVAR when the RTO is computed
alpha = 0.125  # 1/8: weight of the newest RTT sample in the SRTT average
beta = 0.25    # 1/4: weight of the newest deviation sample in RTTVAR
G = 1.0        # clock granularity; lower bound on the K * RTTVAR term


class Flow(object):
    """Simple class for flows.

    Flows will trigger host behavior. Has slow start and congestion
    avoidance.

    Parameters
    ----------
    flow_id : string
        A unique id for the flow.
    source : `Device`
        The source for the flow.
    destination : `Device`
        The destination for the flow.
    amount : int
        The amount of data to send in MB.
    env : `Network`
        The network that the flow belongs to.
    time : float
        The amount of time to wait before starting to send. The value is
        multiplied by 1000 to obtain `flow_start`, so it is presumably
        given in seconds -- TODO confirm against callers.
    bw : Blackwidow
        The printer to print data to.

    Attributes
    ----------
    Apart from `env` and `bw`, every attribute below is stored with a
    leading underscore; `flow_id`, `src`, `dest`, `amount` and
    `flow_start` are also exposed as read-only properties.

    flow_id : string
        The flow id.
    src : `Device`
        The source for the flow.
    dest : `Device`
        The destination for the flow.
    amount : int
        The amount of data left to send, in bits. Starts at
        ``amount * 8 * 10**6`` and shrinks by the packet size the first
        time each packet number is sent.
    env : `Network`
        The network that the flow belongs to.
    flow_start : float
        Delay used when scheduling the "Start flow" event
        (``time * 1000.0``).
    pack_num : int
        The next pack_num to check to send.
    cwnd : float
        Congestion window size.
    ssthresh : float
        Slow start threshold.
    resend_time : float
        Delay used when scheduling the "Send" / "Resend" events that call
        `send_packet` after an ack or a timeout.
    min_RTT : float
        Minimum round trip time observed for this flow.
    last_RTT : float
        Last round trip time observed for this flow.
    SRTT : float
        Weighted average of round trip times biased towards recent RTT;
        -1 until the first measurement.
    RTTVAR : float
        Round trip time variation as defined by the RFC 6298 update rule.
    RTO : float
        Retransmission timeout in ms.
    packets_sent : list
        Packet numbers that have been sent but haven't had their ack
        received.
    packets_time_out : list
        Packet numbers that have exceeded timeout and need to be resent.
    acks_arrived : set
        Packet numbers whose ack has been received.
    done : int
        0 if flow isn't finished; 1 if flow is finished. Used to avoid
        decrementing the network's flow count more than once.
    send_rate : Rate_Graph
        Keeps track of the rate the flow is sending at.
    receive_rate : Rate_Graph
        Keeps track of the rate the flow is receiving at.
    """

    # NOTE: diagnostics use print(<single expression>), which produces the
    # same output under Python 2's print statement and Python 3's print
    # function.

    def __init__(self, flow_id, source, destination, amount, env, time, bw):
        """Set up the flow state and schedule its "Start flow" event."""
        self._flow_id = flow_id
        self._src = source
        self._dest = destination
        # MB -> bits.
        self._amount = amount*8*10**6
        self._pack_num = 0
        self._cwnd = 1.0
        self._ssthresh = 1000
        self._resend_time = 10
        self._min_RTT = 1000.0
        self._last_RTT = 3000.0
        # -1 marks "no RTT sample yet"; see _update_RTT.
        self._SRTT = -1
        self._RTTVAR = 0
        self._RTO = 3000
        self._packets_sent = []
        self._packets_time_out = []
        self._acks_arrived = set()
        self.env = env
        self.bw = bw
        self._flow_start = time*1000.0
        self._last_packet = 0
        self._done = 0
        self._send_rate = Rate_Graph(self._flow_id,
                                     "flow {0} send rate".format(self.flow_id),
                                     self.env,
                                     self.bw)
        self._receive_rate = Rate_Graph(
            self._flow_id,
            "flow {0} receive rate".format(self.flow_id),
            self.env,
            self.bw)
        # The first call to send_packet happens after the start delay.
        self.env.add_event(Event("Start flow", self._flow_id,
                                 self.send_packet),
                           self._flow_start)

    # The properties below are read-only: each setter always raises
    # AttributeError.

    @property
    def flow_id(self):
        return self._flow_id

    @flow_id.setter
    def flow_id(self, value):
        raise AttributeError(
            "Cannot change flow id: {0}".format(self._flow_id))

    @property
    def src(self):
        return self._src

    @src.setter
    def src(self, value):
        raise AttributeError(
            "Cannot change flow source: {0}".format(self._flow_id))

    @property
    def dest(self):
        return self._dest

    @dest.setter
    def dest(self, value):
        raise AttributeError(
            "Cannot change flow destination: {0}".format(self._flow_id))

    @property
    def amount(self):
        return self._amount

    @amount.setter
    def amount(self, value):
        raise AttributeError(
            "Cannot change flow amount: {0}".format(self._flow_id))

    @property
    def flow_start(self):
        return self._flow_start

    @flow_start.setter
    def flow_start(self, value):
        raise AttributeError(
            "Cannot change flow start: {0}".format(self._flow_id))

    def __str__(self):
        msg = "Flow {0}, sending from {1} to {2}"
        return msg.format(self._flow_id,
                          self._src.network_id,
                          self._dest.network_id)

    def _send_ack(self, packet):
        """ Creates ack for packet.

        The ack travels from the packet's destination back to its source
        and carries the data packet's id and timestamp. A packet whose
        endpoints do not match this flow is only reported.

        Parameters
        ----------
        packet : `Packet`
            The data packet to acknowledge.
        """
        if self._src == packet.src and self._dest == packet.dest:
            ack_packet = AckPacket(packet.pack_id, packet.dest, packet.src,
                                   self._flow_id, timestamp=packet.timestamp)
            self._dest.send(ack_packet)
            print("Flow sent ack packet {0}".format(packet.pack_id))
        else:
            print("Received wrong packet.")

    def send_packet(self):
        """ Send a packet.

        While there is data left, packets are sent starting at
        `pack_num` until the number of outstanding packets (sent minus
        timed out) reaches the congestion window. Once all data has been
        sent at least once, only the timed-out packets are resent. Every
        transmission schedules a "Timeout" event `RTO` later.
        """
        if self._amount > 0:
            # Send packets up to the window size.
            while (len(self._packets_sent) - len(self._packets_time_out)
                   < self._cwnd):
                pack = DataPacket(self._pack_num, self._src, self._dest,
                                  self._flow_id, timestamp=self.env.time)
                # Already-acknowledged packet numbers are skipped.
                if self._pack_num not in self._acks_arrived:
                    self._src.send(pack)
                    print("Flow sent packet {0}".format(pack.pack_id))
                    self.bw.record('{0}, {1}'.format(self.env.time,
                                                     pack.size),
                                   'flow_{0}.sent'.format(self.flow_id))
                    self._send_rate.add_point(pack, self.env.time)
                    self.env.add_event(Event("Timeout", self._flow_id,
                                             self._timeout,
                                             pack_num=self._pack_num),
                                       self._RTO)
                    # Shouldn't subtract pack.size if sent before.
                    if self._pack_num not in self._packets_sent:
                        self._amount -= pack.size
                        self._packets_sent.append(self._pack_num)
                    print("Flow has {0} bits left".format(self._amount))
                # This packet number has now been dealt with, so it no
                # longer counts as waiting for a resend.
                if self._pack_num in self._packets_time_out:
                    self._packets_time_out.remove(self._pack_num)
                self._pack_num += 1
                if self._amount <= 0:
                    break
        else:
            # Just keep resending last few packets until done
            while self._packets_time_out:
                self._pack_num = self._packets_time_out[0]
                pack = DataPacket(self._pack_num, self._src, self._dest,
                                  self._flow_id, timestamp=self.env.time)
                self._src.send(pack)
                self._send_rate.add_point(pack, self.env.time)
                self._packets_time_out.remove(self._pack_num)
                self.env.add_event(Event("Timeout", self._flow_id,
                                         self._timeout,
                                         pack_num=self._pack_num),
                                   self._RTO)

    def receive(self, packet):
        """ Generate an ack or respond to bad packet.

        A packet addressed to the flow's destination is treated as data
        and acknowledged unless its ack has already arrived; any other
        packet is treated as an ack arriving at the source.

        Parameters
        ----------
        packet : `Packet`
            The packet to be received.
        """
        # Packet arrived at destination. Send ack.
        if packet.dest == self._dest:
            print("Flow received packet {0}".format(packet.pack_id))
            if packet.pack_id not in self._acks_arrived:
                self._send_ack(packet)
        # Ack arrived at source. Update window size.
        else:
            self._receive_ack(packet)

    def _receive_ack(self, packet):
        """ Process an ack that arrived at the source.

        Acks for packet numbers that were already acknowledged are
        ignored. Otherwise the window and RTT estimates are updated, the
        packet number is cleared from the outstanding lists, and the flow
        is marked done once nothing is left to send or to wait for.

        Parameters
        ----------
        packet : `Packet`
            The ack packet that was received.
        """
        if packet.pack_id in self._acks_arrived:
            return

        self._respond_to_ack()
        self._update_RTT(packet)
        # Update lists by removing pack_id
        if packet.pack_id in self._packets_sent:
            self._packets_sent.remove(packet.pack_id)
        if packet.pack_id in self._packets_time_out:
            self._packets_time_out.remove(packet.pack_id)
        # Update which acks have arrived
        self._acks_arrived.add(packet.pack_id)
        print("Flow {} received ack for packet {}".format(self._flow_id,
                                                          packet.pack_id))
        self.bw.record('{0}, {1}'.format(self.env.time, packet.size),
                       'flow_{0}.received'.format(self.flow_id))
        self._receive_rate.add_point(packet, self.env.time)
        # Check if done
        if (len(self._packets_sent) == 0 and self._amount <= 0 and
                self._done == 0):
            self.env.decrement_flows()
            self._done = 1

    def _respond_to_ack(self):
        """ Update window size.

        Schedules the next `send_packet` call, then grows the window by
        one packet per ack below `ssthresh` (slow start) and by
        ``1 / cwnd`` per ack otherwise (congestion avoidance).
        """
        self.env.add_event(Event("Send", self._flow_id, self.send_packet),
                           self._resend_time)
        if self._cwnd < self._ssthresh:
            self._cwnd += 1.0
        else:
            self._cwnd += 1.0/self._cwnd
        print("Flow {} window size is {}".format(self._flow_id, self._cwnd))
        self.bw.record('{0}, {1}'.format(self.env.time, self._cwnd),
                       'flow_{0}.window'.format(self.flow_id))

    def _update_RTT(self, packet):
        """ Update last RTT and min RTT and retransmission timeout.

        Parameters
        ----------
        packet : `Packet`
            The packet that was received. Need this to get the timestamp
        """
        self._last_RTT = self.env.time - packet.timestamp
        if self._SRTT == -1:
            # First measurement.
            self._SRTT = self._last_RTT
            self._RTTVAR = self._last_RTT / 2.0
        else:
            # RTTVAR must be updated first: it uses the previous SRTT.
            self._RTTVAR = ((1.0 - beta) * self._RTTVAR +
                            beta * abs(self._SRTT - self._last_RTT))
            self._SRTT = (1.0 - alpha)*self._SRTT + alpha*self._last_RTT
        # The timeout is clamped to the range [1000, 5000].
        self._RTO = min(max(self._SRTT + max(G, K*self._RTTVAR), 1000), 5000)
        print("RTO is {}".format(self._RTO))
        if self._last_RTT < self._min_RTT:
            self._min_RTT = self._last_RTT
        # Delay is recorded relative to the smallest RTT seen so far.
        self.bw.record('{0}, {1}'.format(self.env.time,
                                         self._last_RTT - self._min_RTT),
                       'flow_{0}.packet_delay'.format(self.flow_id))

    def _timeout(self, pack_num):
        """ Handle the timeout event of a sent packet.

        Does nothing if the packet's ack has already arrived. Otherwise a
        "Resend" event is scheduled, the packet number is queued for
        retransmission, sending resumes from that packet number and the
        window is reset.

        Parameters
        ----------
        pack_num : int
            The packet number of the packet to check for timeout.
        """
        if pack_num in self._acks_arrived:
            return

        self.env.add_event(Event("Resend", self._flow_id, self.send_packet),
                           self._resend_time)
        # Go back n
        if pack_num not in self._packets_time_out:
            self._packets_time_out.append(pack_num)
        self._pack_num = pack_num
        self._reset_window()

    def _reset_window(self):
        """ Called when a packet timeout occurs.

        Sets ssthresh to max(2, cwnd/2) and cwnd to 1.
        """
        self._ssthresh = max(2.0, self._cwnd / 2.0)
        self._cwnd = 1.0
        print("Flow {} window size is {}".format(self._flow_id, self._cwnd))
        self.bw.record('{0}, {1}'.format(self.env.time, self._cwnd),
                       'flow_{0}.window'.format(self.flow_id))