Webカメラを買ったので、なにか遊ぼうと思いついたのが交通量調査。数年前にOpenCVが大流行りした時に少し試してみたことがあったのですが、その時は ラズバイ3BとUSBカメラで処理速度が圧倒的に足りず、消化不良だったことを思い出し、今回は RTSP 接続のストリーム映像をリアルタイムで処理してみて、モノになる性能が出せるものかどうかを確認します。
使用したコード
github のどこかで入手したコードを改造したものです。いろいろ見て回っていたら、元の作者さんの場所を失念してしまいました。[ python ] + [ traffic ] などのキーワードで github を検索すれば山程出てきますので、探してみてください。
環境
- Windows11
- Python 3.9.13
- opencv-python 4.8.0.76
- numpy 1.24.4
- Wansview Q6 ( 640 x 480 / 15fps / RTSP接続 )
コード
2ファイル構成です。同一フォルダに配置してください。
YOLO などの高機能な物体検出ライブラリを使用せず、パワープレイで検知をゴリ押す実装ですので、参考になるのではないかと思います。
countcar.py
import numpy as np
import cv2
import Cars
import time
# Input and output counters
cnt_up = 0
cnt_down = 0
#Video source
cap = cv2.VideoCapture('rtsp://user:pass@IPアドレス:554/live/ch1')
#Print the capture properties to console
for i in range(19):
print (i, cap.get(i))
w = cap.get(3)
h = cap.get(4)
frameArea = h*w
areaTH = frameArea/750
print ('Area Threshold', areaTH)
#In / out lines
line_up = int(1.2*(h/3))
line_down = int(1.8*(h/4))
up_limit = int(1.4*(h/5))
down_limit = int(2.2*(h/4))
print ("Red line y:",str(line_down))
print ("Blue line y:", str(line_up))
line_down_color = (255,0,0)
line_up_color = (0,0,255)
pt1 = [0, line_down];
pt2 = [w, line_down];
pts_L1 = np.array([pt1,pt2], np.int32)
pts_L1 = pts_L1.reshape((-1,1,2))
pt3 = [0, line_up];
pt4 = [w, line_up];
pts_L2 = np.array([pt3,pt4], np.int32)
pts_L2 = pts_L2.reshape((-1,1,2))
pt5 = [0, up_limit];
pt6 = [w, up_limit];
pts_L3 = np.array([pt5,pt6], np.int32)
pts_L3 = pts_L3.reshape((-1,1,2))
pt7 = [0, down_limit];
pt8 = [w, down_limit];
pts_L4 = np.array([pt7,pt8], np.int32)
pts_L4 = pts_L4.reshape((-1,1,2))
#Background Substractor
fgbg = cv2.createBackgroundSubtractorMOG2(detectShadows = True)
# Structuring elements for morphographic filters
kernelOp = np.ones((3,3),np.uint8)
kernelOp2 = np.ones((5,5),np.uint8)
kernelCl = np.ones((11,11),np.uint8)
#Variables
font = cv2.FONT_HERSHEY_SIMPLEX
cars = []
max_p_age = 5
pid = 1
while(cap.isOpened()):
##for image in camera.capture_continuous(rawCapture, format="bgr", use_video_port=True):
#Read an image of the video source
ret, frame = cap.read()
## frame = image.array
for i in cars:
i.age_one() #age every object one frame
#Application background subtraction
fgmask = fgbg.apply(frame)
fgmask2 = fgbg.apply(frame)
#Binariazcion to eliminate shadows (gray color)
try:
ret,imBin= cv2.threshold(fgmask,200,255,cv2.THRESH_BINARY)
ret,imBin2 = cv2.threshold(fgmask2,200,255,cv2.THRESH_BINARY)
#Opening (erode->dilate)
mask = cv2.morphologyEx(imBin, cv2.MORPH_OPEN, kernelOp)
mask2 = cv2.morphologyEx(imBin2, cv2.MORPH_OPEN, kernelOp)
#Closing (dilate -> erode)
mask = cv2.morphologyEx(mask , cv2.MORPH_CLOSE, kernelCl)
mask2 = cv2.morphologyEx(mask2, cv2.MORPH_CLOSE, kernelCl)
except:
print('EOF')
print ('UP:',cnt_up)
print ('DOWN:',cnt_down)
break
# RETR_EXTERNAL returns only extreme outer flags. All child contours are left behind.
contours0, hierarchy = cv2.findContours(mask2,cv2.RETR_EXTERNAL,cv2.CHAIN_APPROX_SIMPLE)
for cnt in contours0:
area = cv2.contourArea(cnt)
if area > areaTH:
#################
# TRACKING #
#################
M = cv2.moments(cnt)
cx = int(M['m10']/M['m00'])
cy = int(M['m01']/M['m00'])
x,y,w,h = cv2.boundingRect(cnt)
new = True
if cy in range(up_limit,down_limit):
for i in cars:
if abs(cx-i.getX()) <= w and abs(cy-i.getY()) <= h:
# the object is close to one that was already detected before
new = False
i.updateCoords(cx,cy) #Update coordinates on the object and resets age
if i.going_UP(line_down,line_up) == True:
cnt_up += 1;
print ("ID:",i.getId(),'crossed going up at',time.strftime("%c"))
elif i.going_DOWN(line_down,line_up) == True:
cnt_down += 1;
print ("ID:",i.getId(),'crossed going down at',time.strftime("%c"))
break
if i.getState() == '1':
if i.getDir() == 'down' and i.getY() > down_limit:
i.setDone()
elif i.getDir() == 'up' and i.getY() < up_limit:
i.setDone()
if i.timedOut():
index = cars.index(i)
cars.pop(index)
del i
if new == True:
p = Cars.MyCars(pid,cx,cy, max_p_age)
cars.append(p)
pid += 1
cv2.circle(frame,(cx,cy), 5, (0,0,255), -1)
img = cv2.rectangle(frame,(x,y),(x+w,y+h),(0,255,0),2)
#cv2.drawContours(frame, cnt, -1, (0,255,0), 3)
#END for cnt in contours0
for i in cars:
## if len(i.getTracks()) >= 2:
## pts = np.array(i.getTracks(), np.int32)
## pts = pts.reshape((-1,1,2))
## frame = cv2.polylines(frame,[pts],False,i.getRGB())
## if i.getId() == 9:
## print str(i.getX()), ',', str(i.getY())
cv2.putText(frame, str(i.getId()),(i.getX(),i.getY()),font,0.3,i.getRGB(),1,cv2.LINE_AA)
#################
# IMAGES #
#################
str_up = 'up: '+ str(cnt_up)
str_down = 'down: '+ str(cnt_down)
frame = cv2.polylines(frame,[pts_L1],False,line_down_color,thickness=2)
frame = cv2.polylines(frame,[pts_L2],False,line_up_color,thickness=2)
frame = cv2.polylines(frame,[pts_L3],False,(255,255,255),thickness=1)
frame = cv2.polylines(frame,[pts_L4],False,(255,255,255),thickness=1)
cv2.putText(frame, str_up ,(15,40),font,0.5,(200,200,200),2,cv2.LINE_AA)
cv2.putText(frame, str_up ,(15,40),font,0.5,(0,200,0),1,cv2.LINE_AA)
cv2.putText(frame, str_down ,(15,60),font,0.5,(200,200,200),2,cv2.LINE_AA)
cv2.putText(frame, str_down ,(15,60),font,0.5,(0,0,200),1,cv2.LINE_AA)
cv2.imshow('Frame',frame)
#cv2.imshow('Mask',mask)
#press ESC to exit
k = cv2.waitKey(30) & 0xff
if k == 27:
break
#END while(cap.isOpened())
cap.release()
cv2.destroyAllWindows()
Cars.py
from random import randint
import time
class MyCars:
tracks = []
def __init__(self, i, xi, yi, max_age):
self.i = i
self.x = xi
self.y = yi
self.tracks = []
self.R = randint(0,255)
self.G = randint(0,255)
self.B = randint(0,255)
self.done = False
self.state = '0'
self.age = 0
self.max_age = max_age
self.dir = None
def getRGB(self):
return (self.R,self.G,self.B)
def getTracks(self):
return self.tracks
def getId(self):
return self.i
def getState(self):
return self.state
def getDir(self):
return self.dir
def getX(self):
return self.x
def getY(self):
return self.y
def updateCoords(self, xn, yn):
self.age = 0
self.tracks.append([self.x,self.y])
self.x = xn
self.y = yn
def setDone(self):
self.done = True
def timedOut(self):
return self.done
def going_UP(self,mid_start,mid_end):
if len(self.tracks) >= 2:
if self.state == '0':
if self.tracks[-1][1] < mid_end and self.tracks[-2][1] >= mid_end: #melalui batas
state = '1'
self.dir = 'up'
return True
else:
return False
else:
return False
def going_DOWN(self,mid_start,mid_end):
if len(self.tracks) >= 2:
if self.state == '0':
if self.tracks[-1][1] > mid_start and self.tracks[-2][1] <= mid_start: #melalui batas
state = '1'
self.dir = 'down'
return True
else:
return False
else:
return False
def age_one(self):
self.age += 1
if self.age > self.max_age:
self.done = True
return True
class MultiCars:
def __init__(self, cars, xi, yi):
self.cars = cars
self.x = xi
self.y = yi
self.tracks = []
self.R = randint(0,255)
self.G = randint(0,255)
self.B = randint(0,255)
self.done = False
実行結果
自宅からの映像では劣悪なアングルで精度が低い
分かりきっていたことですが、横着して自宅の窓からの景色を対象にしたため、計測に最適なアングルを確保できていません。そのせいで、かなり怪しい動きをしているところがあります。以下の動画でご確認いただけます。
赤と青のラインをクロスした動体を検知して、上りと下りの交通量をカウントしています。
ご覧いただければお分かりのように、道路が斜めで検知エリアも狭く、一時停止ライン付近であるため、交通量調査には不適切な映像です。加えて、Q6の高解像度 RTSP ストリームが安定しないため、低解像度のストリームを対象とせざるを得ないことも、検出の精度に悪影響を与えていると思われます。このような訳で、かなりおかしなカウントをしている場面が何度も出てきてしまっています。
理想的なアングルからの映像では認識精度は向上する
そこで、試しに最適な交通動画を対象に同じロジックで精度の差を確認してみました。github で同様のコードを配布している方が使用していた動画をお借りしています。試されたい方は、countcar.py のコードで映像の input と検出位置などの調整を行ってみてください。
https://github.com/HodenX/python-traffic-counter-with-yolo-and-sort/blob/master/input/highway.mp4
この動画の解像度は 1280 x 720(30fps) で、アングルも理想的です。検出の精度もかなり改善されているのが確認できます。しかし、複数台が団子になって来た時の検出で怪しいところが見られますね。
本気で精度を上げるためには、歩道橋の上から超広角レンズで真上に近い位置から撮影するフットワークが不可欠でしょう。そのようなアングルならば、車が重複してしまう心配がありません。その場合、被写体がブレブレにならないよう、シャッター速度にも気を使う必要があるかもしれません。
あるいは、yolo などを用いて高度な物体認識をする実装の検討が必要でしょう。
しかし、30fpsの高解像度映像を普通のパソコンでリアルタイムに処理できてしまうとは、すごい時代になったものです。
コードを改造するなら
処理速度に余裕があるならば、より高解像度の入力としたり、解析映像を録画したり、ログデータをTSVフォーマットで出力して時間ごとの分析をExcelなどで後処理できるようにすると楽ですね。
車だけでなく人物も認識する(してしまう)
ロジックを眺めてみてもらえば分かりますが、車だけでなく動体を認識する特性がありますので、人数などのカウントにも使えます。学園祭などのイベントの入場者数のカウントにも応用できるかもしれません。その場合、やはりカメラのアングルを最適化できるかどうかが成否のポイントになるでしょうね。
Q6の RTSP はpythonからOPENする分には安定している
これまで、Q6 の RTSP 接続が不安定であることを報告してきましたが、不思議なことに、今回遊んでいる中で接続が途絶することなく安定していました。気温との関連性などがあるのかもしれませんが、詳細な検証はできていません。
3,000円でリモート接続のカメラというのが価値がありますね。大きめのモバイルバッテリとの組み合わせで完全ワイヤレス駆動も可能でしょうから、子供の夏休みの研究などで、スイカの食べ残しなどをリモートで観察するなどの用途など、アイディア次第で面白いことができそうです。
Windows環境とパソコンに内蔵のカメラだけあれば動作させることは可能ですので、遊び甲斐がありますよ。


