Commit f2eaca56 authored by Björn Richerzhagen's avatar Björn Richerzhagen
Browse files

Merge branch 'bugfix-jr' into 'master'

Cleanup of ConstantPeerCountChurnModel

I cleaned up the code of the model, added some explaining comments and a minor FIXME.

See merge request !1
parents b965c287 740c11b2
......@@ -20,7 +20,6 @@
package de.tud.kom.p2psim.impl.churn;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
......@@ -37,56 +36,79 @@ import de.tudarmstadt.maki.simonstrator.api.Time;
import de.tudarmstadt.maki.simonstrator.api.util.XMLConfigurableConstructor;
/**
* Another simple churn model that maintains a constant peer cound (i.e. if a
* peer goes offline, it is replaced with a new one). The arrival rate is
* Another simple churn model that maintains a constant client count (i.e. if a
* client goes offline, it is replaced with a new one). The arrival rate is
* constant until the threshold is reached. Session times are configurable as
* either static values or based on a distribution.
*
* @author Bjoern Richerzhagen
* In addition, a flash crowd phase can be configured for a sudden burst of
* connected clients.
*
* @author Bjoern Richerzhagen, Refactored by Julius Rueckert
* @version 1.0, 01.10.2012
*/
public class ConstantPeerCountChurnModel implements ChurnModel {
/**
* Maximum number of online clients in non-flash crowd phase
*/
private final int peersThreshold;
private long lastJoin = -1;
/**
* Inter-arrival time as specified during configuration
*/
private final long initialInterArrivalTime;
private List<SimHost> hosts;
/**
* Minimum session length as specified during configuration
*/
private long minSessionLength = 10 * Time.SECOND;
/*
* Parameters a and b of exponential distribution defining session lengths
*/
private double a = 0.6378;
private double b = -0.05944;
/**
* Tells if the model includes a flash crowd or not
*/
private boolean hasFlashcrowd = false;
/*
* Flash crowd-related information
*/
private long flashcrowdStart = 4 * Time.HOUR;
private double flashcrowdMultiplicator = 2;
private long flashcrowdInterval = 10 * Time.MINUTE;
/*
* Local state variables
*/
private boolean flashCrowdFinished = false;
private boolean inFlashcrowd = false;
private boolean flashcrowdFinished = false;
private long lastJoin = -1;
private Map<Long, CInfo> cInfos = new LinkedHashMap<Long, CInfo>();
private List<SimHost> hosts;
private PriorityQueue<CInfo> sortedByOfflinetime;
private Map<Long, ClientSessionInfo> clientSessionInfos = new LinkedHashMap<Long, ClientSessionInfo>();
private final Random rnd = Randoms
.getRandom(ConstantPeerCountChurnModel.class);
private PriorityQueue<ClientSessionInfo> clientsSortedByOfflineTime;
private final Comparator<CInfo> COMP = new Comparator<ConstantPeerCountChurnModel.CInfo>() {
/**
* Comparator used to sort client infos by offline time
*/
private static final Comparator<ClientSessionInfo> COMP_OFFLINE_TIME = new Comparator<ConstantPeerCountChurnModel.ClientSessionInfo>() {
@Override
public int compare(CInfo o1, CInfo o2) {
return (int) Math.signum(o1.leavingAt - o2.leavingAt);
public int compare(ClientSessionInfo o1, ClientSessionInfo o2) {
return ((Long) o1.leavingAt).compareTo(o2.leavingAt);
}
};
private final Random rnd = Randoms
.getRandom(ConstantPeerCountChurnModel.class);
/**
*
* @param maxPeersOnline
......@@ -100,56 +122,82 @@ public class ConstantPeerCountChurnModel implements ChurnModel {
@Override
public long getNextUptime(SimHost host) {
long currentTime = Time.getCurrentTime();
if (!hosts.remove(host)) {
/*
* FIXME: This is to avoid reusing peer instances. Needs to be fixed
* by properly resetting peers!
*/
return 1000 * Time.HOUR;
}
if (hasFlashcrowd && !flashcrowdFinished && inFlashcrowd) {
long time = Time.getCurrentTime();
long fcInterArrivalTime = (long) (flashcrowdInterval / ((flashcrowdMultiplicator - 1) * peersThreshold));
CInfo info = new CInfo(lastJoin + fcInterArrivalTime);
lastJoin += fcInterArrivalTime;
cInfos.put(host.getHostId(), info);
if (lastJoin > flashcrowdStart + flashcrowdInterval) {
if (hasFlashcrowd && inFlashcrowd && !flashCrowdFinished) {
/*
* Join peers in a burst according to flash crowd configuration.
*
* Info: The flash crowd peak is reached with (flash crowd
* multiplier * peer threshold) present peers. The length of the
* period is defined by the flashcrowdInterval.
*/
long flashCrowdInterArrivalTime = (long) (flashcrowdInterval / ((flashcrowdMultiplicator - 1) * peersThreshold));
long currentJoin = lastJoin + flashCrowdInterArrivalTime;
ClientSessionInfo info = new ClientSessionInfo(currentJoin);
clientSessionInfos.put(host.getHostId(), info);
if (currentJoin > flashcrowdStart + flashcrowdInterval) {
inFlashcrowd = false;
flashcrowdFinished = true;
flashCrowdFinished = true;
}
return info.joiningAt - time;
} else if (sortedByOfflinetime.size() < peersThreshold) {
long time = Time.getCurrentTime();
lastJoin = currentJoin;
return info.joiningAt - currentTime;
} else if (clientsSortedByOfflineTime.size() < peersThreshold) {
/*
* Initially, join peers until the peer threshold is reached.
*/
if (lastJoin < 0) {
lastJoin = time;
lastJoin = currentTime;
}
CInfo info = new CInfo(lastJoin + initialInterArrivalTime);
lastJoin += initialInterArrivalTime;
sortedByOfflinetime.add(info);
cInfos.put(host.getHostId(), info);
if (hasFlashcrowd && !flashcrowdFinished
&& info.joiningAt > flashcrowdStart) {
long currentJoin = lastJoin + initialInterArrivalTime;
ClientSessionInfo info = new ClientSessionInfo(currentJoin);
clientsSortedByOfflineTime.add(info);
clientSessionInfos.put(host.getHostId(), info);
if (hasFlashcrowd && !flashCrowdFinished
&& currentJoin > flashcrowdStart) {
inFlashcrowd = true;
}
return info.joiningAt - time;
lastJoin = currentJoin;
return info.joiningAt - currentTime;
} else {
CInfo nextToGoOffline = sortedByOfflinetime.poll();
/*
* After reaching the peer threshold, only peers that went offline
* are replaced to keep the threshold.
*/
ClientSessionInfo nextToGoOffline = clientsSortedByOfflineTime
.poll();
if (nextToGoOffline == null) {
throw new AssertionError();
} else {
long joiningAt = nextToGoOffline.leavingAt;
CInfo info = new CInfo(joiningAt);
lastJoin = info.joiningAt;
sortedByOfflinetime.add(info);
cInfos.put(host.getHostId(), info);
if (hasFlashcrowd && !flashcrowdFinished
&& info.joiningAt > flashcrowdStart) {
// Use the next time a peer goes offline as joining time.
long currentJoin = nextToGoOffline.leavingAt;
ClientSessionInfo info = new ClientSessionInfo(currentJoin);
clientsSortedByOfflineTime.add(info);
clientSessionInfos.put(host.getHostId(), info);
if (hasFlashcrowd && !flashCrowdFinished
&& currentJoin > flashcrowdStart) {
inFlashcrowd = true;
}
return info.joiningAt - Time.getCurrentTime();
lastJoin = currentJoin;
return info.joiningAt - currentTime;
}
}
}
@Override
public long getNextDowntime(SimHost host) {
CInfo info = cInfos.get(host.getHostId());
ClientSessionInfo info = clientSessionInfos.get(host.getHostId());
return info.sessionLength;
}
......@@ -164,21 +212,37 @@ public class ConstantPeerCountChurnModel implements ChurnModel {
}
}
}
Collections.shuffle(churnHosts, rnd);
sortedByOfflinetime = new PriorityQueue<ConstantPeerCountChurnModel.CInfo>(
(int) Math.ceil(hosts.size() / 10.0), COMP);
clientsSortedByOfflineTime = new PriorityQueue<ConstantPeerCountChurnModel.ClientSessionInfo>(
(int) Math.ceil(hosts.size() / 10.0), COMP_OFFLINE_TIME);
}
/**
* returns the session length for the host
* Returns the next session length following the exponential distribution
* (using parameter a and b)
*
* @param host
* @return
* @return the session length
*/
protected long getSessionLength() {
double random = rnd.nextDouble();
/*
* The session length is calculated according an exponential
* distribution as defined by Vu et al. (2010) to model PPLive traces.
*
* p = a * e^{b*x}
*
* Resulting in the following formula:
*
* x = log(p/a)/b
*
* FIXME: Why do we add another random number of minutes here??
*/
long sessionLength = (long) (Math.log(random / a) / b) * Time.MINUTE
+ (long) (random * Time.MINUTE);
/*
* The minimum session length is limited to avoid too short sessions
*/
return Math.max(minSessionLength + (long) (random * Time.MINUTE),
sessionLength);
}
......@@ -208,7 +272,10 @@ public class ConstantPeerCountChurnModel implements ChurnModel {
this.b = b;
}
private class CInfo {
/**
* Client session information
*/
private class ClientSessionInfo {
public final long joiningAt;
......@@ -216,7 +283,7 @@ public class ConstantPeerCountChurnModel implements ChurnModel {
public final long sessionLength;
public CInfo(long joiningAt) {
public ClientSessionInfo(long joiningAt) {
this.sessionLength = getSessionLength();
this.joiningAt = joiningAt;
this.leavingAt = joiningAt + sessionLength;
......@@ -224,4 +291,4 @@ public class ConstantPeerCountChurnModel implements ChurnModel {
}
}
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment