hystrix.rolling_number module

class hystrix.rolling_number.RollingNumber(_time, milliseconds, bucket_numbers)

Bases: object

A number which can be used to track counters (increment) or set values over time.

It is rolling in the sense that you give it a time window in milliseconds to track (such as 10 seconds), which is then broken into buckets (for example 10). The 10 second window doesn’t empty out and restart every 10 seconds; instead, every 1 second a new Bucket is added and the oldest one is dropped, so that 9 of the buckets remain and only the newest starts from scratch.

This is done so that the statistics are gathered over a rolling 10 second window with data being added/dropped in 1 second intervals (or whatever granularity is defined by the arguments) rather than each 10 second window starting at 0 again.

Performance-wise this class is optimized for writes, not reads. This is done because it expects far higher write volume (thousands/second) than reads (a few per second).

For example, each read of a sum or count (such as rolling_sum()) iterates the buckets to sum the data. Writes therefore don’t need to maintain the overall sum or pay the synchronization cost of keeping it up to date, since a read can easily iterate each bucket to compute the sum when it needs it.
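
The rolling window and the sum-on-read trade-off described above can be sketched roughly as follows. This is a minimal illustration, not the hystrix implementation: the class name SimpleRollingCounter, its clock parameter, and the [start, count] bucket layout are all assumptions made for the example.

```python
from collections import deque


class SimpleRollingCounter:
    """Hypothetical rolling counter; not the hystrix implementation."""

    def __init__(self, window_ms, bucket_count, clock):
        self.bucket_ms = window_ms // bucket_count
        self.window_ms = self.bucket_ms * bucket_count
        self.clock = clock  # callable returning the current time in ms
        # Each entry is [bucket_start_ms, count]; maxlen drops the oldest.
        self.buckets = deque(maxlen=bucket_count)

    def _current(self):
        now = self.clock()
        start = now - (now % self.bucket_ms)
        # Start a fresh bucket when the newest one no longer covers "now".
        if not self.buckets or self.buckets[-1][0] != start:
            self.buckets.append([start, 0])
        return self.buckets[-1]

    def increment(self):
        # Writes only touch the newest bucket; no running total is kept.
        self._current()[1] += 1

    def rolling_sum(self):
        # Reads iterate the buckets and skip any that fell out of the window.
        oldest_allowed = self.clock() - self.window_ms
        return sum(count for start, count in self.buckets
                   if start > oldest_allowed)
```

Writes stay cheap because they update a single bucket, while rolling_sum() pays the iteration cost on each read.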

See test module tests.test_rolling_number for usage and expected behavior examples.

buckets_size_in_milliseconds()
increment(event)

Increment the counter in the current bucket by one for the given RollingNumberEvent type.

The RollingNumberEvent must be a counter type

>>> RollingNumberEvent.isCounter()
True
Parameters: event (RollingNumberEvent) – Event defining which counter to increment.
update_rolling_max(event, value)

Update a value and retain the max value.

The RollingNumberEvent must be a max updater type

>>> RollingNumberEvent.isMaxUpdater()
True
Parameters:
  • value (int) – Max value to update.
  • event (RollingNumberEvent) – Event defining which max updater to update.
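
The "retain the max value" behaviour can be illustrated with a small sketch. SimpleMaxUpdater is a hypothetical stand-in written for this example, and the use of a threading.Lock is an assumption; the library's own LongMaxUpdater may work differently.

```python
import threading


class SimpleMaxUpdater:
    """Hypothetical stand-in for a max updater; not the library's class."""

    def __init__(self, min_value=0):
        self._value = min_value
        self._lock = threading.Lock()

    def update(self, value):
        # Keep the larger of the stored value and the new value.
        with self._lock:
            if value > self._value:
                self._value = value

    def max(self):
        return self._value
```

Smaller values passed to update() are simply ignored, so the stored value only ever grows.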
current_bucket()

Retrieve the current Bucket

Retrieves the latest Bucket if the given time is before the end of the bucket window; otherwise returns None.

The following needs to be synchronized/locked even with a synchronized/thread-safe data structure such as LinkedBlockingDeque, because the logic involves multiple steps: check existence, create an object, then insert the object. The ‘check’ and the ‘insertion’ are each thread-safe on their own, but the aggregate algorithm is not, thus we put this entire block of logic inside a lock.

I am using a multiprocessing.RLock in an if/else (try-lock) so that a single thread will get the lock, and as soon as one thread has the lock all others will go to the ‘else’ block and just return the currentBucket until the newBucket is created. Based on some testing, this should allow far higher throughput, slowing down only 1 thread instead of blocking all of them in each cycle of creating a new bucket (and it makes sense that it should as well).

This means the timing won’t be exact to the millisecond as to what data ends up in a bucket, but that’s acceptable. It’s not critical to have exact precision to the millisecond, as long as it’s rolling, if we can instead reduce the impact of synchronization.

More importantly though it means that the ‘if’ block within the lock needs to be careful about what it changes that can still be accessed concurrently in the ‘else’ block since we’re not completely synchronizing access.

For example, we can’t have a multi-step process to add a bucket, remove a bucket, then update the sum, since the ‘else’ block of code can retrieve the sum while this is all happening. The trade-off is that we don’t maintain the rolling sum and let readers iterate the buckets to calculate the sum themselves. This is an example of favoring write-performance over read-performance, and of how a tryLock versus a synchronized block needs to be accommodated.

Returns: The latest Bucket or None.
Return type: Bucket
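
The try-lock idea described for current_bucket() can be sketched as follows. This is only an illustration under stated assumptions: BucketHolder, its clock parameter, and the dict used as a bucket are hypothetical, and a threading.Lock is used here rather than the multiprocessing.RLock mentioned above.

```python
import threading


class BucketHolder:
    """Hypothetical holder illustrating the try-lock pattern."""

    def __init__(self, bucket_ms, clock):
        self.bucket_ms = bucket_ms
        self.clock = clock  # callable returning the current time in ms
        self.current = None
        self._lock = threading.Lock()

    def _expired(self, bucket, now):
        return bucket is None or now >= bucket["start"] + self.bucket_ms

    def current_bucket(self):
        now = self.clock()
        latest = self.current
        if not self._expired(latest, now):
            return latest  # fast path, no locking at all

        # Non-blocking acquire: only one thread creates the new bucket.
        if self._lock.acquire(blocking=False):
            try:
                latest = self.current  # re-check under the lock
                if self._expired(latest, now):
                    latest = {"start": now, "count": 0}
                    # A single assignment publishes the finished bucket.
                    self.current = latest
                return latest
            finally:
                self._lock.release()
        # 'else' branch: another thread is rolling the bucket, so return
        # the slightly stale one (or None if no bucket exists yet).
        return latest
```

Threads that lose the race never wait; they keep writing to the previous bucket for a moment, which is the loss of millisecond precision the text accepts.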
reset()

Reset all rolling counters

Force a reset of all rolling counters (clear all buckets) so that statistics start being gathered from scratch.

This does NOT reset the CumulativeSum values.

rolling_sum(event)

Rolling sum

Get the sum of all buckets in the rolling counter for the given RollingNumberEvent.

The RollingNumberEvent must be a counter type

>>> RollingNumberEvent.isCounter()
True
Parameters: event (RollingNumberEvent) – Event defining which counter to retrieve values from.
Returns: The value for the given RollingNumberEvent counter type.
Return type: long
rolling_max(event)
values(event)
value_of_latest_bucket(event)
cumulative_sum(event)

Cumulative sum

The cumulative sum of all buckets since the start, without rolling, for the given RollingNumberEvent type.

See rolling_sum() for the rolling sum.

The RollingNumberEvent must be a counter type

>>> RollingNumberEvent.isCounter()
True
Parameters: event (RollingNumberEvent) – Event defining which counter to retrieve values from.
Returns: The cumulative sum of all increments and adds for the given RollingNumberEvent counter type.
Return type: long
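
The difference between rolling_sum() and cumulative_sum(), and why reset() leaves the cumulative values intact, can be sketched like this. The class RollingAndCumulative, its explicit roll() method, and the use of plain integers as buckets are assumptions made for the example, not the library's design.

```python
from collections import deque


class RollingAndCumulative:
    """Hypothetical counter keeping a rolling and a cumulative total."""

    def __init__(self, bucket_count):
        self.buckets = deque([0], maxlen=bucket_count)
        self.retired_total = 0  # counts from buckets that are finished

    def increment(self):
        self.buckets[-1] += 1

    def roll(self):
        # Fold the finished bucket into the total, then start a new one.
        self.retired_total += self.buckets[-1]
        self.buckets.append(0)

    def rolling_sum(self):
        return sum(self.buckets)

    def cumulative_sum(self):
        # Finished buckets plus whatever the newest bucket holds so far.
        return self.retired_total + self.buckets[-1]

    def reset(self):
        # Clear the rolling buckets but keep the cumulative total.
        self.retired_total += self.buckets[-1]
        self.buckets.clear()
        self.buckets.append(0)
```

After a reset() the rolling sum starts again from zero, while the cumulative sum still reflects every increment ever recorded.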
class hystrix.rolling_number.BucketCircular(size)

Bases: collections.deque

This is a circular array acting as a FIFO queue.
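
One way to get circular FIFO behaviour from collections.deque is its maxlen argument. Whether BucketCircular relies on maxlen internally is an assumption here; the name ring and the string items are purely illustrative.

```python
from collections import deque

# A deque created with maxlen discards the oldest item whenever a new
# one is appended to an already full deque.
ring = deque(maxlen=3)
for item in ("b1", "b2", "b3", "b4"):
    ring.append(item)

oldest_to_newest = list(ring)  # "b1" has been dropped
newest = ring[-1]              # peek at the most recent item
```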

size
last()
peek_last()
add_last(bucket)
class hystrix.rolling_number.Bucket(start_time)

Bases: object

Counters for a given Bucket of time

We support both LongAdder and LongMaxUpdater in a Bucket but don’t want the memory allocation of all types for each event, so we only allocate the objects if the RollingNumberEvent matches the correct type. We still allocate empty arrays of the given length, as we want to keep using the type value for fast random access.
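
The allocation strategy described above might look roughly like the sketch below. The Event enum, its slot and kind attributes, and SketchBucket are hypothetical names for illustration; plain integers stand in for LongAdder and LongMaxUpdater objects.

```python
from enum import Enum


class Event(Enum):
    """Hypothetical event type; not the library's RollingNumberEvent."""
    SUCCESS = (0, "counter")
    FAILURE = (1, "counter")
    THREAD_MAX_ACTIVE = (2, "max")

    def __init__(self, slot, kind):
        self.slot = slot  # position used for direct list indexing
        self.kind = kind


class SketchBucket:
    def __init__(self, start_time):
        self.start_time = start_time
        # Both lists have one slot per event, but a slot is only filled
        # when the event is of the matching kind; the rest stay None.
        self.adders = [0 if e.kind == "counter" else None for e in Event]
        self.max_updaters = [0 if e.kind == "max" else None for e in Event]

    def add(self, event, amount=1):
        if event.kind != "counter":
            raise ValueError("not a counter type: %s" % event.name)
        self.adders[event.slot] += amount

    def update_max(self, event, value):
        if event.kind != "max":
            raise ValueError("not a max updater type: %s" % event.name)
        self.max_updaters[event.slot] = max(self.max_updaters[event.slot], value)

    def get(self, event):
        if event.kind == "counter":
            return self.adders[event.slot]
        return self.max_updaters[event.slot]
```

Indexing by the event's slot keeps lookups constant-time, at the cost of a few unused None entries per bucket.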

get(event)
adder(event)
max_updater(event)
class hystrix.rolling_number.LongAdder(min_value=0)

Bases: object

increment()
decrement()
sum()
add(value)
class hystrix.rolling_number.LongMaxUpdater(min_value=0)

Bases: object

max()
update(value)
class hystrix.rolling_number.CumulativeSum

Bases: object

add_bucket(bucket)
get(event)
adder(event)
max_updater(event)
hystrix.rolling_number.RollingNumberEvent

alias of SUCCESS