event_sourcing_user_management_poc

PoC for user management in Event Sourcing using SQLite3

  1. 1
  2. 2
  3. 3
  4. 4
  5. 5
  6. 6
  7. 7
  8. 8
  9. 9
  10. 10
  11. 11
  12. 12
  13. 13
  14. 14
  15. 15
  16. 16
  17. 17
  18. 18
  19. 19
  20. 20
  21. 21
  22. 22
  23. 23
  24. 24
  25. 25
  26. 26
  27. 27
  28. 28
  29. 29
  30. 30
  31. 31
  32. 32
  33. 33
  34. 34
  35. 35
  36. 36
  37. 37
  38. 38
  39. 39
  40. 40
  41. 41
  42. 42
  43. 43
  44. 44
  45. 45
  46. 46
  47. 47
  48. 48
  49. 49
  50. 50
  51. 51
  52. 52
  53. 53
  54. 54
  55. 55
  56. 56
  57. 57
  58. 58
  59. 59
  60. 60
  61. 61
  62. 62
  63. 63
  64. 64
  65. 65
  66. 66
  67. 67
  68. 68
  69. 69
  70. 70
  71. 71
  72. 72
  73. 73
  74. 74
  75. 75
  76. 76
  77. 77
  78. 78
  79. 79
  80. 80
  81. 81
  82. 82
  83. 83
  84. 84
  85. 85
  86. 86
  87. 87
  88. 88
  89. 89
  90. 90
  91. 91
  92. 92
  93. 93
  94. 94
  95. 95
  96. 96
  97. 97
  98. 98
  99. 99
  100. 100
  101. 101
  102. 102
  103. 103
  104. 104
  105. 105
  106. 106
  107. 107
  108. 108
  109. 109
  110. 110
  111. 111
  112. 112
  113. 113
  114. 114
  115. 115
  116. 116
  117. 117
  118. 118
  119. 119
  120. 120
  121. 121
  122. 122
  123. 123
  124. 124
  125. 125
  126. 126
  127. 127
  128. 128
  129. 129
  130. 130
  131. 131
  132. 132
  133. 133
  134. 134
  135. 135
// Copyright 2025 Shota FUJI
//
// This source code is licensed under Zero-Clause BSD License.
// You can find a copy of the Zero-Clause BSD License at LICENSES/0BSD.txt
// You may also obtain a copy of the Zero-Clause BSD License at
// <https://opensource.org/license/0bsd>
//
// SPDX-License-Identifier: 0BSD

package users

import (
	"context"
	"database/sql"
	"fmt"

	"google.golang.org/protobuf/proto"

	"pocka.jp/x/event_sourcing_user_management_poc/events"
	"pocka.jp/x/event_sourcing_user_management_poc/gen/event"
	"pocka.jp/x/event_sourcing_user_management_poc/gen/projection"
)

