Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions puppet/backend.pp
Original file line number Diff line number Diff line change
Expand Up @@ -304,3 +304,9 @@
minute => '*/5',
require => Service['v2ccelery'],
}
cron::job { 'v2cstats':
command => '/srv/v2c/venv/bin/python3 /srv/v2c/utils/stats.py',
user => 'tools.video2commons',
minute => '0',
require => Service['v2ccelery'],
}
62 changes: 62 additions & 0 deletions utils/stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#!/usr/bin/env python3

"""Update worker usage and capacity statistics in Redis."""

import random
import sys
import time

sys.path.insert(0, '/srv/v2c')

from redis import Redis

from video2commons.backend.worker import app as celery_app
from video2commons.config import redis_pw, redis_host
from video2commons.shared.stats import (
acquire_write_lock,
collect_worker_stats,
get_worker_stats,
release_write_lock,
update_worker_stats
)

# Stats are considered stale if they haven't been updated in 30 minutes.
STALE_SECS = 1800

# Add some sleep between 0 seconds and 5 minutes to avoid hammering the workers.
SLEEP_RANGE = (0, 300)

queue_conn = Redis(host=redis_host, password=redis_pw, db=2)
app_conn = Redis(host=redis_host, password=redis_pw, db=3)


def main():
sleep_secs = random.randint(*SLEEP_RANGE)
print(f"Sleeping for {sleep_secs} seconds...")

# This job is run on workers to allow connecting directly to the workers,
# but we don't want to hammer them with duplicate requests. Sleep a random
# amount and check if the stats have been updated recently to avoid this.
time.sleep(sleep_secs)

# Don't update stats if they've been updated recently by another job.
existing_stats = get_worker_stats(app_conn)
if existing_stats and 'last_updated_by_job' in existing_stats:
if int(time.time()) - existing_stats['last_updated_by_job'] < STALE_SECS:
print("Stats have been updated recently, skipping update.")
return

inspector = celery_app.control.inspect(timeout=5.0)
stats = collect_worker_stats(queue_conn, inspector)

lock_acquired = acquire_write_lock(app_conn)
if not lock_acquired:
raise RuntimeError("Failed to update stats, could not acquire lock.")

try:
update_worker_stats(app_conn, stats)
finally:
release_write_lock(app_conn)

if __name__ == '__main__':
main()
15 changes: 14 additions & 1 deletion video2commons/backend/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from video2commons.config import (
redis_pw, redis_host, consumer_key, consumer_secret, http_host
)
from video2commons.shared.stats import update_task_stats

