Discussion:
[rabbitmq-users] Pipelining of requests forbidden exception
Marcello Santambrogio
2014-12-02 11:46:36 UTC
Permalink
Hi all,
i'm developing a simple library to develop some common pattern using
Rabbit. I'm having some problem with the Subscriber as my model is (most
probably) wrong, and sometimes i receive an exception (Pipelining of
requests forbidden).
From what i've read (
http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2010-December/010391.html)
this error is caused by a race condition occurring on channel resources.
This means that in my model i have multiple threads that are concurrently
using the channel...and that's true (i'll explain why later on).
First question:
What i need to understand is what are the kind of operations that could
cause this kind of exception.
On the discuss message it's stated that this is caused by AMQP RPC
operations like ExchangeDeclare.
What are the other AMQP RPC operations?

Second question:
I need to create a Subscriber class that takes messages from a queue,
notifying interested classes that there's a message available. In case of
error, the subscriber republish the message on a failure exchange with the
detail of the exception.
Please note that the Subscriber could need to defer the acknowledge untill
requested (not using the autoack) in order to create a transactional-style
operation (in my case i need to be sure that a message received must be
delivered to another queue (or a failure exchange) before acking it). This
means that after that class x receives the message from the subscriber (via
an event), it will later on ask to the subscriber to ack that message.
Do you have any hint or best practice to handle this kind of scenario? I'm
feeling that this model is somewhat very wrong, but i can't figure out
other ways to do it.

What i've done is that there's a Subscriber class that have a single IModel
(channel) and a single QueueingBasicConsumer.
When "started", this class creates a new Task that does an always running
loop like this:

while(isActive){

basicConsumer.Dequeue();

//Create a new thread that will take care of handling the message

_threadPoolInstance.QueueWorkItem(() => ExecuteMessageConsuming(ea));

}

In the ExecuteMessageConsumingMethod i deserialize the message, ack it only
if requested and notify (using an event) someone that there's a new message
ready to be taken.
In case of errors (for example, a deserialization error) the subscriber
will try to resend this message to the configured failure exchange. This is
a possible failure point, because i declare both the exchange and the
destination queue as i can't know in advance if those exists on my
virtualHost, and i'm using an AMQP RPC operation from another thread on the
same channel.

Sorry for the long (and maybe messy) post...hope that someone can clear my
mind a bit =)
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+***@googlegroups.com.
To post to this group, send an email to rabbitmq-***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Michael Klishin
2014-12-02 12:12:04 UTC
Permalink
Post by Marcello Santambrogio
What i need to understand is what are the kind of operations that
could cause this kind of exception.
On the discuss message it's stated that this is caused by AMQP
RPC operations like ExchangeDeclare.
What are the other AMQP RPC operations?
Publishing, acknowledgements. However, there are all kinds of unexpected situations
you may run into with shared channels, so don't share them between threads.
Post by Marcello Santambrogio
I need to create a Subscriber class that takes messages from a
queue, notifying interested classes that there's a message
available. In case of error, the subscriber republish the message
on a failure exchange with the detail of the exception.
Manual [consumer] acknowledgements exists exactly for this reason:
http://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html

You can either re-queue the delivery or simply ack it and publish a new message
(or rather, publish first and ack later).
Post by Marcello Santambrogio
Please note that the Subscriber could need to defer the acknowledge
untill requested (not using the autoack) in order to create a
transactional-style operation (in my case i need to be sure that
a message received must be delivered to another queue (or a failure
exchange) before acking it). This means that after that class
x receives the message from the subscriber (via an event), it
will later on ask to the subscriber to ack that message.
Do you have any hint or best practice to handle this kind of scenario?
I'm feeling that this model is somewhat very wrong, but i can't
figure out other ways to do it.
See above + publisher confirms when publishing.

http://www.rabbitmq.com/confirms.html
Post by Marcello Santambrogio
What i've done is that there's a Subscriber class that have a single
IModel (channel) and a single QueueingBasicConsumer.
When "started", this class creates a new Task that does an always
Post by Marcello Santambrogio
while(isActive){
Post by Marcello Santambrogio
basicConsumer.Dequeue();
//Create a new thread that will take care of handling the message
_threadPoolInstance.QueueWorkItem(() => ExecuteMessageConsuming(ea));
}
QueueingBasicConsumer will have deliveries pushed to it as they become available, and
keep them enqueued in the client. You can limit how many messages will be "prefetched"
using basic.qos (again, see tutorial 2) but I'd recommend using a "regular" consumer
instead of the queueing one.
Post by Marcello Santambrogio
In the ExecuteMessageConsumingMethod i deserialize the message,
ack it only if requested and notify (using an event) someone that
there's a new message ready to be taken.
In case of errors (for example, a deserialization error) the
subscriber will try to resend this message to the configured
failure exchange. This is a possible failure point, because
i declare both the exchange and the destination queue as i can't
know in advance if those exists on my virtualHost, and i'm using
an AMQP RPC operation from another thread on the same channel.
What you are trying to do is basically what the Shovel plugin does, plus a bit of business logic.

Shovel combines manual acks with publisher confirms and lets you specify when to ack deliveries.
In your case, you want to ack after you got a publisher confirm for your "2nd stage" message. 
--
MK

Staff Software Engineer, Pivotal/RabbitMQ
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+***@googlegroups.com.
To post to this group, send an email to rabbitmq-***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Marcello Santambrogio
2014-12-02 13:30:48 UTC
Permalink
Thanks for the answer.
To recap: my model is ok, but the implementation not (multi thread on
single channel)
You've suggested to use a DefaultBasicConsumer, right? as i'm not taking
real advantage of the Queued one.

The next big question is:
How can i do all those things in a single thread (dequeuing a message,
deserializing it, eventually handle an exception and ack/republish the
message with exception details, and most importantly, taking an ack/reject
request coming from an external object) giving enough horsepower to ensure
a decent throughput?
Sorry if the question seems (and maybe is) dull but i'm trying to
understand what should be a best practice in a real-life scenario and i'm
probably missing some bits.

Il giorno martedì 2 dicembre 2014 12:46:36 UTC+1, Marcello Santambrogio ha
Post by Marcello Santambrogio
Hi all,
i'm developing a simple library to develop some common pattern using
Rabbit. I'm having some problem with the Subscriber as my model is (most
probably) wrong, and sometimes i receive an exception (Pipelining of
requests forbidden).
From what i've read (
http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2010-December/010391.html)
this error is caused by a race condition occurring on channel resources.
This means that in my model i have multiple threads that are concurrently
using the channel...and that's true (i'll explain why later on).
What i need to understand is what are the kind of operations that could
cause this kind of exception.
On the discuss message it's stated that this is caused by AMQP RPC
operations like ExchangeDeclare.
What are the other AMQP RPC operations?
I need to create a Subscriber class that takes messages from a queue,
notifying interested classes that there's a message available. In case of
error, the subscriber republish the message on a failure exchange with the
detail of the exception.
Please note that the Subscriber could need to defer the acknowledge untill
requested (not using the autoack) in order to create a transactional-style
operation (in my case i need to be sure that a message received must be
delivered to another queue (or a failure exchange) before acking it). This
means that after that class x receives the message from the subscriber (via
an event), it will later on ask to the subscriber to ack that message.
Do you have any hint or best practice to handle this kind of scenario? I'm
feeling that this model is somewhat very wrong, but i can't figure out
other ways to do it.
What i've done is that there's a Subscriber class that have a single
IModel (channel) and a single QueueingBasicConsumer.
When "started", this class creates a new Task that does an always running
while(isActive){
basicConsumer.Dequeue();
//Create a new thread that will take care of handling the message
_threadPoolInstance.QueueWorkItem(() => ExecuteMessageConsuming(ea));
}
In the ExecuteMessageConsumingMethod i deserialize the message, ack it
only if requested and notify (using an event) someone that there's a new
message ready to be taken.
In case of errors (for example, a deserialization error) the subscriber
will try to resend this message to the configured failure exchange. This is
a possible failure point, because i declare both the exchange and the
destination queue as i can't know in advance if those exists on my
virtualHost, and i'm using an AMQP RPC operation from another thread on the
same channel.
Sorry for the long (and maybe messy) post...hope that someone can clear my
mind a bit =)
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+***@googlegroups.com.
To post to this group, send an email to rabbitmq-***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Michael Klishin
2014-12-02 13:53:55 UTC
Permalink
Post by Marcello Santambrogio
Thanks for the answer.
To recap: my model is ok, but the implementation not (multi thread
on single channel)
You've suggested to use a DefaultBasicConsumer, right?
A subclass of it, yes.
Post by Marcello Santambrogio
as i'm
not taking real advantage of the Queued one.
It's not that you're not taking advantage of it, it just doesn't fit your use case very well.
You can make it work with the queueing one, I can explain how if you really have reasons
to stick to it.
Post by Marcello Santambrogio
How can i do all those things in a single thread (dequeuing a message,
deserializing it, eventually handle an exception and ack/republish
the message with exception details, and most importantly, taking
an ack/reject request coming from an external object) giving
enough horsepower to ensure a decent throughput?
Sorry if the question seems (and maybe is) dull but i'm trying
to understand what should be a best practice in a real-life scenario
and i'm probably missing some bits.
Do you depend on deliveries being processed in order? .NET client currently
dispatches deliveries on the I/O thread so you should dispatch deliveries
to another thread (any way you like, just watch out for race conditions if ordering matters).
--
MK

