Skip to content
This repository was archived by the owner on Feb 4, 2021. It is now read-only.

Commit b2eb13b

Browse files
committed
Impl feed job
1 parent 094b270 commit b2eb13b

File tree

11 files changed

+224
-8
lines changed

11 files changed

+224
-8
lines changed

app/di/store_component.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/ProgrammingLab/prolab-accounts/app/config"
1313
"github.com/ProgrammingLab/prolab-accounts/infra/store"
14+
entrystore "github.com/ProgrammingLab/prolab-accounts/infra/store/entry"
1415
feedstore "github.com/ProgrammingLab/prolab-accounts/infra/store/feed"
1516
profilestore "github.com/ProgrammingLab/prolab-accounts/infra/store/profile"
1617
sessionstore "github.com/ProgrammingLab/prolab-accounts/infra/store/session"
@@ -25,6 +26,7 @@ type StoreComponent interface {
2526
ProfileStore(ctx context.Context) store.ProfileStore
2627
UserBlogStore(ctx context.Context) store.UserBlogStore
2728
FeedStore(ctx context.Context) store.FeedStore
29+
EntryStore(ctx context.Context) store.EntryStore
2830
}
2931

3032
// NewStoreComponent returns new store component
@@ -145,3 +147,7 @@ func (s *storeComponentImpl) UserBlogStore(ctx context.Context) store.UserBlogSt
145147
func (s *storeComponentImpl) FeedStore(ctx context.Context) store.FeedStore {
146148
return feedstore.NewFeedStore(ctx)
147149
}
150+
151+
func (s *storeComponentImpl) EntryStore(ctx context.Context) store.EntryStore {
152+
return entrystore.NewEntryStore(ctx, s.db)
153+
}

app/job/feed_job.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package job
2+
3+
import (
4+
"context"
5+
6+
"google.golang.org/grpc/grpclog"
7+
8+
"github.com/ProgrammingLab/prolab-accounts/app/di"
9+
)
10+
11+
func feedJob(ctx context.Context, store di.StoreComponent) error {
12+
bs := store.UserBlogStore(ctx)
13+
blogs, err := bs.ListUserBlogs()
14+
if err != nil {
15+
return err
16+
}
17+
18+
fs := store.FeedStore(ctx)
19+
es := store.EntryStore(ctx)
20+
for _, b := range blogs {
21+
if b.FeedURL == "" {
22+
continue
23+
}
24+
feed, err := fs.GetFeed(b.FeedURL)
25+
if err != nil {
26+
grpclog.Errorf("feed job: failed to get feed: blog id: %v : %+v", b.ID, err)
27+
continue
28+
}
29+
30+
n, err := es.CreateEntries(b, feed)
31+
if err != nil {
32+
return err
33+
}
34+
grpclog.Infof("feed job: created %v entries", n)
35+
}
36+
37+
return nil
38+
}

app/job/job.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package job
22