redisurl = 'redis://:' + redis_pw + '@' + redis_host + ':6379/'
app = celery.Celery(
Expand Down Expand Up @@ -80,13 +81,20 @@ def main(
if redisconnection.exists(lockkey):
raise Ignore

# Immediately increment the processing count in stats since the cron job
# may take a while to update it (it takes a while to poll).
try:
update_task_stats(redisconnection, self.request.id, remove=False)
except Exception:
pass # We don't want to fail the task if we can't update stats.

# Check for 10G of disk space, refuse to run if it is unavailable
st = os.statvfs('/srv')
if st.f_frsize * st.f_bavail < 10 << 30:
self.retry(max_retries=20, countdown=5*60)
assert False # should never reach here

redisconnection.setex(lockkey, 7 * 24 * 3600, 'T')
redisconnection.setex(lockkey, 7 * 24 * 3600, self.request.hostname)

# Generate temporary directory for task
for i in range(10): # 10 tries
Expand Down Expand Up @@ -191,3 +199,8 @@ def errorcallback(text):
pywikibot._sites.clear()

shutil.rmtree(outputdir)

try:
update_task_stats(redisconnection, self.request.id, remove=True)
except Exception:
pass # We don't want to fail the task if we can't update stats.
54 changes: 42 additions & 12 deletions video2commons/frontend/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from video2commons.frontend.upload import (
upload as _upload, status as _uploadstatus
)
from video2commons.shared import stats

# Adapted from: https://stackoverflow.com/a/19161373
YOUTUBE_REGEX = (
Expand Down Expand Up @@ -146,7 +147,8 @@ def status():
return jsonify(
values=values,
rooms=rooms,
username=session['username']
username=session['username'],
stats=get_stats()
)


Expand All @@ -167,7 +169,8 @@ def _status(id):
res = worker.main.AsyncResult(id)
task = {
'id': id,
'title': title
'title': title,
'hostname': get_hostname_from_task(id)
}

try:
Expand Down Expand Up @@ -260,19 +263,38 @@ def get_tasks():
"""Get a list of visible tasks for user."""
# sudoer = able to monitor all tasks
username = session['username']
if is_sudoer(username):
if session.get('is_maintainer'):
key = 'alltasks'
else:
key = 'tasks:' + username

return key, redisconnection.lrange(key, 0, -1)[::-1]


def get_stats():
"""Get worker stats from Redis."""
stats = redisconnection.get('stats')

return json.loads(stats) if stats else None


def get_title_from_task(id):
"""Get task title from task ID."""
return redisconnection.get('titles:' + id)


def get_hostname_from_task(id):
"""Get the hostname of the worker processing a task from task ID."""
hostname = redisconnection.get('tasklock:' + id)

# Old tasks don't have a hostname as the value in tasklock and store the
# literal 'T' instead. Reinterpret these values as null.
if hostname == 'T':
hostname = None

return hostname


@api.route('/extracturl', methods=['POST'])
def extract_url():
"""Extract a video url."""
Expand Down Expand Up @@ -462,6 +484,11 @@ def run_task_internal(filename, params):
redisconnection.set('params:' + taskid, json.dumps(params))
redisconnection.expire('params:' + taskid, expire)

try:
stats.increment_queue_counter(redisconnection)
except Exception:
pass # We don't want to fail the API call if we can't update stats.

redis_publish('add', {'taskid': taskid, 'user': session['username']})
redis_publish('update', {'taskid': taskid, 'data': _status(taskid)})

Expand All @@ -475,9 +502,10 @@ def restart_task():

filename = redisconnection.get('titles:' + id)
assert filename, 'Task does not exist'
assert id in \
redisconnection.lrange('tasks:' + session['username'], 0, -1), \
'Task must belong to you.'
if not session.get('is_maintainer'):
assert id in \
redisconnection.lrange('tasks:' + session['username'], 0, -1), \
'Task must belong to you.'

restarted = redisconnection.get('restarted:' + id)
assert not restarted, \
Expand All @@ -498,9 +526,10 @@ def remove_task():
"""Revove a task from list of tasks."""
id = request.form['id']
username = session['username']
assert id in \
redisconnection.lrange('tasks:' + username, 0, -1), \
'Task must belong to you.'
if not session.get('is_maintainer'):
assert id in \
redisconnection.lrange('tasks:' + username, 0, -1), \
'Task must belong to you.'
redisconnection.lrem('alltasks', 0, id)
redisconnection.lrem('tasks:' + username, 0, id)
redisconnection.delete('titles:' + id)
Expand All @@ -517,9 +546,10 @@ def abort_task():
"""Abort a task."""
id = request.form['id']
username = session['username']
assert id in \
redisconnection.lrange('tasks:' + username, 0, -1), \
'Task must belong to you.'
if not session.get('is_maintainer'):
assert id in \
redisconnection.lrange('tasks:' + username, 0, -1), \
'Task must belong to you.'
worker.main.AsyncResult(id).abort()

redis_publish('update', {'taskid': id, 'data': _status(id)})
Expand Down
14 changes: 10 additions & 4 deletions video2commons/frontend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import json
import logging
import traceback
from urllib.parse import quote as urlquote
from urllib.parse import urlparse, urljoin

from flask import (
Expand All @@ -40,7 +39,7 @@

from video2commons.frontend.redisession import RedisSessionInterface
from video2commons.frontend.shared import redisconnection, check_banned
from video2commons.frontend.api import api
from video2commons.frontend.api import api, is_sudoer
from video2commons.frontend.i18n import (
i18nblueprint, translate as _, getlanguage, is_rtl
)
Expand Down Expand Up @@ -227,8 +226,14 @@ def logincallback():
session.pop('username', None)

identify = handshaker.identify(access_token)
if not (identify['editcount'] >= 50 and
'autoconfirmed' in identify['rights']):

is_contributor = identify['editcount'] >= 50
is_maintainer = is_sudoer(identify['username'])
is_autoconfirmed = 'autoconfirmed' in identify['rights']

# Only allow autoconfirmed users either with at least 50 edits or
# maintainer status to use this tool.
if not (is_autoconfirmed and (is_contributor or is_maintainer)):
return render_template(
'error.min.html',
message='You must be an autoconfirmed Commons user '
Expand All @@ -240,6 +245,7 @@ def logincallback():
access_token.key, access_token.secret

session['username'] = identify['username']
session['is_maintainer'] = is_maintainer

return redirect(session.get('return_to_url', url_for('main')))

Expand Down
4 changes: 4 additions & 0 deletions video2commons/frontend/i18n/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
"errorTooLarge": "File too large to upload directly! You may want to {{#a}}request a server-side upload{{/a}}.",
"addTask": "Add task...",
"yourTasks": "Your tasks:",
"workers": "Workers:",
"capacity": "Capacity:",
"utilization": "Worker Utilization:",
"pending": "Pending Tasks:",
"createServerSide": "Create server-side upload ticket in one go (recommended)",
"taskDone": "Your task is done. You may find your upload at {{#a}}{{/a}}.",
"cancel": "Cancel",
Expand Down
4 changes: 4 additions & 0 deletions video2commons/frontend/i18n/qqq.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
"errorTooLarge": "Semi-plain text shown in the result text for a task errored because the file is too large. There is a link in the text that links to [[:phab:Phabricator|Phabricator]] requesting [[:commons:Special:MyLanguage/Help:Server-side_upload|server-side upload]]. <code><nowiki>{{#a}}</nowiki></code> indicates the start of the link while <code><nowiki>{{/a}}</nowiki></code> indicates the end of the link.",
"addTask": "Plain text shown on a button at the bottom of the tool webpage. Clicking the button will show the add task dialog.",
"yourTasks": "Plain text shown near the top of the tool webpage. Below the text is a table of the user's tasks.",
"workers": "Plain text shown near the top of the tool webpage. Below the text is stats about the current encoding server capacity.",
"capacity": "Plain text shown near the top of the tool webpage. Next to the text is numerical stats about the current encoding server capacity.",
"utilization": "Plain text shown near the top of the tool webpage. Next to the text is a percentage of the current encoding server capacity.",
"pending": "Plain text shown near the top of the tool webpage. Next to the text is the number of tasks still waiting to be processed.",
"createServerSide": "Plain text shown on a button at the bottom of the tool webpage. Clicking the button will redirect the browser to [[:phab:Phabricator|Phabricator]] requesting [[:commons:Special:MyLanguage/Help:Server-side_upload|server-side upload]] for all tasks requiring the upload.",
"taskDone": "Semi-plain text shown in the result text for a completed task. There is a link in the text that links to the corresponding file page on [[:commons:|Commons]]. <code><nowiki>{{#a}}</nowiki></code> indicates the start of the link while <code><nowiki>{{/a}}</nowiki></code> indicates the end of the link.",
"cancel": "Plain text shown on a button in the add task dialog. Clicking the button will cancel the add task process and hide the dialog.\n{{Identical|Cancel}}",
Expand Down
Loading