Odds and Ends
AWS Rekognition Video Detection : 저장된 영상 분석 시 권한 설정 방법 본문
[AWS Rekognition Video로 저장된 영상을 분석할 때의 권한 설정부터 코드 작성까지의 단계]
1. 사용자 권한 3가지 추가
: AmazonSQSFullAccess, AmazonRekognitionFullAccess, AmazonS3ReadOnlyAccess
2. 역할 직접 새로 생성
: AmazonRekognitionServiceRole 권한을 포함하도록 역할을 생성한다.
: 역할 이름은 serviceRekognition으로 한다(임의로 정해도 됨). 생성한 역할 serviceRekognition의 ARN을 복사해 둔다.
3. 정책 직접 새로 생성
: JSON으로 직접 편집한다. 2번에서 복사해 둔 ARN을 Resource에 붙여 넣는다.
: 아래 화면과 같이 정책을 생성하고, 1번처럼 사용자의 권한으로 추가한다. 정책 명은 customPolicy (알아서 해도됨)
필자는 여러 개를 추가했으나, 여기까지 따라왔다면 정책 4개가 추가되어 있으면 된다.
4. S3 버킷을 만들어서 영상 업로드
AWS S3 버킷을 생성하여 분석하고자 하는 영상을 올린다.
5. SNS, SQS 생성
SNS 토픽을 생성하여 ARN을 복사한다. 이후 SQS 큐를 만들고 URL과 ARN을 복사해 둔다. 이때 SNS 토픽이 이 큐에 메시지를 보낼 수 있도록 큐의 액세스 정책에 SQS:SendMessage 권한을 추가한다.
만든 SQS 큐가 SNS 토픽을 구독하도록, 큐의 구독 설정에 복사해 둔 SNS ARN을 붙여 넣는다.
아래 코드에 지금까지 생성한 리소스의 ARN, URL 등을 채워 넣으면 간단한 사물 인식이 진행된다.
#Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#SPDX-License-Identifier: MIT-0 (For details, see https://github.com/awsdocs/amazon-rekognition-developer-guide/blob/master/LICENSE-SAMPLECODE.)
import boto3
import json
import sys
import time
class VideoDetect:
    """Run Amazon Rekognition Video label detection on a video stored in S3.

    Typical flow (see ``main``): create the SNS topic and SQS queue with
    ``CreateTopicandQueue``, start the job with ``StartLabelDetection``, wait
    for the completion notice with ``GetSQSMessageSuccess``, print the labels
    with ``GetLabelDetectionResults`` and finally clean up with
    ``DeleteTopicandQueue``.

    The boto3 clients are created per instance in ``__init__`` rather than in
    the class body, so merely importing the module no longer builds clients.
    """

    jobId = ''

    # AWS credentials. Leave both blank to let boto3 resolve credentials
    # through its default lookup (environment variables, shared config, ...).
    access_key_id = ''
    secret_access_key = ''

    # Region used for every client created by this class.
    region_name = 'ap-northeast-2'

    # Service clients; populated in __init__.
    rek = None
    sqs = None
    sns = None

    roleArn = ''
    bucket = ''
    video = ''
    startJobId = ''

    sqsQueueUrl = ''
    snsTopicArn = ''
    processType = ''

    def __init__(self, role, bucket, video):
        """Store the job parameters and create the AWS service clients.

        Args:
            role: ARN passed as ``RoleArn`` in the job's notification channel.
            bucket: Name of the S3 bucket that holds the video.
            video: Object key (file name) of the video inside ``bucket``.
        """
        self.roleArn = role
        self.bucket = bucket
        self.video = video

        self.rek = self._create_client('rekognition')
        self.sqs = self._create_client('sqs')
        self.sns = self._create_client('sns')

    def _create_client(self, service_name):
        """Return a boto3 client for ``service_name`` in ``region_name``."""
        return boto3.client(
            service_name,
            region_name=self.region_name,
            # Blank strings become None so that boto3 uses its default
            # credential lookup instead of signing with empty keys.
            aws_access_key_id=self.access_key_id or None,
            aws_secret_access_key=self.secret_access_key or None,
        )

    def GetSQSMessageSuccess(self, poll_interval=5, timeout=None):
        """Poll the SQS queue until the notice for the started job arrives.

        Every received message is printed and then deleted from the queue
        exactly once, whether or not it belongs to ``startJobId``.

        Args:
            poll_interval: Seconds to sleep after an empty poll.
            timeout: Maximum number of seconds to wait, or ``None`` (default)
                to wait indefinitely.

        Returns:
            True if a message for ``startJobId`` reported status
            ``'SUCCEEDED'``; False if it reported any other status or the
            timeout elapsed first.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        job_found = False
        succeeded = False
        dot_count = 0

        while True:
            sqs_response = self.sqs.receive_message(
                QueueUrl=self.sqsQueueUrl,
                MessageAttributeNames=['ALL'],
                MaxNumberOfMessages=10,
            )
            messages = (sqs_response or {}).get('Messages') or []

            for message in messages:
                # The SQS body is the SNS envelope; its 'Message' field holds
                # the JSON document with 'JobId' and 'Status'.
                notification = json.loads(message['Body'])
                rek_message = json.loads(notification['Message'])
                job_id = rek_message['JobId']
                status = rek_message['Status']
                print(job_id)
                print(status)

                if job_id == self.startJobId:
                    print('Matching Job Found:' + job_id)
                    job_found = True
                    if status == 'SUCCEEDED':
                        succeeded = True
                else:
                    print("Job didn't match:" +
                          str(job_id) + ' : ' + self.startJobId)

                # Remove the processed message (matching or unknown) so it is
                # not redelivered. Consider a dead letter queue for unknowns.
                self.sqs.delete_message(QueueUrl=self.sqsQueueUrl,
                                        ReceiptHandle=message['ReceiptHandle'])

            if job_found:
                return succeeded

            if deadline is not None and time.monotonic() >= deadline:
                return False

            if not messages:
                # Progress indicator: one dot per empty poll, wrapped after 40.
                if dot_count < 40:
                    print('.', end='')
                    dot_count += 1
                else:
                    print()
                    dot_count = 0
                sys.stdout.flush()
                time.sleep(poll_interval)

    def StartLabelDetection(self):
        """Start a label detection job and remember its id in ``startJobId``."""
        response = self.rek.start_label_detection(
            Video={'S3Object': {'Bucket': self.bucket, 'Name': self.video}},
            NotificationChannel={'RoleArn': self.roleArn,
                                 'SNSTopicArn': self.snsTopicArn},
        )
        self.startJobId = response['JobId']
        print('Start Job Id: ' + self.startJobId)

    def GetLabelDetectionResults(self):
        """Print every page of label detection results for ``startJobId``.

        Results are requested sorted by timestamp, 10 per page. ``NextToken``
        is only sent once a previous page has supplied one, and the
        pagination check runs once per page even if the page has no labels.
        """
        request = {
            'JobId': self.startJobId,
            'MaxResults': 10,
            'SortBy': 'TIMESTAMP',
        }

        while True:
            response = self.rek.get_label_detection(**request)

            metadata = response['VideoMetadata']
            print('Codec: ' + metadata['Codec'])
            print('Duration: ' + str(metadata['DurationMillis']))
            print('Format: ' + metadata['Format'])
            print('Frame rate: ' + str(metadata['FrameRate']))
            print()

            for label_detection in response['Labels']:
                label = label_detection['Label']
                print("Timestamp: " + str(label_detection['Timestamp']))
                print(" Label: " + label['Name'])
                print(" Confidence: " + str(label['Confidence']))
                print(" Instances:")
                for instance in label.get('Instances', []):
                    box = instance['BoundingBox']
                    print(" Confidence: " + str(instance['Confidence']))
                    print(" Bounding box")
                    print(" Top: " + str(box['Top']))
                    print(" Left: " + str(box['Left']))
                    print(" Width: " + str(box['Width']))
                    print(" Height: " + str(box['Height']))
                    print()
                print()
                print(" Parents:")
                for parent in label.get('Parents', []):
                    print(" " + parent['Name'])
                print()

            next_token = response.get('NextToken')
            if not next_token:
                break
            request['NextToken'] = next_token

    def CreateTopicandQueue(self):
        """Create an SNS topic and an SQS queue subscribed to it.

        Stores the topic ARN in ``snsTopicArn`` and the queue URL in
        ``sqsQueueUrl``. The queue policy allows ``SQS:SendMessage`` only when
        the source ARN equals the created topic.
        """
        # Millisecond timestamp suffix on both names.
        millis = str(int(round(time.time() * 1000)))

        # Create SNS topic
        topic_response = self.sns.create_topic(
            Name="AmazonRekognitionExample" + millis)
        self.snsTopicArn = topic_response['TopicArn']

        # Create SQS queue; the create call already returns the queue URL.
        queue_response = self.sqs.create_queue(
            QueueName="AmazonRekognitionQueue" + millis)
        self.sqsQueueUrl = queue_response['QueueUrl']

        attribs = self.sqs.get_queue_attributes(
            QueueUrl=self.sqsQueueUrl,
            AttributeNames=['QueueArn'])['Attributes']
        sqs_queue_arn = attribs['QueueArn']

        # Subscribe SQS queue to SNS topic
        self.sns.subscribe(
            TopicArn=self.snsTopicArn,
            Protocol='sqs',
            Endpoint=sqs_queue_arn)

        # Authorize SNS to write to the SQS queue. Serialising a dict keeps
        # the document valid JSON whatever characters the ARNs contain.
        policy = json.dumps({
            'Version': '2012-10-17',
            'Statement': [
                {
                    'Sid': 'MyPolicy',
                    'Effect': 'Allow',
                    'Principal': {'AWS': '*'},
                    'Action': 'SQS:SendMessage',
                    'Resource': sqs_queue_arn,
                    'Condition': {
                        'ArnEquals': {'aws:SourceArn': self.snsTopicArn},
                    },
                },
            ],
        })

        self.sqs.set_queue_attributes(
            QueueUrl=self.sqsQueueUrl,
            Attributes={'Policy': policy})

    def DeleteTopicandQueue(self):
        """Delete the queue and topic recorded on this instance, if any.

        Resources that were never created (blank URL/ARN) are skipped, the
        stored identifiers are cleared after deletion so a repeated call is a
        no-op, and the topic deletion is attempted even if deleting the queue
        raises.
        """
        try:
            if self.sqsQueueUrl:
                self.sqs.delete_queue(QueueUrl=self.sqsQueueUrl)
                self.sqsQueueUrl = ''
        finally:
            if self.snsTopicArn:
                self.sns.delete_topic(TopicArn=self.snsTopicArn)
                self.snsTopicArn = ''
def main():
    """Run label detection end to end on one video and print the results.

    Fill in ``roleArn``, ``bucket`` and ``video`` before running. The SNS
    topic and SQS queue created for the run are deleted even when starting
    the job, waiting for it, or fetching the results raises.
    """
    roleArn = ''
    bucket = ''
    video = ''

    analyzer = VideoDetect(roleArn, bucket, video)
    analyzer.CreateTopicandQueue()
    try:
        analyzer.StartLabelDetection()
        if analyzer.GetSQSMessageSuccess():
            analyzer.GetLabelDetectionResults()
    finally:
        # Always clean up so a failed run does not leave the topic and queue
        # behind in the account.
        analyzer.DeleteTopicandQueue()


if __name__ == "__main__":
    main()
AWS 공식문서, youtube 튜토리얼 참고
728x90