Skip to content

Instantly share code, notes, and snippets.

@pallih
Created May 16, 2017 13:57
Show Gist options
  • Save pallih/96d1726f83a063710b32e45a367af6f0 to your computer and use it in GitHub Desktop.
Save pallih/96d1726f83a063710b32e45a367af6f0 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import random
import tweepy
from tweepy.models import Status
import requests
import lxml.html
import json
import dataset
DB_CONNECT = ""
consumer_key = ""
consumer_secret = ""
access_token = ""
access_token_secret = ""
wrong_messages = ["Því miður þá er {} ekki gulur bíll samkvæmt mínum kokkabókum, heldur {}. Ekkert stig.",
"Hmmm... {} er {} bíll en ekki gulur. Sorrí. Ekkert stig.",
"Nei, nei. {} er {} bíll. Við erum að leita að gulum bílum. Ekkert stig.",
"{} er {} bíll. Ekki gulur. Ekkert stig.",
"Mér finnst {} ekki vera gulur bíll. Meira svona {} bíll. Ekkert stig",
"Fyrsta lögmál gula bílsins er að {} er ekki gulur bíll. Ekkert stig",
"Gulur bíll? Nei, nei. {} er ekkert gulur. Hann er {}."]
nothing_found_messages = ["Ég er ekki viss um að {} sé bílnúmer. Í það minnsta finn ég ekkert slíkt. Sorrí, ekkert stig.",
"{} er eitthvað skrítið bílnúmer. Ég fann ekkert. Leitaði víða. Sorrí, ekkert stig.",
"Neibbs. {} er ekki bílnúmer á mínu heimili. Ekkert stig.",
"Ertu viss um að {} sé bílnúmer? Ég fann engan bíl með því númeri..."]
yellow_messages = ["Vúhú! {} er gulur bíll! Eitt stig til þín! Þú ert núna með {} stig.",
"Blístur og fagnaðarlæti! {} er gulur bíll. Eitt stig til þín! Þú ert núna með {} stig.",
"Já! {} er heiðgul bifreið. Eitt stig til þín! Þú ert núna með {} stig.",
"{} er gulur sem sólin! Eitt stig til þín! Þú ert núna með {} stig.",
"Loksins! {} er gulur bíll. Eitt stig til þín! Þú ert núna með {} stig.",
"Jáá! Þú ert á hraðri leið á toppinn! {} er gulur sem þorskur! Þú ert núna með {} stig.",
"GUUUUUUUUUULUUUUUUUUUR BÍÍÍÍÍÍÍÍÍLLLLL! {} gæti ekki verið gulari. Þú ert núna með {} stig."]
successful_tweets = ["Jibbí! @{} fann gulan bíl og fékk stig!",
"GULUR BÍLL! @{} fann gulan bíl!",
"Enn einn guli bíllinn! @{} fær stig!",
"Hvað eru eiginlega margir gulir bílar? @{} fær stig!",
"Gleði og hamingja! @{} fann gulan bíl!",
"Þessi bíll er heiðgulur! @{} fær stig!",
"Allt að gerast! @{} fann gulan bíl!",
"Þvílíkur dagur! @{} fann gulan bíl!",
"Ég gleðst í hjarta mér því @{} fann gulan bíl!",
"GUUUUUUUUULUUUUUUUR BÍÍÍÍÍÍÍÍÍLLLLL! @{} fann gulan bíl!",
"Þessi dagur verður bara betri og betri! @{} fann gulan bíl!"]
class StreamListener(tweepy.StreamListener):
def send_dm(self, screen_name, text):
api.send_direct_message(screen_name=screen_name, text=text)
def send_tweet(self, text):
api.update_status(status=text)
def lookup_number(self, text, status):
reply_to = status.direct_message["sender_screen_name"]
if reply_to == u'gulurbill':
# do not loop endlessly
return
table = db['users']
user = table.find_one(twitter_handle=reply_to)
if not user:
scored_yellow = json.dumps(list())
table.upsert(dict(twitter_handle=reply_to, score=0, scored_yellow=scored_yellow), ['twitter_handle'])
lookup_url = "http://www.samgongustofa.is/umferd/okutaeki/okutaekjaskra/uppfletting?vq={}"
r = requests.get(lookup_url.format(text.strip()))
if u"Ekkert fannst" in r.text:
dm_text = random.choice(nothing_found_messages)
dm_text = dm_text.format(text).decode('utf-8')
self.send_dm(screen_name=reply_to, text=dm_text)
return
root = lxml.html.fromstring(r.text)
tegund = root.xpath("//div[@class='vehicleinfo']/div[@class='boxbody']/ul/li[contains(., 'Tegund')]/span")[0].text.strip()
stada = root.xpath("//div[@class='vehicleinfo']/div[@class='boxbody']/ul/li[contains(., 'Sta')]/span")[0].text.strip()
if stada == u"Afskráð":
dm_text = "{} er afskráð bifreið. Ekkert stig".format(text)
self.send_dm(screen_name=reply_to, text=dm_text)
return
if stada == u"Úr umferð":
dm_text = "{} er bifreið sem hefur verið tekin úr umferð. Ekkert stig".format(text)
self.send_dm(screen_name=reply_to, text=dm_text)
return
color = tegund.split('(')[1].replace(')', '').strip().lower()
if len(color) > 1:
if color == u"gulur":
user["score"] = user["score"] + 1
user["scored_yellow"] = json.loads(user["scored_yellow"])
if (user["scored_yellow"]) and (text in user["scored_yellow"]):
# check if already scored
dm_text = "{} er vissulega gulur bíll... en þú hefur nú þegar fengið stig fyrir hann. Ekkert stig til þín!".format(text)
self.send_dm(screen_name=reply_to, text=dm_text)
return
else:
# update and send dm
user["scored_yellow"] = json.dumps(user["scored_yellow"] + [text])
dm_text = random.choice(yellow_messages)
dm_text = dm_text.format(text, str(user["score"]))
table.upsert(user, ['twitter_handle'])
db.commit()
tweet_text = random.choice(successful_tweets)
tweet_text = tweet_text.format(reply_to)
to_tweet = random.choice(range(0, 5))
if to_tweet == 4:
try:
self.send_tweet(tweet_text)
except:
pass
self.send_dm(screen_name=reply_to, text=dm_text)
return
else:
dm_text = random.choice(wrong_messages)
dm_text = dm_text.format(text, color.encode('utf-8'))
self.send_dm(screen_name=reply_to, text=dm_text)
return
def on_data(self, raw_data):
data = json.loads(raw_data)
if 'direct_message' in data:
status = Status.parse(self.api, data)
if self.on_direct_message(status) is False:
return False
elif 'event' in data:
if data["event"] == 'follow':
if self.on_follow(data) is False:
return False
def get_top_5(self):
dm_text = "Topp 5:\n"
result = db.query('SELECT * FROM users ORDER BY score DESC LIMIT 5')
for row in result:
dm_text += "@" + row['twitter_handle'] + " (" + str(row['score']) + ')\n'
return dm_text
def get_help(self):
dm_text = "Gulur bíll - hjálp\n"
dm_text += "Sendu bílnúmer til mín og ef bíllinn er gulur þá færðu stig. "
dm_text += "Dagurinn þinn verður betri. Það mun stytta upp. Vinda mun lægja. Allt verður gott.\n"
dm_text += "Þú getur líka sent til mín TOPP3 til að fá stöðu efstu þriggja eða "
dm_text += "MÍNSTAÐA til að fá senda þína stöðu.\n"
return dm_text
def get_position(self, twitter_handle):
table = db['users']
user = table.find_one(twitter_handle=twitter_handle)
all_users = db.query('SELECT * FROM users ORDER BY score DESC')
position = 1
total = 1
for i, row in enumerate(all_users):
if twitter_handle == row["twitter_handle"]:
position = position + i
total = total + i
dm_text = "Þú ert með {} stig og ert í sæti nr. {} af {}"
dm_text = dm_text.format(str(user["score"]), str(position), str(total))
return dm_text
def on_direct_message(self, status):
text = status.direct_message["text"].strip("-").replace(" ", "").upper()
reply_to = status.direct_message["sender_screen_name"]
if reply_to == u'gulurbill':
# do not loop endlessly
return
if text == u"TOPP3" or text == u"TOPP5" or text == "TOPP":
dm_text = self.get_top_5()
self.send_dm(screen_name=reply_to, text=dm_text)
return
elif text == u"HJÁLP":
dm_text = self.get_help()
self.send_dm(screen_name=reply_to, text=dm_text)
return
elif text == u"MÍNSTAÐA":
dm_text = self.get_position(reply_to)
self.send_dm(screen_name=reply_to, text=dm_text)
return
else:
self.lookup_number(text.encode('utf-8'), status)
def on_error(self, status_code):
if status_code == 420:
return False
def on_follow(self, data):
user_who_followed = data["source"]["screen_name"]
if user_who_followed == u'gulurbill':
return
else:
table = db['users']
user = table.find_one(twitter_handle=user_who_followed)
if not user:
scored_yellow = json.dumps(list())
table.upsert(dict(twitter_handle=user_who_followed, score=0, scored_yellow=scored_yellow), ['twitter_handle'])
api.create_friendship(id=user_who_followed)
return
if __name__ == '__main__':
db = dataset.connect(DB_CONNECT)
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
myStreamListener = StreamListener()
myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener)
myStream.userstream(async=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment