Commit 72ba3b6f authored by Björn Richerzhagen's avatar Björn Richerzhagen
Browse files
parents 6397111c 71a42791
......@@ -220,7 +220,7 @@ public class CSVBasedChurnModel implements ChurnModel {
if (parts.length == 4) {
try {
String groupID = parts[0].replaceAll("\\s+","");
int a = DefaultConfigurator.parseNumber("20m", Integer.class);
// int a = DefaultConfigurator.parseNumber("20m", Integer.class);
long startTime = DefaultConfigurator.parseNumber(parts[1].replaceAll("\\s+",""), Long.class);
long endTime = DefaultConfigurator.parseNumber(parts[2].replaceAll("\\s+",""), Long.class);
long duration = DefaultConfigurator.parseNumber(parts[3].replaceAll("\\s+",""), Long.class);
......@@ -322,14 +322,7 @@ public class CSVBasedChurnModel implements ChurnModel {
public long getStartTime(){
return startTime;
}
//
// /**
// * Returns if this churn interval stands for an online interval
// * @return
// */
// public boolean isOnline(){
// return this.online;
// }
}
}
/*
* Copyright (c) 2005-2010 KOM – Multimedia Communications Lab
*
* This file is part of PeerfactSim.KOM.
*
* PeerfactSim.KOM is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* PeerfactSim.KOM is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with PeerfactSim.KOM. If not, see <http://www.gnu.org/licenses/>.
*
*/
package de.tud.kom.p2psim.impl.churn;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import de.tud.kom.p2psim.api.common.SimHost;
import de.tud.kom.p2psim.api.network.SimNetInterface;
import de.tud.kom.p2psim.api.network.SimNetworkComponent;
import de.tud.kom.p2psim.impl.scenario.DefaultConfigurator;
import de.tud.kom.p2psim.impl.simengine.Simulator;
import de.tud.kom.p2psim.impl.util.oracle.GlobalOracle;
import de.tud.kom.p2psim.impl.util.toolkits.CollectionHelpers;
import de.tud.kom.p2psim.impl.util.toolkits.Predicates;
import de.tudarmstadt.maki.simonstrator.api.Event;
import de.tudarmstadt.maki.simonstrator.api.EventHandler;
import de.tudarmstadt.maki.simonstrator.api.Time;
import de.tudarmstadt.maki.simonstrator.api.component.GlobalComponent;
import de.tudarmstadt.maki.simonstrator.api.util.XMLConfigurableConstructor;
/**
* ChurnModel that follows a defined "trace" in a csv file. The trace must
* consist of the following parameters per line:
*
* startTime, intervalLength numberOfClients
*
* startTime - the time when the specified number of nodes is to be set.
*
* intervalLength - the time in which the specified number of nodes (startTime)
* must be achieved by the model. Those two together also form the inter
* join/leaving rate.
*
* numberOfClients - the number of clients to be achieved.
*
* The model checks for a minimum intervalLength of 10 minutes.
*
*
* @author Nils Richerzhagen
* @version 1.0, Nov 24, 2015
*/
public class MaxPeerCountChurnGenerator
implements EventHandler, GlobalComponent {
private static final int _PeerCountEvent = 1001;
private static final int _CHURN_START = 1002;
private static final int _CHURN_EVENT = 1003;
private static final int _CHURN_NOCHURN_HOSTS = 1004;
private final String commentsDelimiter = "#";
private final String SEP = ",";
private final long _minBurstLength = 10 * Time.MINUTE;
private final int maxNumberOfNodes;
/**
* {@link ChurnInfo} from the csv file.
*/
private LinkedList<ChurnInfo> churnInfos = new LinkedList<ChurnInfo>();
private PriorityQueue<HostSessionInfo> onlineHostsSortedByOnlineTime;
private PriorityQueue<HostSessionInfo> offlineHostsSortedByOfflineTime;
/**
* Comparator used to sort client infos by offline time
*/
private static final Comparator<HostSessionInfo> COMP_TIME = new Comparator<MaxPeerCountChurnGenerator.HostSessionInfo>() {
@Override
public int compare(HostSessionInfo o1, HostSessionInfo o2) {
return ((Long) o1.timestamp).compareTo(o2.timestamp);
}
};
@XMLConfigurableConstructor({ "file", "maxNumberOfNodes"})
public MaxPeerCountChurnGenerator(String file, int maxNumberOfNodes) {
this.maxNumberOfNodes = maxNumberOfNodes;
parseTrace(file);
}
/**
* Called by the configurator.
*
* @param churnStart
*/
public void setChurnStart(long churnStart) {
// Event.scheduleWithDelay(churnStart, this, null, _CHURN_START);
Event.scheduleImmediately(this, null, _CHURN_START);
}
public void initialize() {
for (ChurnInfo churnInfo : churnInfos) {
Event.scheduleWithDelay(churnInfo.getStartTime(), this, churnInfo,
_PeerCountEvent);
}
}
/**
* Start adapting on new churn rate, when next churnInfo is valid.
*/
private void configureMaxPeerCount(ChurnInfo currentChurnInfo) {
long currentTime = Simulator.getCurrentTime();
assert currentChurnInfo
.getStartTime() == currentTime : "The ChurnInfo to use is scheduled for the exact time, thus it should be the same.";
/*
* Wanted number > current number. --> need nodes = go online
*/
if (currentChurnInfo
.getNumberOfClients() >= onlineHostsSortedByOnlineTime.size()) {
int count = currentChurnInfo.getNumberOfClients()
- onlineHostsSortedByOnlineTime.size();
for (int i = 0; i < count; i++) {
/*
* Schedule the required number of hosts for going online
* churnEvent. Get oldest entry in sortedOffline list and put
* online.
*/
HostSessionInfo hostSessionInfo = offlineHostsSortedByOfflineTime
.poll();
assert hostSessionInfo != null : "HostSessionInfo shouldn't be null - means to few hosts were configured.";
ChurnEvent churnEvent = new ChurnEvent(hostSessionInfo.host,
true);
long currentJoin = i
* (currentChurnInfo.getBurstLength() / count);
Event.scheduleWithDelay(currentJoin, this, churnEvent,
_CHURN_EVENT);
}
}
/*
* Wanted number < currentNumber --> remove nodes = goOffline
*/
else if (currentChurnInfo
.getNumberOfClients() < onlineHostsSortedByOnlineTime.size()) {
int count = onlineHostsSortedByOnlineTime.size()
- currentChurnInfo.getNumberOfClients();
for (int i = 0; i < count; i++) {
/*
* Schedule the required number of hosts for going offline
* churnEvent. Get oldest entry in sortedOnline list and put
* offline.
*/
HostSessionInfo hostSessionInfo = onlineHostsSortedByOnlineTime
.poll();
assert hostSessionInfo != null : "HostSessionInfo shouldn't be null - means no hosts were online.";
ChurnEvent churnEvent = new ChurnEvent(hostSessionInfo.host,
false);
long currentLeave = i
* (currentChurnInfo.getBurstLength() / count);
Event.scheduleWithDelay(currentLeave, this, churnEvent,
_CHURN_EVENT);
}
} else {
throw new AssertionError();
}
}
@Override
public void eventOccurred(Object content, int type) {
if (type == _PeerCountEvent) {
ChurnInfo churnInfo = (ChurnInfo) content;
configureMaxPeerCount(churnInfo);
} else if (type == _CHURN_EVENT) {
long currentTime = Simulator.getCurrentTime();
ChurnEvent churnEvent = (ChurnEvent) content;
SimNetworkComponent net = churnEvent.host.getNetworkComponent();
if (churnEvent.goOnline) {
for (SimNetInterface netI : net.getSimNetworkInterfaces()) {
if (netI.isOffline()) {
netI.goOnline();
}
}
onlineHostsSortedByOnlineTime
.add(new HostSessionInfo(churnEvent.host, currentTime));
} else {
for (SimNetInterface netI : net.getSimNetworkInterfaces()) {
if (netI.isOnline()) {
netI.goOffline();
}
}
offlineHostsSortedByOfflineTime
.add(new HostSessionInfo(churnEvent.host, currentTime));
}
} else if (type == _CHURN_START) {
initialize();
List<SimHost> hosts = new ArrayList<SimHost>(this.filterHosts());
this.prepare(hosts);
offlineHostsSortedByOfflineTime = new PriorityQueue<HostSessionInfo>(
(int) Math.ceil(hosts.size() / 10.0), COMP_TIME);
onlineHostsSortedByOnlineTime = new PriorityQueue<HostSessionInfo>(
(int) Math.ceil(hosts.size() / 10.0), COMP_TIME);
long currentTime = Simulator.getCurrentTime();
for (SimHost host : hosts) {
offlineHostsSortedByOfflineTime
.add(new HostSessionInfo(host, currentTime));
}
} else if (type == _CHURN_NOCHURN_HOSTS) {
// Send no-churn-hosts online immediately
List<SimHost> nochurnhosts = (List<SimHost>) content;
for (SimHost host : nochurnhosts) {
for (SimNetInterface netI : host.getNetworkComponent()
.getSimNetworkInterfaces()) {
if (netI.isOffline()) {
netI.goOnline();
}
}
}
}
}
/**
* Send hosts offline! Should be used to start with churnHosts in offline
* state.
*
* @param hosts
*/
private void prepare(List<SimHost> hosts) {
for (SimHost host : hosts) {
for (SimNetInterface netI : host.getNetworkComponent()
.getSimNetworkInterfaces()) {
if (netI.isOnline()) {
netI.goOffline();
}
}
}
}
/**
* Gets all hosts and takes all churn affected hosts. Schedules the non
* affected host to go online immediately.
*
* @return
*/
private List<SimHost> filterHosts() {
List<SimHost> tmp = GlobalOracle.getHosts();
List<SimHost> filteredHosts = new LinkedList<SimHost>();
CollectionHelpers.filter(tmp, filteredHosts,
Predicates.IS_CHURN_AFFECTED);
List<SimHost> noChurn = new LinkedList<SimHost>();
noChurn.addAll(tmp);
noChurn.removeAll(filteredHosts);
Event.scheduleImmediately(this, noChurn, _CHURN_NOCHURN_HOSTS);
return filteredHosts;
}
/**
* Reads the file given by the configuration and parses the churn events.
*
* @param filename
*/
private void parseTrace(String filename) {
System.out.println("==============================");
System.out.println("Reading trace from " + filename);
/*
* This parser works for the following csv file structure.
*
* startTime, intervalLength numberOfClients
*
*/
BufferedReader csv = null;
boolean entrySuccessfullyRead = false;
try {
csv = new BufferedReader(new FileReader(filename));
long previousEndTime = 0;
while (csv.ready()) {
String line = csv.readLine();
if (line.length() == 0 || line.startsWith(commentsDelimiter))
continue;
if (line.indexOf(SEP) > -1) {
String[] parts = line.split(SEP);
if (parts.length == 3) {
try {
long startTime = DefaultConfigurator.parseNumber(
parts[0].replaceAll("\\s+", ""),
Long.class);
long burstLength = DefaultConfigurator.parseNumber(
parts[1].replaceAll("\\s+", ""),
Long.class);
int numberOfClients = DefaultConfigurator
.parseNumber(
parts[2].replaceAll("\\s+", ""),
Integer.class);
// Insanity Checks
assert startTime >= previousEndTime : "Start time for next fluctuation must be greater than previous end time.";
assert burstLength >= _minBurstLength : "The minimal length of the burst must be at least 10m.";
assert numberOfClients > 0: "Number of nodes must be positive.";
assert numberOfClients <= maxNumberOfNodes : "Cannot configure more nodes than configured in configuration.";
previousEndTime = startTime + burstLength;
churnInfos.add(new ChurnInfo(startTime, burstLength,
numberOfClients));
entrySuccessfullyRead = true;
} catch (NumberFormatException e) {
// Ignore leading comments
if (entrySuccessfullyRead) {
// System.err.println("CSV ParseError " +
// line);
}
}
} else {
throw new AssertionError("To many/few columns in CSV.");
}
}
}
} catch (Exception e) {
System.err.println("Could not open " + filename);
throw new RuntimeException("Could not open " + filename);
} finally {
if (csv != null) {
try {
csv.close();
} catch (IOException e) {
//
}
}
}
}
/**
*
* @author Nils Richerzhagen
* @version 1.0, Nov 25, 2015
*/
private class HostSessionInfo {
public final SimHost host;
public final long timestamp;
public HostSessionInfo(SimHost host, long timestamp) {
this.host = host;
this.timestamp = timestamp;
}
}
private class ChurnEvent {
public final SimHost host;
public final boolean goOnline;
ChurnEvent(SimHost host, boolean goOnline) {
this.host = host;
this.goOnline = goOnline;
}
}
/**
* Churn Info for the fluctuation intervals.
*
* @author Nils Richerzhagen
*/
private class ChurnInfo {
/**
* The time the burst starts.
*/
private long startTime;
/**
* The time the burst takes.
*/
private long burstLength;
/**
* The max number of nodes that join during that burst.
*/
private int numberOfClients;
public ChurnInfo(long startTime, long burstLength,
int numberOfClients) {
this.startTime = startTime;
this.burstLength = burstLength;
this.numberOfClients = numberOfClients;
}
public long getStartTime() {
return startTime;
}
public int getNumberOfClients() {
return numberOfClients;
}
public long getBurstLength() {
return burstLength;
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment