Skip to content

Instantly share code, notes, and snippets.

@benjchristensen
Last active December 21, 2016 19:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save benjchristensen/e771fa58ce723e517ccdfbdd8a6bb4d8 to your computer and use it in GitHub Desktop.
Save benjchristensen/e771fa58ce723e517ccdfbdd8a6bb4d8 to your computer and use it in GitHub Desktop.
package lithium;
import org.reactivestreams.Publisher;
//import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
import io.reactivesocket.AbstractReactiveSocket;
import io.reactivesocket.ConnectionSetupPayload;
import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.lease.DisabledLeaseAcceptingSocket;
import io.reactivesocket.server.ReactiveSocketServer;
import io.reactivesocket.transport.tcp.server.TcpTransportServer;
import io.reactivesocket.util.PayloadImpl;
import io.reactivex.Flowable;
public class AggregatorServer {
// private static final CBORFactory cborFactory = new CBORFactory();
public static void main(String[] args) {
ReactiveSocketServer
.create(TcpTransportServer.create(4500))
.start((ConnectionSetupPayload setup, ReactiveSocket reactiveSocket) -> {
System.out.println("Received new connection to AggregatorServer");
subscribeToClientsForMetrics(reactiveSocket);
//
return createAcceptor();
})
.awaitShutdown();
}
private static void subscribeToClientsForMetrics(ReactiveSocket reactiveSocket) {
Flowable
.fromPublisher(reactiveSocket.requestStream(new PayloadImpl("getMetrics".getBytes())))
.takeUntil(reactiveSocket.onClose())
.doOnNext(payload -> System.out.println("metrics output ==> " + new String(payload.getData().array())))
.doOnSubscribe(s -> System.out.println("sending getMetrics request"))
.doOnComplete(() -> System.out.println("completed getMetrics request"))
.doOnError(t -> System.out.println("Error sending getMetrics request: " + t.getMessage()))
.subscribe(); // kick off async and let it finish when the socket closes
}
private static DisabledLeaseAcceptingSocket createAcceptor() {
return new DisabledLeaseAcceptingSocket(new AbstractReactiveSocket() {
public Publisher<Payload> requestStream(Payload payload) {
System.out.println("got requestStream on aggregator");
// this will be clients subscribing to this system
return super.requestStream(payload);
}
});
}
}
package lithium;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.reactivestreams.Publisher;
//import com.fasterxml.jackson.core.JsonToken;
//import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
//import com.fasterxml.jackson.dataformat.cbor.CBORParser;
import io.reactivesocket.AbstractReactiveSocket;
import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.client.KeepAliveProvider;
import io.reactivesocket.client.ReactiveSocketClient;
import io.reactivesocket.client.ReactiveSocketClient.SocketAcceptor;
import io.reactivesocket.client.SetupProvider;
import io.reactivesocket.lease.DisabledLeaseAcceptingSocket;
import io.reactivesocket.lease.LeaseEnforcingSocket;
import io.reactivesocket.transport.tcp.client.TcpTransportClient;
import io.reactivesocket.util.PayloadImpl;
import io.reactivex.Flowable;
import io.reactivex.Single;
public class GatewayMock {
// private static final CBORFactory cborFactory = new CBORFactory();
public static void main(String... args) {
InetSocketAddress host = InetSocketAddress.createUnresolved("localhost", 4500);
new LithiumGatewayMock().attachToInsightsStreamAggregator(host).blockingGet();
// hold open ... since this is a client there is no await
while (true) {
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
}
}
}
private Single<? extends ReactiveSocket>
attachToInsightsStreamAggregator(InetSocketAddress host) {
System.out.println("Connecting to host: " + host);
ReactiveSocketClient insightsStreamAggregatorClient = ReactiveSocketClient.createDuplex(
TcpTransportClient.create(host),
new GatewayInsightsAcceptor(),
SetupProvider
//.keepAlive(KeepAliveProvider.from(10000, Flowable.rangeLong(1, Long.MAX_VALUE)))
.keepAlive(KeepAliveProvider.never())
.disableLease()
.dataMimeType("application/cbor"));
return Single
.fromPublisher(insightsStreamAggregatorClient.connect())
.doOnSuccess(s -> System.out.println("Successfully connected to aggregator"))
.doOnError(t -> System.out.println("Error connecting: " + t.getMessage()));
}
/**
* Handler that responds to subscriptions from the InsightsStreamAggregator to deliver metrics and logs.
*/
private static class GatewayInsightsAcceptor implements SocketAcceptor {
@Override
public LeaseEnforcingSocket accept(ReactiveSocket reactiveSocket) {
System.out.println("the mock got a socket");
// TODO use leasing, or restrict with a counter the number of concurrent subscriptions
// TODO to protect Gateway
return new DisabledLeaseAcceptingSocket(new AbstractReactiveSocket() {
@Override
public Publisher<Payload> requestStream(Payload request) {
System.out.println("Got a subscription to the GatewayMock");
// try {
// CBORParser parser = cborFactory.createParser(request.getData().array());
// JsonToken nextToken = null;
// while ((nextToken = parser.nextToken()) != null) {
// System.out.println("got request: " + nextToken.asString());
// }
//
// } catch (IOException e) {
// e.printStackTrace();
// }
return Flowable.just(new PayloadImpl("hello".getBytes()));
}
});
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment