PyRobovision: Difference between revisions

From ICO wiki
Jump to navigationJump to search
Lvosandi (talk | contribs)
Created page with " <source lang="python"> import cv2 import numpy as np import signal import sys from flask import Flask, render_template, Response from collections import deque from datetime..."
 
Lvosandi (talk | contribs)
No edit summary
 
(7 intermediate revisions by the same user not shown)
Line 1: Line 1:
This page is simply to show off the MVP using Python, OpenCV, [[Firmata]], Flask. Currently the robot is able to turn towards the golf ball and then approach it.


It supports streaming MJPEG to the browser as well.
[http://github.com/laurivosandi/pyrobovision Updated version available at GitHub]
[[File:pyrobovision.png]]
First install [[Linux+OpenCV3+Python3.5]], [[Firmata]] and also Flask:
<source lang="bash">
apt-get install python3-flask
</source>
Corresponding Python source:


<source lang="python">
<source lang="python">
import cv2
import cv2
import numpy as np
import numpy as np
Line 10: Line 25:
from datetime import datetime
from datetime import datetime
from time import time, sleep
from time import time, sleep
from threading import Thread
from threading import Thread, Event
from PyMata.pymata import PyMata
from PyMata.pymata import PyMata


class Motors(Thread):
# Motor pins on Arduino
    MOTOR_1_PWM = 2
MOTOR_1_PWM = 2
    MOTOR_1_A  = 3
MOTOR_1_A  = 3
    MOTOR_1_B  = 4
MOTOR_1_B  = 4
    MOTOR_2_PWM = 5
MOTOR_2_PWM = 5
    MOTOR_2_A  = 6
MOTOR_2_A  = 6
    MOTOR_2_B  = 7
MOTOR_2_B  = 7
    MOTOR_3_PWM = 8
MOTOR_3_PWM = 8
    MOTOR_3_A  = 9
MOTOR_3_A  = 9
    MOTOR_3_B  = 10
MOTOR_3_B  = 10
      
 
def signal_handler(sig, frame):
     board.reset()
 
# Here we initialize the motor pins on Arduino
board = PyMata(bluetooth=False)
signal.signal(signal.SIGINT, signal_handler)
board.set_pin_mode(MOTOR_1_PWM, board.PWM,    board.DIGITAL)
board.set_pin_mode(MOTOR_1_A,  board.OUTPUT, board.DIGITAL)
board.set_pin_mode(MOTOR_1_B,  board.OUTPUT, board.DIGITAL)
board.set_pin_mode(MOTOR_2_PWM, board.PWM,    board.DIGITAL)
board.set_pin_mode(MOTOR_2_A,  board.OUTPUT, board.DIGITAL)
board.set_pin_mode(MOTOR_2_B,  board.OUTPUT, board.DIGITAL)
board.set_pin_mode(MOTOR_3_PWM, board.PWM,    board.DIGITAL)
board.set_pin_mode(MOTOR_3_A,  board.OUTPUT, board.DIGITAL)
board.set_pin_mode(MOTOR_3_B,  board.OUTPUT, board.DIGITAL)
 
class MotorThread(Thread):
     def __init__(self):
     def __init__(self):
         Thread.__init__(self)
         Thread.__init__(self)
         self.daemon = True
         self.daemon = True
         self.board = PyMata("/dev/ttyACM1")
         self.lock = Event()
         def signal_handler(sig, frame):
         self.set(0, 0, 0)
            self.stop_motors()
            self.board.reset()
            sys.exit(0)
        signal.signal(signal.SIGINT, signal_handler)
        self.board.set_pin_mode(self.MOTOR_1_PWM, self.board.PWM,    self.board.DIGITAL)
        self.board.set_pin_mode(self.MOTOR_1_A,  self.board.OUTPUT, self.board.DIGITAL)
        self.board.set_pin_mode(self.MOTOR_1_B,  self.board.OUTPUT, self.board.DIGITAL)
        self.board.set_pin_mode(self.MOTOR_2_PWM, self.board.PWM,    self.board.DIGITAL)
        self.board.set_pin_mode(self.MOTOR_2_A,  self.board.OUTPUT, self.board.DIGITAL)
        self.board.set_pin_mode(self.MOTOR_2_B,  self.board.OUTPUT, self.board.DIGITAL)
        self.board.set_pin_mode(self.MOTOR_3_PWM, self.board.PWM,    self.board.DIGITAL)
        self.board.set_pin_mode(self.MOTOR_3_A,  self.board.OUTPUT, self.board.DIGITAL)
        self.board.set_pin_mode(self.MOTOR_3_B,  self.board.OUTPUT, self.board.DIGITAL)
        self.dx, self.dy = 0, 0
          
          
     def stop_motors(self):
     def set(self, m1, m2, m3):
         self.board.digital_write(self.MOTOR_1_B, 0)
         self.m1, self.m2, self.m3 = m1, m2, m3
        self.board.digital_write(self.MOTOR_1_A, 0)
         self.lock.set()
        self.board.digital_write(self.MOTOR_2_B, 0)
        self.board.digital_write(self.MOTOR_2_A, 0)
        self.board.digital_write(self.MOTOR_3_B, 0)
         self.board.digital_write(self.MOTOR_3_A, 0)


     def run(self):
     def run(self):
         while True:
         while True:
             # Reset all direction pins to avoid damaging H-bridges
             board.digital_write(MOTOR_1_B, 0) # Reset all direction pins to avoid damaging H-bridges
             self.stop_motors()
             board.digital_write(MOTOR_1_A, 0)
 
            board.digital_write(MOTOR_2_B, 0)
            dist = abs(self.dx)
            board.digital_write(MOTOR_2_A, 0)
            if dist > 2:
            board.digital_write(MOTOR_3_B, 0)
                if self.dx > 0:
            board.digital_write(MOTOR_3_A, 0)
                    print("Turning left")
            board.analog_write(MOTOR_1_PWM, int(abs(self.m1) + 25) if self.m1 else 0) # Set duty cycle
                    self.board.digital_write(self.MOTOR_1_B, 1)
            board.analog_write(MOTOR_2_PWM, int(abs(self.m2) + 25) if self.m2 else 0)
                    self.board.digital_write(self.MOTOR_2_B, 1)
            board.analog_write(MOTOR_3_PWM, int(abs(self.m3) + 25) if self.m3 else 0)
                    self.board.digital_write(self.MOTOR_3_B, 1)
             board.digital_write(MOTOR_1_A, self.m1 < 0) # Set directions
                else:
            board.digital_write(MOTOR_1_B, self.m1 > 0)
                    print("Turning right")
            board.digital_write(MOTOR_2_A, self.m2 < 0)
                    self.board.digital_write(self.MOTOR_1_A, 1)
            board.digital_write(MOTOR_2_B, self.m2 > 0)
                    self.board.digital_write(self.MOTOR_2_A, 1)
            board.digital_write(MOTOR_3_A, self.m3 < 0)
                    self.board.digital_write(self.MOTOR_3_A, 1)
            board.digital_write(MOTOR_3_B, self.m3 > 0)
                self.board.analog_write(self.MOTOR_1_PWM, int(dist ** 0.7 + 25))
            self.lock.wait()
                self.board.analog_write(self.MOTOR_2_PWM, int(dist ** 0.7 + 25))
             self.lock.clear()
                self.board.analog_write(self.MOTOR_3_PWM, int(dist ** 0.7 + 25))
             elif self.dy > 30:
                print("Going forward")
                self.board.digital_write(self.MOTOR_1_B, 1)
                self.board.digital_write(self.MOTOR_3_A, 1)
                self.board.analog_write(self.MOTOR_1_PWM, int(self.dy ** 0.5 )+30)
                self.board.analog_write(self.MOTOR_2_PWM, 0)
                self.board.analog_write(self.MOTOR_3_PWM, int(self.dy ** 0.5 )+30)
             sleep(0.03)


class FrameGrabber(Thread):
class FrameGrabber(Thread):
     BALL_LOWER = ( 5, 140, 140)
    # Set HSV color ranges, this basically means color red regardless of saturation or brightness
     BALL_UPPER = (30, 255, 255)
     BALL_LOWER = ( 0, 140, 140)
 
     BALL_UPPER = (10, 255, 255)
     def __init__(self, width=320, height=240):
     def __init__(self, width=640, height=480):
         Thread.__init__(self)
         Thread.__init__(self)
         self.daemon = True
         self.daemon = True
         self.video = cv2.VideoCapture(0)
         self.video = cv2.VideoCapture(0)
         self.video.set(3, width)
 
         self.video.set(4, height)
        # PS3 eye config:
         self.video.set(3, width)             # Set capture width to 640px
         self.video.set(4, height)           # Set capture height to 480px
        self.video.set(cv2.CAP_PROP_FPS, 60) # Set framerate to 60 fps
     
        self.cx, self.cy = width / 2, height
         self.timestamp = time()
         self.timestamp = time()
         self.frames = 0
         self.frames = 0
         self.fps = 50
         self.fps = 0
         self.current_frame = None
         self.last_frame = None
 
     def run(self):
     def run(self):
         while True:
         while True:
Line 104: Line 115:
                 self.frames = 0
                 self.frames = 0
                 self.timestamp = timestamp_begin
                 self.timestamp = timestamp_begin
             success, frame = self.video.read()
 
            frame = cv2.flip(frame, 1)
             success, frame = self.video.read() # Grab another frame from the camera
            original = frame
             blurred = cv2.blur(frame, (4,4))
             blurred = cv2.blur(frame, (4,4))
             hsv = cv2.cvtColor(blurred, cv2.COLOR_BGR2HSV)
             hsv = cv2.cvtColor(blurred, cv2.COLOR_BGR2HSV) # Convert red, green and blue to hue, saturation and brightness
             mask = cv2.inRange(hsv, self.BALL_LOWER, self.BALL_UPPER)
             mask = cv2.inRange(hsv, self.BALL_LOWER, self.BALL_UPPER)
             mask = cv2.dilate(mask, None, iterations=2)
             mask = cv2.dilate(mask, None, iterations=2)
             cutout = cv2.bitwise_and(frame,frame, mask= mask)
             cutout = cv2.bitwise_and(frame,frame, mask=mask)
             cnts = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]
             cnts = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]
            distance = None
           
             if len(cnts) > 0:
             if len(cnts) > 0:
                 c = max(cnts, key=cv2.contourArea)
                 c = max(cnts, key=cv2.contourArea)
