Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
Simonstrator
PeerfactSim.KOM
Commits
d9b11eb6
Commit
d9b11eb6
authored
Nov 26, 2015
by
Nils Richerzhagen
Browse files
Added MaxPeerCountChurnGenerator.
parent
151ba806
Changes
3
Hide whitespace changes
Inline
Side-by-side
src/de/tud/kom/p2psim/impl/churn/CSVBasedChurnModel.java
View file @
d9b11eb6
...
...
@@ -322,14 +322,7 @@ public class CSVBasedChurnModel implements ChurnModel {
/**
 * Returns the start time of this churn interval.
 *
 * @return the start time
 */
public long getStartTime() {
    return startTime;
}
//
// /**
// * Returns if this churn interval stands for an online interval
// * @return
// */
// public boolean isOnline(){
// return this.online;
// }
}
}
src/de/tud/kom/p2psim/impl/churn/FluctuatingPeerCountChurnModel.java
View file @
d9b11eb6
...
...
@@ -20,6 +20,9 @@
package
de.tud.kom.p2psim.impl.churn
;
import
java.io.BufferedReader
;
import
java.io.FileReader
;
import
java.io.IOException
;
import
java.util.Comparator
;
import
java.util.LinkedHashMap
;
import
java.util.LinkedList
;
...
...
@@ -31,6 +34,8 @@ import java.util.Random;
import
de.tud.kom.p2psim.api.churn.ChurnModel
;
import
de.tud.kom.p2psim.api.common.SimHost
;
import
de.tud.kom.p2psim.api.network.SimNetInterface
;
import
de.tud.kom.p2psim.impl.scenario.DefaultConfigurator
;
import
de.tud.kom.p2psim.impl.simengine.Simulator
;
import
de.tudarmstadt.maki.simonstrator.api.Randoms
;
import
de.tudarmstadt.maki.simonstrator.api.Time
;
import
de.tudarmstadt.maki.simonstrator.api.util.XMLConfigurableConstructor
;
...
...
@@ -38,131 +43,178 @@ import de.tudarmstadt.maki.simonstrator.api.util.XMLConfigurableConstructor;
/**
 * Random arrival/leaving rate during the simulation for a fixed interval.
 *
 * The intervals and peaks for the rates are fixed; the arrival/leaving
 * rates are derived from them. The session length is fixed, and only the
 * arrival/leaving rate is drawn randomly between a lower and an upper
 * bound.
 *
 * Bursts:
 *
 * Burst joining duration 10 minutes.
 *
 * Min session length 10 minutes.
 *
 * @author Nils Richerzhagen
 * @version 1.0, Feb 18, 2015
 */
