Source code for InstaTweet.tweetclient

from __future__ import annotations

import os
import random
import InstaTweet

from tweepy import OAuth1UserHandler, API, Media, TweepyException
from typing import Union, Optional
from . import InstaPost

[docs]class TweetClient: MAX_HASHTAGS = 5 DEFAULT_KEYS = { 'Consumer Key': 'string', 'Consumer Secret': 'string', 'Access Token': 'string', 'Token Secret': 'string' }
[docs] def __init__(self, profile: InstaTweet.Profile, proxies: dict = None): """Initialize TweetClient using a :class:`~.Profile` Basically just a wrapper for tweepy. It uses the settings of a profile to initialize the API and send tweets :param profile: the profile to use when initializing a :class:`tweepy.API` object :param proxies: optional proxies to use when making API requests """ self.profile = profile self.proxies = proxies self.api = self.get_api()
[docs] def get_api(self) -> API: """Initializes a :class:`~.tweepy.API` object using the API keys of the loaded :class:`~.Profile`""" return API( auth=self.get_oauth(self.profile.twitter_keys), user_agent=self.profile.user_agent, proxy=self.proxies )
[docs] @staticmethod def get_oauth(api_keys: dict) -> OAuth1UserHandler: """Initializes and returns an :class:`~.OAuth1UserHandler` object from tweepy using the specified API keys :param api_keys: Twitter developer API keys with v1.1 endpoint access """ if missing_keys := [key for key in TweetClient.DEFAULT_KEYS if key not in api_keys]: raise KeyError( f"Missing the following Twitter Keys: {missing_keys}" ) if bad_keys := [key for key in TweetClient.DEFAULT_KEYS if not api_keys[key] or api_keys[key] == 'string']: raise ValueError( f"Invalid values for the following Twitter keys: {bad_keys}" ) return OAuth1UserHandler( consumer_key=api_keys['Consumer Key'], consumer_secret=api_keys['Consumer Secret'], access_token=api_keys['Access Token'], access_token_secret=api_keys['Token Secret'] )
[docs] def send_tweet(self, post: InstaPost, hashtags: Optional[list[str]] = None) -> bool: """Composes and sends a Tweet using an already-downloaded Instagram post .. admonition:: How Tweets are Sent :class: instatweet The :attr:`.InstaPost.filepath` -- set by :meth:`~.download_post` -- is used as the media source The body of the tweet is then generated by :meth:`~build_tweet` :param post: the post to tweet :param hashtags: a list of hashtags to randomly chose from and include in the tweet """ if not post.is_downloaded: raise FileNotFoundError('Post must be downloaded first') media_ids = self.upload_media(post) if not isinstance(media_ids, list): return False try: tweet = self.api.update_status( status=self.build_tweet(post, hashtags), media_ids=media_ids, ) except TweepyException as e: print('Failed to send tweet for {}:\nResponse: {}'.format(post, e)) return False print(f'Sent tweet for {post}') return post.add_tweet_data(tweet)
[docs] def upload_media(self, post: InstaPost) -> Union[list, bool]: """Uploads the media from an already-downloaded Instagram post to Twitter .. note:: If the post is a carousel, only the first 4 photos/videos will be uploaded :param post: the Instagram post to use as the media source :return: the list of uploaded media ids (if API upload was successful) or ``False`` """ if not post.is_downloaded: raise FileNotFoundError('Post must be downloaded first') content = post.children[:4] if post.is_carousel else [post] media_ids = [] for media in content: media_upload = self.api.media_upload( filename=media.filepath, media_category='TWEET_VIDEO' if media.is_video else 'TWEET_IMAGE', wait_for_async_finalize=True, chunked=True) if hasattr(media_upload, 'processing_info'): if media_upload.processing_info['state'] != 'succeeded': print(f'Failed to upload media to Twitter for {media}') return False media_ids.append(str(media_upload.media_id)) print(f'Successfully uploaded media to Twitter for {post}') return media_ids
[docs] def build_tweet(self, post: InstaPost, hashtags: Optional[list[str]] = None) -> str: """Uses an :class:`~.InstaPost` to build the body text of a tweet .. admonition:: How Tweets are Composed :class: instatweet - The :attr:`~.InstaPost.caption` is used as a starting point - If you :meth:`~.add_hashtags` for the page, it will randomly :meth:`~pick_hashtags` to include - Lastly, the post's :attr:`~.InstaPost.permalink` is added to the end **Example**:: >> post = instatweet.insta.get_user("dailykittenig").posts[0] >> tweet = instatweet.twitter.build_tweet(post) >> print(tweet) carousel support yuh #kitten #kittycat #catlover #petstagram #animals :param post: the post being tweeted :param hashtags: optional list of hashtags to randomly pick from and include :return: the text to use for the tweet """ tags = self.pick_hashtags(hashtags) caption = post.caption.strip().replace('@', '@/') # Avoid tagging randos on Twitter characters = 280 - len(tags) - len(post.permalink) - 2 tweet = "{text}\n{hashtags}\n{link}".format( text=caption[:characters], hashtags=tags, link=post.permalink ) return tweet
[docs] @staticmethod def pick_hashtags(hashtags: list[str]) -> str: """Randomly picks hashtags from the provided list and returns them as a single string The number of hashtags chosen will either be 1 less than the length of the list (to avoid using the same tags in every tweet), or the value of :attr:`~.MAX_HASHTAGS`, whichever is smaller :param hashtags: a list of hashtags to randomly choose from :Example: :: from InstaTweet import TweetClient >> TweetClient.pick_hashtags(['cat','dog','woof']) "#woof #cat\\n" .. note:: A newline is added to help with formatting & character counting in :meth:`~.build_tweet` """ if not hashtags: return '' if not isinstance(hashtags, list): raise TypeError('Provide a list of hashtags') num_hashtags = min(len(hashtags) - 1, TweetClient.MAX_HASHTAGS) # Pick at most MAX_HASHTAGS random_hashtags = random.sample(hashtags, max(1, num_hashtags)) # Pick at least 1 return ' '.join(f'#{hashtag}' for hashtag in random_hashtags) + '\n'