Commit 6ec51fca authored by Marcin Wątroba

Initial commit

parent 27408055
Showing with 2042 additions and 1132 deletions
@@ -36,7 +36,7 @@ MANIFEST
 pip-log.txt
 pip-delete-this-directory.txt

-# Unit test / coverage reports
+# Unit test.py / coverage reports
 htmlcov/
 .tox/
 .nox/
#!/bin/bash
docker build -f scrapper_service/Dockerfile -t twitter-scrapper-worker .
\ No newline at end of file
import datetime
from typing import List
from arrow import Arrow
from common.scrap_range import ScrapRange
def get_ranges(since: Arrow, until: Arrow, step: datetime.timedelta) -> List[ScrapRange]:
progress = since
ranges = []
while progress < until:
step_end_time = progress + step if progress + step < until else until
ranges.append(ScrapRange(progress, step_end_time))
progress = step_end_time
return ranges
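# Usage sketch (illustrative, not part of the original commit): splitting one week
# into day-long ScrapRange chunks; the final chunk is clipped to `until` by the
# loop above.
#
#     ranges = get_ranges(Arrow(2022, 1, 1), Arrow(2022, 1, 8), datetime.timedelta(days=1))
#     assert len(ranges) == 7
#     assert ranges[-1].until == Arrow(2022, 1, 8)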
import logging
loggers = dict()
def get_logger(mod_name):
if mod_name in loggers:
return loggers[mod_name]
else:
logger = logging.getLogger(mod_name)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s [%(name)-30s] %(levelname)-8s %(message)s')
handler.setFormatter(formatter)
for it in logger.handlers:
logger.removeHandler(it)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False
        loggers[mod_name] = logger
return logger
from pathlib import Path
def merge(root_path: Path, output_path: Path):
with open(output_path, 'wb') as writer:
for it in root_path.iterdir():
if it.name.endswith('.jsonl'):
with open(it, 'rb') as reader:
writer.write(reader.read())
def main():
    Path('../scrap_annotations/by_id_merged').mkdir(parents=True, exist_ok=True)
    for it in Path('../scrap_annotations/by_id').iterdir():
        print(it)
        # Write merged output into the directory that was just created.
        merge(it, Path(f'../scrap_annotations/by_id_merged/{it.name}.jsonl'))
if __name__ == '__main__':
main()
from dataclasses import dataclass
from arrow import Arrow
@dataclass
class ScrapRange:
since: Arrow
until: Arrow
import time
import requests
def wait_for_service(host: str):
    tried = False
    success = False
    while not success:
        if tried:
            time.sleep(3)
        tried = True
        try:
            response = requests.get(f'{host}/health')
            print(response.status_code, host)
            success = response.status_code < 400
        except requests.exceptions.RequestException as err:
            # The worker may still be starting; retry after the back-off above.
            print(f'{host} not reachable yet: {err}')
from threading import Thread
from typing import List
def process_threads(threads: List[Thread]):
for it in threads:
it.start()
for it in threads:
it.join()
import codecs
import datetime
import json
import pickle
import queue
import time
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from threading import Thread
from typing import List
import requests
from arrow import Arrow
from stweet import SearchTweetsTask
from common.arrow_ultis import get_ranges
from common.logging_util import get_logger
from common.scrap_range import ScrapRange
from common.service_util import wait_for_service
from common.thread_util import process_threads
logger = get_logger('search_scrapper')
_FROM = 'from'
_TO = 'to'
class UserDirection(Enum):
FROM = 1
TO = 2
def make_parent_path(path: Path):
path.parent.mkdir(exist_ok=True, parents=True)
@dataclass
class SearchTweetsByUserTask:
scrap_range: ScrapRange
user: str
user_direction: UserDirection
def get_search_task(self) -> SearchTweetsTask:
if self.user_direction == UserDirection.TO:
return SearchTweetsTask(
all_words=self.user,
since=self.scrap_range.since,
until=self.scrap_range.until
)
else:
return SearchTweetsTask(
from_username=self.user,
since=self.scrap_range.since,
until=self.scrap_range.until
)
@dataclass
class ScrapByIdAndUserTask:
user: str
user_direction: UserDirection
tweet_id: str
source_file_path: Path
class FileUtil:
def get_tweets_file_path(self, task: SearchTweetsByUserTask, create_parent: bool = True) -> Path:
return self._create_search_path('tweets', task, create_parent)
def get_users_file_path(self, task: SearchTweetsByUserTask, create_parent: bool = True) -> Path:
return self._create_search_path('users', task, create_parent)
def _create_search_path(self, middle_name: str, task: SearchTweetsByUserTask, create_parent: bool) -> Path:
delta = task.scrap_range.until - task.scrap_range.since
seconds = delta.total_seconds()
        since = task.scrap_range.since.format('YYYYMMDD')
name_with_direction = f'{task.user}___{task.user_direction.name.lower()}'
path = Path(
f"search/{name_with_direction}/{middle_name}/{name_with_direction}__{since}__{seconds}.jsonl")
if create_parent:
make_parent_path(path)
return path
def get_tweets_root_path(self, username: str, user_direction: UserDirection) -> Path:
name_with_direction = f'{username}___{user_direction.name.lower()}'
return Path(f"search/{name_with_direction}/tweets")
def get_id_tweet_file_path(self, task: ScrapByIdAndUserTask, create_parent: bool = True) -> Path:
name_with_direction = f'{task.user}___{task.user_direction.name.lower()}'
path = Path(f"by_id/{name_with_direction}/{task.tweet_id}.jsonl")
if create_parent:
make_parent_path(path)
return path
class UserTweetsSearchRangeScrapper:
_file_util: FileUtil
def __init__(self):
self._file_util = FileUtil()
    def _scrap_search_thread(self, range_queue: queue.Queue[SearchTweetsByUserTask], host: str):
        while not range_queue.empty():
            wait_for_service(host)
            try:
                # Non-blocking get: another worker thread may have taken the last
                # task between the empty() check and this call.
                task = range_queue.get_nowait()
            except queue.Empty:
                break
search_task = task.get_search_task()
log_tag = f'{task.user} {task.user_direction} {search_task.since} {search_task.until}'
logger.info(f'{log_tag} start with host {host}')
task_base64_bytes = codecs.encode(pickle.dumps(search_task), "base64")
response = requests.post(f'{host}/process_search_by_query', data=task_base64_bytes)
            if response.status_code < 400:
                payload = response.json()
                lines_count = payload['tweets'].count("\"object_type\": \"TweetRaw\"")
                logger.info(f"{log_tag} end with tweets {lines_count}")
                with open(self._file_util.get_tweets_file_path(task), 'w') as f:
                    f.write(payload['tweets'])
                with open(self._file_util.get_users_file_path(task), 'w') as f:
                    f.write(payload['users'])
else:
raise Exception(f'Response: {response.status_code}')
def search_tweets(
self,
since: Arrow,
until: Arrow,
step: datetime.timedelta,
username: str,
scrapper_hosts: List[str],
):
        ranges = get_ranges(since=since, until=until, step=step)
        tasks = []
        for it in ranges:
            tasks.append(SearchTweetsByUserTask(it, username, UserDirection.TO))
            tasks.append(SearchTweetsByUserTask(it, username, UserDirection.FROM))
tasks_filtered = [it for it in tasks if
(not self._file_util.get_tweets_file_path(it, create_parent=False).exists()) and (
not self._file_util.get_users_file_path(it, create_parent=False).exists())]
tasks_queue = queue.Queue()
for it in tasks_filtered:
tasks_queue.put(it)
threads = [Thread(target=self._scrap_search_thread, args=(tasks_queue, host)) for host in scrapper_hosts]
process_threads(threads)
    def _get_all_tweet_ids_to_download(
            self, username: str, user_direction: UserDirection
    ) -> List[ScrapByIdAndUserTask]:
all_tasks = []
for path in self._file_util.get_tweets_root_path(username, user_direction).iterdir():
with open(path, 'r') as file:
all_raw_searched = file.read().splitlines(keepends=False)
for raw_it in all_raw_searched:
item = json.loads(raw_it)
if item['object_type'] == 'TweetRaw':
tweet_id = item['raw_value']['id_str']
all_tasks.append(ScrapByIdAndUserTask(username, user_direction, tweet_id, path))
return all_tasks
def _get_ids_not_downloaded(self, all_tasks: List[ScrapByIdAndUserTask]) -> List[ScrapByIdAndUserTask]:
return [it for it in all_tasks if not self._file_util.get_id_tweet_file_path(it, create_parent=False).exists()]
    def _scrap_by_id_thread(self, tasks_queue: queue.Queue[ScrapByIdAndUserTask], scrap_host: str):
        while not tasks_queue.empty():
            try:
                # Non-blocking get mirrors _scrap_search_thread: avoids racing other
                # worker threads for the final task.
                task = tasks_queue.get_nowait()
            except queue.Empty:
                break
tweet_id = task.tweet_id
source_path = task.source_file_path
logger.info(f'download_tweet_by_id {tweet_id} {source_path} start with host {scrap_host}')
response = requests.post(f'{scrap_host}/process_search_by_id', json={'tweet_id': tweet_id})
logger.info(f'download_tweet_by_id {tweet_id} {source_path} end with code {response.status_code}')
if response.status_code < 400:
with open(self._file_util.get_id_tweet_file_path(task), 'w') as f:
f.write(response.text)
else:
raise Exception(f'Response: {response.status_code}')
def download_tweet_by_ids_from_search(self, username: str, scrapper_hosts: List[str]):
all_tasks = self._get_all_tweet_ids_to_download(
username, UserDirection.FROM) + self._get_all_tweet_ids_to_download(username, UserDirection.TO)
tasks_to_scrap = self._get_ids_not_downloaded(all_tasks)
logger.info(f'scrapped {len(all_tasks) - len(tasks_to_scrap)} / {len(all_tasks)}')
range_queue = queue.Queue()
for it in tasks_to_scrap:
range_queue.put(it)
threads = [Thread(target=self._scrap_by_id_thread, args=(range_queue, host)) for host in scrapper_hosts]
process_threads(threads)
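
# Sketch (an assumption, not part of the original commit): the client/worker protocol
# is "pickle the SearchTweetsTask, base64-encode it, POST the raw bytes". The helper
# below mirrors _scrap_search_thread's encoding and the worker's parse_task() decoding
# without any HTTP, which is handy for checking the payload format locally.
def _roundtrip_search_task_payload(search_task: SearchTweetsTask) -> SearchTweetsTask:
    payload = codecs.encode(pickle.dumps(search_task), "base64")  # body of /process_search_by_query
    return pickle.loads(codecs.decode(payload, "base64"))         # what the worker recovers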
version: "3.8"
services:
worker1:
image: twitter-scrapper-worker
ports:
- "5001:5000"
worker2:
image: twitter-scrapper-worker
ports:
- "5002:5000"
worker3:
image: twitter-scrapper-worker
ports:
- "5003:5000"
worker4:
image: twitter-scrapper-worker
ports:
- "5004:5000"
worker5:
image: twitter-scrapper-worker
ports:
- "5005:5000"
worker6:
image: twitter-scrapper-worker
ports:
- "5006:5000"
worker7:
image: twitter-scrapper-worker
ports:
- "5007:5000"
worker8:
image: twitter-scrapper-worker
ports:
- "5008:5000"
worker9:
image: twitter-scrapper-worker
ports:
- "5009:5000"
worker10:
image: twitter-scrapper-worker
ports:
- "5010:5000"
worker11:
image: twitter-scrapper-worker
ports:
- "5011:5000"
worker12:
image: twitter-scrapper-worker
ports:
- "5012:5000"
worker13:
image: twitter-scrapper-worker
ports:
- "5013:5000"
worker14:
image: twitter-scrapper-worker
ports:
- "5014:5000"
worker15:
image: twitter-scrapper-worker
ports:
- "5015:5000"
worker16:
image: twitter-scrapper-worker
ports:
- "5016:5000"
# Before running this script, run ./build_docker.sh and start docker-compose.yml
import datetime
import arrow
from common.user_tweets_search_range_scrapper import UserTweetsSearchRangeScrapper
USERS = [
'@BankPekaoSA',
'@ING__Polska',
'@mBankpl',
'@SantanderBankPL',
'@Plus_Polska',
'@Orange_Polska',
'@NETIA_SA',
'@Play_Polska',
'@UPC_Polska',
'@korbankmedia'
]
HOSTS = [
'http://localhost:5001',
'http://localhost:5002',
'http://localhost:5003',
'http://localhost:5004',
'http://localhost:5005',
'http://localhost:5006',
'http://localhost:5007',
'http://localhost:5008',
'http://localhost:5009',
'http://localhost:5010',
'http://localhost:5011',
'http://localhost:5012',
'http://localhost:5013',
'http://localhost:5014',
'http://localhost:5015',
'http://localhost:5016'
]
if __name__ == '__main__':
for username in USERS:
UserTweetsSearchRangeScrapper().search_tweets(
since=arrow.get('2022-01-01'), until=arrow.get('2023-01-01'), step=datetime.timedelta(days=1),
username=username, scrapper_hosts=HOSTS)
UserTweetsSearchRangeScrapper().download_tweet_by_ids_from_search(username=username, scrapper_hosts=HOSTS)
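
# Optional sketch (assumption): the 16 entries above mirror the ports published in
# docker-compose.yml (5001-5016), so HOSTS could equivalently be generated as:
#
#     HOSTS = [f'http://localhost:{port}' for port in range(5001, 5017)]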
Source diff could not be displayed: it is too large.
@@ -8,7 +8,9 @@ authors = ["Marcin Wątroba <marcin.watroba@pwr.edu.pl>"]
 python = "^3.10"
 dvc = "^2.18.1"
 tor-python-easy = "^0.1.5"
-stweet = "^2.0.1"
+Flask = "^2.2.2"
+joblib = "^1.1.0"
+stweet = "^2.1.0"

 [tool.poetry.dev-dependencies]
FROM python:3.10
ENV LANG=C.UTF-8 LC_ALL=C.UTF-8 PYTHONUNBUFFERED=1
WORKDIR /app
ENV PYTHONPATH=.
RUN apt-get update && apt-get install tor -y
RUN pip install poetry
COPY poetry.lock poetry.lock
COPY pyproject.toml pyproject.toml
RUN poetry config virtualenvs.create false && \
poetry install --no-interaction --no-ansi --no-root
COPY common common
COPY scrapper_service scrapper_service
CMD ["python", "-u", "scrapper_service/scrap_service.py"]
import codecs
import pickle
from pathlib import Path
from flask import Flask, request, send_file, jsonify
from stweet import JsonLineFileRawOutput, TweetsByIdRunner, WebClient, DefaultTwitterWebClientProvider, \
SearchTweetsTask, TweetSearchRunner
from common.logging_util import get_logger
from scrapper_service.task_deserializer import TaskDeserializer
from scrapper_service.tor_utils import prepare_tor
app = Flask(__name__)
logger = get_logger('scrap_service')
def prepare_path(path: Path):
    path.unlink(missing_ok=True)
    path.touch()
def get_web_client() -> WebClient:
return DefaultTwitterWebClientProvider.get_web_client_preconfigured_for_tor_proxy(
socks_proxy_url='socks5://localhost:9050',
control_host='localhost',
control_port=9051,
control_password='test1234',
)
@app.route('/health', methods=["GET"])
def health():
return jsonify({'status': 'ok'})
@app.route('/process_search_by_id', methods=["POST"])
def process_search_by_id():
id_task = TaskDeserializer.deserialize_tweets_by_id_task(request.json)
logger.info(f'process_search_by_id {id_task.tweet_id} start')
    path = Path('tweet_by_id.jsonl').absolute()
prepare_path(path)
output_json = JsonLineFileRawOutput(path.absolute().as_posix())
runner = TweetsByIdRunner(
tweets_by_id_task=id_task,
raw_data_outputs=[output_json],
web_client=get_web_client()
)
runner.run()
count = runner.tweets_by_id_context.all_download_tweets_count
    logger.info(f'process_search_by_id {id_task.tweet_id} finish with count {count}')
return send_file(path)
def parse_task() -> SearchTweetsTask:
return pickle.loads(codecs.decode(request.data, "base64"))
def get_file_string_content(path: Path) -> str:
with open(path) as f:
return f.read()
@app.route('/process_search_by_query', methods=["POST"])
def process_search_by_query():
task = parse_task()
logger.info(f'process_search_by_query {task.since} {task.until} start')
    tweets_path = Path('process_search_by_query_tweets.jsonl').absolute()
    users_path = Path('process_search_by_query_users.jsonl').absolute()
prepare_path(tweets_path)
prepare_path(users_path)
output_jsonl_tweets = JsonLineFileRawOutput(tweets_path.as_posix())
output_jsonl_users = JsonLineFileRawOutput(users_path.as_posix())
runner = TweetSearchRunner(
search_tweets_task=task,
tweet_raw_data_outputs=[output_jsonl_tweets],
user_raw_data_outputs=[output_jsonl_users],
web_client=get_web_client()
)
runner.run()
tweets_count = runner.search_run_context.all_download_tweets_count
logger.info(f'process_search_by_query {task.since} end with size {tweets_count}')
return jsonify({'tweets': get_file_string_content(tweets_path), 'users': get_file_string_content(users_path)})
if __name__ == '__main__':
prepare_tor()
app.run(host='0.0.0.0', port=5000, debug=True)
from typing import Dict, Any
from stweet import TweetsByIdTask
class TaskDeserializer:
@staticmethod
def deserialize_tweets_by_id_task(task: Dict[str, Any]) -> TweetsByIdTask:
return TweetsByIdTask(task['tweet_id'])
import os
import time
import requests
from common import logging_util
logger = logging_util.get_logger('tor_utils')
def run_bash_command(command: str):
    os.system(command)


def _start_tor():
    run_bash_command('tor --controlport 9051 &')
def _wait_until_tor_works():
logger.info('wait until tor works')
code = ''
while code != '200':
try:
logger.info('tor check request')
proxies = {
'http': 'socks5://127.0.0.1:9050',
'https': 'socks5://127.0.0.1:9050'
}
r = requests.get('http://jsonip.com/', proxies=proxies)
code = str(r.status_code)
logger.info('response_code: ' + code)
except Exception as err:
logger.error(err)
            logger.info('not working yet, waiting...')
time.sleep(2)
logger.info('tor works')
return
def prepare_tor():
_start_tor()
_wait_until_tor_works()
return
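
# Note (assumption about the environment, not stated in the repo): routing requests
# through 'socks5://127.0.0.1:9050' requires the PySocks extra
# (pip install "requests[socks]"); without it, requests raises an InvalidSchema
# error ("Missing dependencies for SOCKS support").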