Files
wixon_blog/app.py

266 lines
10 KiB
Python

from flask import Flask, render_template, request, redirect, url_for, flash, session
import pymysql
from flask import session, g, jsonify
import bcrypt
from bs4 import BeautifulSoup
from werkzeug.utils import secure_filename
import os
import uuid
import re
from markupsafe import Markup
from jinja2 import filters
UPLOAD_FOLDER = 'static/upload/img' # 경로를 Flask 앱 루트 기준으로 수정
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
app = Flask(__name__)
app.secret_key = 'your secret key'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
def remove_html_tags(text):
clean = re.compile('<.*?>')
return re.sub(clean, '', text)
@app.before_request
def load_user():
if 'user_info' in session:
g.is_login = True
g.user_info = session['user_info']
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@app.route('/upload_image', methods=['POST'])
def upload_image():
if 'file' not in request.files:
return jsonify(success=False, message='No file part'), 400
file = request.files['file']
if file.filename == '':
return jsonify(success=False, message='No selected file'), 400
if file and allowed_file(file.filename):
# Secure the filename and keep its extension
filename = secure_filename(file.filename)
ext = filename.rsplit('.', 1)[1].lower()
# Generate a random filename using uuid4
random_filename = f'{uuid.uuid4().hex}.{ext}'
file_path = os.path.join(app.config['UPLOAD_FOLDER'], random_filename)
# Create directories if not exist
os.makedirs(os.path.dirname(file_path), exist_ok=True)
file.save(file_path)
# Generate the URL of the saved image file
file_url = url_for('static', filename='upload/img/' + random_filename)
return jsonify(success=True, fileUrl=file_url), 200
return jsonify(success=False, message='File not allowed'), 400
# MySQL 데이터베이스 연결 설정
def connnect_db():
return pymysql.connect(
host='wxnasso.synology.me',
user='wixon5',
password='Wixon2022@!',
database='test',
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor, # DictCursor를 사용하여 딕셔너리 형태로 결과를 반환
init_command='SET SQL_SAFE_UPDATES = 0;',
)
def sql_execute(q, d, is_data=False, is_last_id=False):
data = None
last_id = None
with connnect_db().cursor(pymysql.cursors.DictCursor) as cursor:
try:
res = cursor.execute(q, d)
data = cursor.fetchall() if is_data else None
last_id = cursor.lastrowid if cursor.lastrowid != 0 else None
cursor.connection.commit()
except Exception as e:
print(e)
cursor.connection.rollback()
res = False
return (res, last_id if is_last_id else data) if is_data or is_last_id else res
@app.route('/')
def index():
if 'user_info' in session: # 로그인된 사용자
query = "SELECT `id`, `title`, `category`, `thumbnail_img`, `contents`, `add_date` FROM `blog` WHERE `use_yn` = 'Y' ORDER BY `add_date` DESC limit 6;"
r_query = "SELECT `id`, `title`, `category`, `thumbnail_img`, `contents`, `add_date` FROM `blog` WHERE `use_yn` = 'Y' ORDER BY RAND() DESC limit 1;"
else:
query = "SELECT `id`, `title`, `category`, `thumbnail_img`, `contents`, `add_date` FROM `blog` WHERE `use_yn` = 'Y' and `public_yn` = 'Y' ORDER BY `add_date` DESC limit 6;"
r_query = "SELECT `id`, `title`, `category`, `thumbnail_img`, `contents`, `add_date` FROM `blog` WHERE `use_yn` = 'Y' and `public_yn` = 'Y' ORDER BY RAND() DESC limit 1;"
r, posts = sql_execute(query, (), is_data=True)
r, random_post = sql_execute(r_query, (), is_data=True)
posts.append(random_post[0])
# 태그 제거 후 150자로 제한
for post in posts:
post['contents'] = remove_html_tags(post['contents'])[:150]
return render_template('index.html', posts=posts)
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password').encode('utf-8')
# DB에서 사용자 정보 가져오기
res, user = sql_execute("SELECT * FROM member WHERE mb_id=%s", (username,), is_data=True)
if res and user and bcrypt.checkpw(password, user[0]['mb_passwd'].encode('utf-8')):
session['username'] = username
session['user_info'] = user[0]
return redirect(url_for('index'))
else:
flash('Username or password is incorrect')
return render_template('login.html')
@app.route('/logout')
def logout():
session.clear()
g.is_login = False
g.user_info = None
return redirect(url_for('index'))
@app.route('/post/<int:post_id>')
def post(post_id):
# SQL 쿼리와 삽입할 데이터 설정
query = "SELECT blog.*, member.mb_name, member.mb_id FROM blog INNER JOIN member ON blog.user_id = member.mb_idx WHERE blog.id = %s"
data = (post_id,)
# Post를 데이터베이스에서 검색
result, fetched_data = sql_execute(query, data, is_data=True)
if result:
# 검색 성공시, 포스트 상세 페이지로 이동
post = fetched_data[0]
# 이전과 다음 포스트를 찾기
if 'user_info' in session: # 로그인된 사용자가 있을 경우
# 외부 공개 안된 글도 포함하여 모든 블로그 포스트를 가져옵니다.
prev_query = "SELECT blog.*, member.mb_name, member.mb_id FROM blog INNER JOIN member ON blog.user_id = member.mb_idx WHERE blog.add_date < %s AND blog.use_yn = 'Y' ORDER BY blog.add_date DESC LIMIT 1"
next_query = "SELECT blog.*, member.mb_name, member.mb_id FROM blog INNER JOIN member ON blog.user_id = member.mb_idx WHERE blog.add_date > %s AND blog.use_yn = 'Y' ORDER BY blog.add_date ASC LIMIT 1"
else: # 로그인된 사용자가 없을 경우
# 외부 공개된 블로그 포스트만 가져옵니다.
prev_query = "SELECT blog.*, member.mb_name, member.mb_id FROM blog INNER JOIN member ON blog.user_id = member.mb_idx WHERE blog.add_date < %s AND blog.public_yn = 'Y' AND blog.use_yn = 'Y' ORDER BY blog.add_date DESC LIMIT 1"
next_query = "SELECT blog.*, member.mb_name, member.mb_id FROM blog INNER JOIN member ON blog.user_id = member.mb_idx WHERE blog.add_date > %s AND blog.public_yn = 'Y' AND blog.use_yn = 'Y' ORDER BY blog.add_date ASC LIMIT 1"
prev_post = None
next_post = None
prev_result, prev_fetched_data = sql_execute(prev_query, (post['add_date'],), is_data=True)
next_result, next_fetched_data = sql_execute(next_query, (post['add_date'],), is_data=True)
if prev_result and prev_fetched_data:
prev_post = prev_fetched_data[0]
if next_result and next_fetched_data:
next_post = next_fetched_data[0]
return render_template('post.html', post=post, prev_post=prev_post, next_post=next_post)
else:
# 검색 실패시, 에러 메시지 반환
return "Failed to fetch the post", 500
@app.route('/edit_post/<int:post_id>', methods=['GET', 'POST'])
def edit_post(post_id):
if 'username' not in session or ('username' in session and session['username'] not in ['admin', 'wixon']):
flash('You are not allowed to edit this post.')
return redirect(url_for('index'))
if request.method == 'POST':
# 포스트 업데이트 로직
title = request.form.get('title')
category = request.form.get('category')
public_yn = 'Y' if request.form.get('public') == 'on' else 'N'
contents = request.form.get('contents')
query = "UPDATE blog SET title = %s, category = %s, public_yn = %s, contents = %s WHERE id = %s"
data = (title, category, public_yn, contents, post_id)
res = sql_execute(query, data)
if res:
flash('Post updated successfully.')
return redirect(url_for('post', post_id=post_id))
else:
flash('Failed to update the post.')
return redirect(url_for('edit_post', post_id=post_id))
else:
# 포스트 가져오기 로직
query = "SELECT * FROM blog WHERE id = %s"
data = (post_id,)
res, fetched_data = sql_execute(query, data, is_data=True)
if res and fetched_data:
return render_template('edit_post.html', post=fetched_data[0])
else:
flash('Failed to fetch the post.')
return redirect(url_for('index'))
@app.route('/write', methods=['GET', 'POST'])
def write():
if 'user_info' not in session:
flash("You need to login first.")
return redirect(url_for('login'))
if request.method == 'POST':
user_id = session['user_info']['mb_idx']
title = request.form['title']
category = request.form['category']
contents = request.form['contents']
is_public = 'Y' if request.form.get('public') == 'on' else 'N'
soup = BeautifulSoup(contents, 'html.parser')
first_image = soup.find('img')
thumbnail_img = first_image['src'] if first_image else None
query = "INSERT INTO blog (user_id, title, category, contents, thumbnail_img, public_yn) VALUES (%s, %s, %s, %s, %s, %s)"
data = (user_id, title, category, contents, thumbnail_img, is_public)
result, last_id = sql_execute(query, data, is_last_id=True)
if result:
return redirect(url_for('index'))
else:
return "Failed to write post", 500
else:
return render_template('write.html')
@app.route('/blog/<int:id>', methods=['DELETE'])
def delete_post(id):
query = "UPDATE `blog` SET `use_yn` = 'N' WHERE `id` = %s;"
res = sql_execute(query, (id,))
if res:
return jsonify(success=True, message='Post deleted successfully'), 200
else:
return jsonify(success=False, message='Could not delete the post'), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8899)