Golang AWS S3 Examples

Tags: #go #golang #aws #s3

1. basic list objects call.go

package main

import (
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	sessionToken := "" // not required for static credentials
	accessKey := "AWS_ACCESS_KEY_ID"     // placeholder: use your real access key ID
	secretKey := "AWS_SECRET_ACCESS_KEY" // placeholder: use your real secret access key

	sess, err := session.NewSession(&aws.Config{
		Region:      aws.String("us-east-1"),
		Credentials: credentials.NewStaticCredentials(accessKey, secretKey, sessionToken),
	})
	if err != nil {
		log.Fatal("unable to create aws session")
	}

	svc := s3.New(sess)

	input := &s3.ListObjectsInput{
		Bucket:  aws.String("some_bucket_name"),
		MaxKeys: aws.Int64(2), // only return two results!
	}

	result, err := svc.ListObjects(input)
	if err != nil {
		if aerr, ok := err.(awserr.Error); ok {
			switch aerr.Code() {
			case s3.ErrCodeNoSuchBucket:
				fmt.Println(s3.ErrCodeNoSuchBucket, aerr.Error())
			default:
				fmt.Println(aerr.Error())
			}
		} else {
			// Print the error, cast err to awserr.Error to get the Code and
			// Message from an error.
			fmt.Println(err.Error())
		}
		return
	}

	fmt.Println(result)
}
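MaxKeys caps the call above at a single page of two results. To walk an entire bucket, the v1 SDK's ListObjectsPages drives the pagination loop for you. A minimal sketch reusing the svc client from above, with the same placeholder bucket name:

	err := svc.ListObjectsPages(
		&s3.ListObjectsInput{
			Bucket: aws.String("some_bucket_name"), // placeholder bucket
		},
		func(page *s3.ListObjectsOutput, lastPage bool) bool {
			for _, obj := range page.Contents {
				fmt.Println(*obj.Key)
			}
			return true // keep paginating until the listing is exhausted
		},
	)
	if err != nil {
		log.Fatal("unable to list objects: ", err)
	}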

2. concurrently list and delete large number of S3 objects.go

/*
Original written by Mark Gannaway...
https://gist.github.com/Ganners/86f23c2d121332a8b3968bf05d2f720a

Dry Runs:

# stage
go run main.go --bucket=<your_bucket_name> --profile=planz-stage

# prod
go run main.go --bucket=<your_bucket_name> --profile=planz-prod

Real Deletes:

# stage
go run main.go --bucket=<your_bucket_name> --profile=planz-stage --dryrun=false

# prod
go run main.go --bucket=<your_bucket_name> --profile=planz-prod --dryrun=false
*/

package main

import (
	"errors"
	"flag"
	"fmt"
	"log"
	"math/rand"
	"net/url"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

var (
	whitelist = map[string]struct{}{
		// Used for /wd/UserWidget (which is used for ads)
		"bids":    {},
		"cs":      {},
		"ct":      {},
		"network": {},
		"or":      {},
		"u":       {},
		"uo":      {},
		"wid":     {},

		// Keep
		"country":         {}, // Used for varying the country, usually when fetching feeds
		"p":               {}, // Used for selecting the page
		"page":            {}, // Used for selecting the page
		"page_quantity":   {}, // Used for specifying the page size
		"page_size":       {}, // Used for specifying the page size
		"render_template": {}, // Used for next pages, e.g. /nifty?render_template=0&page=2
	}

	invalidSuffix = []string{
		".mobile.js",
		".mobile3.js",
	}

	invalidPrefix = []string{
		"/",
		"api/comments",
	}
)

const (
	deleteWorkers   = 100
	deleteBatchSize = 10
	listWorkers     = 1000
)

func main() {
	dryrun := true
	bucket := ""
	profile := ""
	{
		flag.BoolVar(&dryrun, "dryrun", true, "is this a dry run? false will execute deletes")
		flag.StringVar(&bucket, "bucket", "plan-z-stage-us-east-1", "what bucket to use")
		flag.StringVar(&profile, "profile", "planz", "what AWS profile to use")
		flag.Parse()
	}

	svc := s3.New(session.Must(session.NewSessionWithOptions(session.Options{
		AssumeRoleTokenProvider: stscreds.StdinTokenProvider,
		Config:                  aws.Config{Region: aws.String("us-east-1")},
		Profile:                 profile,
	})))

	prefixes := []string{"api/comments/stats", "/api/comments/stats", "?"}

	// build the prefix list; the three-character fan-out below is geared
	// specifically towards speeding up the plan z deletions
	for c1 := '0'; c1 <= '9'; c1++ {
		for c2 := '0'; c2 <= '9'; c2++ {
			for c3 := '0'; c3 <= '9'; c3++ {
				// keys appear both with and without a leading slash, so list both forms
				prefixes = append(prefixes, "api/comments/"+string(c1)+string(c2)+string(c3))
				prefixes = append(prefixes, "/api/comments/"+string(c1)+string(c2)+string(c3))
			}
		}
	}
	for c1 := '!'; c1 <= '~'; c1++ {
		for c2 := '!'; c2 <= '~'; c2++ {
			for c3 := '!'; c3 <= '~'; c3++ {
				prefix := string(c1) + string(c2) + string(c3)
				if prefix == "api" {
					// api/comments keys are covered by the explicit prefixes above
					continue
				}
				prefixes = append(prefixes, prefix)
			}
		}
	}

	// shuffle the prefixes so each worker's chunk mixes dense and sparse key ranges
	rand.Shuffle(len(prefixes), func(i, j int) {
		prefixes[i], prefixes[j] = prefixes[j], prefixes[i]
	})

	// file list workers
	outputChan := NewListFilesWorkers(svc, bucket, prefixes, listWorkers)

	if dryrun {
		for output := range outputChan {
			// dryrun just prints
			fmt.Println(output)
		}
	} else {
		// file delete workers
		<-NewDeleteFilesWorkers(svc, bucket, outputChan, deleteWorkers)
	}
}

// DeleteFilesWorkers handles multiple workers to delete files
type DeleteFilesWorkers struct {
	svc      *s3.S3
	bucket   string
	keysChan chan string
}

// NewDeleteFilesWorkers will spawn and start a number of workers to handle
// deletion, will output to the returned channel when complete
func NewDeleteFilesWorkers(svc *s3.S3, bucket string, keysChan chan string, numWorkers int) chan struct{} {
	dfw := &DeleteFilesWorkers{
		svc:      svc,
		bucket:   bucket,
		keysChan: keysChan,
	}
	return dfw.Start(numWorkers)
}

// del performs the actual DeleteObjects request against S3, retrying up to 5
// times with jittered quadratic backoff (attempt*attempt seconds plus up to
// 29s of random jitter)
func (dfw *DeleteFilesWorkers) del(objects []*s3.ObjectIdentifier) error {
	if len(objects) == 0 {
		return nil
	}
	for attempt := 0; attempt < 5; attempt++ {
		if attempt > 0 {
			sleepJitter := time.Duration(rand.Intn(30))
			sleepSeconds := sleepJitter + time.Duration(attempt*attempt)
			time.Sleep(sleepSeconds * time.Second)
		}

		_, err := dfw.svc.DeleteObjects(&s3.DeleteObjectsInput{
			Bucket: aws.String(dfw.bucket),
			Delete: &s3.Delete{
				Objects: objects,
			},
		})

		if err == nil {
			return nil
		}
		log.Println("delete error", err)
	}
	return errors.New("unable to delete")
}

// Start will start a number of workers to handle file batch deletion
func (dfw *DeleteFilesWorkers) Start(workers int) chan struct{} {
	doneCh := make(chan struct{})
	go func() {
		wg := sync.WaitGroup{}
		for i := 0; i < workers; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				objects := make([]*s3.ObjectIdentifier, 0, deleteBatchSize)
				for key := range dfw.keysChan {
					objects = append(objects, &s3.ObjectIdentifier{
						Key: aws.String(key),
					})

					if len(objects) < deleteBatchSize {
						continue
					}

					if err := dfw.del(objects); err != nil {
						// keep the failed batch and retry it together with the next key
						continue
					}
					log.Println("successfully deleted", len(objects), "items")
					objects = make([]*s3.ObjectIdentifier, 0, deleteBatchSize)
				}

				// channel closed: flush any remaining partial batch
				if err := dfw.del(objects); err != nil {
					log.Println("final delete failed:", err)
				}
			}()
		}
		wg.Wait()
		doneCh <- struct{}{}
	}()
	return doneCh
}

// ListFilesWorkers will start a number of workers to list files matching the
// pattern and then sending them to the outputChan
type ListFilesWorkers struct {
	svc        *s3.S3
	bucket     string
	prefixes   []string
	outputChan chan string
}

// NewListFilesWorkers will spawn a number of workers to iterate over the
// prefixes to divide up the work
func NewListFilesWorkers(svc *s3.S3, bucket string, prefixes []string, numWorkers int) chan string {
	lfw := &ListFilesWorkers{
		svc:        svc,
		bucket:     bucket,
		prefixes:   prefixes,
		outputChan: make(chan string),
	}

	go lfw.Start(numWorkers)

	return lfw.outputChan
}

// listObjects is the ListObjectsPages callback; it inspects each key in the
// page and forwards any that should be deleted to outputChan
func (lfw *ListFilesWorkers) listObjects(p *s3.ListObjectsOutput, last bool) bool {
	for _, obj := range p.Contents {
		if obj == nil {
			continue
		}

		// decode the key back into a URL so its query params can be inspected
		u, err := url.Parse(strings.Replace(*obj.Key, "%3F", "?", -1))
		if err != nil {
			// if the key does not parse as a URL, leave it alone and move on
			continue
		}

		invalidParams := []string{}
		query := u.Query()

		for _, suffix := range invalidSuffix {
			if strings.HasSuffix(*obj.Key, suffix) {
				goto delete
			}
		}
		for _, prefix := range invalidPrefix {
			if strings.HasPrefix(*obj.Key, prefix) {
				goto delete
			}
		}

		for key := range query {
			if _, ok := whitelist[key]; !ok {
				invalidParams = append(invalidParams, key)
			}
		}

		if len(invalidParams) == 0 {
			continue
		}

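	// delete: the key matched an invalid suffix or prefix above, or carries
	// query params outside the whitelist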
	delete:
		lfw.outputChan <- *obj.Key
	}

	return true
}

// Start launches the workers; it should be called in a goroutine. It closes
// outputChan once all workers have finished
func (lfw *ListFilesWorkers) Start(workers int) {
	numPrefixes := len(lfw.prefixes)
	if numPrefixes < workers {
		workers = numPrefixes
	}

	// split the prefixes into roughly equal contiguous chunks, one per worker;
	// any remainder becomes an extra chunk so no prefix is dropped
	chunkSize := numPrefixes / workers
	splitPrefixes := [][]string{}
	for start := 0; start < numPrefixes; start += chunkSize {
		end := start + chunkSize
		if end > numPrefixes {
			end = numPrefixes
		}
		splitPrefixes = append(splitPrefixes, lfw.prefixes[start:end])
	}

	wg := &sync.WaitGroup{}
	wg.Add(len(splitPrefixes))
	for _, chunk := range splitPrefixes {
		go func(chunk []string) {
			defer wg.Done()
			for _, prefix := range chunk {
				err := lfw.svc.ListObjectsPages(
					&s3.ListObjectsInput{
						Bucket:  aws.String(lfw.bucket),
						Prefix:  aws.String(prefix),
						MaxKeys: aws.Int64(1000),
					},
					lfw.listObjects,
				)
				if err != nil {
					fmt.Println("failed to list objects", err)
				}
			}
		}(chunk)
	}
	wg.Wait()
	close(lfw.outputChan)
}
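
One note on the delete path: S3's DeleteObjects API accepts up to 1,000 keys per request, so deleteBatchSize has plenty of headroom above 10 if you want fewer round trips. The v1 SDK also exposes the API's Quiet flag, which tells S3 to report only failed deletions. A minimal sketch of how the request inside del might look with that flag set:

	// drop-in variant of the DeleteObjects call inside del: Quiet mode asks
	// S3 to omit per-key success entries and report only failures
	_, err := dfw.svc.DeleteObjects(&s3.DeleteObjectsInput{
		Bucket: aws.String(dfw.bucket),
		Delete: &s3.Delete{
			Objects: objects,        // the API accepts up to 1,000 keys here
			Quiet:   aws.Bool(true), // only errors come back in the response
		},
	})
	if err != nil {
		log.Println("delete error", err)
	}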