diff --git a/src/semanticlayertools/clustering/leiden.py b/src/semanticlayertools/clustering/leiden.py
new file mode 100644
index 0000000..7b70ec8
--- /dev/null
+++ b/src/semanticlayertools/clustering/leiden.py
@@ -0,0 +1,174 @@
+import os
+import time
+import re
+from tqdm import tqdm
+
+import igraph as ig
+import leidenalg as la
+
+
+class TimeCluster():
+    """Cluster time-sliced data with the Leiden algorithm.
+
+    Calculates temporal clusters of e.g. time-sliced cocitation or citation
+    data using the Leiden algorithm. Two nodes in different year slices are
+    assumed to be identical if they share the same node name, e.g. the
+    bibcode or DOI.
+
+    Input files are assumed to include the year in the filename, to end in
+    `_GC.net` to denote their giant-component character, and to be in Pajek
+    format.
+
+    The resolution parameter can be seen as a limiting density above which
+    neighbouring nodes are considered a cluster. The interslice coupling
+    describes the influence of the yearly order on the clustering process.
+    See the documentation of the Leiden algorithm for more detailed
+    information.
+
+    :param inpath: Path for input network data
+    :type inpath: str
+    :param outpath: Path for writing output data
+    :type outpath: str
+    :param resolution: Main parameter for the clustering quality function (constant Potts model)
+    :type resolution: float
+    :param intersliceCoupling: Coupling parameter between two year slices, also influences cluster detection
+    :type intersliceCoupling: float
+    :param timerange: The time range for considering input data (default=(1945, 2005))
+    :type timerange: tuple
+    :param useGC: If True, read giant-component files ending in `_GC.net` (Pajek format); if False, read full edge lists ending in `.ncol` (default=True)
+    :type useGC: bool
+    :raises OSError: If the output file already exists at class instantiation
+
+    .. seealso::
+        Traag, V.A., Waltman, L., Van Eck, N.-J. (2019).
+        From Louvain to Leiden: guaranteeing well-connected communities.
+        Scientific Reports, 9(1), 5233. 10.1038/s41598-019-41695-z
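+
+    A minimal usage sketch (illustrative only, paths and parameter values
+    are hypothetical)::
+
+        tc = TimeCluster(
+            inpath='./cocite_networks/', outpath='./timeclusters/',
+            resolution=0.003, intersliceCoupling=0.4,
+        )
+        outfile, clusters = tc.optimize()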
+    """
+
+    def __init__(
+        self, inpath: str, outpath: str,
+        resolution: float = 0.003,
+        intersliceCoupling: float = 0.4,
+        timerange: tuple = (1945, 2005),
+        useGC: bool = True,
+    ):
+        starttime = time.time()
+        self.inpath = inpath
+        self.outpath = outpath
+        self.res_param = resolution
+        self.interslice_param = intersliceCoupling
+        self.timerange = timerange
+
+        self.outfile = os.path.join(
+            outpath,
+            f'timeclusters_{timerange[0]}-{timerange[1]}_res_{resolution}_intersl_{intersliceCoupling}.csv'
+        )
+        if os.path.isfile(self.outfile):
+            raise OSError(f'Output file at {self.outfile} exists. Aborting.')
+
+        if useGC is True:
+            edgefiles = [x for x in os.listdir(inpath) if x.endswith('_GC.net')]
+        elif useGC is False:
+            edgefiles = [x for x in os.listdir(inpath) if x.endswith('.ncol')]
+
+        self.graphDict = {}
+
+        for idx in tqdm(range(len(edgefiles)), leave=False):
+            try:
+                year = re.findall(r'\d{4}', edgefiles[idx])[0]
+            except Exception:
+                raise
+            if timerange[0] <= int(year) <= timerange[1]:
+                if useGC is True:
+                    graph = ig.Graph.Read_Pajek(os.path.join(inpath, edgefiles[idx]))
+                elif useGC is False:
+                    graph = ig.Graph.Read_Ncol(os.path.join(inpath, edgefiles[idx]))
+                self.graphDict[year] = graph
+
+        self.optimiser = la.Optimiser()
+
+        print(
+            "Graphs between "
+            f"{min(list(self.graphDict.keys()))} and "
+            f"{max(list(self.graphDict.keys()))} "
+            f"loaded in {time.time() - starttime} seconds."
+        )
+
+    def optimize(self, clusterSizeCompare: int = 1000):
+        """Optimize clusters across time slices.
+
+        This runs the actual clustering and can be very time- and
+        memory-consuming for large networks. Depending on the obtained
+        cluster results, this method may have to be run iteratively with a
+        varying resolution parameter. Output is written to file, with the
+        filename containing the chosen parameters.
+
+        The output CSV contains information on which node in which year
+        belongs to which cluster. As a first measure of the returned
+        clustering, the method prints the number of clusters found above the
+        threshold defined by `clusterSizeCompare`. This does not influence
+        the output clustering.
+
+        :param clusterSizeCompare: Threshold for `interesting` clusters
+        :type clusterSizeCompare: int
+        :returns: Tuple of output file path and list of found clusters in tuple format (node, year, cluster)
+        :rtype: tuple
+
+        .. seealso::
+            Documentation of the time-layer creation routine:
+            `Leiden documentation `_
+        """
+        starttime = time.time()
+
+        layers, interslice_layer, _ = la.time_slices_to_layers(
+            list(self.graphDict.values()),
+            interslice_weight=self.interslice_param,
+            vertex_id_attr='name'
+        )
+        print('\tSet layers.')
+
+        partitions = [
+            la.CPMVertexPartition(
+                H,
+                node_sizes='node_size',
+                weights='weight',
+                resolution_parameter=self.res_param
+            ) for H in layers
+        ]
+        print('\tSet partitions.')
+
+        interslice_partition = la.CPMVertexPartition(
+            interslice_layer,
+            resolution_parameter=0,
+            node_sizes='node_size',
+            weights='weight'
+        )
+        print('\tSet interslice partitions.')
+
+        self.optimiser.optimise_partition_multiplex(
+            partitions + [interslice_partition]
+        )
+
+        subgraphs = interslice_partition.subgraphs()
+
+        commun = []
+        for idx, part in enumerate(subgraphs):
+            nodevals = [
+                (
+                    x['name'],
+                    list(self.graphDict.keys())[x['slice']],
+                    idx
+                ) for x in part.vs
+            ]
+            commun.extend(nodevals)
+
+        with open(self.outfile, 'w') as outfile:
+            outfile.write('node,year,cluster\n')
+            for elem in commun:
+                outfile.write(
+                    f"{elem[0]},{elem[1]},{elem[2]}\n"
+                )
+        largeclu = [
+            (x, len(x.vs)) for x in subgraphs if len(x.vs) > clusterSizeCompare
+        ]
+        print(
+            f'Finished in {time.time() - starttime} seconds. '
+            f"Found {len(subgraphs)} clusters, with {len(largeclu)} larger than {clusterSizeCompare} nodes."
+        )
+
+        return self.outfile, commun
diff --git a/src/semanticlayertools/clustering/reports.py b/src/semanticlayertools/clustering/reports.py
new file mode 100644
index 0000000..30be990
--- /dev/null
+++ b/src/semanticlayertools/clustering/reports.py
@@ -0,0 +1,288 @@
+import re
+import os
+import time
+import multiprocessing
+from collections import Counter
+from tqdm import tqdm
+
+import spacy
+import textacy
+import textacy.tm
+import pandas as pd
+import warnings
+
+num_processes = multiprocessing.cpu_count()
+
+
+class ClusterReports():
+    """Generate reporting on time-clusters.
+
+    Generates reports describing the content of all found clusters above a
+    minimal size by collecting metadata for all publications in each cluster,
+    finding the top 20 authors and affiliations of the authors involved in
+    the cluster publications, and running basic NMF topic modelling with
+    N=15 and N=50 topics (English language models are used!).
+    For each cluster a report file is written to the output path.
+
+    The input CSV filename is used to create the output folder in the output
+    path. For each cluster above the limit, a subfolder is created to contain
+    all metadata for the cluster. The metadata files are assumed to be in
+    JSONL format and to contain the year in the filename.
+
+    :param infile: Path to the input CSV file containing information on nodeid, clusterid, and year
+    :type infile: str
+    :param metadatapath: Path to JSONL (JSON lines) formatted metadata files
+    :type metadatapath: str
+    :param outpath: Path to create the output folder in, foldername reflects the input filename
+    :type outpath: str
+    :param textcolumn: The metadata dataframe column containing the textual data for topic modelling (default=title)
+    :type textcolumn: str
+    :param authorColumnName: Column name containing the authors of a publication (default=author)
+    :type authorColumnName: str
+    :param affiliationColumnName: Column name containing the affiliations of the authors (default=aff)
+    :type affiliationColumnName: str
+    :param publicationIDcolumn: Column name containing the unique identifier of a publication (default=nodeID)
+    :type publicationIDcolumn: str
+    :param numberProc: Number of CPUs the routine will use (default=all!)
+    :type numberProc: int
+    :param minClusterSize: The minimal cluster size above which clusters are considered (default=1000)
+    :type minClusterSize: int
+    :param timerange: Time range to evaluate clusters for (useful for limiting computation time, default=(1945, 2005))
+    :type timerange: tuple
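+
+    A minimal usage sketch (illustrative only; paths are hypothetical and the
+    input CSV is the output of `TimeCluster.optimize`)::
+
+        reports = ClusterReports(
+            infile='./timeclusters/timeclusters_1945-2005_res_0.003_intersl_0.4.csv',
+            metadatapath='./metadata/', outpath='./reports/',
+        )
+        reports.gatherClusterMetadata()
+        reports.writeReports()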
+    """
+
+    def __init__(
+        self, infile: str, metadatapath: str, outpath: str,
+        textcolumn: str = 'title',
+        authorColumnName: str = 'author',
+        affiliationColumnName: str = 'aff',
+        publicationIDcolumn: str = 'nodeID',
+        numberProc: int = num_processes,
+        minClusterSize: int = 1000,
+        timerange: tuple = (1945, 2005)
+    ):
+        """Constructor method"""
+        self.numberProc = numberProc
+        self.minClusterSize = minClusterSize
+        self.metadatapath = metadatapath
+        self.textcolumn = textcolumn
+        self.authorColumnName = authorColumnName
+        self.affiliationColumnName = affiliationColumnName
+        self.publicationIDcolumn = publicationIDcolumn
+        clusterdf = pd.read_csv(infile)
+        basedata = clusterdf.groupby(['year', 'cluster']).size().to_frame('counts').reset_index()
+        self.largeClusterList = list(
+            basedata.groupby('cluster').sum().query(f'counts > {self.minClusterSize}').index
+        )
+        self.clusternodes = clusterdf.query(
+            'cluster in @self.largeClusterList'
+        )
+        outfolder = infile.split(os.path.sep)[-1][:-4]
+        self.timerange = timerange
+        self.outpath = os.path.join(outpath, outfolder)
+        if os.path.isdir(self.outpath):
+            raise OSError(f'Output folder {self.outpath} exists. Aborting.')
+        else:
+            os.mkdir(self.outpath)
+        for clu in self.largeClusterList:
+            os.mkdir(os.path.join(self.outpath, f'Cluster_{clu}'))
+
+    def create_corpus(self, dataframe):
+        """Create corpus out of dataframe.
+
+        Uses the text contained in the cluster metadata to generate a corpus.
+        After some basic preprocessing, each text is used to generate a spaCy
+        doc, of which only the lemmatized words without stop words are kept.
+
+        :param dataframe: Input dataframe
+        :type dataframe: `pd.DataFrame`
+        :returns: A textacy corpus with English as the base language
+        :rtype: `textacy.Corpus`
+        """
+        mainLanguageCorp = 'en_core_web_lg'
+        nlp = spacy.load(mainLanguageCorp)
+
+        docs = []
+        titles = dataframe[self.textcolumn].values
+        for title in tqdm(titles, leave=False):
+            try:
+                # text pre-processing
+                title = re.sub("\n", " ", title)
+                title = re.sub(r"[\r|\t|\x0c|\d+]", "", title)
+                title = re.sub("[.,]", "", title)
+                title = re.sub("\\\'s", "'s", title)
+                title = title.lower()
+
+                doc = nlp(title)
+
+                tokens_without_sw = ' '.join([t.lemma_ for t in doc if not t.is_stop])
+
+                docs.append(tokens_without_sw)
+            except Exception:
+                print(title)
+                raise
+
+        corpus_titles = textacy.Corpus(mainLanguageCorp, data=docs)
+        return corpus_titles
+
+    def find_topics(
+        self, corpus_titles: list, n_topics: int, top_words: int,
+    ):
+        """Calculate topics in corpus.
+
+        Uses the NMF algorithm to calculate `n_topics` topics in the corpus,
+        returning the `top_words` most common words for each topic.
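+
+        The returned value is a plain-text block with one line per topic,
+        roughly of the following form (topic terms purely illustrative)::
+
+            topic 0: laser cavity photon quantum field ...
+            topic 1: galaxy cluster redshift survey ...
+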
+        Words are only considered if they occur at least twice in the corpus
+        and in at most 95% of all documents.
+
+        :param corpus_titles: The corpus containing the preprocessed texts
+        :type corpus_titles: `textacy.Corpus`
+        :param n_topics: Number of considered topics
+        :type n_topics: int
+        :param top_words: Number of returned words for each found topic
+        :type top_words: int
+        :returns: Formatted list of found topics with their top occurring words
+        :rtype: str
+        """
+        vectorizer = textacy.representations.vectorizers.Vectorizer(
+            tf_type="linear",
+            idf_type="smooth",
+            norm="l2",
+            min_df=2,
+            max_df=0.95
+        )
+        tokenized_docs = (
+            (
+                term.lemma_ for term in textacy.extract.terms(doc, ngs=1, ents=True)
+            ) for doc in corpus_titles
+        )
+        doc_term_matrix = vectorizer.fit_transform(tokenized_docs)
+
+        model = textacy.tm.TopicModel("nmf", n_topics)
+        model.fit(doc_term_matrix)
+
+        topics = []
+        for topic_idx, top_terms in model.top_topic_terms(
+            vectorizer.id_to_term, top_n=top_words
+        ):
+            topics.append(
+                "topic " + str(topic_idx) + ": " + " ".join(top_terms)
+            )
+        outtext = f'\n\n\tTopics in cluster for {n_topics} topics:\n'
+        for topic in topics:
+            outtext += f'\t\t{topic}\n'
+        return outtext
+
+    def fullReport(self, cluster):
+        """Generate the full cluster report for one cluster.
+
+        :param cluster: The cluster number to process
+        :type cluster: int or str
+        :raises ValueError: If the input cluster data cannot be read
+        :returns: Report text with all gathered information
+        :rtype: str
+        """
+        starttime = time.time()
+        clusterpath = os.path.join(self.outpath, f'Cluster_{cluster}')
+        clusterfiles = os.listdir(clusterpath)
+        clusterdf = []
+        for x in clusterfiles:
+            try:
+                clusterdf.append(
+                    pd.read_json(os.path.join(clusterpath, x), lines=True)
+                )
+            except ValueError:
+                raise
+        dfCluster = pd.concat(clusterdf, ignore_index=True)
+        basedf = self.clusternodes.query('cluster == @cluster')
+        inputnodes = set(basedf.node.values)
+        notFound = inputnodes.difference(set(dfCluster[self.publicationIDcolumn].values))
+        topAuthors = Counter(
+            [x for y in [x.split(';') for x in dfCluster[self.authorColumnName].fillna('').values] for x in y]
+        ).most_common(21)
+        authortext = ''
+        for x in topAuthors:
+            if x[0] != '':
+                authortext += f'\t{x[0]}: {x[1]}\n'
+        topAffils = Counter(
+            [x for y in [x.split(';') for x in dfCluster[self.affiliationColumnName].fillna('').values] for x in y]
+        ).most_common(21)
+        affiltext = ''
+        for x in topAffils:
+            if x[0] != '':
+                affiltext += f'\t{x[0]}: {x[1]}\n'
+        print(f'\tFinished base report for cluster {cluster}.')
+        corpus = self.create_corpus(dfCluster)
+        warnings.simplefilter(action='ignore', category=FutureWarning)
+        topics_15 = self.find_topics(corpus, n_topics=15, top_words=20)
+        topics_50 = self.find_topics(corpus, n_topics=50, top_words=20)
+        outtext = f"""Report for Cluster {cluster}
+
+Got {len(inputnodes)} unique publications in time range: {basedf.year.min()} to {basedf.year.max()}.
+    Found metadata for {dfCluster.shape[0]} publications.
+    There are {len(notFound)} publications without metadata.
+
+    The top 20 authors of this cluster are:
+    {authortext}
+
+    The top 20 affiliations of this cluster are:
+    {affiltext}
+
+    {topics_15}
+
+    {topics_50}
+
+Finished analysis of cluster {cluster} in {time.time() - starttime} seconds."""
+        print('\t\tFinished topics.')
+        return outtext
+
+    def _mergeData(self, filename):
+        """Merge metadata for cluster nodes.
+
+        Writes all metadata for nodes in clusters to the cluster folders.
+
+        :param filename: Metadata input filename
+        :type filename: str
+        """
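+        # Inner-join this metadata file with the cluster assignments on
+        # publication ID == node, then append the matching records to the
+        # corresponding Cluster_<id> folder as JSONL.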
+        filepath = os.path.join(self.metadatapath, filename)
+        data = pd.read_json(filepath, lines=True)
+        selectMerge = data.merge(
+            self.clusternodes,
+            left_on=self.publicationIDcolumn,
+            right_on='node',
+            how='inner'
+        )
+        if selectMerge.shape[0] > 0:
+            for clu, g0 in selectMerge.groupby('cluster'):
+                g0.to_json(
+                    os.path.join(
+                        self.outpath,
+                        f'Cluster_{clu}',
+                        'merged_' + filename
+                    ), orient='records', lines=True
+                )
+        return ''
+
+    def gatherClusterMetadata(self):
+        """Initial gathering of metadata for clusters.
+
+        For all files in the metadata path, call `_mergeData` if the year
+        found in the filename falls within the time range bounds.
+
+        This step needs to be run only once; afterwards all cluster metadata
+        is available on disk and can be reused.
+        """
+        filenames = os.listdir(self.metadatapath)
+        yearFiles = []
+        for x in filenames:
+            try:
+                year = int(re.findall(r'\d{4}', x)[0])
+            except Exception:
+                raise
+            if self.timerange[0] <= year <= self.timerange[1]:
+                yearFiles.append(x)
+        with multiprocessing.Pool(self.numberProc) as pool:
+            _ = pool.map(self._mergeData, tqdm(yearFiles, leave=False))
+        return
+
+    def writeReports(self):
+        """Generate reports and write them to the output path."""
+        for cluster in tqdm(self.largeClusterList, leave=False):
+            outtext = self.fullReport(cluster)
+            with open(f'{self.outpath}/Cluster_{cluster}.txt', 'w') as file:
+                file.write(outtext)
diff --git a/src/semanticlayertools/linkage/cocitation.py b/src/semanticlayertools/linkage/cocitation.py
new file mode 100644
index 0000000..4e69587
--- /dev/null
+++ b/src/semanticlayertools/linkage/cocitation.py
@@ -0,0 +1,171 @@
+import os
+import time
+import re
+import multiprocessing
+from itertools import combinations
+from collections import Counter
+from typing import TypeVar
+
+import igraph as ig
+import pandas as pd
+import numpy as np
+from tqdm import tqdm
+
+num_processes = multiprocessing.cpu_count()
+
+limitRefLength = TypeVar('limitRefLength', bool, int)
+debugVar = TypeVar('debugVar', bool, str)
+
+
+class Cocitations():
+    """Create cocitation networks.
+
+    Calculates all combinations of all references of the publications in the
+    given corpus file(s). The maximal number of references to consider can be
+    limited (e.g. to papers with fewer than 200 references) to speed up the
+    creation of the networks.
+
+    For each corpus file, a graph is generated from the weighted cocitation
+    tuples using the igraph package. Information on the obtained components
+    is written to '_graphMetadata.txt' files. The subgraph of the giant
+    component is saved in Pajek format with the ending '_GC.net'. The full
+    edge data is written in edge-list format to a '.ncol' file.
+
+    :param inpath: Path for input data
+    :type inpath: str
+    :param outpath: Path for writing output data
+    :type outpath: str
+    :param columnName: Column name containing the references of a publication
+    :type columnName: str
+    :param numberProc: Number of CPUs the package is allowed to use (default=all)
+    :type numberProc: int
+    :param limitRefLength: Either False or an integer giving the maximum number of references a considered publication is allowed to contain
+    :type limitRefLength: bool or int
+    :param timerange: Time range to consider (default=(1945, 2005))
+    :type timerange: tuple
+    :param debug: Set to True to print the overall runtime, or to 'l2' to print per-file (level 2) debugging messages
+    :type debug: bool or str
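+
+    A minimal usage sketch (illustrative only; paths and the reference column
+    name are hypothetical)::
+
+        cocites = Cocitations(
+            inpath='./corpus/', outpath='./cocite_networks/',
+            columnName='reference',
+        )
+        cocites.processFolder()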
+    """
+
+    def __init__(
+        self, inpath, outpath, columnName,
+        numberProc: int = num_processes,
+        limitRefLength: limitRefLength = False,
+        timerange: tuple = (1945, 2005),
+        debug: debugVar = False
+    ):
+        self.inpath = inpath
+        self.outpath = outpath
+        self.columnName = columnName
+        self.numberProc = numberProc
+        self.limitRefLength = limitRefLength
+        self.timerange = timerange
+        self.debug = debug
+
+    def getCombinations(self, chunk):
+        """Calculate combinations of references in a publications chunk.
+
+        For each publication, every unordered pair of its references is
+        returned, e.g. a reference list [a, b, c] yields the cocitation
+        pairs (a, b), (a, c) and (b, c).
+
+        :param chunk: A chunk of the corpus dataframe
+        :type chunk: `pd.DataFrame`
+        :returns: A list of all reference combinations for each corpus entry
+        :rtype: list
+        """
+        res = []
+        if type(self.limitRefLength) == int:
+            reflen = chunk[self.columnName].apply(
+                lambda x: True if type(x) == list and len(x) <= self.limitRefLength else False
+            )
+            data = chunk[reflen].copy()
+        else:
+            data = chunk.copy()
+        for idx, row in data.iterrows():
+            comb = combinations(row[self.columnName], 2)
+            for elem in list(comb):
+                res.append((elem))
+        return res
+
+    def calculateCoCitation(self, filepath):
+        """Run the cocitation calculation for a single input file.
+
+        Creates three files: a metadata file with information on all
+        components, the giant-component network data in Pajek format, and
+        the full graph data in edge-list format.
+
+        :param filepath: Path of the input corpus file
+        :type filepath: str
+        :returns: A tuple of giant-component information: number of nodes and percentage of total, number of edges and percentage of total
+        :rtype: tuple
+        """
+        infilename = filepath.split(os.path.sep)[-1].split('.')[0]
+        starttime = time.time()
+        try:
+            data = pd.read_json(filepath, lines=True).dropna(subset=[self.columnName])
+            chunk_size = int(data.shape[0] / self.numberProc)
+            if chunk_size == 0:  # Deal with small data samples.
+                chunk_size = 1
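+            # Split the corpus into `chunk_size` roughly equal parts (each of
+            # about `numberProc` rows); pool.map then distributes these chunks
+            # across the worker processes.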
+            chunks = np.array_split(data, chunk_size)
+            with multiprocessing.Pool(processes=self.numberProc) as pool:
+                cocitations = pool.map(self.getCombinations, chunks)
+            cocitCounts = Counter([x for y in cocitations for x in y])
+            sortCoCitCounts = [
+                (x[0][0], x[0][1], x[1]) for x in cocitCounts.most_common()
+            ]
+            tempG = ig.Graph.TupleList(sortCoCitCounts, weights=True, vertex_name_attr='id')
+            components = tempG.components()
+            sortedComponents = sorted(
+                [(x, len(x), len(x) * 100 / len(tempG.vs)) for x in components], key=lambda x: x[1], reverse=True
+            )
+            with open(os.path.join(self.outpath, infilename + '_graphMetadata.txt'), 'w') as outfile:
+                outfile.write(f'Graph derived from {filepath}\nSummary:\n')
+                outfile.write(tempG.summary() + '\n\nComponents (ordered by size):\n\n')
+                for idx, elem in enumerate(sortedComponents):
+                    gcompTemp = tempG.vs.select(elem[0]).subgraph()
+                    outfile.write(
+                        f"{idx}:\n\t{elem[1]} nodes ({elem[2]:.3f}% of full graph)\n\t{len(gcompTemp.es)} edges ({len(gcompTemp.es)*100/len(tempG.es):.3f}% of full graph)\n\n"
+                    )
+                    if idx == 0:
+                        gcouttuple = (
+                            elem[1],
+                            elem[2],
+                            len(gcompTemp.es),
+                            len(gcompTemp.es) * 100 / len(tempG.es)
+                        )
+            giantComponent = sortedComponents[0]
+            giantComponentGraph = tempG.vs.select(giantComponent[0]).subgraph()
+            giantComponentGraph.write_pajek(
+                os.path.join(self.outpath, infilename + '_GC.net')
+            )
+            with open(os.path.join(self.outpath, infilename + '.ncol'), 'w') as outfile:
+                for edge in sortCoCitCounts:
+                    outfile.write(f"{edge[0]} {edge[1]} {edge[2]}\n")
+        except Exception:
+            raise
+        if self.debug == "l2":
+            print(f'\tDone in {time.time() - starttime} seconds.')
+        return gcouttuple
+
+    def processFolder(self):
+        """Calculate cocitation networks for all files in the input folder."""
+        starttime = time.time()
+        with open(
+            os.path.join(
+                self.outpath, 'Giant_Component_properties.csv'
+            ), 'w'
+        ) as gcmetafile:
+            gcmetafile.write('year,nodes,nodespercent,edges,edgepercent\n')
+            for file in tqdm(os.listdir(self.inpath), leave=False):
+                try:
+                    year = re.findall(r'\d{4}', file)[0]
+                except Exception:
+                    raise
+                if self.timerange[0] <= int(year) <= self.timerange[1]:
+                    try:
+                        outtuple = self.calculateCoCitation(
+                            os.path.join(self.inpath, file)
+                        )
+                        gcmetafile.write(
+                            f'{year},{outtuple[0]},{outtuple[1]},{outtuple[2]},{outtuple[3]}\n'
+                        )
+                    except Exception:
+                        raise
+        if self.debug is True:
+            print(f'\tDone in {time.time() - starttime} seconds.')
diff --git a/src/semanticlayertools/pipelines/cocitetimeclusters.py b/src/semanticlayertools/pipelines/cocitetimeclusters.py
new file mode 100644
index 0000000..ea64e64
--- /dev/null
+++ b/src/semanticlayertools/pipelines/cocitetimeclusters.py
@@ -0,0 +1,93 @@
+
+import time
+import os
+import multiprocessing
+
+from ..linkage.cocitation import Cocitations
+from ..clustering.leiden import TimeCluster
+from ..clustering.reports import ClusterReports
+
+num_processes = multiprocessing.cpu_count()
+
+
+def run(
+    inputFilepath: str,
+    cociteOutpath: str,
+    timeclusterOutpath: str,
+    reportsOutpath: str,
+    resolution: float,
+    intersliceCoupling: float,
+    minClusterSize: int = 1000,
+    timerange: tuple = (1945, 2005),
+    referenceColumnName: str = 'reference',
+    numberproc: int = num_processes,
+    limitRefLength=False, debug=False
+):
+    """Run all steps of the temporal clustering pipeline.
+
+    Creates cocitation networks, finds temporal clusters, and writes report
+    files for large clusters.
+
+    The default time range is 1945 to 2005. The minimal size for considered
+    clusters is 1000 nodes. Lists of references are assumed to be contained
+    in the column "reference".
+
+    By default this routine takes all available CPU cores. Limit this to a
+    lower value to allow parallel performance of other tasks.
+
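+    A minimal usage sketch (illustrative only; paths and parameter values are
+    hypothetical)::
+
+        from semanticlayertools.pipelines import cocitetimeclusters
+
+        cocitetimeclusters.run(
+            './corpus/', './cocite_networks/', './timeclusters/', './reports/',
+            resolution=0.003, intersliceCoupling=0.4,
+        )
+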
+    :param inputFilepath: Path to corpora input data
+    :type inputFilepath: str
+    :param cociteOutpath: Output path for cocitation networks
+    :type cociteOutpath: str
+    :param timeclusterOutpath: Output path for time clusters
+    :type timeclusterOutpath: str
+    :param reportsOutpath: Output path for reports
+    :type reportsOutpath: str
+    :param resolution: Main parameter for the clustering quality function (constant Potts model)
+    :type resolution: float
+    :param intersliceCoupling: Coupling parameter between two year slices, also influences cluster detection
+    :type intersliceCoupling: float
+    :param minClusterSize: The minimal cluster size above which clusters are considered (default=1000)
+    :type minClusterSize: int
+    :param timerange: Time range to evaluate clusters for (useful for limiting computation time, default=(1945, 2005))
+    :type timerange: tuple
+    :param referenceColumnName: Column name containing the references of a publication
+    :type referenceColumnName: str
+    :param numberproc: Number of CPUs the package is allowed to use (default=all)
+    :type numberproc: int
+    :param limitRefLength: Either False or an integer giving the maximum number of references a considered publication is allowed to contain
+    :type limitRefLength: bool or int
+    :param debug: False, True, or 'l2' to show debugging messages (passed to the cocitation step)
+    :type debug: bool or str
+    """
+    for path in [cociteOutpath, timeclusterOutpath, reportsOutpath]:
+        os.makedirs(path)
+    starttime = time.time()
+    cocites = Cocitations(
+        inpath=inputFilepath,
+        outpath=cociteOutpath,
+        columnName=referenceColumnName,
+        numberProc=numberproc,
+        limitRefLength=limitRefLength,
+        timerange=timerange,
+        debug=debug
+    )
+    cocites.processFolder()
+    timeclusters = TimeCluster(
+        inpath=cociteOutpath,
+        outpath=timeclusterOutpath,
+        resolution=resolution,
+        intersliceCoupling=intersliceCoupling,
+        timerange=timerange
+    )
+    timeclfile, _ = timeclusters.optimize()
+    clusterreports = ClusterReports(
+        infile=timeclfile,
+        metadatapath=inputFilepath,
+        outpath=reportsOutpath,
+        numberProc=numberproc,
+        minClusterSize=minClusterSize,
+        timerange=(timerange[0], timerange[1] + 3)
+    )
+    clusterreports.gatherClusterMetadata()
+    clusterreports.writeReports()
+    print(f'Done after {time.time() - starttime} seconds.')