Line 118: Line 131:
                 M = cv2.moments(c)
                 M = cv2.moments(c)
                 center = (int(M["m10"] / M["m00"]), int(M["m01"] / M["m00"]))
                 center = (int(M["m10"] / M["m00"]), int(M["m01"] / M["m00"]))
                 if radius > 5:
                 if radius > 10:
                     cv2.circle(frame, (int(x), int(y)), int(radius),
                     cv2.circle(frame, center, int(radius), (0, 0, 255), 5)
                        (0, 255, 255), 2)
                     distance = round((1/radius)*100*11.35, 2)
                     cv2.circle(frame, center, 5, (0, 0, 255), -1)
                     cv2.putText(frame, str(radius), (int(x),int(y)), cv2.FONT_HERSHEY_SIMPLEX, 0.7,(255,255,255),1)
                    radius = 1/radius
                   
                    radius = round(radius*100*11.35, 2)
            if distance:
                     cv2.putText(original,str(radius),(int(x),int(y)), cv2.FONT_HERSHEY_SIMPLEX, 0.7,(255,255,255),1,cv2.LINE_AA)
                dx = (x - self.cx) / 2.0
                     cv2.putText(original,str(radius),(int(x+3),int(y)), cv2.FONT_HERSHEY_SIMPLEX, 0.59,(0,0,0),1,cv2.LINE_AA)
                dy = (self.cy - y) / 3.0
                     delta = int((x-160)/5.0)
                adx = abs(dx)
                     motors.dx, motors.dy = delta, 240-y
 
             cv2.putText(frame,"%.01f fps" % self.fps, (10,20), cv2.FONT_HERSHEY_SIMPLEX, 0.3,(255,255,255),1,cv2.LINE_AA)
                if adx > 100:
             self.current_frame = np.hstack([original, cutout])
                     if dx > 0:
                        cv2.putText(frame,"Going right %d" % adx, (10,40), cv2.FONT_HERSHEY_SIMPLEX, 0.3,(255,255,255),1)
                        motors.set(adx, adx, adx)
                    else:
                        cv2.putText(frame,"Going left %d" % adx, (10,40), cv2.FONT_HERSHEY_SIMPLEX, 0.3,(255,255,255),1)
                        motors.set(-adx, -adx, -adx)
                else:
                     cv2.putText(frame,"Going forward %d" % dy, (10,40), cv2.FONT_HERSHEY_SIMPLEX, 0.3,(255,255,255),1)
                     motors.set(100 + dy, -100 -dy, dx)
            else:
                cv2.putText(frame,"Stopping", (10,40), cv2.FONT_HERSHEY_SIMPLEX, 0.3,(255,255,255),1)
                motors.set(0, 0, 0)
 
             cv2.putText(frame,"%.01f fps" % self.fps, (10,20), cv2.FONT_HERSHEY_SIMPLEX, 0.3,(255,255,255),1)
             self.last_frame = np.hstack([frame, cutout])


