Skip to content

Commit e90d54d

Browse files
committed
git/githistory: introduce subpackage 'log'
1 parent ca31ac0 commit e90d54d

File tree

7 files changed

+446
-0
lines changed

7 files changed

+446
-0
lines changed

git/githistory/log/log.go

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
package log
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"io/ioutil"
7+
"strings"
8+
"sync"
9+
10+
"github.com/git-lfs/git-lfs/tools"
11+
"github.com/olekukonko/ts"
12+
)
13+
14+
// Logger logs a series of tasks to an io.Writer, processing each task in order
15+
// until completion .
16+
type Logger struct {
17+
// sink is the writer to write to.
18+
sink io.Writer
19+
20+
// widthFn is a function that returns the width of the terminal that
21+
// this logger is running within.
22+
widthFn func() int
23+
24+
// queue is the incoming, unbuffered queue of tasks to enqueue.
25+
queue chan Task
26+
// tasks is the set of tasks to process.
27+
tasks chan Task
28+
// wg is a WaitGroup that is incremented when new tasks are enqueued,
29+
// and decremented when tasks finish.
30+
wg *sync.WaitGroup
31+
}
32+
33+
// NewLogger retuns a new *Logger instance that logs to "sink" and uses the
34+
// current terminal width as the width of the line.
35+
func NewLogger(sink io.Writer) *Logger {
36+
if sink == nil {
37+
sink = ioutil.Discard
38+
}
39+
40+
l := &Logger{
41+
sink: sink,
42+
widthFn: func() int {
43+
size, err := ts.GetSize()
44+
if err != nil {
45+
return 80
46+
}
47+
return size.Col()
48+
},
49+
queue: make(chan Task),
50+
tasks: make(chan Task),
51+
wg: new(sync.WaitGroup),
52+
}
53+
54+
go l.consume()
55+
56+
return l
57+
}
58+
59+
// Close closes the queue and does not allow new Tasks to be `enqueue()`'d. It
60+
// waits until the currently running Task has completed.
61+
func (l *Logger) Close() {
62+
close(l.queue)
63+
64+
l.wg.Wait()
65+
}
66+
67+
// Waitier creates and enqueues a new *WaitingTask.
68+
func (l *Logger) Waiter(msg string) *WaitingTask {
69+
t := NewWaitingTask(msg)
70+
l.enqueue(t)
71+
72+
return t
73+
}
74+
75+
// Percentage creates and enqueues a new *PercentageTask.
76+
func (l *Logger) Percentage(msg string, total uint64) *PercentageTask {
77+
t := NewPercentageTask(msg, total)
78+
l.enqueue(t)
79+
80+
return t
81+
}
82+
83+
// enqueue enqueues the given Tasks "ts".
84+
func (l *Logger) enqueue(ts ...Task) {
85+
if l == nil {
86+
for _, t := range ts {
87+
go func() {
88+
for range <-t.Updates() {
89+
// Discard all updates.
90+
}
91+
}()
92+
}
93+
return
94+
}
95+
96+
l.wg.Add(len(ts))
97+
for _, t := range ts {
98+
l.queue <- t
99+
}
100+
}
101+
102+
// consume creates a pseudo-infinte buffer between the incoming set of tasks and
103+
// the queue of tasks to work on.
104+
func (l *Logger) consume() {
105+
go func() {
106+
// Process the single next task in sequence until completion,
107+
// then consume the next task.
108+
for task := range l.tasks {
109+
l.logTask(task)
110+
}
111+
}()
112+
113+
pending := make([]Task, 0)
114+
115+
L:
116+
for {
117+
// If there is a pending task, "peek" it off of the set of
118+
// pending tasks.
119+
var next Task
120+
if len(pending) > 0 {
121+
next = pending[0]
122+
}
123+
124+
if next == nil {
125+
// If there was no pending task, wait for either a)
126+
// l.queue to close, or b) a new task to be submitted.
127+
task, ok := <-l.queue
128+
if !ok {
129+
// If the queue is closed, no more new tasks may
130+
// be added.
131+
break L
132+
}
133+
134+
// Otherwise, add a new task to the set of tasks to
135+
// process immediately, since there is no current
136+
// buffer.
137+
l.tasks <- task
138+
} else {
139+
// If there is a pending task, wait for either a) a
140+
// write to process the task to become non-blocking, or
141+
// b) a new task to enter the queue.
142+
select {
143+
case task, ok := <-l.queue:
144+
if !ok {
145+
// If the queue is closed, no more tasks
146+
// may be added.
147+
break L
148+
}
149+
// Otherwise, add the next task to the set of
150+
// pending, active tasks.
151+
pending = append(pending, task)
152+
case l.tasks <- next:
153+
// Or "pop" the peeked task off of the pending
154+
// set.
155+
pending = pending[1:]
156+
}
157+
}
158+
}
159+
160+
close(l.tasks)
161+
}
162+
163+
// logTask logs the set of updates from a given task to the sink, then logs a
164+
// "done" message, and then marks the task as done.
165+
func (l *Logger) logTask(task Task) {
166+
defer l.wg.Done()
167+
168+
var last string
169+
for last = range task.Updates() {
170+
l.logLine(last)
171+
}
172+
173+
l.log(fmt.Sprintf("%s, done\n", last))
174+
}
175+
176+
// logLine writes a complete line and moves the cursor to the beginning of the
177+
// line.
178+
//
179+
// It returns the number of bytes "n" written to the sink and the error "err",
180+
// if one was encountered.
181+
func (l *Logger) logLine(str string) (n int, err error) {
182+
padding := strings.Repeat(" ", tools.MaxInt(0, l.widthFn()-len(str)))
183+
184+
return l.log(str + padding + "\r")
185+
}
186+
187+
// log writes a string verbatim to the sink.
188+
//
189+
// It returns the number of bytes "n" written to the sink and the error "err",
190+
// if one was encountered.
191+
func (l *Logger) log(str string) (n int, err error) {
192+
return fmt.Fprint(l.sink, str)
193+
}

