* (C) Copyright 2012 Commissariat a l'energie atomique et aux energies
* alternatives
*
- * Copyright (c) 2013, 2014, Intel Corporation.
+ * Copyright (c) 2013, 2016, Intel Corporation.
*/
/*
* lustre/mdt/mdt_hsm_cdt_actions.c
#define DEBUG_SUBSYSTEM S_MDS
+#include <libcfs/libcfs.h>
+#include <libcfs/libcfs_hash.h>
#include <obd_support.h>
-#include <lustre_net.h>
#include <lustre_export.h>
#include <obd.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include "mdt_internal.h"
+struct cdt_agent_record_loc {
+ struct hlist_node carl_hnode;
+ atomic_t carl_refcount;
+ u64 carl_cookie;
+ u32 carl_cat_idx;
+ u32 carl_rec_idx;
+};
+
+static inline void cdt_agent_record_loc_get(struct cdt_agent_record_loc *carl)
+{
+ LASSERT(atomic_read(&carl->carl_refcount) > 0);
+ atomic_inc(&carl->carl_refcount);
+}
+
+static inline void cdt_agent_record_loc_put(struct cdt_agent_record_loc *carl)
+{
+ LASSERT(atomic_read(&carl->carl_refcount) > 0);
+ if (atomic_dec_and_test(&carl->carl_refcount))
+ OBD_FREE_PTR(carl);
+}
+
+static unsigned int
+cdt_agent_record_hash(struct cfs_hash *hs, const void *key, unsigned int mask)
+{
+ return cfs_hash_djb2_hash(key, sizeof(u64), mask);
+}
+
+static void *cdt_agent_record_object(struct hlist_node *hnode)
+{
+ return hlist_entry(hnode, struct cdt_agent_record_loc, carl_hnode);
+}
+
+static void *cdt_agent_record_key(struct hlist_node *hnode)
+{
+ struct cdt_agent_record_loc *carl = cdt_agent_record_object(hnode);
+
+ return &carl->carl_cookie;
+}
+
+static int cdt_agent_record_keycmp(const void *key, struct hlist_node *hnode)
+{
+ const u64 *cookie2 = cdt_agent_record_key(hnode);
+
+ return *(const u64 *)key == *cookie2;
+}
+
+static void cdt_agent_record_get(struct cfs_hash *hs, struct hlist_node *hnode)
+{
+ struct cdt_agent_record_loc *carl = cdt_agent_record_object(hnode);
+
+ cdt_agent_record_loc_get(carl);
+}
+
+static void cdt_agent_record_put(struct cfs_hash *hs, struct hlist_node *hnode)
+{
+ struct cdt_agent_record_loc *carl = cdt_agent_record_object(hnode);
+
+ cdt_agent_record_loc_put(carl);
+}
+
+struct cfs_hash_ops cdt_agent_record_hash_ops = {
+ .hs_hash = cdt_agent_record_hash,
+ .hs_key = cdt_agent_record_key,
+ .hs_keycmp = cdt_agent_record_keycmp,
+ .hs_object = cdt_agent_record_object,
+ .hs_get = cdt_agent_record_get,
+ .hs_put_locked = cdt_agent_record_put,
+};
+
+void cdt_agent_record_hash_add(struct coordinator *cdt, u64 cookie, u32 cat_idx,
+ u32 rec_idx)
+{
+ struct cdt_agent_record_loc *carl0;
+ struct cdt_agent_record_loc *carl1;
+
+ OBD_ALLOC_PTR(carl1);
+ if (carl1 == NULL)
+ return;
+
+ INIT_HLIST_NODE(&carl1->carl_hnode);
+ atomic_set(&carl1->carl_refcount, 1);
+ carl1->carl_cookie = cookie;
+ carl1->carl_cat_idx = cat_idx;
+ carl1->carl_rec_idx = rec_idx;
+
+ carl0 = cfs_hash_findadd_unique(cdt->cdt_agent_record_hash,
+ &carl1->carl_cookie,
+ &carl1->carl_hnode);
+
+ LASSERT(carl0->carl_cookie == carl1->carl_cookie);
+ LASSERT(carl0->carl_cat_idx == carl1->carl_cat_idx);
+ LASSERT(carl0->carl_rec_idx == carl1->carl_rec_idx);
+
+ if (carl0 != carl1)
+ cdt_agent_record_loc_put(carl0);
+
+ cdt_agent_record_loc_put(carl1);
+}
+
+void cdt_agent_record_hash_lookup(struct coordinator *cdt, u64 cookie,
+ u32 *cat_idx, u32 *rec_idx)
+{
+ struct cdt_agent_record_loc *carl;
+
+ carl = cfs_hash_lookup(cdt->cdt_agent_record_hash, &cookie);
+ if (carl != NULL) {
+ LASSERT(carl->carl_cookie == cookie);
+ *cat_idx = carl->carl_cat_idx;
+ *rec_idx = carl->carl_rec_idx;
+ cdt_agent_record_loc_put(carl);
+ } else {
+ *cat_idx = 0;
+ *rec_idx = 0;
+ }
+}
+
+void cdt_agent_record_hash_del(struct coordinator *cdt, u64 cookie)
+{
+ cfs_hash_del_key(cdt->cdt_agent_record_hash, &cookie);
+}
+
void dump_llog_agent_req_rec(const char *prefix,
const struct llog_agent_req_rec *larr)
{
* \param mdt [IN] MDT device
* \param cb [IN] llog callback funtion
* \param data [IN] llog callback data
+ * \param rw [IN] cdt_llog_lock mode (READ or WRITE)
+ * \param start_cat_idx first catalog index to examine
+ * \param start_rec_idx first record index to examine
* \retval 0 success
* \retval -ve failure
*/
int cdt_llog_process(const struct lu_env *env, struct mdt_device *mdt,
- llog_cb_t cb, void *data)
+ llog_cb_t cb, void *data, u32 start_cat_idx,
+ u32 start_rec_idx, int rw)
{
struct obd_device *obd = mdt2obd_dev(mdt);
struct llog_ctxt *lctxt = NULL;
if (lctxt == NULL || lctxt->loc_handle == NULL)
RETURN(-ENOENT);
- mutex_lock(&cdt->cdt_llog_lock);
+ if (rw == READ)
+ down_read(&cdt->cdt_llog_lock);
+ else
+ down_write(&cdt->cdt_llog_lock);
- rc = llog_cat_process(env, lctxt->loc_handle, cb, data, 0, 0);
+ rc = llog_cat_process(env, lctxt->loc_handle, cb, data, start_cat_idx,
+ start_rec_idx);
if (rc < 0)
CERROR("%s: failed to process HSM_ACTIONS llog (rc=%d)\n",
mdt_obd_name(mdt), rc);
rc = 0;
llog_ctxt_put(lctxt);
- mutex_unlock(&cdt->cdt_llog_lock);
+
+ if (rw == READ)
+ up_read(&cdt->cdt_llog_lock);
+ else
+ up_write(&cdt->cdt_llog_lock);
+
RETURN(rc);
}
if (lctxt == NULL || lctxt->loc_handle == NULL)
GOTO(free, rc = -ENOENT);
- mutex_lock(&cdt->cdt_llog_lock);
+ down_write(&cdt->cdt_llog_lock);
/* in case of cancel request, the cookie is already set to the
* value of the request cookie to be cancelled
if (rc > 0)
rc = 0;
- mutex_unlock(&cdt->cdt_llog_lock);
+ up_write(&cdt->cdt_llog_lock);
llog_ctxt_put(lctxt);
EXIT;
enum agent_req_status status)
{
struct data_update_cb ducb;
- int rc;
+ u32 start_cat_idx = -1;
+ u32 start_rec_idx = -1;
+ u32 cat_idx;
+ u32 rec_idx;
+ int i;
+ int rc;
ENTRY;
+ /* Find the first location (start_cat_idx, start_rec_idx)
+ * among the records corresponding to cookies. */
+ for (i = 0; i < cookies_count; i++) {
+ /* If we cannot find a cached location for a cookie
+ * (perhaps because the MDT was restart then we must
+ * start from the beginning. In this case
+ * mdt_agent_record_hash_get() sets both of cat_idx and
+ * rec_idx to 0. */
+ cdt_agent_record_hash_lookup(&mdt->mdt_coordinator, cookies[i],
+ &cat_idx, &rec_idx);
+ if (cat_idx < start_cat_idx) {
+ start_cat_idx = cat_idx;
+ start_rec_idx = rec_idx;
+ } else if (cat_idx == start_cat_idx &&
+ rec_idx < start_rec_idx) {
+ start_rec_idx = rec_idx;
+ }
+ }
+
+ /* Fixup starting record index for llog_cat_process(). */
+ if (start_rec_idx != 0)
+ start_rec_idx -= 1;
+
ducb.mdt = mdt;
ducb.cookies = cookies;
ducb.cookies_count = cookies_count;
ducb.status = status;
ducb.change_time = cfs_time_current_sec();
- rc = cdt_llog_process(env, mdt, mdt_agent_record_update_cb, &ducb);
+ rc = cdt_llog_process(env, mdt, mdt_agent_record_update_cb, &ducb,
+ start_cat_idx, start_rec_idx, WRITE);
if (rc < 0)
CERROR("%s: cdt_llog_process() failed, rc=%d, cannot update "
"status to %s for %d cookies, done %d\n",
if (aai->aai_eof)
RETURN(0);
- mutex_lock(&cdt->cdt_llog_lock);
+ down_read(&cdt->cdt_llog_lock);
rc = llog_cat_process(&aai->aai_env, aai->aai_ctxt->loc_handle,
hsm_actions_show_cb, s,
aai->aai_cat_index, aai->aai_index);
- mutex_unlock(&cdt->cdt_llog_lock);
+ up_read(&cdt->cdt_llog_lock);
if (rc == 0) /* all llog parsed */
aai->aai_eof = true;
if (rc == LLOG_PROC_BREAK) /* buffer full */