#!/usr/bin/python
# -*- coding: utf-8 -*-
#
"""
    This file must contain the implementation code for all actions of pynovisao.

    Name: pynovisao.py
    Author: Alessandro dos Santos Ferreira ( santosferreira.alessandro@gmail.com )
"""

from collections import OrderedDict

import interface
from interface.interface import InterfaceException as IException

import segmentation
import extraction
from extraction import FeatureExtractor
import classification

import util
from util.config import Config
from util.file_utils import File as f
from util.utils import TimeUtils


def _selected_metas(options):
    """Return the ``meta`` attribute of every enabled entry of *options*.

    *options* is a mapping whose values expose ``value`` and ``meta``
    attributes. The iteration order of the mapping is preserved.
    """
    # `== True` (instead of plain truthiness) is kept on purpose so entries
    # holding a non-boolean value are treated exactly as they always were.
    return [options[key].meta for key in options if options[key].value == True]


def _percentage(part, total):
    """Return *part* as a percentage of *total*, or 0.0 when *total* is zero."""
    if total == 0:
        return 0.0
    return (part * 100.0) / total


class Act(object):
    """Implementation of the user actions, driven by the interface object `tk`.

    Attributes set by this class:
        tk          -- interface object used for logging, dialogs and images.
        segmenter   -- instance of the currently selected segmenter.
        extractors  -- list of the currently selected extractors.
        classifier  -- instance of the selected classifier, or None when it
                       could not be created.
        dataset     -- path of the image dataset directory.
        classes     -- list of class configurations (OrderedDict of Config).
    """

    def __init__(self, tk, args):
        """Build the action handler.

        Parameters:
            tk   -- interface object.
            args -- mapping read for the keys "dataset", "classes" and "colors".
        """
        self.tk = tk

        self.segmenter = _selected_metas(segmentation._segmenter_list)[0]()
        self.extractors = _selected_metas(extraction._extractor_list)

        try:
            self.classifier = _selected_metas(classification._classifier_list)[0]()
        except Exception:
            # Best effort: the other actions report a missing classifier
            # themselves. KeyboardInterrupt / SystemExit are no longer hidden.
            self.classifier = None

        self._image = None
        self._const_image = None
        self._image_name = None

        self._init_dataset(args["dataset"])
        self._init_classes(args["classes"], args["colors"])

        self._dataset_generator = True

    def _require_classifier(self):
        """Raise IException when no classifier is available."""
        if self.classifier is None:
            raise IException("You must install python-weka-wrapper")

    def _train_classifier_if_needed(self):
        """Create the training data and train the classifier when it asks for it."""
        if self.classifier.must_train():
            self.tk.write_log("Creating training data...")

            fextractor = FeatureExtractor(self.extractors)
            fextractor.extract_all(self.dataset, "training", overwrite = False)
            self.classifier.train(self.dataset, "training")

    def _init_dataset(self, directory):
        """Store *directory* as the dataset path and create it through `f.create_dir`.

        A single trailing '/' is dropped. An empty string or a bare "/" is
        passed through unchanged instead of raising IndexError or collapsing
        to an empty path.
        """
        if len(directory) > 1 and directory.endswith('/'):
            directory = directory[:-1]

        self.dataset = directory
        f.create_dir(self.dataset)

    def _init_classes(self, classes = None, colors = None):
        """Rebuild `self.classes` and select the first class.

        Parameters:
            classes -- whitespace separated class names; when None the sorted
                       directories of the dataset are used.
            colors  -- whitespace separated colors, matched to the classes by
                       position; classes without a color get None.

        When no class is found, two default classes are created.
        """
        self.classes = []

        classes = sorted(f.list_dirs(self.dataset)) if classes is None else classes.split()
        colors = [] if colors is None else colors.split()

        if classes:
            for i, name in enumerate(classes):
                self.add_class(dialog = False, name=name, color=colors[i] if i < len(colors) else None)
        else:
            self.add_class(dialog = False, color='Green')
            self.add_class(dialog = False, color='Yellow')

        self._current_class = 0

    def open_image(self, imagename = None):
        """Open an image and register the click handler that builds the dataset.

        Parameters:
            imagename -- path of the image; when None the user is asked for it.
        """
        def onclick(event):
            """Paint and save the segment under the clicked position."""
            # xdata / ydata are None when the event carries no coordinates.
            if event.xdata is None or event.ydata is None:
                return
            if int(event.ydata) == 0 or not self._dataset_generator:
                return

            x = int(event.xdata)
            y = int(event.ydata)
            self.tk.write_log("Coordinates: x = %d y = %d", x, y)

            segment, size_segment, idx_segment, run_time = self.segmenter.get_segment(x, y)
            if size_segment <= 0:
                return

            self.tk.append_log("\nSegment = %d: %0.3f seconds", idx_segment, run_time)

            current_class = self.classes[self._current_class]
            self._image, run_time = self.segmenter.paint_segment(self._image, current_class["color"].value, x, y)
            self.tk.append_log("Painting segment: %0.3f seconds", run_time)
            self.tk.refresh_image(self._image)

            filepath = f.save_class_image(segment, self.dataset, current_class["name"].value, self._image_name, idx_segment)
            if filepath:
                self.tk.append_log("\nSegment saved in %s", filepath)

        if imagename is None:
            imagename = self.tk.utils.ask_image_name()

        if imagename:
            self._image = f.open_image(imagename)
            self._image_name = f.get_filename(imagename)

            self.tk.write_log("Opening %s...", self._image_name)
            self.tk.add_image(self._image, self._image_name, onclick)
            # NOTE(review): both attributes reference the same object; this is
            # only safe if the segmenter never paints in place - confirm.
            self._const_image = self._image

            self.segmenter.reset()

    def restore_image(self):
        """Show the image as it was opened and reset the segmenter."""
        if self._const_image is not None:
            self.tk.write_log("Restoring image...")
            self.tk.refresh_image(self._const_image)
            # Keep the working image in sync with what is displayed, otherwise
            # the next paint_segment call would start from the discarded one.
            self._image = self._const_image
            self.segmenter.reset()

    def close_image(self):
        """Close the current image.

        Raises:
            IException -- when no image is open.
        """
        if self._const_image is None:
            raise IException("Image not found")

        if self.tk.close_image():
            self.tk.write_log("Closing image...")
            self._const_image = None
            self._image = None

    def add_class(self, dialog = True, name = None, color = None):
        """Add a new class.

        Parameters:
            dialog -- when False the class is appended immediately; otherwise
                      a configuration dialog is opened and the class is added
                      when the dialog calls back.
            name   -- class name; defaults to "Class_NN".
            color  -- class color; defaults to a random X11 color.

        Raises:
            IException -- when `tk.MAX_CLASSES` classes already exist.
        """
        n_classes = len(self.classes)
        if n_classes >= self.tk.MAX_CLASSES:
            raise IException("You have reached the limite of %d classes" % self.tk.MAX_CLASSES)

        def edit_class(index):
            self.edit_class(index)

        def update_current_class(index):
            self.update_current_class(index)

        def process_config():
            new_class = self.tk.get_config_and_destroy()
            # Whitespace inside the name is replaced by underscores.
            new_class["name"].value = '_'.join(new_class["name"].value.split())

            self.classes.append( new_class )

            self.tk.write_log("New class: %s", new_class["name"].value)
            self.tk.refresh_panel_classes(self.classes, self._current_class)

        if name is None:
            name = "Class_%02d" % (n_classes+1)
        if color is None:
            color = util.X11Colors.random_color()

        class_config = OrderedDict()
        class_config["name"] = Config(label="Name", value=name, c_type=str)
        class_config["color"] = Config(label="Color (X11 Colors)", value=color, c_type='color')
        class_config["callback"] = Config(label=None, value=update_current_class, c_type=None, hidden=True)
        class_config["callback_color"] = Config(label=None, value=edit_class, c_type=None, hidden=True)
        class_config["args"] = Config(label=None, value=n_classes, c_type=int, hidden=True)

        if not dialog:
            self.classes.append( class_config )
            return

        title = "Add a new classe"
        self.tk.dialogue_config(title, class_config, process_config)

    def edit_class(self, index):
        """Open a dialog to edit the class stored at position *index*."""
        def process_update(index):
            updated_class = self.tk.get_config_and_destroy()
            # Whitespace inside the name is replaced by underscores.
            updated_class["name"].value = '_'.join(updated_class["name"].value.split())

            self.classes[index] = updated_class

            self.tk.write_log("Class updated: %s", updated_class["name"].value)
            self.tk.refresh_panel_classes(self.classes, self._current_class)

        current_config = self.classes[index]

        title = "Edit class %s" % current_config["name"].value
        self.tk.dialogue_config(title, current_config, lambda *_ : process_update(index))

    def update_current_class(self, index):
        """Make the class at position *index* the current one."""
        self._current_class = index

    def get_class_by_name(self, name):
        """Return the class whose name equals *name* (surrounding blanks ignored).

        Raises:
            Exception -- when no class has that name.
        """
        name = name.strip()

        for cl in self.classes:
            if cl["name"].value == name:
                return cl

        raise Exception("Class not found")

    def set_dataset_path(self):
        """Ask for a dataset directory and, if one is given, switch to it."""
        directory = self.tk.utils.ask_directory(default_dir = self.dataset)
        if not directory:
            return

        self._init_dataset(directory)
        self.tk.write_log("Image dataset defined: %s", self.dataset)

        self._init_classes()
        self.tk.refresh_panel_classes(self.classes)

        if self.classifier:
            self.classifier.reset()

    def toggle_dataset_generator(self):
        """Switch the dataset generator (segment saving on click) on or off."""
        self._dataset_generator = not self._dataset_generator

    def select_segmenter(self):
        """Open a dialog to choose the segmenter."""
        title = "Choosing a segmenter"
        self.tk.write_log(title)

        current_config = segmentation.get_segmenter_config()

        def process_config():
            new_config = self.tk.get_config_and_destroy()

            self.segmenter = _selected_metas(new_config)[0]()

            self.tk.append_log("\nSegmenter: %s\n%s", str(self.segmenter.get_name()), str(self.segmenter.get_summary_config()))
            segmentation.set_segmenter_config(new_config)

        self.tk.dialogue_choose_one(title, current_config, process_config)

    def config_segmenter(self):
        """Open a dialog to configure the current segmenter."""
        title = "Configuring %s" % self.segmenter.get_name()
        self.tk.write_log(title)

        current_config = self.segmenter.get_config()

        def process_config():
            new_config = self.tk.get_config_and_destroy()

            self.segmenter.set_config(new_config)
            self.tk.append_log("\nConfig updated:\n%s", str(self.segmenter.get_summary_config()))
            self.segmenter.reset()

        self.tk.dialogue_config(title, current_config, process_config)

    def run_segmenter(self):
        """Run the current segmenter on the opened image and display the result.

        Raises:
            IException -- when no image is open.
        """
        if self._const_image is None:
            raise IException("Image not found")

        self.tk.write_log("Running %s...", self.segmenter.get_name())

        self.tk.append_log("\nConfig: %s", str(self.segmenter.get_summary_config()))
        self._image, run_time = self.segmenter.run(self._const_image)
        self.tk.append_log("Time elapsed: %0.3f seconds", run_time)

        self.tk.refresh_image(self._image)

    def select_extractors(self):
        """Open a dialog to select the feature extractors."""
        title = "Selecting extractors"
        self.tk.write_log(title)

        current_config = extraction.get_extractor_config()

        def process_config():
            new_config = self.tk.get_config_and_destroy()

            # Validate before storing, so a rejected (empty) selection does
            # not wipe the extractors that were previously selected.
            selected = _selected_metas(new_config)
            if not selected:
                raise IException("Please select at least one extractor")
            self.extractors = selected

            self.tk.append_log("\nConfig updated:\n%s",
                               '\n'.join(["%s: %s" % (new_config[extractor].label, "on" if new_config[extractor].value==True else "off")
                                          for extractor in new_config]))
            extraction.set_extractor_config(new_config)

        self.tk.dialogue_select(title, current_config, process_config)

    def run_extractors(self):
        """Run the selected extractors on the dataset to create the training data."""
        self.tk.write_log("Running extractors on all images in %s", self.dataset)

        fextractor = FeatureExtractor(self.extractors)
        self.tk.append_log("%s", '\n'.join([extraction._extractor_list[extractor].label
                                            for extractor in extraction._extractor_list
                                            if extraction._extractor_list[extractor].value == True ]))

        output_file, run_time = fextractor.extract_all(self.dataset, "training")
        self.tk.append_log("\nOutput file saved in %s", output_file)
        self.tk.append_log("Time elapsed: %0.3f seconds", run_time)

        if self.classifier:
            self.classifier.reset()

    def select_classifier(self):
        """Open a dialog to choose the classifier.

        Raises:
            IException -- when no classifier is available.
        """
        self._require_classifier()

        title = "Choosing a classifier"
        self.tk.write_log(title)

        current_config = classification.get_classifier_config()

        def process_config():
            new_config = self.tk.get_config_and_destroy()

            self.classifier = _selected_metas(new_config)[0]()

            self.tk.append_log("\nClassifier: %s\n%s", str(self.classifier.get_name()), str(self.classifier.get_summary_config()))
            classification.set_classifier_config(new_config)

        self.tk.dialogue_choose_one(title, current_config, process_config)

    def configure_classifier(self):
        """Open a dialog to configure the current classifier.

        Raises:
            IException -- when no classifier is available.
        """
        self._require_classifier()

        title = "Configuring %s" % self.classifier.get_name()
        self.tk.write_log(title)

        current_config = self.classifier.get_config()

        def process_config():
            new_config = self.tk.get_config_and_destroy()

            self.classifier.set_config(new_config)
            self.tk.append_log("\nConfig updated:\n%s", str(self.classifier.get_summary_config()))

            if self.classifier:
                self.classifier.reset()

        self.tk.dialogue_config(title, current_config, process_config)

    def run_classifier(self):
        """Classify every segment of the opened image and paint the result.

        The image is segmented first when the segmenter holds no segments,
        and the classifier is trained first when it asks for it. A popup
        shows, per class, the summed segment size and its share of the total.

        Raises:
            IException -- when no classifier is available or no image is open.
        """
        self._require_classifier()

        if self._const_image is None:
            raise IException("Image not found")

        self.tk.write_log("Running %s...", self.classifier.get_name())
        self.tk.append_log("\n%s", str(self.classifier.get_summary_config()))

        start_time = TimeUtils.get_time()

        list_segments = self.segmenter.get_list_segments()
        if len(list_segments) == 0:
            self.tk.append_log("Running %s... (%0.3f seconds)", self.segmenter.get_name(), (TimeUtils.get_time() - start_time))

            self._image, _ = self.segmenter.run(self._const_image)
            self.tk.refresh_image(self._image)
            list_segments = self.segmenter.get_list_segments()

        if self.classifier.must_train():
            self.tk.append_log("Creating training data... (%0.3f seconds)", (TimeUtils.get_time() - start_time))

            fextractor = FeatureExtractor(self.extractors)
            fextractor.extract_all(self.dataset, "training", overwrite = False)

            self.tk.append_log("Training classifier... (%0.3f seconds)", (TimeUtils.get_time() - start_time))

            self.classifier.train(self.dataset, "training")

        # Painting starts again from the image as it was opened.
        self._image = self._const_image

        # New and optimized classification
        tmp = ".tmp"
        f.remove_dir(f.make_path(self.dataset, tmp))

        self.tk.append_log("Generating test images... (%0.3f seconds)", (TimeUtils.get_time() - start_time))

        len_segments = {}
        for idx_segment in list_segments:
            # NOTE(review): the Act instance is passed as the first positional
            # argument. That looks unintended, but the segmenter signature is
            # not visible here, so the call is left untouched - confirm.
            segment, size_segment, idx_segment = self.segmenter.get_segment(self, idx_segment=idx_segment)[:-1]

            f.save_class_image(segment, self.dataset, tmp, self._image_name, idx_segment)
            len_segments[idx_segment] = size_segment

        # NOTE(review): must_train() is evaluated again after train() has run;
        # if training changes its answer this step is skipped - confirm that
        # this is the intended condition.
        if self.classifier.must_train():
            self.tk.append_log("Running extractors on test images... (%0.3f seconds)", (TimeUtils.get_time() - start_time))

            # Created here so the name is always bound, whatever the first
            # must_train() check answered.
            fextractor = FeatureExtractor(self.extractors)
            fextractor.extract_all(self.dataset, "test", dirs=[tmp])

        self.tk.append_log("Running classifier on test data... (%0.3f seconds)", (TimeUtils.get_time() - start_time))

        labels = self.classifier.classify(self.dataset, "test")
        f.remove_dir(f.make_path(self.dataset, tmp))

        self.tk.append_log("Painting segments... (%0.3f seconds)", (TimeUtils.get_time() - start_time))

        popup_info = "%s\n" % str(self.classifier.get_summary_config())

        len_total = sum([len_segments[idx] for idx in len_segments])
        # _percentage() avoids a ZeroDivisionError when the total size is 0.
        popup_info += "%-16s%-16s%0.2f%%\n" % ("Total", str(len_total), _percentage(len_total, len_total))

        for cl in self.classes:
            class_name = cl["name"].value
            # Segments whose predicted label is this class (labels and
            # list_segments are matched by position).
            idx_segment = [list_segments[idx] for idx, label in enumerate(labels) if class_name == label]
            if len(idx_segment) > 0:
                self._image, _ = self.segmenter.paint_segment(self._image, cl["color"].value, idx_segment=idx_segment, border=False)

            len_classes = sum([len_segments[idx] for idx in idx_segment])
            popup_info += "%-16s%-16s%0.2f%%\n" % (class_name, str(len_classes), _percentage(len_classes, len_total))

        self.tk.refresh_image(self._image)
        self.tk.popup(popup_info)

        end_time = TimeUtils.get_time()

        self.tk.append_log("\nClassification finished")
        self.tk.append_log("Time elapsed: %0.3f seconds", (end_time - start_time))

    def cross_validation(self):
        """Run the classifier's cross validation and show its result in a popup.

        Raises:
            IException -- when no classifier is available.
        """
        self._require_classifier()
        self._train_classifier_if_needed()

        self.tk.write_log("Running Cross Validation on %s...", self.classifier.get_name())
        self.tk.append_log("\n%s", str(self.classifier.get_summary_config()))

        popup_info = self.classifier.cross_validate()
        self.tk.append_log("Cross Validation finished")
        self.tk.popup(popup_info)

    def experimenter_all(self):
        """Run the classifier's experimenter after the user confirms.

        Raises:
            IException -- when no classifier is available.
        """
        self._require_classifier()

        if not self.tk.ask_ok_cancel("Experimenter All", "This may take several minutes to complete. Are you sure?"):
            return

        self._train_classifier_if_needed()

        self.tk.write_log("Running Experimenter All on %s...", self.classifier.get_name())

        popup_info = self.classifier.experimenter()
        self.tk.append_log("\nExperimenter All finished")
        self.tk.popup(popup_info)

    def func_not_available(self):
        """Log that the requested functionality is not available."""
        self.tk.write_log("This functionality is not available right now.")