Part3_DB_RDB

패스트 캠퍼스에서 수강하는 데이터 엔지니어링 강의 내용의 정리본이다.

Part3_DB_RDB

목차

  • Step 1. AWS
  • Step 2. MySQL DB
  • Step 3. Spotify Data Model
  • Step 4. Pymysql
  • Step 5. 데이터 채우기

Step 1. AWS

Cloud Service : 네트워크 기반 서비스 형태로 제공

  • On-premise : 데이터 센터나 서버실에서 서버를 직접 관리하는 방식으로 전통적이고 널리 사용

  • Off-premise(cloud) : 필요한 리소스들을 인터넷을 통해 제공받아 사용한 만큼 비용을 지불하는 방식으로 기존 물리적인 형태의 시물 컴퓨팅 리소스를 네트워크 기반 서비스 형태로 제공하는 것

AWS : 크게 13가지 서비스로 구성되어 있으며, 다양한 상품이 존재

image

1) AWS 구조 예시 : 서비스 구성도

  • ELB(Elastic Load Balancing)

    Load balancer의 역할로 트래픽을 여러 대의 EC2로 분산하며 장애가 발생한 EC2를 감지해서 자동으로 배제

  • Cloud Watch

    AWS에 있는 서비스를 모니터링하며 ELB의 부하를 체크

image

2) RDS(Relational Database Service)

  • 관리형 관계형 데이터베이스 서비스로서, 고객이 선택할 수 있도록 Amazon Aurora, MySQL, SQL Server, PostgreSQL 등과 같은 총 6개의 익숙한 데이터베이스 엔진을 제공

3) S3(Simple Storage Service)

  • 이미지나 동영상 같은 파일을 저장하고, 사용자가 요청하면 제공
  • S3는 쿼리 지원 기능을 가진 유일한 클라우드 스토리지 솔루션으로, s3에 있는 데이터를 분석할 수 잇음

Step 2. MySQL DB

이제 AWS를 통해 MySQL을 생성할거다. 방법은 조금 복잡하긴 하지만 AWS에서 제공하는 가이드가 매우 상세히 나와있기에 이걸 참고하면 되겠다.

https://aws.amazon.com/ko/getting-started/hands-on/create-mysql-db/

Step 3. Spotify Data Model

자 이제 RDB인 MySQL도 만들었으니, 우리가 필요로 하는 데이터에 대한 테이블들을 만들어보자.

프로젝트에서 필요했던 테이블은 artists, artist_genres, top_tracks 그리고 audio_features 였다.

image

이 중에서 MySQL을 이용해 만들 테이블은 artists와 artist_gernes 테이블이다.

# DB 이름은 production
CREATE DATABASES production;
USE production;

# Artists table
CREATE TABLE artists (id VARCHAR(255), name VARCHAR(255), followers INTEGER, popularity INTEGER, url VARCHAR(255), image_url VARCHAR(255),
					  PRIMARY KEY(id)) ENGINE = InnoDB DEFAULT CHARSET = 'uft8';

# Artist_genres table
CREATE TABLE artist_genres (artist_id VARCHAR(255), genre VARCHAR(255),unique key (artist_id, genre)) 
							ENGINE = InnoDB DEFAULT CHARSET = 'uft8';

# Data 추가시 현재 시간을 갱신하여 추가
ALTER TABLE artist_genres ADD COLUMN update_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;

SHOW tables;

artist_genres
artists

Step 4. Pymysql

이제 파이썬과 MySQL을 연결을 통해 sql 쿼리를 사용할 수 있게 해주는 Pymysql libraray를 사용해보자

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import pymysql
import logging
import sys

# 연결해주기 위해 필요한 정보들
# MySQL 생성 당시 기입했던 정보들이다.
host = 'mydb.cl2fdw3ky5vb.ap-northeast-2.rds.amazonaws.com'
port = 3306
username = '~~'
database = 'production'
password = '~~'

# 연결이 안될 경우를 대비
try:
	# 글자 깨짐 현상 방지
	conn = pymysql.connect(host, user = username, passwd = password,db = database, 
						   port = port, use_unicode = True, charset = 'utf8')
	cursor = conn.cursor()
except:
	logging.error('could not connect to RDS')
	sys.exit(1) # 1 = fail

수행시 에러가 뜨지 않는다면 성공적으로 연결이 된것이다.

Step 5. 데이터 채우기

연결을 했으니 이제 API를 이용해서 데이터를 테이블에 쌓아보자

우리가 사용할 쿼리의 예시는 다음과 같다

INSERT INTO artists (id, name, followers, popularity, url, image_url)
VALUES ('{}', '{}', {}, {}, '{}', '{}')
# 중복키인 경우 제외하고 나머지를 업데이트하고 싶을 때
ON DUPLICATE KEY UPDATE id = '{}', name = '{}', followers = {}, popularity = {}, url = '{}', image_url = '{}' 

매번 다른 테이블이나 값들을 업데이트 할 때마다 쿼리를 적는 것은 비효율적이므로 해당 쿼리에 알맞는 함수를 정의하자

1
2
3
4
5
6
7
8
9
10
# data는 dictionary 형태
def insert_row(cursor, data, table):

	placeholders = ', '.join(['{}'] * len(data))
	columns = ', '.join(data.keys()) 
	key_placesholders = ', '.join(['{} = %s'.format(k) for k in data.keys()])
	sql = 'INSERT INTO %s ( %s ) VALUES ( %s ) ON DUPLICATE KEY UPDATE %s' % (table, columns, placeholders, key_placeholders)
	# 2 쌍의 placeholders 존재
	cursor.excute(sql, list(data.values())*2)
	

