antimony.rs

Antimony is a pure Rust stream processing / distributed computation platform that is real-time and fault-tolerant.



Simple Rust topology API

Simply add Antimony to your Cargo.toml and you can write your first topology

[dependencies]
antimony = "0.0.1"

An exclaiming topology

Define your topology as a JSON file representing the DAG (Directed Acyclic Graph) in a topology.json file

{
    "name": "FqdnLen",
    "sm_count": 4,
    "instances_per_sm": 3,
    "topology": [
        {
            "item_type": "Spout",
            "name": "log-entry",
            "module": "spouts::lespout",
            "instance_count": 2
        },
        {
            "item_type": "Bolt",
            "name": "extract-fqdn",
            "module": "bolts::efbolt",
            "instance_count": 4,
            "input_stream": "log-entries"
        },
        {
            "item_type": "Bolt",
            "name": "find-len",
            "module": "bolts::flbolts",
            "instance_count": 6,
            "input_stream": "fqdn"
        }
    ]
}

Create a new lib with cargo and place topology.json as follows

.
├── Cargo.lock
├── Cargo.toml
├── src
│   ├── bolts
│   │   ├── mod.rs
│   │   ├── efbolt.rs
│   │   └── flbolts.rs
│   ├── lib.rs
│   └── spouts
│       ├── mod.rs
│       └── lespout.rs
└── topology.json

Implement a spout

use antimony::components::spout::{Spout, BaseSpout};
use antimony::components::Message;
use antimony::components::ComponentConfig;


/// Example spout state: wraps the `Spout` handle that `next_tuple` emits through.
struct Generator {
    // Handle on which `emit` is called; `start` fills it with a clone of the
    // `Spout` it creates.
    spout: Spout,
}

impl BaseSpout for Generator {
    /// No setup is performed for this example spout.
    fn prepare(&mut self) {}

    /// Builds a `Message::tuple` from `"log-entries"` and a placeholder
    /// payload, then emits it through the wrapped `Spout`.
    fn next_tuple(&mut self) {
        let entry = Message::tuple("log-entries", "{ip}{fqdn}");
        self.spout.emit(entry);
    }
}

/// Entry point for the spout module.
///
/// Creates a `Spout` named `"Generator"`, gives a clone of it to a
/// `Generator`, and passes both, together with `args`, to `Spout::start`.
pub fn start(args: ComponentConfig) {
    let mut spout = Spout::new("Generator");
    let generator = Generator {
        spout: spout.clone(),
    };
    Spout::start(&mut spout, generator, args);
}

Implement a bolt

use antimony::components::bolt::{Bolt, BaseBolt};
use antimony::components::Message;
use antimony::components::ComponentConfig;

/// Example bolt state: wraps the `Bolt` handle that `process_tuple` emits through.
struct Extract {
    bolt: Bolt,
}
impl BaseBolt for Extract{
	fn prepare(&mut self){
		
	}

    fn process_tuple(&mut self, tpl: Message){
        println!("bolt B1 got message: {:?}", tpl);
        <<Your_Logic_Here>>
        self.bolt.emit(Message::tuple("fqdn", "zurich.rustfest.eu"));
    }
}

pub fn start(args: ComponentConfig){
	let mut e = Bolt::new("B1");
	let g = Count{bolt: e.clone()};
	Bolt::start(&mut e, g, args);
}

Get Help

Get help