Add db notion bidirectional syncing
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -32,3 +32,4 @@ __pycache__/
|
|||||||
.pytest_cache/
|
.pytest_cache/
|
||||||
config.yaml
|
config.yaml
|
||||||
bin/
|
bin/
|
||||||
|
*.db
|
||||||
@@ -6,6 +6,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"log/slog"
|
"log/slog"
|
||||||
@@ -168,7 +170,7 @@ func migrateDb0To1(userVersion *int) {
|
|||||||
func initScheduler(mainScheduler *gocron.Scheduler) {
|
func initScheduler(mainScheduler *gocron.Scheduler) {
|
||||||
scheduler = mainScheduler
|
scheduler = mainScheduler
|
||||||
_, err := (*scheduler).NewJob(gocron.CronJob("0 5 * * *", false), gocron.NewTask(
|
_, err := (*scheduler).NewJob(gocron.CronJob("0 5 * * *", false), gocron.NewTask(
|
||||||
per,
|
notionDbSync,
|
||||||
))
|
))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error(fmt.Sprintln("PooRecorderInit Error creating scheduled task", err))
|
slog.Error(fmt.Sprintln("PooRecorderInit Error creating scheduled task", err))
|
||||||
@@ -176,8 +178,130 @@ func initScheduler(mainScheduler *gocron.Scheduler) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func per() {
|
func notionDbSync() {
|
||||||
slog.Info("PooRecorderInit Running scheduled task ")
|
slog.Info("PooRecorder Running DB sync with Notion..")
|
||||||
|
if !viper.InConfig("pooRecorder.tableId") {
|
||||||
|
slog.Warn("PooRecorder Table ID not found in config file, sync aborted")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
tableId := viper.GetString("pooRecorder.tableId")
|
||||||
|
rowsNotion, err := notion.GetAllTableRows(tableId)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error(fmt.Sprintln("PooRecorderSyncDb Failed to get table header", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
header := rowsNotion[0]
|
||||||
|
rowsNotion = rowsNotion[1:] // remove header
|
||||||
|
rowsDb, err := db.Query(`SELECT * FROM poo_records`)
|
||||||
|
rowsDbMap := make(map[string]pooStatusDbEntry)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error(fmt.Sprintln("PooRecorderSyncDb Failed to get db rows", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer rowsDb.Close()
|
||||||
|
for rowsDb.Next() {
|
||||||
|
var row pooStatusDbEntry
|
||||||
|
err = rowsDb.Scan(&row.Timestamp, &row.Status, &row.Latitude, &row.Longitude)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error(fmt.Sprintln("PooRecorderSyncDb Failed to scan db row", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rowsDbMap[row.Timestamp] = row
|
||||||
|
}
|
||||||
|
// notion to db
|
||||||
|
counter := 0
|
||||||
|
for _, rowNotion := range rowsNotion {
|
||||||
|
rowNotionTimestamp := rowNotion.TableRow.Cells[0][0].PlainText + "T" + rowNotion.TableRow.Cells[1][0].PlainText
|
||||||
|
rowNotionTime, err := time.ParseInLocation("2006-01-02T15:04", rowNotionTimestamp, time.Now().Location())
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn(fmt.Sprintln("PooRecorderSyncDb Failed to parse timestamp", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rowNotionTimeInDbFormat := rowNotionTime.UTC().Format("2006-01-02T15:04Z07:00")
|
||||||
|
_, exists := rowsDbMap[rowNotionTimeInDbFormat]
|
||||||
|
if !exists {
|
||||||
|
locationNotion := rowNotion.TableRow.Cells[3][0].PlainText
|
||||||
|
latitude, err := strconv.ParseFloat(strings.Split(locationNotion, ",")[0], 64)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error(fmt.Sprintln("PooRecorderSyncDb Failed to parse latitude to float", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
longitude, err := strconv.ParseFloat(strings.Split(locationNotion, ",")[1], 64)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error(fmt.Sprintln("PooRecorderSyncDb Failed to parse longitude to float", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_, err = db.Exec(`INSERT INTO poo_records (timestamp, status, latitude, longitude) VALUES (?, ?, ?, ?)`,
|
||||||
|
rowNotionTimeInDbFormat, rowNotion.TableRow.Cells[2][0].PlainText, latitude, longitude)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn(fmt.Sprintln("PooRecorderSyncDb Failed to insert new row", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
counter++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
slog.Info(fmt.Sprintln("PooRecorderSyncDb Inserted", counter, "new rows from Notion to DB"))
|
||||||
|
|
||||||
|
// db to notion
|
||||||
|
counter = 0
|
||||||
|
var rowsDbSlice []pooStatusDbEntry
|
||||||
|
rowsDb, err = db.Query(`SELECT * FROM poo_records ORDER BY timestamp DESC`)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error(fmt.Sprintln("PooRecorderSyncDb Failed to get db rows", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer rowsDb.Close()
|
||||||
|
for rowsDb.Next() {
|
||||||
|
var row pooStatusDbEntry
|
||||||
|
err = rowsDb.Scan(&row.Timestamp, &row.Status, &row.Latitude, &row.Longitude)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error(fmt.Sprintln("PooRecorderSyncDb Failed to scan db row", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rowsDbSlice = append(rowsDbSlice, row)
|
||||||
|
}
|
||||||
|
startFromId := header.GetID().String()
|
||||||
|
for iNotion, iDb := 0, 0; iNotion < len(rowsNotion) || iDb < len(rowsDbSlice); {
|
||||||
|
notionTimeStamp := rowsNotion[iNotion].TableRow.Cells[0][0].PlainText + "T" + rowsNotion[iNotion].TableRow.Cells[1][0].PlainText
|
||||||
|
notionTime, err := time.ParseInLocation("2006-01-02T15:04", notionTimeStamp, time.Now().Location())
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn(fmt.Sprintln("PooRecorderSyncDb Failed to parse notion timestamp", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
notionTimeStampInDbFormat := notionTime.UTC().Format("2006-01-02T15:04Z07:00")
|
||||||
|
dbTimeStamp := rowsDbSlice[iDb].Timestamp
|
||||||
|
dbTime, err := time.Parse("2006-01-02T15:04Z07:00", dbTimeStamp)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn(fmt.Sprintln("PooRecorderSyncDb Failed to parse db timestamp", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
dbTimeLocal := dbTime.Local()
|
||||||
|
dbTimeDate := dbTimeLocal.Format("2006-01-02")
|
||||||
|
dbTimeTime := dbTimeLocal.Format("15:04")
|
||||||
|
if notionTimeStampInDbFormat == dbTimeStamp {
|
||||||
|
startFromId = rowsNotion[iNotion].GetID().String()
|
||||||
|
iNotion++
|
||||||
|
iDb++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
id, err := notion.WriteTableRow([]string{
|
||||||
|
dbTimeDate,
|
||||||
|
dbTimeTime,
|
||||||
|
rowsDbSlice[iDb].Status,
|
||||||
|
fmt.Sprintf("%f,%f",
|
||||||
|
rowsDbSlice[iDb].Latitude, rowsDbSlice[iDb].Longitude)},
|
||||||
|
tableId,
|
||||||
|
startFromId)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn(fmt.Sprintln("PooRecorderSyncDb Failed to write row to Notion", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
startFromId = id
|
||||||
|
iDb++
|
||||||
|
counter++
|
||||||
|
time.Sleep(400 * time.Millisecond)
|
||||||
|
}
|
||||||
|
slog.Info(fmt.Sprintln("PooRecorderSyncDb Inserted", counter, "new rows from DB to Notion"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func storeStatus(record recordDetail, timestamp time.Time) error {
|
func storeStatus(record recordDetail, timestamp time.Time) error {
|
||||||
@@ -201,7 +325,7 @@ func storeStatus(record recordDetail, timestamp time.Time) error {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
headerId := header[0].GetID()
|
headerId := header[0].GetID()
|
||||||
err = notion.WriteTableRow([]string{recordDate, recordTime, record.Status, record.Latitude + "," + record.Longitude}, tableId, headerId.String())
|
_, err = notion.WriteTableRow([]string{recordDate, recordTime, record.Status, record.Latitude + "," + record.Longitude}, tableId, headerId.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Warn(fmt.Sprintln("HandleRecordPoo Failed to write table row", err))
|
slog.Warn(fmt.Sprintln("HandleRecordPoo Failed to write table row", err))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,8 @@ package notion
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
|
||||||
"github.com/jomei/notionapi"
|
"github.com/jomei/notionapi"
|
||||||
)
|
)
|
||||||
@@ -17,23 +19,85 @@ func GetClient() *notionapi.Client {
|
|||||||
return client
|
return client
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetTableRows(tableId string, numberOfRows int, startFromId string) ([]notionapi.Block, error) {
|
func GetTableRows(tableId string, numberOfRows int, startFromId string) ([]notionapi.TableRowBlock, error) {
|
||||||
if client == nil {
|
if client == nil {
|
||||||
return nil, errors.New("notion client not initialized")
|
return nil, errors.New("notion client not initialized")
|
||||||
}
|
}
|
||||||
|
var rows []notionapi.TableRowBlock
|
||||||
|
var nextNumberToGet int
|
||||||
|
if numberOfRows > 100 {
|
||||||
|
nextNumberToGet = 100
|
||||||
|
} else {
|
||||||
|
nextNumberToGet = numberOfRows
|
||||||
|
}
|
||||||
|
for numberOfRows > 0 {
|
||||||
block, err := client.Block.GetChildren(context.Background(), notionapi.BlockID(tableId), ¬ionapi.Pagination{
|
block, err := client.Block.GetChildren(context.Background(), notionapi.BlockID(tableId), ¬ionapi.Pagination{
|
||||||
StartCursor: notionapi.Cursor(startFromId),
|
StartCursor: notionapi.Cursor(startFromId),
|
||||||
PageSize: numberOfRows,
|
PageSize: nextNumberToGet,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return block.Results, nil
|
for _, block := range block.Results {
|
||||||
|
if block.GetType().String() == "table_row" {
|
||||||
|
tableRow, ok := block.(*notionapi.TableRowBlock)
|
||||||
|
if !ok {
|
||||||
|
slog.Error("Notion.GetTableRows Failed to cast block to table row")
|
||||||
|
return nil, errors.New("Notion.GetTableRows failed to cast block to table row")
|
||||||
|
}
|
||||||
|
rows = append(rows, *tableRow)
|
||||||
|
} else {
|
||||||
|
slog.Error(fmt.Sprintf("Block ID %s is not a table row", block.GetID()))
|
||||||
|
return nil, errors.New("Notion.GetAllTableRows block ID is not a table row")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
numberOfRows -= nextNumberToGet
|
||||||
|
if numberOfRows > 100 {
|
||||||
|
nextNumberToGet = 100
|
||||||
|
} else {
|
||||||
|
nextNumberToGet = numberOfRows
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return rows, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func WriteTableRow(content []string, tableId string, after string) error {
|
func GetAllTableRows(tableId string) ([]notionapi.TableRowBlock, error) {
|
||||||
if client == nil {
|
if client == nil {
|
||||||
return errors.New("notion client not initialized")
|
return nil, errors.New("notion client not initialized")
|
||||||
|
}
|
||||||
|
rows := []notionapi.TableRowBlock{}
|
||||||
|
nextCursor := ""
|
||||||
|
hasMore := true
|
||||||
|
for hasMore {
|
||||||
|
blockChildren, err := client.Block.GetChildren(context.Background(), notionapi.BlockID(tableId), ¬ionapi.Pagination{
|
||||||
|
StartCursor: notionapi.Cursor(nextCursor),
|
||||||
|
PageSize: 100,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, block := range blockChildren.Results {
|
||||||
|
if block.GetType().String() == "table_row" {
|
||||||
|
tableRow, ok := block.(*notionapi.TableRowBlock)
|
||||||
|
if !ok {
|
||||||
|
slog.Error("Notion.GetAllTableRows Failed to cast block to table row")
|
||||||
|
return nil, errors.New("Notion.GetAllTableRows failed to cast block to table row")
|
||||||
|
}
|
||||||
|
rows = append(rows, *tableRow)
|
||||||
|
} else {
|
||||||
|
slog.Error(fmt.Sprintf("Block ID %s is not a table row", block.GetID()))
|
||||||
|
return nil, errors.New("Notion.GetAllTableRows block ID is not a table row")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nextCursor = blockChildren.NextCursor
|
||||||
|
hasMore = blockChildren.HasMore
|
||||||
|
}
|
||||||
|
return rows, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func WriteTableRow(content []string, tableId string, after string) (string, error) {
|
||||||
|
if client == nil {
|
||||||
|
return "", errors.New("notion client not initialized")
|
||||||
}
|
}
|
||||||
rich := [][]notionapi.RichText{}
|
rich := [][]notionapi.RichText{}
|
||||||
for _, c := range content {
|
for _, c := range content {
|
||||||
@@ -56,10 +120,10 @@ func WriteTableRow(content []string, tableId string, after string) error {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := client.Block.AppendChildren(context.Background(), notionapi.BlockID(tableId), ¬ionapi.AppendBlockChildrenRequest{
|
res, err := client.Block.AppendChildren(context.Background(), notionapi.BlockID(tableId), ¬ionapi.AppendBlockChildrenRequest{
|
||||||
After: notionapi.BlockID(after),
|
After: notionapi.BlockID(after),
|
||||||
Children: []notionapi.Block{tableRow},
|
Children: []notionapi.Block{tableRow},
|
||||||
})
|
})
|
||||||
|
|
||||||
return err
|
return res.Results[0].GetID().String(), err
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user