33
import (
4+
"context"
45
"time"
56

67
"google.golang.org/grpc/grpclog"
@@ -13,11 +14,13 @@ import (
1314
var (
1415
started = false
1516
stop = make(chan struct{})
16-
jobs = []Job{}
17+
jobs = []Job{
18+
feedJob,
19+
}
1720
)
1821

1922
// Job represents job for worker
20-
type Job func(store di.StoreComponent) error
23+
type Job func(ctx context.Context, store di.StoreComponent) error
2124

2225
// Start starts the worker
2326
func Start(store di.StoreComponent, cfg *config.Config) {
@@ -52,7 +55,7 @@ func run(store di.StoreComponent, interval time.Duration) {
5255
select {
5356
case <-time.After(interval):
5457
for _, j := range jobs {
55-
err := j(store)
58+
err := j(context.Background(), store)
5659
if err != nil {
5760
grpclog.Errorf("job error: %+v", err)
5861
}

db/blogs.schema

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ create_table :blogs, force: :cascade do |t|
33
t.string :feed_url, null: false
44
t.references :user, foreign_key: true, null: false
55
t.timestamps
6+
7+
t.index [:feed_url], unique: true
68
end
79

810
add_foreign_key :blogs, :users

db/entries.schema

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ create_table :entries, force: :cascade do |t|
77
t.string :guid, null: false
88
t.string :image_url, null: false
99
t.references :blog, foreign_key: true, null: false
10+
t.datetime :published_at, null: true
1011
t.timestamps
1112

1213
t.index [:guid], unique: true

infra/record/entries.go

Lines changed: 31 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

infra/record/entries_test.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

infra/store/entry/entry_store.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package entrystore
2+
3+
import (
4+
"context"
5+
"crypto/sha256"
6+
"database/sql"
7+
"encoding/base64"
8+
"encoding/binary"
9+
10+
"github.com/mmcdole/gofeed"
11+
"github.com/pkg/errors"
12+
"github.com/volatiletech/null"
13+
"github.com/volatiletech/sqlboiler/boil"
14+
"github.com/volatiletech/sqlboiler/queries/qm"
15+
16+
"github.com/ProgrammingLab/prolab-accounts/app/util"
17+
"github.com/ProgrammingLab/prolab-accounts/infra/record"
18+
"github.com/ProgrammingLab/prolab-accounts/infra/store"
19+
)
20+
21+
type entryStoreImpl struct {
22+
ctx context.Context
23+
db *sql.DB
24+
}
25+
26+
// NewEntryStore returns new entry blog store
27+
func NewEntryStore(ctx context.Context, db *sql.DB) store.EntryStore {
28+
return &entryStoreImpl{
29+
ctx: ctx,
30+
db: db,
31+
}
32+
}
33+
34+
func (s *entryStoreImpl) CreateEntries(blog *record.Blog, feed *gofeed.Feed) (n int64, err error) {
35+
tx, err := s.db.Begin()
36+
if err != nil {
37+
return 0, errors.WithStack(err)
38+
}
39+
defer func() {
40+
if e := util.ErrorFromRecover(recover()); e != nil {
41+
_ = tx.Rollback()
42+
err = e
43+
}
44+
}()
45+
46+
mods := []qm.QueryMod{
47+
qm.Select(record.EntryColumns.ID, record.EntryColumns.GUID),
48+
qm.Where("blog_id = ?", blog.ID),
49+
}
50+
entries, err := record.Entries(mods...).All(s.ctx, tx)
51+
if err != nil {
52+
_ = tx.Rollback()
53+
return 0, errors.WithStack(err)
54+
}
55+
56+
exists := make(map[string]struct{})
57+
for _, e := range entries {
58+
exists[e.GUID] = struct{}{}
59+
}
60+
61+
n = 0
62+
for _, item := range feed.Items {
63+
guid, err := getGUID(blog.ID, item.GUID)
64+
if err != nil {
65+
_ = tx.Rollback()
66+
return 0, err
67+
}
68+
69+
_, ok := exists[guid]
70+
if ok {
71+
continue
72+
}
73+
74+
e := &record.Entry{
75+
Title: item.Title,
76+
Description: item.Description,
77+
Content: item.Content,
78+
Link: item.Link,
79+
AuthorID: blog.UserID,
80+
GUID: guid,
81+
BlogID: blog.ID,
82+
PublishedAt: null.TimeFromPtr(item.PublishedParsed),
83+
}
84+
if i := item.Image; i != nil {
85+
e.ImageURL = i.URL
86+
}
87+
88+
err = e.Insert(s.ctx, tx, boil.Infer())
89+
if err != nil {
90+
_ = tx.Rollback()
91+
return 0, errors.WithStack(err)
92+
}
93+
n++
94+
}
95+
96+
err = tx.Commit()
97+
if err != nil {
98+
_ = tx.Rollback()
99+
return 0, errors.WithStack(err)
100+
}
101+
return n, nil
102+
}
103+
104+
func getGUID(blogID int64, guid string) (string, error) {
105+
h := sha256.New()
106+
err := binary.Write(h, binary.LittleEndian, blogID)
107+
if err != nil {
108+
return "", errors.WithStack(err)
109+
}
110+
_, err = h.Write([]byte(guid))
111+
if err != nil {
112+
return "", errors.WithStack(err)
113+
}
114+
115+
return base64.RawURLEncoding.EncodeToString(h.Sum(nil)), nil
116+
}

infra/store/entry_store.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package store
2+
3+
import (
4+
"github.com/ProgrammingLab/prolab-accounts/infra/record"
5+
"github.com/mmcdole/gofeed"
6+
)
7+
8+
// EntryStore accesses entry data
9+
type EntryStore interface {
10+
CreateEntries(blog *record.Blog, feed *gofeed.Feed) (int64, error)
11+
}

infra/store/user_blog/user_blog_store.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ func NewUserBlogStore(ctx context.Context, db *sql.DB) store.UserBlogStore {
2626
}
2727
}
2828

29+
func (s *userBlogStoreImpl) ListUserBlogs() ([]*record.Blog, error) {
30+
b, err := record.Blogs().All(s.ctx, s.db)
31+
if err != nil {
32+
return nil, errors.WithStack(err)
33+
}
34+
return b, nil
35+
}
36+
2937
func (s *userBlogStoreImpl) GetUserBlog(blogID int64) (*record.Blog, error) {
3038
b, err := record.FindBlog(s.ctx, s.db, int64(blogID))
3139
if err != nil {
@@ -44,14 +52,15 @@ func (s *userBlogStoreImpl) CreateUserBlog(blog *record.Blog) error {
4452
return nil
4553
}
4654

47-
func (s *userBlogStoreImpl) UpdateUserBlog(blog *record.Blog) error {
55+
func (s *userBlogStoreImpl) UpdateUserBlog(blog *record.Blog) (err error) {
4856
tx, err := s.db.Begin()
4957
if err != nil {
5058
return errors.WithStack(err)
5159
}
5260
defer func() {
53-
if err = util.ErrorFromRecover(recover()); err != nil {
61+
if e := util.ErrorFromRecover(recover()); e != nil {
5462
_ = tx.Rollback()
63+
err = e
5564
}
5665
}()
5766

infra/store/user_blog_store.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
// UserBlogStore accesses users data
88
type UserBlogStore interface {
9+
ListUserBlogs() ([]*record.Blog, error)
910
GetUserBlog(blogID int64) (*record.Blog, error)
1011
CreateUserBlog(blog *record.Blog) error
1112
UpdateUserBlog(blog *record.Blog) error

0 commit comments

Comments
 (0)