Description
As pointed out by Alexei: https://groups.google.com/forum/#!topic/pangool-user/iWf3BODBI9o
It would be nice to have the possibility of specifying different groupBys for the same Pangool Job. Pig seems to have this possibility built into its optimizer. Even though I'm not sure this is always the most efficient way of executing several groupBys over the same input, it makes sense for Pangool to support such advanced usages, because the philosophy of Pangool is to make "low-level MapReduce easier". If there are low-level use cases that we don't natively support, I think it is worth taking a look and seeing how reasonable they are to implement.
Alexei proposes an API based on declaring "Streams", which would be quite different from the current API, where you have to specify intermediate schemas. At first sight I wouldn't recommend such an API revamp: this is still a particular use case, and it doesn't make sense to me to make it so "first class".
Here we can follow up the discussion. I have been thinking about it and have a few ideas on the matter:
- First of all, it seems the concept of "stream id" != "intermediate schema id" is needed, since one should be able to group the same intermediate schema by different criteria. So there is a 1-N relation between intermediate schema and stream id: for N intermediate schemas we may have M stream ids, where M >= N.
- To preserve the current I/O efficiency, Pangool shouldn't add any extra integer for the stream id. What is now used as the schemaId would be called "stream id" internally, and by default every intermediate schema is assigned one "stream id", like now. The current behavior becomes the particular case N = M, but we would make it more general.
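As a minimal sketch of this idea (the class and method names below are hypothetical, not part of Pangool's actual API), stream ids could be assigned per (schema, groupBy) pair, so that the one-groupBy-per-schema case reduces to today's behavior where stream id == schema id:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: assigns a distinct stream id to every
// (intermediate schema, groupBy) pair. With one groupBy per schema
// this degenerates to the current behavior: stream id == schema id.
public class StreamIdRegistry {
    private final List<String> streams = new ArrayList<>(); // "schema|groupBy" keys

    /** Returns the stream id for the pair, registering it on first use. */
    public int streamIdFor(String schemaName, String groupByCriteria) {
        String key = schemaName + "|" + groupByCriteria;
        int id = streams.indexOf(key);
        if (id < 0) {
            streams.add(key);
            id = streams.size() - 1;
        }
        return id;
    }

    public int numStreams() {
        return streams.size();
    }

    public static void main(String[] args) {
        StreamIdRegistry reg = new StreamIdRegistry();
        // schema1 grouped two ways -> two stream ids; schema2 once -> one more
        System.out.println(reg.streamIdFor("schema1", "user"));    // 0
        System.out.println(reg.streamIdFor("schema1", "country")); // 1
        System.out.println(reg.streamIdFor("schema2", "country")); // 2
        System.out.println(reg.streamIdFor("schema1", "user"));    // 0 (stable)
        System.out.println(reg.numStreams());                      // 3
    }
}
```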
- The API could remain exactly as it is, to be backwards compatible. The groupBy would be the "common group by". If specified, nothing else can be done: the Job is grouped only by this criterion. But we could add a second option with the following new method in TupleMRBuilder:
```
addGroupBy(groupBy)
    .withSchema(schema1);
```
So the user can either proceed as now (use a common groupBy) or add K particular groupBys to the same Job, each associated with one or more Schemas:
```
addGroupBy(groupBy)
    .withSchema(schema1)
    .withSchema(schema2);
```
The Pangool Mapper would check this and internally emit as many Tuples as needed if one schema participates in more than one groupBy, assigning a different stream id to each copy.
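A rough sketch of that mapper-side fan-out (the class and its hard-coded configuration are hypothetical; real Pangool tuples and serialization are elided):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the mapper-side fan-out: if a schema takes part
// in K groupBys, each input tuple is emitted K times, once per stream id.
public class FanOutMapper {
    // schema name -> stream ids of the groupBys it participates in (assumed config)
    private final Map<String, int[]> streamsBySchema = new HashMap<>();

    public FanOutMapper() {
        streamsBySchema.put("schema1", new int[]{0, 1}); // schema1 is in two groupBys
        streamsBySchema.put("schema2", new int[]{1});    // schema2 is in one groupBy
    }

    /** Returns the (streamId, tuple) pairs that would be written to the shuffle. */
    public List<String> emit(String schemaName, String tuple) {
        List<String> out = new ArrayList<>();
        for (int streamId : streamsBySchema.get(schemaName)) {
            out.add(streamId + ":" + tuple); // the stream id tags each serialized copy
        }
        return out;
    }

    public static void main(String[] args) {
        FanOutMapper m = new FanOutMapper();
        System.out.println(m.emit("schema1", "t1")); // two copies, streams 0 and 1
        System.out.println(m.emit("schema2", "t2")); // one copy, stream 1
    }
}
```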
- Serialization / Deserialization: It seems to me (even though I haven't looked that far into it) that Pangool always assumes the presence of a "common schema". After reading the common schema, it knows the id (to be called stream id) that allows it to deserialize the particular part. The code would need changes to avoid requiring a common schema, so that the stream id can sometimes be the first field serialized and deserialized.
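To illustrate, here is a toy framing where the stream id is written first so the reader can pick the right schema before deserializing the rest. This uses plain java.io streams and is not Pangool's actual serialization (which would presumably use a varint rather than a fixed-width int):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of the proposed wire layout when no common schema
// exists: the stream id is the first field written, so the deserializer
// can select the right particular schema before reading the rest.
public class StreamFraming {
    public static byte[] serialize(int streamId, String payload) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(streamId);   // stream id first (a varint in practice)
        out.writeUTF(payload);    // then the schema-specific fields
        return buf.toByteArray();
    }

    public static String deserialize(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        int streamId = in.readInt(); // read first, to select the schema
        String payload = in.readUTF();
        return "stream=" + streamId + " payload=" + payload;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(deserialize(serialize(2, "a,b,c")));
    }
}
```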
- Comparator: It looks like the SortComparator wouldn't need to change much: the schemaId used for comparison would simply become the stream id.
- Partitioner: The Partitioner should be able to partition more than one stream id into the same partition, for instance by taking the minimum stream id within a group. In the above example, all tuples emitted with schema1 and schema2 in the particular groupBy would fall into the same partition, even though they would be serialized with different "stream ids".
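A sketch of that partitioning idea, with a hypothetical configuration mapping each stream id to the minimum stream id of its groupBy; tuples with different stream ids but the same group then hash to the same partition:

```java
import java.util.Objects;

// Hypothetical sketch: tuples from different stream ids that belong to the
// same groupBy must land in the same partition. Normalizing each tuple's
// stream id to the minimum stream id of its group before hashing achieves this.
public class GroupAwarePartitioner {
    // stream id -> minimum stream id within its groupBy (assumed config):
    // streams 1 and 2 (say, schema1 and schema2) share one groupBy here.
    private final int[] minStreamOfGroup = {0, 1, 1};

    public int partition(int streamId, String groupKey, int numPartitions) {
        int normalized = minStreamOfGroup[streamId];
        int hash = Objects.hash(normalized, groupKey);
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        GroupAwarePartitioner p = new GroupAwarePartitioner();
        // schema1 (stream 1) and schema2 (stream 2) in the same groupBy:
        // the same group key lands in the same partition despite different stream ids.
        System.out.println(p.partition(1, "es", 4) == p.partition(2, "es", 4));
    }
}
```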
- Reducer: By default, all groupings would be delegated to the reducer specified in setTupleReducer(). This is fine, since Pangool reducers have a Tuple as key, so every key may be different even within the same Reducer. But it might be a bit challenging to "detect" with some if / else which grouping we are receiving, so it makes sense to have per-groupBy reducers:
```
addGroupBy(groupBy)
    .withSchema(schema1)
    .withSchema(schema2)
    .withReducer(reducer);
```
So the Pangool Reducer would instantiate several reducers in this case and delegate to one or another depending on the "stream id". If these methods are used, the TupleMRBuilder won't complain if setTupleReducer() hasn't been called. If both are used, the most specific configuration wins: schemas with a particular reducer associated get that one; for the rest, the "general" reducer is called.
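The dispatch rule could look roughly like this (all names are hypothetical; java.util.function.Function stands in for a Pangool reducer):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the dispatch rule: per-groupBy reducers win over
// the general reducer set via setTupleReducer(); the general one is only
// a fallback for streams with no particular reducer attached.
public class ReducerDispatch {
    private final Map<Integer, Function<String, String>> perStream = new HashMap<>();
    private final Function<String, String> general;

    public ReducerDispatch(Function<String, String> general) {
        this.general = general;
    }

    public void setReducerFor(int streamId, Function<String, String> reducer) {
        perStream.put(streamId, reducer);
    }

    public String reduce(int streamId, String group) {
        // the most specific configuration wins; otherwise fall back to the general reducer
        return perStream.getOrDefault(streamId, general).apply(group);
    }

    public static void main(String[] args) {
        ReducerDispatch d = new ReducerDispatch(g -> "general(" + g + ")");
        d.setReducerFor(1, g -> "particular(" + g + ")");
        System.out.println(d.reduce(0, "a")); // no particular reducer for stream 0
        System.out.println(d.reduce(1, "b")); // stream 1 has its own reducer
    }
}
```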
- Order by: Each schema within a groupBy could be sorted in different ways. Therefore the withSchema() method should allow specifying an order by:
```
addGroupBy(groupBy)
    .withSchema(schema1, orderBy1)
    .withSchema(schema2, orderBy2)
    .withReducer(reducer);
```
This all seems a bit complex, but the actual challenge is to stay as backwards compatible as possible while remaining coherent with the whole idea of the API and Pangool.
That would be my two cents as to how this could be implemented.