Staff Software Engineer, Pivotal/RabbitMQ
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+***@googlegroups.com.
To post to this group, send an email to rabbitmq-***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Marcello Santambrogio
2014-12-02 15:09:59 UTC
Permalink
No real reason to stick to the Queued one, just used it as i thought that
was a right choice :)
I have no strict delivery order requirement...i've failed to explain myself.
The problem that i don't know how to resolve easily is that, if the
delivery tag is related to an instance of a channel, how can i manage the
ack/reject the messages?
I mean, if i have n threads, everyone with its channel, and the (external)
object interested to message x needs to signal that it must be
ack'd/rejected, how can i do it? The only thing that this object can have
is a delivery-tag and, at most, a consumer-tag.


Il giorno martedì 2 dicembre 2014 14:54:10 UTC+1, Michael Klishin ha
Post by Michael Klishin
Post by Marcello Santambrogio
Thanks for the answer.
To recap: my model is ok, but the implementation not (multi thread
on single channel)
You've suggested to use a DefaultBasicConsumer, right?
A subclass of it, yes.
Post by Marcello Santambrogio
as i'm
not taking real advantage of the Queued one.
It's not that you're not taking advantage of it, it just doesn't fit your
use case very well.
You can make it work with the queueing one, I can explain how if you really have reasons
to stick to it.
Post by Marcello Santambrogio
How can i do all those things in a single thread (dequeuing a message,
deserializing it, eventually handle an exception and ack/republish
the message with exception details, and most importantly, taking
an ack/reject request coming from an external object) giving
enough horsepower to ensure a decent throughput?
Sorry if the question seems (and maybe is) dull but i'm trying
to understand what should be a best practice in a real-life scenario
and i'm probably missing some bits.
Do you depend on deliveries being processed in order? .NET client currently
dispatches deliveries on the I/O thread so you should dispatch deliveries
to another thread (any way you like, just watch out for race conditions if
ordering matters).
--
MK
Staff Software Engineer, Pivotal/RabbitMQ
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+***@googlegroups.com.
To post to this group, send an email to rabbitmq-***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Michael Klishin
2014-12-02 15:40:49 UTC
Permalink
you can pass the channel along with the delivery information.

MK
The problem that i don't know how to resolve easily is that, if the delivery tag is related to an instance of a channel, how can i manage the ack/reject the messages?
I mean, if i have n threads, everyone with its channel, and the (external) object interested to message x needs to signal that it must be ack'd/rejected, how can i do it? The only thing that this object can have is a delivery-tag and, at most, a consumer-tag.
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+***@googlegroups.com.
To post to this group, send an email to rabbitmq-***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Marcello Santambrogio
2014-12-02 15:51:45 UTC
Permalink
Once more, sorry for the missing informations...i'm trying to isolate
(wrap) all the RabbitMq concept inside this library that i'm developing.
Passing back a channel would invalidate that effort. But i think that it's
the best solution that i can have right now (the project deadline is coming
closer) =)
But i still missing some pieces...if the external object (thread 1) works
on channel x (ack, for example) while the library (thread 2) is working on
the same channel, doesn't it creates the same problem? So how sending back
the channel should solve my original problem?
In fact in my actual solution i have 1 channel where there are n possible
threads working on it.
But if i create one channel per thread (inside my Subscriber) and the
external object asks to my Subscriber to Ack a message, those are two
requests coming from two separated thread that are insisting on the same
channel...right?
My head is starting to melt...