motors = Motors()
motors = MotorThread()
grabber = FrameGrabber()
motors.start()
motors.start()
grabber = FrameGrabber()
grabber.start()
grabber.start()


app = Flask(__name__)
app = Flask(__name__)
 
@app.route('/')
@app.route('/')
def index():
def index():
    return render_template('index.html')
@app.route('/video_feed')
def video_feed():
     def generator():
     def generator():
         while True:
         while True:
             if grabber.current_frame != None:
             if grabber.last_frame != None:
                 ret, jpeg = cv2.imencode('.jpg', grabber.current_frame, (cv2.IMWRITE_JPEG_QUALITY, 10))
                 ret, jpeg = cv2.imencode('.jpg', grabber.last_frame, (cv2.IMWRITE_JPEG_QUALITY, 80))
                 yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + jpeg.tobytes() + b'\r\n\r\n'
                 yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + jpeg.tostring() + b'\r\n\r\n'
             sleep(0.2)
             sleep(0.05)
     return Response(generator(),
     return Response(generator(), mimetype='multipart/x-mixed-replace; boundary=frame')
                    mimetype='multipart/x-mixed-replace; boundary=frame')
 
if __name__ == '__main__':
if __name__ == '__main__':
     app.run(host='0.0.0.0', debug=True,use_reloader=False,threaded=True)
     app.run(host='0.0.0.0', debug=True, use_reloader=False, threaded=True)
 
