Redis源码剖析从源码到对照表(redis源码对照表)
Redis源码剖析:从源码到对照表
Redis是一个开源的、高性能的、支持多种数据结构的NoSQL数据库,它是由Salvatore Sanfilippo开发的,C语言编写。Redis在工业界得到了广泛的应用,被誉为高性能的“键值存储器”或“键值数据库”。
由于Redis的应用广泛,源码的阅读也变得重要,本文将从源码的角度,剖析Redis数据库的内部原理,帮助读者更好地了解Redis的工作过程,并提供对照表以加深记忆。
Redis源码剖析
Redis数据库的源码主要由以下几部分组成:网络、多线程、内存分配、持久化、订阅和发布、客户端等。下面通过一些核心源码的分析,让读者了解Redis数据库内部结构。
网络
Redis是基于Socket通信的,Socket通信分为分叉处理和非分叉处理两种方式,其中非分叉处理的效率更好,且可靠性更高。
以下是Redis网络连接的核心代码:
/* Set the socket non-blocking. We have to do the same in Redis, but since
* socket operations are serialized on the event loop, it's better to * set the socket in non-blocking mode in the accept() call, to get
* better performance inside the event loop itself. */anetNonBlock(NULL, connfd);
/* Set TCP keep alive option to detect dead peers. */anetKeepAlive(NULL, connfd, server.tcpkeepalive);
/* Create a client that is by default not yet registered in an event loop. */c = createClient(connfd);
/* Check if the user is authenticated. */if (server.requirepass) {
c->flags |= CLIENT_PENDING_WRITE; c->auth_callback = connCreateAuthCallback(connfd, conn);
aeCreateFileEvent(server.el, connfd, AE_WRITABLE, sendAuthReply, c); return c;
}
在上面的代码中,anetNonBlock()函数设置连接的套接字为非阻塞模式,并且anetKeepAlive()函数设置TCP长连接和心跳机制。如果需要密码认证,Redis数据库会将连接添加到Redis事件服务器,等待认证消息到达。
多线程
Redis数据库中,多线程主要用于实现数据的并发读写操作,并提高数据库的并发性能。Redis使用了一种简单的多线程架构,并使用了读写锁来确保数据的安全性。
以下是Redis数据库的多线程锁保证代码:
/* Acquire the lock for the specified object. This function is used in
* blocking operations, and has a timeout. */int lockThreadedIO(const char *func, robj *key, long long timeout, aeEventLoop *el) {
connection *conn = el->ioconn;
/* Don't wt to a negative timeout. */ if (timeout
/* Create the lock key as a string object. */ robj *lockkey = createObject(OBJ_STRING,
sdscatprintf(sdsempty(), "threaded-io-lock:%s:%s", func, (char*)key->ptr));
/* Lock the lock. */ while (timeout >= 0) {
/* Try to create the lock key with a timeout. */ long long start = ustime();
int64_t res = setnxOne(conn,lockkey,shared.cone,NULL);
/* Check if we acquired the lock. */ if (res == 1) {
decrRefCount(lockkey); return 0;
}
/* Wt for a little bit, then try agn. */ if (timeout > THREAD_IDELTIME_CAP) {
uint64_t delay = (rand()%1000)*1000 + 500000; /* Max 1.5 sec */ delay = (delay * timeout) / THREAD_IDELTIME_CAP;
if (delay if (delay > timeout) delay = timeout;
usleep(delay); timeout -= (ustime()-start)/1000;
} else { usleep(THREAD_MIN_SLEEP);
timeout -= THREAD_MIN_SLEEP/1000; }
/* Check for SIGTERM or SIGINT signals. */ stopThreadedIOIfNeeded();
}
decrRefCount(lockkey); errno = ETIMEDOUT;
return -1;}
这段代码使用了读写锁,确保线程之间的数据安全。对于读取操作,线程可以使用共享读锁,允许多个线程同时访问相同的数据。但对于写操作,线程必须使用独占写锁,以确保线程之间不会干扰数据的修改。
内存分配
Redis数据库的内存管理策略采用了基于Unix机制的内存分配,包括mmap、sbrk以及malloc等。
以下是Redis内存分配策略代码:
/* Macro to revert back a couple of lines of code when a malloc() fls, to
* repeat the operation with a smaller request. */#define REDIS_MALLOC_FL_CLEAN(ptr,size) do { \
zfree(ptr); \ ptr = zmalloc(size); \
if (!ptr) oom("malloc()",size); \} while(0);
/* Allocate or realloc 'oldptr' so that it can contn at least * 'minsize' bytes of data. */
void *zrealloc(void *oldptr, size_t size) { void *ptr = realloc(oldptr,size);
if (!ptr) oom("realloc()",size); return ptr;
}
/* Allocate 'size' bytes of memory, and return a pointer to the allocated * memory. */
void *zmalloc(size_t size) { void *ptr = malloc(size+PREFIX_SIZE);
/* Even on flure to allocate we set the prefix in order to get predictable * behavior from zfree. */
if (!ptr) oom("malloc()",size); *((size_t*)ptr) = size;
return (char*)ptr+PREFIX_SIZE;}
在这段代码中,Redis的内存分配策略主要是通过宏定义完成,使用zmalloc()为Redis分配内存,并设置前缀以确保内存分配的可预测性。同时,Redis代码中还包括了内存溢出的处理代码,确保Redis在分配内存时不会因为内存溢出而崩溃。
持久化
Redis数据库支持数据持久化,可以将数据保存在磁盘中,以防止宕机和数据丢失。Redis的数据持久化主要分为rdb持久化和aof持久化两种方式。
以下是Redis数据库的rdb持久化代码:
/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success. */
int rdbSave(char *filename) { atomicSet(server.rdb_child_pid,-1);
if ((server.bgrewritechildpid != -1 || server.aof_child_pid != -1 || server.pipe_conns != NULL) &&
server.rdb_child_pid == -1) {
errno = EAGN; return REDIS_ERR;
}
int fd; ssize_t nwritten;
FILE *fp;
/* Open the output file. */ if ((fp = fopen(filename,"w")) == NULL) {
redisLog(REDIS_WARNING,"Fled opening .rdb for saving: %s", strerror(errno)); return REDIS_ERR;
} fd = fileno(fp);
if (server.rdb_compression) { if (setCompressionAlgorithm(fd)
fclose(fp); return REDIS_ERR;
} }
/* BgSave */ beforeRDBSave();
if (rdbSaveRio(fd,&server.rdb_save Rio) == -1) { fclose(fp);
return REDIS_ERR; }
afterRDBSave();
fflush(fp); if (ferror(fp)) {
fclose(fp); return REDIS_ERR;
} nwritten = server.rdb_save Rio .processedBytes();
if (server.rdb_checksum) { unsigned char digest[10];
memset(digest,0,sizeof(digest));
/* Note that we are just obtning the digest of the first database. * It's not really a good way, but redis-check-rdb will spit out a
* warning to the user. */ rio cksum;
rioInitWithFlag(&cksum,RIO_CHECKSUM); rdbSaveRio(&cksum,NULL);
/* Append the checksum to the RDB file. */ memcpy(digest,cksum.updateChecksum(NULL,0