SEARCH API 사용시 데이터의 구조는 다음과 같이 생겼다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
{
  "artists": {
    "href": "https://api.spotify.com/v1/search?query=tania+bowra&offset=0&limit=20&type=artist",
    "items": [ {
      "external_urls": {
        "spotify": "https://open.spotify.com/artist/08td7MxkoHQkXnWAYD8d6Q"
      },
      "genres": [ ],
      "href": "https://api.spotify.com/v1/artists/08td7MxkoHQkXnWAYD8d6Q",
      "id": "08td7MxkoHQkXnWAYD8d6Q",
      "images": [ {
        "height": 640,
        "url": "https://i.scdn.co/image/f2798ddab0c7b76dc2d270b65c4f67ddef7f6718",
        "width": 640
      }, {
        "height": 300,
        "url": "https://i.scdn.co/image/b414091165ea0f4172089c2fc67bb35aa37cfc55",
        "width": 300
      }, {
        "height": 64,
        "url": "https://i.scdn.co/image/8522fc78be4bf4e83fea8e67bb742e7d3dfe21b4",
        "width": 64
      } ],
      "name": "Tania Bowra",
      "popularity": 0,
      "type": "artist",
      "uri": "spotify:artist:08td7MxkoHQkXnWAYD8d6Q"
    } ],
    "limit": 20,
    "next": null,
    "offset": 0,
    "previous": null,
    "total": 1
  }
}

이제 API를 이용해서 테이블에 진짜로 쌓아보자 500 여명 정도의 아티스트 이름을 csv 파일로 먼저 정리를 해두었다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import sys
import requests
import base64
import json
import logging
import pymysql
import pandas as pd

def main():
	
	# connect to RDS DB
    try:
        conn = pymysql.connect(host, user = username, passwd = password, db = database, port = port, use_unicode = True, charset = 'utf8')
        cursor = conn.cursor()
    except:
        logging.error('could not connect to RDS')
        sys.exit(1) # 1 = fail
	
	# get access_token
	headers = get_headers(client_id, client_secret)
	
	# load csv file
	artist = pd.read_csv('artist_list.csv', encoding = 'utf-8')
	# DF to list
	artist_name = artist.values.tolist()
	artist_name = sum(artist_name, [])
	
	# Use spotify Search api 
	for name in artist_name:
		
		params = {
			'q' : name,
			'type' : 'artist',
			'limit' : '1'
		}

		r = requests.get("https://api.spotify.com/v1/search", params = params, headers = headers)
		# string to dictionary
		raw = json.loads(r.text)
		
		# update artist dictionary
		artist = {}
		try:
			artist_raw = raw['artist']['items'][0]
			
			# API 에 없는 artists를 방지하기 위함 
			if artist_raw['name'] == params['q']:
			   # update dictionary
			   artist.update(
					{
						'id' = artist_raw['id'],
						'name' = artist_raw['name'],
						'followers' = artist_raw['followers']['total'],
						'popularity' = artist_raw['popularity'],
						'url' = artist_raw['external_urls']['spotify'],
						'image_url' = artist_raw['images'][0]['url']
					}
				)
				# insert_row function
				insert_row(cursor, artist, 'artists')
		except: 
				logging.error('NO ITEMS FROM SEARCH API')
				continue
	# table에 적재
	conn.commit()

if __name__ == '__main__':
	main()

제대로 테이블에 데이터가 쌓였는지 확인해보자

1
2
3
SELECT COUNT(*) FROM artists;
498
SELECT * FROM artists LIMIT 10;

image

이번엔 artists 테이블의 id 값을 이용해 artist_genres 테이블을 채워보자 이전과 다른점은 batch 형식으로 좀 더 빠르게 채우는 것이다.

image

ID는 최대 50개까지 배치형식을 이용해 데이터를 한번에 불러올 수 있다. endpoint 형식은 다음과 같다

curl -X GET "https://api.spotify.com/v1/artists?ids=0oSGxfWSnnOXhD2fKuz2Gy,3dBVyJ7JuOMt4GE9607Qin" -H "Authorization: Bearer {your access token}"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import sys
import requests
import base64
import json
import logging
import pymysql

def main():

    try:
        conn = pymysql.connect(host, user = username, passwd = password, db = database, port = port, use_unicode = True, charset = 'utf8')
        cursor = conn.cursor()
    except:
        logging.error('could not connect to RDS')
        sys.exit(1) # 1 == fail

    headers = get_headers(client_id, client_secret)
	
	artist_ids = []
	# DB로부터 id 가져오기
	cursor.execute('SELECT id FROM artists')
	
	# fetchall : 데이터 적용
	for (id, ) in cursor.fetchall()
		artist_ids.append(id)
	
	# 50개 batch 형식으로 변환
	artist_batch = [artist_ids[i : i+50] for i in range(0, len(artist_batch), 50)]
	artist_genres = []
	
	for ids in artist_batch:
	
		ids = ','.join(ids)
		url = 'https://api.spotify.com/v1/artists?ids={}'.format(ids)

		r = requests.get(url, headers)
		raw = json.loads(r.text)
		
		# 1명의 아티스트가 여러개의 장르가 포함된 경우가 존재
		for artist in raw['artists']:
			for genre in artist['genres']:
				
				artist_genres.append(
					{
						'artist_id' : artist['id'],
						'genre' : genre
					}
				)
				
	for data in artist_genres:
		insert_row(cursor, data, 'artist_genres')

	conn.commit()

이렇게 MySQL 테이블인 artists, artist_geres를 모두 채웠다.