Posts Tagged ‘Scalability’

And we’re back

So, I have finally fixed this stupid site. I spent years devising plans for rewriting it in Lisp or something, then one day read this essay about perfectionism and decided I was being completely ridiculous. Yes, that is my dog in the header, thank you for asking.

Some would have balked at the prospect of hand-editing years of predominantly embarrassing posts, but I soldiered on through good times and bad. I am still messing with the layout, and I haven’t even looked at the damn thing in Internet Explorer yet. (If you have IE, which I don’t at home, let me know if something is really messed up.)

Evidently, once upon a time I had a career writing a lot of code for various Microsoft platforms. I feel like that part of my life needs to remain there for posterity, even though I am very happily participating in the open source, OMG-scale web world these days.

If I leave those posts there, perhaps someday someone can explain them to me. Very few of them make any sense to me now. I do remember fielding the e-mail from the ignoramus I-banker that prompted me to write Significant Digits for the Innumerate. With a little luck, his life is now ruined forever.

I thought I would miss debugging obscure threading issues, and rooting through core dumps, and staring at disassembly, but I was wrong. I have come to appreciate the sublime beauty of fork, the challenge of writing code for epic scale, having the damn source code, and solving problems that matter to people that are not evil mutants. I haven’t hand-edited XML in two years. Life is beautiful. Hopefully with the whole “blog” issue out of the way I will be able to think of something interesting to talk about.

Native POSIX Python Condition Implementation

So I wrote this replacement version of Condition using native POSIX threads (pthreads) support. Python's Event and Semaphore are both written in terms of Condition, so this is also a fast route to native versions of those synchronization primitives. (Note, though, that POSIX provides a native semaphore, so implementing one in terms of a condition variable is not really necessary.)
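Event is the simplest of those primitives, so it shows the "in terms of a condition" shape most clearly. Here is a minimal sketch of that shape in plain C: a flag guarded by a pthread mutex, plus a condition variable to wait for the flag. The event_t type and event_* functions are hypothetical names, not part of the extension below or of Python, and errors from the lock/unlock calls are ignored for brevity.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical event type: a flag, the mutex that guards it, and a
   condition variable used to wait for it to become true. */
typedef struct {
  pthread_mutex_t lock;
  pthread_cond_t cond;
  bool flag;
} event_t;

/* Returns 0 on success or a pthread error code. */
int event_init(event_t *ev) {
  assert(ev != NULL);
  ev->flag = false;
  int err = pthread_mutex_init(&ev->lock, NULL);
  if (err != 0)
    return err;
  err = pthread_cond_init(&ev->cond, NULL);
  if (err != 0)
    pthread_mutex_destroy(&ev->lock);
  return err;
}

void event_destroy(event_t *ev) {
  pthread_cond_destroy(&ev->cond);
  pthread_mutex_destroy(&ev->lock);
}

/* Set the flag and wake every waiter (the notifyAll case). */
void event_set(event_t *ev) {
  pthread_mutex_lock(&ev->lock);
  ev->flag = true;
  pthread_cond_broadcast(&ev->cond);
  pthread_mutex_unlock(&ev->lock);
}

void event_clear(event_t *ev) {
  pthread_mutex_lock(&ev->lock);
  ev->flag = false;
  pthread_mutex_unlock(&ev->lock);
}

/* Block until the flag is set. The loop re-checks the predicate, so a
   spurious wakeup from pthread_cond_wait just means waiting again. */
void event_wait(event_t *ev) {
  pthread_mutex_lock(&ev->lock);
  while (!ev->flag)
    pthread_cond_wait(&ev->cond, &ev->lock);
  pthread_mutex_unlock(&ev->lock);
}

bool event_is_set(event_t *ev) {
  pthread_mutex_lock(&ev->lock);
  bool flag = ev->flag;
  pthread_mutex_unlock(&ev->lock);
  return flag;
}
```

A semaphore follows the same pattern with a counter in place of the flag, which is exactly the case where the native POSIX sem_t makes the condition variable unnecessary.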

I have trained myself to ask, "why hasn't anybody done this before?" when writing this sort of thing. And as always there's a really good reason: for a percentage of applications that is probably very close to one hundred percent, the difference in performance between this and the pure Python version (which implements timed waits by polling) is not going to amount to a hill of beans. That was indeed the case for the application where I was trying to put this to use, and I reverted to the much simpler Python Condition.

But there was no way to know for sure that that was the case without trying this, so if you find yourself in a similar situation, here is the code.

#include "Python.h"
#include "structmember.h"
#include <errno.h>
#include <pthread.h>

typedef struct {
  PyObject_HEAD
  int initialized;  /* nonzero once cond and lock have been initialized */
  int waiters;      /* threads currently blocked in wait() */
  int pending;      /* notifications not yet consumed by a waiter */
  pthread_cond_t cond;
  pthread_mutex_t lock;
} ConditionObject;

static int cond_init( ConditionObject* self, PyObject* args,
              PyObject* kwargs );
static void cond_free( ConditionObject* self );
static PyObject* cond_acquire( ConditionObject* self );
static PyObject* cond_release( ConditionObject* self );
static PyObject* cond_wait( ConditionObject* self, PyObject* args );
static PyObject* cond_notify( ConditionObject* self );
static PyObject* cond_notifyAll( ConditionObject* self );

static PyMemberDef cond_members[] = {
  {NULL}
};

static PyMethodDef cond_methods[] = {
  { "acquire", (PyCFunction)cond_acquire, METH_NOARGS, "" },
  { "release", (PyCFunction)cond_release, METH_NOARGS, "" },
  { "wait", (PyCFunction)cond_wait, METH_VARARGS, "" },
  { "notify", (PyCFunction)cond_notify, METH_NOARGS, "" },
  { "notifyAll", (PyCFunction)cond_notifyAll, METH_NOARGS, "" },
  { NULL }
};

static PyTypeObject ConditionType  = {
  PyObject_HEAD_INIT(NULL)
  0,                         /*ob_size*/
  "_pthread_cond.Condition", /*tp_name*/
  sizeof(ConditionObject),   /*tp_basicsize*/
  0,                         /*tp_itemsize*/
  (destructor)cond_free,     /*tp_dealloc*/
  0,                         /*tp_print*/
  0,                         /*tp_getattr*/
  0,                         /*tp_setattr*/
  0,                         /*tp_compare*/
  0,                         /*tp_repr*/
  0,                         /*tp_as_number*/
  0,                         /*tp_as_sequence*/
  0,                         /*tp_as_mapping*/
  0,                         /*tp_hash */
  0,                         /*tp_call*/
  0,                         /*tp_str*/
  0,                         /*tp_getattro*/
  0,                         /*tp_setattro*/
  0,                         /*tp_as_buffer*/
  Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,        /*tp_flags*/
  "",                        /* tp_doc */
  0,                       /* tp_traverse */
  0,                       /* tp_clear */
  0,                       /* tp_richcompare */
  0,                       /* tp_weaklistoffset */
  0,                       /* tp_iter */
  0,                       /* tp_iternext */
  cond_methods,              /* tp_methods */
  cond_members,              /* tp_members */
  0,                         /* tp_getset */
  0,                         /* tp_base */
  0,                         /* tp_dict */
  0,                         /* tp_descr_get */
  0,                         /* tp_descr_set */
  0,                         /* tp_dictoffset */
  (initproc)cond_init,       /* tp_init */
  0,                         /* tp_alloc */
  0,                         /* tp_new */
};

/* pthread functions return an error code instead of setting errno,
   so copy it into errno before building the OSError. */
static void set_pthread_error( int err ) {
  errno = err;
  PyErr_SetFromErrno( PyExc_OSError );
}

static int cond_init( ConditionObject* self, PyObject* args,
              PyObject* kwargs ) {
  int err;
  if( self->initialized ) {
    /* __init__ called a second time: keep the existing primitives. */
    return 0;
  }
  self->waiters = 0;
  self->pending = 0;
  err = pthread_mutex_init( &self->lock, NULL );
  if( err != 0 ) {
    set_pthread_error( err );
    return -1;
  }
  err = pthread_cond_init( &self->cond, NULL );
  if( err != 0 ) {
    pthread_mutex_destroy( &self->lock );
    set_pthread_error( err );
    return -1;
  }
  self->initialized = 1;
  return 0;
}

static void cond_free( ConditionObject* self ) {
  /* Only destroy primitives that cond_init actually created. */
  if( self->initialized ) {
    pthread_cond_destroy( &self->cond );
    pthread_mutex_destroy( &self->lock );
  }
  /* Release the object's own memory too. */
  Py_TYPE( self )->tp_free( (PyObject*)self );
}

static PyObject* cond_acquire( ConditionObject* self ) {
  int err = 0;
  /* Drop the GIL while blocking so the current owner can release. */
  Py_BEGIN_ALLOW_THREADS;
  err = pthread_mutex_lock( &self->lock );
  Py_END_ALLOW_THREADS;
  if( err != 0 ) {
    set_pthread_error( err );
    return NULL;
  }
  Py_RETURN_NONE;
}

static PyObject* cond_release( ConditionObject* self ) {
  int err = pthread_mutex_unlock( &self->lock );
  if( err != 0 ) {
    set_pthread_error( err );
    return NULL;
  }
  Py_RETURN_NONE;
}

static PyObject* cond_wait( ConditionObject* self, PyObject* args ) {
  /* Timed waits are not supported yet: a timeout argument is ignored.
     As with threading.Condition, the caller must hold the lock. */
  int err = 0;
  self->waiters++;
  while( self->pending == 0 ) {
    Py_BEGIN_ALLOW_THREADS;
    err = pthread_cond_wait( &self->cond, &self->lock );
    Py_END_ALLOW_THREADS;
    if( err != 0 || PyErr_CheckSignals() ) {
      /* Give up waiting; never leave more notifications pending
         than there are threads left to consume them. */
      self->waiters--;
      if( self->pending > self->waiters ) {
        self->pending = self->waiters;
      }
      if( err != 0 ) {
        set_pthread_error( err );
      }
      return NULL;
    }
    /* Woken with nothing pending (a spurious wakeup): wait again. */
  }
  /* Consume one notification and return with the lock held. A thread
     that starts waiting after notify() may consume it instead, so
     callers should re-check their predicate in a loop. */
  self->pending--;
  self->waiters--;
  Py_RETURN_NONE;
}

static PyObject* cond_notify( ConditionObject* self ) {
  /* Caller must hold the lock. Wake one waiter that has not already
     been notified; do nothing if there is none. */
  int err = 0;
  if( self->pending < self->waiters ) {
    self->pending++;
    err = pthread_cond_signal( &self->cond );
  }
  if( err != 0 ) {
    set_pthread_error( err );
    return NULL;
  }
  Py_RETURN_NONE;
}

static PyObject* cond_notifyAll( ConditionObject* self ) {
  /* Caller must hold the lock. Wake every current waiter. */
  int err = 0;
  if( self->waiters > 0 ) {
    self->pending = self->waiters;
    err = pthread_cond_broadcast( &self->cond );
  }
  if( err != 0 ) {
    set_pthread_error( err );
    return NULL;
  }
  Py_RETURN_NONE;
}

static PyMethodDef module_methods[] = {
  {NULL}
};

PyMODINIT_FUNC
init_pthread_cond(void)
{
  ConditionType.tp_new = PyType_GenericNew;
  if( PyType_Ready( &ConditionType ) < 0 ) {
    return;
  }
  PyObject* mod = Py_InitModule( "_pthread_cond", module_methods );
  if( mod == NULL ) {
    return;
  }
  Py_INCREF( &ConditionType );
  PyModule_AddObject( mod, "Condition", (PyObject*)&ConditionType );
}
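The extension ignores the timeout passed to wait(). One way a native timed wait could replace polling, sketched under assumptions: pthread_cond_timedwait takes an absolute deadline, so the relative timeout is converted once up front, and the predicate loop reuses that deadline across spurious wakeups. The wait_for_ms helper and its ready flag are hypothetical; wiring this into cond_wait (parsing the timeout argument, releasing the GIL, checking signals) is left out.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

/* Hypothetical helper. With *lock held, wait until *ready is true or
   timeout_ms milliseconds pass. Returns the final value of *ready. */
bool wait_for_ms(pthread_cond_t *cond, pthread_mutex_t *lock,
                 const bool *ready, long timeout_ms) {
  struct timespec deadline;
  assert(timeout_ms >= 0);
  /* The deadline is absolute and, by default, measured on
     CLOCK_REALTIME. */
  clock_gettime(CLOCK_REALTIME, &deadline);
  deadline.tv_sec += timeout_ms / 1000;
  deadline.tv_nsec += (timeout_ms % 1000) * 1000000L;
  if (deadline.tv_nsec >= 1000000000L) {
    deadline.tv_sec += 1;
    deadline.tv_nsec -= 1000000000L;
  }
  while (!*ready) {
    int err = pthread_cond_timedwait(cond, lock, &deadline);
    if (err != 0) {
      /* ETIMEDOUT, or a usage error such as EINVAL: stop waiting. */
      break;
    }
    /* err == 0: woken, possibly spuriously; re-check *ready. */
  }
  return *ready;
}
```

Because the deadline is fixed before the loop, a burst of spurious wakeups cannot stretch the total wait past timeout_ms.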

First Impressions of CCR

I got a chance to mess around with the Concurrency and Coordination Runtime (CCR) bits recently. Before I get into that, first check out this real-life code I wrote this week.

using System.Collections.Generic;
using System.Threading;

public static class NotificationQueue
{
    private static Queue<Notification> _queue;
    private static Semaphore _work;

    static NotificationQueue()
    {
        _queue = new Queue<Notification>();
        _work = new Semaphore(0, int.MaxValue);
    }

    public static void Enqueue(Notification n)
    {
        lock (_queue)
        {
            _queue.Enqueue(n);
        }
        _work.Release();
    }

    public static Notification Dequeue()
    {
        _work.WaitOne();
        lock (_queue)
        {
            return _queue.Dequeue();
        }
    }
}

The idea here is to post notifications to the queue from many threads and have a single notification thread send them out with a loop like this:

private static void NotifyThreadProc()
{
    while(!Abort())
        NotificationQueue.Dequeue().Send();
}
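For comparison with the C code earlier on this page, the same semaphore-plus-lock design can be sketched with a POSIX counting semaphore (sem_t) and a pthread mutex. This is a hypothetical port, not code from the post: the nq_* names and the void* payload are assumptions, and unnamed semaphores created with sem_init are not supported on every platform (macOS, notably).

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>

/* One queued notification; the payload is an opaque pointer. */
typedef struct nq_node {
  struct nq_node *next;
  void *item;
} nq_node;

typedef struct {
  pthread_mutex_t lock; /* protects head and tail, like lock (_queue) */
  sem_t work;           /* counts queued items, like _work */
  nq_node *head, *tail;
} notify_queue;

int nq_init(notify_queue *q) {
  q->head = q->tail = NULL;
  if (pthread_mutex_init(&q->lock, NULL) != 0)
    return -1;
  /* pshared = 0: shared between threads of this process only. */
  if (sem_init(&q->work, 0, 0) != 0) {
    pthread_mutex_destroy(&q->lock);
    return -1;
  }
  return 0;
}

/* Any thread may post. Returns 0 on success, -1 if allocation fails. */
int nq_enqueue(notify_queue *q, void *item) {
  nq_node *n = malloc(sizeof *n);
  if (n == NULL)
    return -1;
  n->next = NULL;
  n->item = item;
  pthread_mutex_lock(&q->lock);
  if (q->tail != NULL)
    q->tail->next = n;
  else
    q->head = n;
  q->tail = n;
  pthread_mutex_unlock(&q->lock);
  sem_post(&q->work); /* like _work.Release() */
  return 0;
}

/* Blocks until an item is queued, like _work.WaitOne(). Returns NULL
   only if sem_wait fails for a reason other than a signal. */
void *nq_dequeue(notify_queue *q) {
  int rc;
  while ((rc = sem_wait(&q->work)) != 0 && errno == EINTR)
    ; /* interrupted by a signal handler: wait again */
  if (rc != 0)
    return NULL;
  pthread_mutex_lock(&q->lock);
  nq_node *n = q->head;
  assert(n != NULL); /* the semaphore count guarantees an item */
  q->head = n->next;
  if (q->head == NULL)
    q->tail = NULL;
  pthread_mutex_unlock(&q->lock);
  void *item = n->item;
  free(n);
  return item;
}
```

As in the C# version, the semaphore is posted after the lock is released, so a woken consumer never immediately blocks on the producer's lock.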

The documentation for CCR is pretty sparse at this point: just this paper [pdf], the Channel9 video, and this wiki. Some of the particulars seem to have changed significantly since those were written, but I was able to figure out how to replicate my explicitly threaded notification queue:

public class NotificationService : CcrServiceBase
{
    private Port<Notification> _port;

    public NotificationService()
        : base(new DispatcherQueue("foo"))
    {
        _port = new Port<Notification>();
        Activate(Arbiter.Receive(true, _port,
        delegate(Notification n)
        {
            n.Send();
        }));
    }

    public void Post(Notification n)
    {
        _port.Post(n);
    }
}

That's pretty awesome: notice that no explicit locks or waits are necessary, and I don't need to write the NotifyThreadProc or the code required to start it. I just need to make a NotificationService and start posting to it.

I can't say it's the most immediately comprehensible API I've ever seen, but hopefully that will change with more documentation and some polish. It's also possible that I am just warped from years of using the lower-level concepts. Overall this is awfully impressive for a library.

Nonblocking Pool Class

This is not an original idea, but I thought I would post and explain it anyway. It is a generalized version of a pattern I have been using for a while. I'm not sure where I first picked it up, but I've seen it used in several places.

The purpose of this class is to pool instances of a particular type in a server application. The assumptions I am making about the problem are:

  • It is both possible and worthwhile to reuse instances of a certain type. Types that may fit this criterion include large arrays of primitive types and types that hold unmanaged or scarce resources such as connections. Not all types fit this criterion, obviously.
  • It is worse to have a thread enter a waiting state (fail to acquire a lock, in other words) than to create a new instance of the type being reused. That would be the case if the instances are fairly cheap to create but the average request or call to your server takes relatively long.

The nice thing about this pool class is that it handles the second case gracefully. It reuses objects as much as possible, but it never blocks a thread when another thread is already using the pool; it just creates a new instance instead. A conventional locking pool, by contrast, could introduce massive contention in the very attempt to increase throughput.

The class provides very lightweight synchronization using an atomic operation (Interlocked.Exchange); there's no use of critical sections (the lock keyword).

  using System;
  using System.Collections.Generic;
  using System.Threading;

  /// <summary>
  /// Provides and reuses objects of type <typeparamref name="T"/>.
  /// </summary>
  /// <typeparam name="T">
  /// The type that is pooled. Must provide a default constructor.
  /// </typeparam>
  public class NonBlockingPool<T>
     where T : new()
  {
     // Contains the pooled items.
     private Stack<T> _stack;

     // The maximum size of _stack.
     private int _max;

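     // This reference is used to ensure that only one thread
     // calls methods on _stack at a time. It is volatile so that
     // the plain write releasing it (_lock = obj) cannot be
     // reordered before the preceding _stack operations.
     private volatile object _lock = new object();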

     /// <summary>
     /// Gets or sets the maximum size of the pool.
     /// </summary>
     public int MaximumSize
     {
        get { return _max; }
        set { _max = value; }
     }

     /// <summary>
     /// Gets a pooled instance of type <typeparamref name="T"/>,
     /// or yields a new instance.
     /// </summary>
     public T Get()
     {
        // If two threads enter this method at the same time,
        // only one will acquire _lock (the other will be given
        // null). The caller that fails to acquire _lock will
        // be returned a new instance of T.
        T ret = default(T);
        object obj = Interlocked.Exchange(ref _lock, null);
        try
        {
           if (obj != null && _stack.Count > 0)
           {
              ret = _stack.Pop();
           }
           else
           {
              ret = new T();
           }
        }
        finally
        {
           if (obj != null)
           {
              _lock = obj;
           }
        }
        return ret;
     }

     /// <summary>
     /// Reuses an instance of type <typeparamref name="T"/> in a
     /// subsequent request or call whenever possible.
     /// </summary>
     public void Reuse(T t)
     {
        // If two threads enter this method at the same time,
        // only one will acquire _lock (the other will be given
        // a null reference). The instance of T provided by
        // the losing thread will just be collected and not
        // reused.
        object obj = Interlocked.Exchange(ref _lock, null);
        try
        {
           if (obj != null && _stack.Count < _max)
           {
              _stack.Push(t);
           }
        }
        finally
        {
           if (obj != null)
           {
              _lock = obj;
           }
        }
     }

     /// <summary>
     /// Constructor.
     /// </summary>
     /// <param name="max">
     /// The maximum number of instances of
     /// <typeparamref name="T"/> to hold in the pool.
     /// </param>
     public NonBlockingPool(int max)
     {
        if (max < 0)
        {
           throw new ArgumentOutOfRangeException("max");
        }
        _stack = new Stack<T>(max);
        _max = max;
     }
  }
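The same try-lock trick translates to C11 atomics. In this minimal sketch, atomic_flag_test_and_set plays the role of Interlocked.Exchange(ref _lock, null): the thread that finds the flag clear owns the stack, and any other thread skips the pool instead of waiting. The pool_* names, POOL_MAX and BUF_SIZE are hypothetical. Unlike the C# version, the losing thread in pool_reuse must free the item itself, since there is no garbage collector, and the fallback malloc happens outside the try-lock.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdlib.h>

#define POOL_MAX 8    /* hypothetical maximum pool size */
#define BUF_SIZE 4096 /* hypothetical size of each pooled buffer */

typedef struct {
  atomic_flag busy;       /* set while one thread owns the stack */
  void *items[POOL_MAX];  /* pooled buffers, used as a stack */
  int count;
} pool_t;

void pool_init(pool_t *p) {
  atomic_flag_clear(&p->busy);
  p->count = 0;
}

/* Pop a pooled buffer if the pool is free and non-empty; otherwise
   allocate a new one. Never waits for another thread. */
void *pool_get(pool_t *p) {
  void *ret = NULL;
  /* test_and_set returns the previous value: false means we own it. */
  if (!atomic_flag_test_and_set_explicit(&p->busy, memory_order_acquire)) {
    if (p->count > 0)
      ret = p->items[--p->count];
    atomic_flag_clear_explicit(&p->busy, memory_order_release);
  }
  return ret != NULL ? ret : malloc(BUF_SIZE);
}

/* Push item back if the pool is free and not full; otherwise free it. */
void pool_reuse(pool_t *p, void *item) {
  assert(item != NULL);
  if (!atomic_flag_test_and_set_explicit(&p->busy, memory_order_acquire)) {
    if (p->count < POOL_MAX) {
      p->items[p->count++] = item;
      item = NULL; /* now owned by the pool */
    }
    atomic_flag_clear_explicit(&p->busy, memory_order_release);
  }
  free(item); /* free(NULL) is a no-op */
}
```

The acquire/release orderings on the flag do the job the volatile field and Interlocked.Exchange do in C#: stack updates made by one owner are visible to the next.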

Here's a (contrived) minimal example of a consumer of such a pool. This server class makes a context object available to each thread for the duration of each request. This object is stored in a slot unique to each thread (specified with the ThreadStaticAttribute) while a ProcessRequest function is called. The instance is returned to the pool in a finally block after that call is finished.

  public class SampleServer
  {
     [ThreadStatic]
     private static ServerContext _context;

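     // The pool size here is arbitrary; tune it for your load.
     private NonBlockingPool<ServerContext> _pool =
        new NonBlockingPool<ServerContext>(16);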

     public static ServerContext Context
     {
        get { return _context; }
     }

     internal void ProcessRequest(IServerApp app)
     {
        try
        {
           _context = _pool.Get();
           app.ProcessRequest();
        }
        finally
        {
           if (_context != null)
           {
              _context.Reset();
              _pool.Reuse(_context);

              // We want the context to be collected if it isn't
              // actually reused by the pool.
              _context = null;
           }
        }
     }
  }
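C11's _Thread_local storage class gives the same one-slot-per-thread behavior as [ThreadStatic]. A minimal sketch with hypothetical names (server_context_t, process_request, record_request_id): it leaves out the pool, and because C has no finally block, it clears the slot right after the handler returns.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical per-request context. */
typedef struct {
  int request_id;
} server_context_t;

/* One slot per thread, like a [ThreadStatic] field in C#. */
static _Thread_local server_context_t *current_context = NULL;

/* Like SampleServer.Context: the calling thread's current context. */
server_context_t *server_context(void) { return current_context; }

/* Publish ctx for the calling thread while handler runs, then clear
   the slot so the context is not reachable after the request. */
void process_request(server_context_t *ctx, void (*handler)(void)) {
  assert(ctx != NULL && handler != NULL);
  current_context = ctx;
  handler();
  current_context = NULL;
}

/* Example handler that reads the context through the accessor. */
static int last_seen_id = -1;
static void record_request_id(void) {
  last_seen_id = server_context()->request_id;
}
```

Other threads calling server_context() at the same time see their own slot, so concurrent requests never observe each other's context.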

A more concrete example might be an IHttpModule or a remoting server channel sink. As I said once already, it's important to consider 1) the type of resource you are pooling and 2) the amount of load your application is expecting before committing yourself to a pattern such as this one.

Moore’s Law and the Free Lunch

This article was brought to my attention by a few sources. The general theme here is, “concurrency is going to be really important as processors begin to hit physical limits, and these kinds of programs are harder to write.” I thought I would give my spin on it. Here is another one-sentence summary of that article, childishly represented using Windows Paint.

Your friend Jack Albertson

At the admitted risk of sounding too much like a doomsday prophet, my prediction can be summed up as follows: despite attempts at tool and language support (for instance, ), this is going to be painful for a large percentage of developers. Software cycles will probably take a big turn for the worse, so you might be better off working on the quantum computers in your garages now.

My reasoning is heavily influenced by the alleged “object revolution.” The fact is, you can’t claim to be doing object-oriented development just by virtue of using a language that has object-oriented features. And you can’t reap the benefits of doing object-oriented development in that case, either. In this day and age, I still see a ton (scores… hundreds… maybe thousands) of methods that are 200 lines long and take twelve arguments. Now you can put a “virtual” in front of a method like that, but there’s obviously still a problem.

I like this quote from Object Thinking:

Both software engineering and object orientation have achieved a strange status - everyone claims to be doing them without really doing so.

The thesis of that book is that OOP and traditional programming require drastically different mindsets. Changes in mindsets can be really difficult to accomplish. Tools help you, but they don’t automatically make you good at the task at hand. We’ve got great OOP tools now, but I think a lot of people still work on teams where deadlines are missed, integrating code written by different people is hard, and any number of shortcuts leads to a mess of spaghetti. We get away with it to an extent, mostly because it is accepted that software projects are late and contain bugs. We work way too hard in the process, though.

And as accurate as I think that is for the “object revolution”, it is even more accurate for the upcoming “concurrency revolution.” Writing a multithreaded app is very different from writing a single-threaded app.

It could be good news for the highly motivated and well educated, but as in any field, they are the minority. I’m looking forward to the challenge, but purely for selfish reasons.