Skip to content

Instantly share code, notes, and snippets.

@ESeufert
Created January 14, 2019 18:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ESeufert/47dde46ed491721ba49b0aca5683a554 to your computer and use it in GitHub Desktop.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.linear_model import Ridge
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import make_pipeline
from scipy.optimize import curve_fit
import statsmodels.stats.api as sms
import math
import operator
import scipy.stats as st
# Set up one wide figure that all later plotting helpers draw into.
plt.figure(figsize=(16,8))
def retention_profile_func(x, a, b, c, d):
return a * np.exp(-b * x) + c * np.exp(-d * x )
def ltv_profile_func( x, a, b, c ):
return a + b * np.log( x )
def get_retention_params( days, retention ):
    """Fit the double-exponential retention curve and return its parameters [a, b, c, d]."""
    xs = np.array( days )
    ys = np.array( retention )
    params, _cov = curve_fit( retention_profile_func, xs, ys )
    return params
def build_retention_profile( days, retention ):
    """Evaluate the fitted retention curve on every integer day 0..max(days)."""
    popt = get_retention_params( days, retention )
    # Sequential insert at the end is equivalent to appending in day order.
    return [ retention_profile_func( day, *popt ) for day in range( max( days ) + 1 ) ]
def plot_retention_profile( retention_profile, days, retention ):
    """Scatter the observed retention points and overlay the fitted curve."""
    plt.clf()
    horizon = max( days )
    timeline = np.linspace( 0, horizon, horizon + 1 )
    popt = get_retention_params( days, retention )
    plt.scatter( days, retention, color='green', s=30, marker='o', label="training points" )
    fit_label = 'fit: a=%5.3f, b=%5.3f, c=%5.3f, d=%5.3f' % tuple( popt )
    plt.plot( timeline, retention_profile, 'b--', label=fit_label )
    plt.legend()
def get_ltv_params( days, ltv ):
    """Fit the logarithmic LTV curve and return its parameters [a, b, c]."""
    xs = np.array( days )
    ys = np.array( ltv )
    params, _cov = curve_fit( ltv_profile_func, xs, ys )
    return params
def build_ltv_profile( days, ltv ):
    """Evaluate the fitted LTV curve on every integer day 1..max(days)."""
    popt = get_ltv_params( days, ltv )
    # list.insert past the end behaves like append, so a comprehension is equivalent.
    return [ ltv_profile_func( day, *popt ) for day in range( 1, max( days ) + 1 ) ]
def plot_ltv_profile( ltv_profile, days, ltv ):
    """Scatter the observed LTV points and overlay the fitted log curve."""
    plt.clf()
    horizon = max( days )
    timeline = np.linspace( 1, horizon, horizon )
    popt = get_ltv_params( days, ltv )
    plt.scatter( days, ltv, color='red', s=30, marker='o', label="training points" )
    fit_label = 'fit: a=%5.3f, b=%5.3f, c=%5.3f' % tuple( popt )
    plt.plot( timeline, ltv_profile, 'g--', label=fit_label )
    plt.legend()
def build_ltv_day_error( day, ltv_profile, users, mu, sigma ):
    """Draw one normally-distributed monetization error per user.

    NOTE(review): ``day`` and ``ltv_profile`` are currently unused.
    """
    samples = np.random.normal( loc=mu, scale=sigma, size=users )
    return samples
def build_error_bar( error ):
    """Return the spread (max - min) of an error sample.

    Fix: the original stored the result in a local named ``range``, shadowing
    the builtin; renamed to ``spread``.
    """
    spread = max( error ) - min( error )
    return spread
def plot_hist( values, bins ):
    # Histogram helper: forwards the sample and bin count straight to matplotlib.
    plt.hist( values , bins=bins )
def get_pareto_params( mean ):
    """Return (shape, mode) of a Pareto distribution whose mean equals ``mean``.

    Bugs fixed:
    - the original read the module-level global ``arpdau_mu`` instead of the
      ``mean`` parameter, so the argument was silently ignored;
    - the original searched by multiplying both shape and mode by 1.1 until
      the implied mean fell within 1% of the target, but the multiplicative
      step grows as shape grows and eventually jumps over the +/-1% band,
      so the loop could not terminate. The mode is now solved analytically.
    """
    shape = 2.0
    # For Pareto(shape, mode) with shape > 1 the mean is shape*mode/(shape-1);
    # invert that to hit the requested mean exactly.
    mode = mean * ( shape - 1 ) / shape
    return shape, mode