</source>
</source>

Latest revision as of 18:44, 6 September 2016

This page is simply to show off the MVP using Python, OpenCV, Firmata, Flask. Currently the robot is able to turn towards the golf ball and then approach it.

It supports streaming MJPEG to the browser as well.

Updated version available at GitHub

First install Linux+OpenCV3+Python3.5, Firmata and also Flask:

apt-get install python3-flask

Corresponding Python source:

import cv2
import numpy as np
import signal
import sys
from flask import Flask, render_template, Response
from collections import deque
from datetime import datetime
from time import time, sleep
from threading import Thread, Event
from PyMata.pymata import PyMata

# Motor pins on Arduino
MOTOR_1_PWM = 2
MOTOR_1_A   = 3
MOTOR_1_B   = 4
MOTOR_2_PWM = 5
MOTOR_2_A   = 6
MOTOR_2_B   = 7
MOTOR_3_PWM = 8
MOTOR_3_A   = 9
MOTOR_3_B   = 10

def signal_handler(sig, frame):
    board.reset()

# Here we initialize the motor pins on Arduino
board = PyMata(bluetooth=False)
signal.signal(signal.SIGINT, signal_handler)
board.set_pin_mode(MOTOR_1_PWM, board.PWM,    board.DIGITAL)
board.set_pin_mode(MOTOR_1_A,   board.OUTPUT, board.DIGITAL)
board.set_pin_mode(MOTOR_1_B,   board.OUTPUT, board.DIGITAL)
board.set_pin_mode(MOTOR_2_PWM, board.PWM,    board.DIGITAL)
board.set_pin_mode(MOTOR_2_A,   board.OUTPUT, board.DIGITAL)
board.set_pin_mode(MOTOR_2_B,   board.OUTPUT, board.DIGITAL)
board.set_pin_mode(MOTOR_3_PWM, board.PWM,    board.DIGITAL)
board.set_pin_mode(MOTOR_3_A,   board.OUTPUT, board.DIGITAL)
board.set_pin_mode(MOTOR_3_B,   board.OUTPUT, board.DIGITAL)

