from __future__ import absolute_import, division, print_function
from collections import Counter, defaultdict
from decimal import Decimal
from operator import itemgetter
from manhattan import visitor
from .rollups import AllRollup, LocalDayRollup, LocalWeekRollup, BrowserRollup
from .cache import DeferredLRUCache
from .model import VisitorHistory, Test, Goal
from .persistence.sql import SQLPersistentStore
# Time zone used by the local day/week rollups below (the 'pst_' keys).
_PACIFIC_TZ = 'America/Los_Angeles'

# Rollups used by Backend when it is constructed without a ``rollups``
# mapping. Each key becomes the ``rollup_key`` component of the counter keys
# Backend builds; each value must provide ``get_bucket(timestamp, history)``.
default_rollups = dict(
    all=AllRollup(),
    pst_day=LocalDayRollup(_PACIFIC_TZ),
    pst_week=LocalWeekRollup(_PACIFIC_TZ),
    browser=BrowserRollup(),
)
class Backend(object):
    """Aggregate a stream of log records into conversion/impression counters.

    Visitor histories, tests and goals are held in ``DeferredLRUCache``
    instances backed by a ``SQLPersistentStore``. Counter deltas accumulate
    in memory and are written to the store by :meth:`flush`, together with
    the current log pointer, between ``store.begin()`` and
    ``store.commit()``.
    """

    def __init__(self, sqlalchemy_url, rollups=None, complex_goals=None,
                 flush_every=500, cache_size=2000):
        """Set up the store, caches and in-memory counters.

        :param sqlalchemy_url: URL handed to ``SQLPersistentStore``.
        :param rollups: mapping of rollup key -> object providing
            ``get_bucket(timestamp, history)``; ``default_rollups`` is used
            when this is falsy.
        :param complex_goals: sequence of ``(name, include, exclude)``
            triples, where ``include`` and ``exclude`` are sets of goal
            names.
        :param flush_every: number of handled records between automatic
            :meth:`flush` calls.
        :param cache_size: ``max_size`` of each of the three caches.
        """
        self.rollups = rollups or default_rollups
        self.complex_goals = complex_goals or []

        self.store = store = SQLPersistentStore(sqlalchemy_url)
        self.visitors = DeferredLRUCache(get_backend=store.get_visitor_history,
                                         put_backend=store.put_visitor_history,
                                         max_size=cache_size)
        self.tests = DeferredLRUCache(get_backend=store.get_test,
                                      put_backend=store.put_test,
                                      max_size=cache_size)
        self.goals = DeferredLRUCache(get_backend=store.get_goal,
                                      put_backend=store.put_goal,
                                      max_size=cache_size)

        # Resume from wherever the last committed flush left off.
        self.pointer = self.store.get_pointer()
        self.records_since_flush = 0
        self.flush_every = flush_every
        self.reset_counters()

    def get_pointer(self):
        """Return the pointer of the most recently handled record."""
        return self.pointer

    def reset_counters(self):
        """Discard all unflushed counter deltas."""
        self.inc_conversions = Counter()
        self.inc_values = defaultdict(Decimal)
        self.inc_variant_conversions = Counter()
        self.inc_variant_values = Counter()
        self.inc_impressions = Counter()

    def handle(self, rec, ptr):
        """Process one log record and advance the pointer to ``ptr``.

        Records for a visitor are queued (most recent 500 kept) until a
        ``'pixel'`` record marks the visitor as non-bot. At that point the
        queued records are replayed through :meth:`handle_nonbot` and the
        queue is emptied. Later records for a non-bot visitor are handled
        immediately. :meth:`flush` runs once every ``flush_every`` records.
        """
        try:
            history = self.visitors.get(rec.vid)
        except KeyError:
            history = VisitorHistory()

        if rec.key == 'pixel':
            history.nonbot = True
            # Use a distinct name so ``rec`` keeps referring to the record
            # being handled. Empty the queue afterwards so that a repeated
            # pixel does not replay the same events a second time.
            for queued in history.nonbot_queue:
                self.handle_nonbot(queued, history)
            del history.nonbot_queue[:]
        elif history.nonbot:
            self.handle_nonbot(rec, history)
        else:
            history.nonbot_queue.append(rec)
            # Limit nonbot queue to most recent 500 events.
            del history.nonbot_queue[:-500]

        self.visitors.put(rec.vid, history)
        self.pointer = ptr

        self.records_since_flush += 1
        if self.records_since_flush >= self.flush_every:
            self.flush()
            self.records_since_flush = 0

    def handle_nonbot(self, rec, history):
        """Dispatch a record from a visitor known to be non-bot.

        ``'page'`` records also store the visitor's IP and user agent and
        count as the ``u'viewed page'`` goal. ``'goal'`` records become goal
        conversions. ``'split'`` records become test impressions.
        """
        assert rec.key in ('page', 'goal', 'split')
        # The timestamp may be a float or a numeric string; truncate to int.
        ts = int(float(rec.timestamp))
        site_id = int(rec.site_id)

        if rec.key == 'page':
            history.ips.add(rec.ip)
            history.user_agents.add(rec.user_agent)
            self.record_conversion(history,
                                   vid=rec.vid,
                                   name=u'viewed page',
                                   timestamp=ts,
                                   site_id=site_id)
        elif rec.key == 'goal':
            self.record_conversion(history,
                                   vid=rec.vid,
                                   name=rec.name,
                                   timestamp=ts,
                                   site_id=site_id,
                                   value=rec.value,
                                   value_type=rec.value_type,
                                   value_format=rec.value_format)
        else:  # split
            self.record_impression(history,
                                   vid=rec.vid,
                                   name=rec.test_name,
                                   selected=rec.selected,
                                   timestamp=ts,
                                   site_id=site_id)

    def record_impression(self, history, vid, name, selected, timestamp,
                          site_id):
        """Record that the visitor saw variant ``selected`` of test ``name``.

        Updates the test's timestamps and variant set. Each
        (variant, rollup bucket, site) impression is counted at most once
        per visitor.
        """
        variant = name, selected
        history.variants.add(variant)

        try:
            test = self.tests.get(name)
        except KeyError:
            test = Test()
            test.first_timestamp = timestamp
        test.last_timestamp = timestamp
        test.variants.add(variant)
        self.tests.put(name, test)

        # Record this impression in appropriate time buckets both on the
        # history object and in the current incremental accumulators.
        for rollup_key, bucket_id in self.iter_rollups(timestamp, history):
            key = (name, selected, rollup_key, bucket_id, site_id)
            if key not in history.impression_keys:
                history.impression_keys.add(key)
                self.inc_impressions[key] += 1

    def iter_rollups(self, timestamp, history):
        """Yield ``(rollup_key, bucket_id)`` for every configured rollup."""
        for rollup_key, rollup in self.rollups.items():
            yield rollup_key, rollup.get_bucket(timestamp, history)

    def record_complex_goals(self, history, new_name, timestamp, site_id):
        """Update complex-goal counters after ``new_name`` joins the history.

        A complex goal converts when every goal in its ``include`` set has
        been reached and none in its ``exclude`` set has. Reaching the first
        ``exclude`` goal later retracts an earlier complex conversion.
        """
        for complex_name, include, exclude in self.complex_goals:
            # If all goals have now been satisfied in the 'include' set and
            # no 'exclude' goal has been hit, trigger a +1 delta on this
            # complex goal in the current rollups and remember the keys in
            # this visitor history so they can be retracted later.
            if (new_name in include and history.goals >= include
                    and not (history.goals & exclude)):
                new_keys = []
                for rollup_key, bucket_id in self.iter_rollups(timestamp,
                                                               history):
                    conv_key = (complex_name, rollup_key, bucket_id, site_id)
                    new_keys.append(conv_key)
                    self.inc_conversions[conv_key] += 1
                history.complex_keys[complex_name] = new_keys

            # If we are adding the first goal in the 'exclude' set, trigger
            # a -1 delta for all conversions on that complex goal in the
            # visitor history.
            if (history.goals & exclude) == set([new_name]):
                for key in history.complex_keys.pop(complex_name, []):
                    self.inc_conversions[key] -= 1

    def record_conversion(self, history, vid, name, timestamp, site_id,
                          value=None, value_type='', value_format=''):
        """Record a conversion on goal ``name`` for this visitor.

        A goal seen for the first time is created with ``value_type`` and
        ``value_format``. A truthy ``value`` is converted to ``Decimal`` and
        added to the value accumulators. Each (goal, rollup bucket, site)
        conversion, and each (goal, variant, bucket, site) conversion, is
        counted at most once per visitor.
        """
        try:
            goal = self.goals.get(name)
        except KeyError:
            goal = Goal()
            goal.value_type = value_type
            goal.value_format = value_format
            self.goals.put(name, goal)

        if value:
            value = Decimal(value)

        if name not in history.goals:
            history.goals.add(name)
            # If this is a 'new' goal for this visitor, process complex
            # conversion goals.
            self.record_complex_goals(history, name, timestamp, site_id)

        # Record this goal conversion in appropriate time buckets both on the
        # history object and in the current incremental accumulators.
        for rollup_key, bucket_id in self.iter_rollups(timestamp, history):
            conv_key = (name, rollup_key, bucket_id, site_id)
            if conv_key not in history.conversion_keys:
                history.conversion_keys.add(conv_key)
                self.inc_conversions[conv_key] += 1
                if value:
                    self.inc_values[conv_key] += value

            for test_name, selected in history.variants:
                vc_key = (name, test_name, selected, rollup_key, bucket_id,
                          site_id)
                if vc_key not in history.variant_conversion_keys:
                    history.variant_conversion_keys.add(vc_key)
                    self.inc_variant_conversions[vc_key] += 1
                    if value:
                        self.inc_variant_values[vc_key] += value

    def flush(self):
        """Write cached entities, counter deltas and the pointer to the store.

        All writes happen between ``store.begin()`` and ``store.commit()``.
        The in-memory counters are reset once they have been handed over.
        """
        self.store.begin()

        self.visitors.flush()
        self.tests.flush()
        self.goals.flush()

        # Add local counter state onto existing persisted counters.
        self.store.increment_conversion_counters(self.inc_conversions,
                                                 self.inc_values)
        self.store.increment_impression_counters(self.inc_impressions)
        self.store.increment_variant_conversion_counters(
            self.inc_variant_conversions, self.inc_variant_values)
        self.reset_counters()

        self.store.update_pointer(self.pointer)
        self.store.commit()

    def count(self, goal=None, variant=None, rollup_key='all', bucket_id=0,
              site_id=None):
        """Return a count combining flushed and unflushed counters.

        * ``goal`` and ``variant``: conversions on ``goal`` by visitors in
          ``variant``, a ``(test_name, selected)`` pair.
        * ``goal`` only: conversions on ``goal``.
        * ``variant`` only: impressions of ``variant``.
        """
        assert goal or variant, "must specify goal or variant"

        if goal and variant:
            test_name, selected = variant
            key = goal, test_name, selected, rollup_key, bucket_id, site_id
            local = self.inc_variant_conversions[key]
            flushed = self.store.count_variant_conversions(*key)[0]
        elif goal:
            key = goal, rollup_key, bucket_id, site_id
            local = self.inc_conversions[key]
            flushed = self.store.count_conversions(*key)[0]
        else:
            # variant
            name, selected = variant
            key = name, selected, rollup_key, bucket_id, site_id
            local = self.inc_impressions[key]
            # Used as a bare count here, whereas the conversion lookups above
            # are indexed ([0] count, [1] value).
            flushed = self.store.count_impressions(*key)
        return local + flushed

    def goal_value(self, goal, variant=None, rollup_key='all', bucket_id=0,
                   site_id=None):
        """Return the value metric for ``goal``, optionally within ``variant``.

        Goals without a ``value_type`` fall back to :meth:`count`. Otherwise
        the summed value is returned as-is (``visitor.SUM``), divided by the
        goal's conversion count (``visitor.AVERAGE``), or divided by the
        ``u'viewed page'`` count (any other type, i.e. ``visitor.PER``). A
        zero divisor yields ``0``. Presumably raises ``KeyError`` when
        ``goal`` is unknown to the goal cache.
        """
        goal_obj = self.goals.get(goal)
        if not goal_obj.value_type:
            return self.count(goal, variant, rollup_key=rollup_key,
                              bucket_id=bucket_id, site_id=site_id)

        # Read the accumulators with .get(): ``inc_values`` is a defaultdict,
        # and indexing it would insert an empty entry into the pending flush
        # state as a side effect of this read-only query.
        if variant:
            test_name, selected = variant
            key = goal, test_name, selected, rollup_key, bucket_id, site_id
            local = self.inc_variant_values.get(key, 0)
            flushed = self.store.count_variant_conversions(*key)[1]
        else:
            key = goal, rollup_key, bucket_id, site_id
            local = self.inc_values.get(key, 0)
            flushed = self.store.count_conversions(*key)[1]
        value = local + Decimal(str(flushed))

        if goal_obj.value_type == visitor.SUM:
            return value
        elif goal_obj.value_type == visitor.AVERAGE:
            count = self.count(goal, variant, rollup_key=rollup_key,
                               bucket_id=bucket_id, site_id=site_id)
            return value / count if count > 0 else 0
        else:
            # visitor.PER
            count = self.count(u'viewed page', variant,
                               rollup_key=rollup_key,
                               bucket_id=bucket_id, site_id=site_id)
            return value / count if count > 0 else 0

    def all_tests(self):
        """Return ``(name, first_timestamp, last_timestamp)`` for every test.

        Persisted tests are overlaid with the in-memory cache entries, so
        unflushed state wins. Rows are sorted by last timestamp, newest first.
        """
        # Copy first so that overlaying cache entries never mutates the
        # mapping the store handed back.
        merged = dict(self.store.all_tests())
        merged.update(self.tests.entries)
        rows = [(name, test.first_timestamp, test.last_timestamp)
                for name, test in merged.items()]
        rows.sort(key=itemgetter(2), reverse=True)
        return rows

    def results(self, test_name, goals, site_id=None):
        """Map each variant's ``selected`` value of ``test_name`` to a list
        of :meth:`goal_value` results, one per entry of ``goals`` in order.
        """
        test = self.tests.get(test_name)
        ret = {}
        for variant in test.variants:
            ret[variant[1]] = [self.goal_value(goal, variant, site_id=site_id)
                               for goal in goals]
        return ret