Socialify

Folder ..

Viewing consumer.py
94 lines (83 loc) • 2.9 KB

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from kafka import KafkaConsumer
import json
import re
# from database_configuration import insert_tweet
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from time import sleep
from multiprocessing import Process, Queue
import matplotlib.pyplot as plt
import matplotlib.animation as animation

topic_name = 'twitterdata'

def sentence_score(rs):
    review_score = SentimentIntensityAnalyzer()
    return review_score.polarity_scores(rs)['compound']

def deEmojify(data):
    emoj = re.compile("["
        u"\U0001F600-\U0001F64F"  # emoticons
        u"\U0001F300-\U0001F5FF"  # symbols & pictographs
        u"\U0001F680-\U0001F6FF"  # transport & map symbols
        u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
        u"\U00002500-\U00002BEF"  # chinese char
        u"\U00002702-\U000027B0"
        u"\U00002702-\U000027B0"
        u"\U000024C2-\U0001F251"
        u"\U0001f926-\U0001f937"
        u"\U00010000-\U0010ffff"
        u"\u2640-\u2642" 
        u"\u2600-\u2B55"
        u"\u200d"
        u"\u23cf"
        u"\u23e9"
        u"\u231a"
        u"\ufe0f"  # dingbats
        u"\u3030"
                      "]+", re.UNICODE)
    return re.sub(emoj, '', data)

def consumer(q):
    consumer = KafkaConsumer(
    topic_name,
    auto_offset_reset= 'earliest',
    enable_auto_commit=True,
    auto_commit_interval_ms =  1000,
    fetch_max_bytes = 128,
    max_poll_records = 100,
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

    for msg in consumer:
        tweet = ""
        message = json.loads(json.dumps(msg.value))
        if 'text' in message and 'RT @' not in message['text']:
            if ('extended_tweet' in message) and 'full_text' in message['extended_tweet']:
                tweet= deEmojify(message['extended_tweet']['full_text'].replace('\n', ' '))
            else:
                tweet= deEmojify(message['text'].replace('\n', ' '))
        if tweet != '':
            score = sentence_score(tweet)
            sentiment = 'neutral'
            if score > 0:
                sentiment = 'positive'
            elif score < 0:
                sentiment = 'negative'
            q.put(({'tweet_id': message['id'], 'tweet': tweet, 'sentiment': sentiment}))

x_labels = ["Positive", "Negative", "Neutral"]
y_data = [0, 0, 0]

if __name__ == '__main__':
    q = Queue()
    p = Process(target=consumer, args=(q,))
    p.start()

    def update_data():
        # Get new data from the queue
        new_data = q.get()
        # Check the sentiment of the tweet
        if new_data['sentiment'] == 'positive':
            y_data[0] += 1
        elif new_data['sentiment'] == 'negative':
            y_data[1] += 1
        else:
            y_data[2] += 1

    fig = plt.figure()
    plt.barh(x_labels, y_data, color=['green', 'red', 'blue'])

    def animate(i):
        update_data()
        plt.barh(x_labels, y_data, color=['green', 'red', 'blue'])

    ani = animation.FuncAnimation(fig, animate, interval=1, frames=100)
    plt.show()