Source code for gluonnlp.data.transforms
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=invalid-encoded-data, too-many-lines
"""Transformer API. It provides tools for common transformation on samples in text dataset, such as
clipping, padding, and tokenization."""
__all__ = [
'ClipSequence', 'PadSequence', 'SacreMosesTokenizer',
'SpacyTokenizer', 'SacreMosesDetokenizer',
'JiebaTokenizer', 'NLTKStanfordSegmenter', 'SentencepieceTokenizer',
'SentencepieceDetokenizer', 'BERTBasicTokenizer', 'BERTTokenizer',
'BERTSentenceTransform', 'BERTSPTokenizer',
'GPT2BPETokenizer', 'GPT2BPEDetokenizer'
]
import functools
import io
import os
import time
import unicodedata
import warnings
import zipfile
from typing import List, Optional
import mxnet as mx
from mxnet.gluon.utils import _get_repo_url, check_sha1, download
import numpy as np
from ..base import get_home_dir
from ..vocab.vocab import Vocab
from .utils import _extract_archive
from .fast_bert_tokenizer import is_control, is_punctuation, is_whitespace
from .fast_bert_tokenizer import BasicTokenizer, WordpieceTokenizer
class ClipSequence:
"""Clip the sequence to have length no more than `length`.
Parameters
----------
length : int
Maximum length of the sequence
Examples
--------
>>> datasets = gluon.data.SimpleDataset([[1, 3, 5, 7], [1, 2, 3], [1, 2, 3, 4, 5, 6, 7, 8]])
>>> list(datasets.transform(gluonnlp.data.ClipSequence(4)))
[[1, 3, 5, 7], [1, 2, 3], [1, 2, 3, 4]]
>>> datasets = gluon.data.SimpleDataset([np.array([[1, 3], [5, 7], [7, 5], [3, 1]]),
... np.array([[1, 2], [3, 4], [5, 6],
... [6, 5], [4, 3], [2, 1]]),
... np.array([[2, 4], [4, 2]])])
>>> list(datasets.transform(gluonnlp.data.ClipSequence(3)))
[array([[1, 3],
[5, 7],
[7, 5]]), array([[1, 2],
[3, 4],
[5, 6]]), array([[2, 4],
[4, 2]])]
"""
def __init__(self, length):
    self._length = length

def __call__(self, sample):
    """Clip the sample to at most `length` elements along the first axis."""
    return sample[:min(len(sample), self._length)]
class PadSequence:
"""Pad the sequence.
Pad the sequence to the given `length` by appending `pad_val`. If `clip` is set,
sequences that are longer than `length` will be clipped.
Parameters
----------
length : int
The maximum length to pad/clip the sequence
pad_val : number
The pad value. Default 0
clip : bool, default True
Whether to clip sequences that are longer than `length`.
Examples
--------
>>> datasets = gluon.data.SimpleDataset([[1, 3, 5, 7], [1, 2, 3], [1, 2, 3, 4, 5, 6, 7, 8]])
>>> list(datasets.transform(gluonnlp.data.PadSequence(6)))
[[1, 3, 5, 7, 0, 0], [1, 2, 3, 0, 0, 0], [1, 2, 3, 4, 5, 6]]
>>> list(datasets.transform(gluonnlp.data.PadSequence(6, clip=False)))
[[1, 3, 5, 7, 0, 0], [1, 2, 3, 0, 0, 0], [1, 2, 3, 4, 5, 6, 7, 8]]
>>> list(datasets.transform(gluonnlp.data.PadSequence(6, pad_val=-1, clip=False)))
[[1, 3, 5, 7, -1, -1], [1, 2, 3, -1, -1, -1], [1, 2, 3, 4, 5, 6, 7, 8]]
"""
def __init__(self, length, pad_val=0, clip=True):
self._length = length
self._pad_val = pad_val
self._clip = clip
def __call__(self, sample):
"""
Parameters
----------
sample : list of number or mx.nd.NDArray or np.ndarray
Returns
-------
ret : list of number or mx.nd.NDArray or np.ndarray
"""
sample_length = len(sample)
if sample_length >= self._length:
if self._clip and sample_length > self._length:
return sample[:self._length]
else:
return sample
else:
if isinstance(sample, mx.nd.NDArray):
# TODO(sxjscience) Use this trick for padding because mx.pad currently only supports
# 4D/5D inputs
new_sample_shape = (self._length, ) + sample.shape[1:]
ret = mx.nd.full(shape=new_sample_shape,
val=self._pad_val,
ctx=sample.context,
dtype=sample.dtype)
ret[:sample_length] = sample
return ret
elif isinstance(sample, np.ndarray):
pad_width = [(0, self._length - sample_length)] +\
[(0, 0) for _ in range(sample.ndim - 1)]
return np.pad(sample,
mode='constant',
constant_values=self._pad_val,
pad_width=pad_width)
elif isinstance(sample, list):
return sample + [
self._pad_val for _ in range(self._length - sample_length)
]
else:
raise NotImplementedError(
'The input must be 1) list or 2) numpy.ndarray or 3) '
'mxnet.NDArray, received type=%s' % str(type(sample)))
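
# Hypothetical usage sketch (this helper is not part of the original module): the
# __call__ above pads along the first axis only, so 2-D numpy and mx.nd.NDArray
# inputs keep their trailing dimensions, mirroring the list examples in the
# class docstring.
def _pad_sequence_ndarray_sketch():
    pad = PadSequence(length=4, pad_val=0)
    np_padded = pad(np.array([[1, 2], [3, 4]]))      # rows of zeros appended -> shape (4, 2)
    nd_padded = pad(mx.nd.array([[1, 2], [3, 4]]))   # same behaviour for NDArray inputs
    assert np_padded.shape == (4, 2) and nd_padded.shape == (4, 2)
    return np_padded, nd_padded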
class SacreMosesTokenizer:
"""Apply the Moses Tokenizer implemented in sacremoses.
Users of this class are required to install
`sacremoses <https://github.com/alvations/sacremoses>`_.
For example, one can use :samp:`pip install sacremoses`.
.. note::
sacremoses carries an LGPL 2.1+ license.
Examples
--------
>>> tokenizer = gluonnlp.data.SacreMosesTokenizer()
>>> tokenizer('Gluon NLP toolkit provides a suite of text processing tools.')
['Gluon', 'NLP', 'toolkit', 'provides', 'a', 'suite', 'of', 'text', 'processing', 'tools', '.']
>>> tokenizer('Das Gluon NLP-Toolkit stellt eine Reihe von Textverarbeitungstools '
... 'zur Verfügung.')
['Das', 'Gluon', 'NLP-Toolkit', 'stellt', 'eine', 'Reihe', 'von', 'Textverarbeitungstools', \
'zur', 'Verf\xfcgung', '.']
"""
def __init__(self):
from sacremoses import MosesTokenizer # pylint: disable=import-outside-toplevel
self._tokenizer = MosesTokenizer()
def __call__(self, sample: str, return_str: bool = False):
"""Tokenize a sample.
Parameters
----------
sample
The sentence to tokenize
return_str
True: return a single string
False: return a list of tokens
Returns
-------
ret : list of strs or str
List of tokens or tokenized text
"""
return self._tokenizer.tokenize(sample, return_str=return_str)
class SpacyTokenizer:
"""Apply the Spacy Tokenizer.
Users of this class are required to install `spaCy <https://spacy.io/usage/>`_
and download corresponding NLP models, such as :samp:`python -m spacy download en`.
Only spacy>=2.0.0 is supported.
Parameters
----------
lang : str
The language model to load. Default is 'en_core_web_sm', i.e., the small English model.
You may refer to https://spacy.io/usage/models for supported languages and model names.
Examples
--------
>>> tokenizer = gluonnlp.data.SpacyTokenizer()
>>> tokenizer('Gluon NLP toolkit provides a suite of text processing tools.')
['Gluon', 'NLP', 'toolkit', 'provides', 'a', 'suite', 'of', 'text', 'processing', 'tools', '.']
>>> tokenizer = gluonnlp.data.SpacyTokenizer('de')
>>> tokenizer('Das Gluon NLP-Toolkit stellt eine Reihe von Textverarbeitungstools'
... ' zur Verfügung.')
['Das', 'Gluon', 'NLP-Toolkit', 'stellt', 'eine', 'Reihe', 'von', 'Textverarbeitungstools', \
'zur', 'Verf\xfcgung', '.']
"""
def __init__(self, lang='en_core_web_sm'):
try:
import spacy # pylint: disable=import-outside-toplevel
from pkg_resources import parse_version # pylint: disable=import-outside-toplevel
assert parse_version(spacy.__version__) >= parse_version('2.0.0'),\
'We only support spacy>=2.0.0'
except ImportError:
raise ImportError(
'spaCy is not installed. You must install spaCy in order to use the '
'SpacyTokenizer. You can refer to the official installation guide '
'in https://spacy.io/usage/.')
try:
self._nlp = spacy.load(lang, disable=['parser', 'tagger', 'ner'])
except IOError:
raise IOError(
'SpaCy Model for the specified language="{lang}" has not been '
'downloaded. You need to check the installation guide in '
'https://spacy.io/usage/models. Usually, the installation command '
'should be `python -m spacy download {lang}`.'.format(
lang=lang))
def __call__(self, sample):
"""
Parameters
----------
sample: str
The sentence to tokenize
Returns
-------
ret : list of strs
List of tokens
"""
return [tok.text for tok in self._nlp(sample)]
class SacreMosesDetokenizer:
r"""Apply the Moses Detokenizer implemented in sacremoses.
Users of this class are required to `install sacremoses
<https://github.com/alvations/sacremoses>`_. For example, one can use
:samp:`pip install sacremoses`.
.. note::
sacremoses carries an LGPL 2.1+ license.
Parameters
----------
return_str: bool, default True
True: return a single string
False: return a list of words
Examples
--------
>>> detokenizer = gluonnlp.data.SacreMosesDetokenizer()
>>> detokenizer(['Gluon', 'NLP', 'toolkit', 'provides', 'a', 'suite', 'of',
... 'text', 'processing', 'tools', '.'], return_str=True)
'Gluon NLP toolkit provides a suite of text processing tools.'
>>> detokenizer(['Das', 'Gluon','NLP-Toolkit','stellt','eine','Reihe','von',
... 'Textverarbeitungstools','zur','Verfügung','.'], return_str=True)
'Das Gluon NLP-Toolkit stellt eine Reihe von Textverarbeitungstools zur Verfügung.'
"""
def __init__(self, return_str=True):
self._return_str = return_str
from sacremoses import MosesDetokenizer # pylint: disable=import-outside-toplevel
self._detokenizer = MosesDetokenizer()
def __call__(self, sample: List[str], return_str: Optional[bool] = None):
"""
Parameters
----------
sample
The sentence to detokenize
return_str
True: return a single string
False: return a list of words
None: use constructor setting
Returns
-------
ret : list of strs or str
List of words or detokenized text
"""
ret_str = self._return_str if return_str is None else return_str
return self._detokenizer.detokenize(sample, return_str=ret_str)
class JiebaTokenizer:
r"""Apply the jieba Tokenizer.
Users of this class are required to `install jieba <https://github.com/fxsjy/jieba>`_.
Examples
--------
>>> tokenizer = gluonnlp.data.JiebaTokenizer()
>>> tokenizer('我来到北京清华大学')
['我', '来到', '北京', '清华大学']
>>> tokenizer('小明硕士毕业于中国科学院计算所,后在日本京都大学深造')
['小明', '硕士', '毕业', '于', '中国科学院', '计算所', ',', '后', '在', '日本京都大学', '深造']
"""
def __init__(self):
try:
with warnings.catch_warnings(): # jieba uses deprecated imp module
warnings.simplefilter('ignore')
import jieba # pylint: disable=import-outside-toplevel
except ImportError:
raise ImportError(
'jieba is not installed. You must install jieba in order to use the '
'JiebaTokenizer. You can refer to the official installation guide '
'in https://github.com/fxsjy/jieba')
self._tokenizer = jieba
def __call__(self, sample):
"""
Parameters
----------
sample: str
The Chinese sentence to tokenize. Avoid passing text in other languages, since this
class is mainly intended for Chinese word segmentation.
Returns
-------
ret : list of strs
List of tokens
"""
# we use default cutting mode provided by jieba, i.e., accurate mode
return [
tok for tok in self._tokenizer.cut(sample)
if tok not in (' ', '')
]
class NLTKStanfordSegmenter:
r"""Apply the Stanford Chinese Word Segmenter implemented in NLTK.
Users of this class are required to install Java and NLTK, and to download the Stanford Word Segmenter.
Parameters
----------
segmenter_root : str, default '$MXNET_HOME/stanford-segmenter'
Path to the folder for storing the Stanford segmenter.
MXNET_HOME defaults to '~/.mxnet'.
slf4j_root : str, default '$MXNET_HOME/slf4j'
Path to the folder for storing slf4j.
MXNET_HOME defaults to '~/.mxnet'.
java_class : str, default 'edu.stanford.nlp.ie.crf.CRFClassifier'
The learning algorithm used for segmentation
Examples
--------
>>> tokenizer = gluonnlp.data.NLTKStanfordSegmenter() #doctest:+SKIP
>>> tokenizer('我来到北京清华大学') #doctest:+SKIP
['我', '来到', '北京', '清华大学']
>>> tokenizer('小明硕士毕业于中国科学院计算所,后在日本京都大学深造') #doctest:+SKIP
['小明', '硕士', '毕业', '于', '中国科学院', '计算所', ',', '后', '在', '日本京都大学', '深造']
"""
def __init__(self,
segmenter_root=os.path.join(get_home_dir(),
'stanford-segmenter'),
slf4j_root=os.path.join(get_home_dir(), 'slf4j'),
java_class='edu.stanford.nlp.ie.crf.CRFClassifier'):
is_java_exist = os.system('java -version')
assert is_java_exist == 0, 'Java is not installed. You must install Java 8.0 ' \
'in order to use the NLTKStanfordSegmenter'
try:
from nltk.tokenize import StanfordSegmenter # pylint: disable=import-outside-toplevel
except ImportError:
raise ImportError(
'NLTK or relevant packages are not installed. You must install NLTK '
'in order to use the NLTKStanfordSegmenter. You can refer to the '
'official installation guide in https://www.nltk.org/install.html.'
)
path_to_jar = os.path.join(segmenter_root,
'stanford-segmenter-2018-02-27',
'stanford-segmenter-3.9.1.jar')
path_to_model = os.path.join(segmenter_root,
'stanford-segmenter-2018-02-27', 'data',
'pku.gz')
path_to_dict = os.path.join(segmenter_root,
'stanford-segmenter-2018-02-27', 'data',
'dict-chris6.ser.gz')
path_to_sihan_corpora_dict = os.path.join(
segmenter_root, 'stanford-segmenter-2018-02-27', 'data')
segmenter_url = 'https://nlp.stanford.edu/software/stanford-segmenter-2018-02-27.zip'
segmenter_sha1 = 'aa27a6433704b7b4c6a14be1c126cb4b14b8f57b'
stanford_segmenter = os.path.join(segmenter_root,
'stanford-segmenter-2018-02-27.zip')
if not os.path.exists(path_to_jar) or \
not os.path.exists(path_to_model) or \
not os.path.exists(path_to_dict) or \
not os.path.exists(path_to_sihan_corpora_dict) or \
not check_sha1(filename=stanford_segmenter, sha1_hash=segmenter_sha1):
# automatically download the files from the website and place them to stanford_root
if not os.path.exists(segmenter_root):
os.mkdir(segmenter_root)
download(url=segmenter_url,
path=segmenter_root,
sha1_hash=segmenter_sha1)
_extract_archive(file=stanford_segmenter,
target_dir=segmenter_root)
path_to_slf4j = os.path.join(slf4j_root, 'slf4j-1.7.25',
'slf4j-api-1.7.25.jar')
slf4j_url = 'https://www.slf4j.org/dist/slf4j-1.7.25.zip'
slf4j_sha1 = '89ea41ad6ebe1b190139421bb7c8d981e9df1625'
slf4j = os.path.join(slf4j_root, 'slf4j-1.7.25.zip')
if not os.path.exists(path_to_slf4j) or \
not check_sha1(filename=slf4j, sha1_hash=slf4j_sha1):
# automatically download the files from the website and place them to slf4j_root
if not os.path.exists(slf4j_root):
os.mkdir(slf4j_root)
download(url=slf4j_url, path=slf4j_root, sha1_hash=slf4j_sha1)
_extract_archive(file=slf4j, target_dir=slf4j_root)
self._tokenizer = StanfordSegmenter(
java_class=java_class,
path_to_jar=path_to_jar,
path_to_slf4j=path_to_slf4j,
path_to_dict=path_to_dict,
path_to_sihan_corpora_dict=path_to_sihan_corpora_dict,
path_to_model=path_to_model)
def __call__(self, sample):
"""
Parameters
----------
sample: str
The Chinese sentence to tokenize. Avoid passing text in other languages, since this
class is mainly intended for Chinese word segmentation.
Returns
-------
ret : list of strs
List of tokens
"""
return self._tokenizer.segment(sample).strip().split()
class _SentencepieceProcessor:
def __init__(self, path):
try:
import sentencepiece # pylint: disable=import-outside-toplevel
except ImportError:
raise ImportError(
'sentencepiece is not installed. You must install sentencepiece '
'in order to use the Sentencepiece tokenizer and detokenizer. '
'You can refer to the official installation guide '
'in https://github.com/google/sentencepiece#installation')
self._processor = sentencepiece.SentencePieceProcessor()
self._processor.Load(path)
def __len__(self):
return len(self._processor)
@property
def tokens(self):
return [self._processor.id_to_piece(i) for i in range(len(self))]
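
# Illustrative sketch (hypothetical helper, not part of the original module): the
# `tokens` property above enumerates the whole subword inventory of a loaded
# SentencePiece model; downstream vocabularies are built from exactly this list.
def _list_sentencepiece_pieces(model_path, limit=5):
    processor = _SentencepieceProcessor(model_path)
    print('model contains %d pieces' % len(processor))
    return processor.tokens[:limit]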
class SentencepieceTokenizer(_SentencepieceProcessor):
r"""Apply the Sentencepiece Tokenizer, which supports subword tokenization such as BPE.
Users of this class are required to `install sentencepiece
<https://github.com/google/sentencepiece>`_. For example, one can use
:samp:`pip install sentencepiece`
Parameters
----------
path : str
Path to the pre-trained subword tokenization model.
num_best : int, default 0
A scalar for sampling subwords. If num_best = {0, 1}, no sampling is performed.
If num_best > 1, samples are drawn from the num_best results.
If num_best < 0, num_best is treated as infinite and samples are drawn from all
hypotheses (the lattice) using the forward-filtering-and-backward-sampling algorithm.
alpha : float, default 1.0
A scalar for a smoothing parameter. Inverse temperature for probability rescaling.
Examples
--------
>>> url = 'http://repo.mxnet.io/gluon/dataset/vocab/test-0690baed.bpe'
>>> f = gluon.utils.download(url)
-etc-
>>> tokenizer = gluonnlp.data.SentencepieceTokenizer(f)
>>> detokenizer = gluonnlp.data.SentencepieceDetokenizer(f)
>>> sentence = 'This is a very awesome, life-changing sentence.'
>>> tokenizer(sentence)
['▁This', '▁is', '▁a', '▁very', '▁awesome', ',', '▁life', '-', 'ch', 'anging', '▁sentence', '.']
>>> detokenizer(tokenizer(sentence))
'This is a very awesome, life-changing sentence.'
>>> os.remove('test-0690baed.bpe')
"""
def __init__(self, path, num_best=0, alpha=1.0):
super(SentencepieceTokenizer, self).__init__(path)
self._path = path
self._nbest = num_best
self._alpha = alpha
def __call__(self, sample):
"""
Parameters
----------
sample: str
The string to tokenize.
Returns
-------
ret : list of strs
List of tokens
"""
return self._processor.SampleEncodeAsPieces(sample, self._nbest,
self._alpha)
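
# Hypothetical sketch (not part of the original module): with num_best=-1 and a
# small alpha, SampleEncodeAsPieces performs subword regularization, so repeated
# calls may return different but valid segmentations of the same sentence. The
# model file name below follows the doctest above and is only an assumption here.
def _sentencepiece_sampling_sketch(model_path='test-0690baed.bpe'):
    sampling_tokenizer = SentencepieceTokenizer(model_path, num_best=-1, alpha=0.1)
    return [sampling_tokenizer('This is a test.') for _ in range(3)]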
class SentencepieceDetokenizer(_SentencepieceProcessor):
r"""Apply the Sentencepiece detokenizer, which supports recombining subwords such as BPE.
Users of this class are required to `install sentencepiece
<https://github.com/google/sentencepiece>`_. For example, one can use
:samp:`pip install sentencepiece`
Parameters
----------
path : str
Path to the pre-trained subword tokenization model.
Examples
--------
>>> url = 'http://repo.mxnet.io/gluon/dataset/vocab/test-0690baed.bpe'
>>> f = gluon.utils.download(url)
-etc-
>>> tokenizer = gluonnlp.data.SentencepieceTokenizer(f)
>>> detokenizer = gluonnlp.data.SentencepieceDetokenizer(f)
>>> sentence = 'This is a very awesome, life-changing sentence.'
>>> tokenizer(sentence)
['▁This', '▁is', '▁a', '▁very', '▁awesome', ',', '▁life', '-', 'ch', 'anging', '▁sentence', '.']
>>> detokenizer(tokenizer(sentence))
'This is a very awesome, life-changing sentence.'
>>> os.remove('test-0690baed.bpe')
"""
def __call__(self, sample):
"""
Parameters
----------
sample: list(str)
The sentence to detokenize
Returns
-------
ret : str
Detokenized text
"""
return self._processor.DecodePieces(sample)
class BERTBasicTokenizer:
r"""Runs basic tokenization:
- removes invalid characters (e.g. control characters) and cleans up whitespace
- tokenizes CJK characters
- splits punctuation on a piece of text
- strips accents and converts to lower case (if `lower` is True)
Parameters
----------
lower : bool, default True
Whether to convert the text to lower case and strip accents.
Examples
--------
>>> tokenizer = gluonnlp.data.BERTBasicTokenizer(lower=True)
>>> tokenizer(' \tHeLLo!how \n Are yoU? ')
['hello', '!', 'how', 'are', 'you', '?']
>>> tokenizer = gluonnlp.data.BERTBasicTokenizer(lower=False)
>>> tokenizer(' \tHeLLo!how \n Are yoU? ')
['HeLLo', '!', 'how', 'Are', 'yoU', '?']
"""
def __init__(self, lower=True):
self.tokenizer = BasicTokenizer(lower=lower)
def __call__(self, sample):
"""
Parameters
----------
sample: str
The string to tokenize. Must be unicode.
Returns
-------
ret : list of strs
List of tokens
"""
return self.tokenizer.tokenize(sample)
def _is_control(self, char):
"""Checks whether `chars` is a control character."""
return is_control(char, unicodedata.category(char))
def _is_punctuation(self, char):
"""Checks whether `chars` is a punctuation character."""
return is_punctuation(char, unicodedata.category(char))
def _is_whitespace(self, char):
"""Checks whether `chars` is a whitespace character."""
return is_whitespace(char, unicodedata.category(char))
class BERTTokenizer:
r"""End-to-end tokenization for BERT models.
Parameters
----------
vocab
Vocabulary for the corpus.
lower
Whether to convert the text to lower case and strip accents.
When using a pre-trained BERT model, set lower to False for cased models
and to True for uncased models.
max_input_chars_per_word
Maximum number of characters per word; longer words are mapped to the unknown token.
lru_cache_size
Maximum size of a least-recently-used cache to speed up tokenization, e.g. 2**20.
Examples
--------
>>> _, vocab = gluonnlp.model.bert_12_768_12(dataset_name='wiki_multilingual_uncased',
... pretrained=False, root='./model')
-etc-
>>> tokenizer = gluonnlp.data.BERTTokenizer(vocab=vocab)
>>> tokenizer('gluonnlp: 使NLP变得简单。')
['gl', '##uo', '##nn', '##lp', ':', '使', 'nl', '##p', '变', '得', '简', '单', '。']
"""
_special_prefix = '##'
def __init__(self, vocab: Vocab, lower: bool = True, max_input_chars_per_word: int = 200,
lru_cache_size: Optional[int] = None):
self.vocab = vocab
self.max_input_chars_per_word = max_input_chars_per_word
self.basic_tokenizer = BasicTokenizer(lower=lower)
self.wordpiece_tokenizer = WordpieceTokenizer(vocab=vocab,
unk_token=vocab.unknown_token,
max_input_chars_per_word=\
max_input_chars_per_word)
if lru_cache_size:
self._word_to_wordpiece_optimized = functools.lru_cache(maxsize=lru_cache_size)(
self._word_to_wordpiece_optimized)
def __call__(self, sample):
"""
Parameters
----------
sample: str
The string to tokenize.
Returns
-------
ret : list of strs
List of tokens
"""
return self._tokenizer(sample)
def _tokenizer(self, text):
split_tokens = []
for token in self.basic_tokenizer.tokenize(text):
for sub_token in self._word_to_wordpiece_optimized(token):
split_tokens.append(sub_token)
return split_tokens
def _word_to_wordpiece_optimized(self, text): # pylint: disable=method-hidden
return self.wordpiece_tokenizer.tokenize(text)
def convert_tokens_to_ids(self, tokens):
"""Converts a sequence of tokens into ids using the vocab."""
return self.vocab.to_indices(tokens)
@staticmethod
def is_first_subword(token):
"""Check if a token is the beginning of subwords.
Parameters
----------
token : str
The input token.
Returns
-------
ret : True if the token is the beginning of a series of wordpieces.
Examples
--------
>>> _, vocab = gluonnlp.model.bert_12_768_12(dataset_name='wiki_multilingual_uncased',
... pretrained=False, root='./bert_tokenizer')
-etc-
>>> tokenizer = gluonnlp.data.BERTTokenizer(vocab=vocab)
>>> tokenizer('gluonnlp: 使NLP变得简单。')
['gl', '##uo', '##nn', '##lp', ':', '使', 'nl', '##p', '变', '得', '简', '单', '。']
>>> tokenizer.is_first_subword('gl')
True
>>> tokenizer.is_first_subword('##uo')
False
"""
return not token.startswith(BERTTokenizer._special_prefix)
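
# Hypothetical sketch (not part of the original module): lru_cache_size wraps the
# word-to-wordpiece lookup in functools.lru_cache, so repeated words are only
# wordpiece-tokenized once. The tiny vocabulary here is made up purely for
# illustration; real use would pass a pre-trained BERT vocabulary as in the
# class docstring.
def _bert_tokenizer_cache_sketch():
    import gluonnlp  # pylint: disable=import-outside-toplevel
    tiny_vocab = gluonnlp.Vocab(gluonnlp.data.count_tokens(['hello', 'world', '##d']))
    tokenizer = BERTTokenizer(tiny_vocab, lower=True, lru_cache_size=2**20)
    tokens = tokenizer('hello world hello world')  # the repeated words hit the cache
    return tokenizer.convert_tokens_to_ids(tokens)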
class BERTSPTokenizer:
r"""End-to-end SentencePiece tokenization for BERT models.
It works best with BERTSentenceTransform().
.. note::
BERTSPTokenizer depends on the sentencepiece library. For multi-processing
with BERTSPTokenizer, making an extra copy of the BERTSPTokenizer instance
is recommended before using it.
Parameters
----------
path : str
Path to the pre-trained subword tokenization model.
vocab : gluonnlp.Vocab
Vocabulary for the corpus.
num_best : int, default 0
A scalar for sampling subwords. If num_best = {0, 1}, no sampling is performed.
If num_best > 1, samples are drawn from the num_best results.
If num_best < 0, num_best is treated as infinite and samples are drawn from all
hypotheses (the lattice) using the forward-filtering-and-backward-sampling algorithm.
alpha : float
A scalar for a smoothing parameter. Inverse temperature for probability rescaling.
lower : bool, default True
Whether to convert the text to lower case and strip accents.
When using a pre-trained BERT model, set lower to False for cased models
and to True for uncased models.
max_input_chars_per_word : int, default 200
Examples
--------
>>> url = 'http://repo.mxnet.io/gluon/dataset/vocab/test-682b5d15.bpe'
>>> f = gluon.utils.download(url)
-etc-
>>> bert_vocab = gluonnlp.vocab.BERTVocab.from_sentencepiece(f)
>>> sp_tokenizer = BERTSPTokenizer(f, bert_vocab, lower=True)
>>> sentence = 'Better is to bow than break.'
>>> sp_tokenizer(sentence)
['▁better', '▁is', '▁to', '▁b', 'ow', '▁than', '▁brea', 'k', '▁', '.']
>>> os.remove('test-682b5d15.bpe')
"""
_special_prefix = '▁'
def __init__(self,
path,
vocab,
num_best=0,
alpha=1.0,
lower=True,
max_input_chars_per_word=200):
self._path = path
self._num_best = num_best
self._alpha = alpha
self.sentencepiece = None
self.basic_tokenizer = BERTBasicTokenizer(lower=lower)
self.vocab = vocab
self.max_input_chars_per_word = max_input_chars_per_word
def _activate_sp(self):
self.sentencepiece = SentencepieceTokenizer(self._path, self._num_best,
self._alpha)
def __call__(self, sample):
"""
Parameters
----------
sample: str
The string to tokenize.
Returns
-------
ret : list of strs
List of tokens
"""
return self._tokenizer(sample)
def _tokenizer(self, text):
split_tokens = []
for token in self.basic_tokenizer(text):
for sub_token in self._tokenize_wordpiece(token):
split_tokens.append(sub_token)
return split_tokens
def _tokenize_wordpiece(self, text):
"""Tokenizes a piece of text into its word pieces.
This uses Google's SentencePiece tokenizer model file.
For example:
input = "unaffable"
output = ["▁un", "aff", "able"]
Args:
text: A single token or whitespace separated tokens. This should have
already been passed through `BERTBasicTokenizer`.
Returns:
A list of sentencepieced tokens.
"""
# Swig object can not be pickled when multiprocessing.
if self.sentencepiece is None:
self._activate_sp()
output_tokens = self.sentencepiece(text)
return output_tokens
def convert_tokens_to_ids(self, tokens):
"""Converts a sequence of tokens into ids using the vocab."""
return self.vocab.to_indices(tokens)
@staticmethod
def is_first_subword(token):
"""Check if a string token is a subword following a previous subword,
instead of the beginning of a word.
Parameters
----------
token : str
The input token.
Returns
-------
ret : True if the token is the beginning of a series of subwords.
Examples
--------
>>> url = 'http://repo.mxnet.io/gluon/dataset/vocab/test-682b5d15.bpe'
>>> f = gluon.utils.download(url)
-etc-
>>> bert_vocab = gluonnlp.vocab.BERTVocab.from_sentencepiece(f)
>>> sp_tokenizer = BERTSPTokenizer(f, bert_vocab, lower=True)
>>> sp_tokenizer('Better is to bow than break.')
['▁better', '▁is', '▁to', '▁b', 'ow', '▁than', '▁brea', 'k', '▁', '.']
>>> sp_tokenizer.is_first_subword('▁better')
True
>>> sp_tokenizer.is_first_subword('ow')
False
>>> os.remove('test-682b5d15.bpe')
"""
return token.startswith(BERTSPTokenizer._special_prefix)
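
# Hypothetical sketch (not part of the original module): as the class-level note
# explains, the underlying SentencePiece SWIG object cannot be pickled, so for
# multi-processing each worker should get its own copy of the tokenizer. Resetting
# `sentencepiece` to None makes every copy lazily re-create its processor on
# first use inside the worker process.
def _bertsp_tokenizer_copies_sketch(sp_tokenizer, num_workers):
    import copy  # pylint: disable=import-outside-toplevel
    worker_tokenizers = [copy.copy(sp_tokenizer) for _ in range(num_workers)]
    for worker_tokenizer in worker_tokenizers:
        worker_tokenizer.sentencepiece = None
    return worker_tokenizers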
class BERTSentenceTransform:
r"""BERT style data transformation.
Parameters
----------
tokenizer : BERTTokenizer.
Tokenizer for the sentences.
max_seq_length : int.
Maximum sequence length of the sentences.
vocab : Vocab
The vocabulary which has cls_token and sep_token registered.
If vocab.cls_token is not present, vocab.bos_token is used instead.
If vocab.sep_token is not present, vocab.eos_token is used instead.
pad : bool, default True
Whether to pad the sentences to maximum length.
pair : bool, default True
Whether to transform sentences or sentence pairs.
"""
def __init__(self, tokenizer, max_seq_length, vocab=None, pad=True, pair=True):
self._tokenizer = tokenizer
self._max_seq_length = max_seq_length
self._pad = pad
self._pair = pair
self._vocab = self._tokenizer.vocab if vocab is None else vocab
# RoBERTa does not register CLS token and SEP token
if hasattr(self._vocab, 'cls_token'):
self._cls_token = self._vocab.cls_token
else:
self._cls_token = self._vocab.bos_token
if hasattr(self._vocab, 'sep_token'):
self._sep_token = self._vocab.sep_token
else:
self._sep_token = self._vocab.eos_token
self._padding_token = self._vocab.padding_token
def __call__(self, line):
"""Perform transformation for sequence pairs or single sequences.
The transformation is processed in the following steps:
- tokenize the input sequences
- insert [CLS], [SEP] as necessary
- generate type ids to indicate whether a token belongs to the first
sequence or the second sequence.
- generate valid length
For sequence pairs, the input is a tuple of 2 strings:
text_a, text_b.
Inputs:
text_a: 'is this jacksonville ?'
text_b: 'no it is not'
Tokenization:
text_a: 'is this jack ##son ##ville ?'
text_b: 'no it is not .'
Processed:
tokens: '[CLS] is this jack ##son ##ville ? [SEP] no it is not . [SEP]'
type_ids: 0 0 0 0 0 0 0 0 1 1 1 1 1 1
valid_length: 14
For single sequences, the input is a tuple of single string:
text_a.
Inputs:
text_a: 'the dog is hairy .'
Tokenization:
text_a: 'the dog is hairy .'
Processed:
text_a: '[CLS] the dog is hairy . [SEP]'
type_ids: 0 0 0 0 0 0 0
valid_length: 7
If vocab.cls_token and vocab.sep_token are not present,
vocab.bos_token and vocab.eos_token are used instead.
Parameters
----------
line: tuple of str
Input strings. For sequence pairs, the input is a tuple of 2 strings:
(text_a, text_b). For single sequences, the input is a tuple of single
string: (text_a,).
Returns
-------
np.array: input token ids in 'int32', shape (batch_size, seq_length)
np.array: valid length in 'int32', shape (batch_size,)
np.array: input token type ids in 'int32', shape (batch_size, seq_length)
"""
# convert to unicode
text_a = line[0]
if self._pair:
assert len(line) == 2
text_b = line[1]
tokens_a = self._tokenizer(text_a)
tokens_b = None
if self._pair:
tokens_b = self._tokenizer(text_b)
if tokens_b:
# Modifies `tokens_a` and `tokens_b` in place so that the total
# length is less than the specified length.
# Account for [CLS], [SEP], [SEP] with "- 3"
self._truncate_seq_pair(tokens_a, tokens_b,
self._max_seq_length - 3)
else:
# Account for [CLS] and [SEP] with "- 2"
if len(tokens_a) > self._max_seq_length - 2:
tokens_a = tokens_a[0:(self._max_seq_length - 2)]
# The embedding vectors for `type=0` and `type=1` were learned during
# pre-training and are added to the wordpiece embedding vector
# (and position vector). This is not *strictly* necessary since
# the [SEP] token unambiguously separates the sequences, but it makes
# it easier for the model to learn the concept of sequences.
# For classification tasks, the first vector (corresponding to [CLS]) is
# used as the "sentence vector". Note that this only makes sense because
# the entire model is fine-tuned.
tokens = []
tokens.append(self._cls_token)
tokens.extend(tokens_a)
tokens.append(self._sep_token)
segment_ids = [0] * len(tokens)
if tokens_b:
tokens.extend(tokens_b)
tokens.append(self._sep_token)
segment_ids.extend([1] * (len(tokens) - len(segment_ids)))
input_ids = self._vocab[tokens]
# The valid length of sentences. Only real tokens are attended to.
valid_length = len(input_ids)
if self._pad:
# Zero-pad up to the sequence length.
padding_length = self._max_seq_length - valid_length
# use padding tokens for the rest
input_ids.extend([self._vocab[self._padding_token]] * padding_length)
segment_ids.extend([0] * padding_length)
return np.array(input_ids, dtype='int32'), np.array(valid_length, dtype='int32'),\
np.array(segment_ids, dtype='int32')
def _truncate_seq_pair(self, tokens_a, tokens_b, max_length):
"""Truncates a sequence pair in place to the maximum length."""
# This is a simple heuristic which will always truncate the longer sequence
# one token at a time. This makes more sense than truncating an equal percent
# of tokens from each, since if one sequence is very short then each token
# that's truncated likely contains more information than a longer sequence.
while True:
total_length = len(tokens_a) + len(tokens_b)
if total_length <= max_length:
break
if len(tokens_a) > len(tokens_b):
tokens_a.pop()
else:
tokens_b.pop()
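
# Hypothetical usage sketch (not part of the original module), mirroring the worked
# example in the __call__ docstring above: build a wordpiece tokenizer from a BERT
# vocabulary and turn a sentence pair into (token ids, valid length, segment ids).
def _bert_sentence_transform_sketch():
    import gluonnlp  # pylint: disable=import-outside-toplevel
    _, bert_vocab = gluonnlp.model.bert_12_768_12(
        dataset_name='book_corpus_wiki_en_uncased', pretrained=False)
    tokenizer = BERTTokenizer(bert_vocab, lower=True)
    transform = BERTSentenceTransform(tokenizer, max_seq_length=16, pad=True, pair=True)
    input_ids, valid_length, segment_ids = transform(('is this jacksonville ?', 'no it is not'))
    return input_ids, valid_length, segment_ids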
class _GPT2BPE:
"""Base class for GPT-2 BPE tokenizer and detokenizer."""
def __init__(self):
codes = list(range(ord('!'), ord('~') + 1)) +\
list(range(ord('¡'), ord('¬') + 1)) +\
list(range(ord('®'), ord('ÿ') + 1))
chr_fn = chr
try:
chr_fn(256)
except ValueError:
chr_fn = unichr # noqa: F821
byte_encoder = {code: chr_fn(code) for code in codes}
shift = 0
for code in range(2 ** 8):
if code not in byte_encoder:
byte_encoder[code] = chr_fn(2 ** 8 + shift)
shift += 1
self._byte_encoder = byte_encoder
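
# Illustrative sketch (hypothetical helper, not part of the original module): the
# mapping built above assigns a printable unicode character to every byte value.
# Bytes that are not already printable are shifted past 255, which is why GPT-2
# BPE pieces for space-prefixed words start with '\u0120' ('Ġ').
def _gpt2_byte_encoder_sketch():
    byte_encoder = _GPT2BPE()._byte_encoder  # pylint: disable=protected-access
    assert len(byte_encoder) == 256            # every possible byte value is covered
    assert byte_encoder[ord('a')] == 'a'       # printable bytes map to themselves
    assert byte_encoder[ord(' ')] == '\u0120'  # the space byte (0x20) is remapped to 'Ġ'
    return byte_encoder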
class GPT2BPETokenizer(_GPT2BPE):
"""BPE tokenizer used in OpenAI GPT-2 model.
Parameters
----------
root : str, default '$MXNET_HOME/models'
Location for keeping the BPE rank file.
MXNET_HOME defaults to '~/.mxnet'.
"""
bpe_ranks_file_hash = ('openai_webtext_bpe_ranks-396d4d8e.json',
'396d4d8ec90cb02f4d56e049e0e4add868bcd943')
bpe_ranks_archive_hash = ('openai_webtext_bpe_ranks-396d4d8e.zip',
'1a770728fd102bc9dc332f322e6bfb294767a685')
def __init__(self, root=os.path.join(get_home_dir(), 'models')):
try:
import regex # pylint: disable=import-outside-toplevel
self._regex = regex
except ImportError:
raise ImportError(
'GPT2BPETokenizer requires regex. '
'To install regex, use pip install -U regex')
super(GPT2BPETokenizer, self).__init__()
root = os.path.expanduser(root)
file_name, sha1_hash = self.bpe_ranks_file_hash
file_path = os.path.join(root, file_name)
if not os.path.exists(file_path) or not check_sha1(file_path, sha1_hash):
if os.path.exists(file_path):
print('Detected mismatch in the content of BPE rank file. Downloading again.')
else:
print('BPE rank file is not found. Downloading.')
os.makedirs(root, exist_ok=True)
prefix = str(time.time())
zip_file_path = os.path.join(root, prefix + file_name)
repo_url = _get_repo_url()
if repo_url[-1] != '/':
repo_url = repo_url + '/'
archive_name, archive_hash = self.bpe_ranks_archive_hash
_url_format = '{repo_url}gluon/dataset/vocab/{file_name}'
download(_url_format.format(repo_url=repo_url, file_name=archive_name),
path=zip_file_path,
sha1_hash=archive_hash,
overwrite=True)
with zipfile.ZipFile(zip_file_path) as zf:
if not os.path.exists(file_path):
zf.extractall(root)
try:
os.remove(zip_file_path)
except OSError as e:
# file has already been removed.
if e.errno == 2:
pass
else:
raise e
if not check_sha1(file_path, sha1_hash):
raise ValueError('Downloaded file has different hash. Please try again.')
self._read_bpe_ranks(file_path)
self._cache = {}
self._token_pattern = self._regex.compile(
r'\'s|\'t|\'re|\'ve|\'m|\'ll|\'d| ?\p{L}+'
r'| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+')
def _read_bpe_ranks(self, file_path):
with io.open(file_path, 'r', encoding='utf-8') as f:
bpe_data = f.read()
self._bpe_ranks = {
tuple(merge_str.split()): i for i, merge_str
in enumerate(bpe_data.split('\n')[1:-1])}
def get_bpe_subword(self, token):
"""Encode a word token into BPE subwords.
Parameters
----------
token : str
Returns
-------
chars : list(str)
"""
if token in self._cache:
return self._cache[token]
chars = list(token)
while len(chars) > 0:
min_pair, min_rank = None, float('inf')
# Find the pair with the minimum rank
for i in range(1, len(chars)):
pair = (chars[i - 1], chars[i])
rank = self._bpe_ranks.get(pair, float('inf'))
if rank < min_rank:
min_rank = rank
min_pair = pair
if min_pair is None or min_pair not in self._bpe_ranks:
break
# Merge the pair
last, tail = chars[0], 1
for index in range(1, len(chars)):
if (last, chars[index]) == min_pair:
chars[tail - 1] = last + chars[index]
last = last + chars[index]
else:
chars[tail - 1] = last
tail += 1
last = chars[index]
chars[tail - 1] = last
chars = chars[:tail]
self._cache[token] = chars
return chars
def __call__(self, sample):
"""
Parameters
----------
sample : str
Returns
-------
ret : list(str)
"""
ret = []
for word_token in self._regex.findall(self._token_pattern, sample):
word_token = bytearray(word_token.encode('utf-8'))
word_token = ''.join(self._byte_encoder[code] for code in word_token)
ret.extend(self.get_bpe_subword(word_token))
return ret
class GPT2BPEDetokenizer(_GPT2BPE):
"""BPE detokenizer used in OpenAI GPT-2 model."""
def __init__(self):
super(GPT2BPEDetokenizer, self).__init__()
self._byte_decoder = {v: k for k, v in self._byte_encoder.items()}
def __call__(self, sample):
"""
Parameters
----------
sample : list(str)
Returns
-------
ret : str
"""
text = ''.join(sample)
ret = bytearray(
[self._byte_decoder[byte] for byte in text]).decode('utf-8', errors='replace')
return ret