Skip to content

Instantly share code, notes, and snippets.

@rbirkby
Last active August 29, 2015 14:08
Show Gist options
  • Save rbirkby/6f05a030bda853ed8cf3 to your computer and use it in GitHub Desktop.
Save rbirkby/6f05a030bda853ed8cf3 to your computer and use it in GitHub Desktop.
Reactive Equations
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
class X : IObservable<int>
{
public IDisposable Subscribe(IObserver<int> observer)
{
return Observable.Interval(TimeSpan.FromSeconds(1))
.Take(5)
.Select(_ => new Random().Next(10))
.Do(x => Console.WriteLine("X={0}", x))
.Subscribe(observer);
}
}
class A : IObservable<int>
{
private IObservable<int> _x;
public A(IObservable<int> x)
{
_x = x;
}
public IDisposable Subscribe(IObserver<int> observer)
{
return _x.Select(x => 2 * x)
.Subscribe(observer);
}
}
class B : IObservable<int>
{
private IObservable<int> _x;
public B(IObservable<int> x)
{
_x = x;
}
public IDisposable Subscribe(IObserver<int> observer)
{
return _x.Select(x => 5 * x)
.Subscribe(observer);
}
}
class AtomicDiamond
{
static void Main()
{
new AtomicDiamond().Run();
Console.ReadKey();
}
private void Run()
{
var x = new X().Replay();
x.Connect();
IObservable<int> a = new A(x);
IObservable<int> b = new B(x);
Func<int, int, int> sum = (left, right) => left + right;
a.Zip(b, sum)
.Subscribe(answer => Console.WriteLine("2x + 5x = {0}", answer));
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
static class ObservableAlgos
{
public static IObservable<int> Multiply(this IObservable<int> source, int multiplier)
{
return source.Select(x => x * multiplier);
}
}
class ComposableDiamond
{
static void Main()
{
new ComposableDiamond().Run();
Console.ReadKey();
}
private void Run()
{
var x = Observable.Interval(TimeSpan.FromSeconds(1))
.Take(5)
.Select(_ => new Random().Next(10))
.Do(xValue => Console.WriteLine("X={0}", xValue))
.Replay();
x.Connect();
var a = x.Multiply(2);
var b = x.Multiply(5);
Func<int, int, int> sum = (left, right) => left + right;
a.Zip(b, sum)
.Subscribe(answer => Console.WriteLine("2x + 5x = {0}", answer));
}
}
using System;
using System.Reactive.Linq;
class FunctionalAtomicDiamond
{
static void Main()
{
var x = Observable.Interval(TimeSpan.FromSeconds(1))
.Select(_ => new Random().Next(10))
.Replay();
x.Connect();
var a = x.Select(xValue => 2 * xValue);
var b = x.Select(xValue => 5 * xValue);
a.Zip(b, x, (left, right, multiplicand) => string.Format("2*{0} + 5*{0} = {1}", multiplicand, left + right))
.Subscribe(Console.WriteLine);
Console.ReadKey();
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
class X : IObservable<int>
{
public IDisposable Subscribe(IObserver<int> observer)
{
return Observable.Interval(TimeSpan.FromSeconds(1))
.Take(5)
.Select(_ => new Random().Next(10))
.Do(x => Console.WriteLine("X={0}", x))
.Subscribe(observer);
}
}
class A : IObservable<int>
{
private IObservable<int> _x;
public A(IObservable<int> x)
{
_x = x;
}
public IDisposable Subscribe(IObserver<int> observer)
{
return _x.Select(x => 2 * x)
.Subscribe(observer);
}
}
class B : IObservable<int>
{
private IObservable<int> _x;
public B(IObservable<int> x)
{
_x = x;
}
public IDisposable Subscribe(IObserver<int> observer)
{
return _x.Select(x => 5 * x)
.Subscribe(observer);
}
}
class NaiveDiamond
{
static void Main()
{
new NaiveDiamond().Run();
Console.ReadKey();
}
private void Run()
{
var x = new X().Replay();
x.Connect();
var a = new A(x);
var b = new B(x);
Func<IEnumerable<int>, int> sum = values => values.Aggregate((accumulate, value) => accumulate + value);
Observable.CombineLatest(a, b)
.Select(sum)
.Subscribe(answer => Console.WriteLine("2x + 5x = {0}", answer));
}
}
// X=4
// 2x + 5x = 28
// X=7
// 2x + 5x = 34
// 2x + 5x = 49
// X=5
// 2x + 5x = 45
// 2x + 5x = 35
// X=2
// 2x + 5x = 29
// 2x + 5x = 14
// X=5
// 2x + 5x = 20
// 2x + 5x = 35
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
class SimplestSum
{
static void Main()
{
var a = from interval in Observable.Interval(TimeSpan.FromSeconds(1))
select new Random().Next(10);
var b = from interval in Observable.Interval(TimeSpan.FromSeconds(1.1111111))
select new Random().Next(10);
Observable.CombineLatest(a, b)
.Subscribe(values => Console.WriteLine(values[0] + values[1]));
Console.ReadKey();
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
class A : IObservable<int>
{
public IDisposable Subscribe(IObserver<int> observer)
{
return Observable.Interval(TimeSpan.FromSeconds(1))
.Select(_ => new Random().Next(10))
.Do(a => Console.Write("A:{0}...", a))
.Subscribe(observer);
}
}
class B : IObservable<int>
{
public IDisposable Subscribe(IObserver<int> observer)
{
return Observable.Interval(TimeSpan.FromSeconds(1.1111111))
.Select(_ => new Random().Next(10))
.Do(b => Console.Write("B:{0}...", b))
.Subscribe(observer);
}
}
class C : IObserver<int>
{
public void OnCompleted() { }
public void OnError(Exception error)
{
throw error;
}
public void OnNext(int value)
{
Console.WriteLine(value);
}
}
class SimpleSum
{
static void Main()
{
new SimpleSum().Run();
Console.ReadKey();
}
private void Run()
{
Func<IEnumerable<int>, int> sum = values => values.Aggregate((x, y) => x + y);
var a = new A();
var b = new B();
var c = new C();
Observable.CombineLatest(a, b)
.Select(sum)
.Subscribe(c);
}
}
// A:1...B:5...6
// A:5...10
// B:3...8
// A:3...6
// B:0...3
// A:7...7
// B:7...14
// A:1...8
// B:5...6
// A:6...11
// B:2...8
// A:0...2
// B:9...9
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
class SimpleSum
{
static void Main()
{
new SimpleSum().Run();
Console.ReadKey();
}
private void Run()
{
Func<IEnumerable<int>, int> sum = values => values.Aggregate((x, y) => x + y);
var a = Observable.Interval(TimeSpan.FromSeconds(1))
.Select(_ => new Random().Next(10))
.Do(aValue => Console.Write("A:{0}...", aValue));
var b = Observable.Interval(TimeSpan.FromSeconds(1.1111111))
.Select(_ => new Random().Next(10))
.Do(bValue => Console.Write("B:{0}...", bValue));
Observable.CombineLatest(a, b)
.Select(sum)
.Subscribe(Console.WriteLine);
}
}
// A:4...B:3...7
// A:1...4
// B:0...1
// A:6...6
// B:8...14
// A:0...8
// B:5...5
// A:3...8
// B:2...5
// A:0...2
// B:9...9
// A:3...12
// B:7...10
// A:1...8
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment