c - AMQP RabbitMQ consumers blocking each other?
I have written a C worker application (using rabbitmq-c) that consumes messages published by a simple Python script (pika). I am seeing the following strange behavior that I cannot seem to solve: Any ideas what could be happening? I made sure that every consumer has its own channel (is that necessary?), but the behavior stays the same. Here is the code for the consumers (workers):
    conn = amqp_new_connection();
    sock = (amqp_socket_t *)(uint64_t)amqp_tcp_socket_new(conn);
    amqp_socket_open(sock, "localhost", 5672);
    amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");

    if (amqp_channel_open(conn, chan) == NULL)
        LOG_ERR(" [!] Failed to open amqp channel!\n");

    if ((q = amqp_queue_declare(conn, chan, amqp_cstring_bytes("ranges"), 0, 0, 0, 0, amqp_empty_table)) == NULL)
        LOG_ERR(" [!] Failed to declare queue!\n");

    LOG_INFO(" [x] Queue (message count = %d)\n", q->message_count);

    amqp_queue_bind(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, amqp_empty_table);
    amqp_basic_consume(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);

    while (1) {
        amqp_maybe_release_buffers(conn);
        amqp_consume_message(conn, &e, NULL, 0);

        {
            int n;
            amqp_frame_t f;
            unsigned char buf[8];
            unsigned char *pbuf = buf;

            amqp_simple_wait_frame(conn, &f); // METHOD frame
            amqp_simple_wait_frame(conn, &f); // HEADER frame

            n = f.payload.properties.body_size;
            if (n != sizeof(range_buf))
                LOG_ERR(" [!] Invalid message size!");

            while (n) {
                amqp_simple_wait_frame(conn, &f); // BODY frame
                memcpy(pbuf, f.payload.body_fragment.bytes, f.payload.body_fragment.len);
                n -= f.payload.body_fragment.len;
                pbuf += f.payload.body_fragment.len;
            }

            // do something with buf
            LOG_INFO(" [x] Message received from queue\n");
        }

        amqp_destroy_envelope(&e);
        amqp_maybe_release_buffers(conn);
    }
The problem here is most likely that your consumer prefetches all messages when it starts up. This is RabbitMQ's default behavior, but you can reduce the number of messages prefetched by a consumer, which lets you spread the workload better across multiple workers.
This simply means that one or more consumers will pick up all the messages and leave none for new consumers.
If you apply QoS to your consumer and limit the prefetch to, say, 10 messages, the consumer will only queue up the first 10 messages, and new consumers can pick up the slack.
The function you are looking for to implement this is the one that sets QoS on the channel, and in addition to that you can read more about consumer prefetch.
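To make the effect of a prefetch limit concrete, here is a small sketch that models the broker's dispatch decision in plain C. It is not rabbitmq-c code: `dispatch` and `left_for_late_worker` are hypothetical helpers invented for illustration, and the model assumes a single early worker that has not acknowledged anything yet. A limit of 0 is treated as "no limit", which is the convention AMQP's basic.qos uses.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model (not part of rabbitmq-c): the broker pushes messages to
 * a newly subscribed consumer until either the queue is empty or the
 * consumer holds `prefetch` unacknowledged messages. prefetch == 0 means
 * "no limit". Returns the number of messages handed to the consumer and
 * removes them from *queued. */
static unsigned dispatch(unsigned *queued, unsigned prefetch)
{
    assert(queued != NULL);
    unsigned n = (prefetch == 0 || prefetch > *queued) ? *queued : prefetch;
    *queued -= n;
    return n;
}

/* How many messages remain in the queue for a worker that starts after the
 * first worker has already subscribed with the given prefetch limit. */
static unsigned left_for_late_worker(unsigned queued, unsigned prefetch)
{
    (void)dispatch(&queued, prefetch); /* first worker takes its share */
    return queued;
}
```

In this model, `left_for_late_worker(100, 0)` is 0 (the first worker grabbed everything), while `left_for_late_worker(100, 10)` is 90. With rabbitmq-c, the corresponding call is `amqp_basic_qos(conn, chan, 0, 10, 0)`, issued after `amqp_channel_open` and before `amqp_basic_consume`; the last three arguments are the prefetch size in bytes (0 for no byte limit), the prefetch count, and the global flag. A prefetch limit only applies to consumers that acknowledge messages, so the worker would presumably also need to call `amqp_basic_ack` for each delivery once it has been processed, otherwise it would stall after the first 10 messages.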