Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
inovisao
pynovisao
Commits
4421d251
Commit
4421d251
authored
Dec 26, 2017
by
Gabriel Kirsten
Browse files
add threads, open_weight function added and classifier not found message changed
parent
0772174c
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
235 additions
and
72 deletions
+235
-72
src/classification/cnn_keras.py
src/classification/cnn_keras.py
+235
-72
No files found.
src/classification/cnn_keras.py
View file @
4421d251
...
...
@@ -4,14 +4,15 @@
"""
Generic classifier with multiple models
Models -> (Xception, VGG16, VGG19, ResNet50, InceptionV3, MobileNet)
Name: cnn_keras.py
Author: Gabriel Kirsten Menezes (gabriel.kirsten@hotmail.com)
"""
import
time
import
os
import
shutil
import
random
import
numpy
as
np
from
PIL
import
Image
from
keras
import
applications
...
...
@@ -20,8 +21,9 @@ from keras import optimizers
from
keras.models
import
Model
from
keras.layers
import
Dropout
,
Flatten
,
Dense
from
keras.callbacks
import
ModelCheckpoint
from
sklearn.cross_validation
import
train_test_split
from
classifier
import
Classifier
from
interface.interface
import
InterfaceException
as
IException
from
classification.classifier
import
Classifier
from
collections
import
OrderedDict
...
...
@@ -38,53 +40,43 @@ START_TIME = time.time()
# =========================================================
IMG_WIDTH
,
IMG_HEIGHT
=
256
,
256
CLASS_NAMES
=
[
'
ferrugemAsiatica'
,
'folhaSaudavel
'
,
'
fundo'
,
'manchaAlvo'
,
'mildio'
,
'oidi
o'
]
CLASS_NAMES
=
[
'
FolhasLargas'
,
'Gramineas
'
,
'
Soja'
,
'Sol
o'
]
class
CNNKeras
(
Classifier
):
""" Class for CNN classifiers based on Keras applications """
def
__init__
(
self
,
architecture
=
"VGG16"
,
learning_rate
=
0.0001
,
momentum
=
0.9
,
batch_size
=
16
,
epochs
=
1
0
,
fine_tuning_rate
=
100
,
transfer_learning
=
False
,
save_weights
=
Fals
e
):
def
__init__
(
self
,
architecture
=
"VGG16"
,
learning_rate
=
0.0001
,
momentum
=
0.9
,
batch_size
=
16
,
epochs
=
1
,
fine_tuning_rate
=
100
,
transfer_learning
=
False
,
save_weights
=
Tru
e
):
"""
Constructor of CNNKeras
"""
self
.
architecture
=
Config
(
"Architecture"
,
architecture
,
str
)
self
.
learning_rate
=
Config
(
"Learning rate"
,
learning_rate
,
float
)
self
.
momentum
=
Config
(
"Momentum"
,
momentum
,
float
)
self
.
batch_size
=
Config
(
"Batch size"
,
batch_size
,
int
)
self
.
epochs
=
Config
(
"Epochs"
,
epochs
,
int
)
self
.
fine_tuning_rate
=
Config
(
"Fine Tuning rate"
,
fine_tuning_rate
,
int
)
self
.
transfer_learning
=
Config
(
"Transfer Learning"
,
transfer_learning
,
bool
)
self
.
save_weights
=
Config
(
"Save weights"
,
save_weights
,
bool
)
self
.
architecture
=
Config
(
"Architecture"
,
architecture
,
str
)
self
.
learning_rate
=
Config
(
"Learning rate"
,
learning_rate
,
float
)
self
.
momentum
=
Config
(
"Momentum"
,
momentum
,
float
)
self
.
batch_size
=
Config
(
"Batch size"
,
batch_size
,
int
)
self
.
epochs
=
Config
(
"Epochs"
,
epochs
,
int
)
self
.
fine_tuning_rate
=
Config
(
"Fine Tuning Rate"
,
fine_tuning_rate
,
int
)
self
.
transfer_learning
=
Config
(
"Transfer Learning"
,
transfer_learning
,
bool
)
self
.
save_weights
=
Config
(
"Save weights"
,
save_weights
,
bool
)
self
.
file_name
=
"kerasCNN"
model
=
applications
.
VGG16
(
weights
=
"imagenet"
,
include_top
=
False
,
input_shape
=
(
IMG_WIDTH
,
IMG_HEIGHT
,
3
))
for
layer
in
model
.
layers
:
layer
.
trainable
=
True
self
.
model
=
None
# Adding custom Layers
new_custom_layers
=
model
.
output
new_custom_layers
=
Flatten
()(
new_custom_layers
)
new_custom_layers
=
Dense
(
1024
,
activation
=
"relu"
)(
new_custom_layers
)
new_custom_layers
=
Dropout
(
0.5
)(
new_custom_layers
)
new_custom_layers
=
Dense
(
1024
,
activation
=
"relu"
)(
new_custom_layers
)
predictions
=
Dense
(
6
,
activation
=
"softmax"
)(
new_custom_layers
)
# creating the final model
self
.
model
=
Model
(
inputs
=
model
.
input
,
outputs
=
predictions
)
# compile the model
self
.
model
.
compile
(
loss
=
"categorical_crossentropy"
,
optimizer
=
optimizers
.
SGD
(
lr
=
self
.
learning_rate
.
value
,
momentum
=
self
.
momentum
.
value
),
metrics
=
[
"accuracy"
])
self
.
trained
=
False
def
get_config
(
self
):
"""Return configuration of classifier.
"""Return configuration of classifier.
Returns
-------
...
...
@@ -105,7 +97,7 @@ class CNNKeras(Classifier):
return
keras_config
def
set_config
(
self
,
configs
):
"""Update configuration of classifier.
"""Update configuration of classifier.
Parameters
----------
...
...
@@ -119,7 +111,7 @@ class CNNKeras(Classifier):
keras_config
[
"Learning rate"
]
=
Config
.
nvl_config
(
configs
[
"Learning rate"
],
self
.
learning_rate
)
keras_config
[
"Momentum"
]
=
Config
.
nvl_config
(
configs
[
"Momentum"
],
self
.
momentum
)
...
...
@@ -139,7 +131,7 @@ class CNNKeras(Classifier):
configs
[
"Save weights"
],
self
.
save_weights
)
def
get_summary_config
(
self
):
"""Return fomatted summary of configuration.
"""Return fomatted summary of configuration.
Returns
-------
...
...
@@ -164,7 +156,7 @@ class CNNKeras(Classifier):
return
summary
def
classify
(
self
,
dataset
,
test_dir
,
test_data
):
""""Perform the classification.
""""Perform the classification.
Parameters
----------
...
...
@@ -201,20 +193,53 @@ class CNNKeras(Classifier):
except
Exception
,
e
:
print
e
# Create a Keras class
if
not
os
.
path
.
exists
(
predict_directory
+
"/png"
):
os
.
makedirs
(
predict_directory
+
"/png"
)
# move the .png images to inside the class
for
_
,
_
,
files
in
os
.
walk
(
predict_directory
,
topdown
=
False
):
for
name
in
files
:
if
name
.
endswith
(
'.png'
):
shutil
.
move
(
os
.
path
.
join
(
predict_directory
,
name
),
os
.
path
.
join
(
predict_directory
,
"png"
,
name
))
classify_datagen
=
ImageDataGenerator
()
classify_generator
=
classify_datagen
.
flow_from_directory
(
predict_directory
,
target_size
=
(
IMG_HEIGHT
,
IMG_WIDTH
),
batch_size
=
self
.
batch_size
,
batch_size
=
1
,
shuffle
=
False
,
class_mode
=
None
)
validation_datagen
=
ImageDataGenerator
()
validation_generator
=
validation_datagen
.
flow_from_directory
(
dataset
,
target_size
=
(
IMG_HEIGHT
,
IMG_WIDTH
),
batch_size
=
self
.
batch_size
.
value
,
shuffle
=
False
,
class_mode
=
None
)
# TODO - A better solution to num_classes - 1
self
.
model
=
self
.
select_model_params
(
validation_generator
.
num_classes
-
1
)
try
:
self
.
model
.
load_weights
(
"../models_checkpoints/"
+
self
.
file_name
+
".h5"
)
except
Exception
,
e
:
raise
IException
(
"Can't load the model in "
+
"../models_checkpoints/"
+
self
.
file_name
+
".h5"
+
str
(
e
))
output_classification
=
self
.
model
.
predict_generator
(
classify_generator
,
classify_generator
.
samples
,
verbose
=
1
)
classify_generator
,
classify_generator
.
samples
,
verbose
=
2
)
one_hot_output
=
np
.
argmax
(
output_classification
,
axis
=
1
)
one_hot_output
=
one_hot_output
.
tolist
()
for
index
in
range
(
0
,
len
(
one_hot_output
)):
one_hot_output
[
index
]
=
CLASS_NAMES
[
one_hot_output
[
index
]]
...
...
@@ -233,57 +258,195 @@ class CNNKeras(Classifier):
If False don't perform new training if there is trained data.
"""
train_datagen
=
ImageDataGenerator
()
# select .h5 filename
if
self
.
transfer_learning
.
value
==
100
:
file_name
=
str
(
self
.
architecture
.
value
)
+
\
'_transfer_learning'
elif
self
.
transfer_learning
.
value
==
-
1
:
file_name
=
str
(
self
.
architecture
.
value
)
+
\
'_without_transfer_learning'
else
:
file_name
=
str
(
self
.
architecture
.
value
)
+
\
'_fine_tunning_'
+
str
(
self
.
fine_tuning_rate
.
value
)
train_generator
=
train_datagen
.
flow_from_directory
(
dataset
,
target_size
=
(
IMG_HEIGHT
,
IMG_WIDTH
),
batch_size
=
self
.
batch_size
,
shuffle
=
True
,
class_mode
=
"categorical"
)
File
.
remove_dir
(
File
.
make_path
(
dataset
,
".tmp"
))
train_generator
,
validation_generator
=
self
.
make_dataset
(
dataset
)
# test_datagen = ImageDataGenerator(
# rescale=1. / 255,
# horizontal_flip=True,
# fill_mode="nearest",
# zoom_range=0.3,
# width_shift_range=0.3,
# height_shift_range=0.3,
# rotation_range=30)
# validation_generator = test_datagen.flow_from_directory(
# VALIDATION_DATA_DIR,
# target_size=(IMG_HEIGHT, IMG_WIDTH),
# batch_size=BATCH_SIZE,
# shuffle=True,
# class_mode="categorical")
self
.
model
=
self
.
select_model_params
(
train_generator
.
num_classes
)
# compile the model
self
.
model
.
compile
(
loss
=
"categorical_crossentropy"
,
optimizer
=
optimizers
.
SGD
(
lr
=
self
.
learning_rate
.
value
,
momentum
=
self
.
momentum
.
value
),
metrics
=
[
"accuracy"
])
# Save the model according to the conditions
checkpoint
=
ModelCheckpoint
(
"../models_checkpoints/"
+
self
.
file_name
+
".h5"
,
monitor
=
'val_acc'
,
verbose
=
1
,
save_best_only
=
True
,
save_weights_only
=
False
,
mode
=
'auto'
,
period
=
1
)
if
self
.
save_weights
:
checkpoint
=
ModelCheckpoint
(
"../models_checkpoints/"
+
self
.
file_name
+
".h5"
,
monitor
=
'val_acc'
,
verbose
=
1
,
save_best_only
=
True
,
save_weights_only
=
False
,
mode
=
'auto'
,
period
=
1
)
else
:
checkpoint
=
None
train_images
,
validation_images
=
train_test_split
(
train_generator
,
test_size
=
0.4
)
# Train the model
self
.
model
.
fit_generator
(
train_generator
,
steps_per_epoch
=
train_generator
.
samples
//
self
.
batch_size
,
epochs
=
self
.
epochs
,
callbacks
=
[
checkpoint
])
steps_per_epoch
=
train_generator
.
samples
//
self
.
batch_size
.
value
,
epochs
=
self
.
epochs
.
value
,
callbacks
=
[
checkpoint
],
validation_data
=
train_generator
,
validation_steps
=
train_generator
.
samples
//
self
.
batch_size
.
value
)
if
self
.
save_weights
:
self
.
model
.
save_weights
(
"../models_checkpoints/"
+
self
.
file_name
+
".h5"
)
def
must_train
(
self
):
"""Return if classifier must be trained.
"""Return if classifier must be trained.
Returns
-------
True
"""
return
True
return
not
self
.
trained
def
must_extract_features
(
self
):
"""Return if classifier must be extracted features.
"""Return if classifier must be extracted features.
Returns
-------
False
"""
return
False
def
select_model_params
(
self
,
num_classes
):
if
self
.
fine_tuning_rate
.
value
!=
-
1
:
if
self
.
architecture
.
value
==
"Xception"
:
model
=
applications
.
Xception
(
weights
=
"imagenet"
,
include_top
=
False
,
input_shape
=
(
IMG_WIDTH
,
IMG_HEIGHT
,
3
))
elif
self
.
architecture
.
value
==
"VGG16"
:
model
=
applications
.
VGG16
(
weights
=
"imagenet"
,
include_top
=
False
,
input_shape
=
(
IMG_WIDTH
,
IMG_HEIGHT
,
3
))
elif
self
.
architecture
.
value
==
"VGG19"
:
model
=
applications
.
VGG19
(
weights
=
"imagenet"
,
include_top
=
False
,
input_shape
=
(
IMG_WIDTH
,
IMG_HEIGHT
,
3
))
elif
self
.
architecture
.
value
==
"ResNet50"
:
model
=
applications
.
ResNet50
(
weights
=
"imagenet"
,
include_top
=
False
,
input_shape
=
(
IMG_WIDTH
,
IMG_HEIGHT
,
3
))
elif
self
.
architecture
.
value
==
"InceptionV3"
:
model
=
applications
.
InceptionV3
(
weights
=
"imagenet"
,
include_top
=
False
,
input_shape
=
(
IMG_WIDTH
,
IMG_HEIGHT
,
3
))
elif
self
.
architecture
.
value
==
"MobileNet"
:
model
=
applications
.
MobileNet
(
weights
=
"imagenet"
,
include_top
=
False
,
input_shape
=
(
IMG_WIDTH
,
IMG_HEIGHT
,
3
))
for
layer
in
model
.
layers
[:
int
(
len
(
model
.
layers
)
*
(
self
.
fine_tuning_rate
.
value
/
100
))]:
layer
.
trainable
=
False
else
:
# without transfer learning
if
self
.
architecture
.
value
==
"Xception"
:
model
=
applications
.
Xception
(
weights
=
None
,
include_top
=
False
,
input_shape
=
(
IMG_WIDTH
,
IMG_HEIGHT
,
3
))
elif
self
.
architecture
.
value
==
"VGG16"
:
model
=
applications
.
VGG16
(
weights
=
None
,
include_top
=
False
,
input_shape
=
(
IMG_WIDTH
,
IMG_HEIGHT
,
3
))
elif
self
.
architecture
.
value
==
"VGG19"
:
model
=
applications
.
VGG19
(
weights
=
None
,
include_top
=
False
,
input_shape
=
(
IMG_WIDTH
,
IMG_HEIGHT
,
3
))
elif
self
.
architecture
.
value
==
"ResNet50"
:
model
=
applications
.
ResNet50
(
weights
=
None
,
include_top
=
False
,
input_shape
=
(
IMG_WIDTH
,
IMG_HEIGHT
,
3
))
elif
self
.
architecture
.
value
==
"InceptionV3"
:
model
=
applications
.
InceptionV3
(
weights
=
None
,
include_top
=
False
,
input_shape
=
(
IMG_WIDTH
,
IMG_HEIGHT
,
3
))
elif
self
.
architecture
.
value
==
"MobileNet"
:
model
=
applications
.
MobileNet
(
weights
=
None
,
include_top
=
False
,
input_shape
=
(
IMG_WIDTH
,
IMG_HEIGHT
,
3
))
for
layer
in
model
.
layers
:
layer
.
trainable
=
True
# Adding custom Layers
new_custom_layers
=
model
.
output
new_custom_layers
=
Flatten
()(
new_custom_layers
)
new_custom_layers
=
Dense
(
1024
,
activation
=
"relu"
)(
new_custom_layers
)
new_custom_layers
=
Dropout
(
0.5
)(
new_custom_layers
)
new_custom_layers
=
Dense
(
1024
,
activation
=
"relu"
)(
new_custom_layers
)
predictions
=
Dense
(
num_classes
,
activation
=
"softmax"
)(
new_custom_layers
)
# creating the final model
model
=
Model
(
inputs
=
model
.
input
,
outputs
=
predictions
)
return
model
def
make_dataset
(
self
,
dataset
):
# create symbolic links to the dataset
KERAS_DATASET_DIR_NAME
=
"keras_dataset"
KERAS_DIR_TRAIN_NAME
=
"train"
KERAS_DIR_VALIDATION_NAME
=
"validation"
PERC_TRAIN
=
60
# create keras dir dataset
if
not
os
.
path
.
exists
(
File
.
make_path
(
dataset
,
KERAS_DATASET_DIR_NAME
)):
os
.
makedirs
(
File
.
make_path
(
dataset
,
KERAS_DATASET_DIR_NAME
))
# create keras dir train
if
not
os
.
path
.
exists
(
File
.
make_path
(
dataset
,
KERAS_DATASET_DIR_NAME
,
KERAS_DIR_TRAIN_NAME
)):
os
.
makedirs
(
File
.
make_path
(
dataset
,
KERAS_DATASET_DIR_NAME
,
KERAS_DIR_TRAIN_NAME
))
# create keras dir validation
if
not
os
.
path
.
exists
(
File
.
make_path
(
dataset
,
KERAS_DATASET_DIR_NAME
,
KERAS_DIR_VALIDATION_NAME
)):
os
.
makedirs
(
File
.
make_path
(
dataset
,
KERAS_DATASET_DIR_NAME
,
KERAS_DIR_VALIDATION_NAME
))
for
root
,
dirs
,
files
in
os
.
walk
(
dataset
,
topdown
=
False
):
# shuffle array
random
.
shuffle
(
files
)
quant_files
=
len
(
files
)
file_index
=
0
if
not
KERAS_DATASET_DIR_NAME
in
root
.
split
(
"/"
)
and
root
.
split
(
"/"
)[
-
1
]
!=
dataset
.
split
(
"/"
)[
-
1
]:
if
not
os
.
path
.
exists
(
File
.
make_path
(
dataset
,
KERAS_DATASET_DIR_NAME
,
KERAS_DIR_TRAIN_NAME
,
root
.
split
(
"/"
)[
-
1
])):
os
.
makedirs
(
File
.
make_path
(
dataset
,
KERAS_DATASET_DIR_NAME
,
KERAS_DIR_TRAIN_NAME
,
root
.
split
(
"/"
)[
-
1
]))
if
not
os
.
path
.
exists
(
File
.
make_path
(
dataset
,
KERAS_DATASET_DIR_NAME
,
KERAS_DIR_VALIDATION_NAME
,
root
.
split
(
"/"
)[
-
1
])):
os
.
makedirs
(
File
.
make_path
(
dataset
,
KERAS_DATASET_DIR_NAME
,
KERAS_DIR_VALIDATION_NAME
,
root
.
split
(
"/"
)[
-
1
]))
for
file
in
files
:
if
file_index
<=
((
quant_files
/
100
)
*
PERC_TRAIN
):
if
not
os
.
path
.
islink
(
File
.
make_path
(
dataset
,
KERAS_DATASET_DIR_NAME
,
KERAS_DIR_TRAIN_NAME
,
root
.
split
(
"/"
)[
-
1
],
file
)):
os
.
symlink
(
File
.
make_path
(
root
,
file
),
File
.
make_path
(
dataset
,
KERAS_DATASET_DIR_NAME
,
KERAS_DIR_TRAIN_NAME
,
root
.
split
(
"/"
)[
-
1
],
file
))
file_index
+=
1
else
:
if
not
os
.
path
.
islink
(
File
.
make_path
(
dataset
,
KERAS_DATASET_DIR_NAME
,
KERAS_DIR_VALIDATION_NAME
,
root
.
split
(
"/"
)[
-
1
],
file
)):
os
.
symlink
(
File
.
make_path
(
root
,
file
),
File
.
make_path
(
dataset
,
KERAS_DATASET_DIR_NAME
,
KERAS_DIR_VALIDATION_NAME
,
root
.
split
(
"/"
)[
-
1
],
file
))
train_datagen
=
ImageDataGenerator
()
train_generator
=
train_datagen
.
flow_from_directory
(
File
.
make_path
(
dataset
,
KERAS_DATASET_DIR_NAME
,
KERAS_DIR_TRAIN_NAME
),
target_size
=
(
IMG_HEIGHT
,
IMG_WIDTH
),
batch_size
=
self
.
batch_size
.
value
,
shuffle
=
True
,
class_mode
=
"categorical"
)
validation_datagen
=
ImageDataGenerator
()
validation_generator
=
validation_datagen
.
flow_from_directory
(
File
.
make_path
(
dataset
,
KERAS_DATASET_DIR_NAME
,
KERAS_DIR_VALIDATION_NAME
),
target_size
=
(
IMG_HEIGHT
,
IMG_WIDTH
),
batch_size
=
self
.
batch_size
.
value
,
shuffle
=
True
,
class_mode
=
"categorical"
)
return
train_generator
,
validation_generator
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment