User:WindBOT/WikiCapSource

From Team Fortress Wiki
Jump to: navigation, search

capsConfig.py

# -*- coding: utf-8 -*-

config = {
	'api': 'http://wiki.teamfortress.com/w/api.php',
	'username': 'WindBOT',
	'password': raw_input('Wiki password: '),
	'logDir': 'logs',
	'pages': {
		'rules': 'User:WindBOT/WikiCapRules',
		'results': 'User:WindBOT/WikiCapCandidates',
		'log': 'User:WindBOT/WikiCapLog'
	},
	'mysql': {
		'host': 'mysql.biringa.com',
		'username': 'perot_irclog',
		'password': raw_input('MySQL password: '),
		'database': 'perot_irclog',
		'table': 'tfwikirc'
	}
}

countCaps.py

#!/usr/bin/python -OO
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import shutil
import re # Regular expressions
import wikitools # Wiki API
import datetime # Date manipulation and comparison
import MySQLdb # MySQL databases
import MySQLdb.cursors # Additional cursor classes for convenience

from capsConfig import config
config['runtime'] = {
	'wiki': None,
	'regexes': {}
}
config['rules'] = None

def pr(*args):
	s = []
	for i in args:
		s.append(u(i))
	s = u' '.join(s)
	try:
		print s
	except:
		pass
	if config['output'] is not None:
		try:
			print >> config['output'], s
		except:
			pass

def u(s):
	if type(s) is type(u''):
		return s
	if type(s) is type(''):
		try:
			return unicode(s)
		except:
			try:
				return unicode(s.decode('utf8'))
			except:
				try:
					return unicode(s.decode('windows-1252'))
				except:
					return unicode(s, errors='ignore')
	try:
		return unicode(s)
	except:
		try:
			return u(str(s))
		except:
			return s

def wiki():
	global config
	if config['runtime']['wiki'] is None:
		config['runtime']['wiki'] = wikitools.wiki.Wiki(config['api'])
		pr('Logging in as', config['username'], '...')
		config['runtime']['wiki'].login(config['username'], config['password'])
		pr('Logged in.')
	return config['runtime']['wiki']
def page(p):
	if type(p) in (type(''), type(u'')):
		p = wikitools.page.Page(wiki(), u(p), followRedir=False)
	return p
def editPage(p, content, summary=u'', minor=True, bot=True, nocreate=True):
	global config
	summary = u(summary)
	while len(summary) > 250:
		if summary.find(u' ') == -1:
			summary = summary[:summary.rfind(u' ')] + u'...'
		else:
			summary = summary[:247] + u'...'
	result = page(p).edit(u(content), summary=summary, minor=minor, bot=bot, nocreate=nocreate)
	return result
def compileRegex(regex, flags=re.IGNORECASE):
	global config
	regex = u(regex)
	if regex in config['runtime']['regexes']:
		return config['runtime']['regexes'][regex]
	config['runtime']['regexes'][regex] = re.compile(regex, flags)
	return config['runtime']['regexes'][regex]
def userSort(x, y):
	global config
	for k in config['rules']['sortKeys']:
		reverse = False
		if k[0] == '!':
			k = k[1:]
			reverse = True
		if reverse:
			r = cmp(y[k], x[k])
		else:
			r = cmp(x[k], y[k])
		if r == 0:
			continue
		return r
	return 0
def createUser(username):
	global config
	logFile = u(config['logDir']) + u(os.sep) + u(username).replace(u' ', u'_') + u'.log'
	l = open(logFile, 'wb')
	l.write('')
	l.close()
	return {
		'name': username,
		'score': 0.0,
		'firstEdit': None,
		'lastEdit': None,
		'editDate': {},
		'daysOff': 0,
		'hasTalked': False,
		'isBot': False,
		'edits': 0,
		'undone': 0,
		'redirects': 0,
		'newPages': 0,
		'creationTime': None,
		'log': logFile
	}
def run():
	global config, u
	# Clear log directory
	if os.path.exists(config['logDir']):
		shutil.rmtree(config['logDir'])
	os.mkdir(config['logDir'])
	rulesText = u(page(config['pages']['rules']).getWikiText())[len('<p'+'re><now'+'iki>'):-len('</now'+'iki></pr'+'e>')]
	pr('Rules:', rulesText)
	config['rules'] = eval(rulesText)
	rules = config['rules'] # Convenience
	now = datetime.datetime.utcnow().replace(tzinfo=wikitools.utc.utc)
	firstInspectedDay = now - datetime.timedelta(days = rules['inspectDuration'])
	undoRegex = compileRegex(r'^(?:Undo edit by|Reverted edits by).*?\[\[Special:Contributions/([^]|]+)(?:.*?\((\d+)\))?')
	mysql = MySQLdb.connect(host=config['mysql']['host'], user=config['mysql']['username'], passwd=config['mysql']['password'], db=config['mysql']['database'], cursorclass=MySQLdb.cursors.DictCursor)
	cursor = mysql.cursor()
	dbLimit = now - datetime.timedelta(days=rules['inspectDuration'] + rules['maxDaysOff'] + 1)
	cursor.execute('SELECT * FROM `tfwikirc`')
	stats = {}
	editScores = {}
	def log(user, *msg):
		s = [u'<' + u(user) + u'>']
		for i in msg:
			s.append(u(i))
		log = open(stats[user]['log'], 'ab')
		log.write((u' '.join(s) + u'\n').encode('utf8'))
		log.close()
		pr(u'<' + u(user) + u'>', *msg)
	while True:
		row = cursor.fetchone()
		if row == None:
			break
		row['timestamp'] = row['timestamp'].replace(tzinfo=wikitools.utc.utc)
		#pr(row)
		if not row['user'] in stats:
			stats[row['user']] = createUser(row['user'])
		rowScore = 0.0
		for n in rules['namespaces']:
			if rules['namespaces'][n]['id'] == row['namespace']:
				if rules['namespaces'][n]['points'] > 0.0:
					stats[row['user']]['edits'] += 1
					# First, check if edit is not too old
					if (now - row['timestamp']).days >= rules['inspectDuration']:
						break
					# Take care of daysOff
					if stats[row['user']]['firstEdit'] is None:
						stats[row['user']]['firstEdit'] = row['timestamp']
						daysLost = max(0, (firstInspectedDay - row['timestamp']).days - 1)
						if daysLost > 0:
							log(row['user'], 'Days from beginning of inspected duration to first edit:', daysLost, '- Comparing', firstInspectedDay, '(inspected duration) to ', row['timestamp'], '(first edit)')
						stats[row['user']]['daysOff'] += daysLost
					else:
						daysSinceLast = max(0, (row['timestamp'] - stats[row['user']]['lastEdit']).days - 1)
						if daysSinceLast > 0:
							log(row['user'], 'Days since last edit:', daysSinceLast, '- Comparing', stats[row['user']]['lastEdit'], '(previous edit) to', row['timestamp'], '(current edit)')
						stats[row['user']]['daysOff'] += daysSinceLast
					stats[row['user']]['lastEdit'] = row['timestamp']
					# Handle consecutive edits
					if row['title'] in stats[row['user']]['editDate']:
						delta = row['timestamp'] - stats[row['user']]['editDate'][row['title']]
						if delta.days * 86400 + delta.seconds < rules['consecutiveThreshold']:
							stats[row['user']]['editDate'][row['title']] = row['timestamp']
							break
					stats[row['user']]['editDate'][row['title']] = row['timestamp']
					# Now compute points
					rowScore += rules['namespaces'][n]['points']
					scoreBonus = rules['namespaces'][n]['points']
					if row['flags'].find('N') != -1:
						scoreBonus = scoreBonus * rules['newPageBonus'] + row['newsize'] * rules['newPageByteValue']
						stats[row['user']]['newPages'] += 1
					if row['flags'].find('R') != -1:
						scoreBonus *= rules['redirect']
						stats[row['user']]['redirects'] += 1
					editScores[str(row['rcid'])] = scoreBonus
					log(row['user'], 'got', scoreBonus, 'points for editing', row['title'], '(Flags:', row['flags']+')')
					stats[row['user']]['score'] += scoreBonus
					if row['flags'].find('b') != -1:
						stats[row['user']]['isBot'] = True
					# Handle undo's
					undo = undoRegex.search(row['comment'])
					if undo:
						if undo.group(2) and undo.group(2) in editScores:
							penalty = editScores[undo.group(2)] * rules['undoMultiplier']
						else:
							penalty = rules['undone']
						if undo.group(1) not in stats:
							stats[undo.group(1)] = createUser(undo.group(1))
						log(undo.group(1), 'lost', abs(penalty), 'for edit undone on page', row['title'])
						stats[undo.group(1)]['score'] += penalty
						if row['comment'][:8] == 'Reverted': # Penalize users using rollback
							log(row['user'], 'lost', abs(rules['rollbackPenalty']), 'points for using rollback on page', row['title'])
							stats[row['user']]['score'] += rules['rollbackPenalty']
						stats[undo.group(1)]['undone'] += 1
				if rules['namespaces'][n]['isTalk']:
					stats[row['user']]['hasTalked'] = True
				break
	users = []
	pr('Building users table.')
	ministatkeys = ('score', 'edits', 'daysOff', 'lastEdit')
	for usr in stats:
		ministats = {}
		for k in ministatkeys:
			ministats[k] = stats[usr][k]
		log(usr, 'Info:', ministats)
		if stats[usr]['lastEdit'] is None:
			log(usr, 'Dropping - No edit')
			continue
		daysSinceLast = max(0, (now - stats[usr]['lastEdit']).days - 1) # Days from last edit to moment of counting
		if daysSinceLast > 0:
			log(usr, 'Dropping - Days since last edit until now:', daysSinceLast, '- Comparing', stats[usr]['lastEdit'], 'to', now)
		stats[usr]['daysOff'] += daysSinceLast
		if stats[usr]['isBot']: # Skip bots
			log(usr, 'Dropping - It\'s a bot')
			continue
		if usr in rules['owners']:
			log(usr, 'Dropping - Already has cap')
			continue
		if stats[usr]['score'] < rules['minScore']: # Skip too low scores
			log(usr, 'Dropping - Score:', stats[usr]['score'], '<', rules['minScore'])
			continue
		if stats[usr]['edits'] < rules['minEdits']: # Skip too low edit count
			log(usr, 'Dropping - Edits:', stats[usr]['edits'], '<', rules['minEdits'])
			continue
		if stats[usr]['daysOff'] > rules['maxDaysOff']: # Skip too many days off
			log(usr, 'Dropping - Days off:', stats[usr]['daysOff'], '>', rules['maxDaysOff'])
			continue
		stats[usr]['creationTime'] = wikitools.user.User(wiki(), usr).registration
		if (now - stats[usr]['creationTime']).days < rules['minAge']: # Skip too young accounts
			pr(usr, 'Dropping - Age:', stats[usr]['creationTime'], '<', rules['minAge'])
			continue
		pr('Accepting candidate:', usr)
		users.append(stats[usr])
	config['users'] = users
	pr('Sorting users.')
	users = sorted(users, cmp=userSort, reverse=True)
	pr('Building Wiki table.')
	s = """{| class="wikitable grid sortable" align="center" style="text-align:center"
! class="header" style="font-size:90%;" | User
! class="header" style="font-size:90%;" | Score
! class="header" style="font-size:90%;" | Inactive days
! class="header" style="font-size:90%;" | Undone edits
! class="header" style="font-size:90%;" | Valid edits ratio
! class="header" style="font-size:90%;" | New pages
! class="header" style="font-size:90%;" | Redirects
! class="header" style="font-size:90%;" | Age
! class="header" style="font-size:90%;" | Edits (''all time'')
! class="header" style="font-size:90%;" | Has talked?
! class="header unsortable"  style="font-size:90%;" | Contribs"""
	s2 = u''
	i = 0
	for usr in users:
		i += 1
		if i > rules['maxCandidates']:
			break
		pr(usr['name'], usr['score'])
		hasTalked = 'Yes'
		if not usr['hasTalked']:
			hasTalked = 'No'
		s += '\n\
|-\n\
! {{subst:ul|' + usr['name'] + '}}\n\
| ' + str(round(usr['score'], 2)) + '\n\
| ' + str(usr['daysOff']) + ' (' + str(round(100.0 * (1.0-float(usr['daysOff'])/float(rules['inspectDuration'])), 2)) + '%)' + '\n\
| ' + str(usr['undone']) + '\n\
| ' + str(round((1.0-float(usr['undone'])/float(usr['edits']))*100.0, 2)) + '%\n\
| ' + str(usr['newPages']) + '\n\
| ' + str(usr['redirects']) + '\n\
| ' + str((now - usr['creationTime']).days) + ' days\n\
| ' + str(usr['edits']) + '\n\
| ' + hasTalked + '\n\
| [[Special:Contributions/' + usr['name'] + '|' + usr['name'] + '\'s Contribs]]'
		s2 += u'=== ' + usr['name'] + u' ===\n<p'+'re><no'+'wiki>' + u(open(usr['log'], 'rb').read(-1)) + u'</no'+'wiki></pr'+'e>\n'
	s+='\n\
|}'
	editPage(config['pages']['results'], rules['template'].replace('%table%', s), summary=u'Updated Wiki Cap candidates list.', minor=False)
	editPage(config['pages']['log'], s2, summary=u'Updated Wiki Cap log.', minor=False)
if __name__ == '__main__':
	config['output'] = open('caps.txt', 'wb')
	run()
	config['output'].close()