class MotorThread(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.daemon = True
        self.lock = Event()
        self.set(0, 0, 0)
        
    def set(self, m1, m2, m3):
        self.m1, self.m2, self.m3 = m1, m2, m3
        self.lock.set()

    def run(self):
        while True:
            board.digital_write(MOTOR_1_B, 0) # Reset all direction pins to avoid damaging H-bridges
            board.digital_write(MOTOR_1_A, 0)
            board.digital_write(MOTOR_2_B, 0)
            board.digital_write(MOTOR_2_A, 0)
            board.digital_write(MOTOR_3_B, 0)
            board.digital_write(MOTOR_3_A, 0)
            board.analog_write(MOTOR_1_PWM, int(abs(self.m1) + 25) if self.m1 else 0) # Set duty cycle
            board.analog_write(MOTOR_2_PWM, int(abs(self.m2) + 25) if self.m2 else 0)
            board.analog_write(MOTOR_3_PWM, int(abs(self.m3) + 25) if self.m3 else 0)
            board.digital_write(MOTOR_1_A, self.m1 < 0) # Set directions
            board.digital_write(MOTOR_1_B, self.m1 > 0)
            board.digital_write(MOTOR_2_A, self.m2 < 0)
            board.digital_write(MOTOR_2_B, self.m2 > 0)
            board.digital_write(MOTOR_3_A, self.m3 < 0)
            board.digital_write(MOTOR_3_B, self.m3 > 0)
            self.lock.wait()
            self.lock.clear()

class FrameGrabber(Thread):
    # Set HSV color ranges, this basically means color red regardless of saturation or brightness
    BALL_LOWER = ( 0, 140, 140)
    BALL_UPPER = (10, 255, 255)
 
    def __init__(self, width=640, height=480):
        Thread.__init__(self)
        self.daemon = True
        self.video = cv2.VideoCapture(0)

        # PS3 eye config:
        self.video.set(3, width)             # Set capture width to 640px
        self.video.set(4, height)            # Set capture height to 480px
        self.video.set(cv2.CAP_PROP_FPS, 60) # Set framerate to 60 fps
       
        self.cx, self.cy = width / 2, height
        self.timestamp = time()
        self.frames = 0
        self.fps = 0
        self.last_frame = None
 
    def run(self):
        while True:
            self.frames += 1
            timestamp_begin = time()
            if self.frames > 10:
                self.fps = self.frames / (timestamp_begin - self.timestamp)
                self.frames = 0
                self.timestamp = timestamp_begin

            success, frame = self.video.read() # Grab another frame from the camera
            blurred = cv2.blur(frame, (4,4))
            hsv = cv2.cvtColor(blurred, cv2.COLOR_BGR2HSV) # Convert red, green and blue to hue, saturation and brightness
            mask = cv2.inRange(hsv, self.BALL_LOWER, self.BALL_UPPER)
            mask = cv2.dilate(mask, None, iterations=2)
            cutout = cv2.bitwise_and(frame,frame, mask=mask)
            cnts = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]

            distance = None
            
            if len(cnts) > 0:
                c = max(cnts, key=cv2.contourArea)
                (x, y), radius = cv2.minEnclosingCircle(c)
                M = cv2.moments(c)
                center = (int(M["m10"] / M["m00"]), int(M["m01"] / M["m00"]))
                if radius > 10:
                    cv2.circle(frame, center, int(radius), (0, 0, 255), 5)
                    distance = round((1/radius)*100*11.35, 2)
                    cv2.putText(frame, str(radius), (int(x),int(y)), cv2.FONT_HERSHEY_SIMPLEX, 0.7,(255,255,255),1)
                    
            if distance:
                dx = (x - self.cx) / 2.0
                dy = (self.cy - y) / 3.0
                adx = abs(dx)

                if adx > 100:
                    if dx > 0:
                        cv2.putText(frame,"Going right %d" % adx, (10,40), cv2.FONT_HERSHEY_SIMPLEX, 0.3,(255,255,255),1)
                        motors.set(adx, adx, adx)
                    else:
                        cv2.putText(frame,"Going left %d" % adx, (10,40), cv2.FONT_HERSHEY_SIMPLEX, 0.3,(255,255,255),1)
                        motors.set(-adx, -adx, -adx)
                else:
                    cv2.putText(frame,"Going forward %d" % dy, (10,40), cv2.FONT_HERSHEY_SIMPLEX, 0.3,(255,255,255),1)
                    motors.set(100 + dy, -100 -dy, dx)
            else:
                cv2.putText(frame,"Stopping", (10,40), cv2.FONT_HERSHEY_SIMPLEX, 0.3,(255,255,255),1)
                motors.set(0, 0, 0)

            cv2.putText(frame,"%.01f fps" % self.fps, (10,20), cv2.FONT_HERSHEY_SIMPLEX, 0.3,(255,255,255),1)
            self.last_frame = np.hstack([frame, cutout])

motors = MotorThread()
motors.start()

grabber = FrameGrabber() 
grabber.start()

app = Flask(__name__)
 
@app.route('/')
def index():
    def generator():
        while True:
            if grabber.last_frame != None:
                ret, jpeg = cv2.imencode('.jpg', grabber.last_frame, (cv2.IMWRITE_JPEG_QUALITY, 80))
                yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + jpeg.tostring() + b'\r\n\r\n'
            sleep(0.05)
    return Response(generator(), mimetype='multipart/x-mixed-replace; boundary=frame')
 
if __name__ == '__main__':
    app.run(host='0.0.0.0', debug=True, use_reloader=False, threaded=True)