import nltk
import numpy as np
import pandas as pd


def split_to_sentences(data):
    """
    Split data by linebreak "\n"

    Args:
        data: str

    Returns:
        A list of sentences
    """
    sentences = data.split('\n')

    # Additional cleaning (this part is already implemented)
    # - Remove leading and trailing spaces from each sentence
    # - Drop sentences if they are empty strings.
    sentences = [s.strip() for s in sentences]
    sentences = [s for s in sentences if len(s) > 0]

    return sentences
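# A minimal usage sketch for split_to_sentences; the sample string below is
# illustrative, not taken from the original data.
example_data = "I have a pen.\nI have an apple. \nAh\nApple pen.\n"
print(split_to_sentences(example_data))
# ['I have a pen.', 'I have an apple.', 'Ah', 'Apple pen.']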
def tokenize_sentences(sentences):
    """
    Tokenize sentences into tokens (words)

    Args:
        sentences: List of strings

    Returns:
        List of lists of tokens
    """
    tokenized_sentences = []

    for sentence in sentences:
        sentence = sentence.lower()

        # Split the sentence into words
        tokenized = nltk.word_tokenize(sentence)

        tokenized_sentences.append(tokenized)

    return tokenized_sentences
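# Usage sketch for tokenize_sentences. Assumes the NLTK "punkt" tokenizer
# models are available (e.g., via nltk.download('punkt')).
print(tokenize_sentences(["Sky is blue.", "Leaves are green."]))
# [['sky', 'is', 'blue', '.'], ['leaves', 'are', 'green', '.']]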
def count_words(tokenized_sentences):
    """
    Count the number of word appearances in the tokenized sentences

    Args:
        tokenized_sentences: List of lists of strings

    Returns:
        dict that maps word (str) to the frequency (int)
    """
    word_counts = {}

    for sentence in tokenized_sentences:
        for token in sentence:
            # Increment the count, starting from zero for unseen tokens
            if token not in word_counts:
                word_counts[token] = 1
            else:
                word_counts[token] += 1

    return word_counts
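# Usage sketch for count_words on a small tokenized corpus (illustrative).
print(count_words([['sky', 'is', 'blue', '.'], ['leaves', 'are', 'green', '.']]))
# {'sky': 1, 'is': 1, 'blue': 1, '.': 2, 'leaves': 1, 'are': 1, 'green': 1}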
def get_words_with_nplus_frequency(tokenized_sentences, count_threshold):
    """
    Find the words that appear N times or more

    Args:
        tokenized_sentences: List of lists of strings
        count_threshold: minimum frequency for a word to be kept

    Returns:
        List of words that appear N times or more
    """
    closed_vocab = []

    word_counts = count_words(tokenized_sentences)

    for word, cnt in word_counts.items():
        if cnt >= count_threshold:
            closed_vocab.append(word)

    return closed_vocab
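# Usage sketch: with count_threshold=2, only tokens seen at least twice
# survive, so only '.' enters the closed vocabulary here.
example_tokenized = [['sky', 'is', 'blue', '.'], ['leaves', 'are', 'green', '.']]
print(get_words_with_nplus_frequency(example_tokenized, count_threshold=2))
# ['.']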
def replace_oov_words_by_unk(tokenized_sentences, vocabulary, unknown_token="<unk>"):
    """
    Replace words not in the given vocabulary with '<unk>' token.

    Args:
        tokenized_sentences: List of lists of strings
        vocabulary: List of strings that we will use
        unknown_token: A string representing unknown (out-of-vocabulary) words

    Returns:
        List of lists of strings, with words not in the vocabulary replaced
    """
    vocabulary = set(vocabulary)

    replaced_tokenized_sentences = []

    for sentence in tokenized_sentences:
        replaced_sentence = []

        for token in sentence:
            if token in vocabulary:
                replaced_sentence.append(token)
            else:
                replaced_sentence.append(unknown_token)

        replaced_tokenized_sentences.append(replaced_sentence)

    return replaced_tokenized_sentences
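# Usage sketch for replace_oov_words_by_unk with a toy vocabulary.
print(replace_oov_words_by_unk([['dogs', 'run'], ['cats', 'sleep']],
                               vocabulary=['dogs', 'sleep']))
# [['dogs', '<unk>'], ['<unk>', 'sleep']]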
def preprocess_data(train_data, test_data, count_threshold):
    """
    Preprocess data, i.e.,
        - Find tokens that appear at least N times in the training data.
        - Replace tokens that appear less than N times by "<unk>"
          both for training and test data.

    Args:
        train_data, test_data: List of lists of strings.
        count_threshold: Words whose count is less than this are treated as unknown.

    Returns:
        Tuple of
        - training data with low-frequency words replaced by "<unk>"
        - test data with low-frequency words replaced by "<unk>"
        - vocabulary of words that appear N times or more in the training data
    """
    # Get the closed vocabulary using the train data
    vocabulary = get_words_with_nplus_frequency(train_data, count_threshold)

    # For the train data, replace less common words with "<unk>"
    train_data_replaced = replace_oov_words_by_unk(train_data, vocabulary, "<unk>")

    # For the test data, replace less common words with "<unk>"
    test_data_replaced = replace_oov_words_by_unk(test_data, vocabulary, "<unk>")

    return train_data_replaced, test_data_replaced, vocabulary
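# End-to-end usage sketch for preprocess_data (toy data; threshold 1 keeps
# every training word in the vocabulary).
train = [['sky', 'is', 'blue', '.'], ['leaves', 'are', 'green']]
test = [['roses', 'are', 'red', '.']]
train_repl, test_repl, vocab = preprocess_data(train, test, count_threshold=1)
print(test_repl)
# [['<unk>', 'are', '<unk>', '.']]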
def count_n_grams(data, n, start_token='<s>', end_token='<e>'):
    """
    Count all n-grams in the data

    Args:
        data: List of lists of words
        n: number of words in a sequence
        start_token: token prepended to each sentence
        end_token: token appended to each sentence

    Returns:
        A dictionary that maps a tuple of n-words to its frequency
    """
    n_grams = {}

    for sentence in data:
        # Prepend n start tokens and append an end token, then cast to a
        # tuple so that slices can be used as dictionary keys
        sentence = [start_token] * n + sentence + [end_token]
        sentence = tuple(sentence)

        for i in range(len(sentence) - n + 1):
            n_gram = sentence[i : i + n]

            if n_gram in n_grams:
                n_grams[n_gram] += 1
            else:
                n_grams[n_gram] = 1

    return n_grams
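# Usage sketch for count_n_grams: bigrams over one toy sentence. Note that
# n start tokens are prepended, so ('<s>', '<s>') shows up for n=2.
print(count_n_grams([['i', 'like', 'a', 'cat']], 2))
# {('<s>', '<s>'): 1, ('<s>', 'i'): 1, ('i', 'like'): 1,
#  ('like', 'a'): 1, ('a', 'cat'): 1, ('cat', '<e>'): 1}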
def estimate_probability(word, previous_n_gram,
                         n_gram_counts, n_plus1_gram_counts, vocabulary_size, k=1.0):
    """
    Estimate the probability of a next word using the n-gram counts with k-smoothing

    Args:
        word: next word
        previous_n_gram: A sequence of words of length n
        n_gram_counts: Dictionary of counts of n-grams
        n_plus1_gram_counts: Dictionary of counts of (n+1)-grams
        vocabulary_size: number of words in the vocabulary
        k: positive constant, smoothing parameter

    Returns:
        A probability
    """
    # Convert the list to a tuple so it can be used as a dictionary key
    previous_n_gram = tuple(previous_n_gram)

    # Count how often the previous n-gram occurred
    if previous_n_gram in n_gram_counts:
        previous_n_gram_count = n_gram_counts[previous_n_gram]
    else:
        previous_n_gram_count = 0

    # Denominator
    denominator = previous_n_gram_count + k * vocabulary_size

    # The previous n-gram plus the current word, as an (n+1)-gram
    n_plus1_gram = previous_n_gram + (word,)

    # Count how often this (n+1)-gram occurred
    if n_plus1_gram in n_plus1_gram_counts:
        n_plus1_gram_count = n_plus1_gram_counts[n_plus1_gram]
    else:
        n_plus1_gram_count = 0

    # Numerator
    numerator = n_plus1_gram_count + k

    probability = numerator / denominator

    return probability
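# Usage sketch for estimate_probability: P("cat" | "a") with k=1 smoothing
# on a toy corpus; the numbers in the final comment are worked by hand.
sentences = [['i', 'like', 'a', 'cat'], ['this', 'dog', 'is', 'like', 'a', 'cat']]
unique_words = list(set(word for s in sentences for word in s))  # 7 words
unigram_counts = count_n_grams(sentences, 1)
bigram_counts = count_n_grams(sentences, 2)
print(estimate_probability("cat", ["a"], unigram_counts, bigram_counts,
                           len(unique_words), k=1.0))
# (2 + 1) / (2 + 1 * 7) = 0.3333...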
def estimate_probabilities(previous_n_gram, n_gram_counts, n_plus1_gram_counts, vocabulary, k=1.0):
    """
    Estimate the probabilities of next words using the n-gram counts with k-smoothing

    Args:
        previous_n_gram: A sequence of words of length n
        n_gram_counts: Dictionary of counts of n-grams
        n_plus1_gram_counts: Dictionary of counts of (n+1)-grams
        vocabulary: List of words
        k: positive constant, smoothing parameter

    Returns:
        A dictionary mapping from next words to the probability.
    """
    # convert list to tuple to use it as a dictionary key
    previous_n_gram = tuple(previous_n_gram)

    # add <e> <unk> to the vocabulary
    # <s> is not needed since it should not appear as the next word
    vocabulary = vocabulary + ["<e>", "<unk>"]
    vocabulary_size = len(vocabulary)

    probabilities = {}
    for word in vocabulary:
        probability = estimate_probability(word, previous_n_gram,
                                           n_gram_counts, n_plus1_gram_counts,
                                           vocabulary_size, k=k)
        probabilities[word] = probability

    return probabilities
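# Usage sketch for estimate_probabilities, reusing the toy counts from the
# previous sketch; <e> and <unk> are added internally, growing |V| to 9.
probs = estimate_probabilities(["a"], unigram_counts, bigram_counts,
                               unique_words, k=1.0)
print(probs["cat"])
# (2 + 1) / (2 + 1 * 9) = 0.2727...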
def make_count_matrix(n_plus1_gram_counts, vocabulary):
    # add <e> <unk> to the vocabulary
    # <s> is omitted since it should not appear as the next word
    vocabulary = vocabulary + ["<e>", "<unk>"]

    # obtain unique n-grams
    n_grams = []
    for n_plus1_gram in n_plus1_gram_counts.keys():
        n_gram = n_plus1_gram[0:-1]
        n_grams.append(n_gram)
    n_grams = list(set(n_grams))

    # mapping from n-gram to row
    row_index = {n_gram: i for i, n_gram in enumerate(n_grams)}
    # mapping from next word to column
    col_index = {word: j for j, word in enumerate(vocabulary)}

    nrow = len(n_grams)
    ncol = len(vocabulary)
    count_matrix = np.zeros((nrow, ncol))
    for n_plus1_gram, count in n_plus1_gram_counts.items():
        n_gram = n_plus1_gram[0:-1]
        word = n_plus1_gram[-1]
        if word not in vocabulary:
            continue
        i = row_index[n_gram]
        j = col_index[word]
        count_matrix[i, j] = count

    count_matrix = pd.DataFrame(count_matrix, index=n_grams, columns=vocabulary)
    return count_matrix
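# Usage sketch for make_count_matrix: rows are the observed n-grams,
# columns are next words (toy bigram counts from the sketches above).
print(make_count_matrix(bigram_counts, unique_words))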
def calculate_perplexity(sentence, n_gram_counts, n_plus1_gram_counts, vocabulary_size, k=1.0):
    """
    Calculate perplexity for a single sentence

    Args:
        sentence: List of strings
        n_gram_counts: Dictionary of counts of n-grams
        n_plus1_gram_counts: Dictionary of counts of (n+1)-grams
        vocabulary_size: number of unique words in the vocabulary
        k: Positive smoothing constant

    Returns:
        Perplexity score
    """
    # length of previous words
    n = len(list(n_gram_counts.keys())[0])

    # prepend <s> and append <e>
    sentence = ["<s>"] * n + sentence + ["<e>"]

    # Cast the sentence from a list to a tuple
    sentence = tuple(sentence)

    # length of sentence (after adding <s> and <e> tokens)
    N = len(sentence)

    # The variable product_pi will hold the product
    # that is calculated inside the N-th root
    product_pi = 1.0

    # Index t ranges from n to N - 1, inclusive on both ends
    for t in range(n, N):
        # get the n-gram preceding the word at position t
        n_gram = sentence[t - n : t]

        # get the word at position t
        word = sentence[t]

        # Estimate the probability of the word given the n-gram
        # using the n-gram counts, (n+1)-gram counts,
        # vocabulary size, and smoothing constant
        probability = estimate_probability(word, n_gram,
                                           n_gram_counts, n_plus1_gram_counts,
                                           vocabulary_size, k=k)

        # Accumulate the product of the (1/P) factors
        product_pi *= 1 / probability

    # Take the Nth root of the product
    perplexity = product_pi ** (1 / N)

    return perplexity
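# Usage sketch for calculate_perplexity on a sentence from the toy corpus;
# a lower score means the model finds the sentence less surprising.
print(calculate_perplexity(['i', 'like', 'a', 'cat'],
                           unigram_counts, bigram_counts,
                           len(unique_words), k=1.0))
# ~2.8 for this in-corpus sentence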
def suggest_a_word(previous_tokens, n_gram_counts, n_plus1_gram_counts,
                   vocabulary, k=1.0, start_with=None):
    """
    Get suggestion for the next word

    Args:
        previous_tokens: The sentence you input where each token is a word.
                         Must have length > n
        n_gram_counts: Dictionary of counts of n-grams
        n_plus1_gram_counts: Dictionary of counts of (n+1)-grams
        vocabulary: List of words
        k: positive constant, smoothing parameter
        start_with: If not None, specifies the first few letters of the next word

    Returns:
        A tuple of
        - string of the most likely next word
        - corresponding probability
    """
    # length of previous words
    n = len(list(n_gram_counts.keys())[0])

    # Get the last n words as the previous n-gram
    previous_n_gram = previous_tokens[-n:]

    # Estimate the probabilities of all candidate next words
    probabilities = estimate_probabilities(previous_n_gram,
                                           n_gram_counts, n_plus1_gram_counts,
                                           vocabulary, k=k)

    # Track the most likely word and its probability
    suggestion = None
    max_prob = 0

    for word, prob in probabilities.items():
        # If start_with is given, skip words that do not begin with it
        if start_with is not None and not word.startswith(start_with):
            continue

        if prob > max_prob:
            suggestion = word
            max_prob = prob

    return suggestion, max_prob
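# Usage sketch for suggest_a_word with the toy counts from the sketches
# above: after "i like", the bigram model suggests 'a'.
print(suggest_a_word(['i', 'like'], unigram_counts, bigram_counts,
                     unique_words, k=1.0))
# ('a', 0.2727...)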