hystrix.rolling_number module¶
-
class
hystrix.rolling_number.RollingNumber(_time, milliseconds, bucket_numbers)¶ Bases:
objectA number which can be used to track counters (increment) or set values over time.
It is rolling in the sense that a
millisecondsis given that you want to track (such as 10 seconds) and then that is broken into buckets (defaults to 10) so that the 10 second window doesn’t empty out and restart every 10 seconds, but instead every 1 second you have a newBucketadded and one dropped so that 9 of the buckets remain and only the newest starts from scratch.This is done so that the statistics are gathered over a rolling 10 second window with data being added/dropped in 1 second intervals (or whatever granularity is defined by the arguments) rather than each 10 second window starting at 0 again.
Performance-wise this class is optimized for writes, not reads. This is done because it expects far higher write volume (thousands/second) than reads (a few per second).
For example, on each read to getSum/getCount it will iterate buckets to sum the data so that on writes we don’t need to maintain the overall sum and pay the synchronization cost at each write to ensure the sum is up-to-date when the read can easily iterate each bucket to get the sum when it needs it.
See test module
tests.test_rolling_numberfor usage and expected behavior examples.-
buckets_size_in_milliseconds()¶
-
increment(event)¶ Increment the counter in the current bucket by one for the given
RollingNumberEventtype.The
RollingNumberEventmust be a counter type>>> RollingNumberEvent.isCounter() True
Parameters: event ( RollingNumberEvent) – Event defining which counter to increment.
-
update_rolling_max(event, value)¶ Update a value and retain the max value.
The
RollingNumberEventmust be a max updater type>>> RollingNumberEvent.isMaxUpdater() True
Parameters: - value (int) – Max value to update.
- event (
RollingNumberEvent) – Event defining which counter to increment.
-
current_bucket()¶ Retrieve the current
BucketRetrieve the latest
Bucketif the given time is BEFORE the end of the bucket window, otherwise it returnsNone.The following needs to be synchronized/locked even with a synchronized/thread-safe data structure such as LinkedBlockingDeque because the logic involves multiple steps to check existence, create an object then insert the object. The ‘check’ or ‘insertion’ themselves are thread-safe by themselves but not the aggregate algorithm, thus we put this entire block of logic inside synchronized.
I am using a
multiprocessing.RLockif/then so that a single thread will get the lock and as soon as one thread gets the lock all others will go the ‘else’ block and just return the currentBucket until the newBucket is created. This should allow the throughput to be far higher and only slow down 1 thread instead of blocking all of them in each cycle of creating a new bucket based on some testing (and it makes sense that it should as well).This means the timing won’t be exact to the millisecond as to what data ends up in a bucket, but that’s acceptable. It’s not critical to have exact precision to the millisecond, as long as it’s rolling, if we can instead reduce the impact synchronization.
More importantly though it means that the ‘if’ block within the lock needs to be careful about what it changes that can still be accessed concurrently in the ‘else’ block since we’re not completely synchronizing access.
For example, we can’t have a multi-step process to add a bucket, remove a bucket, then update the sum since the ‘else’ block of code can retrieve the sum while this is all happening. The trade-off is that we don’t maintain the rolling sum and let readers just iterate bucket to calculate the sum themselves. This is an example of favoring write-performance instead of read-performance and how the tryLock versus a synchronized block needs to be accommodated.
Returns: Returns the latest BucketorNone.Return type: bucket
-
reset()¶ Reset all rolling counters
Force a reset of all rolling counters (clear all buckets) so that statistics start being gathered from scratch.
This does NOT reset the
CumulativeSumvalues.
-
rolling_sum(event)¶ Rolling sum
Get the sum of all buckets in the rolling counter for the given
RollingNumberEvent.The
RollingNumberEventmust be a counter type>>> RollingNumberEvent.isCounter() True
Parameters: event ( RollingNumberEvent) – Event defining which counter to retrieve values from.Returns: - Return value from the given
RollingNumberEvent - counter type.
Return type: long - Return value from the given
-
rolling_max(event)¶
-
values(event)¶
-
value_of_latest_bucket(event)¶
-
cumulative_sum(event)¶ Cumulative sum
The cumulative sum of all buckets ever since the start without rolling for the given :class`RollingNumberEvent` type.
See
rolling_sum()for the rolling sum.The
RollingNumberEventmust be a counter type>>> RollingNumberEvent.isCounter() True
Parameters: event ( RollingNumberEvent) – Event defining which counter to increment.Returns: - Returns the cumulative sum of all increments and
- adds for the given
RollingNumberEventcounter type.
Return type: long
-
-
class
hystrix.rolling_number.BucketCircular(size)¶ Bases:
collections.dequeThis is a circular array acting as a FIFO queue.
-
size¶
-
last()¶
-
peek_last()¶
-
add_last(bucket)¶
-
-
class
hystrix.rolling_number.Bucket(start_time)¶ Bases:
objectCounters for a given
Bucketof timeWe support both
LongAdderandLongMaxUpdaterin aBucketbut don’t want the memory allocation of all types for each so we only allocate the objects if theRollingNumberEventmatches the correct type - though we still have the allocation of empty arrays to the given length as we want to keep using the type value for fast random access.-
get(event)¶
-
adder(event)¶
-
max_updater(event)¶
-
-
class
hystrix.rolling_number.LongAdder(min_value=0)¶ Bases:
object-
increment()¶
-
decrement()¶
-
sum()¶
-
add(value)¶
-
-
class
hystrix.rolling_number.CumulativeSum¶ Bases:
object-
add_bucket(bucket)¶
-
get(event)¶
-
adder(event)¶
-
max_updater(event)¶
-
-
hystrix.rolling_number.RollingNumberEvent¶ alias of
SUCCESS