Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
Simonstrator
PeerfactSim.KOM
Commits
740c11b2
Commit
740c11b2
authored
Oct 20, 2014
by
Julius Rückert
Browse files
Refactored churn model and added more documentation.
parent
b965c287
Changes
1
Hide whitespace changes
Inline
Side-by-side
src/de/tud/kom/p2psim/impl/churn/ConstantPeerCountChurnModel.java
View file @
740c11b2
...
...
@@ -20,7 +20,6 @@
package
de.tud.kom.p2psim.impl.churn
;
import
java.util.Collections
;
import
java.util.Comparator
;
import
java.util.LinkedHashMap
;
import
java.util.LinkedList
;
...
...
@@ -37,56 +36,79 @@ import de.tudarmstadt.maki.simonstrator.api.Time;
import
de.tudarmstadt.maki.simonstrator.api.util.XMLConfigurableConstructor
;
/**
* Another simple churn model that maintains a constant
peer
coun
d
(i.e. if a
*
peer
goes offline, it is replaced with a new one). The arrival rate is
* Another simple churn model that maintains a constant
client
coun
t
(i.e. if a
*
client
goes offline, it is replaced with a new one). The arrival rate is
* constant until the threshold is reached. Session times are configurable as
* either static values or based on a distribution.
*
* @author Bjoern Richerzhagen
* In addition, a flash crowd phase can be configured for a sudden burst of
* connected clients.
*
* @author Bjoern Richerzhagen, Refactored by Julius Rueckert
* @version 1.0, 01.10.2012
*/
public
class
ConstantPeerCountChurnModel
implements
ChurnModel
{
/**
* Maximum number of online clients in non-flash crowd phase
*/
private
final
int
peersThreshold
;
private
long
lastJoin
=
-
1
;
/**
* Inter-arrival time as specified during configuration
*/
private
final
long
initialInterArrivalTime
;
private
List
<
SimHost
>
hosts
;
/**
* Minimum session length as specified during configuration
*/
private
long
minSessionLength
=
10
*
Time
.
SECOND
;
/*
* Parameters a and b of exponential distribution defining session lengths
*/
private
double
a
=
0.6378
;
private
double
b
=
-
0.05944
;
/**
* Tells if the model includes a flash crowd or not
*/
private
boolean
hasFlashcrowd
=
false
;
/*
* Flash crowd-related information
*/
private
long
flashcrowdStart
=
4
*
Time
.
HOUR
;
private
double
flashcrowdMultiplicator
=
2
;
private
long
flashcrowdInterval
=
10
*
Time
.
MINUTE
;
/*
* Local state variables
*/
private
boolean
flashCrowdFinished
=
false
;
private
boolean
inFlashcrowd
=
false
;
private
boolean
flashcrowdFinished
=
false
;
private
long
lastJoin
=
-
1
;
private
Map
<
Long
,
CInfo
>
cInfos
=
new
LinkedHashMap
<
Long
,
CInfo
>()
;
private
List
<
SimHost
>
hosts
;
private
PriorityQueue
<
CInfo
>
sortedByOfflinetime
;
private
Map
<
Long
,
ClientSessionInfo
>
clientSessionInfos
=
new
LinkedHashMap
<
Long
,
ClientSessionInfo
>()
;
private
final
Random
rnd
=
Randoms
.
getRandom
(
ConstantPeerCountChurnModel
.
class
);
private
PriorityQueue
<
ClientSessionInfo
>
clientsSortedByOfflineTime
;
private
final
Comparator
<
CInfo
>
COMP
=
new
Comparator
<
ConstantPeerCountChurnModel
.
CInfo
>()
{
/**
* Comparator used to sort client infos by offline time
*/
private
static
final
Comparator
<
ClientSessionInfo
>
COMP_OFFLINE_TIME
=
new
Comparator
<
ConstantPeerCountChurnModel
.
ClientSessionInfo
>()
{
@Override
public
int
compare
(
C
Info
o1
,
C
Info
o2
)
{
return
(
int
)
Math
.
signum
(
o1
.
leavingAt
-
o2
.
leavingAt
);
public
int
compare
(
C
lientSessionInfo
o1
,
ClientSession
Info
o2
)
{
return
(
(
Long
)
o1
.
leavingAt
).
compareTo
(
o2
.
leavingAt
);
}
};
private
final
Random
rnd
=
Randoms
.
getRandom
(
ConstantPeerCountChurnModel
.
class
);
/**
*
* @param maxPeersOnline
...
...
@@ -100,56 +122,82 @@ public class ConstantPeerCountChurnModel implements ChurnModel {
@Override
public
long
getNextUptime
(
SimHost
host
)
{
long
currentTime
=
Time
.
getCurrentTime
();
if
(!
hosts
.
remove
(
host
))
{
/*
* FIXME: This is to avoid reusing peer instances. Needs to be fixed
* by properly resetting peers!
*/
return
1000
*
Time
.
HOUR
;
}
if
(
hasFlashcrowd
&&
!
flashcrowdFinished
&&
inFlashcrowd
)
{
long
time
=
Time
.
getCurrentTime
();
long
fcInterArrivalTime
=
(
long
)
(
flashcrowdInterval
/
((
flashcrowdMultiplicator
-
1
)
*
peersThreshold
));
CInfo
info
=
new
CInfo
(
lastJoin
+
fcInterArrivalTime
);
lastJoin
+=
fcInterArrivalTime
;
cInfos
.
put
(
host
.
getHostId
(),
info
);
if
(
lastJoin
>
flashcrowdStart
+
flashcrowdInterval
)
{
if
(
hasFlashcrowd
&&
inFlashcrowd
&&
!
flashCrowdFinished
)
{
/*
* Join peers in a burst according to flash crowd configuration.
*
* Info: The flash crowd peak is reached with (flash crowd
* multiplier * peer threshold) present peers. The length of the
* period is defined by the flashcrowdInterval.
*/
long
flashCrowdInterArrivalTime
=
(
long
)
(
flashcrowdInterval
/
((
flashcrowdMultiplicator
-
1
)
*
peersThreshold
));
long
currentJoin
=
lastJoin
+
flashCrowdInterArrivalTime
;
ClientSessionInfo
info
=
new
ClientSessionInfo
(
currentJoin
);
clientSessionInfos
.
put
(
host
.
getHostId
(),
info
);
if
(
currentJoin
>
flashcrowdStart
+
flashcrowdInterval
)
{
inFlashcrowd
=
false
;
flash
c
rowdFinished
=
true
;
flash
C
rowdFinished
=
true
;
}
return
info
.
joiningAt
-
time
;
}
else
if
(
sortedByOfflinetime
.
size
()
<
peersThreshold
)
{
long
time
=
Time
.
getCurrentTime
();
lastJoin
=
currentJoin
;
return
info
.
joiningAt
-
currentTime
;
}
else
if
(
clientsSortedByOfflineTime
.
size
()
<
peersThreshold
)
{
/*
* Initially, join peers until the peer threshold is reached.
*/
if
(
lastJoin
<
0
)
{
lastJoin
=
t
ime
;
lastJoin
=
currentT
ime
;
}
CInfo
info
=
new
CInfo
(
lastJoin
+
initialInterArrivalTime
)
;
lastJoin
+=
initialInterArrivalTime
;
s
ortedByOffline
t
ime
.
add
(
info
);
cInfos
.
put
(
host
.
getHostId
(),
info
);
if
(
hasFlashcrowd
&&
!
flash
c
rowdFinished
&&
info
.
joiningAt
>
flashcrowdStart
)
{
long
currentJoin
=
lastJoin
+
initialInterArrivalTime
;
ClientSessionInfo
info
=
new
ClientSessionInfo
(
currentJoin
)
;
clientsS
ortedByOffline
T
ime
.
add
(
info
);
c
lientSession
Infos
.
put
(
host
.
getHostId
(),
info
);
if
(
hasFlashcrowd
&&
!
flash
C
rowdFinished
&&
currentJoin
>
flashcrowdStart
)
{
inFlashcrowd
=
true
;
}
return
info
.
joiningAt
-
time
;
lastJoin
=
currentJoin
;
return
info
.
joiningAt
-
currentTime
;
}
else
{
CInfo
nextToGoOffline
=
sortedByOfflinetime
.
poll
();
/*
* After reaching the peer threshold, only peers that went offline
* are replaced to keep the threshold.
*/
ClientSessionInfo
nextToGoOffline
=
clientsSortedByOfflineTime
.
poll
();
if
(
nextToGoOffline
==
null
)
{
throw
new
AssertionError
();
}
else
{
long
joiningAt
=
nextToGoOffline
.
leavingAt
;
CInfo
info
=
new
CInfo
(
join
ingAt
)
;
lastJoin
=
info
.
joiningAt
;
s
ortedByOffline
t
ime
.
add
(
info
);
cInfos
.
put
(
host
.
getHostId
(),
info
);
if
(
hasFlashcrowd
&&
!
flash
c
rowdFinished
&&
info
.
joiningAt
>
flashcrowdStart
)
{
// Use the next time a peer goes offline as joining time.
long
currentJoin
=
nextToGoOffline
.
leav
ingAt
;
ClientSessionInfo
info
=
new
ClientSessionInfo
(
currentJoin
)
;
clientsS
ortedByOffline
T
ime
.
add
(
info
);
c
lientSession
Infos
.
put
(
host
.
getHostId
(),
info
);
if
(
hasFlashcrowd
&&
!
flash
C
rowdFinished
&&
currentJoin
>
flashcrowdStart
)
{
inFlashcrowd
=
true
;
}
return
info
.
joiningAt
-
Time
.
getCurrentTime
();
lastJoin
=
currentJoin
;
return
info
.
joiningAt
-
currentTime
;
}
}
}
@Override
public
long
getNextDowntime
(
SimHost
host
)
{
CInfo
info
=
cInfos
.
get
(
host
.
getHostId
());
C
lientSession
Info
info
=
c
lientSession
Infos
.
get
(
host
.
getHostId
());
return
info
.
sessionLength
;
}
...
...
@@ -164,21 +212,37 @@ public class ConstantPeerCountChurnModel implements ChurnModel {
}
}
}
Collections
.
shuffle
(
churnHosts
,
rnd
);
sortedByOfflinetime
=
new
PriorityQueue
<
ConstantPeerCountChurnModel
.
CInfo
>(
(
int
)
Math
.
ceil
(
hosts
.
size
()
/
10.0
),
COMP
);
clientsSortedByOfflineTime
=
new
PriorityQueue
<
ConstantPeerCountChurnModel
.
ClientSessionInfo
>(
(
int
)
Math
.
ceil
(
hosts
.
size
()
/
10.0
),
COMP_OFFLINE_TIME
);
}
/**
* returns the session length for the host
* Returns the next session length following the exponential distribution
* (using parameter a and b)
*
* @param host
* @return
* @return the session length
*/
protected
long
getSessionLength
()
{
double
random
=
rnd
.
nextDouble
();
/*
* The session length is calculated according an exponential
* distribution as defined by Vu et al. (2010) to model PPLive traces.
*
* p = a * e^{b*x}
*
* Resulting in the following formula:
*
* x = log(p/a)/b
*
* FIXME: Why do we add another random number of minutes here??
*/
long
sessionLength
=
(
long
)
(
Math
.
log
(
random
/
a
)
/
b
)
*
Time
.
MINUTE
+
(
long
)
(
random
*
Time
.
MINUTE
);
/*
* The minimum session length is limited to avoid too short sessions
*/
return
Math
.
max
(
minSessionLength
+
(
long
)
(
random
*
Time
.
MINUTE
),
sessionLength
);
}
...
...
@@ -208,7 +272,10 @@ public class ConstantPeerCountChurnModel implements ChurnModel {
this
.
b
=
b
;
}
private
class
CInfo
{
/**
* Client session information
*/
private
class
ClientSessionInfo
{
public
final
long
joiningAt
;
...
...
@@ -216,7 +283,7 @@ public class ConstantPeerCountChurnModel implements ChurnModel {
public
final
long
sessionLength
;
public
CInfo
(
long
joiningAt
)
{
public
C
lientSession
Info
(
long
joiningAt
)
{
this
.
sessionLength
=
getSessionLength
();
this
.
joiningAt
=
joiningAt
;
this
.
leavingAt
=
joiningAt
+
sessionLength
;
...
...
@@ -224,4 +291,4 @@ public class ConstantPeerCountChurnModel implements ChurnModel {
}
}
}
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment