Marcello Santambrogio
2014-12-02 11:46:36 UTC
Hi all,
I'm developing a simple library that implements some common patterns on top of
RabbitMQ. I'm having some trouble with my Subscriber because my model is (most
probably) wrong, and sometimes I receive an exception ("Pipelining of
requests forbidden").
From what I've read (
http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2010-December/010391.html)
this error is caused by a race condition on channel resources.
That means that in my model multiple threads are using the channel
concurrently... and that's true (I'll explain why later on).
First question:
I need to understand which kinds of operations can cause this exception.
The message on rabbitmq-discuss states that it is caused by AMQP RPC
operations such as ExchangeDeclare.
What are the other AMQP RPC operations?
Second question:
I need to create a Subscriber class that takes messages from a queue and
notifies interested classes that a message is available. In case of error,
the subscriber republishes the message to a failure exchange together with the
details of the exception.
Please note that the Subscriber may need to defer the acknowledgement until
requested (i.e. not use autoAck) in order to get a transactional-style
operation: in my case I need to be sure that a received message has been
delivered to another queue (or to the failure exchange) before acking it. This
means that after class X receives the message from the subscriber (via an
event), it will later ask the subscriber to ack that message (see the sketch
below).
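Just to make the idea concrete, this is (more or less) the deferred-ack API I
have in mind. The names (Subscriber, MessageReceived, Ack) and the lock around
the channel are only my sketch of the design, not anything from the client
library:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class Subscriber
{
    private readonly IModel _channel;
    // Every call on _channel goes through this lock, because the ack can
    // arrive from whatever thread the consuming class happens to be on
    private readonly object _channelLock = new object();

    // Raised when a delivery has been dequeued; the handler keeps the
    // DeliveryTag so it can ask for the ack later
    public event Action<BasicDeliverEventArgs> MessageReceived;

    public Subscriber(IModel channel)
    {
        _channel = channel;
    }

    // Called by the consuming class once the message has been safely handled
    public void Ack(ulong deliveryTag)
    {
        lock (_channelLock)
        {
            _channel.BasicAck(deliveryTag, false);
        }
    }
}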
Do you have any hints or best practices for handling this kind of scenario?
I feel that this model is somehow very wrong, but I can't figure out another
way to do it.
What I've done is this: there is a Subscriber class that has a single IModel
(channel) and a single QueueingBasicConsumer.
When "started", this class creates a new Task that runs a loop like this:
while (isActive)
{
    // Blocks until a delivery is available on the consumer's shared queue
    var ea = (BasicDeliverEventArgs)basicConsumer.Queue.Dequeue();
    // Hand the message off to a worker thread that takes care of handling it
    _threadPoolInstance.QueueWorkItem(() => ExecuteMessageConsuming(ea));
}
In the ExecuteMessageConsuming method I deserialize the message, ack it (only
if requested) and notify, via an event, whoever is interested that a new
message is ready to be taken.
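Roughly, it looks like the sketch below (part of the same sketched Subscriber
class as above). Deserialize and _ackImmediately are my own hypothetical
helper and config flag, not client API:

private void ExecuteMessageConsuming(BasicDeliverEventArgs ea)
{
    try
    {
        // Deserialize is a hypothetical helper; here it only checks that the
        // raw body can actually be turned into our message type
        Deserialize(ea.Body);

        // Ack right away only when configured to; otherwise the class that
        // handles the event calls Ack(ea.DeliveryTag) once it is done
        if (_ackImmediately)
            Ack(ea.DeliveryTag);

        var handler = MessageReceived;
        if (handler != null)
            handler(ea);
    }
    catch (Exception ex)
    {
        // Any failure (e.g. a deserialization error) sends the raw message
        // to the failure exchange (see the next snippet)
        PublishToFailureExchange(ea, ex);
    }
}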
In case of errors (for example, a deserialization error) the subscriber tries
to republish the message to the configured failure exchange. This is a likely
failure point, because I declare both the exchange and the destination queue
(I can't know in advance whether they exist on my virtual host), and so I'm
issuing an AMQP RPC operation from another thread on the same channel.
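The only workaround I can think of is serializing every operation on the
channel behind the same lock, roughly like this (again, the method name, the
exchange/queue names and the header layout are my own sketch):

using System.Collections.Generic;

private void PublishToFailureExchange(BasicDeliverEventArgs ea, Exception error)
{
    lock (_channelLock)
    {
        // Declares are AMQP RPC operations, so they must not interleave with
        // other calls made on the same channel from other threads
        _channel.ExchangeDeclare("my.failure.exchange", ExchangeType.Direct, true);
        _channel.QueueDeclare("my.failure.queue", true, false, false, null);
        _channel.QueueBind("my.failure.queue", "my.failure.exchange", "failure");

        var props = _channel.CreateBasicProperties();
        props.Headers = new Dictionary<string, object> { { "error", error.Message } };

        _channel.BasicPublish("my.failure.exchange", "failure", props, ea.Body);
    }
}

Is wrapping everything in a lock like this the right approach, or is the usual
advice to give each thread its own channel instead?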
Sorry for the long (and maybe messy) post... I hope someone can clear my
mind a bit =)
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+***@googlegroups.com.
To post to this group, send an email to rabbitmq-***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.