public
class
FluctuatingPeerCountChurnModel
implements
ChurnModel
{
private
final
String
SEP
=
","
;
private
final
String
filename
;
private
Map
<
Long
,
Long
>
churnInfos
;
private
final
String
commentsDelimiter
=
"#"
;
private
final
String
SEP
=
","
;
private
double
upperFluctuationBound
;
private
final
long
_burstJoinInterval
=
10
*
Time
.
MINUTE
;
/**
*
*/
private
double
lowerFluctuationBound
;
private
final
long
_minBurstLength
=
10
*
Time
.
MINUTE
;
private
LinkedList
<
ChurnInfo
>
churnInfos
=
new
LinkedList
<
ChurnInfo
>();
/**
* Maximum number of online clients for non-fluctuation phase.
* Maximum number of online clients for non-fluctuation phase. Defined
* during configuration.
*/
private
int
max
PeersOnline
;
private
final
int
default
PeersOnline
;
/**
* Inter-arrival time as specified during configuration
*/
// private final long initialInterArrivalTime;
private
final
long
initialInterArrivalTime
;
/*
* Parameters a and b of exponential distribution defining session lengths
*/
private
double
a
=
0.6378
;
private
double
b
=
-
0.05944
;
/**
* Minimum session length as specified during configuration
*/
private
long
minSessionLength
=
10
*
Time
.
SECOND
;
private
List
<
SimHost
>
hosts
;
private
final
Random
random
=
Randoms
.
getRandom
(
FluctuatingPeerCountChurnModel
.
class
);
private
long
lastJoin
=
-
1
;
private
Map
<
Long
,
ClientSessionInfo
>
clientSessionInfos
=
new
LinkedHashMap
<
Long
,
ClientSessionInfo
>();
private
PriorityQueue
<
ClientSessionInfo
>
clientsSortedByOfflineTime
;
private
final
Random
random
=
Randoms
.
getRandom
(
FluctuatingPeerCountChurnModel
.
class
);
private
long
lastJoin
=
-
1
;
@XMLConfigurableConstructor
({
"maxPeersOnline"
,
"upperFluctuationBound"
,
"lowerFluctuationBound"
})
public
FluctuatingPeerCountChurnModel
(
int
maxPeersOnline
,
double
upperFluctuationBound
,
double
lowerFluctuationBound
)
{
this
.
maxPeersOnline
=
maxPeersOnline
;
this
.
upperFluctuationBound
=
upperFluctuationBound
;
this
.
lowerFluctuationBound
=
lowerFluctuationBound
;
// private Map<Long, ClientSessionInfo> clientSessionInfos = new LinkedHashMap<Long, ClientSessionInfo>();
private
Map
<
SimHost
,
LinkedList
<
ClientChurnInfo
>>
clientChurnInfos
;
// private PriorityQueue<ClientSessionInfo> clientsSortedByOfflineTime;
@XMLConfigurableConstructor
({
"file"
,
"defaultPeersOnline"
,
"interArrivalTime"
})
public
FluctuatingPeerCountChurnModel
(
String
file
,
int
defaultPeersOnline
,
long
interArrivalTime
)
{
this
.
filename
=
file
;
this
.
defaultPeersOnline
=
defaultPeersOnline
;
this
.
initialInterArrivalTime
=
interArrivalTime
;
}
@Override
public
long
getNextUptime
(
SimHost
host
)
{
long
currentTime
=
Time
.
getCurrentTime
();
if
(!
hosts
.
remove
(
host
)){
/*
* FIXME: Avoiding reusing peer instances.
*/
return
1000
*
Time
.
HOUR
;
}
// TODO Current Fluctuation
if
(
clientsSortedByOfflineTime
.
size
()
<
maxPeersOnline
){
/*
* Initially, join peers until the peer threshold is reached.
*/
if
(
lastJoin
<
0
)
{
lastJoin
=
currentTime
;
}
LinkedList
<
ClientChurnInfo
>
churnInfos
=
clientChurnInfos
.
get
(
host
);
return
churnInfos
.
getFirst
().
joiningAt
-
currentTime
;
// ChurnInfo currentChurnInfo = null;
// if (!churnInfos.isEmpty()) {
// currentChurnInfo = churnInfos.getFirst();
// /*
// * ChurnInfo is old. Remove old one and overwrite currentChurnInfo
// */
// if (currentChurnInfo.getEndTime() < currentTime) {
// churnInfos.removeFirst();
// currentChurnInfo = churnInfos.getFirst();
// }
// }
//
// /*
// * Join all clients until default (min) number of clients is achieved.
// */
// if (clientsSortedByOfflineTime.size() < defaultPeersOnline) {
// /*
// * Initially, join peers until the peer threshold is reached.
// */
// if (lastJoin < 0) {
// lastJoin = currentTime;
// }
// long currentJoin = lastJoin + initialInterArrivalTime;
// ClientSessionInfo info = new ClientSessionInfo(currentJoin);
// clientsSortedByOfflineTime.add(info);
// clientSessionInfos.put(host.getHostId(), info);
// if (hasFlashcrowd && !flashCrowdFinished
// && currentJoin > flashcrowdStart) {
// currentJoin = flashcrowdStart;
// inFlashcrowd = true;
//
// lastJoin = currentJoin;
// return info.joiningAt - currentTime;
// }
//
//
// if (currentChurnInfo != null
// && currentChurnInfo.getStartTime() <= currentTime
// && (currentChurnInfo.getStartTime()
// + currentChurnInfo.getIntervalLength()) > currentTime) {
// /*
// * Is in next churn info: churnStart < currentTime < churnEnd
// *
// * Join or leave peers in a burst according to current churnInfo.
// *
// * Info: the peak is reached with the churnInfo - numberOfClients
// */
//
// long churnCrowdInterArrivalTime = currentChurnInfo
// .getIntervalLength()
// / (currentChurnInfo.getNumberOfClients()
// - defaultPeersOnline);
// long currentJoin = lastJoin + churnCrowdInterArrivalTime;
// ClientSessionInfo info = new ClientSessionInfo(currentJoin);
// clientSessionInfos.put(host.getHostId(), info);
//
// lastJoin = currentJoin;
// return info.joiningAt - currentTime;
//
// } else if (clientsSortedByOfflineTime.size() < defaultPeersOnline) {
// /*
// * Initially, join peers until the peer threshold is reached.
// */
// if (lastJoin < 0) {
// lastJoin = currentTime;
// }
// long currentJoin = lastJoin + initialInterArrivalTime;
// ClientSessionInfo info = new ClientSessionInfo(currentJoin);
// clientsSortedByOfflineTime.add(info);
// clientSessionInfos.put(host.getHostId(), info);
//
// lastJoin = currentJoin;
// return info.joiningAt - currentTime;
}
else
{
/*
* After reaching the peer threshold, only peers that went offline
* are replaced to keep the threshold.
*/
ClientSessionInfo
nextToGoOffline
=
clientsSortedByOfflineTime
.
poll
();
if
(
nextToGoOffline
==
null
)
{
throw
new
AssertionError
();
}
else
{
// Use the next time a peer goes offline as joining time.
long
currentJoin
=
nextToGoOffline
.
leavingAt
;
ClientSessionInfo
info
=
new
ClientSessionInfo
(
currentJoin
);
clientsSortedByOfflineTime
.
add
(
info
);
clientSessionInfos
.
put
(
host
.
getHostId
(),
info
);
// if (hasFlashcrowd && !flashCrowdFinished
// && currentJoin > flashcrowdStart) {
// currentJoin = flashcrowdStart;
// inFlashcrowd = true;
// }
lastJoin
=
currentJoin
;
return
info
.
joiningAt
-
currentTime
;
}
}
return
0
;
// } else {
// /*
// * After reaching the peer threshold, only peers that went offline
// * are replaced to keep the threshold.
// */
// ClientSessionInfo nextToGoOffline = clientsSortedByOfflineTime
// .poll();
//
// if (nextToGoOffline == null) {
// throw new AssertionError();
// } else {
// // Use the next time a peer goes offline as joining time.
// long currentJoin = nextToGoOffline.leavingAt;
// ClientSessionInfo info = new ClientSessionInfo(currentJoin);
// clientsSortedByOfflineTime.add(info);
// clientSessionInfos.put(host.getHostId(), info);
//
// lastJoin = currentJoin;
// return info.joiningAt - currentTime;
// }
// }
}
@Override
public
long
getNextDowntime
(
SimHost
host
)
{
LinkedList
<
ClientChurnInfo
>
churnInfos
=
clientChurnInfos
.
get
(
host
);
// (churnInfos.removeFirst());
// return info.sessionLength;
return
0
;
}
@Override
public
void
prepare
(
List
<
SimHost
>
churnHosts
)
{
hosts
=
new
LinkedList
<
SimHost
>(
churnHosts
);
...
...
@@ -174,57 +226,243 @@ public class FluctuatingPeerCountChurnModel implements ChurnModel {
}
}
}
clientsSortedByOfflineTime
=
new
PriorityQueue
<
FluctuatingPeerCountChurnModel
.
ClientSessionInfo
>(
(
int
)
Math
.
ceil
(
hosts
.
size
()
/
10.0
),
COMP_OFFLINE_TIME
);
// clientsSortedByOfflineTime = new PriorityQueue<FluctuatingPeerCountChurnModel.ClientSessionInfo>(
// (int) Math.ceil(hosts.size() / 10.0), COMP_OFFLINE_TIME);
parseTrace
(
filename
);
computeChurnPerNode
();
}
// /**
// * Comparator used to sort client infos by offline time
// */
// private static final Comparator<ClientSessionInfo> COMP_OFFLINE_TIME = new Comparator<FluctuatingPeerCountChurnModel.ClientSessionInfo>() {
// @Override
// public int compare(ClientSessionInfo o1, ClientSessionInfo o2) {
// return ((Long) o1.leavingAt).compareTo(o2.leavingAt);
// }
// };
/**
* Comparator used to sort client infos by offline time
*/
private
static
final
Comparator
<
ClientSessionInfo
>
COMP_OFFLINE_TIME
=
new
Comparator
<
FluctuatingPeerCountChurnModel
.
ClientSessionInfo
>()
{
@Override
public
int
compare
(
ClientSessionInfo
o1
,
ClientSessionInfo
o2
)
{
return
((
Long
)
o1
.
leavingAt
).
compareTo
(
o2
.
leavingAt
);
}
};
public
long
getSessionLength
()
{
// FIXME Hosts s
return
3
*
Time
.
HOUR
;
}
protected
long
getSessionLength
(){
double
rnd
=
random
.
nextDouble
();
public
int
getDefaultPeersOnline
()
{
return
defaultPeersOnline
;
}
private
void
computeChurnPerNode
(){
/**
* Compute a list (onlineStart, intervalLength) per host.
*/
clientChurnInfos
=
new
LinkedHashMap
<
SimHost
,
LinkedList
<
ClientChurnInfo
>>();
int
count
=
0
;
for
(
SimHost
host
:
hosts
)
{
count
++;
LinkedList
<
ClientChurnInfo
>
clientChurnInfos
=
new
LinkedList
<
ClientChurnInfo
>();
/*
* As long as the defaultNumberOfPeers is not achieved just fill list with one item per Host.
*/
if
(
count
<=
defaultPeersOnline
){
if
(
lastJoin
<
0
)
{
lastJoin
=
0
;
}
long
currentJoin
=
lastJoin
+
initialInterArrivalTime
;
lastJoin
=
currentJoin
;
// FIXME Session length for nodes that stay online?
clientChurnInfos
.
add
(
new
ClientChurnInfo
(
currentJoin
,
Simulator
.
getEndTime
()));
}
/*
* Nodes that may go offline multiple times. Defined by churnInfos via csv file.
*/
else
if
(
count
>
defaultPeersOnline
){
for
(
ChurnInfo
churnInfo
:
churnInfos
)
{
long
currentStartTime
=
churnInfo
.
getStartTime
();
long
currentBurstLength
=
churnInfo
.
getBurstLength
();
// when burst starts + x-ter Node times inter arrival rate
long
currentJoin
=
currentStartTime
+
(
count
-
defaultPeersOnline
)
*
churnInfo
.
getInterArrivalRate
();
assert
currentJoin
>=
currentStartTime
;
if
(
count
<=
churnInfo
.
getNumberOfClients
()){
clientChurnInfos
.
add
(
new
ClientChurnInfo
(
currentJoin
,
currentBurstLength
));
}
}
}
// clientChurnInfos.put(host, clientChurnInfos);
System
.
out
.
println
(
host
.
getId
().
value
()
+
" churnInfoAdded"
);
}
long
sessionLength
=
(
long
)
(
Math
.
log
(
rnd
/
a
)
/
b
)
*
Time
.
MINUTE
+
(
long
)
(
rnd
*
Time
.
MINUTE
);
}
private
void
parseTrace
(
String
filename
)
{
System
.
out
.
println
(
"=============================="
);
System
.
out
.
println
(
"Reading trace from "
+
filename
);
/*
* The minimum session length is limited to avoid too short sessions
* This parser works for the following csv file structure.
*
* startTime, intervalLength, numberOfClients
*
*/
return
Math
.
max
(
minSessionLength
+
(
long
)
(
rnd
*
Time
.
MINUTE
),
sessionLength
);
BufferedReader
csv
=
null
;
boolean
entrySuccessfullyRead
=
false
;
try
{
csv
=
new
BufferedReader
(
new
FileReader
(
filename
));
long
previousEndTime
=
0
;
while
(
csv
.
ready
())
{
String
line
=
csv
.
readLine
();
if
(
line
.
length
()
==
0
||
line
.
startsWith
(
commentsDelimiter
))
continue
;
if
(
line
.
indexOf
(
SEP
)
>
-
1
)
{
String
[]
parts
=
line
.
split
(
SEP
);
if
(
parts
.
length
==
3
)
{
try
{
long
startTime
=
DefaultConfigurator
.
parseNumber
(
parts
[
0
].
replaceAll
(
"\\s+"
,
""
),
Long
.
class
);
long
burstLength
=
DefaultConfigurator
.
parseNumber
(
parts
[
1
].
replaceAll
(
"\\s+"
,
""
),
Long
.
class
);
int
numberOfClients
=
DefaultConfigurator
.
parseNumber
(
parts
[
2
].
replaceAll
(
"\\s+"
,
""
),
Integer
.
class
);
// Insanity Checks
assert
startTime
>=
previousEndTime
:
"Start time for next fluctuation must be greater than previous end time."
;
assert
burstLength
<
_minBurstLength
:
"The minimal length of the burst must be at least 10m."
;
previousEndTime
=
startTime
+
burstLength
;
churnInfos
.
add
(
new
ChurnInfo
(
startTime
,
burstLength
,
numberOfClients
));
entrySuccessfullyRead
=
true
;
}
catch
(
NumberFormatException
e
)
{
// Ignore leading comments
if
(
entrySuccessfullyRead
)
{
// System.err.println("CSV ParseError " +
// line);
}
}
}
else
{
throw
new
AssertionError
(
"To many/few columns in CSV."
);
}
}
}
}
catch
(
Exception
e
)
{
System
.
err
.
println
(
"Could not open "
+
filename
);
throw
new
RuntimeException
(
"Could not open "
+
filename
);
}
finally
{
if
(
csv
!=
null
)
{
try
{
csv
.
close
();
}
catch
(
IOException
e
)
{
//
}
}
}
}
// /**
// * Client session information
// */
// private class ClientSessionInfo {
//
// public final long joiningAt;
//
// public final long leavingAt;
//
// public final long sessionLength;
//
// public ClientSessionInfo(long joiningAt) {
// this.sessionLength = getSessionLength();
// this.joiningAt = joiningAt;
// this.leavingAt = joiningAt + sessionLength;
// }
//
// }
public
void
setMinSessionLength
(
long
minSessionLength
)
{
this
.
minSessionLength
=
minSessionLength
;
/**
*
*/
private
class
ClientChurnInfo
{
public
final
long
joiningAt
;
public
final
long
leavingAt
;
public
ClientChurnInfo
(
long
joiningAt
,
long
sessionLength
)
{
this
.
joiningAt
=
joiningAt
;
this
.
leavingAt
=
joiningAt
+
sessionLength
;
}
}
/**
* Client session information
* Churn Info for the fluctuation intervals.
*
* @author Nils Richerzhagen
*/
private
class
C
lientSessio
nInfo
{
private
class
C
hur
nInfo
{
public
final
long
joiningAt
;
/**
* The time the burst starts.
*/
private
long
startTime
;
public
final
long
leavingAt
;
/**
* The time the nodes stay online.
*/
private
long
burstLength
;
public
final
long
sessionLength
;
/**
* The max number of nodes that join during that burst.
*/
private
int
numberOfClients
;
public
ChurnInfo
(
long
startTime
,
long
burstLength
,
int
numberOfClients
)
{
this
.
startTime
=
startTime
;
this
.
burstLength
=
burstLength
;
this
.
numberOfClients
=
numberOfClients
;
}
public
ClientSessionInfo
(
long
joiningAt
)
{
this
.
sessionLength
=
getSessionLength
();
this
.
joiningAt
=
joiningAt
;
this
.
leavingAt
=
joiningAt
+
sessionLength
;
public
long
getStartTime
()
{
return
startTime
;
}
}
public
long
getBurstLength
()
{
return
burstLength
;
}
public
int
getNumberOfClients
()
{
return
numberOfClients
;
}
//
// public long getEndTime() {
// return startTime + burstLength;
// }
/**
* Computed for the burst join window of 10 minutes.
*
* @return interArrivalRate
*/
public
long
getInterArrivalRate
()
{
return
_burstJoinInterval
/
(
numberOfClients
-
getDefaultPeersOnline
());
}
}
}
src/de/tud/kom/p2psim/impl/churn/MaxPeerCountChurnGenerator.java
0 → 100644
View file @
d9b11eb6
/*
* Copyright (c) 2005-2010 KOM – Multimedia Communications Lab
*
* This file is part of PeerfactSim.KOM.
*
* PeerfactSim.KOM is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* PeerfactSim.KOM is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with PeerfactSim.KOM. If not, see <http://www.gnu.org/licenses/>.
*
*/
package
de.tud.kom.p2psim.impl.churn
;
import
java.io.BufferedReader
;
import
java.io.FileReader
;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.Comparator
;
import
java.util.LinkedList
;
import
java.util.List
;
import
java.util.PriorityQueue
;
import
de.tud.kom.p2psim.api.common.SimHost
;
import
de.tud.kom.p2psim.api.network.SimNetInterface
;
import
de.tud.kom.p2psim.api.network.SimNetworkComponent
;
import
de.tud.kom.p2psim.impl.scenario.DefaultConfigurator
;
import
de.tud.kom.p2psim.impl.simengine.Simulator
;
import
de.tud.kom.p2psim.impl.util.oracle.GlobalOracle
;
import
de.tud.kom.p2psim.impl.util.toolkits.CollectionHelpers
;
import
de.tud.kom.p2psim.impl.util.toolkits.Predicates
;
import
de.tudarmstadt.maki.simonstrator.api.Event
;
import
de.tudarmstadt.maki.simonstrator.api.EventHandler
;
import
de.tudarmstadt.maki.simonstrator.api.Time
;
import
de.tudarmstadt.maki.simonstrator.api.component.GlobalComponent
;
import
de.tudarmstadt.maki.simonstrator.api.util.XMLConfigurableConstructor
;
/**
 * ChurnModel that follows a defined "trace" in a csv file. The trace must
 * consist of the following parameters per line:
 *
 * startTime, intervalLength, numberOfClients
 *
 * startTime - the time when the specified number of nodes is to be set.
 *
 * intervalLength - the time in which the specified number of nodes (startTime)
 * must be achieved by the model. Those two together also form the inter
 * join/leaving rate.
 *
 * numberOfClients - the number of clients to be achieved.
 *
 * The model checks for a minimum intervalLength of 10 minutes.
 *
 * @author Nils Richerzhagen
 * @version 1.0, Nov 24, 2015
 */
public class MaxPeerCountChurnGenerator implements EventHandler,
		GlobalComponent {

	/** Event type: a new {@link ChurnInfo} becomes active. */
	private static final int _PeerCountEvent = 1001;

	/** Event type: churn generation starts. */
	private static final int _CHURN_START = 1002;

	/** Event type: a single host goes online or offline. */
	private static final int _CHURN_EVENT = 1003;

	/** Event type: send all non-churn-affected hosts online. */
	private static final int _CHURN_NOCHURN_HOSTS = 1004;

	/** Lines starting with this delimiter are skipped by the parser. */
	private final String commentsDelimiter = "#";

	/** Column separator of the csv trace file. */
	private final String SEP = ",";

	/** Minimum allowed burst length (sanity-checked via assert). */
	private final long _minBurstLength = 10 * Time.MINUTE;

	/**
	 * {@link ChurnInfo} from the csv file.
	 */
	private final LinkedList<ChurnInfo> churnInfos = new LinkedList<ChurnInfo>();

	// Hosts currently online, ordered by the time they went online.
	private PriorityQueue<HostSessionInfo> onlineHostsSortedByOnlineTime;

	// Hosts currently offline, ordered by the time they went offline.
	private PriorityQueue<HostSessionInfo> offlineHostsSortedByOfflineTime;

	/**
	 * Comparator used to sort host session infos by their timestamp.
	 */
	private static final Comparator<HostSessionInfo> COMP_TIME = new Comparator<MaxPeerCountChurnGenerator.HostSessionInfo>() {
		@Override
		public int compare(HostSessionInfo o1, HostSessionInfo o2) {
			// Long.compare avoids boxing o1.timestamp into a Long.
			return Long.compare(o1.timestamp, o2.timestamp);
		}
	};

	/**
	 * Creates the generator and immediately parses the given trace file.
	 *
	 * @param file
	 *            path to the csv trace file
	 */
	@XMLConfigurableConstructor({ "file" })
	public MaxPeerCountChurnGenerator(String file) {
		parseTrace(file);
	}

	/**
	 * Called by the configurator.
	 *
	 * NOTE(review): the churnStart parameter is currently ignored - churn is
	 * scheduled immediately (the delayed variant is commented out below).
	 * Confirm whether this is intended.
	 *
	 * @param churnStart
	 */
	public void setChurnStart(long churnStart) {
		// Event.scheduleWithDelay(churnStart, this, null, _CHURN_START);
		Event.scheduleImmediately(this, null, _CHURN_START);
	}

	/**
	 * Schedules one peer-count event per parsed {@link ChurnInfo}, delayed by
	 * the info's start time relative to now.
	 */
	public void initialize() {
		for (ChurnInfo churnInfo : churnInfos) {
			Event.scheduleWithDelay(churnInfo.getStartTime(), this, churnInfo,
					_PeerCountEvent);
		}
	}

	/**
	 * Start adapting on new churn rate, when next churnInfo is valid.
	 *
	 * Schedules join (or leave) events spread evenly over the burst length so
	 * that the wanted number of online clients is reached by the end of the
	 * burst. If the wanted number equals the current number, nothing is
	 * scheduled.
	 *
	 * @param currentChurnInfo
	 *            the churn info that just became active
	 */
	private void configureMaxPeerCount(ChurnInfo currentChurnInfo) {
		long currentTime = Simulator.getCurrentTime();
		assert currentChurnInfo.getStartTime() == currentTime : "The ChurnInfo to use is scheduled for the exact time, thus it should be the same.";

		int wanted = currentChurnInfo.getNumberOfClients();
		int currentlyOnline = onlineHostsSortedByOnlineTime.size();

		if (wanted >= currentlyOnline) {
			/*
			 * Wanted number > current number. --> need nodes = go online
			 *
			 * (If equal, count is 0 and the loop is a no-op.)
			 */
			int count = wanted - currentlyOnline;
			for (int i = 0; i < count; i++) {
				/*
				 * Schedule the required number of hosts for going online
				 * churnEvent. Get oldest entry in sortedOffline list and put
				 * online.
				 */
				HostSessionInfo hostSessionInfo = offlineHostsSortedByOfflineTime
						.poll();
				assert hostSessionInfo != null : "HostSessionInfo shouldn't be null - means to few hosts were configured.";
				ChurnEvent churnEvent = new ChurnEvent(hostSessionInfo.host,
						true);
				// Spread joins evenly across the burst interval.
				long currentJoin = i
						* (currentChurnInfo.getBurstLength() / count);
				Event.scheduleWithDelay(currentJoin, this, churnEvent,
						_CHURN_EVENT);
			}
		} else {
			/*
			 * Wanted number < currentNumber --> remove nodes = goOffline
			 */
			int count = currentlyOnline - wanted;
			for (int i = 0; i < count; i++) {
				/*
				 * Schedule the required number of hosts for going offline
				 * churnEvent. Get oldest entry in sortedOnline list and put
				 * offline.
				 */
				HostSessionInfo hostSessionInfo = onlineHostsSortedByOnlineTime
						.poll();
				assert hostSessionInfo != null : "HostSessionInfo shouldn't be null - means no hosts were online.";
				ChurnEvent churnEvent = new ChurnEvent(hostSessionInfo.host,
						false);
				// Spread leaves evenly across the burst interval.
				long currentLeave = i
						* (currentChurnInfo.getBurstLength() / count);
				Event.scheduleWithDelay(currentLeave, this, churnEvent,
						_CHURN_EVENT);
			}
		}
		// Note: the >= / < branches above are exhaustive; the former dead
		// "else throw new AssertionError()" has been removed.
	}

	@Override
	public void eventOccurred(Object content, int type) {
		if (type == _PeerCountEvent) {
			ChurnInfo churnInfo = (ChurnInfo) content;
			configureMaxPeerCount(churnInfo);
		} else if (type == _CHURN_EVENT) {
			long currentTime = Simulator.getCurrentTime();
			ChurnEvent churnEvent = (ChurnEvent) content;
			SimNetworkComponent net = churnEvent.host.getNetworkComponent();
			if (churnEvent.goOnline) {
				for (SimNetInterface netI : net.getSimNetworkInterfaces()) {
					if (netI.isOffline()) {
						netI.goOnline();
					}
				}
				onlineHostsSortedByOnlineTime.add(new HostSessionInfo(
						churnEvent.host, currentTime));
			} else {
				for (SimNetInterface netI : net.getSimNetworkInterfaces()) {
					if (netI.isOnline()) {
						netI.goOffline();
					}
				}
				offlineHostsSortedByOfflineTime.add(new HostSessionInfo(
						churnEvent.host, currentTime));
			}
		} else if (type == _CHURN_START) {
			initialize();
			List<SimHost> hosts = new ArrayList<SimHost>(this.filterHosts());
			this.prepare(hosts);
			/*
			 * PriorityQueue requires an initial capacity >= 1; clamp to 1 so
			 * an empty host list does not throw IllegalArgumentException.
			 */
			int initialCapacity = Math.max(1,
					(int) Math.ceil(hosts.size() / 10.0));
			offlineHostsSortedByOfflineTime = new PriorityQueue<HostSessionInfo>(
					initialCapacity, COMP_TIME);
			onlineHostsSortedByOnlineTime = new PriorityQueue<HostSessionInfo>(
					initialCapacity, COMP_TIME);
			long currentTime = Simulator.getCurrentTime();
			for (SimHost host : hosts) {
				offlineHostsSortedByOfflineTime.add(new HostSessionInfo(host,
						currentTime));
			}
		} else if (type == _CHURN_NOCHURN_HOSTS) {
			// Send no-churn-hosts online immediately.
			// Cast is safe: this event type is only scheduled by filterHosts()
			// with a List<SimHost> payload.
			@SuppressWarnings("unchecked")
			List<SimHost> nochurnhosts = (List<SimHost>) content;
			for (SimHost host : nochurnhosts) {
				for (SimNetInterface netI : host.getNetworkComponent()
						.getSimNetworkInterfaces()) {
					if (netI.isOffline()) {
						netI.goOnline();
					}
				}
			}
		}
	}

	/**
	 * Send hosts offline! Should be used to start with churnHosts in offline
	 * state.
	 *
	 * @param hosts
	 *            the churn-affected hosts to force offline
	 */
	private void prepare(List<SimHost> hosts) {
		for (SimHost host : hosts) {
			for (SimNetInterface netI : host.getNetworkComponent()
					.getSimNetworkInterfaces()) {
				if (netI.isOnline()) {
					netI.goOffline();
				}
			}
		}
	}

	/**
	 * Gets all hosts and takes all churn affected hosts. Schedules the non
	 * affected hosts to go online immediately.
	 *
	 * @return the churn-affected hosts
	 */
	private List<SimHost> filterHosts() {
		List<SimHost> tmp = GlobalOracle.getHosts();
		List<SimHost> filteredHosts = new LinkedList<SimHost>();
		CollectionHelpers.filter(tmp, filteredHosts,
				Predicates.IS_CHURN_AFFECTED);

		// Everything not churn-affected goes online right away.
		List<SimHost> noChurn = new LinkedList<SimHost>();
		noChurn.addAll(tmp);
		noChurn.removeAll(filteredHosts);
		Event.scheduleImmediately(this, noChurn, _CHURN_NOCHURN_HOSTS);

		return filteredHosts;
	}

	/**
	 * Reads the file given by the configuration and parses the churn events.
	 *
	 * This parser works for the following csv file structure:
	 *
	 * startTime, intervalLength, numberOfClients
	 *
	 * @param filename
	 *            path to the csv trace file
	 * @throws RuntimeException
	 *             if the file cannot be opened or read
	 */
	private void parseTrace(String filename) {
		System.out.println("==============================");
		System.out.println("Reading trace from " + filename);
		boolean entrySuccessfullyRead = false;
		// try-with-resources closes the reader even on parse failures.
		try (BufferedReader csv = new BufferedReader(new FileReader(filename))) {
			long previousEndTime = 0;
			while (csv.ready()) {
				String line = csv.readLine();
				if (line.length() == 0 || line.startsWith(commentsDelimiter))
					continue;
				if (line.indexOf(SEP) > -1) {
					String[] parts = line.split(SEP);
					if (parts.length == 3) {
						try {
							long startTime = DefaultConfigurator.parseNumber(
									parts[0].replaceAll("\\s+", ""),
									Long.class);
							long burstLength = DefaultConfigurator.parseNumber(
									parts[1].replaceAll("\\s+", ""),
									Long.class);
							int numberOfClients = DefaultConfigurator
									.parseNumber(
											parts[2].replaceAll("\\s+", ""),
											Integer.class);
							// Insanity Checks
							assert startTime >= previousEndTime : "Start time for next fluctuation must be greater than or equal to the previous end time.";
							assert burstLength >= _minBurstLength : "The minimal length of the burst must be at least 10m.";
							previousEndTime = startTime + burstLength;
							churnInfos.add(new ChurnInfo(startTime,
									burstLength, numberOfClients));
							entrySuccessfullyRead = true;
						} catch (NumberFormatException e) {
							// Ignore leading comments / header lines that do
							// not parse as numbers.
							if (entrySuccessfullyRead) {
								// System.err.println("CSV ParseError " +
								// line);
							}
						}
					} else {
						throw new AssertionError(
								"Too many/few columns in CSV.");
					}
				}
			}
		} catch (Exception e) {
			System.err.println("Could not open " + filename);
			// Preserve the original cause for diagnosis.
			throw new RuntimeException("Could not open " + filename, e);
		}
	}

	/**
	 * Immutable pair of a host and the timestamp of its last online/offline
	 * transition.
	 *
	 * @author Nils Richerzhagen
	 * @version 1.0, Nov 25, 2015
	 */
	private static class HostSessionInfo {

		public final SimHost host;

		public final long timestamp;

		public HostSessionInfo(SimHost host, long timestamp) {
			this.host = host;
			this.timestamp = timestamp;
		}
	}

	/**
	 * Payload of a _CHURN_EVENT: which host transitions, and in which
	 * direction.
	 */
	private static class ChurnEvent {

		public final SimHost host;

		public final boolean goOnline;

		ChurnEvent(SimHost host, boolean goOnline) {
			this.host = host;
			this.goOnline = goOnline;
		}
	}

	/**
	 * Churn Info for the fluctuation intervals.
	 *
	 * @author Nils Richerzhagen
	 */
	private static class ChurnInfo {

		/**
		 * The time the burst starts.
		 */
		private long startTime;

		/**
		 * The time the burst takes.
		 */
		private long burstLength;

		/**
		 * The max number of nodes that join during that burst.
		 */
		private int numberOfClients;

		public ChurnInfo(long startTime, long burstLength, int numberOfClients) {
			this.startTime = startTime;
			this.burstLength = burstLength;
			this.numberOfClients = numberOfClients;
		}

		public long getStartTime() {
			return startTime;
		}

		public int getNumberOfClients() {
			return numberOfClients;
		}

		public long getBurstLength() {
			return burstLength;
		}
	}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment