Input/output strategies
Introduction
This article outlines various ways to implement blocking and non-blocking network communication, also known as synchronous and asynchronous communication.
Single process, single threaded
The simplest form of a networked service consists of a server that binds to an IP address and port. Requests are served one at a time as they come in, so each request has to wait until the previously queued ones have been processed; this is also known as blocking input/output.
import socket

host = ''
port = 50000
backlog = 5
size = 1024

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Allow the server to be restarted on the same port right away.
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((host, port))
s.listen(backlog)

# One client at a time: while this loop is busy with a client, every other
# connection waits in the listen backlog.
while True:
    client, address = s.accept()
    try:
        data = client.recv(size)
        if data:
            # send() may transmit only part of the buffer; sendall() keeps
            # sending until every byte has been written.
            client.sendall(data)
    except socket.error as exc:
        # A misbehaving client (e.g. a reset connection) must not take the
        # whole server down; report it and move on to the next client.
        print('error serving %s: %s' % (address, exc))
    finally:
        # Always release the client socket, even when recv/send failed.
        client.close()
Prefork-based multiprocessing, single-threaded
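To overcome the performance issues of such code, various concurrency strategies are employed. The simplest form of concurrency is fork(), which makes several copies of the server process so that more than one request can be handled at a time. In the prefork model these copies are created up front, before any connection is accepted. Technically, each process still performs blocking input/output.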
import os
import sys
import socket

# Number of worker processes forked up front.
workers = 3

acceptor = socket.socket()
# Consistent with the other examples: allow quick restarts on the same port.
acceptor.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
acceptor.bind(('localhost', 4242))
acceptor.listen(10)

for _ in range(workers):
    pid = os.fork()
    if pid == 0:
        # Child: block in accept() on the listening socket inherited from
        # the parent and echo one line per connection.
        childpid = os.getpid()
        print("Child %s listening on localhost:4242" % childpid)
        try:
            while True:
                conn, addr = acceptor.accept()
                # 'rw' because the file object is written to as well as read;
                # on Python 3 makefile() defaults to read-only mode 'r'.
                flo = conn.makefile('rw')
                try:
                    flo.write('Child %s echo> ' % childpid)
                    flo.flush()
                    message = flo.readline()
                    flo.write(message)
                    # Without this flush the echo sits in the file object's
                    # write buffer and is only sent whenever the object
                    # happens to be garbage-collected.
                    flo.flush()
                finally:
                    # Close both: the socket is only really released once
                    # the makefile() object is closed as well.
                    flo.close()
                    conn.close()
                print("Child %s echo'd: %r" % (childpid, message.strip()))
        except KeyboardInterrupt:
            sys.exit()

# Parent: reap every child, not just the first one that exits.
try:
    for _ in range(workers):
        os.waitpid(-1, 0)
except KeyboardInterrupt:
    print("\nbailing")
    sys.exit()
Multithreading
Multithreading allows concurrency within a single process: in the example below, each client connection is handled by its own thread.
import socket
import threading

BUFF = 1024
HOST = '127.0.0.1'  # TODO: should be an input parameter
PORT = 9999  # TODO: should be an input parameter


def gen_response():
    """Return the fixed payload sent back for every chunk received.

    Bytes, so it can be passed to ``socket.sendall`` on Python 3 as well.
    """
    return b'this_is_the_return_from_the_server'


def handler(clientsock, addr):
    """Serve one client: reply to each received chunk until the peer closes.

    ``clientsock`` is an accepted, connected socket; it is always closed
    before returning. ``addr`` is the peer address from ``accept()``; it is
    not used here but kept as part of the thread-target signature.
    """
    try:
        while True:
            data = clientsock.recv(BUFF)
            print('data:' + repr(data))
            if not data:
                # Empty read: the peer closed its side of the connection.
                break
            reply = gen_response()
            # sendall() rather than send(): send() may transmit only part
            # of the buffer.
            clientsock.sendall(reply)
            print('sent:' + repr(reply))
    finally:
        clientsock.close()


if __name__ == '__main__':
    ADDR = (HOST, PORT)
    serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serversock.bind(ADDR)
    serversock.listen(5)
    while True:
        print('waiting for connection...')
        clientsock, addr = serversock.accept()
        print('...connected from: %s' % (addr,))
        # One thread per client. Daemon threads, like those of the low-level
        # thread module, do not keep the process alive after the main thread
        # exits.
        worker = threading.Thread(target=handler, args=(clientsock, addr))
        worker.daemon = True
        worker.start()
select, poll, epoll, kqueue
Operating system kernels provide interfaces such as select(), poll(), epoll (Linux) and kqueue (BSD and macOS), which all serve the same goal: to notify a program about file descriptor events. For a web server there are four main events: a connection is established, there are bytes to read from the input stream, bytes can be written to the output stream, or the connection is closed. This is an example of non-blocking input/output, also known as asynchronous communication. The example below uses epoll and therefore runs only on Linux.
import select
import socket

EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
# Canned HTTP/1.0 reply sent to every client.
response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('0.0.0.0', 8080))
serversocket.listen(1)
serversocket.setblocking(0)
serversocket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

epoll = select.epoll()
epoll.register(serversocket.fileno(), select.EPOLLIN)

# Per-connection state, keyed by file descriptor number.
connections = {}
requests = {}
responses = {}


def close_connection(fileno):
    """Stop watching *fileno*, close its socket and forget all its state."""
    epoll.unregister(fileno)
    connections.pop(fileno).close()
    requests.pop(fileno, None)
    responses.pop(fileno, None)


try:
    while True:
        # Wait up to 1 second for events on any registered descriptor.
        events = epoll.poll(1)
        for fileno, event in events:
            if fileno == serversocket.fileno():
                # New client: track it and start by waiting for its request.
                connection, address = serversocket.accept()
                connection.setblocking(0)
                epoll.register(connection.fileno(), select.EPOLLIN)
                connections[connection.fileno()] = connection
                requests[connection.fileno()] = b''
                responses[connection.fileno()] = response
            elif event & select.EPOLLIN:
                data = connections[fileno].recv(1024)
                if not data:
                    # Peer closed before a complete request arrived. The
                    # socket stays readable at EOF, so level-triggered epoll
                    # would report EPOLLIN forever unless it is closed here.
                    close_connection(fileno)
                    continue
                requests[fileno] += data
                if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                    # Request headers complete: switch to writing the reply.
                    epoll.modify(fileno, select.EPOLLOUT)
                    # The request comes from an untrusted client; don't let
                    # undecodable bytes crash the server while logging it.
                    request_text = requests[fileno].decode('utf-8', 'replace')
                    print('-' * 40 + '\n' + request_text[:-2])
            elif event & select.EPOLLOUT:
                # send() may write only part of the reply; keep the rest.
                byteswritten = connections[fileno].send(responses[fileno])
                responses[fileno] = responses[fileno][byteswritten:]
                if not responses[fileno]:
                    # Reply fully sent: stop watching for I/O and shut the
                    # connection down; the EPOLLHUP branch then cleans up.
                    epoll.modify(fileno, 0)
                    connections[fileno].shutdown(socket.SHUT_RDWR)
            elif event & select.EPOLLHUP:
                close_connection(fileno)
finally:
    # Release any connections still open when the loop is interrupted.
    for fileno in list(connections):
        close_connection(fileno)
    epoll.unregister(serversocket.fileno())
    epoll.close()
    serversocket.close()
Note that this strategy can cause severe latency if request processing slows down: because a single thread services every connection, work done for one response blocks all the others. Consider, e.g., scaling a JPEG image for a response — the events of other file descriptors will be delayed until the response generation is finished.