Archive for November, 2006

Synchronized Threads (Part 3)

The first two parts in this series dealt with synchronously starting and stopping threads. Now we’ll bring in what we know about QMetaObject::invokeMethod and dynamic return types to perform synchronous function calls across threads.

Synchronous function calls are not for everyone. One thread (the caller) waits while another thread performs and completes a function call. If you are using threads for performance reasons, making use of synchronous calls between your threads will probably defeat the point of having threads at all.

So who are they for, then? Synchronized calls can come in handy when you’re using threads for reasons other than performance. There are situations where you simply must have a thread to perform an operation, because the operating system or some library requires it. Other times, maybe you want to perform a task asynchronously but only a “blocking” API is offered. In these cases, a thread is your ticket, and it has nothing to do with performance. Additionally, if you have infrequent communication between threads, then a synchronous call might not have any performance impact anyway.

Let’s now upgrade SyncThreadAgent:

class SyncThreadAgent : public QObject
{
        Q_OBJECT
public:
        SyncThreadAgent(QObject *parent = 0) : QObject(parent)
        {
                QMetaObject::invokeMethod(this, "started",
                        Qt::QueuedConnection);
        }

signals:
        void started();
        void call_ret(bool success, const QVariant &ret);

public slots:
        void call_do(QObject *obj, const QByteArray &method,
                const QVariantList &args)
        {
                QVariant ret;
                bool ok = invokeMethodWithVariants(obj, method, args,
                        &ret, Qt::DirectConnection);
                emit call_ret(ok, ret);
        }
};

The call_do() slot calls a method for us, and the call_ret() signal is emitted when the call has completed. The call itself is not asynchronous, since call_do() invokes the method with DirectConnection. However, since SyncThreadAgent lives in the worker thread, which could be busy at any given moment, call_do() is meant to be invoked using QueuedConnection. From the main thread’s point of view, this makes the whole process asynchronous: the request is queued until the worker thread returns to its event loop, the call is then executed, and the completion signal is emitted back.

We can then convert this process into a synchronous call, by simply putting the main thread to sleep until call_ret() is emitted from the worker thread.
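
To make the sleep-until-done idea concrete without pulling in Qt, here is a minimal sketch in plain C++11. Everything in it is an assumption made for illustration and not part of QCA: BlockingWorker, its task queue (standing in for the worker's event loop), and the done flag are all hypothetical names. The caller queues a task, then waits on a condition variable until the worker reports completion.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical, Qt-free analogue of a synchronous cross-thread call.
class BlockingWorker
{
public:
    BlockingWorker() : quit_(false), thread_(&BlockingWorker::run, this) {}

    ~BlockingWorker()
    {
        {
            std::lock_guard<std::mutex> lock(m_);
            quit_ = true;
        }
        wake_.notify_one();
        thread_.join();
    }

    // Runs fn on the worker thread; the caller sleeps until it has returned.
    int call(std::function<int()> fn)
    {
        int result = 0;
        bool done = false;
        std::unique_lock<std::mutex> lock(m_);
        tasks_.push_back(Task{fn, &result, &done});
        wake_.notify_one();
        finished_.wait(lock, [&done] { return done; });
        return result;
    }

private:
    struct Task
    {
        std::function<int()> fn;
        int *result;
        bool *done;
    };

    void run()
    {
        std::unique_lock<std::mutex> lock(m_);
        for (;;) {
            wake_.wait(lock, [this] { return quit_ || !tasks_.empty(); });
            if (tasks_.empty())
                return; // asked to quit and nothing left to do
            Task t = tasks_.front();
            tasks_.pop_front();
            lock.unlock();
            int r = t.fn(); // executed on the worker, mutex not held
            lock.lock();
            *t.result = r;
            *t.done = true;
            finished_.notify_all();
        }
    }

    std::mutex m_;
    std::condition_variable wake_;     // signals the worker
    std::condition_variable finished_; // signals waiting callers
    std::deque<Task> tasks_;
    bool quit_;
    std::thread thread_; // declared last so the members above exist first
};

// Calls into the worker and checks that the work ran on another thread.
int demo()
{
    BlockingWorker worker;
    const std::thread::id caller = std::this_thread::get_id();
    bool ranElsewhere = false;
    int v = worker.call([&ranElsewhere, caller] {
        ranElsewhere = (std::this_thread::get_id() != caller);
        return 6 * 7;
    });
    assert(ranElsewhere);
    return v;
}
```

Here demo() returns 42 once the lambda has run on the worker thread. Unlike the Qt code in this article, the sketch waits on an explicit done flag rather than on the bare condition; that is a choice of this sketch, not something taken from SyncThread.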

Below is a reworked version of SyncThread using the new SyncThreadAgent features. It is copied directly from QCA, so it is a bit more advanced due to the Private class.

class SyncThread : public QThread
{
        Q_OBJECT
public:
        SyncThread(QObject *parent = 0);
        ~SyncThread();

        void start();
        void stop();
        QVariant call(QObject *obj, const QByteArray &method,
                const QVariantList &args = QVariantList(), bool *ok = 0);

protected:
        virtual void atStart() = 0;
        virtual void atEnd() = 0;

        // reimplemented
        virtual void run();

private:
        class Private;
        friend class Private;
        Private *d;
};

// implementation

class SyncThread::Private : public QObject
{
        Q_OBJECT
public:
        SyncThread *q;
        QMutex m;
        QWaitCondition w;
        QEventLoop *loop;
        SyncThreadAgent *agent;
        bool last_success;
        QVariant last_ret;

        Private(SyncThread *_q) : QObject(_q), q(_q)
        {
                loop = 0;
                agent = 0;
        }

private slots:
        void agent_started();
        void agent_call_ret(bool success, const QVariant &ret);
};

SyncThread::SyncThread(QObject *parent)
:QThread(parent)
{
        d = new Private(this);
        qRegisterMetaType<QVariant>("QVariant");
        qRegisterMetaType<QVariantList>("QVariantList");
}

SyncThread::~SyncThread()
{
        stop();
        delete d;
}

void SyncThread::start()
{
        QMutexLocker locker(&d->m);
        Q_ASSERT(!d->loop);
        QThread::start();
        d->w.wait(&d->m);
}

void SyncThread::stop()
{
        QMutexLocker locker(&d->m);
        if(!d->loop)
                return;
        QMetaObject::invokeMethod(d->loop, "quit");
        d->w.wait(&d->m);
        wait();
}

QVariant SyncThread::call(QObject *obj, const QByteArray &method,
        const QVariantList &args, bool *ok)
{
        QMutexLocker locker(&d->m);
        bool ret;
        ret = QMetaObject::invokeMethod(d->agent, "call_do",
                Qt::QueuedConnection, Q_ARG(QObject*, obj),
                Q_ARG(QByteArray, QByteArray(method)),
                Q_ARG(QVariantList, args));
        Q_ASSERT(ret);
        d->w.wait(&d->m);
        if(ok)
                *ok = d->last_success;
        QVariant v = d->last_ret;
        d->last_ret = QVariant();
        return v;
}

void SyncThread::run()
{
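        // Taken once start() is waiting on the condition; agent_started()
        //   releases it after atStart() has run.
        d->m.lock();
        d->loop = new QEventLoop;
        d->agent = new SyncThreadAgent;
        connect(d->agent, SIGNAL(started()), d, SLOT(agent_started()),
                Qt::DirectConnection);
        connect(d->agent, SIGNAL(call_ret(bool, const QVariant &)), d,
                SLOT(agent_call_ret(bool, const QVariant &)),
                Qt::DirectConnection);
        d->loop->exec();
        // The event loop has quit; take the mutex again for the shutdown
        //   handshake with stop().
        d->m.lock();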
        atEnd();
        delete d->agent;
        delete d->loop;
        d->agent = 0;
        d->loop = 0;
        d->w.wakeOne();
        d->m.unlock();
}

void SyncThread::Private::agent_started()
{
        q->atStart();
        w.wakeOne();
        m.unlock();
}

void SyncThread::Private::agent_call_ret(bool success, const QVariant &ret)
{
        QMutexLocker locker(&m);
        last_success = success;
        last_ret = ret;
        w.wakeOne();
}

With SyncThread, you can start, stop, and call a method in another thread while the main thread sleeps. The only requirement is that the methods be declared as slots.
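
The slot requirement exists because the method is looked up by name at run time, and only methods known to the meta-object system can be found that way. As a rough, Qt-free illustration (MethodTable and its members are hypothetical names, not Qt or QCA API), a lookup table keyed by name behaves the same way: a registered method can be invoked, and an unknown name is reported through a success flag, much like the ok argument of call().

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for a meta-object: only methods that were
// registered by name can be found at run time.
class MethodTable
{
public:
    typedef std::function<int(const std::vector<int> &)> Method;

    void add(const std::string &name, Method fn)
    {
        methods_[name] = fn;
    }

    // Returns false if no method with that name was registered.
    bool invoke(const std::string &name, const std::vector<int> &args,
                int *ret) const
    {
        std::map<std::string, Method>::const_iterator it = methods_.find(name);
        if (it == methods_.end())
            return false;
        *ret = it->second(args);
        return true;
    }

private:
    std::map<std::string, Method> methods_;
};

// Registers one method, then tries a known and an unknown name.
bool lookupDemo()
{
    MethodTable table;
    table.add("sum", [](const std::vector<int> &a) {
        int s = 0;
        for (std::size_t i = 0; i < a.size(); ++i)
            s += a[i];
        return s;
    });

    int ret = 0;
    bool found = table.invoke("sum", {1, 2, 3}, &ret);
    bool missing = table.invoke("product", {1, 2, 3}, &ret);
    return found && ret == 6 && !missing;
}
```

In the same spirit, it is worth passing a bool to call() and checking it whenever the method name comes from somewhere you don't fully control.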

Below is a contrived example, where we have an object in another thread that increments a counter at some interval, using the Qt event loop, and provides a method to inspect the value.

First, the Counter object:

class Counter : public QObject
{
        Q_OBJECT
private:
        int x;
        QTimer timer;

public:
        Counter() : timer(this)
        {
                x = 0;
                connect(&timer, SIGNAL(timeout()), SLOT(t_timeout()));
        }

public slots:
        void start(int seconds)
        {
                timer.setInterval(seconds * 1000);
                timer.start();
        }

        int value() const
        {
                return x;
        }

private slots:
        void t_timeout()
        {
                ++x;
        }
};

Looks like a typical object, no surprises.

Now to wrap Counter with SyncThread. We went over how to do this in the first article, and it is very straightforward:

class CounterThread : public SyncThread
{
        Q_OBJECT
public:
        Counter *counter;

        CounterThread(QObject *parent = 0) : SyncThread(parent)
        {
                counter = 0;
        }

        ~CounterThread()
        {
                // SyncThread will stop the thread on destruct, but since our
                //   atEnd() function makes references to CounterThread's
                //   members, we need to shut down here, before CounterThread
                //   destructs.
                stop();
        }

protected:
        virtual void atStart()
        {
                counter = new Counter;
        }

        virtual void atEnd()
        {
                delete counter;
        }
};

We can then use it like this:

CounterThread *thread = new CounterThread;

// after this call, the thread is started and the Counter is ready
thread->start();

// let's start the counter with a 1 second interval
thread->call(thread->counter, "start", QVariantList() << 1);
...

// after some time passes, let's check on the value
int x = thread->call(thread->counter, "value").toInt();

// we're done with this thing
delete thread;

Do you see a mutex anywhere? I didn’t think so.
