pd
github.com/kkdai/pd
go get github.com/kkdai/pd
pd

github.com/kkdai/pd

(PUBSUB) Publish-Subscrbe message broker with Disk queue in Golang

by Evan Lin

v1.0.0 (see all)
go get github.com/kkdai/pd
Readme

PD: Pubsub service base on disk queue on golang

GitHub license GoDoc Build Status

What is Pubsub

Pubsub is prove of concept implement for Redis "Pub/Sub" messaging management feature. SUBSCRIBE, UNSUBSCRIBE and PUBLISH implement the Publish/Subscribe messaging paradigm where (citing Wikipedia) senders (publishers) are not programmed to send their messages to specific receivers (subscribers). (sited from here)

What is Disk Queue

Disk Queue is a data structure which come from NSQ. It is message queue data structure using disk file as storage medium.

How it work together

This is no API change from Pub/Sub mechanism, but change it basic concurrency process.

Topic as a another object to handle Topic related info with Disk Queue. Each Topic contains two kind information:

  • Data Queue: Which is all publish data send to this topic.
  • Channel List: Which is who subscribe this topic, we need notify.

In this modification, it gain follow benefits:

  • Infinite Topic Queue Size (depends on storage size)
  • Buffering Publish to improve performance.

Installation and Usage

Install

go get github.com/kkdai/pd

Usage

package main

import (
    "fmt"
    "time"

    . "github.com/kkdai/pubsub"
)

func main() {
    ser := NewPubsub(1)
    c1 := ser.Subscribe("topic1")
    c2 := ser.Subscribe("topic2")
    ser.Publish("test1", "topic1")
    ser.Publish("test2", "topic2")
    fmt.Println(<-c1)
    //Got "test1"
    fmt.Println(<-c2)
    //Got "test2"

    // Add subscription "topic2" for c1.
    ser.AddSubscription(c1, "topic2")

    // Publish new content in topic2
    ser.Publish("test3", "topic2")

    fmt.Println(<-c1)
    //Got "test3"
    fmt.Println(<-c2)
    //Got "test3"

    // Remove subscription "topic2" in c1
    ser.RemoveSubscription(c1, "topic2")

    // Publish new content in topic2
    ser.Publish("test4", "topic2")

    select {
    case val := <-c1:
        fmt.Errorf("Should not get %v notify on remove topic", val)
        break
    case <-time.After(time.Second):
        //Will go here, because we remove subscription topic2 in c1.
        fmt.Println("Not receive any msg from topic2, timeout.")
        break
    }
} 

Benchmark

Benchmark include memory usage. (original memory)

BenchmarkAddSub-4                500    2906467 ns/op
BenchmarkRemoveSub-4           10000     232910 ns/op
BenchmarkBasicFunction-4     5000000        232 ns/op

Benchmark include memory usage. (Using Disk Queue)

BenchmarkAddSub-4             300000    125628 ns/op 
BenchmarkRemoveSub-4          200000    144854 ns/op
BenchmarkBasicFunction-4        2000    906076 ns/op

Inspired By

Project52

It is one of my project 52.

License

This package is licensed under MIT license. See LICENSE for details.

GitHub Stars

11

LAST COMMIT

3yrs ago

MAINTAINERS

0

CONTRIBUTORS

2

OPEN ISSUES

0

OPEN PRs

0
VersionTagPublished
v1.0.1-0.20190803083924-2e9ef2ff0fff
1yr ago
v1.0.0
2yrs ago
No alternatives found
No tutorials found
Add a tutorial