Przesyłanie strumieniowe

Stream pozwala na filtering oraz sampling dla Tweetów w czasie rzeczywistym używając API Twittera.

Strumienie wykorzystują protokół Streaming HTTP do dostarczania danych poprzez otwarte, strumieniowe połączenie API. Zamiast dostarczać dane w partiach przez powtarzające się żądania aplikacji klienta, jak można oczekiwać od REST API, pojedyncze połączenie jest otwarte między aplikacją a API, a nowe wyniki są wysyłane przez to połączenie, gdy tylko wystąpią nowe dopasowania. Rezultatem tego jest mechanizm dostarczania z niskim opóźnieniem, który może obsługiwać bardzo dużą przepustowość. Aby uzyskać więcej informacji, zobacz https://developer.twitter.com/en/docs/tutorials/consuming-streaming-data

Używanie Stream

Aby użyć Stream, jego instancja musi być zainicjowana z danymi uwierzytelniającymi API Twittera (Consumer Key, Consumer Secret, Access Token, Access Token Secret):

import tweepy

stream = tweepy.Stream(
  "Consumer Key here", "Consumer Secret here",
  "Access Token here", "Access Token Secret here"
)

Następnie, Stream.filter() lub Stream.sample() mogą być użyte do połączenia się i uruchomienia strumienia:

stream.filter(track=["Tweepy"])

Dane otrzymane ze strumienia są przekazywane do Stream.on_data(). Metoda ta zajmuje się wysyłaniem danych do innych metod w oparciu o typ wiadomości. Na przykład, jeśli ze strumienia odbierany jest Tweet, surowe dane są wysyłane do Stream.on_data(), która konstruuje obiekt Status i przekazuje go do Stream.on_status(). Domyślnie, pozostałe metody, poza Stream.on_data(), które odbierają dane ze strumienia, po prostu logują otrzymane dane, z poziomem ogging level zależnym od typu danych.

Aby dostosować przetwarzanie danych strumienia, Stream musi zostać podklasowane. Na przykład, aby zapisać ID każdego otrzymanego Tweeta:

class IDPrinter(tweepy.Stream):

    def on_status(self, status):
        print(status.id)


printer = IDPrinter(
  "Consumer Key here", "Consumer Secret here",
  "Access Token here", "Access Token Secret here"
)
printer.sample()

Wątkowanie

Zarówno Stream.filter() jak i Stream.sample() posiadają parametr threaded. Kiedy ustawiony jest na True, strumień będzie działał w oddzielnym thread, który jest zwracany przez wywołanie którejkolwiek z metod. Na przykład::odpowiednia ilość czasu. Domyślnie, wszystkie trzy z tych metod rejestrują błąd. Aby dostosować obsługę, mogą one zostać nadpisane w podklasie:

thread = stream.filter(follow=[1072250532645998596], threaded=True)

Obsługa błędów

Stream posiada wiele metod do obsługi błędów podczas strumieniowania. Stream.on_closed() jest wywoływany, gdy strumień jest zamykany przez Twittera. Stream.on_connection_error() jest wywoływany, gdy strumień napotka błąd połączenia. Stream.on_request_error() jest wywoływany, gdy napotkany zostanie błąd podczas próby połączenia się ze strumieniem. Kiedy te błędy są napotkane i max_retries, który domyślnie jest nieskończony, nie został jeszcze przekroczony, instancja Stream będzie próbowała ponownie połączyć się ze strumieniem po odpowiednim czasie. Domyślnie, wszystkie trzy z tych metod rejestrują błąd. Aby dostosować obsługę, mogą one zostać nadpisane w podklasie:

class ConnectionTester(tweepy.Stream):

    def on_connection_error(self):
        self.disconnect()

Stream.on_request_error() jest również przekazywany kod statusu HTTP, który został napotkany. Odniesienie do kodów statusu HTTP dla API Twittera można znaleźć na stronie https://developer.twitter.com/en/support/twitter-api/error-troubleshooting.

Stream.on_exception() jest wywoływany, gdy wystąpi nieobsługiwany wyjątek. Jest to fatalne dla strumienia, a domyślnie wyjątek jest rejestrowany.