diff --git a/.gitignore b/.gitignore index a4b6d63..c39d755 100644 --- a/.gitignore +++ b/.gitignore @@ -31,4 +31,5 @@ temp_data/ __pycache__/ .pytest_cache/ config.yaml -bin/ \ No newline at end of file +bin/ +*.db \ No newline at end of file diff --git a/src/components/pooRecorder/pooRecorder.go b/src/components/pooRecorder/pooRecorder.go index d98edf0..8383f3c 100644 --- a/src/components/pooRecorder/pooRecorder.go +++ b/src/components/pooRecorder/pooRecorder.go @@ -6,6 +6,8 @@ import ( "fmt" "net/http" "os" + "strconv" + "strings" "time" "log/slog" @@ -168,7 +170,7 @@ func migrateDb0To1(userVersion *int) { func initScheduler(mainScheduler *gocron.Scheduler) { scheduler = mainScheduler _, err := (*scheduler).NewJob(gocron.CronJob("0 5 * * *", false), gocron.NewTask( - per, + notionDbSync, )) if err != nil { slog.Error(fmt.Sprintln("PooRecorderInit Error creating scheduled task", err)) @@ -176,8 +178,130 @@ func initScheduler(mainScheduler *gocron.Scheduler) { } } -func per() { - slog.Info("PooRecorderInit Running scheduled task ") +func notionDbSync() { + slog.Info("PooRecorder Running DB sync with Notion..") + if !viper.InConfig("pooRecorder.tableId") { + slog.Warn("PooRecorder Table ID not found in config file, sync aborted") + return + } + tableId := viper.GetString("pooRecorder.tableId") + rowsNotion, err := notion.GetAllTableRows(tableId) + if err != nil { + slog.Error(fmt.Sprintln("PooRecorderSyncDb Failed to get table header", err)) + return + } + header := rowsNotion[0] + rowsNotion = rowsNotion[1:] // remove header + rowsDb, err := db.Query(`SELECT * FROM poo_records`) + rowsDbMap := make(map[string]pooStatusDbEntry) + if err != nil { + slog.Error(fmt.Sprintln("PooRecorderSyncDb Failed to get db rows", err)) + return + } + defer rowsDb.Close() + for rowsDb.Next() { + var row pooStatusDbEntry + err = rowsDb.Scan(&row.Timestamp, &row.Status, &row.Latitude, &row.Longitude) + if err != nil { + slog.Error(fmt.Sprintln("PooRecorderSyncDb Failed to scan db row", err)) + return + } + rowsDbMap[row.Timestamp] = row + } + // notion to db + counter := 0 + for _, rowNotion := range rowsNotion { + rowNotionTimestamp := rowNotion.TableRow.Cells[0][0].PlainText + "T" + rowNotion.TableRow.Cells[1][0].PlainText + rowNotionTime, err := time.ParseInLocation("2006-01-02T15:04", rowNotionTimestamp, time.Now().Location()) + if err != nil { + slog.Warn(fmt.Sprintln("PooRecorderSyncDb Failed to parse timestamp", err)) + return + } + rowNotionTimeInDbFormat := rowNotionTime.UTC().Format("2006-01-02T15:04Z07:00") + _, exists := rowsDbMap[rowNotionTimeInDbFormat] + if !exists { + locationNotion := rowNotion.TableRow.Cells[3][0].PlainText + latitude, err := strconv.ParseFloat(strings.Split(locationNotion, ",")[0], 64) + if err != nil { + slog.Error(fmt.Sprintln("PooRecorderSyncDb Failed to parse latitude to float", err)) + return + } + longitude, err := strconv.ParseFloat(strings.Split(locationNotion, ",")[1], 64) + if err != nil { + slog.Error(fmt.Sprintln("PooRecorderSyncDb Failed to parse longitude to float", err)) + return + } + _, err = db.Exec(`INSERT INTO poo_records (timestamp, status, latitude, longitude) VALUES (?, ?, ?, ?)`, + rowNotionTimeInDbFormat, rowNotion.TableRow.Cells[2][0].PlainText, latitude, longitude) + if err != nil { + slog.Warn(fmt.Sprintln("PooRecorderSyncDb Failed to insert new row", err)) + return + } + counter++ + } + } + slog.Info(fmt.Sprintln("PooRecorderSyncDb Inserted", counter, "new rows from Notion to DB")) + + // db to notion + counter = 0 + var rowsDbSlice []pooStatusDbEntry + rowsDb, err = db.Query(`SELECT * FROM poo_records ORDER BY timestamp DESC`) + if err != nil { + slog.Error(fmt.Sprintln("PooRecorderSyncDb Failed to get db rows", err)) + return + } + defer rowsDb.Close() + for rowsDb.Next() { + var row pooStatusDbEntry + err = rowsDb.Scan(&row.Timestamp, &row.Status, &row.Latitude, &row.Longitude) + if err != nil { + slog.Error(fmt.Sprintln("PooRecorderSyncDb Failed to scan db row", err)) + return + } + rowsDbSlice = append(rowsDbSlice, row) + } + startFromId := header.GetID().String() + for iNotion, iDb := 0, 0; iNotion < len(rowsNotion) || iDb < len(rowsDbSlice); { + notionTimeStamp := rowsNotion[iNotion].TableRow.Cells[0][0].PlainText + "T" + rowsNotion[iNotion].TableRow.Cells[1][0].PlainText + notionTime, err := time.ParseInLocation("2006-01-02T15:04", notionTimeStamp, time.Now().Location()) + if err != nil { + slog.Warn(fmt.Sprintln("PooRecorderSyncDb Failed to parse notion timestamp", err)) + return + } + notionTimeStampInDbFormat := notionTime.UTC().Format("2006-01-02T15:04Z07:00") + dbTimeStamp := rowsDbSlice[iDb].Timestamp + dbTime, err := time.Parse("2006-01-02T15:04Z07:00", dbTimeStamp) + if err != nil { + slog.Warn(fmt.Sprintln("PooRecorderSyncDb Failed to parse db timestamp", err)) + return + } + dbTimeLocal := dbTime.Local() + dbTimeDate := dbTimeLocal.Format("2006-01-02") + dbTimeTime := dbTimeLocal.Format("15:04") + if notionTimeStampInDbFormat == dbTimeStamp { + startFromId = rowsNotion[iNotion].GetID().String() + iNotion++ + iDb++ + continue + } + id, err := notion.WriteTableRow([]string{ + dbTimeDate, + dbTimeTime, + rowsDbSlice[iDb].Status, + fmt.Sprintf("%f,%f", + rowsDbSlice[iDb].Latitude, rowsDbSlice[iDb].Longitude)}, + tableId, + startFromId) + if err != nil { + slog.Warn(fmt.Sprintln("PooRecorderSyncDb Failed to write row to Notion", err)) + return + } + startFromId = id + iDb++ + counter++ + time.Sleep(400 * time.Millisecond) + } + slog.Info(fmt.Sprintln("PooRecorderSyncDb Inserted", counter, "new rows from DB to Notion")) } func storeStatus(record recordDetail, timestamp time.Time) error { @@ -201,7 +325,7 @@ func storeStatus(record recordDetail, timestamp time.Time) error { return } headerId := header[0].GetID() - err = notion.WriteTableRow([]string{recordDate, recordTime, record.Status, record.Latitude + "," + record.Longitude}, tableId, headerId.String()) + _, err = notion.WriteTableRow([]string{recordDate, recordTime, record.Status, record.Latitude + "," + record.Longitude}, tableId, headerId.String()) if err != nil { slog.Warn(fmt.Sprintln("HandleRecordPoo Failed to write table row", err)) } diff --git a/src/util/notion/notion.go b/src/util/notion/notion.go index c917e83..5076ef7 100644 --- a/src/util/notion/notion.go +++ b/src/util/notion/notion.go @@ -3,6 +3,8 @@ package notion import ( "context" "errors" + "fmt" + "log/slog" "github.com/jomei/notionapi" ) @@ -17,23 +19,85 @@ func GetClient() *notionapi.Client { return client } -func GetTableRows(tableId string, numberOfRows int, startFromId string) ([]notionapi.Block, error) { +func GetTableRows(tableId string, numberOfRows int, startFromId string) ([]notionapi.TableRowBlock, error) { if client == nil { return nil, errors.New("notion client not initialized") } - block, err := client.Block.GetChildren(context.Background(), notionapi.BlockID(tableId), ¬ionapi.Pagination{ - StartCursor: notionapi.Cursor(startFromId), - PageSize: numberOfRows, - }) - if err != nil { - return nil, err + var rows []notionapi.TableRowBlock + var nextNumberToGet int + if numberOfRows > 100 { + nextNumberToGet = 100 + } else { + nextNumberToGet = numberOfRows } - return block.Results, nil + for numberOfRows > 0 { + block, err := client.Block.GetChildren(context.Background(), notionapi.BlockID(tableId), ¬ionapi.Pagination{ + StartCursor: notionapi.Cursor(startFromId), + PageSize: nextNumberToGet, + }) + if err != nil { + return nil, err + } + for _, block := range block.Results { + if block.GetType().String() == "table_row" { + tableRow, ok := block.(*notionapi.TableRowBlock) + if !ok { + slog.Error("Notion.GetTableRows Failed to cast block to table row") + return nil, errors.New("Notion.GetTableRows failed to cast block to table row") + } + rows = append(rows, *tableRow) + } else { + slog.Error(fmt.Sprintf("Block ID %s is not a table row", block.GetID())) + return nil, errors.New("Notion.GetAllTableRows block ID is not a table row") + } + } + numberOfRows -= nextNumberToGet + if numberOfRows > 100 { + nextNumberToGet = 100 + } else { + nextNumberToGet = numberOfRows + } + } + return rows, nil } -func WriteTableRow(content []string, tableId string, after string) error { +func GetAllTableRows(tableId string) ([]notionapi.TableRowBlock, error) { if client == nil { - return errors.New("notion client not initialized") + return nil, errors.New("notion client not initialized") + } + rows := []notionapi.TableRowBlock{} + nextCursor := "" + hasMore := true + for hasMore { + blockChildren, err := client.Block.GetChildren(context.Background(), notionapi.BlockID(tableId), ¬ionapi.Pagination{ + StartCursor: notionapi.Cursor(nextCursor), + PageSize: 100, + }) + if err != nil { + return nil, err + } + for _, block := range blockChildren.Results { + if block.GetType().String() == "table_row" { + tableRow, ok := block.(*notionapi.TableRowBlock) + if !ok { + slog.Error("Notion.GetAllTableRows Failed to cast block to table row") + return nil, errors.New("Notion.GetAllTableRows failed to cast block to table row") + } + rows = append(rows, *tableRow) + } else { + slog.Error(fmt.Sprintf("Block ID %s is not a table row", block.GetID())) + return nil, errors.New("Notion.GetAllTableRows block ID is not a table row") + } + } + nextCursor = blockChildren.NextCursor + hasMore = blockChildren.HasMore + } + return rows, nil +} + +func WriteTableRow(content []string, tableId string, after string) (string, error) { + if client == nil { + return "", errors.New("notion client not initialized") } rich := [][]notionapi.RichText{} for _, c := range content { @@ -56,10 +120,10 @@ func WriteTableRow(content []string, tableId string, after string) error { }, } - _, err := client.Block.AppendChildren(context.Background(), notionapi.BlockID(tableId), ¬ionapi.AppendBlockChildrenRequest{ + res, err := client.Block.AppendChildren(context.Background(), notionapi.BlockID(tableId), ¬ionapi.AppendBlockChildrenRequest{ After: notionapi.BlockID(after), Children: []notionapi.Block{tableRow}, }) - return err + return res.Results[0].GetID().String(), err }