Ruminations of idle rants and ramblings of a code monkey

Dual Mode Data Sinks–Part II

StreamInsight | Code Sample

Well, it’s been a while. Far too long, in fact. I’ve been on a pretty intense project that has consumed much of my time. So … my apologies for taking so long to continue.

With that out of the way, let’s pick up with our data sinks. The last post left two major things undone. The first is the API: I left it without an abstracted, cleaner API similar to the adapter model, so that’s the first thing to add. Now, we don’t want to go “all the way” with this and produce a runnable process. That would defeat one of the coolest features of a process: the ability to contain multiple sources and sinks in one clean block that starts and stops as a whole. A process also lets us send individual queries to multiple sinks without dynamic query composition (DQC) or subjects, and it makes reusing sources across multiple sinks simpler than DQC does in the adapter model. To stay consistent with the adapter model, we want to attach our sink to the stream directly from the IQStreamable interface, which means an extension method. (I’ll confess that I have come to deeply love extension methods, by the way … I really like the clean API that they create.) Our extension method will encapsulate creating the observer and then binding the stream to it. So that we can include multiple bindings within a process, we’ll just return the streamable binding that we create. Our extension method looks like this:

ToBinding Extension Method
  // Requires: using System; using Microsoft.ComplexEventProcessing; using Microsoft.ComplexEventProcessing.Linq;
  // Like any extension method, this must live in a non-generic static class.
  public static IRemoteStreamableBinding ToBinding<TPayload>(
      this IQStreamable<TPayload> stream,
      Application cepApplication,
      Type consumerFactoryType,
      object configInfo,
      EventShape eventShape)
  {
      // Activator.CreateInstance needs a public parameterless constructor on the factory type.
      var factory = Activator.CreateInstance(consumerFactoryType) as ISinkFactory;

      if (factory == null)
      {
          throw new ArgumentException("Factory cannot be created or does not implement ISinkFactory", "consumerFactoryType");
      }

      // Define a remote observer for the requested event shape, then bind the stream to it.
      switch (eventShape)
      {
          case EventShape.Interval:
              var intervalObserver = cepApplication.DefineObserver(() => factory.CreateIntervalObserverSink<TPayload>(configInfo));
              return stream.Bind(intervalObserver);

          case EventShape.Edge:
              var edgeObserver = cepApplication.DefineObserver(() => factory.CreateEdgeObserverSink<TPayload>(configInfo));
              return stream.Bind(edgeObserver);

          case EventShape.Point:
              var pointObserver = cepApplication.DefineObserver(() => factory.CreatePointObserverSink<TPayload>(configInfo));
              return stream.Bind(pointObserver);

          default:
              throw new ArgumentOutOfRangeException("eventShape");
      }
  }

We won’t call it ToQuery(); doing so, while consistent, wouldn’t be descriptive at all. A binding that then gets run in a process is an inherently different thing from a query. With the adapter model, a query is a single runnable unit of logic; a binding is not. A binding can only run within the context of a process, so it is the process, not the binding, that is the closest counterpart to the query. Still, the naming and the arguments are similar enough. Now, when we want to bind a stream to a sink, it looks like this:

Binding a Stream to a Sink
  var sinkConfig = new ConsoleOutputConfig
  {
      ShowCti = true,
      CtiEventColor = ConsoleColor.Blue,
      InsertEventColor = ConsoleColor.Green
  };

  var binding = data.ToBinding(cepApplication, typeof(ConsoleOutputFactory), sinkConfig, EventShape.Point);
  // Run the binding inside a process named "Hello".
  binding.Run("Hello");

You’ll also notice, if you haven’t already, that this API forces you to use the entire event, not just the payload, when sending to the sink. While you could send just the payload to the sink and ignore the event itself, that seems quite pointless and can cause quite a bit of confusion. I’ve seen a lot of folks put timestamps in the payload; I typically don’t. In the vast majority of cases, I rely on (and use) the timestamps on the event itself. These are the timestamps that actually matter: they are the ones the StreamInsight engine uses when evaluating the event, and they place the event on the application timeline. Anything in the payload is just an attribute of the event; the start time and end time are a key part of its definition.

Also, if you only send the payload to your sink, you won’t get any CTIs at all. I think CTIs are pretty important for a sink, for a couple of reasons. First, even when no events are coming through, CTIs tell you that the engine is running and pumping data through. This is particularly useful when you aren’t getting any data at all in your sink: without the CTIs, you won’t have a good idea whether your query is actually being processed by the engine or whether you have some other error in your logic. Second, CTIs let you know when you can (and should) write to a durable store in a batch. Here’s the thing: Insert events don’t get released to your sink until there is a CTI, and that CTI also tells you that all of the events up to that point in application time have been released to your sink. So that’s the perfect time to batch up any writes to a durable store (say, SQL Server) and write them in one shot. Batched inserts and updates scale far better than single writes, and that’s always a very good thing in a StreamInsight application.

You always need to remember that, in many cases, the sink is going to be the biggest potential bottleneck in the application because it usually involves some sort of I/O, and that I/O is always going to be slower than the raw CPU- and memory-bound performance that you can get from your queries.
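To make the batching point concrete, here is a minimal sketch of a sink that buffers payloads and writes them in one shot whenever a CTI arrives. It deliberately avoids the StreamInsight assemblies so it compiles on its own: SketchEvent<T> and SketchEventKind are hypothetical stand-ins for the engine’s event types, and CtiBatchingSink<T> and its writeBatch callback (where something like a bulk insert into SQL Server would go) are illustrative names, not part of any API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-ins for StreamInsight's event types so this
// sketch compiles on its own. They are NOT the StreamInsight API.
public enum SketchEventKind { Insert, Cti }

public sealed class SketchEvent<TPayload>
{
    public SketchEventKind Kind { get; set; }
    public DateTimeOffset StartTime { get; set; }
    public TPayload Payload { get; set; }
}

// Buffers Insert payloads and hands them to a callback as one batch when a CTI
// arrives: the CTI means every event up to that application time has been released.
public sealed class CtiBatchingSink<TPayload> : IObserver<SketchEvent<TPayload>>
{
    private readonly List<TPayload> _pending = new List<TPayload>();
    private readonly Action<IList<TPayload>> _writeBatch;

    public CtiBatchingSink(Action<IList<TPayload>> writeBatch)
    {
        if (writeBatch == null) throw new ArgumentNullException("writeBatch");
        _writeBatch = writeBatch;
    }

    public void OnNext(SketchEvent<TPayload> value)
    {
        if (value.Kind == SketchEventKind.Insert)
        {
            _pending.Add(value.Payload);   // hold the event until the next CTI
        }
        else
        {
            Flush();                       // CTI: safe point for one batched write
        }
    }

    // Write whatever is still buffered when the stream ends.
    public void OnCompleted()
    {
        Flush();
    }

    public void OnError(Exception error)
    {
        // Sketch only: a real sink would log or surface the error here.
    }

    private void Flush()
    {
        if (_pending.Count == 0) return;   // skip empty batches (e.g. back-to-back CTIs)
        _writeBatch(_pending.ToArray());   // pass a copy so clearing doesn't affect the consumer
        _pending.Clear();
    }
}
```

In a real sink you would observe the engine’s point, interval or edge events instead and switch on their EventKind; the shape of the logic (buffer on Insert, flush on CTI, flush any leftovers on completion) stays the same.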

The next thing on our list is a bit tougher. I mentioned it in my previous post, and it’s the one thing I really didn’t like about the entire reactive model when I first saw it. You see, I’m a big fan of untyped adapters, especially output adapters. Yes, they are pretty useful on the input side as well, but on the output side they are absolutely essential. You can’t always know what your payload is going to look like, and you don’t want to be writing a sink for every different payload that you dream up. For input … in a lot of cases you can do OK, since you’ll have a good idea of what the schema is going to look like. This also came up recently on the StreamInsight forum, so I know I’m not the only one who misses it. Fortunately, the .NET Framework gives us a way to make this happen. It’s more work than a simple untyped adapter but, with reflection, we can get the same kind of flexibility in our sinks that we had with untyped adapters.

What we want, in the end, is something similar to what an output adapter provides for us: name/value pairs where members of nested classes get names of the form [ParentProperty].[ChildProperty]. While we’re at it, we’ll simplify it a bit. It’ll be a simple, straightforward dictionary with the name and the value, rather than a separate event definition where you match the index with the name, as in an untyped adapter. At the highest level, we’ll have a method called GetPropertyValues that takes the object and handles all of the details for us. Of course, it’ll be an extension method.

  /// <summary>
  /// Returns the values of an object's public properties and fields as name/value pairs.
  /// Members of nested classes are named [ParentProperty].[ChildProperty].
  /// </summary>
  /// <param name="source">The object to read properties from.</param>
  /// <returns>A dictionary keyed by the (prefixed) member name.</returns>
  public static Dictionary<string, object> GetPropertyValues(this object source)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      var propertyDictionary = new Dictionary<string, object>();
      // Walk all of the properties and fields, recursing into nested classes.
      AppendPropertyValues(source, string.Empty, propertyDictionary);
      return propertyDictionary;
  }

The real work is in the AppendPropertyValues method. It takes the object, a property name prefix and the dictionary, and appends the properties and their values to the dictionary. To do this, we first get a list of the public instance properties on the object and loop over them. If a property’s type is one of the simple types in _validProp, we add its name and value; if it’s a class, we recurse into AppendPropertyValues, adding the source property name (plus a dot) to the prefix. After that, we get the fields with the same binding flags and do the same thing. GetProperties won’t return the fields, and there isn’t a single reflection method that I could find that would give me both in a single call.

  // Requires: using System.Collections.Generic; using System.Reflection;
  private static void AppendPropertyValues(object source, string prefix, Dictionary<string, object> propertyDictionary)
  {
      // _validProp is a static collection of the "simple" types whose values are stored directly.
      // It is defined elsewhere in the class and isn't shown in this post.
      // Note: there is no cycle detection, so circular references would recurse forever.
      var properties = source.GetType().GetProperties(BindingFlags.Public | BindingFlags.Instance);
      foreach (var propertyInfo in properties)
      {
          // Skip properties without a public getter, and indexers, which need arguments.
          var method = propertyInfo.GetGetMethod();
          if (method == null || propertyInfo.GetIndexParameters().Length > 0)
          {
              continue;
          }

          if (_validProp.Contains(propertyInfo.PropertyType))
          {
              object value = method.Invoke(source, null);
              propertyDictionary.Add(prefix + propertyInfo.Name, value);
          }
          else if (propertyInfo.PropertyType.IsClass)
          {
              // Recurse into the nested object; a null reference has nothing to flatten.
              object value = method.Invoke(source, null);
              if (value != null)
              {
                  AppendPropertyValues(value, prefix + propertyInfo.Name + ".", propertyDictionary);
              }
          }
      }

      // GetProperties doesn't return fields, so repeat the same logic for public instance fields.
      var fields = source.GetType().GetFields(BindingFlags.Public | BindingFlags.Instance);
      foreach (var fieldInfo in fields)
      {
          if (_validProp.Contains(fieldInfo.FieldType))
          {
              object value = fieldInfo.GetValue(source);
              propertyDictionary.Add(prefix + fieldInfo.Name, value);
          }
          else if (fieldInfo.FieldType.IsClass)
          {
              var value = fieldInfo.GetValue(source);
              if (value != null)
              {
                  AppendPropertyValues(value, prefix + fieldInfo.Name + ".", propertyDictionary);
              }
          }
      }
  }

With this now in place, we have the same capabilities in our sinks that we had in our output adapters. Using this, we can change our ConsoleDataConsumer to display the payload properties, rather than just some basic header info.

Code Snippet
  if (outputEvent.EventKind == EventKind.Insert)
  {
      Console.ForegroundColor = Configuration.InsertEventColor;
      // Flatten the payload into name/value pairs (StringBuilder lives in System.Text).
      var eventValues = outputEvent.Payload.GetPropertyValues();
      var output = new StringBuilder(2048);
      foreach (var eventValue in eventValues)
      {
          // Values can be null (an unset string, for example), so don't call ToString() on them directly.
          output.Append(eventValue.Key)
                .Append(':')
                .Append(eventValue.Value ?? "(null)")
                .Append('\t');
      }
      Console.WriteLine("Insert event received at " + outputEvent.StartTime + "\t" + output);
  }
  else if (Configuration.ShowCti)
  {
      Console.ForegroundColor = Configuration.CtiEventColor;
      Console.WriteLine("CTI event received at " + outputEvent.StartTime);
  }
  Console.ResetColor();

Now … we could certainly do some optimizations on this. For example, we could cache the property names and definitions for each of the types. I’ll leave that as an exercise for later … or for you, dear blog reader.
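For anyone who wants to take up that exercise, here is one possible sketch; it is not the code in the downloadable solution. It caches the readable members of each type in a ConcurrentDictionary (sinks may be called from more than one thread), so the reflection lookup runs once per type instead of once per event. CachedPayloadFlattener, MemberAccessor and GetPropertyValuesCached are hypothetical names, and SimpleTypes is an assumed stand-in for _validProp, whose contents the post doesn’t show.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reflection;

public static class CachedPayloadFlattener
{
    // Assumed stand-in for _validProp: types whose values are stored directly.
    private static readonly HashSet<Type> SimpleTypes = new HashSet<Type>
    {
        typeof(bool), typeof(byte), typeof(short), typeof(int), typeof(long),
        typeof(float), typeof(double), typeof(decimal), typeof(string),
        typeof(DateTime), typeof(DateTimeOffset), typeof(TimeSpan), typeof(Guid)
    };

    // One entry per readable public instance property or field of a type.
    private sealed class MemberAccessor
    {
        public string Name;
        public bool IsSimple;                  // true: store the value; false: recurse into it
        public Func<object, object> GetValue;
    }

    // Built once per type; later calls reuse the cached accessor array.
    private static readonly ConcurrentDictionary<Type, MemberAccessor[]> Cache =
        new ConcurrentDictionary<Type, MemberAccessor[]>();

    public static Dictionary<string, object> GetPropertyValuesCached(this object source)
    {
        if (source == null) throw new ArgumentNullException("source");
        var result = new Dictionary<string, object>();
        Append(source, string.Empty, result);
        return result;
    }

    // No cycle detection: circular references would recurse forever.
    private static void Append(object source, string prefix, Dictionary<string, object> result)
    {
        foreach (var member in Cache.GetOrAdd(source.GetType(), BuildAccessors))
        {
            var value = member.GetValue(source);
            if (member.IsSimple)
                result.Add(prefix + member.Name, value);
            else if (value != null)
                Append(value, prefix + member.Name + ".", result);
        }
    }

    private static MemberAccessor[] BuildAccessors(Type type)
    {
        const BindingFlags flags = BindingFlags.Public | BindingFlags.Instance;
        var accessors = new List<MemberAccessor>();

        foreach (var property in type.GetProperties(flags))
        {
            var getter = property.GetGetMethod();
            if (getter == null || property.GetIndexParameters().Length > 0) continue;
            bool isSimple = SimpleTypes.Contains(property.PropertyType);
            if (!isSimple && !property.PropertyType.IsClass) continue;  // e.g. custom structs are ignored
            accessors.Add(new MemberAccessor
            {
                Name = property.Name,
                IsSimple = isSimple,
                GetValue = obj => getter.Invoke(obj, null)
            });
        }

        foreach (var field in type.GetFields(flags))
        {
            bool isSimple = SimpleTypes.Contains(field.FieldType);
            if (!isSimple && !field.FieldType.IsClass) continue;
            var f = field;  // per-iteration copy for the closure (needed before C# 5)
            accessors.Add(new MemberAccessor
            {
                Name = f.Name,
                IsSimple = isSimple,
                GetValue = obj => f.GetValue(obj)
            });
        }

        return accessors.ToArray();
    }
}
```

Calling payload.GetPropertyValuesCached() returns the same kind of dictionary as GetPropertyValues. Note that only the member lookup is cached; each value is still read through reflection. GetOrAdd may build the accessors more than once if two threads race on a new type, but only one result is kept, which is harmless here.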

You can download the solution from my SkyDrive. As you look, you’ll also see that I’ve done some refactoring on the project names and namespaces …