Il giorno martedì 2 dicembre 2014 16:40:55 UTC+1, Michael Klishin ha
Post by Michael Klishin
you can pass the channel along with the delivery information.
MK
Post by Marcello Santambrogio
The problem that i don't know how to resolve easily is that, if the
delivery tag is related to an instance of a channel, how can i manage the
ack/reject the messages?
Post by Marcello Santambrogio
I mean, if i have n threads, everyone with its channel, and the
(external) object interested to message x needs to signal that it must be
ack'd/rejected, how can i do it? The only thing that this object can have
is a delivery-tag and, at most, a consumer-tag.
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+***@googlegroups.com.
To post to this group, send an email to rabbitmq-***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Michael Klishin
2014-12-02 15:59:22 UTC
Permalink
if you know that the two threads effectively do not use the channel concurrently then no. But I see how that may be a rare and dangerous thing to expect.

can your library begin working after delivery is dispatched?

can you communicate back to the "origin" thread?

I think that passing some kind of context around to the delegate that runs some business logic is inevitable, be it for it to use a channel or send a reply back.

have you considered EasyNetQ? Its patterns may give you some leverage.

MK
Once more, sorry for the missing informations...i'm trying to isolate (wrap) all the RabbitMq concept inside this library that i'm developing. Passing back a channel would invalidate that effort. But i think that it's the best solution that i can have right now (the project deadline is coming closer) =)
But i still missing some pieces...if the external object (thread 1) works on channel x (ack, for example) while the library (thread 2) is working on the same channel, doesn't it creates the same problem? So how sending back the channel should solve my original problem?
In fact in my actual solution i have 1 channel where there are n possible threads working on it.
But if i create one channel per thread (inside my Subscriber) and the external object asks to my Subscriber to Ack a message, those are two requests coming from two separated thread that are insisting on the same channel...right?
My head is starting to melt...
Post by Michael Klishin
you can pass the channel along with the delivery information.
MK
The problem that i don't know how to resolve easily is that, if the delivery tag is related to an instance of a channel, how can i manage the ack/reject the messages?
I mean, if i have n threads, everyone with its channel, and the (external) object interested to message x needs to signal that it must be ack'd/rejected, how can i do it? The only thing that this object can have is a delivery-tag and, at most, a consumer-tag.
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+***@googlegroups.com.
To post to this group, send an email to rabbitmq-***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Marcello Santambrogio
2014-12-03 16:29:37 UTC
Permalink
Thanks to all for your help, i've solved my problem creating a correctly
handled multi-thread subscriber that uses one channel per thread, handling
the ack and all the other operations in a delegate method passed to the
subscriber itself (so the same thread takes into account the responsibility
of doing the subsequent ack).
Now i need to wrestle a bit with the network failure tolerance...but that's
another story =)
Cheers



Il giorno martedì 2 dicembre 2014 17:01:02 UTC+1, Michael Klishin ha
Post by Michael Klishin
if you know that the two threads effectively do not use the channel
concurrently then no. But I see how that may be a rare and dangerous thing
to expect.
can your library begin working after delivery is dispatched?
can you communicate back to the "origin" thread?
I think that passing some kind of context around to the delegate that runs
some business logic is inevitable, be it for it to use a channel or send a
reply back.
have you considered EasyNetQ? Its patterns may give you some leverage.
MK
Once more, sorry for the missing informations...i'm trying to isolate
(wrap) all the RabbitMq concept inside this library that i'm developing.
Passing back a channel would invalidate that effort. But i think that it's
the best solution that i can have right now (the project deadline is coming
closer) =)
But i still missing some pieces...if the external object (thread 1) works
on channel x (ack, for example) while the library (thread 2) is working on
the same channel, doesn't it creates the same problem? So how sending back
the channel should solve my original problem?
In fact in my actual solution i have 1 channel where there are n possible
threads working on it.
But if i create one channel per thread (inside my Subscriber) and the
external object asks to my Subscriber to Ack a message, those are two
requests coming from two separated thread that are insisting on the same
channel...right?
My head is starting to melt...
Post by Michael Klishin
you can pass the channel along with the delivery information.
MK
Post by Marcello Santambrogio
The problem that i don't know how to resolve easily is that, if the
delivery tag is related to an instance of a channel, how can i manage the
ack/reject the messages?
Post by Marcello Santambrogio
I mean, if i have n threads, everyone with its channel, and the
(external) object interested to message x needs to signal that it must be
ack'd/rejected, how can i do it? The only thing that this object can have
is a delivery-tag and, at most, a consumer-tag.
--
You received this message because you are subscribed to the Google Groups
"rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an
<javascript:>.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+***@googlegroups.com.
To post to this group, send an email to rabbitmq-***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Continue reading on narkive:
Loading...