git/githistory/log/log_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package log
2+
3+
import (
4+
"bytes"
5+
"strings"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
type ChanTask chan string
12+
13+
func (e ChanTask) Updates() <-chan string { return e }
14+
15+
func TestLoggerLogsTasks(t *testing.T) {
16+
var buf bytes.Buffer
17+
18+
task := make(chan string)
19+
go func() {
20+
task <- "first"
21+
task <- "second"
22+
close(task)
23+
}()
24+
25+
l := NewLogger(&buf)
26+
l.widthFn = func() int { return 0 }
27+
l.enqueue(ChanTask(task))
28+
l.Close()
29+
30+
assert.Equal(t, "first\rsecond\rsecond, done\n", buf.String())
31+
}
32+
33+
func TestLoggerLogsMultipleTasksInOrder(t *testing.T) {
34+
var buf bytes.Buffer
35+
36+
t1 := make(chan string)
37+
go func() {
38+
t1 <- "first"
39+
t1 <- "second"
40+
close(t1)
41+
}()
42+
t2 := make(chan string)
43+
go func() {
44+
t2 <- "third"
45+
t2 <- "fourth"
46+
close(t2)
47+
}()
48+
49+
l := NewLogger(&buf)
50+
l.widthFn = func() int { return 0 }
51+
l.enqueue(ChanTask(t1), ChanTask(t2))
52+
l.Close()
53+
54+
assert.Equal(t, strings.Join([]string{
55+
"first\r",
56+
"second\r",
57+
"second, done\n",
58+
"third\r",
59+
"fourth\r",
60+
"fourth, done\n",
61+
}, ""), buf.String())
62+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package log
2+
3+
import (
4+
"fmt"
5+
"sync/atomic"
6+
)
7+
8+
// PercentageTask is a task that is performed against a known number of
9+
// elements.
10+
type PercentageTask struct {
11+
// msg is the task message.
12+
msg string
13+
// n is the number of elements whose work has been completed. It is
14+
// managed sync/atomic.
15+
n uint64
16+
// total is the total number of elements to execute work upon.
17+
total uint64
18+
// ch is a channel which is written to when the task state changes and
19+
// is closed when the task is completed.
20+
ch chan string
21+
}
22+
23+
func NewPercentageTask(msg string, total uint64) *PercentageTask {
24+
p := &PercentageTask{
25+
msg: msg,
26+
total: total,
27+
ch: make(chan string, 1),
28+
}
29+
p.Count(0)
30+
31+
return p
32+
}
33+
34+
// Count indicates that work has been completed against "n" number of elements,
35+
// marking the task as complete if the total "n" given to all invocations of
36+
// this method is equal to total.
37+
//
38+
// Count returns the new total number of (atomically managed) elements that have
39+
// been completed.
40+
func (c *PercentageTask) Count(n uint64) (new uint64) {
41+
new = atomic.AddUint64(&c.n, n)
42+
43+
percentage := 100 * float64(new) / float64(c.total)
44+
msg := fmt.Sprintf("%s: %3.f%% (%d/%d)",
45+
c.msg, percentage, new, c.total)
46+
47+
c.ch <- msg
48+
49+
if new >= c.total {
50+
close(c.ch)
51+
}
52+
53+
return new
54+
}
55+
56+
// Updates implements Task.Updates and returns a channel which is written to
57+
// when the state of this task changes, and closed when the task is completed.
58+
// has been completed.
59+
func (c *PercentageTask) Updates() <-chan string {
60+
return c.ch
61+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package log
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestPercentageTaskCalculuatesPercentages(t *testing.T) {
10+
task := NewPercentageTask("example", 10)
11+
12+
assert.Equal(t, "example: 0% (0/10)", <-task.Updates())
13+
14+
n := task.Count(3)
15+
assert.EqualValues(t, 3, n)
16+
17+
assert.Equal(t, "example: 30% (3/10)", <-task.Updates())
18+
}
19+
20+
func TestPercentageTaskCallsDoneWhenComplete(t *testing.T) {
21+
task := NewPercentageTask("example", 10)
22+
23+
select {
24+
case v, ok := <-task.Updates():
25+
if ok {
26+
assert.Equal(t, "example: 0% (0/10)", v)
27+
} else {
28+
t.Fatal("expected channel to be open")
29+
}
30+
default:
31+
}
32+
33+
assert.EqualValues(t, 10, task.Count(10))
34+
assert.Equal(t, "example: 100% (10/10)", <-task.Updates())
35+
36+
if _, ok := <-task.Updates(); ok {
37+
t.Fatalf("expected channel to be closed")
38+
}
39+
}

git/githistory/log/task.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package log
2+
3+
// Task is an interface which encapsulates an activity which can be logged.
4+
type Task interface {
5+
// Updates returns a channel which is written to with the current state
6+
// of the Task when an update is present. It is closed when the task is
7+
// complete.
8+
Updates() <-chan string
9+
}

git/githistory/log/waiting_task.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package log
2+
3+
import "fmt"
4+
5+
// WaitingTask represents a task for which the total number of items to do work
6+
// is on is unknown.
7+
type WaitingTask struct {
8+
// ch is used to transmit task updates.
9+
ch chan string
10+
}
11+
12+
// NewWaitingTask returns a new *WaitingTask.
13+
func NewWaitingTask(msg string) *WaitingTask {
14+
ch := make(chan string, 1)
15+
ch <- fmt.Sprintf("%s: ...", msg)
16+
17+
return &WaitingTask{ch: ch}
18+
}
19+
20+
// Complete marks the task as completed.
21+
func (w *WaitingTask) Complete() {
22+
close(w.ch)
23+
}
24+
25+
// Done implements Task.Done and returns a channel which is closed when
26+
// Complete() is called.
27+
func (w *WaitingTask) Updates() <-chan string {
28+
return w.ch
29+
}

0 commit comments

Comments
 (0)