Improving the “Flash LED on Twitter Search” program

Twitter Fight in LED form

Pi

Back in January, I built a Raspberry Pi powered device which tracked Twitter for a search term, flashing an LED each time there was a match, and logging the lot to a file.

The other day, as you can see from the comments on that post, I was contacted by someone called Robert who needed help modifying the code to change how it worked.

Instead of one search term, he wanted it to check for four. Each term had its own LED to flash, and each match for each term would be counted for a minute, after which the LED that corresponded to the most popular term would stay lit for a few seconds. After some to-ing and fro-ing, I finally came up with this, not entirely elegant but working, solution.

What you’ll need:

  • A Raspberry Pi
  • Four LEDs and some way of connecting them to the Pi
  • Python (along with the tweepy and GPIO libraries)
  • Your Twitter API tokens (see here, and choose Add New App)

Now, assuming you’ve wired the LEDs up to pins 22, 23, 24 and 25 (oh, and I’m using the gpio.BOARD method here – we found it differs from gpio.BCM – either works, but they’re not the same numbers so be aware), use this code:

#!/usr/bin/env python
# @deKay01 2016/05/03

import time
import sys
import RPi.GPIO as gpio
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener

# Put access key stuff here
# These are needed from dev.twitter.com
access_token = "****"
access_token_secret = "****"
consumer_key = "****"
consumer_secret = "****"

# Start a minute timer - change 60 to number of seconds

timeout = time.time() + 60

# Set up search terms

searchterma = sys.argv[1]
searchtermb = sys.argv[2]
searchtermc = sys.argv[3]
searchtermd = sys.argv[4]

searchterm = searchterma + "," + searchtermb + "," + searchtermc + "," + searchtermd

# Initialise counts

counta = 0
countb = 0
countc = 0
countd = 0

# Open file

f = open(searchterm+".txt",'w')

# Set LED pins
 
leda = 22
ledb = 23
ledc = 24
ledd = 25

gpio.setmode(gpio.BOARD)
gpio.setup(leda, gpio.OUT)
gpio.output(leda, gpio.LOW)

gpio.setup(ledb, gpio.OUT)
gpio.output(ledb, gpio.LOW)

gpio.setup(ledc, gpio.OUT)
gpio.output(ledc, gpio.LOW)

gpio.setup(ledd, gpio.OUT)
gpio.output(ledd, gpio.LOW)

class myListener(StreamListener):
  def on_status(self, status):
    global counta
    global countb
    global countc
    global countd
    global timeout
    
    try:
       f.write("@{}: {}, ID={}\n".format(  str(status.user.screen_name.encode('utf-8')) , str(status.text.encode('utf-8')) , status.user.id_str  ))
       f.write("\n\n")

       # Check for search term matches and flash LEDs
       if searchterma in str(status.text.encode('utf-8')):
          counta = counta + 1
          gpio.output(leda, gpio.HIGH)
          time.sleep(0.2)
          gpio.output(leda, gpio.LOW)
       if searchtermb in str(status.text.encode('utf-8')):
          countb = countb + 1
          gpio.output(ledb, gpio.HIGH)
          time.sleep(0.2)
          gpio.output(ledb, gpio.LOW)
       if searchtermc in str(status.text.encode('utf-8')):
          countc = countc + 1
          gpio.output(ledc, gpio.HIGH)
          time.sleep(0.2)
          gpio.output(ledc, gpio.LOW)
       if searchtermd in str(status.text.encode('utf-8')):
          countd = countd + 1
          gpio.output(ledd, gpio.HIGH)
          time.sleep(0.2)
          gpio.output(ledd, gpio.LOW)

       # Output current counts on screen
       print searchterma + " = " + str(counta) + "\n"
       print searchtermb + " = " + str(countb) + "\n"
       print searchtermc + " = " + str(countc) + "\n"
       print searchtermd + " = " + str(countd) + "\n"

       # If timeout
       if time.time() > timeout:
         # Turn off all the LEDs
         gpio.output(leda, gpio.LOW)
         gpio.output(ledb, gpio.LOW)
         gpio.output(ledc, gpio.LOW)
         gpio.output(ledd, gpio.LOW)
         time.sleep(0.2)

         # Turn LED on for most frequent count
         if counta > countb and counta > countc and counta > countd:
           gpio.output(leda, gpio.HIGH)

         if countb > counta and countb > countc and countb > countd:
           gpio.output(ledb, gpio.HIGH)

         if countc > counta and countc > countb and countc > countd:
           gpio.output(ledc, gpio.HIGH)

         if countd > counta and countd > countb and countd > countc:
           gpio.output(ledd, gpio.HIGH)

         # Wait five seconds
         time.sleep(5)

         # Reset timeout
         timeout = time.time() + 60

         # Reset counts
         counta = 0
         countb = 0
         countc = 0
         countd = 0
       
    except Exception as e:
      print(e)
                                            
  def on_error(self, status_code):
    print(status_code)
                                                        
while True:
  try:
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    twitterStream = Stream(auth, myListener())
    twitterStream.filter(track=[searchterm],languages=['en'])

  except Exception as e:
    print(e)

(You can download a zip of this here)

As you can see, much of the code is the same as before. There’s an extra timeout check (for the resetting every minute), and it now prints current term match counts on the screen as they’re hit.

If you want three, five, or a different number of terms, it should be straightforward to modify. I haven’t coded it to count the number of terms and adjust accordingly!

To run it, assuming you’ve made the script executable (chmod +x multiledtweet.py should do it), use this command:

./multiledtweet.py searchterma searchtermb searchtermc searchtermd

Where searchterma, searchtermb etc. are your terms.

./multiledtweet.py murder death kill mushroom

for example. Mushroom was not very popular in comparison, I found.

mushroom-mdk

One thought on “Improving the “Flash LED on Twitter Search” program”

Leave a Reply

%d bloggers like this: