Ruminations of idle rants and ramblings of a code monkey

How long did that edge event take?

Code Sample | StreamInsight

With StreamInsight, you have three different event shapes – point, interval and edge. With a point, the duration is easy; it’s a single tick. Intervals are also easy since you know both the start and end times when you enqueue them. But what about edge events? Edge events are different because you know the start time, but not the end time, when you enqueue them. As I’m working on my presentation for Sql Saturday Baton Rouge, it occurred to me that this would be a very interesting bit of information to know. However, with just queries, it was difficult to do. (I say difficult; that doesn’t mean that it can’t be done, I just couldn’t figure out how.) But … once again, subjects come to the rescue. Since a subject acts as a sink and a source, I can attach it to the edge stream, evaluate the events as they are published and, when an end comes along, calculate the duration and then publish that to another stream. Even if your inbound events aren’t edges, remember that you can jump between the different shapes by manipulating your event’s temporal headers easily enough. So here goes.

First, let’s declare the class and implement ISubject. I’ve kept it so this subject can be used with any payload class.

Class Declaration
  1. public class EdgeEventDurationSubject<TPayload> : ISubject<EdgeEvent<TPayload>, PointEvent<EventDuration<TPayload>>>
  2. {
  4. private List<IObserver<PointEvent<EventDuration<TPayload>>>> _observers =
  5. new List<IObserver<PointEvent<EventDuration<TPayload>>>>();
  6. }

You’ll see that it publishes point events of type EventDuration<TPayload>. This generic event allows us to “hook up” to any stream and then publish the event duration with the payload. The query writer can then join this back to the original stream (if desired) or just continue on; after all, they have the entire payload.

EventDuration Class
  1. public class EventDuration<TPayload>
  2. {
  4. public double Duration { get; set; }
  6. public TPayload Payload { get; set; }
  7. }

Note that we support a list of observers – so we can publish to multiple sinks. Next, let’s implement the IObservable side of the subject.

IObservable Implementation
  1. public IDisposable Subscribe(IObserver<PointEvent<EventDuration<TPayload>>> observer)
  2. {
  3. lock (this)
  4. {
  5. if (!_observers.Contains(observer))
  6. {
  7.  _observers.Add(observer);
  8. }
  9. return Disposable.Create(() => RemoveObserver(observer));
  10. }
  11. }
  13. private void RemoveObserver(IObserver<PointEvent<EventDuration<TPayload>>> observer)
  14. {
  15. lock (this)
  16. {
  17. if (_observers.Contains(observer))
  18. {
  19.  _observers.Remove(observer);
  20. }
  21. }
  22. }

Note that we are cheating a bit and using the Disposable.Create from the Reactive Extensions. But it’s easy and it works well so I don’t feel a bit bad about it.

Now, for the interesting part – implementing the IObserver side of the subject. We know that we’ll be consuming edge events because of our implementation of ISubject and all of our work will be done in the OnNext method.

  1. public void OnNext(EdgeEvent<TPayload> value)
  2. {
  3. if (_observers.Count == 0)
  4. {
  5. //No one is listening.
  6. return;
  7. }
  8. if (value.EventKind == EventKind.Cti)
  9. {
  10. PublishEvent(PointEvent<EventDuration<TPayload>>.CreateCti(value.StartTime.AddTicks(1)));
  11. return;
  12. }
  14. if (value.EdgeType == EdgeType.End)
  15. {
  16. var payload = value.Payload;
  17. var result = new EventDuration<TPayload>
  18. {
  20.  Duration = (value.EndTime - value.StartTime).TotalMilliseconds,
  21.  Payload = payload,
  22. };
  23. PublishEvent(PointEvent<EventDuration<TPayload>>.CreateInsert(value.EndTime.AddTicks(-1), result));
  24. }
  25. }
  27. private void PublishEvent(PointEvent<EventDuration<TPayload>> newEvent)
  28. {
  29. lock (this)
  30. {
  31. foreach (var observer in _observers)
  32. {
  33.  observer.OnNext(newEvent);
  34. }
  35. }
  36. }

We take the CTI’s and simply republish them, adding a tick to them. This pretty much guarantees that our CTIs will be after any events that we publish. If it’s not a CTI, it’s an Insert so we check the edge type. Only End edges have an end time and, besides, that the idea behind edges … you don’t the end when you know the start. So we have to wait for the end to determine the duration. That’s easy enough … EndTime – StartTime, get the total milliseconds (because TimeSpan isn’t valid in a StreamInsight stream) and publish the event to the observers. By setting the point’s start time to a tick less than the edge event’s end time, we make sure that we can join back to the original stream without shifting if we so desire.

Using this in our queries is very simple.

  1. var timeEvalSubject = cepApplication.CreateSubject("TimeEval",
  2. () => new EdgeEventDurationSubject<WebRequestPayload>());
  4. var requestExecutionTimeStream = timeEvalSubject.ToPointStreamable(e => e  );

All you need to do now is make sure that the subject is bound to the other streamable bindings using With so they all run in the same process.