Source code for tupa.model

from collections import OrderedDict
from enum import Enum

from ucca import textutil
from ucca.layer0 import Terminal

from .action import Actions
from .classifiers.classifier import Classifier
from .config import Config, SEPARATOR, SPARSE, MLP, BIRNN, HIGHWAY_RNN, HIERARCHICAL_RNN, NOOP
from .features.feature_params import FeatureParameters
from .model_util import UnknownDict, AutoIncrementDict, remove_backup, save_json, load_json


class ParameterDefinition:
    def __init__(self, args, name, attr_to_arg, attr_to_val=None):
        self.args = args
        self.name = name
        self.attr_to_arg = attr_to_arg
        self.attr_to_val = attr_to_val or {}

    @property
    def dim_arg(self):
        return self.attr_to_arg["dim"]

    @property
    def size_arg(self):
        return self.attr_to_arg["size"]

    @property
    def enabled(self):
        return bool(getattr(self.args, self.dim_arg))

    @enabled.setter
    def enabled(self, value):
        if value:
            raise ValueError("Can only disable parameter configuration by setting 'enabled' to False")
        setattr(self.args, self.dim_arg, 0)

    @property
    def lang_specific(self):
        return self.attr_to_val.get("lang_specific")

    def create_from_config(self, lang=None):
        kwargs = dict(self.attr_to_val)
        kwargs.update({k: getattr(self.get_args(lang), v) for k, v in self.attr_to_arg.items()})
        return FeatureParameters(self.name, **kwargs)

    def load_to_config(self, params):
        for lang in list(self.all_langs(params)) or [None]:
            param = params.get(self.key(lang))
            self.get_args(lang).update({self.dim_arg: 0, self.size_arg: 0} if param is None else
                                       {v: getattr(param, k) for k, v in self.attr_to_arg.items()})

    def get_args(self, lang):
        return self.args.hyperparams.specific[lang] if lang else self.args

    def all_langs(self, params):
        for key in params:
            param_name, _, lang = key.partition(SEPARATOR)
            if param_name == self.name and lang:
                yield lang

    def key(self, lang=None):
        return SEPARATOR.join(filter(None, (self.name, lang)))

    def __str__(self):
        return self.name

    def __repr__(self):
        return "%s(%s, %s)" % (type(self).__name__, self.name, ", ".join(
            "%s=%s" % i for i in list(self.attr_to_arg.items()) + list(self.attr_to_val.items())))
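
# Illustrative sketch (not in the original source): a ParameterDefinition maps a
# feature name to config attributes, so a word-feature definition could be used
# like this (assuming the usual TUPA config flags word_dim/max_words):
#
#     word_def = ParameterDefinition(Config().args, "w",
#                                    dict(dim="word_dim", size="max_words"),
#                                    dict(lang_specific=True))
#     word_def.enabled               # True iff Config().args.word_dim is non-zero
#     word_def.key("en")             # "w" + SEPARATOR + "en", the language-specific key
#     word_def.create_from_config()  # -> FeatureParameters("w", dim=..., size=..., lang_specific=True)
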
NODE_LABEL_KEY = "n"

class ClassifierProperty(Enum):
    update_only_on_error = 1
    require_init_features = 2
    trainable_after_saving = 3

CLASSIFIER_PROPERTIES = {
    SPARSE: (ClassifierProperty.update_only_on_error,),
    MLP: (ClassifierProperty.trainable_after_saving,),
    BIRNN: (ClassifierProperty.trainable_after_saving, ClassifierProperty.require_init_features),
    HIGHWAY_RNN: (ClassifierProperty.trainable_after_saving, ClassifierProperty.require_init_features),
    HIERARCHICAL_RNN: (ClassifierProperty.trainable_after_saving, ClassifierProperty.require_init_features),
    NOOP: (ClassifierProperty.trainable_after_saving,),
}

NODE_LABEL_PARAM_DEFS = [
    (NODE_LABEL_KEY, dict(dim="node_label_dim", size="max_node_labels", dropout="node_label_dropout",
                          min_count="min_node_label_count"))
]

PARAM_DEFS = [
    ("c", dict(dim="node_category_dim", size="max_node_categories")),
    ("W", dict(dim="word_dim_external", size="max_words_external", dropout="word_dropout_external",
               updated="update_word_vectors", filename="word_vectors", vocab="vocab"),
     dict(copy_from="w", lang_specific=True)),
    ("w", dict(dim="word_dim", size="max_words", dropout="word_dropout"), dict(lang_specific=True)),
    ("m", dict(dim="lemma_dim", size="max_lemmas", dropout="lemma_dropout"), dict(lang_specific=True)),
    ("t", dict(dim="tag_dim", size="max_tags", dropout="tag_dropout"), dict(lang_specific=True)),
    ("u", dict(dim="pos_dim", size="max_pos", dropout="pos_dropout")),
    ("d", dict(dim="dep_dim", size="max_deps", dropout="dep_dropout")),
    ("e", dict(dim="edge_label_dim", size="max_edge_labels")),
    ("p", dict(dim="punct_dim", size="max_puncts")),
    ("A", dict(dim="action_dim", size="max_action_types")),
    ("T", dict(dim="ner_dim", size="max_ner_types")),
    ("#", dict(dim="shape_dim", size="max_shapes"), dict(lang_specific=True)),
    ("^", dict(dim="prefix_dim", size="max_prefixes"), dict(lang_specific=True)),
    ("$", dict(dim="suffix_dim", size="max_suffixes"), dict(lang_specific=True)),
]
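
# Note (added commentary): each PARAM_DEFS entry is (name, attr_to_arg[, attr_to_val])
# and is unpacked by Model.param_defs() into ParameterDefinition(args, name, *rest).
# For example, the "t" (POS tag) entry above expands to:
#
#     ParameterDefinition(args, "t", dict(dim="tag_dim", size="max_tags", dropout="tag_dropout"),
#                         dict(lang_specific=True))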


class Model:
    def __init__(self, filename, config=None, *args, **kwargs):
        self.config = config or Config().copy()
        self.filename = filename
        self.feature_extractor = self.classifier = self.axis = self.lang = None
        self.feature_params = OrderedDict()
        self.is_finalized = False
        if args or kwargs:
            self.restore(*args, **kwargs)

    def node_label_param_def(self, args=None):
        return self.param_defs(args, only_node_labels=True)[0]

    def param_defs(self, args=None, only_node_labels=False):
        return [ParameterDefinition(args or self.config.args, n, *k)
                for n, *k in NODE_LABEL_PARAM_DEFS + ([] if only_node_labels else PARAM_DEFS)]

    def init_model(self, axis=None, lang=None, init_params=True):
        self.set_axis(axis, lang)
        labels = self.classifier.labels if self.classifier else OrderedDict()
        if init_params:  # Actually use the config state to initialize the features and hyperparameters, otherwise empty
            for param_def in self.param_defs():  # FIXME save parameters separately per format, not just per language
                for param_lang in (param_def.all_langs(self.feature_params) if self.lang else []) \
                        if param_def.lang_specific and self.config.args.multilingual else [None]:
                    key = param_def.key(param_lang)
                    param = self.feature_params.get(key)
                    enabled = param_def.enabled and (not param_lang or param_lang == self.lang)
                    if param:
                        param.enabled = enabled
                    elif self.is_neural_network and enabled:
                        self.feature_params[key] = param_def.create_from_config(param_lang)
                        self.init_param(key)
            if axis and self.axis not in labels:
                labels[self.axis] = self.init_actions()  # Uses config to determine actions
            if self.config.args.node_labels and not self.config.args.use_gold_node_labels and \
                    NODE_LABEL_KEY not in labels:
                labels[NODE_LABEL_KEY] = self.init_node_labels()  # Updates self.feature_params
        if self.classifier:  # Already initialized
            pass
        elif self.config.args.classifier == SPARSE:
            from .features.sparse_features import SparseFeatureExtractor
            from .classifiers.linear.sparse_perceptron import SparsePerceptron
            self.feature_extractor = SparseFeatureExtractor(omit_features=self.config.args.omit_features)
            self.classifier = SparsePerceptron(self.config, labels)
        elif self.config.args.classifier == NOOP:
            from .features.empty_features import EmptyFeatureExtractor
            from .classifiers.noop import NoOp
            self.feature_extractor = EmptyFeatureExtractor()
            self.classifier = NoOp(self.config, labels)
        elif self.is_neural_network:
            from .features.dense_features import DenseFeatureExtractor
            from .classifiers.nn.neural_network import NeuralNetwork
            self.feature_extractor = DenseFeatureExtractor(
                self.feature_params,
                indexed=self.config.args.classifier != MLP,
                hierarchical=self.config.args.classifier == HIERARCHICAL_RNN,
                node_dropout=self.config.args.node_dropout,
                omit_features=self.config.args.omit_features)
            self.classifier = NeuralNetwork(self.config, labels)
        else:
            raise ValueError("Invalid model type: '%s'" % self.config.args.classifier)
        self._update_input_params()
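
    # Note (added commentary): init_model() is safe to call repeatedly -- once
    # self.classifier is set, the classifier dispatch is skipped, so
    # init_features(), finalize() and save() all call it to make sure the model
    # is initialized before use.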

    def set_axis(self, axis, lang):
        if axis is not None:
            self.axis = axis
        if self.axis is None:
            self.axis = self.config.format
        if lang is not None:
            self.lang = lang
        if self.lang is not None:
            suffix = SEPARATOR + self.lang
            if not self.axis.endswith(suffix):
                self.axis += suffix
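
    # For example (illustrative values): with config format "ucca" and lang "en",
    # set_axis() yields self.axis == "ucca" + SEPARATOR + "en", so labels are
    # keyed per (format, language) pair rather than per format alone.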

    @property
    def formats(self):
        return [k.partition(SEPARATOR)[0] for k in self.classifier.labels]

    @property
    def is_neural_network(self):
        return self.config.args.classifier in (MLP, BIRNN, HIGHWAY_RNN, HIERARCHICAL_RNN)

    @property
    def is_retrainable(self):
        return ClassifierProperty.trainable_after_saving in self.classifier_properties

    @property
    def classifier_properties(self):
        return CLASSIFIER_PROPERTIES[self.config.args.classifier]

    @property
    def actions(self):
        return self.classifier.labels[self.axis]

    def init_actions(self):
        return Actions(size=self.config.args.max_action_labels)

    def init_param(self, key):
        if self.feature_extractor:
            self.feature_extractor.init_param(key)

    def init_node_labels(self):
        node_labels = self.feature_params.get(NODE_LABEL_KEY)
        if node_labels is None:
            node_labels = self.node_label_param_def().create_from_config()
            if self.is_neural_network:
                self.feature_params[NODE_LABEL_KEY] = node_labels
        self.init_param(NODE_LABEL_KEY)
        node_labels.init_data()
        return node_labels.data

    def score(self, state, axis):
        features = self.feature_extractor.extract_features(state)
        return self.classifier.score(features, axis=axis), features  # scores is a NumPy array
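
    # Typical use during parsing (a sketch; `state` is the transition-system state):
    #
    #     scores, features = model.score(state, axis=model.axis)
    #     best = scores.argmax()  # index into model.classifier.labels[axis]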

    def init_features(self, state, train):
        self.init_model()
        axes = [self.axis]
        if self.config.args.node_labels and not self.config.args.use_gold_node_labels:
            axes.append(NODE_LABEL_KEY)
        passage = [node.text for node in state.passage.nodes.values() if isinstance(node, Terminal)]
        lang = state.passage.attrib.get("lang")
        self.classifier.init_features(self.feature_extractor.init_features(state), axes, train, passage, lang)

    def finalize(self, finished_epoch):
        """
        Copy the model, finalizing its features (so that new values will not be added during subsequent use) and
        its classifier (updating it)
        :param finished_epoch: whether this is the end of an epoch (or just an intermediate checkpoint), for bookkeeping
        :return: a copy of this model with a new feature extractor and classifier (the classifier may actually be the same)
        """
        self.config.print("Finalizing model", level=1)
        self.init_model()
        return Model(None, config=self.config.copy(), model=self, is_finalized=True,
                     feature_extractor=self.feature_extractor.finalize(),
                     classifier=self.classifier.finalize(finished_epoch=finished_epoch))
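
    # Sketch of the intended training-loop usage (illustrative, not from the
    # original source):
    #
    #     checkpoint = model.finalize(finished_epoch=True)
    #     checkpoint.save()  # save the finalized copy; `model` itself can keep training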

    def save(self, save_init=False):
        """
        Save feature and classifier parameters to files
        """
        if self.filename is not None:
            self.init_model()
            try:
                self.feature_extractor.save(self.filename, save_init=save_init)
                node_labels = self.feature_extractor.params.get(NODE_LABEL_KEY)
                skip_labels = (NODE_LABEL_KEY,) if node_labels and node_labels.size else ()
                bert_configs = {
                    "use_bert": self.config.args.use_bert,
                    "bert_model": self.config.args.bert_model,
                    "bert_layers": self.config.args.bert_layers,
                    "bert_layers_pooling": self.config.args.bert_layers_pooling,
                    "bert_token_align_by": self.config.args.bert_token_align_by,
                    "bert_multilingual": self.config.args.bert_multilingual,
                    "bert_use_default_word_embeddings": self.config.args.bert_use_default_word_embeddings,
                    "bert_dropout": self.config.args.bert_dropout,
                } if self.config.args.use_bert else {"use_bert": self.config.args.use_bert}
                self.classifier.save(self.filename, skip_labels=skip_labels,
                                     multilingual=self.config.args.multilingual,
                                     omit_features=self.config.args.omit_features, **bert_configs)
                textutil.models["vocab"] = self.config.args.vocab
                save_json(self.filename + ".nlp.json", textutil.models)
                remove_backup(self.filename)
            except Exception as e:
                raise IOError("Failed saving model to '%s'" % self.filename) from e

    def load(self, is_finalized=True):
        """
        Load the feature and classifier parameters from files
        :param is_finalized: whether the loaded model should be finalized, or allow feature values to be added subsequently
        """
        if self.filename is not None:
            try:
                self.config.args.classifier = Classifier.get_property(self.filename, "type")
                self.config.args.multilingual = Classifier.get_property(self.filename, "multilingual")
                self.config.args.omit_features = Classifier.get_property(self.filename, "omit_features")
                self.config.args.use_bert = Classifier.get_property(self.filename, "use_bert")
                if self.config.args.use_bert:
                    self.config.args.bert_model = Classifier.get_property(self.filename, "bert_model")
                    self.config.args.bert_layers = Classifier.get_property(self.filename, "bert_layers")
                    self.config.args.bert_layers_pooling = Classifier.get_property(self.filename,
                                                                                   "bert_layers_pooling")
                    self.config.args.bert_token_align_by = Classifier.get_property(self.filename,
                                                                                   "bert_token_align_by")
                    self.config.args.bert_multilingual = Classifier.get_property(self.filename, "bert_multilingual")
                    self.config.args.bert_use_default_word_embeddings = \
                        Classifier.get_property(self.filename, "bert_use_default_word_embeddings")
                    self.config.args.bert_dropout = Classifier.get_property(self.filename, "bert_dropout")
                self.init_model(init_params=False)
                self.feature_extractor.load(self.filename, order=[p.name for p in self.param_defs()])
                if not is_finalized:
                    self.feature_extractor.unfinalize()
                self._update_input_params()  # Must be before classifier.load() because it uses them to init the model
                self.classifier.load(self.filename)
                self.is_finalized = is_finalized
                self.load_labels()
                for param_def in self.param_defs(self.config):
                    param_def.load_to_config(self.feature_extractor.params)
                try:
                    textutil.models.update(load_json(self.filename + ".nlp.json"))
                    vocab = textutil.models.get("vocab")
                    if vocab:
                        self.config.args.vocab = vocab
                except FileNotFoundError:
                    pass
                self.config.print("\n".join("%s: %s" % i for i in self.feature_params.items()), level=1)
            except FileNotFoundError:
                self.feature_extractor = self.classifier = None
                raise
            except Exception as e:
                raise IOError("Failed loading model from '%s'" % self.filename) from e

    def restore(self, model, feature_extractor=None, classifier=None, is_finalized=None):
        """
        Set all attributes to a reference to an existing model, except labels, which will be copied.
        :param model: Model to restore
        :param feature_extractor: optional FeatureExtractor to restore instead of model's
        :param classifier: optional Classifier to restore instead of model's
        :param is_finalized: whether the restored model is finalized
        """
        if is_finalized is None:
            is_finalized = model.is_finalized
        self.config.print("Restoring %sfinalized model" % ("" if is_finalized else "non-"), level=1)
        self.filename = model.filename
        self.feature_extractor = feature_extractor or model.feature_extractor
        self.classifier = classifier or model.classifier
        self.is_finalized = is_finalized
        self._update_input_params()
        self.classifier.labels_t = OrderedDict((a, l.save()) for a, l in self.classifier.labels.items())
        self.load_labels()

    def load_labels(self):
        """
        Copy the classifier's labels to create new Actions/Labels objects.
        Called after loading a model from file, and by restore()
        """
        for axis, all_size in self.classifier.labels_t.items():  # all_size is a pair of (label list, size limit)
            if axis == NODE_LABEL_KEY:  # These are node labels rather than action labels
                node_labels = self.feature_extractor.params.get(NODE_LABEL_KEY)
                if node_labels and node_labels.size:  # Also used for features, so share the dict
                    del all_size
                    labels = node_labels.data
                else:  # Not used as a feature, just get labels
                    labels = UnknownDict() if self.is_finalized else AutoIncrementDict()
                    labels.load(all_size)
            else:  # Action labels for the format determined by the axis
                labels = Actions(*all_size)
            self.classifier.labels[axis] = labels

    def _update_input_params(self):
        self.feature_params = self.classifier.input_params = self.feature_extractor.params

    def all_params(self):
        d = OrderedDict()
        d["features"] = self.feature_extractor.all_features()
        d.update(("input_" + k, p.data.all) for k, p in self.feature_extractor.params.items() if p.data)
        d.update(self.classifier.all_params())
        return d
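

# Illustrative save/load round trip (a sketch; "my_model" is a hypothetical
# filename prefix -- Model appends its own suffixes, e.g. ".nlp.json"):
#
#     model = Model("my_model")
#     model.init_model(axis="ucca")
#     ...                       # train, then finalize and save a checkpoint
#     model.finalize(finished_epoch=True).save()
#
#     restored = Model("my_model")
#     restored.load()           # restores classifier type, features and labels
#     restored.all_params()     # OrderedDict of feature and classifier parameters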