# road_detection/app/server.py

import base64
import flask
from flask import Flask, jsonify
import numpy as np
import io
from PIL import Image, ImageDraw
import tensorflow as tf
import ops as utils_ops
import visualization_utils as vis_util
import cv2
from serve import serve_unet_model, serve_bridge_model, serve_rcnn_model

app = Flask(__name__)


def load_unet_model():
    global tflite_interpreter_c, height_c, width_c, input_details_c, output_details_c
    tflite_interpreter_c, height_c, width_c, input_details_c, output_details_c = serve_unet_model()


def load_rcnn_model():
    global detection_graph
    detection_graph = serve_rcnn_model()


# Load both models once at import time so every request reuses them.
load_unet_model()
load_rcnn_model()


def prepare_img(image, type):
    # Resize to the model's expected input size. The "detect" branch relies on
    # module-level `width`/`height`, which are not set in this file; the routes
    # below only use the "segment" branch.
    if type == "detect":
        return image.resize((width, height))
    elif type == "segment":
        return image.resize((width_c, height_c))


def load_image_into_numpy_array(image):
    (im_width, im_height) = image.size
    return np.array(image.getdata()).reshape(
        (im_height, im_width, 3)).astype(np.uint8)
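# Note: for a 3-channel RGB PIL image the conversion above is equivalent to
# np.asarray(image, dtype=np.uint8), which avoids the slower getdata()/reshape
# round trip.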
@app.route("/detect/rcnn", methods=["POST"])
def detect_rcnn():
if flask.request.method == "POST":
if flask.request.files.get("image"):
image = Image.open(flask.request.files["image"])
image_np = load_image_into_numpy_array(image)
# image_np_expanded = np.expand_dims(image_np, axis=0)
output_dict = run_inference_for_single_image(image_np, detection_graph)
category_index = {0: {"name": "pothole"}, 1: {"name": "pothole"}}
print(output_dict.get('detection_masks'))
i, is_crack = vis_util.visualize_boxes_and_labels_on_image_array(
image_np,
output_dict['detection_boxes'],
output_dict['detection_classes'],
output_dict['detection_scores'],
category_index,
instance_masks=output_dict.get('detection_masks'),
use_normalized_coordinates=True,
line_thickness=8,
skip_scores=True,
skip_labels=True)
img = Image.fromarray(image_np.astype("uint8"))
img = img.resize((128, 128))
raw_bytes = io.BytesIO()
img.save(raw_bytes, "JPEG")
raw_bytes.seek(0)
img_byte = raw_bytes.getvalue()
img_str = base64.b64encode(img_byte)
data = {
"result": is_crack,
"img": img_str.decode('utf-8')
}
return jsonify(data)
else:
return "Could not find image"
return "Please use POST method"
@app.route("/segment", methods=["POST"])
def segment():
if flask.request.method == "POST":
if flask.request.files.get("image"):
# read the image in PIL format
img = prepare_img(Image.open(flask.request.files["image"]), "segment")
input_data = np.expand_dims(img, axis=0)
input_data = np.float32(input_data) / 255.0
tflite_interpreter_c.set_tensor(input_details_c[0]['index'], input_data)
tflite_interpreter_c.invoke()
result = tflite_interpreter_c.get_tensor(output_details_c[0]['index'])
result = result > 0.5
result = result * 255
mask = np.squeeze(result)
bg = np.asarray(img).copy()
is_crack = False
for i in range(len(mask)):
for j in range(len(mask[i])):
if mask[i][j] > 0:
bg[i][j][0] = 0
bg[i][j][1] = 0
bg[i][j][2] = 255
is_crack = True
img = Image.fromarray(bg.astype("uint8"))
raw_bytes = io.BytesIO()
img.save(raw_bytes, "JPEG")
raw_bytes.seek(0)
img_byte = raw_bytes.getvalue()
img_str = base64.b64encode(img_byte)
data = {
"result": is_crack,
"img": img_str.decode('utf-8')
}
return jsonify(data)
else:
return "Could not find image"
return "Please use POST method"

def run_inference_for_single_image(image, graph):
    with graph.as_default():
        with tf.compat.v1.Session() as sess:
            # Get handles to input and output tensors
            ops = tf.compat.v1.get_default_graph().get_operations()
            all_tensor_names = {
                output.name for op in ops for output in op.outputs}
            tensor_dict = {}
            for key in [
                    'num_detections', 'detection_boxes', 'detection_scores',
                    'detection_classes', 'detection_masks'
            ]:
                tensor_name = key + ':0'
                if tensor_name in all_tensor_names:
                    tensor_dict[key] = tf.compat.v1.get_default_graph().get_tensor_by_name(
                        tensor_name)
            if 'detection_masks' in tensor_dict:
                # The following processing is only for a single image.
                detection_boxes = tf.squeeze(tensor_dict['detection_boxes'], [0])
                detection_masks = tf.squeeze(tensor_dict['detection_masks'], [0])
                # Reframing is required to translate the masks from box
                # coordinates to image coordinates and fit the image size.
                real_num_detection = tf.cast(tensor_dict['num_detections'][0], tf.int32)
                detection_boxes = tf.slice(detection_boxes, [0, 0],
                                           [real_num_detection, -1])
                detection_masks = tf.slice(detection_masks, [0, 0, 0],
                                           [real_num_detection, -1, -1])
                detection_masks_reframed = utils_ops.reframe_box_masks_to_image_masks(
                    detection_masks, detection_boxes, image.shape[0], image.shape[1])
                detection_masks_reframed = tf.cast(
                    tf.greater(detection_masks_reframed, 0.5), tf.uint8)
                # Follow the convention of adding back the batch dimension.
                tensor_dict['detection_masks'] = tf.expand_dims(
                    detection_masks_reframed, 0)
            image_tensor = tf.compat.v1.get_default_graph().get_tensor_by_name('image_tensor:0')
            # Run inference
            output_dict = sess.run(tensor_dict,
                                   feed_dict={image_tensor: np.expand_dims(image, 0)})
            # All outputs are float32 numpy arrays, so convert types as appropriate.
            output_dict['num_detections'] = int(output_dict['num_detections'][0])
            output_dict['detection_classes'] = output_dict[
                'detection_classes'][0].astype(np.uint8)
            output_dict['detection_boxes'] = output_dict['detection_boxes'][0]
            output_dict['detection_scores'] = output_dict['detection_scores'][0]
            if 'detection_masks' in output_dict:
                output_dict['detection_masks'] = output_dict['detection_masks'][0]
    return output_dict

@app.route('/', methods=["POST"])
def index():
    if flask.request.method == "POST":
        if flask.request.files.get("image"):
            # Force 3 channels so segmentation input and JPEG encoding cannot fail.
            img_src = Image.open(flask.request.files["image"]).convert("RGB")
            # Crack segmentation (U-Net, TFLite)
            img_segment = prepare_img(img_src, "segment")
            input_data = np.expand_dims(img_segment, axis=0)
            input_data = np.float32(input_data) / 255.0
            tflite_interpreter_c.set_tensor(input_details_c[0]['index'], input_data)
            tflite_interpreter_c.invoke()
            result = tflite_interpreter_c.get_tensor(output_details_c[0]['index'])
            mask = np.squeeze(result > 0.5)
            bg = np.asarray(img_segment).copy()
            # Paint detected crack pixels blue; any positive pixel marks a crack.
            bg[mask] = [0, 0, 255]
            is_crack = bool(mask.any())
            img = Image.fromarray(bg.astype("uint8"))
            # Pothole detection (R-CNN graph)
            image_np = load_image_into_numpy_array(img_src)
            output_dict = run_inference_for_single_image(image_np, detection_graph)
            category_index = {0: {"name": "pothole"}, 1: {"name": "pothole"}}
            _, is_pothole = vis_util.visualize_boxes_and_labels_on_image_array(
                image_np,
                output_dict['detection_boxes'],
                output_dict['detection_classes'],
                output_dict['detection_scores'],
                category_index,
                instance_masks=output_dict.get('detection_masks'),
                use_normalized_coordinates=True,
                line_thickness=8,
                skip_scores=True,
                skip_labels=True)
            raw_bytes = io.BytesIO()
            raw_src = io.BytesIO()
            img.save(raw_bytes, "JPEG")
            img_src.save(raw_src, "JPEG")
            raw_bytes.seek(0)
            raw_src.seek(0)
            img_byte = raw_bytes.getvalue()
            img_src_byte = raw_src.getvalue()
            img_str = base64.b64encode(img_src_byte)
            img_discern = base64.b64encode(img_byte)
            data = {
                "code": 0,
                "crack": is_crack,
                "pothole": is_pothole,
                "img_src": img_str.decode('utf-8'),
                "img_discern": img_discern.decode('utf-8')
            }
            return jsonify(data)
        else:
            data = {
                "code": 10001,
                "msg": "Could not find image"
            }
            return jsonify(data)
    return "Road Damage Detection"
@app.route("/predict/bridge", methods=["POST"])
def bridge():
if flask.request.method == "POST":
if flask.request.files.get("image"):
pred_data_colr = []
pred_data_inv = []
img_src = cv2.imread(flask.request.files["image"], 0)
image_dst = resize_keep_aspect_ratio(img_src, (227, 227))
bi_inv, colored_img = process_image(image_dst)
pred_data_colr.append(colored_img)
pred_data_inv.append(bi_inv)
final_pred_colr = np.array(pred_data_colr).reshape((len(pred_data_colr), 227, 227, 1))
final_pred_inv = np.array(pred_data_inv).reshape((len(pred_data_inv), 227, 227, 1))
is_crack = predict_image_util(final_pred_inv)
image_np = load_image_into_numpy_array(img_src)
img = Image.fromarray(image_np.astype("uint8"))
img = img.resize((128, 128))
raw_bytes = io.BytesIO()
img.save(raw_bytes, "JPEG")
raw_bytes.seek(0)
img_byte = raw_bytes.getvalue()
img_str = base64.b64encode(img_byte)
data = {
"result": is_crack,
"img": img_str.decode('utf-8')
}
return jsonify(data)
else:
data = {
"code": 10001,
"msg": "Could not find image"
}
return jsonify(data)
return "Bridge Detection"

def predict_image_util(final_pred_inv):
    model = serve_bridge_model()
    img_test = final_pred_inv[0].reshape((1, 227, 227, 1))
    raw_predicted_label = model.predict(img_test, batch_size=None, verbose=0, steps=None)[0][0]
    # Threshold the model score: anything below 0.8 counts as "No Crack".
    predicted_label = 1
    if raw_predicted_label < 0.8:
        predicted_label = 0
    predicted_label_str = 'Crack' if predicted_label == 1 else 'No Crack'
    print('Raw Predicted Label (Numeric): ' + str(raw_predicted_label))
    print('Predicted Label: ' + predicted_label_str)
    return predicted_label

def process_image(img):
    # Binarize with an inverted threshold (pixels <= 127 map to 255).
    ret, bi_inv = cv2.threshold(img, 127, 255, cv2.THRESH_BINARY_INV)
    return bi_inv, img


def resize_keep_aspect_ratio(image_src, dst_size):
    src_h, src_w = image_src.shape[:2]
    dst_h, dst_w = dst_size
    # Decide which side to scale along so the aspect ratio is preserved.
    h = dst_w * (float(src_h) / src_w)  # resulting height if we scale by width
    w = dst_h * (float(src_w) / src_h)  # resulting width if we scale by height
    h = int(h)
    w = int(w)
    if h <= dst_h:
        image_dst = cv2.resize(image_src, (dst_w, h))
    else:
        image_dst = cv2.resize(image_src, (w, dst_h))
    h_, w_ = image_dst.shape[:2]
    # Pad with black borders so the output is exactly dst_size.
    top = int((dst_h - h_) / 2)
    down = int((dst_h - h_ + 1) / 2)
    left = int((dst_w - w_) / 2)
    right = int((dst_w - w_ + 1) / 2)
    value = [0, 0, 0]
    border_type = cv2.BORDER_CONSTANT
    image_dst = cv2.copyMakeBorder(image_dst, top, down, left, right, border_type, None, value)
    return image_dst
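# Worked example (illustrative): for a 640x480 source and dst_size=(227, 227),
# h = int(227 * 480 / 640) = 170, so the image is resized to 227x170 and then
# padded with 28 rows on top and 29 on the bottom to reach 227x227.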

if __name__ == "__main__":
    app.run()