Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix bug
  • Loading branch information
PeterChen13579 committed Oct 6, 2024
commit 0c5205b73d088b33f21edfdac83474d2d7b3be15
178 changes: 91 additions & 87 deletions tools/cassandra_delete_range/cassandra_delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,100 +70,15 @@ var (
func main() {
log.SetOutput(os.Stdout)

cmd := strings.Join(os.Args[1:], " ")
command := kingpin.MustParse(app.Parse(os.Args[1:]))
cluster, err := prepareDb(hosts)
if err != nil {
log.Fatal(err)
}

cmd := strings.Join(os.Args[1:], " ")
if *resume {
// format of file continue.txt is
/*
Previous user command (must match the same command to resume deletion)
Table name (ie. objects, ledger_hashes etc)
Deletion method (ie. token_range). We don't store ledger_range because these tables are extremely small
Values of token_ranges (each pair of values seperated line by line) or just a single int for ledger_range
*/

file, err := os.Open("continue.txt")
if err != nil {
log.Fatal("continue.txt does not exist. Aborted")
}
defer file.Close()

if err != nil {
log.Fatalf("Failed to open file: %v", err)
}
scanner := bufio.NewScanner(file)
scanner.Scan()

// --resume must be last flag passed; so can check command matches
if os.Args[len(os.Args)-1] != "--resume" {
log.Fatal("--resume must be the last flag passed")
}

// get rid of --resume at the end
cmd = strings.Join(os.Args[1:len(os.Args)-1], " ")

// makes sure command that got aborted matches the user command they enter
if scanner.Text() != cmd {
log.Fatalf("File continue.txt has %s command stored. \n You provided %s which does not match. \n Aborting...", scanner.Text(), cmd)
}

scanner.Scan()
// skip the neccessary tables based on where the program aborted
// for example if account_tx, all tables before account_tx
// should be already deleted so we skip for deletion
switch scanner.Text() {
case "account_tx":
*skipLedgersTable = true
fallthrough
case "ledgers":
*skipLedgerTransactionsTable = true
fallthrough
case "ledger_transactions":
*skipDiffTable = true
fallthrough
case "diff":
*skipTransactionsTable = true
fallthrough
case "transactions":
*skipLedgerHashesTable = true
fallthrough
case "ledger_hashes":
*skipObjectsTable = true
fallthrough
case "objects":
*skipSuccessorTable = true
}

scanner.Scan()
if scanner.Text() == "token_range" {
rangeRead := make(map[int64]int64)

// now go through all the ledger range and load it to a set
for scanner.Scan() {
line := scanner.Text()
Range := strings.Split(line, ",")
if len(Range) != 2 {
log.Fatalf("Range is not two integers. %s . Aborting...", Range)
}
startStr := strings.TrimSpace(Range[0])
endStr := strings.TrimSpace(Range[1])

// convert string to int64
start, err1 := strconv.ParseInt(startStr, 10, 64)
end, err2 := strconv.ParseInt(endStr, 10, 64)

if err1 != nil || err2 != nil {
log.Fatalf("Error converting integer: %s, %s", err1, err2)
}
rangeRead[start] = end
}
ledgerOrTokenRange = &util.StoredRange{}
ledgerOrTokenRange.TokenRange = maybe.Set(rangeRead)
}
prepareResume(&cmd)
}

clioCass := cass.NewClioCass(&cass.Settings{
Expand Down Expand Up @@ -310,3 +225,92 @@ func prepareDb(dbHosts *string) (*gocql.ClusterConfig, error) {

return cluster, nil
}

func prepareResume(cmd *string) {
// format of file continue.txt is
/*
Previous user command (must match the same command to resume deletion)
Table name (ie. objects, ledger_hashes etc)
Deletion method (ie. token_range). We don't store ledger_range because these tables are extremely small
Values of token_ranges (each pair of values seperated line by line) or just a single int for ledger_range
*/

file, err := os.Open("continue.txt")
if err != nil {
log.Fatal("continue.txt does not exist. Aborted")
}
defer file.Close()

if err != nil {
log.Fatalf("Failed to open file: %v", err)
}
scanner := bufio.NewScanner(file)
scanner.Scan()

// --resume must be last flag passed; so can check command matches
if os.Args[len(os.Args)-1] != "--resume" {
log.Fatal("--resume must be the last flag passed")
}

// get rid of --resume at the end
*cmd = strings.Join(os.Args[1:len(os.Args)-1], " ")

// makes sure command that got aborted matches the user command they enter
if scanner.Text() != *cmd {
log.Fatalf("File continue.txt has %s command stored. \n You provided %s which does not match. \n Aborting...", scanner.Text(), *cmd)
}

scanner.Scan()
// skip the neccessary tables based on where the program aborted
// for example if account_tx, all tables before account_tx
// should be already deleted so we skip for deletion
switch scanner.Text() {
case "account_tx":
*skipLedgersTable = true
fallthrough
case "ledgers":
*skipLedgerTransactionsTable = true
fallthrough
case "ledger_transactions":
*skipDiffTable = true
fallthrough
case "diff":
*skipTransactionsTable = true
fallthrough
case "transactions":
*skipLedgerHashesTable = true
fallthrough
case "ledger_hashes":
*skipObjectsTable = true
fallthrough
case "objects":
*skipSuccessorTable = true
}

scanner.Scan()
if scanner.Text() == "token_range" {
rangeRead := make(map[int64]int64)

// now go through all the ledger range and load it to a set
for scanner.Scan() {
line := scanner.Text()
Range := strings.Split(line, ",")
if len(Range) != 2 {
log.Fatalf("Range is not two integers. %s . Aborting...", Range)
}
startStr := strings.TrimSpace(Range[0])
endStr := strings.TrimSpace(Range[1])

// convert string to int64
start, err1 := strconv.ParseInt(startStr, 10, 64)
end, err2 := strconv.ParseInt(endStr, 10, 64)

if err1 != nil || err2 != nil {
log.Fatalf("Error converting integer: %s, %s", err1, err2)
}
rangeRead[start] = end
}
ledgerOrTokenRange = &util.StoredRange{}
ledgerOrTokenRange.TokenRange = maybe.Set(rangeRead)
}
}
9 changes: 4 additions & 5 deletions tools/cassandra_delete_range/internal/cass/cass.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (c *ClioCass) DeleteBefore(ledgerIdx uint64) {
log.Fatal("Earliest ledger index in DB is greater than the one specified. Aborting...")
}

if latestLedgerIdxInDB <= ledgerIdx {
if latestLedgerIdxInDB < ledgerIdx {
log.Fatal("Latest ledger index in DB is smaller than the one specified. Aborting...")
}

Expand All @@ -104,7 +104,7 @@ func (c *ClioCass) DeleteAfter(ledgerIdx uint64) {

log.Printf("DB ledger range is %d -> %d\n", firstLedgerIdxInDB, latestLedgerIdxInDB)

if firstLedgerIdxInDB >= ledgerIdx {
if firstLedgerIdxInDB > ledgerIdx {
log.Fatal("Earliest ledger index in DB is greater than the one specified. Aborting...")
}

Expand Down Expand Up @@ -570,7 +570,7 @@ func (c *ClioCass) prepareDeleteQueries(
sessionCreationWaitGroup.Add(c.settings.WorkerCount)

for i := 0; i < c.settings.WorkerCount; i++ {
go func(q string) {
go func(q string, info deleteInfo) {
defer wg.Done()

var session *gocql.Session
Expand All @@ -588,7 +588,6 @@ func (c *ClioCass) prepareDeleteQueries(
if value, exists := c.settings.RangesRead.TokenRange.Value()[r.StartRange]; exists {
// Check for end range
if value == r.EndRange {
//delete(c.settings.RangesRead.TokenRange.Value(), r.StartRange)
continue
}
}
Expand Down Expand Up @@ -651,7 +650,7 @@ func (c *ClioCass) prepareDeleteQueries(
fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err)
atomic.AddUint64(&totalErrors, 1)
}
}(queryTemplate)
}(queryTemplate, info)
}

wg.Wait()
Expand Down