def index_exists(ls, i):
    """Return True when ``i`` is a valid (positive or negative) index into ``ls``."""
    size = len(ls)
    if i >= 0:
        return i < size
    return -size <= i
def build_pareto_dist( sample, shape, mode ):
    """Draw ``sample`` Pareto-distributed revenue values with the given shape/mode."""
    # np.random.pareto is the Lomax form; shifting by 1 and scaling by the
    # mode gives a classical Pareto with minimum value ``mode``.
    draws = np.random.pareto( shape, sample ) + 1
    return draws * mode
def build_expon_dist( sample, arpdau_mu ):
    """Draw ``sample`` exponentially-distributed revenue values with mean ``arpdau_mu``."""
    return st.expon.rvs( size=sample, scale=arpdau_mu )
def iterate_period( period_length, retention_profile, daily_users, arpdau_mu ):
    """Simulate per-user daily revenue samples over the whole period.

    Returns {day: {cohort_index: revenue_array}}; the number of cohorts
    observed at a given day shrinks as the day grows, so later days are
    covered by fewer cohorts.
    """
    monetization = {}
    for day in range( 1, period_length + 1 ):
        cohort_count = period_length - day + 1
        day_results = {}
        for index in range( 1, cohort_count + 1 ):
            shape, mode = get_pareto_params( arpdau_mu )
            # Number of users still active, per the fitted retention curve.
            retained = int( daily_users * retention_profile[ day - 1 ] )
            day_results[ index ] = build_pareto_dist( retained, shape, mode )
            # alternative generator: build_expon_dist( retained, arpdau_mu )
        monetization[ day ] = day_results
    return monetization
def iterate_monetization( monetization, daily_users ):
    """Collapse simulated revenues into cumulative per-cohort LTV curves.

    Returns {cohort: [cumulative per-user revenue at day 1, day 2, ...]}.

    Fixes:
    - ``dict.iteritems`` (Python 2-only) replaced with ``items`` so the code
      also runs on Python 3;
    - days are now iterated explicitly in ascending order: the running sum is
      only correct when day 1 is processed first, and plain dict iteration
      order was arbitrary on Python 2.
    """
    ltv = {}
    for day in sorted( monetization ):
        for cohort in sorted( monetization[ day ] ):
            values = monetization[ day ][ cohort ]
            per_user = sum( values / daily_users )
            if cohort not in ltv:
                ltv[ cohort ] = [ per_user ]
            else:
                # Accumulate on top of the cohort's previous running total.
                ltv[ cohort ].append( per_user + ltv[ cohort ][ -1 ] )
    return ltv
def plot_ltvs( ltvs ):
    """Plot each cohort's cumulative LTV curve.

    Fix: ``dict.iteritems`` (Python 2-only) replaced with ``items``.
    """
    for cohort, vals in ltvs.items():
        plt.plot( vals )
        if len( vals ) == 1:
            # A one-day cohort would be invisible as a line; mark the point.
            plt.plot( vals, marker='o', markersize=3, color="red" )
def get_CIs( ltvs ):
    """Return {day: (lo, hi)} 95% t-confidence interval of day-X LTV across cohorts.

    Fixes:
    - ``dict.iteritems`` (Python 2-only) replaced with ``items``;
    - removed the dead ``CIs[ dayXLTV ] = []`` initialisation that was
      immediately overwritten by the interval tuple.
    """
    CIs = {}
    collapsed = collapse_dayXLTVs( ltvs )
    for dayXLTV, vals in collapsed.items():
        # Student-t interval across cohorts; NaN when only one cohort exists.
        ci = st.t.interval( 0.95, len( vals ) - 1, loc=np.mean( vals ), scale=st.sem( vals ) )
        CIs[ dayXLTV ] = ci
    return CIs
def collapse_dayXLTVs( ltvs ):
    """Regroup {cohort: [day-1 LTV, day-2 LTV, ...]} into {day: [values across cohorts]}.

    Fix: ``dict.iteritems`` (Python 2-only) replaced with ``items``.
    """
    collapsed = {}
    for cohort, vals in ltvs.items():
        # Day numbers are 1-based while list positions are 0-based.
        for offset, value in enumerate( vals ):
            collapsed.setdefault( offset + 1, [] ).append( value )
    return collapsed
