Minimal wrapper around the POSIX message queue. The POSIX message queue offers:
- Persistence. Push messages while nothing is listening.
- Simplicity. Nothing to set up. Built into Linux.
- IPC. Blazingly fast communication between processes on the same machine.
- Blocking and non-blocking. Listeners block until a message arrives on the queue. No polling. Sending messages doesn't block.
Note that this requires no third party message broker. The messages are handled by the kernel of your computer. Not all kernels have support for POSIX message queues, a notably example is Darwin (OS X). Darwin implements the older System V IPC API. See my SysV MQ wrapper.
In your Gemfile:
gem 'posix-mqueue'
- This will not work on OS X, but on Linux and probably BSD (not tested).
sendandreceiveblock.timedsendandtimedreceivedo not.- The default message size is
4096bytes. - Linux's default queue size is
10bytes.
Read on for details.
require 'posix/mqueue'
# On Linux the queue name must be prefixed with a slash. Note it is not a file
# created at `/whatever`. It's just the name of the queue.
# Set maximum default Linux options. See next section to push those limits.
# Default options are msgsize: 4096 and maxmsg: 10
m = POSIX::Mqueue.new("/whatever", msgsize: 8192, maxmsg: 10)
m.send "hello"
m.receive
# => "hello"
fork { POSIX::Mqueue.new("/whatever").send("world") }
# Blocks until the forked process pushes to the queue
m.receive
# => "world"
# Queue is now full by default Linux settings, see below on how to increase it.
10.times { m.send rand(100).to_s }
# #size returns the size of the queue
m.size
# => 10
# #send will block until something is popped off the now full queue.
# timesend takes timeout arguments (first one is seconds, second is
# nanoseconds). Pass 0 for for both to not block, this is default.
assert_raises POSIX::Mqueue::QueueFull do
m.timedsend "I will fail"
end
# Empty the queue again
10.times { m.receive }
# Like timedsend, timedreceive takes timeout arguments and will raise
# POSIX::Mqueue::Queueempty when it would otherwise block.
assert_raises POSIX::Mqueue::QueueEmpty do
m.timedreceive
end
# Deletes the queue and any messages remaining.
# None in this case. If not unlinked, the queue will persist till reboot.
m.unlinkMost important information from the manpages, with a little added information
about the behavior of posix-mqueue.
Linux has some default limits you can easily change.
/proc/sys/fs/mqueue/msg_max. Contains the maximum number of messages in a single queue. Defaults to 10. You should increase that number.#sendwill eventually block if the queue is full.#timedsendwill throwQueueFull./proc/sys/fs/mqueue/msgsize_max. Maximum size of a single message. Defaults to 8192 bytes.posix-mqueuedefaults to 4096 bytes. Overwrite this by passing{msgsize: 8192}as the second argument when initializing./proc/sys/fs/mqueue/queues_max. Maximum number of queues on the system. Defaults to 256.
The message queue is created as a virtual file system. That means you can mount it:
# sudo mkdir /dev/queue
# sudo mount -t mqueue none /dev/queueAdd a queue and a few tasks, count the characters (19):
$ irb
> require 'posix/mqueue'
=> true
> m = POSIX::Mqueue.new("/queue")
=> #<POSIX::Mqueue:0xb8c9fe88>
> m.send "narwhal"
=> true
> m.send "walrus"
=> true
> m.send "ponies"
=> trueInspect the mounted filesystem:
$ ls /dev/queue/
important mails queue
$ cat /dev/queue/queue
QSIZE:19 NOTIFY:0 SIGNO:0 NOTIFY_PID:0Here QSIZE is the bytes of data in the queue. The other flags are about
notifications which posix-mqueue does not support currently, read about them
in mq_overview(7).