// GetProjection rebuilds the current UsersProjection from the database.
//
// It loads the row of users_snapshots with the highest event_seq (if any) as
// the starting state, then applies every row of user_events whose seq is
// greater than that event_seq, in ascending seq order. Both reads run inside
// one transaction, which is always rolled back because nothing is written.
//
// The returned int is the sequence number of the last event reflected in the
// projection: the highest seq applied, or the snapshot's event_seq when no
// newer events exist, or -1 when there is neither a snapshot nor any event.
// On error the projection is nil and the sequence number is 0.
func GetProjection(db *sql.DB) (*projection.UsersProjection, int, error) {
	ctx := context.Background()

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return nil, 0, fmt.Errorf("Failed to begin transaction for UsersProjection: %w", err)
	}
	// Read-only transaction: rolling back is the cheapest way to end it.
	defer tx.Rollback()

	var p projection.UsersProjection
	var eventSeq int
	var payload []byte

	err = tx.QueryRowContext(ctx, "SELECT event_seq, payload FROM users_snapshots ORDER BY event_seq DESC LIMIT 1").Scan(&eventSeq, &payload)
	switch {
	case err == sql.ErrNoRows:
		// No snapshot yet: start from an empty projection and replay every
		// event. The field is set in place rather than assigning a whole
		// struct, so the message value is never copied.
		p.Users = []*projection.User{}
		eventSeq = -1
	case err != nil:
		return nil, 0, fmt.Errorf("Failed to get latest snapshot: %w", err)
	default:
		if err := proto.Unmarshal(payload, &p); err != nil {
			return nil, 0, fmt.Errorf("Failed to decode latest snapshot: %w", err)
		}
	}

	rows, err := tx.QueryContext(ctx, "SELECT seq, event_name, payload FROM user_events WHERE seq > ? ORDER BY seq ASC", eventSeq)
	if err != nil {
		return nil, 0, fmt.Errorf("Failed to fetch events: %w", err)
	}
	defer rows.Close()

	// Start from the snapshot's position so that the reported sequence still
	// describes the projection when there are no newer events to replay.
	maxSeq := eventSeq
	for rows.Next() {
		ev, seq, err := events.ScanEvent(rows)
		if err != nil {
			return nil, 0, err
		}

		maxSeq = max(maxSeq, seq)

		apply(ev, &p)
	}
	// rows.Next() returns false both at the end of the result set and on
	// failure; only rows.Err() distinguishes the two.
	if err := rows.Err(); err != nil {
		return nil, 0, fmt.Errorf("Failed to iterate events: %w", err)
	}

	return &p, maxSeq, nil
}

// apply folds a single event into the projection p, mutating p in place.
//
//   - UserCreated appends a new user carrying the event's id, display name
//     and email.
//   - PasswordLoginConfigured sets the password hash and salt on the first
//     user whose id equals the event's user id.
//   - RoleAssigned sets the role on the first user whose id equals the
//     event's user id.
//
// Events of any other type, events without a user id, and events whose user
// id matches no projected user leave p unchanged.
func apply(ev proto.Message, p *projection.UsersProjection) {
	switch v := ev.(type) {
	case *event.UserCreated:
		p.Users = append(p.Users, &projection.User{
			Id:          v.Id,
			DisplayName: v.DisplayName,
			Email:       v.Email,
		})

	case *event.PasswordLoginConfigured:
		if v.UserId == nil {
			return
		}

		for _, user := range p.Users {
			// UserCreated copies the event's id verbatim, so a projected user
			// may have no id; skip it rather than dereference a nil pointer.
			if user.Id == nil || *user.Id != *v.UserId {
				continue
			}

			user.PasswordLogin = &projection.User_PasswordLogin{
				Hash: v.PasswordHash,
				Salt: v.Salt,
			}
			return
		}

	case *event.RoleAssigned:
		if v.UserId == nil {
			return
		}

		for _, user := range p.Users {
			// Same nil-id guard as above.
			if user.Id == nil || *user.Id != *v.UserId {
				continue
			}

			user.Role = v.Role
			return
		}
	}
}

// SaveSnapshot computes the current projection with GetProjection and stores
// it, protobuf-encoded, as a new row of users_snapshots whose event_seq is the
// sequence number GetProjection reported.
//
// The row is written with INSERT OR ABORT, so if the table rejects it (for
// example because of a uniqueness constraint on event_seq — the schema is not
// visible here) the failure is returned as an error. Errors from
// GetProjection are returned unchanged.
func SaveSnapshot(db *sql.DB) error {
	p, seq, err := GetProjection(db)
	if err != nil {
		return err
	}

	// Encode before touching the database so that an encoding failure cannot
	// leave any database resource open.
	payload, err := proto.Marshal(p)
	if err != nil {
		return fmt.Errorf("Failed to encode snapshot: %w", err)
	}

	// A single-use statement does not need an explicit Prepare; db.Exec
	// releases whatever it allocates, so no statement handle is leaked.
	if _, err := db.Exec("INSERT OR ABORT INTO users_snapshots (event_seq, payload) VALUES (?, ?)", seq, payload); err != nil {
		return fmt.Errorf("Failed to save snapshot: %w", err)
	}

	return nil
}