实现类似于Twitter和其他社交网络的“粉丝”功能。
一、多对多关系。
在粉丝关系中,用户关注其他用户,只有一个用户实体,第二个实体也是用户。 一个类的实例被关联到同一个类的其他实例的关系被称为自引用关系。数据库表关系如下:
二、数据库模型的实现
followers关联表,app/models/user.py:
followers = db.Table('followers',
db.Column('follower_id', db.Integer, db.ForeignKey('user.id')),
db.Column('followed_id', db.Integer, db.ForeignKey('user.id'))
)
这是一个关联表。 请注意,我没有像我为用户和用户动态所做的那样,将表声明为模型。 因为这是一个除了外键没有其他数据的辅助表,所以创建它的时候没有关联到模型类。
在用户表中声明多对多的关系,app/models/user.py:
followed=db.relationship(
'User',secondary=followers,
primaryjoin=(followers.c.follower_id==id),
secondaryjoin=(followers.c.followed_id==id),
backref=db.backref('followers',lazy='dynamic'),lazy='dynamic'
)
db.relationship()所有的参数:
-
'User’是关系当中的右侧实体(将左侧实体看成是上级类)。由于这是自引用关系,所以我不得不在两侧都使用同一个实体。
-
secondary 指定了用于该关系的关联表,就是使用我在上面定义的followers。 primaryjoin 指明了通过关系表关联到左侧实体(关注者)的条件。关系中的左侧的join条件是关系表中的follower_id字段与这个关注者的用户ID匹配。followers.c.follower_id表达式引用了该关系表中的follower_id列。
-
secondaryjoin 指明了通过关系表关联到右侧实体(被关注者)的条件 。
这个条件与primaryjoin类似,唯一的区别在于,现在我使用关系表的字段的是followed_id了。 -
backref定义了右侧实体如何访问该关系。在左侧,关系被命名为followed,所以在右侧我将使用followers来表示所有左侧用户的列表,即粉丝列表。附加的lazy参数表示这个查询的执行模式,设置为动态模式的查询不会立即执行,直到被调用,这也是我设置用户动态一对多的关系的方式。
-
lazy和backref中的lazy类似,只不过当前的这个是应用于左侧实体,backref中的是应用于右侧实体。
同样,数据库的变更,需要记录到一个新的数据库迁移中:
(microblog) D:\pythonProgram\PycharmProjects\microblog>flask db migrate -m "followers"
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
INFO [alembic.autogenerate.compare] Detected added table 'followers'
Generating D:\pythonProgram\PycharmProjects\microblog\migrations\versions\47a337ed86bb_followers.py ... done
(microblog) D:\pythonProgram\PycharmProjects\microblog>flask db upgrade
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
INFO [alembic.runtime.migration] Running upgrade -> 47a337ed86bb, followers
三、关注与取消关注的实现
在User模型中实现“follow”和“unfollow”方法,app/models/user.py:
class User(UserMixin, db.Model):
#...
def is_following(self,user):
return self.followed.filter(
followers.c.followed_id==user.id).count()>0
def follow(self,user):
if not self.is_following(user):
self.followed.append(user)
def unfollow(self,user):
if self.is_following(user):
self.followed.remove(user)
四、查看已关注用户的动态
首先看看下面的这个查询:
class User(db.Model):
#...
def followed_posts(self):
return Post.query.join(
followers, (followers.c.followed_id == Post.user_id)).filter(
followers.c.follower_id == self.id).order_by(
Post.timestamp.desc())
有三个主要部分,分别是join()、filter()和order_by(),他们都是SQLAlchemy查询对象的方法:
Post.query.join(...).filter(...).order_by(...)
联合查询
要理解join操作的功能,我们来看一个例子。 假设我有一个包含以下内容的User表:
id | username |
---|---|
1 | john |
2 | susan |
3 | mary |
4 | david |
为了简单起见,我只会保留用户模型的id和username字段以便进行查询,其他的都略去。
假设followers关系表中数据表达的是用户john关注用户susan和
david,用户susan关注用户mary,用户mary关注用户david。
这些的数据如下表所示:
follower_id | followed_id |
---|---|
1 | 2 |
1 | 4 |
2 | 3 |
3 | 4 |
最后,用户动态表中包含了每个用户的一条动态:
id | text | user_id |
---|---|---|
1 | post from susan | 2 |
2 | post from mary | 3 |
3 | post from david | 4 |
4 | post from john | 1 |
这张表也省略了一些不属于这个讨论范围的字段。
这是我为该查询再次设计的join()调用:
Post.query.join(followers, (followers.c.followed_id == Post.user_id))
我在用户动态表上调用join操作。 第一个参数是followers关联表,第二个参数是join条件。
我的这个调用表达的含义是我希望数据库创建一个临时表,它将用户动态表和关注者表中的数据结合在一起。 数据将根据参数传递的条件进行合并。
我使用的条件表示了followers关系表的followed_id字段必须等于用户动态表的user_id字段。
要执行此合并,数据库将从用户动态表(join的左侧)获取每条记录,并追加followers关系表(join的右侧)中的匹配条件的所有记录。
如果followers关系表中有多个记录符合条件,那么用户动态数据行将重复出现。
如果对于一个给定的用户动态,followers关系表中却没有匹配,那么该用户动态的记录不会出现在join操作的结果中。
利用我上面定义的示例数据,执行join操作的结果如下:
id | text | user_id | follower_id | followed_9id |
---|---|---|---|---|
1 | post from susan | 2 | 1 | 2 |
2 | post from mary | 3 | 2 | 3 |
3 | post from david | 4 | 1 | 4 |
3 | post from david | 4 | 3 | 4 |
注意user_id和followed_id列在所有数据行中都是相等的,因为这是join条件。
来自用户john的用户动态不会出现在临时表中,因为被关注列表中没有包含john用户,换句话说,没有任何人关注john。
而来自david的用户动态出现了两次,因为该用户有两个粉丝。
虽然创建了这个join操作,但却没有得到想要的结果。请继续看下去,因为这只是更大的查询的一部分。
过滤
Join操作给了我一个所有被关注用户的用户动态的列表,远超出我想要的那部分数据。
我只对这个列表的一个子集感兴趣——某个用户关注的用户们的动态,所以我需要用filter()来剔除所有我不需要的数据。
这是过滤部分的查询语句:
filter(followers.c.follower_id == self.id)
该查询是User类的一个方法,self.id表达式是指我感兴趣的用户的ID。filter()挑选临时表中follower_id列等于这个ID的行,换句话说,我只保留follower(粉丝)是该用户的数据。
假如我现在对id为1的用户john能看到的用户动态感兴趣,这是从临时表过滤后的结果:
id | text | user_id | follower_id | followed_9id |
---|---|---|---|---|
1 | post from susan | 2 | 1 | 2 |
3 | post from david | 4 | 1 | 4 |
这正是我想要的结果!
请记住,查询是从Post类中发出的,所以尽管我曾经得到了由数据库创建的一个临时表来作为查询的一部分,但结果将是包含在此临时表中的用户动态,
而不会存在由于执行join操作添加的其他列。 排序 查询流程的最后一步是对结果进行排序。这部分的查询语句如下:
order_by(Post.timestamp.desc())
在这里,我要说的是,我希望使用用户动态产生的时间戳按降序排列结果列表。排序之后,第一个结果将是最新的用户动态。
组合自身动态和关注的用户动态
扩展followed_posts()函数通过联合查询来并入用户自己的动态:
def followed_posts(self):
followed = Post.query.join(
followers, (followers.c.followed_id == Post.user_id)).filter(
followers.c.follower_id == self.id)
own = Post.query.filter_by(user_id=self.id)
return followed.union(own).order_by(Post.timestamp.desc())
五、对用户模型执行单元测试
Python包含一个非常有用的unittest包,可以轻松编写和执行单元测试。 让我们来为User类中的现有方法编写一些单元测试并存储到tests.py模块:
from datetime import datetime, timedelta
import unittest
from app import app, db
from app.models import User, Post
class UserModelCase(unittest.TestCase):
def setUp(self):
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite://'
db.create_all()
def tearDown(self):
db.session.remove()
db.drop_all()
def test_password_hashing(self):
u = User(username='charlie1')
u.set_password('cat')
self.assertFalse(u.check_password('dog'))
self.assertTrue(u.check_password('cat'))
def test_follow(self):
u1 = User(username='john', email='john@example.com')
u2 = User(username='susan', email='susan@example.com')
db.session.add(u1)
db.session.add(u2)
db.session.commit()
self.assertEqual(u1.followed.all(), [])
self.assertEqual(u1.followers.all(), [])
u1.follow(u2)
db.session.commit()
self.assertTrue(u1.is_following(u2))
self.assertEqual(u1.followed.count(), 1)
self.assertEqual(u1.followed.first().username, 'susan')
self.assertEqual(u2.followers.count(), 1)
self.assertEqual(u2.followers.first().username, 'john')
u1.unfollow(u2)
db.session.commit()
self.assertFalse(u1.is_following(u2))
self.assertEqual(u1.followed.count(), 0)
self.assertEqual(u2.followers.count(), 0)
def test_follow_posts(self):
# create four users
u1 = User(username='john', email='john@example.com')
u2 = User(username='susan', email='susan@example.com')
u3 = User(username='mary', email='mary@example.com')
u4 = User(username='david', email='david@example.com')
db.session.add_all([u1, u2, u3, u4])
# create four posts
now = datetime.utcnow()
p1 = Post(body="post from john", author=u1,
timestamp=now + timedelta(seconds=1))
p2 = Post(body="post from susan", author=u2,
timestamp=now + timedelta(seconds=4))
p3 = Post(body="post from mary", author=u3,
timestamp=now + timedelta(seconds=3))
p4 = Post(body="post from david", author=u4,
timestamp=now + timedelta(seconds=2))
db.session.add_all([p1, p2, p3, p4])
db.session.commit()
# setup the followers
u1.follow(u2) # john follows susan
u1.follow(u4) # john follows david
u2.follow(u3) # susan follows mary
u3.follow(u4) # mary follows david
db.session.commit()
# check the followed posts of each user
f1 = u1.followed_posts().all()
f2 = u2.followed_posts().all()
f3 = u3.followed_posts().all()
f4 = u4.followed_posts().all()
self.assertEqual(f1, [p2, p4, p1])
self.assertEqual(f2, [p2, p3])
self.assertEqual(f3, [p3, p4])
self.assertEqual(f4, [p4])
if __name__ == '__main__':
unittest.main(verbosity=2)
使用以下命令运行整个测试组件:
Testing started at 20:15 ...
D:\PycharmProjects\flask\microblog\Scripts\python.exe "D:\Program Files\JetBrains\PyCharm 2020.1\plugins\python\helpers\pycharm\_jb_unittest_runner.py" --path D:/pythonProgram/PycharmProjects/microblog/test/test1.py
Launching unittests with arguments python -m unittest D:/pythonProgram/PycharmProjects/microblog/test/test1.py in D:\pythonProgram\PycharmProjects\microblog\test
[2021-07-13 20:15:07,035] INFO in __init__: Microblog startup
Process finished with exit code 0
Ran 3 tests in 0.524s
OK
六、在应用中集成
数据库和模型中粉丝机制的实现现在已经完成,现在将它集成到应用中。
让我们来添加两个新的路由和视图函数,它们提供了用户关注和取消关注的URL和逻辑实现,在 app/views/main.py中实现:
@main.route('/follow/<username>')
@login_required
def follow(username):
user = User.query.filter_by(username=username).first()
if user is None:
flash('User {} not found.'.format(username))
return redirect(url_for('index'))
if user == current_user:
flash('You cannot follow yourself!')
return redirect(url_for('auth.user', username=username))
current_user.follow(user)
db.session.commit()
flash('You are following {}!'.format(username))
return redirect(url_for('auth.user', username=username))
@main.route('/unfollow/<username>')
@login_required
def unfollow(username):
user = User.query.filter_by(username=username).first()
if user is None:
flash('User {} not found.'.format(username))
return redirect(url_for('index'))
if user == current_user:
flash('You cannot unfollow yourself!')
return redirect(url_for('auth.user', username=username))
current_user.unfollow(user)
db.session.commit()
flash('You are not following {}.'.format(username))
return redirect(url_for('auth.user', username=username))
添加这两个视图函数的路由到每个用户的个人主页中,以便其他用户执行关注和取消关注的操作,在app/templates/user.html中实现:
{% extends "base.html" %}
{% block app_content %}
<table>
<tr valign="top">
<td><img src="{{ user.avatar(128) }}"></td>
<td>
<h1>User: {{ user.username }}</h1>
{% if user.about_me %}
<p>{{ user.about_me }}</p>
{% endif %}
{% if user.last_seen %}
<p>Last seen on: {{ user.last_seen }}</p>
{% endif %}
<p>{{ user.followers.count() }} followers, {{ user.followed.count() }} following.</p>
{% if user == current_user %}
<p><a href="{{ url_for('auth.edit_profile') }}">Edit your profile</a></p>
{% elif not current_user.is_following(user) %}
<p><a href="{{ url_for('main.follow', username=user.username) }}">Follow</a></p>
{% else %}
<p><a href="{{ url_for('main.unfollow', username=user.username) }}">Unfollow</a></p>
{% endif %}
</td>
</tr>
</table>
<hr>
{% for post in posts %}
{% include '_post.html' %}
{% endfor %}
{% endblock %}
现在运行应用,测试一下吧!