-
1
-
2
-
3
-
4
-
5
-
6
-
7
-
8
-
9
-
10
-
11
-
12
-
13
-
14
-
15
-
16
-
17
-
18
-
19
-
20
-
21
-
22
-
23
-
24
-
25
-
26
-
27
-
28
-
29
-
30
-
31
-
32
-
33
-
34
-
35
-
36
-
37
-
38
-
39
-
40
-
41
-
42
-
43
-
44
-
45
-
46
-
47
-
48
-
49
-
50
-
51
-
52
-
53
-
54
-
55
-
56
-
57
-
58
-
59
-
60
-
61
-
62
-
63
-
64
-
65
-
66
-
67
-
68
-
69
-
70
-
71
-
72
-
73
-
74
-
75
-
76
-
77
-
78
-
79
-
80
-
81
-
82
-
83
-
84
-
85
-
86
-
87
-
88
-
89
-
90
-
91
-
92
-
93
-
94
-
95
-
96
-
97
-
98
-
99
-
100
-
101
-
102
-
103
-
104
-
105
-
106
-
107
-
108
-
109
-
110
-
111
-
112
-
113
-
114
-
115
-
116
-
117
-
118
-
119
-
120
-
121
-
122
-
123
-
124
-
125
-
126
-
127
-
128
-
129
-
130
-
131
-
132
-
133
-
134
-
135
// Copyright 2025 Shota FUJI
//
// This source code is licensed under Zero-Clause BSD License.
// You can find a copy of the Zero-Clause BSD License at LICENSES/0BSD.txt
// You may also obtain a copy of the Zero-Clause BSD License at
// <https://opensource.org/license/0bsd>
//
// SPDX-License-Identifier: 0BSD
package users
import (
"context"
"database/sql"
"fmt"
"google.golang.org/protobuf/proto"
"pocka.jp/x/event_sourcing_user_management_poc/events"
"pocka.jp/x/event_sourcing_user_management_poc/gen/event"
"pocka.jp/x/event_sourcing_user_management_poc/gen/projection"
)
func GetProjection(db *sql.DB) (*projection.UsersProjection, int, error) {
ctx := context.Background()
var p projection.UsersProjection
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return nil, 0, fmt.Errorf("Failed to begin transaction for UsersProjection: %s", err)
}
defer tx.Rollback()
var eventSeq int
var payload []byte
err = tx.QueryRow("SELECT event_seq, payload FROM users_snapshots ORDER BY event_seq DESC LIMIT 1").Scan(&eventSeq, &payload)
if err == sql.ErrNoRows {
p = projection.UsersProjection{
Users: []*projection.User{},
}
eventSeq = -1
} else if err != nil {
return nil, 0, fmt.Errorf("Failed to get latest snapshot: %s", err)
} else {
if err := proto.Unmarshal(payload, &p); err != nil {
return nil, 0, fmt.Errorf("Failed to decode latest snapshot: %s", err)
}
}
stmt, err := tx.Prepare("SELECT seq, event_name, payload FROM user_events WHERE seq > ? ORDER BY seq ASC")
if err != nil {
return nil, 0, fmt.Errorf("Failed to prepare event fetching query: %s", err)
}
maxSeq := -1
rows, err := stmt.Query(eventSeq)
for rows.Next() {
ev, seq, err := events.ScanEvent(rows)
if err != nil {
return nil, 0, err
}
maxSeq = max(maxSeq, seq)
apply(ev, &p)
}
return &p, maxSeq, nil
}
func apply(ev proto.Message, p *projection.UsersProjection) {
switch v := ev.(type) {
case *event.UserCreated:
p.Users = append(p.Users, &projection.User{
Id: v.Id,
DisplayName: v.DisplayName,
Email: v.Email,
})
return
case *event.PasswordLoginConfigured:
if v.UserId == nil {
return
}
for _, user := range p.Users {
if *user.Id != *v.UserId {
continue
}
user.PasswordLogin = &projection.User_PasswordLogin{
Hash: v.PasswordHash,
Salt: v.Salt,
}
return
}
return
case *event.RoleAssigned:
if v.UserId == nil {
return
}
for _, user := range p.Users {
if *user.Id != *v.UserId {
continue
}
user.Role = v.Role
return
}
return
}
}
func SaveSnapshot(db *sql.DB) error {
p, seq, err := GetProjection(db)
if err != nil {
return err
}
stmt, err := db.Prepare("INSERT OR ABORT INTO users_snapshots (event_seq, payload) VALUES (?, ?)")
if err != nil {
return err
}
payload, err := proto.Marshal(p)
if err != nil {
return err
}
_, err = stmt.Exec(seq, payload)
return err
}