Merge pull request #22 from isi-nlp/bigdata
Big datasets: use PySpark to speed up
thammegowda authored Jun 14, 2020
2 parents 9083522 + f7f75a4 commit 3c732d3
Showing 10 changed files with 465 additions and 21 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -56,4 +56,5 @@ celerybeat-schedule
.mypy_cache/

work/
.idea/
.idea/
tmp
54 changes: 53 additions & 1 deletion README.md
@@ -99,7 +99,6 @@ head somefile.tok | nlcodec decode -m bpe.model -idx
# estimate quality
head somefile.tok | nlcodec estimate -m bpe.model
```

## Python API
@@ -150,6 +149,59 @@ some_type.get_permutations(name=False)

```

## Scaling for Big data(sets)

For larger datasets, you can use PySpark to compute term frequencies in a separate step.
The precomputed term frequencies can then be passed to `nlcodec learn` by setting the `-tfs` flag.

To compute term frequencies:
- Install PySpark: `pip install pyspark`
- Run the `nlcodec.term_freq` module, as shown below

```bash

$ python -m nlcodec.term_freq -h
usage: term_freq.py [-h] [-i INP [INP ...]] [-wf WORD_FREQS] [-cf CHAR_FREQS]
[-dd] [-ndd]

optional arguments:
-h, --help show this help message and exit
-i INP [INP ...], --inp INP [INP ...]
Input file paths (default: None)
-wf WORD_FREQS, --word_freqs WORD_FREQS
Output file path for word frequencies (default: None)
-cf CHAR_FREQS, --char_freqs CHAR_FREQS
Output file path for character frequencies (default:
None)
-dd, --dedup Deduplicate the sentences: use only unique sequences
(default: True)
-ndd, --no-dedup Do not deduplicate. (default: False)

```
### Example
```bash
# use these environment vars
export SPARK_DRIVER_MEM="4g"
export SPARK_MASTER="local[*]" # use all CPU cores of the local node
python -m nlcodec.term_freq -dd -wf words.tsv -cf chars.tsv \
-i ~/work/datasets/wmt/data/*-*/*.en.tok
```
`words.tsv` and `chars.tsv` have the word and character frequencies respectively.
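These files are plain tab-separated `term<TAB>frequency` listings; the `read_term_freqs` helper added in this commit also accepts an optional first line starting with `#` that carries JSON metadata such as `line_count`. As an illustration (a sketch, not part of this change; `words.tsv` is the file produced above), they can be loaded back from Python:

```python
# Sketch only: load precomputed word frequencies with the
# WordScheme.read_term_freqs classmethod introduced in this commit.
from nlcodec.codec import WordScheme

with open('words.tsv', encoding='utf-8') as lines:
    # returns ({term: frequency}, line_count); line_count is -1 if no metadata header is present
    freqs, line_count = WordScheme.read_term_freqs(lines)

print(f"{len(freqs):,} word types; line_count={line_count}")
print(sorted(freqs.items(), key=lambda kv: -kv[1])[:5])  # five most frequent words
```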
```bash
# word vocab of 32K
python -m nlcodec learn -i words.tsv -tfs -l word -vs 32000 -m word.model

# Character vocab of 99.95% coverage
python -m nlcodec learn -i chars.tsv -tfs -l char -mf 1 -cv 0.9995 -m char.model

# BPE vocab of 8K
python -m nlcodec learn -i words.tsv -tfs -l bpe -vs 8000 -m bpe.model

# BPE merges until the minimum merge frequency (co-evidence) drops to 100; set -vs to some large number such as 64000
python -m nlcodec learn -i words.tsv -tfs -l bpe -vs 64000 -m bpe.model -cv 0.99995 -mce 100

```
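The same two-step flow is available from the Python API. Here is a rough sketch (not part of this change) based on the `learn_vocab` signature updated in this commit, using `words.tsv` from above and a hypothetical output path:

```python
# Sketch: learn a BPE vocab from precomputed term frequencies,
# mirroring `nlcodec learn -i words.tsv -tfs -l bpe -vs 8000 -m bpe.model`.
from pathlib import Path
from nlcodec import learn_vocab

with open('words.tsv', encoding='utf-8') as inp:
    learn_vocab(inp=inp, level='bpe', model=Path('bpe.model'),
                vocab_size=8000, term_freqs=True)  # term_freqs=True corresponds to the -tfs flag
```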
# Authors
+ [Thamme Gowda](https://twitter.com/thammegowda)
2 changes: 1 addition & 1 deletion nlcodec/__init__.py
@@ -3,7 +3,7 @@
# Author: Thamme Gowda [tg (at) isi (dot) edu]
# Created: 2019-10-25

__version__ = '0.2.1'
__version__ = '0.2.2'
__description__ = """nlcodec is a collection of encoding schemes for natural language sequences"""

DEF_MIN_CO_EV = 5
15 changes: 11 additions & 4 deletions nlcodec/__main__.py
@@ -8,7 +8,7 @@
from pathlib import Path

from nlcodec import learn_vocab, load_scheme, encode, decode, __version__, __description__
from nlcodec import DEF_WORD_MIN_FREQ, DEF_CHAR_MIN_FREQ, DEF_CHAR_COVERAGE
from nlcodec import DEF_WORD_MIN_FREQ, DEF_CHAR_MIN_FREQ, DEF_CHAR_COVERAGE, DEF_MIN_CO_EV


def write_lines(lines: Iterator[str], out: TextIO, line_break='\n'):
@@ -39,6 +39,9 @@ def parse_args() -> Dict[str, Any]:

p.add_argument('-i', '--inp', type=argparse.FileType('r'), default=sys.stdin,
help='Input file path')
p.add_argument('-tfs', '--term-freqs', action='store_true', default=False,
help='--inp is term frequencies. Valid for task=learn.'
' See nlcodec.term_freq to obtain them')
p.add_argument('-o', '--out', type=argparse.FileType('w'), default=sys.stdout,
help='Output file path. Not valid for "learn" or "estimate" task')
p.add_argument('-m', '--model', type=Path, help='Path to model aka vocabulary file',
@@ -48,23 +51,27 @@ def parse_args() -> Dict[str, Any]:
help='Indices instead of strings. Valid for task=encode and task=decode')

learn_args = p.add_argument_group("args for task=learn")
learn_args.add_argument('-vs', '--vocab_size', type=int, default=-1,
learn_args.add_argument('-vs', '--vocab-size', type=int, default=-1,
help='Vocabulary size. Valid only for task=learn. This is required for'
' "bpe", but optional for "word" and "char" models, specifying it'
' will trim the vocabulary at given top most frequent types.')
learn_args.add_argument('-l', '--level', choices=['char', 'word', 'bpe'],
help='Encoding Level; Valid only for task=learn')
learn_args.add_argument('-mf', '--min_freq', default=None, type=int,
learn_args.add_argument('-mf', '--min-freq', default=None, type=int,
help='Minimum frequency of types for considering inclusion in vocabulary. '
'Types fewer than this frequency will be ignored. '
f'For --level=word or --level=bpe, freq is type freq and '
f' default is {DEF_WORD_MIN_FREQ}.'
f'for --level=char, characters fewer than this value'
f' will be excluded. default={DEF_CHAR_MIN_FREQ}')

learn_args.add_argument('-cv', '--char_coverage', default=DEF_CHAR_COVERAGE, type=float,
learn_args.add_argument('-cv', '--char-coverage', default=DEF_CHAR_COVERAGE, type=float,
help='Character coverage for --level=char or --level=bpe')

learn_args.add_argument('-mce', '--min-co-ev', default=DEF_MIN_CO_EV, type=int,
help='Minimum Co-evidence for BPE merge.'
' Valid when --task=learn and --level=bpe')

args = vars(p.parse_args())
return args

2 changes: 1 addition & 1 deletion nlcodec/bpe.py
@@ -286,7 +286,7 @@ def _learn_codes(cls, term_freqs: Dict[str, int], vocab: List[Type], vocab_size:
rev_idx: Dict[str, int] = {word.name: word.idx for word in vocab}
assert len(rev_idx) == len(vocab) # one to one map
assert vocab_size > len(vocab), f'vocab_size={vocab_size} is too small;' \
f' found {len(vocab)} in the init vocab!'
f' found {len(vocab)} in the init vocab! Set a value larger than {len(vocab)}'

seqs_freqs = cls._make_idxs(rev_idx, term_freqs)
learner = BPELearn(seqs_freqs, vocab=vocab)
73 changes: 60 additions & 13 deletions nlcodec/codec.py
@@ -25,6 +25,7 @@
from nlcodec import DEF_WORD_MIN_FREQ as WORD_MIN_FREQ
from nlcodec import DEF_CHAR_MIN_FREQ as CHAR_MIN_FREQ
from nlcodec import DEF_CHAR_COVERAGE as CHAR_COVERAGE
from nlcodec import DEF_MIN_CO_EV as MIN_CO_EV


class Reseved:
@@ -246,7 +247,8 @@ def learn(cls, data: Iterator[str], **kwargs) -> List[Type]:
raise NotImplementedError()

@classmethod
def get_init_vocab(cls, term_freqs, coverage: float=0, line_count=None, min_freq=WORD_MIN_FREQ,
def get_init_vocab(cls, term_freqs, coverage: float = 0, line_count=None,
min_freq=WORD_MIN_FREQ,
vocab_size=-1):
vocab = Reseved.with_reserved_types()
res_stats = {r_type.name: term_freqs.pop(r_type.name) for r_type in vocab if
@@ -315,12 +317,44 @@ def term_frequencies(cls, data: Iterator[str]) -> Tuple[Dict[str, int], int]:
log.info(f"Found {len(stats):,} types and {sum(stats.values()):,} tokens")
return stats, line_count

@classmethod
def read_term_freqs(cls, data: Iterator[str], delim='\t') -> Tuple[Dict[str, int], int]:
stats = {}
line_count = -1
for idx, line in enumerate(data):
line = line.rstrip('\n')
if idx == 0 and line.startswith("#") and len(line.split(delim)) != 2:
try:
import json
meta = json.loads(line[1:]) # skip # at index 0
line_count = meta.get('line_count', line_count)
except:
pass
continue
term, freq = line.split("\t")
stats[term.strip()] = int(freq)
return stats, line_count

@classmethod
def learn(cls, data: Iterator[str], vocab_size: int = 0, min_freq: int = WORD_MIN_FREQ,
coverage: float = 0, **kwargs) -> List[Type]:
assert not kwargs
log.info(f"Building {cls} vocab.. This might take some time")
stats, line_count = cls.term_frequencies(data=data)
coverage: float = 0, term_freqs=False, **kwargs) -> List[Type]:
"""
:param data: input sentences
:param vocab_size: max vocabulary size.
:param min_freq: min frequency for inclusion in vocabulary. Excludes types with lower freq
:param coverage: Character coverage
:param term_freqs: is data the term_freqs ?
:param kwargs: place holder for any extra args
:return:
"""
assert not kwargs, f'{kwargs} args are not allowed/understood'
if term_freqs: # input is term_freqs
log.info("Restoring term frequencies from input")
stats, line_count = cls.read_term_freqs(data=data)
else: # compute term freqs
log.info("Computing term frequencies from raw data")
stats, line_count = cls.term_frequencies(data=data)

return cls.get_init_vocab(stats, coverage, line_count, min_freq, vocab_size)


@@ -421,18 +455,24 @@ def decode_str(self, seq: List[str]) -> str:

@classmethod
def learn(cls, data: Iterator[str], vocab_size: int = 0, min_freq=WORD_MIN_FREQ,
coverage=CHAR_COVERAGE, **kwargs) -> List[Type]:
coverage=CHAR_COVERAGE, min_co_evidence=MIN_CO_EV, term_freqs=False, **kwargs) -> List[Type]:
assert vocab_size > 0
assert not kwargs
term_freqs, line_count = WordScheme.term_frequencies(data)
assert not kwargs, f'{kwargs} args are not allowed/understood'
if term_freqs:
log.info("Reading term freqs from input")
tfs, line_count = WordScheme.read_term_freqs(data)
else:
log.info("Computing term freqs from input")
tfs, line_count = WordScheme.term_frequencies(data)

def init_vocab_factory(char_types):
return CharScheme.get_init_vocab(char_types, line_count=line_count,
coverage=coverage, min_freq=1)

from .bpe import BPELearn
vocab = BPELearn.learn_subwords(term_freqs=term_freqs, vocab_size=vocab_size,
init_vocab_factory=init_vocab_factory)
vocab = BPELearn.learn_subwords(term_freqs=tfs, vocab_size=vocab_size,
init_vocab_factory=init_vocab_factory,
min_co_evidence=min_co_evidence)
return vocab

def stochastic_split(self, seq, split_ratio, name=False):
@@ -441,6 +481,7 @@ def stochastic_split(self, seq, split_ratio, name=False):
res += self.table[idx].get_stochastic_split(name=name, split_ratio=split_ratio)
return res


#########################
REGISTRY = {
'char': CharScheme,
@@ -450,7 +491,8 @@ def stochastic_split(self, seq, split_ratio, name=False):
}


def learn_vocab(inp, level, model, vocab_size, min_freq=1, char_coverage=CHAR_COVERAGE):
def learn_vocab(inp, level, model, vocab_size, min_freq=1, term_freqs=False,
char_coverage=CHAR_COVERAGE, min_co_ev=MIN_CO_EV):
if not min_freq or min_freq < 1:
min_freq = WORD_MIN_FREQ if level == 'word' else CHAR_MIN_FREQ
log.info(f"level={level} => default min_freq={min_freq}")
@@ -459,8 +501,13 @@ def learn_vocab(inp, level, model, vocab_size, min_freq=1, char_coverage=CHAR_CO
log.info(f"Learn Vocab for level={level} and store at {model}")
log.info(f"data ={inp}")
Scheme = REGISTRY[level]
args = {} if level == 'word' else dict(coverage=char_coverage) # no char_coverage for word
table = Scheme.learn(inp, vocab_size=vocab_size, min_freq=min_freq, **args)
args = {}
if level != 'word':
args['coverage'] = char_coverage # no char_coverage for word
if level == 'bpe':
args['min_co_evidence'] = min_co_ev
table = Scheme.learn(inp, vocab_size=vocab_size, min_freq=min_freq, term_freqs=term_freqs,
**args)
Type.write_out(table=table, out=model)


6 changes: 6 additions & 0 deletions nlcodec/para/__init__.py
@@ -0,0 +1,6 @@
#!/usr/bin/env python
# Parallel programming
# Author: Thamme Gowda [tg (at) isi (dot) edu]
# Created: 6/13/20

