Whamcloud - gitweb
LU-9338 hsm: cache agent record locations
[fs/lustre-release.git] / lustre / mdt / mdt_hsm_cdt_actions.c
index 94bc265..abb3e6a 100644 (file)
@@ -36,6 +36,8 @@
 
 #define DEBUG_SUBSYSTEM S_MDS
 
+#include <libcfs/libcfs.h>
+#include <libcfs/libcfs_hash.h>
 #include <obd_support.h>
 #include <lustre_export.h>
 #include <obd.h>
 #include <lustre_log.h>
 #include "mdt_internal.h"
 
+struct cdt_agent_record_loc {
+       struct hlist_node carl_hnode;
+       atomic_t carl_refcount;
+       u64 carl_cookie;
+       u32 carl_cat_idx;
+       u32 carl_rec_idx;
+};
+
+static inline void cdt_agent_record_loc_get(struct cdt_agent_record_loc *carl)
+{
+       LASSERT(atomic_read(&carl->carl_refcount) > 0);
+       atomic_inc(&carl->carl_refcount);
+}
+
+static inline void cdt_agent_record_loc_put(struct cdt_agent_record_loc *carl)
+{
+       LASSERT(atomic_read(&carl->carl_refcount) > 0);
+       if (atomic_dec_and_test(&carl->carl_refcount))
+               OBD_FREE_PTR(carl);
+}
+
+static unsigned int
+cdt_agent_record_hash(struct cfs_hash *hs, const void *key, unsigned int mask)
+{
+       return cfs_hash_djb2_hash(key, sizeof(u64), mask);
+}
+
+static void *cdt_agent_record_object(struct hlist_node *hnode)
+{
+       return hlist_entry(hnode, struct cdt_agent_record_loc, carl_hnode);
+}
+
+static void *cdt_agent_record_key(struct hlist_node *hnode)
+{
+       struct cdt_agent_record_loc *carl = cdt_agent_record_object(hnode);
+
+       return &carl->carl_cookie;
+}
+
+static int cdt_agent_record_keycmp(const void *key, struct hlist_node *hnode)
+{
+       const u64 *cookie2 = cdt_agent_record_key(hnode);
+
+       return *(const u64 *)key == *cookie2;
+}
+
+static void cdt_agent_record_get(struct cfs_hash *hs, struct hlist_node *hnode)
+{
+       struct cdt_agent_record_loc *carl = cdt_agent_record_object(hnode);
+
+       cdt_agent_record_loc_get(carl);
+}
+
+static void cdt_agent_record_put(struct cfs_hash *hs, struct hlist_node *hnode)
+{
+       struct cdt_agent_record_loc *carl = cdt_agent_record_object(hnode);
+
+       cdt_agent_record_loc_put(carl);
+}
+
+struct cfs_hash_ops cdt_agent_record_hash_ops = {
+       .hs_hash        = cdt_agent_record_hash,
+       .hs_key         = cdt_agent_record_key,
+       .hs_keycmp      = cdt_agent_record_keycmp,
+       .hs_object      = cdt_agent_record_object,
+       .hs_get         = cdt_agent_record_get,
+       .hs_put_locked  = cdt_agent_record_put,
+};
+
+void cdt_agent_record_hash_add(struct coordinator *cdt, u64 cookie, u32 cat_idx,
+                              u32 rec_idx)
+{
+       struct cdt_agent_record_loc *carl0;
+       struct cdt_agent_record_loc *carl1;
+
+       OBD_ALLOC_PTR(carl1);
+       if (carl1 == NULL)
+               return;
+
+       INIT_HLIST_NODE(&carl1->carl_hnode);
+       atomic_set(&carl1->carl_refcount, 1);
+       carl1->carl_cookie = cookie;
+       carl1->carl_cat_idx = cat_idx;
+       carl1->carl_rec_idx = rec_idx;
+
+       carl0 = cfs_hash_findadd_unique(cdt->cdt_agent_record_hash,
+                                       &carl1->carl_cookie,
+                                       &carl1->carl_hnode);
+
+       LASSERT(carl0->carl_cookie == carl1->carl_cookie);
+       LASSERT(carl0->carl_cat_idx == carl1->carl_cat_idx);
+       LASSERT(carl0->carl_rec_idx == carl1->carl_rec_idx);
+
+       if (carl0 != carl1)
+               cdt_agent_record_loc_put(carl0);
+
+       cdt_agent_record_loc_put(carl1);
+}
+
+void cdt_agent_record_hash_lookup(struct coordinator *cdt, u64 cookie,
+                                 u32 *cat_idx, u32 *rec_idx)
+{
+       struct cdt_agent_record_loc *carl;
+
+       carl = cfs_hash_lookup(cdt->cdt_agent_record_hash, &cookie);
+       if (carl != NULL) {
+               LASSERT(carl->carl_cookie == cookie);
+               *cat_idx = carl->carl_cat_idx;
+               *rec_idx = carl->carl_rec_idx;
+               cdt_agent_record_loc_put(carl);
+       } else {
+               *cat_idx = 0;
+               *rec_idx = 0;
+       }
+}
+
+void cdt_agent_record_hash_del(struct coordinator *cdt, u64 cookie)
+{
+       cfs_hash_del_key(cdt->cdt_agent_record_hash, &cookie);
+}
+
 void dump_llog_agent_req_rec(const char *prefix,
                             const struct llog_agent_req_rec *larr)
 {
@@ -81,11 +204,14 @@ void dump_llog_agent_req_rec(const char *prefix,
  * \param cb [IN] llog callback funtion
  * \param data [IN] llog callback  data
  * \param rw [IN] cdt_llog_lock mode (READ or WRITE)
+ * \param start_cat_idx first catalog index to examine
+ * \param start_rec_idx first record index to examine
  * \retval 0 success
  * \retval -ve failure
  */
 int cdt_llog_process(const struct lu_env *env, struct mdt_device *mdt,
-                    llog_cb_t cb, void *data, int rw)
+                    llog_cb_t cb, void *data, u32 start_cat_idx,
+                    u32 start_rec_idx, int rw)
 {
        struct obd_device       *obd = mdt2obd_dev(mdt);
        struct llog_ctxt        *lctxt = NULL;
@@ -102,7 +228,8 @@ int cdt_llog_process(const struct lu_env *env, struct mdt_device *mdt,
        else
                down_write(&cdt->cdt_llog_lock);
 
-       rc = llog_cat_process(env, lctxt->loc_handle, cb, data, 0, 0);
+       rc = llog_cat_process(env, lctxt->loc_handle, cb, data, start_cat_idx,
+                             start_rec_idx);
        if (rc < 0)
                CERROR("%s: failed to process HSM_ACTIONS llog (rc=%d)\n",
                        mdt_obd_name(mdt), rc);
@@ -273,9 +400,37 @@ int mdt_agent_record_update(const struct lu_env *env, struct mdt_device *mdt,
                            enum agent_req_status status)
 {
        struct data_update_cb    ducb;
-       int                      rc;
+       u32 start_cat_idx = -1;
+       u32 start_rec_idx = -1;
+       u32 cat_idx;
+       u32 rec_idx;
+       int i;
+       int rc;
        ENTRY;
 
+       /* Find the first location (start_cat_idx, start_rec_idx)
+        * among the records corresponding to cookies. */
+       for (i = 0; i < cookies_count; i++) {
+               /* If we cannot find a cached location for a cookie
+                * (perhaps because the MDT was restart then we must
+                * start from the beginning. In this case
+                * mdt_agent_record_hash_get() sets both of cat_idx and
+                * rec_idx to 0. */
+               cdt_agent_record_hash_lookup(&mdt->mdt_coordinator, cookies[i],
+                                            &cat_idx, &rec_idx);
+               if (cat_idx < start_cat_idx) {
+                       start_cat_idx = cat_idx;
+                       start_rec_idx = rec_idx;
+               } else if (cat_idx == start_cat_idx &&
+                          rec_idx < start_rec_idx) {
+                       start_rec_idx = rec_idx;
+               }
+       }
+
+       /* Fixup starting record index for llog_cat_process(). */
+       if (start_rec_idx != 0)
+               start_rec_idx -= 1;
+
        ducb.mdt = mdt;
        ducb.cookies = cookies;
        ducb.cookies_count = cookies_count;
@@ -284,7 +439,7 @@ int mdt_agent_record_update(const struct lu_env *env, struct mdt_device *mdt,
        ducb.change_time = cfs_time_current_sec();
 
        rc = cdt_llog_process(env, mdt, mdt_agent_record_update_cb, &ducb,
-                             WRITE);
+                             start_cat_idx, start_rec_idx, WRITE);
        if (rc < 0)
                CERROR("%s: cdt_llog_process() failed, rc=%d, cannot update "
                       "status to %s for %d cookies, done %d\n",