/***************************************************************************** This file was modified by Oracle on 2015/02/05 Modifications copyright (c) 2015, Oracle and/or its affiliates. All rights reserved. *****************************************************************************/ #include "config.h" #include #include #include #include #include #include #include #include #include #include #include "default_engine.h" #include "memcached/util.h" #include "memcached/config_parser.h" #define CMD_SET_VBUCKET 0x83 #define CMD_GET_VBUCKET 0x84 #define CMD_DEL_VBUCKET 0x85 static const engine_info* default_get_info(ENGINE_HANDLE* handle); static ENGINE_ERROR_CODE default_initialize(ENGINE_HANDLE* handle, const char* config_str); static void default_destroy(ENGINE_HANDLE* handle, bool force); static ENGINE_ERROR_CODE default_item_allocate(ENGINE_HANDLE* handle, const void* cookie, item **item, const void* key, const size_t nkey, const size_t nbytes, const int flags, const rel_time_t exptime); static ENGINE_ERROR_CODE default_item_delete(ENGINE_HANDLE* handle, const void* cookie, const void* key, const size_t nkey, uint64_t cas, uint16_t vbucket); static void default_item_release(ENGINE_HANDLE* handle, const void *cookie, item* item); static ENGINE_ERROR_CODE default_get(ENGINE_HANDLE* handle, const void* cookie, item** item, const void* key, const int nkey, uint16_t vbucket); static ENGINE_ERROR_CODE default_get_stats(ENGINE_HANDLE* handle, const void *cookie, const char *stat_key, int nkey, ADD_STAT add_stat); static void default_reset_stats(ENGINE_HANDLE* handle, const void *cookie); static ENGINE_ERROR_CODE default_store(ENGINE_HANDLE* handle, const void *cookie, item* item, uint64_t *cas, ENGINE_STORE_OPERATION operation, uint16_t vbucket); static ENGINE_ERROR_CODE default_arithmetic(ENGINE_HANDLE* handle, const void* cookie, const void* key, const int nkey, const bool increment, const bool create, const uint64_t delta, const uint64_t initial, const rel_time_t exptime, uint64_t *cas, uint64_t *result, uint16_t vbucket); static ENGINE_ERROR_CODE default_flush(ENGINE_HANDLE* handle, const void* cookie, time_t when); static ENGINE_ERROR_CODE initalize_configuration(struct default_engine *se, const char *cfg_str); static TAP_ITERATOR get_tap_iterator(ENGINE_HANDLE* handle, const void* cookie, const void* client, size_t nclient, uint32_t flags, const void* userdata, size_t nuserdata); static ENGINE_ERROR_CODE default_unknown_command(ENGINE_HANDLE* handle, const void* cookie, protocol_binary_request_header *request, ADD_RESPONSE response); union vbucket_info_adapter { char c; struct vbucket_info v; }; static void set_vbucket_state(struct default_engine *e, uint16_t vbid, enum vbucket_state to) { union vbucket_info_adapter vi; vi.c = e->vbucket_infos[vbid]; vi.v.state = to; e->vbucket_infos[vbid] = vi.c; } static enum vbucket_state get_vbucket_state(struct default_engine *e, uint16_t vbid) { union vbucket_info_adapter vi; vi.c = e->vbucket_infos[vbid]; return vi.v.state; } static bool handled_vbucket(struct default_engine *e, uint16_t vbid) { return e->config.ignore_vbucket || (get_vbucket_state(e, vbid) == VBUCKET_STATE_ACTIVE); } /* mechanism for handling bad vbucket requests */ #define VBUCKET_GUARD(e, v) if (!handled_vbucket(e, v)) { return ENGINE_NOT_MY_VBUCKET; } static bool get_item_info(ENGINE_HANDLE *handle, const void *cookie, const item* item, item_info *item_info); static const char const * vbucket_state_name(enum vbucket_state s) { static const char const * vbucket_states[] = { "dead", "active", "replica", "pending" }; return vbucket_states[s]; } ENGINE_ERROR_CODE create_instance(uint64_t interface, GET_SERVER_API get_server_api, ENGINE_HANDLE **handle) { SERVER_HANDLE_V1 *api = get_server_api(); if (interface != 1 || api == NULL) { return ENGINE_ENOTSUP; } struct default_engine *engine = malloc(sizeof(*engine)); if (engine == NULL) { return ENGINE_ENOMEM; } struct default_engine default_engine = { .engine = { .interface = { .interface = 1 }, .get_info = default_get_info, .initialize = default_initialize, .destroy = default_destroy, .allocate = default_item_allocate, .remove = default_item_delete, .release = default_item_release, .get = default_get, .get_stats = default_get_stats, .reset_stats = default_reset_stats, .store = default_store, .arithmetic = default_arithmetic, .flush = default_flush, .unknown_command = default_unknown_command, .item_set_cas = item_set_cas, .get_item_info = get_item_info, .get_tap_iterator = get_tap_iterator }, .server = *api, .get_server_api = get_server_api, .initialized = true, .assoc = { .hashpower = 16, }, .slabs = { .lock = PTHREAD_MUTEX_INITIALIZER }, .cache_lock = PTHREAD_MUTEX_INITIALIZER, .stats = { .lock = PTHREAD_MUTEX_INITIALIZER, }, .config = { .use_cas = true, .verbose = 0, .oldest_live = 0, .evict_to_free = true, .maxbytes = 64 * 1024 * 1024, .preallocate = false, .factor = 1.25, .chunk_size = 48, .item_size_max= 1024 * 1024, }, .scrubber = { .lock = PTHREAD_MUTEX_INITIALIZER, }, /* FIXME: compilation issue on solaris x86 .info.engine_info = { .description = "Default engine v0.1", .num_features = 1, .features = { [0].feature = ENGINE_FEATURE_LRU } } */ }; *engine = default_engine; *handle = (ENGINE_HANDLE*)&engine->engine; return ENGINE_SUCCESS; } static inline struct default_engine* get_handle(ENGINE_HANDLE* handle) { return (struct default_engine*)handle; } static inline hash_item* get_real_item(item* item) { return (hash_item*)item; } static const engine_info* default_get_info(ENGINE_HANDLE* handle) { return &get_handle(handle)->info.engine_info; } static ENGINE_ERROR_CODE default_initialize(ENGINE_HANDLE* handle, const char* config_str) { struct default_engine* se = get_handle(handle); ENGINE_ERROR_CODE ret = initalize_configuration(se, config_str); if (ret != ENGINE_SUCCESS) { return ret; } /* fixup feature_info */ if (se->config.use_cas) { se->info.engine_info.features[se->info.engine_info.num_features++].feature = ENGINE_FEATURE_CAS; } ret = assoc_init(se); if (ret != ENGINE_SUCCESS) { return ret; } ret = slabs_init(se, se->config.maxbytes, se->config.factor, se->config.preallocate); if (ret != ENGINE_SUCCESS) { return ret; } return ENGINE_SUCCESS; } static void default_destroy(ENGINE_HANDLE* handle, bool force) { struct default_engine* se = get_handle(handle); if (se->initialized) { pthread_mutex_destroy(&se->cache_lock); pthread_mutex_destroy(&se->stats.lock); pthread_mutex_destroy(&se->slabs.lock); se->initialized = false; free(se); } } static ENGINE_ERROR_CODE default_item_allocate(ENGINE_HANDLE* handle, const void* cookie, item **item, const void* key, const size_t nkey, const size_t nbytes, const int flags, const rel_time_t exptime) { struct default_engine* engine = get_handle(handle); size_t ntotal = sizeof(hash_item) + nkey + nbytes; if (engine->config.use_cas) { ntotal += sizeof(uint64_t); } unsigned int id = slabs_clsid(engine, ntotal); if (id == 0) { return ENGINE_E2BIG; } hash_item *it; it = item_alloc(engine, key, nkey, flags, engine->server.core->realtime(exptime), nbytes, cookie); if (it != NULL) { *item = it; return ENGINE_SUCCESS; } else { return ENGINE_ENOMEM; } } static ENGINE_ERROR_CODE default_item_delete(ENGINE_HANDLE* handle, const void* cookie, const void* key, const size_t nkey, uint64_t cas, uint16_t vbucket) { struct default_engine* engine = get_handle(handle); VBUCKET_GUARD(engine, vbucket); hash_item *it = item_get(engine, key, nkey); if (it == NULL) { return ENGINE_KEY_ENOENT; } if (cas == 0 || cas == item_get_cas(it)) { item_unlink(engine, it); item_release(engine, it); } else { return ENGINE_KEY_EEXISTS; } return ENGINE_SUCCESS; } static void default_item_release(ENGINE_HANDLE* handle, const void *cookie, item* item) { item_release(get_handle(handle), get_real_item(item)); } static ENGINE_ERROR_CODE default_get(ENGINE_HANDLE* handle, const void* cookie, item** item, const void* key, const int nkey, uint16_t vbucket) { struct default_engine *engine = get_handle(handle); VBUCKET_GUARD(engine, vbucket); *item = item_get(engine, key, nkey); if (*item != NULL) { return ENGINE_SUCCESS; } else { return ENGINE_KEY_ENOENT; } } static void stats_vbucket(struct default_engine *e, ADD_STAT add_stat, const void *cookie) { for (int i = 0; i < NUM_VBUCKETS; i++) { enum vbucket_state state = get_vbucket_state(e, i); if (state != VBUCKET_STATE_DEAD) { char buf[16]; snprintf(buf, sizeof(buf), "vb_%d", i); const char * state_name = vbucket_state_name(state); add_stat(buf, strlen(buf), state_name, strlen(state_name), cookie); } } } static ENGINE_ERROR_CODE default_get_stats(ENGINE_HANDLE* handle, const void* cookie, const char* stat_key, int nkey, ADD_STAT add_stat) { struct default_engine* engine = get_handle(handle); ENGINE_ERROR_CODE ret = ENGINE_SUCCESS; if (stat_key == NULL) { char val[128]; int len; pthread_mutex_lock(&engine->stats.lock); len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.evictions); add_stat("evictions", 9, val, len, cookie); len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.curr_items); add_stat("curr_items", 10, val, len, cookie); len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.total_items); add_stat("total_items", 11, val, len, cookie); len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.curr_bytes); add_stat("bytes", 5, val, len, cookie); len = sprintf(val, "%"PRIu64, engine->stats.reclaimed); add_stat("reclaimed", 9, val, len, cookie); len = sprintf(val, "%"PRIu64, (uint64_t)engine->config.maxbytes); add_stat("engine_maxbytes", 15, val, len, cookie); pthread_mutex_unlock(&engine->stats.lock); } else if (strncmp(stat_key, "slabs", 5) == 0) { slabs_stats(engine, add_stat, cookie); } else if (strncmp(stat_key, "items", 5) == 0) { item_stats(engine, add_stat, cookie); } else if (strncmp(stat_key, "sizes", 5) == 0) { item_stats_sizes(engine, add_stat, cookie); } else if (strncmp(stat_key, "vbucket", 7) == 0) { stats_vbucket(engine, add_stat, cookie); } else if (strncmp(stat_key, "scrub", 5) == 0) { char val[128]; int len; pthread_mutex_lock(&engine->scrubber.lock); if (engine->scrubber.running) { add_stat("scrubber:status", 15, "running", 7, cookie); } else { add_stat("scrubber:status", 15, "stopped", 7, cookie); } if (engine->scrubber.started != 0) { if (engine->scrubber.stopped != 0) { time_t diff = engine->scrubber.started - engine->scrubber.stopped; len = sprintf(val, "%"PRIu64, (uint64_t)diff); add_stat("scrubber:last_run", 17, val, len, cookie); } len = sprintf(val, "%"PRIu64, engine->scrubber.visited); add_stat("scrubber:visited", 16, val, len, cookie); len = sprintf(val, "%"PRIu64, engine->scrubber.cleaned); add_stat("scrubber:cleaned", 16, val, len, cookie); } pthread_mutex_unlock(&engine->scrubber.lock); } else { ret = ENGINE_KEY_ENOENT; } return ret; } static ENGINE_ERROR_CODE default_store(ENGINE_HANDLE* handle, const void *cookie, item* item, uint64_t *cas, ENGINE_STORE_OPERATION operation, uint16_t vbucket) { struct default_engine *engine = get_handle(handle); VBUCKET_GUARD(engine, vbucket); return store_item(engine, get_real_item(item), cas, operation, cookie); } static ENGINE_ERROR_CODE default_arithmetic(ENGINE_HANDLE* handle, const void* cookie, const void* key, const int nkey, const bool increment, const bool create, const uint64_t delta, const uint64_t initial, const rel_time_t exptime, uint64_t *cas, uint64_t *result, uint16_t vbucket) { struct default_engine *engine = get_handle(handle); VBUCKET_GUARD(engine, vbucket); return arithmetic(engine, cookie, key, nkey, increment, create, delta, initial, exptime, cas, result); } static ENGINE_ERROR_CODE default_flush(ENGINE_HANDLE* handle, const void* cookie, time_t when) { item_flush_expired(get_handle(handle), when); return ENGINE_SUCCESS; } static void default_reset_stats(ENGINE_HANDLE* handle, const void *cookie) { struct default_engine *engine = get_handle(handle); item_stats_reset(engine); pthread_mutex_lock(&engine->stats.lock); engine->stats.evictions = 0; engine->stats.reclaimed = 0; engine->stats.total_items = 0; pthread_mutex_unlock(&engine->stats.lock); } static tap_event_t tap_always_pause(ENGINE_HANDLE *e, const void *cookie, item **itm, void **es, uint16_t *nes, uint8_t *ttl, uint16_t *flags, uint32_t *seqno, uint16_t *vbucket) { return TAP_PAUSE; } static tap_event_t tap_always_disconnect(ENGINE_HANDLE *e, const void *cookie, item **itm, void **es, uint16_t *nes, uint8_t *ttl, uint16_t *flags, uint32_t *seqno, uint16_t *vbucket) { return TAP_DISCONNECT; } static TAP_ITERATOR get_tap_iterator(ENGINE_HANDLE* handle, const void* cookie, const void* client, size_t nclient, uint32_t flags, const void* userdata, size_t nuserdata) { TAP_ITERATOR rv = tap_always_pause; if ((flags & TAP_CONNECT_FLAG_DUMP) || (flags & TAP_CONNECT_FLAG_TAKEOVER_VBUCKETS)) { rv = tap_always_disconnect; } return rv; } static ENGINE_ERROR_CODE initalize_configuration(struct default_engine *se, const char *cfg_str) { ENGINE_ERROR_CODE ret = ENGINE_SUCCESS; se->config.vb0 = true; if (cfg_str != NULL) { struct config_item items[] = { { .key = "use_cas", .datatype = DT_BOOL, .value.dt_bool = &se->config.use_cas }, { .key = "verbose", .datatype = DT_SIZE, .value.dt_size = &se->config.verbose }, { .key = "eviction", .datatype = DT_BOOL, .value.dt_bool = &se->config.evict_to_free }, { .key = "cache_size", .datatype = DT_SIZE, .value.dt_size = &se->config.maxbytes }, { .key = "preallocate", .datatype = DT_BOOL, .value.dt_bool = &se->config.preallocate }, { .key = "factor", .datatype = DT_FLOAT, .value.dt_float = &se->config.factor }, { .key = "chunk_size", .datatype = DT_SIZE, .value.dt_size = &se->config.chunk_size }, { .key = "item_size_max", .datatype = DT_SIZE, .value.dt_size = &se->config.item_size_max }, { .key = "ignore_vbucket", .datatype = DT_BOOL, .value.dt_bool = &se->config.ignore_vbucket }, { .key = "vb0", .datatype = DT_BOOL, .value.dt_bool = &se->config.vb0 }, { .key = "config_file", .datatype = DT_CONFIGFILE }, { .key = NULL} }; ret = se->server.core->parse_config(cfg_str, items, stderr); } if (se->config.vb0) { set_vbucket_state(se, 0, VBUCKET_STATE_ACTIVE); } return ENGINE_SUCCESS; } static protocol_binary_response_status set_vbucket(struct default_engine *e, protocol_binary_request_header *request, const char **msg) { protocol_binary_request_no_extras *req = (protocol_binary_request_no_extras*)request; assert(req); char keyz[32]; char valz[32]; // Read the key. int keylen = ntohs(req->message.header.request.keylen); if (keylen >= (int)sizeof(keyz)) { *msg = "Key is too large."; return PROTOCOL_BINARY_RESPONSE_EINVAL; } memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen); keyz[keylen] = 0x00; // Read the value. size_t bodylen = ntohl(req->message.header.request.bodylen) - ntohs(req->message.header.request.keylen); if (bodylen >= sizeof(valz)) { *msg = "Value is too large."; return PROTOCOL_BINARY_RESPONSE_EINVAL; } memcpy(valz, (char*)request + sizeof(req->message.header) + keylen, bodylen); valz[bodylen] = 0x00; protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS; *msg = "Configured"; enum vbucket_state state; if (strcmp(valz, "active") == 0) { state = VBUCKET_STATE_ACTIVE; } else if(strcmp(valz, "replica") == 0) { state = VBUCKET_STATE_REPLICA; } else if(strcmp(valz, "pending") == 0) { state = VBUCKET_STATE_PENDING; } else if(strcmp(valz, "dead") == 0) { state = VBUCKET_STATE_DEAD; } else { *msg = "Invalid state."; return PROTOCOL_BINARY_RESPONSE_EINVAL; } uint32_t vbucket = 0; if (!safe_strtoul(keyz, &vbucket) || vbucket > NUM_VBUCKETS) { *msg = "Value out of range."; rv = PROTOCOL_BINARY_RESPONSE_EINVAL; } else { set_vbucket_state(e, (uint16_t)vbucket, state); } return rv; } static protocol_binary_response_status get_vbucket(struct default_engine *e, protocol_binary_request_header *request, const char **msg) { protocol_binary_request_no_extras *req = (protocol_binary_request_no_extras*)request; assert(req); char keyz[8]; // stringy 2^16 int // Read the key. int keylen = ntohs(req->message.header.request.keylen); if (keylen >= (int)sizeof(keyz)) { *msg = "Key is too large."; return PROTOCOL_BINARY_RESPONSE_EINVAL; } memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen); keyz[keylen] = 0x00; protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS; uint32_t vbucket = 0; if (!safe_strtoul(keyz, &vbucket) || vbucket > NUM_VBUCKETS) { *msg = "Value out of range."; rv = PROTOCOL_BINARY_RESPONSE_EINVAL; } else { *msg = vbucket_state_name(get_vbucket_state(e, (uint16_t)vbucket)); } return rv; } static protocol_binary_response_status rm_vbucket(struct default_engine *e, protocol_binary_request_header *request, const char **msg) { protocol_binary_request_no_extras *req = (protocol_binary_request_no_extras*)request; assert(req); char keyz[8]; // stringy 2^16 int // Read the key. int keylen = ntohs(req->message.header.request.keylen); if (keylen >= (int)sizeof(keyz)) { *msg = "Key is too large."; return PROTOCOL_BINARY_RESPONSE_EINVAL; } memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen); keyz[keylen] = 0x00; protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS; uint32_t vbucket = 0; if (!safe_strtoul(keyz, &vbucket) || vbucket > NUM_VBUCKETS) { *msg = "Value out of range."; rv = PROTOCOL_BINARY_RESPONSE_EINVAL; } else { set_vbucket_state(e, (uint16_t)vbucket, VBUCKET_STATE_DEAD); } assert(msg); return rv; } static protocol_binary_response_status scrub_cmd(struct default_engine *e, protocol_binary_request_header *request, const char **msg) { return item_start_scrub(e) ? PROTOCOL_BINARY_RESPONSE_SUCCESS : PROTOCOL_BINARY_RESPONSE_EBUSY; } static ENGINE_ERROR_CODE default_unknown_command(ENGINE_HANDLE* handle, const void* cookie, protocol_binary_request_header *request, ADD_RESPONSE response) { struct default_engine* e = get_handle(handle); bool handled = true; const char *msg = NULL; protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; switch(request->request.opcode) { case PROTOCOL_BINARY_CMD_SCRUB: res = scrub_cmd(e, request, &msg); break; case CMD_DEL_VBUCKET: res = rm_vbucket(e, request, &msg); break; case CMD_SET_VBUCKET: res = set_vbucket(e, request, &msg); break; case CMD_GET_VBUCKET: res = get_vbucket(e, request, &msg); break; default: handled = false; break; } bool sent = false; if (handled) { size_t msg_size = msg ? strlen(msg) : 0; sent = response(NULL, 0, NULL, 0, msg, (uint16_t)msg_size, PROTOCOL_BINARY_RAW_BYTES, (uint16_t)res, 0, cookie); } else { sent = response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, 0, cookie); } if (sent) { return ENGINE_SUCCESS; } else { return ENGINE_FAILED; } } uint64_t item_get_cas(const hash_item* item) { if (item->iflag & ITEM_WITH_CAS) { return *(uint64_t*)(item + 1); } return 0; } void item_set_cas(ENGINE_HANDLE *handle, const void *cookie, item* item, uint64_t val) { hash_item* it = get_real_item(item); if (it->iflag & ITEM_WITH_CAS) { *(uint64_t*)(it + 1) = val; } } const void* item_get_key(const hash_item* item) { char *ret = (void*)(item + 1); if (item->iflag & ITEM_WITH_CAS) { ret += sizeof(uint64_t); } return ret; } char* item_get_data(const hash_item* item) { return ((char*)item_get_key(item)) + item->nkey; } uint8_t item_get_clsid(const hash_item* item) { return 0; } static bool get_item_info(ENGINE_HANDLE *handle, const void *cookie, const item* item, item_info *item_info) { hash_item* it = (hash_item*)item; if (item_info->nvalue < 1) { return false; } item_info->cas = item_get_cas(it); item_info->exptime = it->exptime; item_info->nbytes = it->nbytes; item_info->flags = it->flags; item_info->clsid = it->slabs_clsid; item_info->nkey = it->nkey; item_info->nvalue = 1; item_info->key = item_get_key(it); item_info->value[0].iov_base = item_get_data(it); item_info->value[0].iov_len = it->nbytes; return true; }