Przesyłanie strumieniowe

Stream pozwala na filtering oraz sampling dla Tweetów w czasie rzeczywistym używając API Twittera.

Streams utilize Streaming HTTP protocol to deliver data through an open, streaming API connection. Rather than delivering data in batches through repeated requests by your client app, as might be expected from a REST API, a single connection is opened between your app and the API, with new results being sent through that connection whenever new matches occur. This results in a low-latency delivery mechanism that can support very high throughput. For further information, see https://developer.twitter.com/en/docs/tutorials/consuming-streaming-data

Używanie Stream

Aby użyć Stream, jego instancja musi być zainicjowana z danymi uwierzytelniającymi API Twittera (Consumer Key, Consumer Secret, Access Token, Access Token Secret):

import tweepy

stream = tweepy.Stream(
  "Consumer Key here", "Consumer Secret here",
  "Access Token here", "Access Token Secret here"
)

Następnie, Stream.filter() lub Stream.sample() mogą być użyte do połączenia się i uruchomienia strumienia:

stream.filter(track=["Tweepy"])

Dane otrzymane ze strumienia są przekazywane do Stream.on_data(). Metoda ta zajmuje się wysyłaniem danych do innych metod w oparciu o typ wiadomości. Na przykład, jeśli ze strumienia odbierany jest Tweet, surowe dane są wysyłane do Stream.on_data(), która konstruuje obiekt Status i przekazuje go do Stream.on_status(). Domyślnie, pozostałe metody, poza Stream.on_data(), które odbierają dane ze strumienia, po prostu logują otrzymane dane, z poziomem ogging level zależnym od typu danych.

Aby dostosować przetwarzanie danych strumienia, Stream musi zostać podklasowane. Na przykład, aby zapisać ID każdego otrzymanego Tweeta:

class IDPrinter(tweepy.Stream):

    def on_status(self, status):
        print(status.id)


printer = IDPrinter(
  "Consumer Key here", "Consumer Secret here",
  "Access Token here", "Access Token Secret here"
)
printer.sample()

Wątkowanie

Zarówno Stream.filter() jak i Stream.sample() posiadają parametr threaded. Kiedy ustawiony jest na True, strumień będzie działał w oddzielnym thread, który jest zwracany przez wywołanie którejkolwiek z metod. Na przykład::odpowiednia ilość czasu. Domyślnie, wszystkie trzy z tych metod rejestrują błąd. Aby dostosować obsługę, mogą one zostać nadpisane w podklasie:

thread = stream.filter(follow=[1072250532645998596], threaded=True)

Obsługa błędów

Stream has multiple methods to handle errors during streaming. Stream.on_closed() is called when the stream is closed by Twitter. Stream.on_connection_error() is called when the stream encounters a connection error. Stream.on_request_error() is called when an error is encountered while trying to connect to the stream. When these errors are encountered and max_retries, which defaults to infinite, hasn’t been exceeded yet, the Stream instance will attempt to reconnect the stream after an appropriate amount of time. By default, all three of these methods log an error. To customize that handling, they can be overridden in a subclass:

class ConnectionTester(tweepy.Stream):

    def on_connection_error(self):
        self.disconnect()

Stream.on_request_error() jest również przekazywany kod statusu HTTP, który został napotkany. Odniesienie do kodów statusu HTTP dla API Twittera można znaleźć na stronie https://developer.twitter.com/en/support/twitter-api/error-troubleshooting.

Stream.on_exception() jest wywoływany, gdy wystąpi nieobsługiwany wyjątek. Jest to fatalne dla strumienia, a domyślnie wyjątek jest rejestrowany.