def plot_CIs( CIs, ltvs ):
    """Overlay confidence intervals and spread labels on the LTV plot.

    Fix: ``dict.iteritems`` (Python 2-only) replaced with ``items``.
    NOTE(review): the CI tuple (lo, hi) is passed to ``errorbar`` as
    xerr/yerr — i.e. as error *magnitudes* rather than interval bounds —
    which looks suspect; confirm intent before relying on this plot.
    """
    collapsed = collapse_dayXLTVs( ltvs )
    for dayXLTV, vals in CIs.items():
        # Skip days whose CI is NaN (fewer than two cohorts observed).
        if not math.isnan( vals[ 0 ] ):
            plt.errorbar( ( dayXLTV - 1 ), np.mean( collapsed[ dayXLTV ] ) , xerr = vals[ 0 ], yerr=vals[ 1 ] )
            plt.text( dayXLTV - 1, ( vals[ 1 ] * 1.5 ), '%.2f, %.2f (Delta: %.2f / %.2f pct)' %(vals[ 0 ], vals[ 1 ], abs( vals[ 0 ] - vals[ 1 ] ), ( abs( vals[ 0 ] - vals[ 1 ] ) / np.mean( collapsed[ dayXLTV ] ) * 100 ) ), fontsize=12)
    return 1
def get_minimum_cohort_days( daily_users, retention_profile, arpdau_mu, acceptable_spread, dayXLTV_target ):
    # Entry point: find the fewest running days of cohorts needed before the
    # day-X LTV confidence interval is narrower than `acceptable_spread`.
    # Starts the search at dayXLTV_target + 1, the first length that can
    # contain a day-X observation at all.
    return iterate_cohorts( dayXLTV_target + 1, daily_users, retention_profile, arpdau_mu, acceptable_spread, dayXLTV_target )
def iterate_cohorts( running_days, daily_users, retention_profile, arpdau_mu, acceptable_spread, dayXLTV_target ):
    # Simulate `running_days` worth of cohorts and test whether the day-X LTV
    # confidence interval is tight enough; recurse with one more day otherwise.
    # Returns (running_days, (ci_lo, ci_hi), mean day-X LTV).
    # NOTE(review): recursion depth grows with the number of extra days needed;
    # a very tight `acceptable_spread` could hit Python's recursion limit.
    m = iterate_period( running_days, retention_profile, daily_users, arpdau_mu )
    ltvs = iterate_monetization( m, daily_users )
    CIs = get_CIs( ltvs )
    # No CI yet (day missing, NaN from a single sample, or < 2 cohorts): retry longer.
    if dayXLTV_target not in CIs or math.isnan( CIs[ dayXLTV_target][ 0 ] ) or len( ltvs[ dayXLTV_target ] ) < 2:
        return iterate_cohorts( running_days + 1, daily_users, retention_profile, arpdau_mu, acceptable_spread, dayXLTV_target )
    collapsed = collapse_dayXLTVs( ltvs )
    mean_dayXLTV_target = np.mean( collapsed[ dayXLTV_target ] )
    # CI still wider than acceptable: add a day of cohorts and retry.
    if abs( CIs[ dayXLTV_target][ 1 ] - CIs[ dayXLTV_target][ 0 ] ) > acceptable_spread:
        return iterate_cohorts( running_days + 1, daily_users, retention_profile, arpdau_mu, acceptable_spread, dayXLTV_target )
    return running_days, ( CIs[ dayXLTV_target][ 0 ], CIs[ dayXLTV_target][ 1 ] ), mean_dayXLTV_target
# --- model inputs ---
days = [ 0, 1, 7, 30, 90, 180, 365 ]
retention = [ 1, .5, .25, .125, .0625, .03125, .015625 ]
arpdau_mu = .25
daily_users = 500
retention_profile = build_retention_profile( days, retention )
# Optional diagnostics (uncomment to inspect intermediate results):
#plot_retention_profile( retention_profile, days, retention )
'''
shape, mode = get_pareto_params( arpdau_mu )
s = build_pareto_dist( daily_users, shape, mode )
plot_hist( s, 100 )
'''
#plot_ltvs( ltvs )
#plot_CIs( CIs, ltvs )
dayXLTV_target = 30
# Search for the minimum number of running days that brings the day-30 LTV
# confidence interval within a $0.10 spread.
num_days, CI, mean = get_minimum_cohort_days( daily_users, retention_profile, arpdau_mu, 0.10, dayXLTV_target )
# Fix: `print x, y, z` is Python 2-only syntax; use the function form.
print( num_days, CI, mean )
#plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment