From 7dc2ac9809bd560fff4ecd71b482f312d8237d09 Mon Sep 17 00:00:00 2001
From: David Manthey
Date: Thu, 20 Aug 2015 16:56:51 -0400
Subject: [PATCH 01/11] Support continuous ingest of message data and parallel
 metric computation.

So far, this will ingest Instagram data as pulled from Elasticsearch (such as
that supplied with the July data), and Twitter in either GNIP or firehose JSON
(as supplied with the July data).  It will not work on the original Instagram
data that Roni pulled from parquet files and saved as CSV.  It should be easy
to extend it to that format (though it has less data for comments and likes,
so it won't be as rich).

Links are created from mentions, comments (Instagram only), likes (Instagram
only), and replies (Twitter only).  Because of the comments and likes,
Instagram refers to a vastly larger network of entities than Twitter.  It also
has fewer messages per entity.  The Instagram network is sparser than the
Twitter network.

Ingest and metric computation are fully parallelizable - multiple ingests can
run concurrently.  Each metric can be computed in a separate process.  A
possibly useful extension would be to subdivide the entity space so that a
single metric can be parallelized with multiple processes as well.

I've implemented four metrics so far.  I have not yet implemented the
metric-based linking between Twitter and Instagram.  The implemented metrics
are:

1hop - This is quick to compute, as I maintain a list of neighbors within the
entity records.  If the entity is updated, it will be recomputed.  The stored
value is the number of 1-hop neighbors EXCLUSIVE of the root entity itself.

2hop - This requires visiting all the 1-hop neighbors.  Since we don't know
which of those neighbors has been updated, any entity update requires that we
recheck all 2-hop values.  I store both the number of 2-hop-only neighbors and
the number of 2-hop and 1-hop neighbors.  These counts never include the root
entity itself.

substring - This does a longest-substring match between ALL entities, keeping
the top-k for each service and subset.  The substring calculation has been
refactored to be about twice as fast as Curt's original implementation.  When
an entity is added or updated, only the modified entities need to be rechecked
to maintain the top-k lists.  This is still slow on a large collection.

levenshtein - This computes the Levenshtein metric between ALL entities, just
like substring.  It is even slower than substring matching.

The substring and levenshtein metrics would be helped by more CPU resources,
but would be helped even more by excluding entities from the candidate match
set.  For instance, if the substring match is 0 (or below some other
threshold), we could skip computing the levenshtein metric.
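To illustrate the candidate-filtering idea above, here is a minimal sketch (not
part of this patch) that gates the more expensive Levenshtein comparison on the
cheaper substring similarity.  It assumes the substringSimilarity and
levenshteinSimilarity helpers added below in continuous/metricsubstring.py and
continuous/metriclevenshtein.py; the threshold value and function name are
hypothetical.

    from metriclevenshtein import levenshteinSimilarity
    from metricsubstring import substringSimilarity

    # Hypothetical cutoff: 0 prunes only pairs that share no common substring.
    SUBSTRING_THRESHOLD = 0.0


    def filteredLevenshteinSimilarity(gaName, gbName):
        """Return the Levenshtein similarity, or None if the pair is pruned."""
        # The longest-common-substring similarity is cheaper to compute, so use
        # it to decide whether the Levenshtein comparison is worth doing at all.
        if substringSimilarity(gaName, gbName) <= SUBSTRING_THRESHOLD:
            return None
        return levenshteinSimilarity(gaName, gbName)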
--- continuous/conf.json | 30 + continuous/ingest.py | 1040 +++++++++++++++++++++++++++++++ continuous/metric.py | 247 ++++++++ continuous/metric1hop.py | 25 + continuous/metric2hop.py | 37 ++ continuous/metriclevenshtein.py | 94 +++ continuous/metricsubstring.py | 80 +++ 7 files changed, 1553 insertions(+) create mode 100644 continuous/conf.json create mode 100755 continuous/ingest.py create mode 100644 continuous/metric.py create mode 100644 continuous/metric1hop.py create mode 100644 continuous/metric2hop.py create mode 100644 continuous/metriclevenshtein.py create mode 100644 continuous/metricsubstring.py diff --git a/continuous/conf.json b/continuous/conf.json new file mode 100644 index 0000000..2fa6c7c --- /dev/null +++ b/continuous/conf.json @@ -0,0 +1,30 @@ +{ + "db": { + "entity": { + "dbUri": "mongodb://10.0.2.2:27017/entity", + "collection": "entity" + }, + "link": { + "dbUri": "mongodb://10.0.2.2:27017/entity", + "collection": "link" + }, + "metrics": { + "dbUri": "mongodb://10.0.2.2:27017/entity", + "collection": "metrics" + }, + "topk": { + "dbUri": "mongodb://10.0.2.2:27017/entity", + "collection": "topk" + } + }, + "topk": { + "k": 10, + "extra": 0 + }, + "metrics": { + "1hop": true, + "2hop": true, + "substring": true, + "levenshtein": true + } +} diff --git a/continuous/ingest.py b/continuous/ingest.py new file mode 100755 index 0000000..3269474 --- /dev/null +++ b/continuous/ingest.py @@ -0,0 +1,1040 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# This includes general utils for the continual ingest process. + +import argparse +import bz2 +import calendar +import glob +import HTMLParser +import json +import math +import os +import pprint +import pymongo +import re +import sys +import time +import xml.sax.saxutils +from bson.objectid import ObjectId + +import metric + + +TwitterMentionPattern = re.compile('@[a-zA-Z0-9_]{1,15}') +InstagramMentionPattern = re.compile( + '@[a-zA-Z0-9_][a-zA-Z0-9_.]{0,28}[a-zA-Z0-9_]?') + +HTMLDecoder = HTMLParser.HTMLParser() +# Used for doing timing tests +Timers = {'lastreport': time.time()} + + +# -------- General Functions -------- + +def castObjectId(id): + """ + Make sure an obejct is an ObjectId or None. + + :param id: the object to check. This may be a string, ObjectId, or a + dictionary with an _id field. + :returns: an ObjectId or None. + """ + if isinstance(id, dict): + id = id['_id'] + if id is None or type(id) is ObjectId: + return id + return ObjectId(id) + + +def calculateMetrics(state, entities=None): + """ + Calculate metrics for dirty entities. + + :param state: the state dictionary with the config values. + :param entities: a list of entities to calculate the metrics for. If None, + calculate the metrics for all entities. 
+ """ + metricDict = state['config'].get('metrics', {}) + if state['args']['metric']: + metricDict = dict.fromkeys(state['args']['metric']) + for met in metricDict: + metric.loadMetric(met, metricDict[met]) + if entities is not None: + entities = [castObjectId(entity) for entity in entities] + metColl = getDb('metrics', state) + entityColl = getDb('entity', state) + linkColl = getDb('link', state) + latestEntity = entityColl.find().sort([ + ('date_updated', pymongo.DESCENDING)]).limit(-1).next() + if not latestEntity: + # If there are no entities, we have nothing to do + return + latestEntity = latestEntity['date_updated'] + latestLink = linkColl.find().sort([ + ('date_updated', pymongo.DESCENDING)]).limit(-1).next() + latestLink = 0 if not latestLink else latestLink['date_updated'] + idQuery = {} if entities is None else {'_id': {'$in': entities}} + cursor = entityColl.find(idQuery).sort([('_id', pymongo.ASCENDING)]) + count = numCalc = 0 + starttime = lastreport = time.time() + for entity in cursor: + for met in metric.LoadedMetrics: + metClass = metric.LoadedMetrics[met] + date = entity['date_updated'] + if metClass.onEntities or not metClass.onEntitiesOnlyNew: + date = max(date, latestEntity) + if metClass.onLinks or not metClass.onLinksOnlyNew: + date = max(date, latestLink) + oldMetric = metColl.find_one({ + 'entity': castObjectId(entity), + 'metric': met + }) + # Already up to date + if oldMetric and oldMetric['date_updated'] >= date: + continue + calculateOneMetric(entityColl, linkColl, metColl, metClass, entity, + oldMetric, state) + numCalc += 1 + count += 1 + if state['args']['verbose'] >= 1: + curtime = time.time() + if curtime - lastreport > 10.0 / state['args']['verbose']: + print '%d %d %d %5.3f' % (count, numCalc, cursor.count(), + curtime - starttime) + lastreport = curtime + + +def calculateOneMetric(entityColl, linkColl, metColl, metClass, entity, + metricDoc, state): + """ + Calculate the value of a single metric for a single entity. + + :param entityColl: the database entity collection. + :param linkColl: the database link collection. + :param metColl: the database metrics collection. + :param metClass: the instance of the metrics class used for computation. + :param entity: the entity for which the metric is being computed. + :param metricDoc: if the metric was previously computed, that previous + record. May be None to force a completely fresh + computation. + :param state: the state dictionary with the config values. + """ + entityId = castObjectId(entity) + if metricDoc is None: + metricDoc = { + 'metric': metClass.name, + 'entity': entityId, + 'date_updated': 0 + } + work = metricDoc.get('work', {}) + kwargs = { + 'work': work, + 'entityColl': entityColl, + 'linkColl': linkColl, + 'old': metricDoc, + 'state': state, + } + # The time has to be before we do the computation, as data could be added + # during the comptation. 
+ metricDoc['date_updated'] = time.time() + refresh = (metClass.saveWork and work == {}) + if metClass.onEntities: + query = ({} if not metClass.onEntitiesOnlyNew or refresh else + {'date_updated': {'$gte': metricDoc['date_updated']}}) + for gb in entityColl.find(query): + if castObjectId(gb) != entityId: + metClass.calcEntity(entity, gb, **kwargs) + if metClass.onLinks: + query = ({} if not metClass.onLinksOnlyNew or refresh else + {'date_updated': {'$gte': metricDoc['date_updated']}}) + query['ga'] = entityId + for link in linkColl.find(query): + gb = entityColl.find_one(castObjectId(link['gb'])) + metClass.calcLink(entity, gb, link, **kwargs) + value = metClass.calc(entity, **kwargs) + if metClass.saveWork: + metricDoc['work'] = work + metricDoc['value'] = value + metColl.save(metricDoc) + + +def convertInstagramESToMsg(inst, subset='unknown'): + """ + Convert an instragam Elasticsearch record to our message format. This + normalizes the format so that other routines can handle the data + generically. + + :param inst: the instagram record. + :param subset: a subset name to attach to the record. This is probably the + Elasticsearch _type. + :returns: a message record or None for failed. + """ + msg = { + 'service': 'instagram', + 'subset': subset, + 'user_name': inst.get('user', {}).get('username', None), + 'user_fullname': inst.get('user', {}).get('full_name', None), + 'user_id': inst.get('user', {}).get('id', None), + 'msg_date': float(inst['created_time']), + 'msg_id': inst['link'].strip('/').rsplit('/', 1)[-1], + 'url': inst['link'], + } + if msg['user_fullname'] == '': + msg['user_fullname'] = None + if msg['user_name'] == '': + msg['user_name'] = None + if msg['user_name']: + msg['user_name'] = msg['user_name'].lower() + if ('location' in inst and + inst['location'].get('latitude', None) is not None): + msg['latitude'] = inst['location']['latitude'] + msg['longitude'] = inst['location']['longitude'] + if inst.get('caption', None): + msg['msg'] = inst['caption']['text'] + if '@' in msg['msg']: + msg['mentions'] = set(mention[1:].lower() for mention in + InstagramMentionPattern.findall(msg['msg'])) + if inst.get('comments', None) and inst['comments'].get('data', None): + msg['comments'] = {} + for comment in inst['comments']['data']: + if 'from' in comment and 'id' in comment['from']: + record = comment['from'] + msg['comments'][record['id']] = { + 'user_name': (record['username'] + if 'username' in record else None), + 'user_fullname': record.get('full_name', None), + } + if inst.get('likes', None) and inst['likes'].get('data', None): + msg['likes'] = {} + for record in inst['likes']['data']: + if 'id' in record: + msg['likes'][record['id']] = { + 'user_name': (record['username'] + if 'username' in record else None), + 'user_fullname': record.get('full_name', None), + } + return msg + + +def convertTwitterGNIPToMsg(gnip): + """ + Convert a Twitter GNIP record to our message format. This normalizes the + format so that other routines can handle the data generically. + + :param gnip: the twitter gnip record. + :returns: a message record or None for failed. 
+ """ + if 'postedTime' not in gnip: + return None + msg = { + 'service': 'twitter', + 'subset': 'unknown', + 'user_name': gnip['actor'].get('preferredUsername', None), + 'user_fullname': gnip['actor'].get('displayName', None), + 'user_id': gnip['actor']['id'].split(':')[-1], + 'msg_date': int(calendar.timegm(time.strptime( + gnip['postedTime'], "%Y-%m-%dT%H:%M:%S.000Z"))), + 'msg_id': gnip['object']['id'].split(':')[-1], + 'msg': xml.sax.saxutils.unescape(gnip['body']), + } + if msg['user_name']: + msg['user_name'] = msg['user_name'].lower() + msg['url'] = 'http://twitter.com/%s/statuses/%s' % ( + msg['user_id'], msg['msg_id']) + if ('geo' in gnip and gnip['geo'] and 'coordinates' in gnip['geo'] and + len(gnip['geo']['coordinates']) >= 2): + # gnip using latitude, longitude for geo (but twitter used long, lat + # for coordinates) + msg['latitude'] = gnip['geo']['coordinates'][0] + msg['longitude'] = gnip['geo']['coordinates'][1] + if ('twitter_entities' in gnip and 'media' in gnip['twitter_entities'] and + len(gnip['twitter_entities']['media']) > 0 and + 'media_url_https' in gnip['twitter_entities']['media'][0]): + msg['image_url'] = gnip['twitter_entities']['media'][0][ + 'media_url_https'] + if ('instagram' in gnip['generator'].get('link', '') and 'gnip' in gnip and + 'urls' in gnip['gnip'] and len(gnip['gnip']['urls']) and + 'expanded_url' in gnip['gnip']['urls'][0] and + 'instagram.com/p/' in gnip['gnip']['urls'][0]['expanded_url']): + msg['source'] = { + 'instagram': gnip['gnip']['urls'][0]['expanded_url'].rstrip( + '/').rsplit('/')[-1] + } + if ('twitter_entities' in gnip and + 'user_metions' in gnip['twitter_entities']): + msg['mentions'] = {} + for mention in gnip['twitter_entities']['user_mentions']: + if mention.get('id_str', None): + msg['mentions'][mention['id_str']] = { + 'user_name': mention.get('screen_name', None), + 'user_fullname': mention.get('name', None), + } + if ('inReplyTo' in gnip and 'link' in gnip['inReplyTo'] and + '/statuses/' in gnip['inReplyTo']['link']): + msg['replies'] = [gnip['inReplyTo']['link'].split( + '/statuses/')[0].rsplit('/', 1)[-1]] + return msg + + +def convertTwitterJSONToMsg(tw): + """ + Convert a Twitter firehose JSON record to our message format. This + normalizes the format so that other routines can handle the data + generically. + + :param tw: the twitter record. + :returns: a message record or None for failed. 
+ """ + msg = { + 'service': 'twitter', + 'subset': 'unknown', + 'user_name': tw['user']['screen_name'].lower(), + 'user_fullname': tw['user']['name'], + 'user_id': tw['user']['id_str'], + 'msg_date': int(calendar.timegm(time.strptime( + tw['created_at'][4:], "%b %d %H:%M:%S +0000 %Y"))), + 'msg_id': tw['id_str'], + 'msg': HTMLDecoder.unescape(tw['text']), + } + msg['url'] = 'http://twitter.com/%s/statuses/%s' % ( + msg['user_id'], msg['msg_id']) + if ('coordinates' in tw and 'coordinates' in tw['coordinates'] and + len(tw['coordinates']['coordinates']) >= 2): + msg['latitude'] = tw['coordinates']['coordinates'][1] + msg['longitude'] = tw['coordinates']['coordinates'][0] + if ('entities' in tw and 'media' in tw['entities'] and + len(tw['entities']['media']) > 0 and + 'media_url_https' in tw['entities']['media'][0]): + msg['image_url'] = tw['entities']['media'][0]['media_url_https'] + if ('Instagram' in tw.get('source', '') and 'entities' in tw and + 'urls' in tw['entities'] and len(tw['entities']['urls']) > 0 and + 'expanded_url' in tw['entities']['urls'][0] and + 'instagram.com' in tw['entities']['urls'][0]['expanded_url']): + msg['source'] = { + 'instagram': tw['entities']['urls'][0]['expanded_url'].rstrip( + '/').rsplit('/')[-1] + } + if ('entities' in tw and 'user_mentions' in tw['entities'] and + len(tw['entities']['user_mentions']) > 0): + msg['mentions'] = {} + for mention in tw['entities']['user_mentions']: + if mention.get('id_str', None): + msg['mentions'][mention['id_str']] = { + 'user_name': mention.get('screen_name', None), + 'user_fullname': mention.get('name', None), + } + if tw.get('in_reply_to_user_id_str', None) is not None: + msg['replies'] = { + tw['in_reply_to_user_id_str']: { + 'user_name': tw.get('in_reply_to_screen_name', None), + } + } + return msg + + +def getDb(dbName, state): + """ + Check if a DB has been connected to. If not, connect to it and ensure that + the appropriate indices are present. + + :param dbName: the internal key name of the database. + :param state: the state dictionary with the config values and a place to + store the result. + :returns: the collection. + """ + coll = state.get('db', {}).get(dbName, {}).get('coll', None) + if coll: + return coll + if 'db' not in state: + state['db'] = {} + state['db'][dbName] = getDbConnection(**state['config']['db'][dbName]) + coll = state['db'][dbName]['coll'] + indices = { + 'entity': [ + [('user_id', pymongo.ASCENDING)], + [('name', pymongo.ASCENDING)], + [('msgs.msg_id', pymongo.ASCENDING)], + [('date_updated', pymongo.ASCENDING)], + ], + 'link': [ + [('ga', pymongo.ASCENDING)], + [('gb', pymongo.ASCENDING)], + [('date_updated', pymongo.ASCENDING)], + ], + } + for index in indices.get(dbName, []): + coll.create_index(index) + return coll + + +def getDbConnection(dbUri, **kwargs): + """ + Get a connection to a mongo DB. The adds a connection timeout. + + :param dbUri: the uri to connect to. Usually something like + mongodb://(host):27017/(db) + :param database: if specified, connect to thsi database. Otherwise, + use the default database. + :param collection: the default collection to connect to. + :returns: a dictionary with 'connection', 'database', and 'coll'. 
+ """ + clientOptions = { + 'connectTimeoutMS': 15000, + } + result = { + 'connection': pymongo.MongoClient(dbUri, **clientOptions) + } + if kwargs.get('database', None): + result['database'] = result['connection'].get_database( + kwargs['database']) + else: + result['database'] = result['connection'].get_default_database() + if 'collection' in kwargs: + result['coll'] = result['database'][kwargs['collection']] + return result + + +def getEntityByName(state, entity): + """ + Given some known information which can include _id (our id), service, + user_id, name, and fullname, ensure that the specified entity is in the + database and the associated ObjectId of the entity or None. + + :param state: includes the database connection. + :param entity: a dictionary of _id, service, user_id, name, and fullname. + if _id is unspecified, service is required and at least one + of user_id or name. If the entity doesn't exist, a + neighbors key may be present to populate the new entity, + otherwise this key is ignored. + :returns: an entity document or None. + :returns: updated: True if the document was changed in any way. 'new' if + the entity was added. + """ + entityColl = getDb('entity', state) + if entity.get('_id', None): + id = castObjectId(entity['_id']) + return entityColl.find_one({'_id': id}, timeout=False), False + spec = {'service': entity['service']} + specUserId = {'service': entity['service']} + specName = {'service': entity['service']} + for key in ['name', 'fullname']: + if entity.get(key, None) is not None and entity[key].strip() != '': + spec[key] = entity[key] + else: + entity[key] = None + hasUserId = entity.get('user_id', None) is not None + if hasUserId: + spec['user_id'] = specUserId['user_id'] = entity['user_id'] + if entity['name'] is not None: + specName['name'] = entity['name'] + doc = entityColl.find_one(spec, timeout=False) + if doc: + # We have an entity that matches all of our information + return doc, False + doc = entityColl.find_one(specUserId if hasUserId else specName, + timeout=False) + if not doc and hasUserId and entity['name'] is not None: + doc = entityColl.find_one(specName, timeout=False) + curtime = time.time() + if doc: + # We have this user id, but not all of its aliases. + if (entity['name'] is not None and hasUserId and + entity['name'] not in doc['name']): + knownName = entityColl.find_one(specName, timeout=False) + if knownName: + # Merge this with the main doc + mergeEntities(state, doc, knownName) + doc['date_updated'] = curtime + updated = True + else: + # We've never seen this entity, so add it. + doc = { + 'service': entity['service'], + 'name': [], + 'fullname': [], + 'date_added': curtime, + 'date_updated': curtime, + 'msgs': [], + 'neighbors': entity.get('neighbors', []), + } + updated = 'new' + if hasUserId: + doc['user_id'] = entity['user_id'] + # Update the names and full names. + for key in [key for key in ['name', 'fullname'] + if entity.get(key, None) is not None]: + if entity[key] not in doc[key]: + doc[key].append(entity[key]) + doc['_id'] = entityColl.save(doc) + return doc, updated + + +def ingestMessage(state, msg): + """ + Check if we have already ingested a message. If not ingest it. This + checks if the (service, user_id) is present in our database. If not, we + add it, possibly by converting (service, user_name) or + (service, user_fullname). Once the user is present, we check if this + msg_id is listed in their known messages. If it is, we are done. 
If not, + add it to the list and ensure that all referenced users are present, too. + Update appropriate graph edges for referenced users. + + :param state: includes the database connection. + :param msg: our standard message format. + :returns: True if ingested, False if already present, None if we cannot + ingest this sort of record. + """ + if not msg.get('service', None) or not msg.get('user_id', None): + pprint.pprint(msg) + sys.exit(0) + return None + curtime = time.time() + entityColl = getDb('entity', state) + # Assume if we have processed this message, then we have everything we care + # about in our database. This might not be true -- a message could get + # reposted with new information. + if entityColl.find_one({'msgs': {'$elemMatch': { + 'service': msg['service'], 'msg_id': msg['msg_id'], + 'subset': msg['subset'], + }}}, {'_id': True}, limit=1): + return False + entity, changed = getEntityByName(state, { + 'service': msg['service'], + 'user_id': msg['user_id'], + 'name': msg.get('user_name', None), + 'fullname': msg.get('user_fullname', None), + }) + + found = False + for knownMsg in entity['msgs']: + if (knownMsg['service'] == msg['service'] and + knownMsg['msg_id'] == msg['msg_id']): + if msg['subset'] not in knownMsg['subset']: + knownMsg['subset'].append(msg['subset']) + entity['date_updated'] = curtime + entityColl.save(entity) + found = True + break + if found and not changed: + return False + if not found: + newmsg = { + 'service': msg['service'], + 'subset': [msg['subset']], + 'msg_id': msg['msg_id'], + 'latitude': msg.get('latitude', None), + 'longitude': msg.get('longitude', None), + 'date': msg['msg_date'] + } + for key in ('latitude', 'longitude', 'source'): + if msg.get(key, None) is not None: + newmsg[key] = msg[key] + entity['msgs'].append(newmsg) + entity['date_updated'] = curtime + # update neighbors and edges + ingestMessageEdges(state, entity, msg) + entityColl.save(entity) + # Mark descendants as dirty (for when we merge nodes) + # ##DWM:: + return True + + +def ingestMessageEdges(state, entity, msg): + """ + Update all of the edges associated with a message. Add any new neighbors + to the entity's neighbor list. + + :param state: includes the database connection. + :param entity: the entity document. Changed. + :param msg: our standard message format. 
+ """ + entityId = castObjectId(entity) + entityColl = getDb('entity', state) + linkColl = getDb('link', state) + for (linktype, linkdir) in [('mentions', 'out'), ('likes', 'in'), + ('comments', 'in'), ('replies', 'out')]: + if linktype not in msg: + continue + links = msg[linktype] + if isinstance(links, dict): + links = [{ + 'service': entity['service'], + 'user_id': key, + 'name': (links[key]['user_name'].lower() + if links[key].get('user_name', None) is not None else + None), + 'fullname': links[key].get('user_fullname', None), + } for key in links if key != entity.get('user_id', None)] + else: + links = [{ + 'service': entity['service'], + 'name': key + } for key in links] + for link in links: + link['neighbors'] = [entityId] + linkEntity, linkChanged = getEntityByName(state, link) + linkId = castObjectId(linkEntity) + isNew = linkChanged == 'new' + # Don't link to ourselves + if linkId == entityId: + continue + if linkId not in entity['neighbors']: + entity['neighbors'].append(linkId) + if not isNew: + # The linked item is now a neighbor + updateResult = entityColl.update({ + '_id': linkId, + 'neighbors': {'$ne': entityId}, + }, { + '$set': {'date_updated': time.time()}, + '$addToSet': {'neighbors': entityId}, + }) + # If we added this link as a neighbor, then we know the edges + # are new edges and not increased weights to existing edges. + if updateResult['nModified']: + isNew = True + # We are currently bidirectional on everything + addLink(linkColl, entityId, linkId, linktype, isNew=isNew, + bidir=True) + + +def addLink(linkColl, ga, gb, linktype=None, weight=1, isNew=False, + bidir=False): + """ + Add a link or increase its weight. + + :param linkColl: mongo collection for links. + :param ga: ga _id of the link. + :param gb: gb _id of the link. + :param linktype: named type of the link. + :param weight: weight to add to the link. If the link doesn't exist, this + will be the entire weight. + :param isNew: if True, the link doesn't exist yet. If False, the link may + or may not already exist. + :param bidir: True to add a bidirectional link. False to only add a single + direction. + """ + curtime = time.time() + if isNew: + docs = [{ + 'ga': ga, 'gb': gb, 'linktype': linktype, + 'date_updated': curtime, 'weight': weight + }] + if bidir: + docs.append({ + 'ga': gb, 'gb': ga, 'linktype': linktype, + 'date_updated': curtime, 'weight': weight + }) + linkColl.insert(docs) + else: + if bidir: + query = {'linktype': linktype, '$or': [ + {'ga': ga, 'gb': gb}, {'ga': gb, 'gb': ga}]} + else: + query = {'ga': ga, 'gb': gb, 'linktype': linktype} + linkColl.update( + query, { + '$set': {'date_updated': curtime}, + '$inc': {'weight': weight} + }, upsert=True, multi=True) + + +def loadConfig(filename=None): + """ + Load the config file. This will load an arbitrary json file, then ensure + that certain minimum standards are met. + + :param filename: the name of the file to load. None to load conf.json + in the script directory. + :return: a config dictionary. + """ + if not filename: + filename = os.path.join(os.path.realpath(sys.path[0]), 'conf.json') + config = json.load(open(filename)) + return config + + +def mergeEntities(state, mainId, secondId): + """ + Merge two entity records by discarding the secondary record and converting + all references to the secondary record to the main record. All such + records are marked as updated (dirty). + + :param state: includes the database connection. + :param mainId: the main entity _id. + :param secondId: the secondary entity _id. 
+ """ + entityColl = getDb('entity', state) + mainId = castObjectId(mainId) + secondId = castObjectId(secondId) + if state['args']['verbose'] >= 2: + print 'merge:', mainId, secondId + main = entityColl.find_one({'_id': mainId}) + second = entityColl.find_one({'_id': secondId}) + main['msgs'].extend(second['msgs']) + main['date_updated'] = time.time() + if secondId in main['neighbors']: + main['neighbors'].remove(secondId) + entityColl.save(main) + # update everyone's neighbors that include secondId to mainId + entityColl.update( + {'neighbors': secondId}, {'$addToSet': {'neighbors': mainId}}, + multi=True) + entityColl.update( + {'neighbors': secondId}, {'$pull': {'neighbors': secondId}}, + multi=True) + # update links + linkColl = getDb('link', state) + for link in linkColl.find({'ga': secondId}): + addLink(linkColl, mainId, link['gb'], link['linktype'], link['weight']) + linkColl.remove({'ga': secondId}) + for link in linkColl.find({'gb': secondId}): + addLink(linkColl, link['ga'], mainId, link['linktype'], link['weight']) + linkColl.remove({'gb': secondId}) + # Don't allow self link + linkColl.remove({'ga': mainId, 'gb': mainId}) + # Find all descendants and convert to the mainId and mark them as dirty + # ##DWM:: + entityColl.remove({'_id': secondId}) + + +# -------- Functions for stand-alone use -------- + +def checkForDuplicateNames(state): + """ + Use a brute-force appoach to see if any service has duplicate user names. + + :param state: a state object used for passing config information, database + connections, and other data. + """ + entityColl = getDb('entity', state) + names = {} + for entity in entityColl.find({}): + service = entity['service'] + if service not in names: + names[service] = {} + for name in entity['name']: + if (name in names[service] and + entity['_id'] != names[service][name]): + print 'Duplicate name %s %r %r' % (name, names[service][name], + entity['_id']) + else: + names[service][name] = entity['_id'] + + +def ingestInstagramFile(filename, state, region=None): + """ + Ingest an Instagram file. The files are expected to be in the + elasticsearch output format with lines of json, each of which contains a + _source key that contains the instagram data. + + :param filename: a file to ingest. This may be compressed with gzip or + bzip2. + :param state: a state object used for passing config information, database + connections, and other data. + :param region: if not None, the region to use for this data. + """ + state['filesProcessed'] = state.get('filesProcessed', 0) + 1 + linesProcessed = state.get('linesProcessed', 0) + linesIngested = state.get('linesIngested', 0) + fptr = openFile(filename) + for line in fptr: + line = line.strip().strip(',[]') + if not len(line): + continue + showProgress(linesProcessed, state, filename) + linesProcessed += 1 + try: + inst = json.loads(line) + except ValueError: + continue + msg = convertInstagramESToMsg(inst.get('_source', {}), + inst.get('_type', 'unknown')) + if not msg: + continue + if region is not None: + msg['subset'] = region + for retry in xrange(3): + try: + if ingestMessage(state, msg): + linesIngested += 1 + break + except pymongo.errors.OperationFailure: + if state['args']['verbose'] >= 1: + print 'retrying' + state['linesProcessed'] = linesProcessed + state['linesIngested'] = linesIngested + + +def ingestTwitterFile(filename, state, region=None): + """ + Ingest a Twitter file. The file may contain gnip or firehose json. + + :param filename: a file to ingest. This may be compressed with gzip or + bzip2. 
+ :param state: a state object used for passing config information, database + connections, and other data. + :param region: if not None, the region to use for this data. + """ + state['filesProcessed'] = state.get('filesProcessed', 0) + 1 + linesProcessed = state.get('linesProcessed', 0) + linesIngested = state.get('linesIngested', 0) + fptr = openFile(filename) + for line in fptr: + line = line.strip().strip(',[]') + if not len(line): + continue + showProgress(linesProcessed, state, filename) + linesProcessed += 1 + try: + twit = json.loads(line) + except ValueError: + continue + if 'gnip' in twit: + msg = convertTwitterGNIPToMsg(twit) + else: + msg = convertTwitterJSONToMsg(twit) + if not msg: + continue + msg['subset'] = region if region is not None else msg.get( + 'subset', 'unknown') + for retry in xrange(3): + try: + if ingestMessage(state, msg): + linesIngested += 1 + break + except pymongo.errors.OperationFailure: + if state['args']['verbose'] >= 1: + print 'retrying' + state['linesProcessed'] = linesProcessed + state['linesIngested'] = linesIngested + + +def logarithmicBin(items): + """ + Convert a dictionary of the form (key): (sum) where the keys are all non- + negative integers into a binned dictionary with logarithmic-based bins. + + :param items: the dictionary of initial bins. + :return: the binned dictionary. + """ + bins = {} + for val in items: + if val <= 5: + bin = val + else: + logval = math.log10(val) + frac = 10 ** (logval - math.floor(logval)) + for start in [10, 9, 8, 7, 6, 5, 4, 3, 2, 1.5, 1]: + if frac >= start: + bin = int(10 ** math.floor(logval) * start) + break + bins[bin] = bins.get(bin, 0) + items[val] + return bins + + +def openFile(filename): + """ + Check if a file is gzip or bzip2 and open it with decompression. If not, + just open it. + + :param filename: name of the file to open. + :returns: a stream pointer. + """ + fileHeaders = { + '\x1f\x8b\x08': 'gunzip < %s', # gzip.open could be used + '\x42\x5a\x68': bz2.BZ2File, + } + filename = os.path.realpath(os.path.expanduser(filename)) + start = open(filename, 'rb').read(max(len(key) for key in fileHeaders)) + for key in fileHeaders: + if start[:len(key)] == key: + if isinstance(fileHeaders[key], basestring): + return os.popen(fileHeaders[key] % filename) + return fileHeaders[key](filename) + # Reopen it, since we may not be able to rewind it + return open(filename, 'rb') + + +def showEntityStatistics(entityColl): + """ + Report on distributions and statistics of the entity collection. + + :param entityColl: the entity collection. 
+ """ + counts = entityColl.aggregate([ + {'$project': {'count': {'$size': '$msgs'}}}, + {'$group': {'_id': '$count', 'count': {'$sum': 1}}}, + {'$sort': {'_id': 1}}, + ]) + msgs = {count['_id']: count['count'] for count in counts['result']} + msgCount = sum([key * msgs[key] for key in msgs]) + msgs = logarithmicBin(msgs) + print 'Message distribution:' + pprint.pprint(msgs) + counts = entityColl.aggregate([ + {'$project': {'count': {'$size': '$neighbors'}}}, + {'$group': {'_id': '$count', 'count': {'$sum': 1}}}, + {'$sort': {'_id': 1}}, + ]) + neighbors = {count['_id']: count['count'] for count in counts['result']} + neighbors = logarithmicBin(neighbors) + print 'Neighbor distribution:' + pprint.pprint(neighbors) + senders = sum(msgs.values()) - msgs.get(0, 0) + total = entityColl.count() + if total and msgCount and senders: + print '%d senders (%4.2f%%), %5.3f msg/sender, %5.3f entities/msg' % ( + senders, 100.0 * senders / total, float(msgCount) / senders, + float(total) / msgCount) + print '%d messages, %d entities' % (msgCount, total) + + +def showProgress(linesProcessed, state, filename): + """ + Show progress if the verbosity is appropriate. + + :param linesProcessed: the number of lines processed. + :param state: a state object used for passing config information, database + connections, and other data. + :param filename: filename to report. + """ + if state['args']['verbose'] < 1 or linesProcessed % 1000: + return + if 'starttime' not in state: + state['starttime'] = time.time() + if (state['args']['verbose'] >= 2 and + filename != state.get('lastFilename', None)): + print filename + state['lastFilename'] = filename + entityColl = getDb('entity', state) + linkColl = getDb('link', state) + state['lastcounts'] = { + 'entity': entityColl.count(), 'link': linkColl.count() + } + print('%d %d %d %5.3f' % ( + linesProcessed, state['lastcounts']['entity'], + state['lastcounts']['link'], time.time() - state['starttime'])) + if state['args']['verbose'] >= 2 and not linesProcessed % 100000: + if state.get('laststatcounts', {}) != state['lastcounts']: + showEntityStatistics(entityColl) + state['laststatcounts'] = state['lastcounts'] + + +def timer(name, action='toggle', report='auto', data=None): + """ + Track timing of functions to help optimize code. + + :param name: name of the timer. Required. + :param action: 'start': start the timer, 'stop': stop the timer, 'toggle': + switch between start and stop, anything else doesn't affect + the timer (can be used for reporting). + :param report: 'auto' to report all timer states no more than once every 10 + seconds, otherwise 'all' to report all timer states, True to + report just the specified timer, or anything else to not + report time. + :param data: if present, store this as some example data for the process. 
+ """ + curtime = time.time() + if name not in Timers and action in ('start', 'stop', 'toggle'): + Timers[name] = {'count': 0, 'tally': 0, 'start': 0, 'data': None} + if action == 'start' or (action == 'toggle' and not Timers[name]['start']): + Timers[name]['start'] = curtime + elif (action == 'stop' and Timers[name]['start']) or action == 'toggle': + Timers[name]['count'] += 1 + Timers[name]['tally'] += curtime - Timers[name]['start'] + Timers[name]['start'] = 0 + if name in Timers and data is not None: + Timers[name]['data'] = data + if report == 'auto': + if curtime - Timers['lastreport'] > 10: + report = 'all' + if report == 'all' or report is True: + keys = sorted(Timers.keys()) + for key in keys: + if (key != 'lastreport' and (report == 'all' or key == name) and + Timers[key]['count']): + data = '' + if Timers[key]['data'] is not None: + data = ' ' + str(Timers[key]['data']) + print ('%s %d %5.3f %8.6f%s' % ( + key, Timers[key]['count'], Timers[key]['tally'], + Timers[key]['tally'] / Timers[key]['count'], data)) + if report == 'all': + Timers['lastreport'] = curtime + + +class AppendRegionAction(argparse.Action): + """Append an item to a list with the current value of region.""" + def __init__(self, option_strings, dest, nargs=None, const=None, + default=None, type=None, choices=None, required=False, + help=None, metavar=None): + super(AppendRegionAction, self).__init__( + option_strings=option_strings, dest=dest, nargs=nargs, const=const, + default=default, type=type, choices=choices, required=required, + help=help, metavar=metavar) + + def __call__(self, parser, namespace, values, option_string=None): + items = argparse._copy.copy(argparse._ensure_value( + namespace, self.dest, [])) + items.append((values, getattr(namespace, 'region', None))) + setattr(namespace, self.dest, items) + + +# -------- stand-alone program -------- + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Load messages into the entity graph database') + parser.add_argument( + '--calculate', '--calc', '-c', help='Calculate metrics.', + action='store_true', dest='calc') + parser.add_argument( + '--config', '--conf', help='The path to the config file') + parser.add_argument( + '--checknames', help='Check for duplicate names.', action='store_true') + parser.add_argument( + '--instagram', '-i', '--inst', + help='Ingest one or more files that contain Instagram messages in the ' + 'elasticsearch format. These may be compressed with gzip or bzip2, ' + 'and names with wildcards are allowed. The file is expected to ' + 'contain one record per line. Those records with _source keys are ' + 'ingested.', action=AppendRegionAction, dest='inst') + parser.add_argument( + '--metric', '-m', help='Explicitly choose which metrics are ' + 'calculated. Multiple metrics may be specified. Multiple processes ' + 'can be run in parallel with different metrics to increase overall ' + 'processing speed.', action='append') + parser.add_argument( + '--region', '-r', help='Subsequent input files will use this as ' + 'their region or subset. Set to blank to revert to parsing regions ' + 'if possible.') + parser.add_argument( + '--twitter', '-t', '--twit', + help='Ingest one or more files that contain Twitter messages in ' + 'either gnip or firehose json format. 
These may be compressed with ' + 'gzip or bzip2, and names with wildcards are allowed.', + action=AppendRegionAction, dest='twit') + parser.add_argument('--verbose', '-v', help='Increase verbosity', + action='count') + args = vars(parser.parse_args()) + state = { + 'args': args, + 'config': loadConfig(args['config']) + } + if args.get('checknames', False): + checkForDuplicateNames(state) + if args.get('inst', None): + for filespec, region in args['inst']: + for filename in sorted(glob.iglob(os.path.expanduser(filespec))): + ingestInstagramFile(filename, state, region) + if args.get('twit', None): + for filespec, region in args['twit']: + for filename in sorted(glob.iglob(os.path.expanduser(filespec))): + ingestTwitterFile(filename, state, region) + if args.get('calc', False): + calculateMetrics(state) + if state['args']['verbose'] >= 1: + pprint.pprint(state) diff --git a/continuous/metric.py b/continuous/metric.py new file mode 100644 index 0000000..1f67993 --- /dev/null +++ b/continuous/metric.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# This has the general classes and functions for metric computation + +import collections +import importlib +import pprint + +LoadedMetrics = collections.OrderedDict() + + +class Metric(object): + """ + Subclass this to compute a metric on an entity. + """ + name = 'metric' + + def __init__(self, **kwargs): + self.args = kwargs + self.dependencies = [] # metrics that must run before this one + # does this need to operate on all links to the entity? + self.onLinks = False + # If we have to visit links, only visit updated links if this is True, + # otherwise visit all the links when recomputing the metric. + # This also determines if a metric should be recomputed if any link + # was updated, even if we aren't using the links. + self.onLinksOnlyNew = True + # does this need to operate on all other entities in combination with + # this entity? + self.onEntities = False + # If we have to visit entities, only visit updated entities if this is + # True, otherwise visit all the entities when recomputing the metric. + # This also determines if a metric should be recomputed if any entity + # was updated, even if we aren't using the entities. + self.onEntitiesOnlyNew = True + # If saveWork is True, the work record is saved along with the value of + # the metric + self.saveWork = False + + def calc(self, ga, work={}, **kwargs): + """ + Calculate the metric based on the accumulated or stand-alone data. + + :param ga: the entity for which we are computing the metric. + :param work: an object for working on the metric. Results should be + stored here. + :returns: the value of the metric. + """ + return + + def calcEntity(self, ga, gb, work={}, **kwargs): + """ + Subclass this to handle partial calculations based on a second entity. + + :param ga: the entity for which we are computing the metric. + :param gb: the secondary entity. + :param work: an object for working on the metric. Results should be + stored here. + """ + return + + def calcLink(self, ga, gb, link, work={}, **kwargs): + """ + Subclass this to handle partial calculations based on a link. + + :param ga: the entity for which we are computing the metric. + :param gb: the secondary entity. + :param link: the link between the two entities. + :param work: an object for working on the metric. Results should be + stored here. + """ + return + + +def loadMetric(metricClass, initVal=None): + """ + Load a metric and its dependencies. 
Keep the list of loaded metrics + ordered so that if we walk the list, all dependencies will be met. + + :param metricClass: the class of the metric to load or a string with the + name of a metric we should attempt to located. + :param initVal: a value to pass to the class when initializing it. + :returns: True if success, None if failed to load dependencies. + """ + if isinstance(metricClass, basestring): + className = 'Metric' + metricClass.capitalize() + if globals().get(className): + metricClass = globals().get(className) + else: + moduleName = 'metric' + metricClass + module = importlib.import_module(moduleName) + # We expect metrics to register themselves, but if they + # don't, we try the expected class name again. + if metricClass not in LoadedMetrics: + if getattr(module, className, None): + metricClass = getattr(module, className) + if isinstance(metricClass, basestring): + return None + if metricClass.name not in LoadedMetrics: + if initVal is None: + initVal = {} + elif not isinstance(initVal, dict): + initVal = {'value': initVal} + newMetric = metricClass(**initVal) + for dependency in newMetric.dependencies: + if dependency not in LoadedMetrics: + loadMetric(dependency) + LoadedMetrics[metricClass.name] = newMetric + return True + + +def topKCategories(entity): + """ + Return a set of categories used for tracking topk. + + :param entity: the entity for which to extract categories. + :returns: a set of categories. + """ + cat = set() + if entity.get('service', None): + cat.add(entity['service']) + for msg in entity.get('msgs', []): + cat.update([msg['service'] + '-' + subset for subset in msg['subset']]) + return cat + + +def topKSetsToLists(topkDict): + """ + Convert the sets in the topk list to lists so we can store them. This also + removes the id dictionary, as it is not necessary. + + :param topkDict: the dictionary of topk results. Modified. + """ + if 'topk' not in topkDict: + return + topk = topkDict['topk'] + if len(topk) and isinstance(topk[0][-1], set): + for pos in xrange(len(topk)): + topk[pos] = (topk[pos][0], topk[pos][1], list(topk[pos][2])) + if 'ids' in topkDict: + del topkDict['ids'] + + +def trackTopK(topkDict, value, id, cats, state): + """ + Track the top-k values for various services and subsets. Each service and + subset will have at least k entries in the list. + + :param topkDict: a dictionary to store the top-k in. + :param value: the value associated with an item. Higher values are kept. + :param id: the id of the item. If the id is already present, it will be + replaced. + :param cats: a set of categories to track for this item. + :param state: the state dictionary with the definition of k. + :return: True is we added this item into the top-k. False if it was too + minor. + """ + if 'topk' not in topkDict: + topkDict['topk'] = [] + topkDict['cats'] = {} + topkDict['ids'] = {} + topkDict['processed'] = 0 + if (state and 'config' in state and 'topk' in state['config'] and + state['config']['topk'].get('k')): + topkDict['k'] = (state['config']['topk']['k'] + + state['config']['topk'].get('extra', 0)) + else: + topkDict['k'] = 25 + topk = topkDict['topk'] + k = topkDict['k'] + topkDict['processed'] += 1 + # When we get our dictionary out of storage, it contains lists, not sets. + # We want to operate on sets. Also, recerate our ids dictionary. 
+ if len(topk) and isinstance(topk[0][-1], list): + for pos in xrange(len(topk)): + topk[pos] = (topk[pos][0], topk[pos][1], set(topk[pos][2])) + topkDict['ids'] = dict.fromkeys([row[1] for row in topk]) + if not cats or not len(cats): + cats = set('default') + # If we already have this id, remove it + if id in topkDict['ids']: + for pos in xrange(len(topk)): + rval, rid, rcats = topk[pos] + if rid == id: + if rid in topkDict['ids']: + del topkDict['ids'][rid] + for cat in rcats: + topkDict['cats'][cat] -= 1 + topk[pos:pos+1] = [] + break + # Skip this one if we can tell it shouldn't be added. + if (len(topk) >= k and value < topk[-1][0] and + min([topkDict['cats'].get(cat, 0) for cat in cats]) >= k): + return False + # Add the entry to the list + entry = (value, id, cats) + topk.append(entry) + topk.sort() + topk.reverse() + topkDict['ids'][id] = True + for cat in cats: + topkDict['cats'][cat] = topkDict['cats'].get(cat, 0) + 1 + kept = trackTopKRemove(topkDict, entry) + if kept and state['args']['verbose'] >= 3: + pprint.pprint(topkDict) + return kept + + +def trackTopKRemove(topkDict, entry): + """ + Check if we need to remove any entries from the top-k list. Because of + keeping the top-k for multiple categories, one addition can result in + removing multiple rows. + + :param topk: the list of topk entries. + :param entry: a tuple of (value, id, cats) that was just added to the topk + list. + :return: True is we kept the item that was added into the top-k. False if + it was removed. + """ + k = topkDict['k'] + topk = topkDict['topk'] + kept = True + cats = entry[2] + if len(topk) > k: + while True: + counts = {cat: 0 for cat in cats} + remove = False + for pos in xrange(len(topk)): + rval, rid, rcats = topk[pos] + for cat in cats: + if cat in rcats: + counts[cat] += 1 + if (min(counts.values()) > k and rcats.issubset(cats)): + if topk[pos] == entry: + kept = False + if rid in topkDict['ids']: + del topkDict['ids'][rid] + for cat in rcats: + topkDict['cats'][cat] -= 1 + topk[pos:pos+1] = [] + remove = True + break + if not remove: + break + return kept diff --git a/continuous/metric1hop.py b/continuous/metric1hop.py new file mode 100644 index 0000000..b6abab0 --- /dev/null +++ b/continuous/metric1hop.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import metric + + +class Metric1hop(metric.Metric): + name = '1hop' + + def __init__(self, **kwargs): + super(Metric1hop, self).__init__(**kwargs) + + def calc(self, ga, **kwargs): + """ + Calculate the number of 1 hop neighbors. Since we already have the + neighbors in the entity record, this is just that length. + + :param ga: the entity for which we are computing the metric. + :returns: the number of 1-hop neighbors. + """ + # This does not include the node itself + return len(ga.get('neighbors', [])) + + +metric.loadMetric(Metric1hop) diff --git a/continuous/metric2hop.py b/continuous/metric2hop.py new file mode 100644 index 0000000..9366650 --- /dev/null +++ b/continuous/metric2hop.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import metric + + +class Metric2hop(metric.Metric): + name = '2hop' + + def __init__(self, **kwargs): + super(Metric2hop, self).__init__(**kwargs) + self.onLinksOnlyNew = False + + def calc(self, ga, entityColl, **kwargs): + """ + Calculate the number of 2 hop neighbors. + + :param ga: the entity for which we are computing the metric. + :param entityColl: the database collection used for querying neighbors. 
+ :returns: the number of 2-hop neighbors, both excluding 1-hop-only + neighbors and including 1-hop-only neighbors. The central + node is never counted. + """ + if not len(ga.get('neighbors', [])): + return {'2hop': 0, 'lte2hop': 0} + neighbors = set() + for gb in entityColl.find({'_id': {'$in': list(neighbors)}}, + {'neighbors': True}): + neighbors.update(gb.get('neighbors', [])) + neighbors.discard(ga['_id']) + result = {'2hop': len(neighbors)} + neighbors.update(ga['neighbors']) + result['lte2hop'] = len(neighbors) + return result + + +metric.loadMetric(Metric2hop) diff --git a/continuous/metriclevenshtein.py b/continuous/metriclevenshtein.py new file mode 100644 index 0000000..d6a0bf3 --- /dev/null +++ b/continuous/metriclevenshtein.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import metric + + +# This is a straightforward implementation of a well-known algorithm, and thus +# probably shouldn't be covered by copyright to begin with. But in case it is, +# the author (Magnus Lie Hetland) has, to the extent possible under law, +# dedicated all copyright and related and neighboring rights to this software +# to the public domain worldwide, by distributing it under the CC0 license, +# version 1.0. This software is distributed without any warranty. For more +# information, see +def levenshtein(a, b): + """ + Calculates the Levenshtein distance between a and b. + + :param a: one string to compare. + :param b: the second string to compare. + :returns: the Levenshtein distance between the two strings. + """ + n, m = len(a), len(b) + if n > m: + # Make sure n <= m, to use O(min(n,m)) space + a, b = b, a + n, m = m, n + + current = range(n + 1) + for i in range(1, m + 1): + previous, current = current, [i] + [0] * n + for j in range(1, n + 1): + add, delete = previous[j] + 1, current[j - 1] + 1 + change = previous[j - 1] + if a[j - 1] != b[i - 1]: + change = change + 1 + current[j] = min(add, delete, change) + + return current[n] + + +def levenshteinSimilarity(s1, s2): + """ + Calculate and normalize the Levenshtein metric to a result of 0 (poor) to 1 + (perfect). + + :param s1: the first string + :param s2: the second string + :return: the normalized result. 1 is a perfect match. + """ + totalLen = float(len(s1) + len(s2)) + return (totalLen - levenshtein(s1, s2)) / totalLen + + +class MetricLevenshtein(metric.Metric): + name = 'levenshtein' + + def __init__(self, **kwargs): + super(MetricLevenshtein, self).__init__(**kwargs) + self.onEntities = True + self.saveWork = True + + def calc(self, ga, work, **kwargs): + """ + Calculate the Levenshtein top-k relations. + + :param ga: the entity for which we are computing the metric. + :returns: the top-k table of relations. + """ + metric.topKSetsToLists(work) + return work['topk'] + + def calcEntity(self, ga, gb, work={}, state={}, **kwargs): + """ + Calculate the Levenshtein similarity between these two entities. If + appopriate, add this to the top-k list. + + :param ga: the entity for which we are computing the metric. + :param gb: the secondary entity. + :param work: an object for working on the metric. This includes the + top-k data. + :param state: the state dictionary. + """ + # We actually calculate the BEST levenshtein similarity between any + # name of ga with any name of gb and use that. 
+ sim = 0 + for gaName in ga['name']: + for gbName in gb['name']: + sim = max(sim, levenshteinSimilarity(gaName, gbName)) + if sim: + metric.trackTopK(work, sim, gb['_id'], metric.topKCategories(gb), + state) + + +metric.loadMetric(MetricLevenshtein) diff --git a/continuous/metricsubstring.py b/continuous/metricsubstring.py new file mode 100644 index 0000000..63b91ed --- /dev/null +++ b/continuous/metricsubstring.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import metric + + +def longestCommonSubstring(s1, s2): + """ + Return the longest common substring between two strings. + + :param s1: the first string + :param s2: the second string + :return: the longest common substring. + """ + if len(s2) > len(s1): + s1, s2 = s2, s1 + lens2p1 = len(s2) + 1 + for l in xrange(len(s2), 0, -1): + for s in xrange(lens2p1 - l): + substr = s2[s: s + l] + if substr in s1: + return substr + return '' + + +def substringSimilarity(s1, s2): + """ + Determine the longest common substring between two strings and normalize + the results to a scale of 0 to 1. + + :param s1: the first string + :param s2: the second string + :return: the normalized result. 1 is a perfect match. + """ + return 2.0 * len(longestCommonSubstring(s1, s2)) / (len(s1) + len(s2)) + + +class MetricSubstring(metric.Metric): + name = 'substring' + + def __init__(self, **kwargs): + super(MetricSubstring, self).__init__(**kwargs) + self.onEntities = True + self.saveWork = True + + def calc(self, ga, work, **kwargs): + """ + Calculate the substring top-k relations. + + :param ga: the entity for which we are computing the metric. + :returns: the top-k table of relations. + """ + metric.topKSetsToLists(work) + return work['topk'] + + def calcEntity(self, ga, gb, work={}, state={}, **kwargs): + """ + Calculate the substring similarity between these two entities. If + appopriate, add this to the top-k list. + + :param ga: the entity for which we are computing the metric. + :param gb: the secondary entity. + :param work: an object for working on the metric. This includes the + top-k data. + :param state: the state dictionary. + """ + # We actually calculate the BEST substring similarity between any name + # of ga with any name of gb and use that. + sim = 0 + for gaName in ga['name']: + for gbName in gb['name']: + # Note: both gaName and gbName are lowercase. We may wish to + # also find the substring match between fullnames. + sim = max(sim, substringSimilarity(gaName, gbName)) + if sim: + metric.trackTopK(work, sim, gb['_id'], metric.topKCategories(gb), + state) + + +metric.loadMetric(MetricSubstring) From cf540f040e9d13d09a4a038e65be5a201a28f080 Mon Sep 17 00:00:00 2001 From: David Manthey Date: Fri, 21 Aug 2015 11:21:28 -0400 Subject: [PATCH 02/11] Don't let mongo timeout. 
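For context, here is a minimal sketch (not part of this patch) of the cursor
pattern the change below applies throughout ingest.py.  timeout=False is the
pymongo 2.x spelling (pymongo 3 renamed it to no_cursor_timeout=True); because
the server no longer reaps such cursors on its own, they should be exhausted or
closed explicitly.  entityColl stands in for an entity collection as returned
by getDb('entity', state).

    cursor = entityColl.find({}, timeout=False)  # pymongo 2.x keyword
    try:
        for entity in cursor:
            pass  # long per-entity work no longer risks a server-side timeout
    finally:
        cursor.close()  # a no-timeout cursor lives on the server until closed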
--- continuous/ingest.py | 30 +++++++++++++++++------------- continuous/metric2hop.py | 2 +- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/continuous/ingest.py b/continuous/ingest.py index 3269474..3787c2e 100755 --- a/continuous/ingest.py +++ b/continuous/ingest.py @@ -66,17 +66,18 @@ def calculateMetrics(state, entities=None): metColl = getDb('metrics', state) entityColl = getDb('entity', state) linkColl = getDb('link', state) - latestEntity = entityColl.find().sort([ + latestEntity = entityColl.find(timeout=False).sort([ ('date_updated', pymongo.DESCENDING)]).limit(-1).next() if not latestEntity: # If there are no entities, we have nothing to do return latestEntity = latestEntity['date_updated'] - latestLink = linkColl.find().sort([ + latestLink = linkColl.find(timeout=False).sort([ ('date_updated', pymongo.DESCENDING)]).limit(-1).next() latestLink = 0 if not latestLink else latestLink['date_updated'] idQuery = {} if entities is None else {'_id': {'$in': entities}} - cursor = entityColl.find(idQuery).sort([('_id', pymongo.ASCENDING)]) + cursor = entityColl.find(idQuery, timeout=False).sort( + [('_id', pymongo.ASCENDING)]) count = numCalc = 0 starttime = lastreport = time.time() for entity in cursor: @@ -90,7 +91,7 @@ def calculateMetrics(state, entities=None): oldMetric = metColl.find_one({ 'entity': castObjectId(entity), 'metric': met - }) + }, timeout=False) # Already up to date if oldMetric and oldMetric['date_updated'] >= date: continue @@ -143,15 +144,15 @@ def calculateOneMetric(entityColl, linkColl, metColl, metClass, entity, if metClass.onEntities: query = ({} if not metClass.onEntitiesOnlyNew or refresh else {'date_updated': {'$gte': metricDoc['date_updated']}}) - for gb in entityColl.find(query): + for gb in entityColl.find(query, timeout=False): if castObjectId(gb) != entityId: metClass.calcEntity(entity, gb, **kwargs) if metClass.onLinks: query = ({} if not metClass.onLinksOnlyNew or refresh else {'date_updated': {'$gte': metricDoc['date_updated']}}) query['ga'] = entityId - for link in linkColl.find(query): - gb = entityColl.find_one(castObjectId(link['gb'])) + for link in linkColl.find(query, timeout=False): + gb = entityColl.find_one(castObjectId(link['gb']), timeout=False) metClass.calcLink(entity, gb, link, **kwargs) value = metClass.calc(entity, **kwargs) if metClass.saveWork: @@ -363,6 +364,9 @@ def getDb(dbName, state): [('gb', pymongo.ASCENDING)], [('date_updated', pymongo.ASCENDING)], ], + 'metrics': [ + [('entity', pymongo.ASCENDING)], + ], } for index in indices.get(dbName, []): coll.create_index(index) @@ -498,7 +502,7 @@ def ingestMessage(state, msg): if entityColl.find_one({'msgs': {'$elemMatch': { 'service': msg['service'], 'msg_id': msg['msg_id'], 'subset': msg['subset'], - }}}, {'_id': True}, limit=1): + }}}, {'_id': True}, limit=1, timeout=False): return False entity, changed = getEntityByName(state, { 'service': msg['service'], @@ -671,8 +675,8 @@ def mergeEntities(state, mainId, secondId): secondId = castObjectId(secondId) if state['args']['verbose'] >= 2: print 'merge:', mainId, secondId - main = entityColl.find_one({'_id': mainId}) - second = entityColl.find_one({'_id': secondId}) + main = entityColl.find_one({'_id': mainId}, timeout=False) + second = entityColl.find_one({'_id': secondId}, timeout=False) main['msgs'].extend(second['msgs']) main['date_updated'] = time.time() if secondId in main['neighbors']: @@ -687,10 +691,10 @@ def mergeEntities(state, mainId, secondId): multi=True) # update links linkColl = getDb('link', state) - 
for link in linkColl.find({'ga': secondId}): + for link in linkColl.find({'ga': secondId}, timeout=False): addLink(linkColl, mainId, link['gb'], link['linktype'], link['weight']) linkColl.remove({'ga': secondId}) - for link in linkColl.find({'gb': secondId}): + for link in linkColl.find({'gb': secondId}, timeout=False): addLink(linkColl, link['ga'], mainId, link['linktype'], link['weight']) linkColl.remove({'gb': secondId}) # Don't allow self link @@ -711,7 +715,7 @@ def checkForDuplicateNames(state): """ entityColl = getDb('entity', state) names = {} - for entity in entityColl.find({}): + for entity in entityColl.find({}, timeout=False): service = entity['service'] if service not in names: names[service] = {} diff --git a/continuous/metric2hop.py b/continuous/metric2hop.py index 9366650..d906999 100644 --- a/continuous/metric2hop.py +++ b/continuous/metric2hop.py @@ -25,7 +25,7 @@ def calc(self, ga, entityColl, **kwargs): return {'2hop': 0, 'lte2hop': 0} neighbors = set() for gb in entityColl.find({'_id': {'$in': list(neighbors)}}, - {'neighbors': True}): + {'neighbors': True}, timeout=False): neighbors.update(gb.get('neighbors', [])) neighbors.discard(ga['_id']) result = {'2hop': len(neighbors)} From 4c539b859c611552daada13b55437655e019d236 Mon Sep 17 00:00:00 2001 From: David Manthey Date: Fri, 21 Aug 2015 12:37:49 -0400 Subject: [PATCH 03/11] Added limit, offset, and recent flags to aid parallelization of metric calculations. --- continuous/ingest.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/continuous/ingest.py b/continuous/ingest.py index 3787c2e..7a5aeb3 100755 --- a/continuous/ingest.py +++ b/continuous/ingest.py @@ -77,7 +77,12 @@ def calculateMetrics(state, entities=None): latestLink = 0 if not latestLink else latestLink['date_updated'] idQuery = {} if entities is None else {'_id': {'$in': entities}} cursor = entityColl.find(idQuery, timeout=False).sort( - [('_id', pymongo.ASCENDING)]) + [('_id', pymongo.ASCENDING if not state['args'].get('recent', False) + else pymongo.DESCENDING)]) + if state['args'].get('offset'): + cursor.skip(state['args']['offset']) + if state['args'].get('limit'): + cursor.limit(state['args']['limit']) count = numCalc = 0 starttime = lastreport = time.time() for entity in cursor: @@ -1006,11 +1011,20 @@ def __call__(self, parser, namespace, values, option_string=None): 'and names with wildcards are allowed. The file is expected to ' 'contain one record per line. Those records with _source keys are ' 'ingested.', action=AppendRegionAction, dest='inst') + parser.add_argument( + '--limit', '-l', help='Limit the number of entities for metrics ' + 'calculations.', type=int) parser.add_argument( '--metric', '-m', help='Explicitly choose which metrics are ' 'calculated. Multiple metrics may be specified. Multiple processes ' 'can be run in parallel with different metrics to increase overall ' 'processing speed.', action='append') + parser.add_argument( + '--offset', '-o', help='Offset within entities for metrics ' + 'calculations.', type=int) + parser.add_argument( + '--recent', help='Calculate metrics for the most recently added ' + 'entities first.', action='store_true') parser.add_argument( '--region', '-r', help='Subsequent input files will use this as ' 'their region or subset. Set to blank to revert to parsing regions ' From 22c8b39b9cd9c1010df8707791d3a9d9e89db91e Mon Sep 17 00:00:00 2001 From: David Manthey Date: Fri, 21 Aug 2015 15:12:25 -0400 Subject: [PATCH 04/11] Speed improvements. 
Calculate entity-spanning metrics faster by buffering queries and by only fetching relevant fields. --- continuous/ingest.py | 40 +++++++++++++++++++++++++++++---- continuous/metric.py | 4 ++++ continuous/metriclevenshtein.py | 6 +++++ continuous/metricsubstring.py | 6 +++++ 4 files changed, 52 insertions(+), 4 deletions(-) diff --git a/continuous/ingest.py b/continuous/ingest.py index 7a5aeb3..35ad5ee 100755 --- a/continuous/ingest.py +++ b/continuous/ingest.py @@ -33,6 +33,32 @@ # -------- General Functions -------- +def bufferedQuery(buffer, key, coll, query, fields=None, maxBuffer=2000000): + """ + Buffer some queries we expect to be repeated. When using a buffered query, + the results are a list instead of a cursor. + + :param buffer: a dictionary to store the buffered parameters and results. + :param key: the key to use for the query buffering. + :param coll: the collection to query. + :param query: the query. + :param fields: the fields to return from the query. + :param maxBuffer: max count to buffer. + :returns: either a cursor or a list. + """ + if (key in buffer and coll == buffer[key]['coll'] and + query == buffer[key]['query'] and + fields == buffer[key]['fields'] and buffer[key]['last']): + return buffer[key]['last'] + cursor = coll.find(query, fields, timeout=False) + if cursor.count() <= maxBuffer: + cursor = [row for row in cursor] + buffer[key] = { + 'coll': coll, 'query': query, 'fields': fields, 'last': cursor + } + return cursor + + def castObjectId(id): """ Make sure an obejct is an ObjectId or None. @@ -85,6 +111,7 @@ def calculateMetrics(state, entities=None): cursor.limit(state['args']['limit']) count = numCalc = 0 starttime = lastreport = time.time() + queryBuffer = {} for entity in cursor: for met in metric.LoadedMetrics: metClass = metric.LoadedMetrics[met] @@ -101,7 +128,7 @@ def calculateMetrics(state, entities=None): if oldMetric and oldMetric['date_updated'] >= date: continue calculateOneMetric(entityColl, linkColl, metColl, metClass, entity, - oldMetric, state) + oldMetric, state, starttime, queryBuffer) numCalc += 1 count += 1 if state['args']['verbose'] >= 1: @@ -113,7 +140,7 @@ def calculateMetrics(state, entities=None): def calculateOneMetric(entityColl, linkColl, metColl, metClass, entity, - metricDoc, state): + metricDoc, state, updateTime, queryBuffer=None): """ Calculate the value of a single metric for a single entity. @@ -126,6 +153,9 @@ def calculateOneMetric(entityColl, linkColl, metColl, metClass, entity, record. May be None to force a completely fresh computation. :param state: the state dictionary with the config values. + :param updateTime: the time to mark the metric as updated. + :param queryBuffer: if set, a buffer that can be used to skip some repeated + queries. """ entityId = castObjectId(entity) if metricDoc is None: @@ -144,12 +174,14 @@ def calculateOneMetric(entityColl, linkColl, metColl, metClass, entity, } # The time has to be before we do the computation, as data could be added # during the comptation. 
- metricDoc['date_updated'] = time.time() + metricDoc['date_updated'] = updateTime or time.time() refresh = (metClass.saveWork and work == {}) if metClass.onEntities: query = ({} if not metClass.onEntitiesOnlyNew or refresh else {'date_updated': {'$gte': metricDoc['date_updated']}}) - for gb in entityColl.find(query, timeout=False): + fields = getattr(metClass, 'entityFields', None) + for gb in bufferedQuery(queryBuffer, metClass.name, entityColl, query, + fields): if castObjectId(gb) != entityId: metClass.calcEntity(entity, gb, **kwargs) if metClass.onLinks: diff --git a/continuous/metric.py b/continuous/metric.py index 1f67993..295e49e 100644 --- a/continuous/metric.py +++ b/continuous/metric.py @@ -34,6 +34,10 @@ def __init__(self, **kwargs): # This also determines if a metric should be recomputed if any entity # was updated, even if we aren't using the entities. self.onEntitiesOnlyNew = True + # If self.entityFields is specified, only those fields are retreived + # when visiting entities. This can be used to reduce data transfer + # from the database. + # self.entityFields = {'name': True} # If saveWork is True, the work record is saved along with the value of # the metric self.saveWork = False diff --git a/continuous/metriclevenshtein.py b/continuous/metriclevenshtein.py index d6a0bf3..008245c 100644 --- a/continuous/metriclevenshtein.py +++ b/continuous/metriclevenshtein.py @@ -57,6 +57,12 @@ class MetricLevenshtein(metric.Metric): def __init__(self, **kwargs): super(MetricLevenshtein, self).__init__(**kwargs) self.onEntities = True + self.entityFields = { + 'name': True, + 'service': True, + 'msgs.service': True, + 'msgs.subset': True + } self.saveWork = True def calc(self, ga, work, **kwargs): diff --git a/continuous/metricsubstring.py b/continuous/metricsubstring.py index 63b91ed..0609894 100644 --- a/continuous/metricsubstring.py +++ b/continuous/metricsubstring.py @@ -41,6 +41,12 @@ class MetricSubstring(metric.Metric): def __init__(self, **kwargs): super(MetricSubstring, self).__init__(**kwargs) self.onEntities = True + self.entityFields = { + 'name': True, + 'service': True, + 'msgs.service': True, + 'msgs.subset': True + } self.saveWork = True def calc(self, ga, work, **kwargs): From 2085b9393738c63cd3af0d92150adaed32600109 Mon Sep 17 00:00:00 2001 From: David Manthey Date: Mon, 24 Aug 2015 09:51:49 -0400 Subject: [PATCH 05/11] Work with pymongo 3. Add clear and recalc options to invalidate metrics that I have updated. Expand substring and levenshtein metrics to handle name and fullname and the combination of the two. 
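With that expansion, the substring and levenshtein metrics each keep three separate top-k tables rather than one. A rough sketch of the shape of the work object they maintain, assuming (as in the code below) that calcEntityPrep creates the buckets and metric.trackTopK fills them in:

    # Illustrative only; the keys mirror calcEntityPrep() in the metric modules.
    work = {
        'name': {},           # best user-name vs user-name scores
        'fullname': {},       # best full-name vs full-name scores
        'name_fullname': {},  # best score across the mixed comparisons
    }
    # metric.trackTopK(work['name'], score, other_id, categories, state)
    # accumulates per-bucket bookkeeping such as work['name']['topk'].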
--- continuous/conf.json | 14 +++---- continuous/ingest.py | 70 +++++++++++++++++++++++++++------ continuous/metric.py | 22 +++++++++++ continuous/metriclevenshtein.py | 52 ++++++++++++++++++++---- continuous/metricsubstring.py | 50 +++++++++++++++++++---- 5 files changed, 172 insertions(+), 36 deletions(-) diff --git a/continuous/conf.json b/continuous/conf.json index 2fa6c7c..bb128ca 100644 --- a/continuous/conf.json +++ b/continuous/conf.json @@ -1,25 +1,21 @@ { "db": { "entity": { - "dbUri": "mongodb://10.0.2.2:27017/entity", + "dbUri": "mongodb://127.0.0.1:27017/syria", "collection": "entity" }, "link": { - "dbUri": "mongodb://10.0.2.2:27017/entity", + "dbUri": "mongodb://127.0.0.1:27017/syria", "collection": "link" }, "metrics": { - "dbUri": "mongodb://10.0.2.2:27017/entity", + "dbUri": "mongodb://127.0.0.1:27017/syria", "collection": "metrics" - }, - "topk": { - "dbUri": "mongodb://10.0.2.2:27017/entity", - "collection": "topk" } }, "topk": { - "k": 10, - "extra": 0 + "k": 25, + "extra": 25 }, "metrics": { "1hop": true, diff --git a/continuous/ingest.py b/continuous/ingest.py index 35ad5ee..42eadcc 100755 --- a/continuous/ingest.py +++ b/continuous/ingest.py @@ -122,7 +122,7 @@ def calculateMetrics(state, entities=None): date = max(date, latestLink) oldMetric = metColl.find_one({ 'entity': castObjectId(entity), - 'metric': met + 'metric': metClass.name }, timeout=False) # Already up to date if oldMetric and oldMetric['date_updated'] >= date: @@ -180,17 +180,24 @@ def calculateOneMetric(entityColl, linkColl, metColl, metClass, entity, query = ({} if not metClass.onEntitiesOnlyNew or refresh else {'date_updated': {'$gte': metricDoc['date_updated']}}) fields = getattr(metClass, 'entityFields', None) - for gb in bufferedQuery(queryBuffer, metClass.name, entityColl, query, - fields): - if castObjectId(gb) != entityId: - metClass.calcEntity(entity, gb, **kwargs) + res = bufferedQuery(queryBuffer, metClass.name, entityColl, query, + fields) + if ((isinstance(res, list) and len(res)) or + (not ininstance(res, list) and not res.count())): + metClass.calcEntityPrep(entity, **kwargs) + for gb in res: + if castObjectId(gb) != entityId: + metClass.calcEntity(entity, gb, **kwargs) if metClass.onLinks: query = ({} if not metClass.onLinksOnlyNew or refresh else {'date_updated': {'$gte': metricDoc['date_updated']}}) query['ga'] = entityId - for link in linkColl.find(query, timeout=False): - gb = entityColl.find_one(castObjectId(link['gb']), timeout=False) - metClass.calcLink(entity, gb, link, **kwargs) + res = linkColl.find(query, timeout=False) + if res.count(): + for link in res: + gb = entityColl.find_one(castObjectId(link['gb']), + timeout=False) + metClass.calcLink(entity, gb, link, **kwargs) value = metClass.calc(entity, **kwargs) if metClass.saveWork: metricDoc['work'] = work @@ -338,7 +345,7 @@ def convertTwitterJSONToMsg(tw): } msg['url'] = 'http://twitter.com/%s/statuses/%s' % ( msg['user_id'], msg['msg_id']) - if ('coordinates' in tw and 'coordinates' in tw['coordinates'] and + if (tw.get('coordinates') and 'coordinates' in tw['coordinates'] and len(tw['coordinates']['coordinates']) >= 2): msg['latitude'] = tw['coordinates']['coordinates'][1] msg['longitude'] = tw['coordinates']['coordinates'][0] @@ -634,7 +641,9 @@ def ingestMessageEdges(state, entity, msg): }) # If we added this link as a neighbor, then we know the edges # are new edges and not increased weights to existing edges. 
- if updateResult['nModified']: + # Using nModified or n allows use of pymongo 2 or pymongo 3 + # (which is independent of the version of Mongo). + if updateResult.get('nModified', updateResult.get('n')): isNew = True # We are currently bidirectional on everything addLink(linkColl, entityId, linkId, linktype, isNew=isNew, @@ -899,6 +908,26 @@ def openFile(filename): return open(filename, 'rb') +def resetMetrics(state): + """ + Mark metrics as dirty or delete known values. + + :param state: the state dictionary with the config values. + """ + metricDict = state['config'].get('metrics', {}) + if state['args']['metric']: + metricDict = dict.fromkeys(state['args']['metric']) + for met in metricDict: + metric.loadMetric(met, metricDict[met]) + metColl = getDb('metrics', state) + for met in metric.LoadedMetrics: + metClass = metric.LoadedMetrics[met] + if state['args'].get('clear'): + metColl.remove({'metric': metClass.name}) + elif state['args'].get('recalc'): + metColl.update({'metric': metClass.name}, {'date_updated': 0}) + + def showEntityStatistics(entityColl): """ Report on distributions and statistics of the entity collection. @@ -946,7 +975,7 @@ def showProgress(linesProcessed, state, filename): return if 'starttime' not in state: state['starttime'] = time.time() - if (state['args']['verbose'] >= 2 and + if (state['args']['verbose'] >= 1 and filename != state.get('lastFilename', None)): print filename state['lastFilename'] = filename @@ -1032,10 +1061,13 @@ def __call__(self, parser, namespace, values, option_string=None): parser.add_argument( '--calculate', '--calc', '-c', help='Calculate metrics.', action='store_true', dest='calc') - parser.add_argument( - '--config', '--conf', help='The path to the config file') parser.add_argument( '--checknames', help='Check for duplicate names.', action='store_true') + parser.add_argument( + '--clear', '--clear-metrics', help='Clear metrics so that they must ' + 'be recalculated completely.', action='store_true') + parser.add_argument( + '--config', '--conf', help='The path to the config file') parser.add_argument( '--instagram', '-i', '--inst', help='Ingest one or more files that contain Instagram messages in the ' @@ -1054,6 +1086,11 @@ def __call__(self, parser, namespace, values, option_string=None): parser.add_argument( '--offset', '-o', help='Offset within entities for metrics ' 'calculations.', type=int) + parser.add_argument( + '--recalculate', '--recalc', help='Mark metrics as dirty so that they ' + 'are all updated. 
This is less aggresive than clearing, as they are ' + 'not deleted and may reuse partial work.', action='store_true', + dest='recalc') parser.add_argument( '--recent', help='Calculate metrics for the most recently added ' 'entities first.', action='store_true') @@ -1085,6 +1122,13 @@ def __call__(self, parser, namespace, values, option_string=None): for filename in sorted(glob.iglob(os.path.expanduser(filespec))): ingestTwitterFile(filename, state, region) if args.get('calc', False): + if args.get('clear') or args.get('recalc'): + resetMetrics(state) calculateMetrics(state) if state['args']['verbose'] >= 1: pprint.pprint(state) +# TODO: +# Ingest zip files +# ingest json from zip files +# Ingest elasticsearch instagram data from xdata +# Run metrics against entities not in the database diff --git a/continuous/metric.py b/continuous/metric.py index 295e49e..9de653b 100644 --- a/continuous/metric.py +++ b/continuous/metric.py @@ -64,6 +64,17 @@ def calcEntity(self, ga, gb, work={}, **kwargs): """ return + def calcEntityPrep(self, ga, work={}, **kwargs): + """ + Subclass this to handle partial calculations based on a second entity. + This is called before calcEntity is called on each second entity. + + :param ga: the entity for which we are computing the metric. + :param work: an object for working on the metric. Results should be + stored here. + """ + return + def calcLink(self, ga, gb, link, work={}, **kwargs): """ Subclass this to handle partial calculations based on a link. @@ -76,6 +87,17 @@ def calcLink(self, ga, gb, link, work={}, **kwargs): """ return + def calcLinkPrep(self, ga, work={}, **kwargs): + """ + Subclass this to handle partial calculations based on a link. + This is called before calcLink is called on each link. + + :param ga: the entity for which we are computing the metric. + :param work: an object for working on the metric. Results should be + stored here. + """ + return + def loadMetric(metricClass, initVal=None): """ diff --git a/continuous/metriclevenshtein.py b/continuous/metriclevenshtein.py index 008245c..a210ce8 100644 --- a/continuous/metriclevenshtein.py +++ b/continuous/metriclevenshtein.py @@ -59,6 +59,7 @@ def __init__(self, **kwargs): self.onEntities = True self.entityFields = { 'name': True, + 'fullname': True, 'service': True, 'msgs.service': True, 'msgs.subset': True @@ -72,8 +73,12 @@ def calc(self, ga, work, **kwargs): :param ga: the entity for which we are computing the metric. :returns: the top-k table of relations. """ - metric.topKSetsToLists(work) - return work['topk'] + res = {} + for key in ('name', 'fullname', 'name_fullname'): + metric.topKSetsToLists(work[key]) + if 'topk' in work[key]: + res[key] = work[key]['topk'] + return res def calcEntity(self, ga, gb, work={}, state={}, **kwargs): """ @@ -88,13 +93,46 @@ def calcEntity(self, ga, gb, work={}, state={}, **kwargs): """ # We actually calculate the BEST levenshtein similarity between any # name of ga with any name of gb and use that. - sim = 0 + simName = simFull = simBoth = 0 for gaName in ga['name']: for gbName in gb['name']: - sim = max(sim, levenshteinSimilarity(gaName, gbName)) - if sim: - metric.trackTopK(work, sim, gb['_id'], metric.topKCategories(gb), - state) + # Note: both gaName and gbName are lowercase. We may wish to + # also find the substring match between fullnames. 
+ simName = max(simName, levenshteinSimilarity(gaName, gbName)) + for gbName in gb['fullname']: + simBoth = max(simBoth, levenshteinSimilarity(gaName, + gbName.lower())) + for gaName in ga['fullname']: + for gbName in gb['fullname']: + # Note: both gaName and gbName are lowercase. We may wish to + # also find the levenshtein match between fullnames. + simFull = max(simName, levenshteinSimilarity( + gaName.lower(), gbName.lower())) + for gbName in gb['name']: + simBoth = max(simBoth, levenshteinSimilarity(gaName.lower(), + gbName)) + simBoth = max(simBoth, simName, simFull) + if simName: + metric.trackTopK(work['name'], simName, gb['_id'], + metric.topKCategories(gb), state) + if simFull: + metric.trackTopK(work['fullname'], simFull, gb['_id'], + metric.topKCategories(gb), state) + if simBoth: + metric.trackTopK(work['name_fullname'], simBoth, gb['_id'], + metric.topKCategories(gb), state) + + def calcEntityPrep(self, ga, work={}, **kwargs): + """ + This is called before calcEntity is called on each second entity. + + :param ga: the entity for which we are computing the metric. + :param work: an object for working on the metric. Results should be + stored here. + """ + for key in ('name', 'fullname', 'name_fullname'): + if not key in work: + work[key] = {} metric.loadMetric(MetricLevenshtein) diff --git a/continuous/metricsubstring.py b/continuous/metricsubstring.py index 0609894..4a47e92 100644 --- a/continuous/metricsubstring.py +++ b/continuous/metricsubstring.py @@ -43,6 +43,7 @@ def __init__(self, **kwargs): self.onEntities = True self.entityFields = { 'name': True, + 'fullname': True, 'service': True, 'msgs.service': True, 'msgs.subset': True @@ -56,8 +57,12 @@ def calc(self, ga, work, **kwargs): :param ga: the entity for which we are computing the metric. :returns: the top-k table of relations. """ - metric.topKSetsToLists(work) - return work['topk'] + res = {} + for key in ('name', 'fullname', 'name_fullname'): + metric.topKSetsToLists(work[key]) + if 'topk' in work[key]: + res[key] = work[key]['topk'] + return res def calcEntity(self, ga, gb, work={}, state={}, **kwargs): """ @@ -72,15 +77,46 @@ def calcEntity(self, ga, gb, work={}, state={}, **kwargs): """ # We actually calculate the BEST substring similarity between any name # of ga with any name of gb and use that. - sim = 0 + simName = simFull = simBoth = 0 for gaName in ga['name']: for gbName in gb['name']: # Note: both gaName and gbName are lowercase. We may wish to # also find the substring match between fullnames. - sim = max(sim, substringSimilarity(gaName, gbName)) - if sim: - metric.trackTopK(work, sim, gb['_id'], metric.topKCategories(gb), - state) + simName = max(simName, substringSimilarity(gaName, gbName)) + for gbName in gb['fullname']: + simBoth = max(simBoth, substringSimilarity(gaName, + gbName.lower())) + for gaName in ga['fullname']: + for gbName in gb['fullname']: + # Note: both gaName and gbName are lowercase. We may wish to + # also find the substring match between fullnames. 
+ simFull = max(simName, substringSimilarity( + gaName.lower(), gbName.lower())) + for gbName in gb['name']: + simBoth = max(simBoth, substringSimilarity(gaName.lower(), + gbName)) + simBoth = max(simBoth, simName, simFull) + if simName: + metric.trackTopK(work['name'], simName, gb['_id'], + metric.topKCategories(gb), state) + if simFull: + metric.trackTopK(work['fullname'], simFull, gb['_id'], + metric.topKCategories(gb), state) + if simBoth: + metric.trackTopK(work['name_fullname'], simBoth, gb['_id'], + metric.topKCategories(gb), state) + + def calcEntityPrep(self, ga, work={}, **kwargs): + """ + This is called before calcEntity is called on each second entity. + + :param ga: the entity for which we are computing the metric. + :param work: an object for working on the metric. Results should be + stored here. + """ + for key in ('name', 'fullname', 'name_fullname'): + if not key in work: + work[key] = {} metric.loadMetric(MetricSubstring) From 611495f4881a49382930e99e5e2f138aa4dae1f1 Mon Sep 17 00:00:00 2001 From: David Manthey Date: Mon, 24 Aug 2015 11:03:35 -0400 Subject: [PATCH 06/11] Fixed a typo in the metric calculator. --- continuous/ingest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/continuous/ingest.py b/continuous/ingest.py index 42eadcc..3cff08a 100755 --- a/continuous/ingest.py +++ b/continuous/ingest.py @@ -183,7 +183,7 @@ def calculateOneMetric(entityColl, linkColl, metColl, metClass, entity, res = bufferedQuery(queryBuffer, metClass.name, entityColl, query, fields) if ((isinstance(res, list) and len(res)) or - (not ininstance(res, list) and not res.count())): + (not isinstance(res, list) and not res.count())): metClass.calcEntityPrep(entity, **kwargs) for gb in res: if castObjectId(gb) != entityId: From d5a7b14ddcdc1130ccebc7e561a0f7de2b7c250e Mon Sep 17 00:00:00 2001 From: David Manthey Date: Mon, 24 Aug 2015 13:58:57 -0400 Subject: [PATCH 07/11] Modifications to allow an external program to calculate metrics for entities not in the database. --- continuous/ingest.py | 57 ++++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/continuous/ingest.py b/continuous/ingest.py index 3cff08a..3673395 100755 --- a/continuous/ingest.py +++ b/continuous/ingest.py @@ -68,13 +68,13 @@ def castObjectId(id): :returns: an ObjectId or None. """ if isinstance(id, dict): - id = id['_id'] + id = id.get('_id') if id is None or type(id) is ObjectId: return id return ObjectId(id) -def calculateMetrics(state, entities=None): +def calculateMetrics(state, entities=None, nonDbEntities=None): """ Calculate metrics for dirty entities. 
@@ -102,9 +102,14 @@ def calculateMetrics(state, entities=None): ('date_updated', pymongo.DESCENDING)]).limit(-1).next() latestLink = 0 if not latestLink else latestLink['date_updated'] idQuery = {} if entities is None else {'_id': {'$in': entities}} - cursor = entityColl.find(idQuery, timeout=False).sort( - [('_id', pymongo.ASCENDING if not state['args'].get('recent', False) - else pymongo.DESCENDING)]) + if nonDbEntities is not None: + cursor = nonDbEntities + numEntities = len(nonDbEntities) + else: + cursor = entityColl.find(idQuery, timeout=False).sort( + [('_id', pymongo.ASCENDING if not + state['args'].get('recent', False) else pymongo.DESCENDING)]) + numEntities = cursor.count() if state['args'].get('offset'): cursor.skip(state['args']['offset']) if state['args'].get('limit'): @@ -115,26 +120,34 @@ def calculateMetrics(state, entities=None): for entity in cursor: for met in metric.LoadedMetrics: metClass = metric.LoadedMetrics[met] - date = entity['date_updated'] - if metClass.onEntities or not metClass.onEntitiesOnlyNew: - date = max(date, latestEntity) - if metClass.onLinks or not metClass.onLinksOnlyNew: - date = max(date, latestLink) - oldMetric = metColl.find_one({ - 'entity': castObjectId(entity), - 'metric': metClass.name - }, timeout=False) - # Already up to date - if oldMetric and oldMetric['date_updated'] >= date: - continue - calculateOneMetric(entityColl, linkColl, metColl, metClass, entity, - oldMetric, state, starttime, queryBuffer) + if nonDbEntities is None: + date = entity['date_updated'] + if metClass.onEntities or not metClass.onEntitiesOnlyNew: + date = max(date, latestEntity) + if metClass.onLinks or not metClass.onLinksOnlyNew: + date = max(date, latestLink) + oldMetric = metColl.find_one({ + 'entity': castObjectId(entity), + 'metric': metClass.name + }, timeout=False) + # Already up to date + if oldMetric and oldMetric['date_updated'] >= date: + continue + else: + oldMetric = None + metricDoc = calculateOneMetric( + entityColl, linkColl, metColl, metClass, entity, oldMetric, + state, starttime, queryBuffer) + if nonDbEntities is not None: + if not 'metrics' in entity: + entity['metrics'] = {} + entity['metrics'][metClass.name] = metricDoc numCalc += 1 count += 1 if state['args']['verbose'] >= 1: curtime = time.time() if curtime - lastreport > 10.0 / state['args']['verbose']: - print '%d %d %d %5.3f' % (count, numCalc, cursor.count(), + print '%d %d %d %5.3f' % (count, numCalc, numEntities, curtime - starttime) lastreport = curtime @@ -202,7 +215,9 @@ def calculateOneMetric(entityColl, linkColl, metColl, metClass, entity, if metClass.saveWork: metricDoc['work'] = work metricDoc['value'] = value - metColl.save(metricDoc) + if entityId is not None: + metColl.save(metricDoc) + return metricDoc def convertInstagramESToMsg(inst, subset='unknown'): From 5a0934ff5c1552f689adb01025b465039e7d249c Mon Sep 17 00:00:00 2001 From: David Manthey Date: Tue, 25 Aug 2015 16:34:17 -0400 Subject: [PATCH 08/11] Work for generating an elasticsearch database. Fixed a bug in generating full name metrics. 
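This patch also starts preferring the python-Levenshtein C extension when it is installed, falling back to the existing pure-python routine otherwise. A self-contained sketch of that optional-import pattern together with the normalization used below; the fallback here is a simplified stand-in for the module's own pure-python function:

    try:
        import Levenshtein      # C extension; much faster when available
    except Exception:
        Levenshtein = None

    def _edit_distance(s1, s2):
        # Simplified pure-python fallback, for illustration only.
        prev = list(range(len(s2) + 1))
        for i, c1 in enumerate(s1, 1):
            cur = [i]
            for j, c2 in enumerate(s2, 1):
                cur.append(min(prev[j] + 1, cur[j - 1] + 1,
                               prev[j - 1] + (c1 != c2)))
            prev = cur
        return prev[-1]

    def levenshtein_similarity(s1, s2):
        # 1.0 is a perfect match; same normalization as levenshteinSimilarity.
        total = float(len(s1) + len(s2))
        dist = (Levenshtein.distance(s1, s2) if Levenshtein
                else _edit_distance(s1, s2))
        return (total - dist) / total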
--- continuous/ingest.py | 13 +- continuous/metriclevenshtein.py | 15 +- continuous/metricsubstring.py | 2 +- continuous/xmlmetric.py | 236 ++++++++++++++++++++++++++++++++ 4 files changed, 260 insertions(+), 6 deletions(-) create mode 100644 continuous/xmlmetric.py diff --git a/continuous/ingest.py b/continuous/ingest.py index 3673395..27dbc5f 100755 --- a/continuous/ingest.py +++ b/continuous/ingest.py @@ -74,13 +74,18 @@ def castObjectId(id): return ObjectId(id) -def calculateMetrics(state, entities=None, nonDbEntities=None): +def calculateMetrics(state, entities=None, nonDbEntities=None, callback=None): """ Calculate metrics for dirty entities. :param state: the state dictionary with the config values. :param entities: a list of entities to calculate the metrics for. If None, calculate the metrics for all entities. + :param nonDbEntities: a list of entity-like records that didn't come from + our database. If this is not None, the entities are + not used. + :param callback: a function to call after calculating a metric. + callback(state, entity, metClass, metric) is called. """ metricDict = state['config'].get('metrics', {}) if state['args']['metric']: @@ -143,6 +148,8 @@ def calculateMetrics(state, entities=None, nonDbEntities=None): entity['metrics'] = {} entity['metrics'][metClass.name] = metricDoc numCalc += 1 + if callback: + callback(state, entity, metClass, metricDoc) count += 1 if state['args']['verbose'] >= 1: curtime = time.time() @@ -347,6 +354,8 @@ def convertTwitterJSONToMsg(tw): :param tw: the twitter record. :returns: a message record or None for failed. """ + if tw.get('user') is None: + return msg = { 'service': 'twitter', 'subset': 'unknown', @@ -587,8 +596,6 @@ def ingestMessage(state, msg): 'service': msg['service'], 'subset': [msg['subset']], 'msg_id': msg['msg_id'], - 'latitude': msg.get('latitude', None), - 'longitude': msg.get('longitude', None), 'date': msg['msg_date'] } for key in ('latitude', 'longitude', 'source'): diff --git a/continuous/metriclevenshtein.py b/continuous/metriclevenshtein.py index a210ce8..3983125 100644 --- a/continuous/metriclevenshtein.py +++ b/continuous/metriclevenshtein.py @@ -1,6 +1,10 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +try: + import Levenshtein +except Exception: + Levenshtein = None import metric @@ -48,7 +52,14 @@ def levenshteinSimilarity(s1, s2): :return: the normalized result. 1 is a perfect match. """ totalLen = float(len(s1) + len(s2)) - return (totalLen - levenshtein(s1, s2)) / totalLen + # The C-module version of Levenshtein is vastly faster + if Levenshtein is None: + return (totalLen - levenshtein(s1, s2)) / totalLen + if isinstance(s1, str): + s1 = s1.decode('utf8') + if isinstance(s2, str): + s2 = s2.decode('utf8') + return (totalLen - Levenshtein.distance(s1, s2)) / totalLen class MetricLevenshtein(metric.Metric): @@ -106,7 +117,7 @@ def calcEntity(self, ga, gb, work={}, state={}, **kwargs): for gbName in gb['fullname']: # Note: both gaName and gbName are lowercase. We may wish to # also find the levenshtein match between fullnames. 
- simFull = max(simName, levenshteinSimilarity( + simFull = max(simFull, levenshteinSimilarity( gaName.lower(), gbName.lower())) for gbName in gb['name']: simBoth = max(simBoth, levenshteinSimilarity(gaName.lower(), diff --git a/continuous/metricsubstring.py b/continuous/metricsubstring.py index 4a47e92..5653730 100644 --- a/continuous/metricsubstring.py +++ b/continuous/metricsubstring.py @@ -90,7 +90,7 @@ def calcEntity(self, ga, gb, work={}, state={}, **kwargs): for gbName in gb['fullname']: # Note: both gaName and gbName are lowercase. We may wish to # also find the substring match between fullnames. - simFull = max(simName, substringSimilarity( + simFull = max(simFull, substringSimilarity( gaName.lower(), gbName.lower())) for gbName in gb['name']: simBoth = max(simBoth, substringSimilarity(gaName.lower(), diff --git a/continuous/xmlmetric.py b/continuous/xmlmetric.py new file mode 100644 index 0000000..93bb0d6 --- /dev/null +++ b/continuous/xmlmetric.py @@ -0,0 +1,236 @@ +import elasticsearch +import ingest +import pprint +import sys +import time +import urllib3 +import xml.etree.ElementTree + + +def convertXMLToObject(node): + """ + Convert an ElementTree node to a more pythonic object. + + :param node: the element to convert. + :returns: a python object. + """ + if hasattr(node, 'getroot'): + node = node.getroot() + value = None + children = list(node) + if children: + value = {} + for child in children: + childval = convertXMLToObject(child) + if childval is not None: + if child.tag not in value: + value[child.tag] = [] + value[child.tag].append(childval) + for tag in value: + if len(value[tag]) == 1 and not isinstance(value[tag][0], dict): + value[tag] = value[tag][0] + if not len(value): + value = None + else: + if node.text: + value = node.text.strip() + elif hasattr(node, 'attribute'): + value = node.attribute + if value is not None and not len(value): + value = None + return value + + +dbref = None + +def getDb(): + """ + Get a connection to Elasticsearch. + + :returns: a database connection. + :returns: the default index we want to use. + """ + global dbref + index = 'test1' + if dbref is None: + dbref = elasticsearch.Elasticsearch(hosts=[{ + 'host': '127.0.0.1', + 'port': 9200, + 'url_prefix': '/', + 'timeout': urllib3.Timeout(read=150, connect=10) + }]) + return dbref, index + + +def printAndStoreMetric(state, entity, metClass, metric): + """ + Print and store the metrics we computed for an entity. + + :param state: the state dictionary. + :param entity: the current entity. + :param metClass: the metric class. + :param metric: the computed metric value. + """ + db, index = getDb() + rankings = [] + for sub in ['name', 'fullname']: + if sub not in metric['value']: + continue + metricName = metClass.name + '-' + sub + kept = 0 + for met in metric['value'][sub]: + if met[0] < 0.85 and kept >= 3: + continue + gb = entityColl.find_one(met[1]) + namelist = gb['name'][:] + namelist.extend(gb['fullname']) + ranking = { + 'entityId': [entity['id']], + 'documentId': gb['user_id'], + 'documentSource': 'twitter', + 'documentLink': 'http://twitter.com/intent/user?' 
+ 'user_id=' + gb['user_id'], + 'name': metricName, + 'entityNames': namelist, + 'score': met[0], + 'date_updated': time.time(), + 'info': { + 'name': gb['name'], + 'fullname': gb['fullname'] + } + } + # Not really necessary, but it lets me be lazy about deleting old + # results + id = entity['id'] + '-' + ranking['documentId'] + try: + old = db.get(index=index, doc_type='ranking', id=id) + except elasticsearch.NotFoundError: + old = None + if old is None or old['_source'] != ranking: + db.index(index=index, doc_type='ranking', body=ranking, id=id) + state['rankings'] = state.get('rankings', 0) + 1 + kept += 1 + rankings.append(ranking) + if 'fullname' in metric['value']: + met = metric['value']['fullname'][0] + gb = entityColl.find_one(met[1]) + logstr = '%d %6.4f %r;%r %s %r;%r' % ( + state.get('rankings', 0), met[0], ','.join(entity['name']), + ','.join(entity['fullname']), met[1], ','.join(gb['name']), + ','.join(gb['fullname'])) + if met[0] > 0.85: + print logstr + sys.stderr.write(logstr + '\n') + # sys.stderr.write('%r\n' % entity) + sys.stderr.write(pprint.pformat(rankings) + 'n') + sys.stderr.flush() + del entity['metrics'] + + +if __name__ == '__main__': + reverse = False + offset = 0 + filename = None + help = False + for arg in sys.argv[1:]: + if arg.startswith('--offset='): + offset = int(arg.split('=', 1)[1]) + elif arg == '-r': + reverse = True + elif arg.startswith('-') or filename: + help = True + else: + filename = arg + if help or not filename: + print """Load xml person list and compute metrics. + +Syntax: xmlmetric.py (xml file) [-r] [--offet=(offset)] + +-r reverse the processing order. +--offset skips entities at the beginning of the processing order. +""" + sys.exit(0) + starttime = lastupdate = time.time() + metricDict = { + 'levenshtein-name': { + 'longname': 'Levenshtein User Name', + 'description': 'User name similarity based Levenshtein distance. 
' + 'This is based on the email user name, Twitter ' + 'handle, or other service name.', + 'version': '0.1' + }, + 'levenshtein-fullname': { + 'longname': 'Levenshtein Full Name', + 'description': 'Full name similarity based Levenshtein distance.', + 'version': '0.1' + }, + } + state = { + 'args': { + 'metric': ['levenshtein'], + 'verbose': 2, + }, + 'config': ingest.loadConfig(None) + } + tree = xml.etree.ElementTree.iterparse(filename) + for _, el in tree: + if '}' in el.tag: + el.tag = el.tag.split('}', 1)[1] + root = tree.root + print 'parsed' + entityColl = ingest.getDb('entity', state) + entities = [] + persons = [] + db, index = getDb() + for key in metricDict: + id = key + metricDict[key]['name'] = key + try: + old = db.get(index=index, doc_type='metrics', id=id) + except elasticsearch.NotFoundError: + old = None + if old is None or old['_source'] != metricDict[key]: + db.index(index=index, doc_type='metrics', body=metricDict[key], + id=id) + count = updated = 0 + for personNode in root.findall('Person'): + person = convertXMLToObject(personNode) + persons.append(person) + + id = person['PersonGUID'] + try: + old = db.get(index=index, doc_type='entity', id=id) + except elasticsearch.NotFoundError: + old = None + if old is None or old['_source'] != person: + db.index(index=index, doc_type='entity', body=person, id=id) + updated += 1 + entity = { + 'id': person['PersonGUID'], + 'name': [], + 'fullname': [], + 'service': 'xml', + } + for ident in person.get('Identity', []): + for name in ident.get('Name', []): + if ('FullName' in name and + name['FullName'] not in entity['fullname']): + entity['fullname'].append(name['FullName']) + for email in ident.get('Email', []): + if 'Username' in email: + namelower = email['Username'].lower() + if not namelower in entity['name']: + entity['name'].append(namelower) + entities.append(entity) + count += 1 + curtime = time.time() + if curtime - lastupdate > 10: + print '%d %d %4.2f' % (count, updated, curtime - starttime) + lastupdate = curtime + root = tree = None + if reverse: + entities.reverse() + if offset: + entities = entities[offset:] + print 'start %4.2f' % (time.time() - starttime) + ingest.calculateMetrics(state, None, entities, printAndStoreMetric) From 8fd19b33056f1f4f7223731814add1e93d51b490 Mon Sep 17 00:00:00 2001 From: David Manthey Date: Mon, 31 Aug 2015 09:14:56 -0400 Subject: [PATCH 09/11] Fix an issue with unbuffered queries. --- continuous/ingest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/continuous/ingest.py b/continuous/ingest.py index 27dbc5f..d5dc9b4 100755 --- a/continuous/ingest.py +++ b/continuous/ingest.py @@ -203,7 +203,7 @@ def calculateOneMetric(entityColl, linkColl, metColl, metClass, entity, res = bufferedQuery(queryBuffer, metClass.name, entityColl, query, fields) if ((isinstance(res, list) and len(res)) or - (not isinstance(res, list) and not res.count())): + (not isinstance(res, list) and res.count())): metClass.calcEntityPrep(entity, **kwargs) for gb in res: if castObjectId(gb) != entityId: From 8643e8b655d755624f4bba42ddd5fdcc7851c9dd Mon Sep 17 00:00:00 2001 From: David Manthey Date: Fri, 4 Sep 2015 12:05:40 -0400 Subject: [PATCH 10/11] Fixed python styling. Split messages into their own collection to speed things up. 
--- continuous/conf.json | 4 +++ continuous/ingest.py | 66 +++++++++++++++++++++++++++++++++++------ continuous/xmlmetric.py | 5 ++-- 3 files changed, 64 insertions(+), 11 deletions(-) diff --git a/continuous/conf.json b/continuous/conf.json index bb128ca..c4bd644 100644 --- a/continuous/conf.json +++ b/continuous/conf.json @@ -11,6 +11,10 @@ "metrics": { "dbUri": "mongodb://127.0.0.1:27017/syria", "collection": "metrics" + }, + "msg": { + "dbUri": "mongodb://127.0.0.1:27017/syria", + "collection": "msg" } }, "topk": { diff --git a/continuous/ingest.py b/continuous/ingest.py index d5dc9b4..02515d3 100755 --- a/continuous/ingest.py +++ b/continuous/ingest.py @@ -94,7 +94,6 @@ def calculateMetrics(state, entities=None, nonDbEntities=None, callback=None): metric.loadMetric(met, metricDict[met]) if entities is not None: entities = [castObjectId(entity) for entity in entities] - metColl = getDb('metrics', state) entityColl = getDb('entity', state) linkColl = getDb('link', state) latestEntity = entityColl.find(timeout=False).sort([ @@ -119,6 +118,30 @@ def calculateMetrics(state, entities=None, nonDbEntities=None, callback=None): cursor.skip(state['args']['offset']) if state['args'].get('limit'): cursor.limit(state['args']['limit']) + calculateMetricsLoop(state, cursor, nonDbEntities, callback, numEntities, + latestEntity, latestLink) + + +def calculateMetricsLoop(state, cursor, nonDbEntities, callback, numEntities, + latestEntity, latestLink): + """ + Loop through the entities and metrics and perform the actual calculations. + + :param state: the state dictionary with the config values. + :param cursor: a Mongo cursor or list of entities to calculate metrics for. + + :param nonDbEntities: a list of entity-like records that didn't come from + our database. + :param callback: a function to call after calculating a metric. + callback(state, entity, metClass, metric) is called. + :param numEntities: the number of entities that we are computing metrics + for. + :param latestEntity: the epoch of the most recently changed entity record. + :param latestLink: the epoch of the most recently changed link record. + """ + metColl = getDb('metrics', state) + entityColl = getDb('entity', state) + linkColl = getDb('link', state) count = numCalc = 0 starttime = lastreport = time.time() queryBuffer = {} @@ -144,7 +167,7 @@ def calculateMetrics(state, entities=None, nonDbEntities=None, callback=None): entityColl, linkColl, metColl, metClass, entity, oldMetric, state, starttime, queryBuffer) if nonDbEntities is not None: - if not 'metrics' in entity: + if 'metrics' not in entity: entity['metrics'] = {} entity['metrics'][metClass.name] = metricDoc numCalc += 1 @@ -424,7 +447,6 @@ def getDb(dbName, state): 'entity': [ [('user_id', pymongo.ASCENDING)], [('name', pymongo.ASCENDING)], - [('msgs.msg_id', pymongo.ASCENDING)], [('date_updated', pymongo.ASCENDING)], ], 'link': [ @@ -435,6 +457,9 @@ def getDb(dbName, state): 'metrics': [ [('entity', pymongo.ASCENDING)], ], + 'msg': [ + [('msg_id', pymongo.ASCENDING)], + ], } for index in indices.get(dbName, []): coll.create_index(index) @@ -509,6 +534,28 @@ def getEntityByName(state, entity): timeout=False) if not doc and hasUserId and entity['name'] is not None: doc = entityColl.find_one(specName, timeout=False) + return getEntityByNameAdd(state, doc, entity, specName, hasUserId) + + +def getEntityByNameAdd(state, doc, entity, specName, hasUserId): + """ + Add or update an entity for which we did not have complete information. 
+ + :param state: includes the database connection. + :param doc: an existing document for the entity to be updated or None + if this is a new entity. + :param entity: a dictionary of _id, service, user_id, name, and fullname. + if _id is unspecified, service is required and at least one + of user_id or name. + :param specName: a mongo query to get an extant entity. + :param hasUserId: True if we know the user Id (for the service) of this + entity. + + :returns: an entity document or None. + :returns: updated: True if the document was changed in any way. 'new' if + the entity was added. + """ + entityColl = getDb('entity', state) curtime = time.time() if doc: # We have this user id, but not all of its aliases. @@ -563,15 +610,15 @@ def ingestMessage(state, msg): sys.exit(0) return None curtime = time.time() - entityColl = getDb('entity', state) + msgColl = getDb('msg', state) # Assume if we have processed this message, then we have everything we care # about in our database. This might not be true -- a message could get # reposted with new information. - if entityColl.find_one({'msgs': {'$elemMatch': { + if msgColl.find_one({ 'service': msg['service'], 'msg_id': msg['msg_id'], - 'subset': msg['subset'], - }}}, {'_id': True}, limit=1, timeout=False): + }, {'_id': True}, limit=1, timeout=False): return False + entityColl = getDb('entity', state) entity, changed = getEntityByName(state, { 'service': msg['service'], 'user_id': msg['user_id'], @@ -589,8 +636,6 @@ def ingestMessage(state, msg): entityColl.save(entity) found = True break - if found and not changed: - return False if not found: newmsg = { 'service': msg['service'], @@ -608,6 +653,9 @@ def ingestMessage(state, msg): entityColl.save(entity) # Mark descendants as dirty (for when we merge nodes) # ##DWM:: + msgColl.save(msg) + if found and not changed: + return False return True diff --git a/continuous/xmlmetric.py b/continuous/xmlmetric.py index 93bb0d6..b33162c 100644 --- a/continuous/xmlmetric.py +++ b/continuous/xmlmetric.py @@ -43,6 +43,7 @@ def convertXMLToObject(node): dbref = None + def getDb(): """ Get a connection to Elasticsearch. @@ -127,7 +128,7 @@ def printAndStoreMetric(state, entity, metClass, metric): del entity['metrics'] -if __name__ == '__main__': +if __name__ == '__main__': # noqa reverse = False offset = 0 filename = None @@ -219,7 +220,7 @@ def printAndStoreMetric(state, entity, metClass, metric): for email in ident.get('Email', []): if 'Username' in email: namelower = email['Username'].lower() - if not namelower in entity['name']: + if namelower not in entity['name']: entity['name'].append(namelower) entities.append(entity) count += 1 From c50be3088050913ac381d7e2a2de28b87d718998 Mon Sep 17 00:00:00 2001 From: David Manthey Date: Wed, 9 Sep 2015 09:18:13 -0400 Subject: [PATCH 11/11] Added a README file. Sped up processing of the substring and levenshtein metrics. Reduced the memory footprint. Comparisons are now more technically correct, as strings are normalized (Unicode can be written several ways; this makes it consistent).
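Name comparisons now run on NFC-normalized, lower-cased unicode, so strings that render identically but are composed differently finally compare equal. A minimal illustration, using a py2/py3-tolerant variant of the normalizeAndLower helper added to metric.py (the sample name is contrived):

    import unicodedata

    def normalize_and_lower(text):
        # Decode bytes, NFC-normalize, then lowercase, as in metric.normalizeAndLower.
        if isinstance(text, bytes):
            text = text.decode('utf8')
        return unicodedata.normalize('NFC', text).lower()

    composed = u'Jos\u00e9'     # the name with a precomposed accented e
    decomposed = u'Jose\u0301'  # the same name built from 'e' plus a combining accent
    assert composed != decomposed
    assert normalize_and_lower(composed) == normalize_and_lower(decomposed)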
--- continuous/README.md | 21 +++++++++ continuous/ingest.py | 42 +++++++++++++++--- continuous/metric.py | 55 +++++++++++++++++++++-- continuous/metriclevenshtein.py | 45 +++++++++---------- continuous/metricsubstring.py | 77 +++++++++++++++++++++------------ 5 files changed, 180 insertions(+), 60 deletions(-) create mode 100644 continuous/README.md diff --git a/continuous/README.md b/continuous/README.md new file mode 100644 index 0000000..db9c0e1 --- /dev/null +++ b/continuous/README.md @@ -0,0 +1,21 @@ +This will ingest data into a Mongo database in a form that can be used by the Entity Alignment app (with some modifications still required). + +So far, this can ingest Instagram as pulled from elasticsearch (such as that supplied with the July data), and Twitter in either GNIP format or the Twitter Public JSON format (as supplied with the July data). It will not work on the original Instagram data that was pulled from parquet files and saved as CSV. It should be easy to extend it to that format (though it has less data for comments and likes, so won't be as rich). + +Links are created from mentions, comments (Instagram only), likes (Instagram only), and replies (Twitter only). Because of the comments and likes, Instagram refers to a vastly larger network of entities than Twitter. Instagram also has fewer messages per entity. The Instagram network is sparser than the Twitter network. + +Ingest and metric computation is fully parallelizable - multiple ingests can run concurrently. Each metric can be computed in a separate process, and the metric computation can be divided into multiple processes as well. + +Four metrics have been implemented so far. These are: + +1hop - This is quick to compute, as we maintain a list of neighbors within the entity records. If the entity is updated, it will be recomputed. The stored value is the number of 1-hop neighbors EXCLUSIVE of the root entity itself. + +2hop - This requires visiting all the 1-hop neighbors. Since we don't know which of those neighbors has been updated, any entity update requires that we recheck all 2-hop values. We store both the number of 2-hop-only neighbors and the number of 2-hop and 1-hop neighbors. These counts never include the root entity itself. + +substring - This does a longest substring match between ALL entities, keeping the top-k for each service and subset. When an entity is added or updated, only the modified entities need to be rechecked to maintain the top-k lists. This is still slow on a large collection. Top-k lists are maintained separately for user names, full names, and the combination of the two. For entities with multiple names, the highest score is kept between all combinations. When an entity is added or updated, the metric only needs to be computed for the new entity with respect to other entities. + +levenshtein - This computes the Levenshtein metric between ALL entities, just like substring. If available, the python-Levenshtein library is used, as it is implemented in C and is faster than the pure-python implementation. As for substring, top-k lists are maintained separately for user names, full names, and the combination of the two, and when an entity is added, the metric can be incrementally updated. + +The substring and levenshtein metrics would be helped by more CPU resources, but would be helped even more by excluding entities from the candidate match. For instance, if the substring match is 0 (or below some other threshold), we could skip computing the levenshtein metric.
+ +See `ingest.py --help` for usage. diff --git a/continuous/ingest.py b/continuous/ingest.py index 02515d3..9a09a83 100755 --- a/continuous/ingest.py +++ b/continuous/ingest.py @@ -33,7 +33,8 @@ # -------- General Functions -------- -def bufferedQuery(buffer, key, coll, query, fields=None, maxBuffer=2000000): +def bufferedQuery(buffer, key, coll, query, fields=None, maxBuffer=2000000, + verbose=0, normalizeNames=False): """ Buffer some queries we expect to be repeated. When using a buffered query, the results are a list instead of a cursor. @@ -44,6 +45,8 @@ def bufferedQuery(buffer, key, coll, query, fields=None, maxBuffer=2000000): :param query: the query. :param fields: the fields to return from the query. :param maxBuffer: max count to buffer. + :param verbose: log verbosity. + :param normalizeNames: if True, normalize names. :returns: either a cursor or a list. """ if (key in buffer and coll == buffer[key]['coll'] and @@ -52,10 +55,32 @@ def bufferedQuery(buffer, key, coll, query, fields=None, maxBuffer=2000000): return buffer[key]['last'] cursor = coll.find(query, fields, timeout=False) if cursor.count() <= maxBuffer: - cursor = [row for row in cursor] + if verbose >= 2: + print 'Buffered', key, query, fields + newlist = [] + allmsg = [] + for row in cursor: + if len(row.get('msgs', [])): + msgs = [] + for msg in row['msgs']: + if msg not in msgs: + if msg not in allmsg: + allmsg.append(msg) + msg = allmsg[allmsg.index(msg)] + msgs.append(msg) + row['msgs'] = msgs + if normalizeNames: + metric.normalizeNames(row) + del row['name'] + del row['fullname'] + row = row.copy() # can reduce memory footprint + newlist.append(row) + if verbose >= 2 and not len(newlist) % 10000: + print '%d' % len(newlist), len(allmsg) buffer[key] = { - 'coll': coll, 'query': query, 'fields': fields, 'last': cursor + 'coll': coll, 'query': query, 'fields': fields, 'last': newlist } + cursor = newlist return cursor @@ -223,13 +248,20 @@ def calculateOneMetric(entityColl, linkColl, metColl, metClass, entity, query = ({} if not metClass.onEntitiesOnlyNew or refresh else {'date_updated': {'$gte': metricDoc['date_updated']}}) fields = getattr(metClass, 'entityFields', None) - res = bufferedQuery(queryBuffer, metClass.name, entityColl, query, - fields) + bufkey = 'metric' + ( + 'norm' if getattr(metClass, 'normalizeNames') else '') + res = bufferedQuery( + queryBuffer, bufkey, entityColl, query, fields, + verbose=state['args']['verbose'], + normalizeNames=getattr(metClass, 'normalizeNames')) if ((isinstance(res, list) and len(res)) or (not isinstance(res, list) and res.count())): metClass.calcEntityPrep(entity, **kwargs) for gb in res: if castObjectId(gb) != entityId: + if (not isinstance(res, list) and + getattr(metClass, 'normalizeNames')): + metric.normalizeNames(gb) metClass.calcEntity(entity, gb, **kwargs) if metClass.onLinks: query = ({} if not metClass.onLinksOnlyNew or refresh else diff --git a/continuous/metric.py b/continuous/metric.py index 9de653b..d08283b 100644 --- a/continuous/metric.py +++ b/continuous/metric.py @@ -6,6 +6,7 @@ import collections import importlib import pprint +import unicodedata LoadedMetrics = collections.OrderedDict() @@ -136,6 +137,31 @@ def loadMetric(metricClass, initVal=None): return True +def normalizeAndLower(text): + """ + Convert some text so that it is normalized and lowercased unicode. + + :param text: the text to alter. + :returns: the normalized and lower-cased text. 
+ """ + if isinstance(text, str): + text = text.decode('utf8') + text = unicodedata.normalize('NFC', text) + return text.lower() + + +def normalizeNames(entity): + """ + Normalize the name and fullname lists in an entity. + + :param entity: the entity to modify. + """ + entity['normname'] = list({normalizeAndLower(name) + for name in entity['name']}) + entity['normfullname'] = list({normalizeAndLower(name) + for name in entity['fullname']}) + + def topKCategories(entity): """ Return a set of categories used for tracking topk. @@ -144,7 +170,7 @@ def topKCategories(entity): :returns: a set of categories. """ cat = set() - if entity.get('service', None): + if entity.get('service'): cat.add(entity['service']) for msg in entity.get('msgs', []): cat.update([msg['service'] + '-' + subset for subset in msg['subset']]) @@ -217,13 +243,12 @@ def trackTopK(topkDict, value, id, cats, state): break # Skip this one if we can tell it shouldn't be added. if (len(topk) >= k and value < topk[-1][0] and - min([topkDict['cats'].get(cat, 0) for cat in cats]) >= k): + not any(topkDict['cats'].get(cat, 0) < k for cat in cats)): return False # Add the entry to the list entry = (value, id, cats) topk.append(entry) - topk.sort() - topk.reverse() + topk.sort(reverse=True) topkDict['ids'][id] = True for cat in cats: topkDict['cats'][cat] = topkDict['cats'].get(cat, 0) + 1 @@ -271,3 +296,25 @@ def trackTopKRemove(topkDict, entry): if not remove: break return kept + + +def trackTopKWorst(topkDict, cats, low): + """ + Determine the worst value that we need to care about for tracking the + sepecified categories. + + :param topkDict: a dictionary with the top-k. + :param cats: a set of categories for a potential item. + :param low: a fall-back low value. + :return: The worst value that could be added to the top-k for these + categories. + """ + if not cats or not len(cats) or 'topk' not in topkDict: + return low + topk = topkDict['topk'] + k = topkDict['k'] + if len(topk) < k or isinstance(topk[0][-1], list): + return low + if any(topkDict['cats'].get(cat, 0) < k for cat in cats): + return low + return topk[-1][0] diff --git a/continuous/metriclevenshtein.py b/continuous/metriclevenshtein.py index 3983125..6cf7e33 100644 --- a/continuous/metriclevenshtein.py +++ b/continuous/metriclevenshtein.py @@ -55,10 +55,6 @@ def levenshteinSimilarity(s1, s2): # The C-module version of Levenshtein is vastly faster if Levenshtein is None: return (totalLen - levenshtein(s1, s2)) / totalLen - if isinstance(s1, str): - s1 = s1.decode('utf8') - if isinstance(s2, str): - s2 = s2.decode('utf8') return (totalLen - Levenshtein.distance(s1, s2)) / totalLen @@ -75,6 +71,7 @@ def __init__(self, **kwargs): 'msgs.service': True, 'msgs.subset': True } + self.normalizeNames = True self.saveWork = True def calc(self, ga, work, **kwargs): @@ -104,34 +101,33 @@ def calcEntity(self, ga, gb, work={}, state={}, **kwargs): """ # We actually calculate the BEST levenshtein similarity between any # name of ga with any name of gb and use that. + cat = metric.topKCategories(gb) simName = simFull = simBoth = 0 - for gaName in ga['name']: - for gbName in gb['name']: - # Note: both gaName and gbName are lowercase. We may wish to - # also find the substring match between fullnames. + gaNames = ga['normname'] + gbNames = gb['normname'] + gaFullnames = ga['normfullname'] + gbFullnames = gb['normfullname'] + for gaName in gaNames: + for gbName in gbNames: + # Note: both gaName and gbName are lowercase. 
simName = max(simName, levenshteinSimilarity(gaName, gbName)) - for gbName in gb['fullname']: - simBoth = max(simBoth, levenshteinSimilarity(gaName, - gbName.lower())) - for gaName in ga['fullname']: - for gbName in gb['fullname']: - # Note: both gaName and gbName are lowercase. We may wish to - # also find the levenshtein match between fullnames. - simFull = max(simFull, levenshteinSimilarity( - gaName.lower(), gbName.lower())) - for gbName in gb['name']: - simBoth = max(simBoth, levenshteinSimilarity(gaName.lower(), - gbName)) + for gbName in gbFullnames: + simBoth = max(simBoth, levenshteinSimilarity(gaName, gbName)) + for gaName in gaFullnames: + for gbName in gbFullnames: + simFull = max(simFull, levenshteinSimilarity(gaName, gbName)) + for gbName in gbNames: + simBoth = max(simBoth, levenshteinSimilarity(gaName, gbName)) simBoth = max(simBoth, simName, simFull) if simName: metric.trackTopK(work['name'], simName, gb['_id'], - metric.topKCategories(gb), state) + cat, state) if simFull: metric.trackTopK(work['fullname'], simFull, gb['_id'], - metric.topKCategories(gb), state) + cat, state) if simBoth: metric.trackTopK(work['name_fullname'], simBoth, gb['_id'], - metric.topKCategories(gb), state) + cat, state) def calcEntityPrep(self, ga, work={}, **kwargs): """ @@ -141,8 +137,9 @@ def calcEntityPrep(self, ga, work={}, **kwargs): :param work: an object for working on the metric. Results should be stored here. """ + metric.normalizeNames(ga) for key in ('name', 'fullname', 'name_fullname'): - if not key in work: + if key not in work: work[key] = {} diff --git a/continuous/metricsubstring.py b/continuous/metricsubstring.py index 5653730..cca24a7 100644 --- a/continuous/metricsubstring.py +++ b/continuous/metricsubstring.py @@ -4,18 +4,28 @@ import metric -def longestCommonSubstring(s1, s2): +def longestCommonSubstring(s1, s2, low=0): """ Return the longest common substring between two strings. :param s1: the first string :param s2: the second string + :param low: one less than the length of the shortest string we need to + consider. :return: the longest common substring. """ if len(s2) > len(s1): s1, s2 = s2, s1 + while len(s2) > low: + if s2[0] in s1: + break + s2 = s2[1:] + while len(s2) > low: + if s2[-1] in s1: + break + s2 = s2[:-1] lens2p1 = len(s2) + 1 - for l in xrange(len(s2), 0, -1): + for l in xrange(len(s2), low, -1): for s in xrange(lens2p1 - l): substr = s2[s: s + l] if substr in s1: @@ -23,16 +33,19 @@ def longestCommonSubstring(s1, s2): return '' -def substringSimilarity(s1, s2): +def substringSimilarity(s1, s2, low=0): """ Determine the longest common substring between two strings and normalize the results to a scale of 0 to 1. :param s1: the first string :param s2: the second string + :param low: the lowest value we need to consider. :return: the normalized result. 1 is a perfect match. """ - return 2.0 * len(longestCommonSubstring(s1, s2)) / (len(s1) + len(s2)) + lens1s2 = len(s1) + len(s2) + return (2.0 * len(longestCommonSubstring(s1, s2, int(low * lens1s2 / 2))) / + (lens1s2)) class MetricSubstring(metric.Metric): @@ -48,6 +61,7 @@ def __init__(self, **kwargs): 'msgs.service': True, 'msgs.subset': True } + self.normalizeNames = True self.saveWork = True def calc(self, ga, work, **kwargs): @@ -77,34 +91,42 @@ def calcEntity(self, ga, gb, work={}, state={}, **kwargs): """ # We actually calculate the BEST substring similarity between any name # of ga with any name of gb and use that. 
+ cat = metric.topKCategories(gb) + lowName = metric.trackTopKWorst(work['name'], cat, 0) + lowFull = metric.trackTopKWorst(work['fullname'], cat, 0) + lowBoth = metric.trackTopKWorst(work['name_fullname'], cat, 0) simName = simFull = simBoth = 0 - for gaName in ga['name']: - for gbName in gb['name']: - # Note: both gaName and gbName are lowercase. We may wish to - # also find the substring match between fullnames. - simName = max(simName, substringSimilarity(gaName, gbName)) - for gbName in gb['fullname']: - simBoth = max(simBoth, substringSimilarity(gaName, - gbName.lower())) - for gaName in ga['fullname']: - for gbName in gb['fullname']: - # Note: both gaName and gbName are lowercase. We may wish to - # also find the substring match between fullnames. + gaNames = ga['normname'] + gbNames = gb['normname'] + gaFullnames = ga['normfullname'] + gbFullnames = gb['normfullname'] + for gaName in gaNames: + for gbName in gbNames: + # Note: both gaName and gbName are lowercase. + simName = max(simName, substringSimilarity( + gaName, gbName, lowName)) + if simName < 1: + for gbName in gbFullnames: + simBoth = max(simBoth, substringSimilarity( + gaName, gbName, lowBoth)) + for gaName in gaFullnames: + for gbName in gbFullnames: simFull = max(simFull, substringSimilarity( - gaName.lower(), gbName.lower())) - for gbName in gb['name']: - simBoth = max(simBoth, substringSimilarity(gaName.lower(), - gbName)) + gaName, gbName, lowFull)) + if simFull < 1 and simName < 1: + for gbName in gbNames: + simBoth = max(simBoth, substringSimilarity( + gaName, gbName, lowBoth)) simBoth = max(simBoth, simName, simFull) - if simName: + if simName and simName >= lowName: metric.trackTopK(work['name'], simName, gb['_id'], - metric.topKCategories(gb), state) - if simFull: + cat, state) + if simFull and simFull >= lowFull: metric.trackTopK(work['fullname'], simFull, gb['_id'], - metric.topKCategories(gb), state) - if simBoth: + cat, state) + if simBoth and simBoth >= lowBoth: metric.trackTopK(work['name_fullname'], simBoth, gb['_id'], - metric.topKCategories(gb), state) + cat, state) def calcEntityPrep(self, ga, work={}, **kwargs): """ @@ -114,8 +136,9 @@ def calcEntityPrep(self, ga, work={}, **kwargs): :param work: an object for working on the metric. Results should be stored here. """ + metric.normalizeNames(ga) for key in ('name', 'fullname', 'name_fullname'): - if not key in work: + if key not in work: work[key] = {}
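As a closing note on the pruning above: the low argument lets longestCommonSubstring give up as soon as no common substring could beat the worst score still worth keeping in the top-k for the other entity's categories. A simplified, self-contained sketch of the same early-out, returning only the length and omitting the prefix/suffix trimming used above:

    def longest_common_substring_len(s1, s2, low=0):
        # Return the longest common substring length, or 0 if nothing longer
        # than `low` is shared between the two strings.
        if len(s2) > len(s1):
            s1, s2 = s2, s1
        for length in range(len(s2), low, -1):      # try longest candidates first
            for start in range(len(s2) - length + 1):
                if s2[start:start + length] in s1:
                    return length
        return 0

    # With low=4, a shared run of only 4 characters is no longer reported.
    assert longest_common_substring_len('alice_w', 'alicia') == 4      # 'alic'
    assert longest_common_substring_len('alice_w', 'alicia', low=4) == 0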