}
}
+static void cdt_crh_free(struct rcu_head *head)
+{
+ struct cdt_restore_handle *crh;
+
+ crh = container_of(head, struct cdt_restore_handle, crh_rcu);
+ OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+}
+
+static void
+cdt_crh_put(struct cdt_restore_handle *crh, struct mdt_thread_info *cdt_mti)
+{
+ if (atomic_dec_and_test(&crh->crh_refc)) {
+ /* XXX We pass a NULL object since the restore handle does not
+ * keep a reference on the object being restored.
+ */
+ if (lustre_handle_is_used(&crh->crh_lh.mlh_reg_lh))
+ mdt_object_unlock(cdt_mti, NULL, &crh->crh_lh, 1);
+ call_rcu(&crh->crh_rcu, cdt_crh_free);
+ }
+}
+
+static void crh_free_hash(void *vcrh, void *vcdt_mti)
+{
+ struct cdt_restore_handle *crh = vcrh;
+ struct mdt_thread_info *cdt_mti = vcdt_mti;
+
+ /* put last reference */
+ cdt_crh_put(crh, cdt_mti);
+}
+
+static const struct rhashtable_params crh_hash_params = {
+ .key_len = sizeof(struct lu_fid),
+ .key_offset = offsetof(struct cdt_restore_handle, crh_fid),
+ .head_offset = offsetof(struct cdt_restore_handle, crh_hash),
+ .hashfn = lu_fid_hash,
+ .automatic_shrinking = true,
+};
+
/* Release the ressource used by the coordinator. Called when the
* coordinator is stopping. */
static void mdt_hsm_cdt_cleanup(struct mdt_device *mdt)
struct coordinator *cdt = &mdt->mdt_coordinator;
struct cdt_agent_req *car, *tmp1;
struct hsm_agent *ha, *tmp2;
- struct cdt_restore_handle *crh, *tmp3;
struct mdt_thread_info *cdt_mti;
/* start cleaning */
up_write(&cdt->cdt_agent_lock);
cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
- mutex_lock(&cdt->cdt_restore_lock);
- list_for_each_entry_safe(crh, tmp3, &cdt->cdt_restore_handle_list,
- crh_list) {
- list_del(&crh->crh_list);
- /* give back layout lock */
- mdt_object_unlock(cdt_mti, NULL, &crh->crh_lh, 1);
- OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
- }
- mutex_unlock(&cdt->cdt_restore_lock);
+ rhashtable_free_and_destroy(&cdt->cdt_restore_hash, crh_free_hash,
+ cdt_mti);
+ rcu_barrier();
}
/*
*/
crh->crh_extent.start = 0;
crh->crh_extent.end = he->length;
+ atomic_set(&crh->crh_refc, 2);
- mutex_lock(&cdt->cdt_restore_lock);
- if (cdt_restore_handle_find(cdt, fid) != NULL)
- GOTO(out_crl, rc = 1);
-
- list_add_tail(&crh->crh_list, &cdt->cdt_restore_handle_list);
- mutex_unlock(&cdt->cdt_restore_lock);
+ rc = rhashtable_lookup_insert_fast(&cdt->cdt_restore_hash,
+ &crh->crh_hash, crh_hash_params);
+ if (rc) {
+ OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+ RETURN(rc);
+ }
/* get the layout lock */
obj = mdt_object_find_lock(mti, &crh->crh_fid, &crh->crh_lh,
MDS_INODELOCK_LAYOUT, LCK_EX);
- if (IS_ERR(obj))
- GOTO(out_ldel, rc = PTR_ERR(obj));
+ if (IS_ERR(obj)) {
+ rc = rhashtable_remove_fast(&cdt->cdt_restore_hash,
+ &crh->crh_hash, crh_hash_params);
+ /* rc < 0 means it has been removed in a parallel thread.
+ * This shouldn't happen by design as at current stage record
+ * hasn't been added in llog yet.
+ */
+ if (!rc)
+ cdt_crh_put(crh, mti);
+ cdt_crh_put(crh, mti);
+
+ RETURN(PTR_ERR(obj));
+ }
/* We do not keep a reference on the object during the restore
* which can be very long.
*/
mdt_object_put(mti->mti_env, obj);
-
- RETURN(0);
-out_ldel:
- mutex_lock(&cdt->cdt_restore_lock);
- list_del(&crh->crh_list);
-out_crl:
- mutex_unlock(&cdt->cdt_restore_lock);
- OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
-
- return rc;
+ cdt_crh_put(crh, mti);
+ RETURN(rc);
}
/**
* lookup a restore handle by FID
- * caller needs to hold cdt_restore_lock
* \param cdt [IN] coordinator
* \param fid [IN] FID
- * \retval cdt_restore_handle found
- * \retval NULL not found
+ * \retval true cdt_restore_handle found
+ * \retval false not found
*/
-struct cdt_restore_handle *cdt_restore_handle_find(struct coordinator *cdt,
- const struct lu_fid *fid)
+bool cdt_restore_handle_exists(struct coordinator *cdt,
+ const struct lu_fid *fid)
{
- struct cdt_restore_handle *crh;
- ENTRY;
-
- list_for_each_entry(crh, &cdt->cdt_restore_handle_list, crh_list) {
- if (lu_fid_eq(&crh->crh_fid, fid))
- RETURN(crh);
- }
-
- RETURN(NULL);
+ return rhashtable_lookup_fast(&cdt->cdt_restore_hash, fid,
+ crh_hash_params);
}
void cdt_restore_handle_del(struct mdt_thread_info *mti,
struct cdt_restore_handle *crh;
/* give back layout lock */
- mutex_lock(&cdt->cdt_restore_lock);
- crh = cdt_restore_handle_find(cdt, fid);
- if (crh != NULL)
- list_del(&crh->crh_list);
- mutex_unlock(&cdt->cdt_restore_lock);
-
+ rcu_read_lock();
+ crh = rhashtable_lookup(&cdt->cdt_restore_hash, fid, crh_hash_params);
+ if (crh &&
+ rhashtable_remove_fast(&cdt->cdt_restore_hash, &crh->crh_hash,
+ crh_hash_params))
+ crh = NULL;
+ rcu_read_unlock();
+
+ /* crh has been removed in a parallel thread */
if (crh == NULL)
return;
- /* XXX We pass a NULL object since the restore handle does not
- * keep a reference on the object being restored. */
- mdt_object_unlock(mti, NULL, &crh->crh_lh, 1);
- OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
+ cdt_crh_put(crh, mti);
}
/**
}
rc = cdt_restore_handle_add(mti, cdt, &hai->hai_fid, &hai->hai_extent);
- if (rc == 1)
+ if (rc == -EEXIST) {
+ CWARN("%s: duplicate restore record for fid="DFID" found in the llog: rc = %d\n",
+ mdt_obd_name(mti->mti_mdt), PFID(&hai->hai_fid), rc);
rc = 0;
+ }
out:
RETURN(rc);
}
init_rwsem(&cdt->cdt_llog_lock);
init_rwsem(&cdt->cdt_agent_lock);
init_rwsem(&cdt->cdt_request_lock);
- mutex_init(&cdt->cdt_restore_lock);
mutex_init(&cdt->cdt_state_lock);
set_cdt_state(cdt, CDT_STOPPED);
INIT_LIST_HEAD(&cdt->cdt_request_list);
INIT_LIST_HEAD(&cdt->cdt_agents);
- INIT_LIST_HEAD(&cdt->cdt_restore_handle_list);
cdt->cdt_request_cookie_hash = cfs_hash_create("REQUEST_COOKIE_HASH",
CFS_HASH_BITS_MIN,
cdt->cdt_user_request_mask = (1UL << HSMA_RESTORE);
cdt->cdt_group_request_mask = (1UL << HSMA_RESTORE);
cdt->cdt_other_request_mask = (1UL << HSMA_RESTORE);
+ rc = rhashtable_init(&cdt->cdt_restore_hash, &crh_hash_params);
+ if (rc) {
+ CERROR("%s: failed to create cdt_restore hash: rc = %d\n",
+ mdt_obd_name(mdt), rc);
+ set_cdt_state(cdt, CDT_STOPPED);
+ RETURN(rc);
+ }
/* to avoid deadlock when start is made through sysfs
* sysfs entries are created by the coordinator thread
* cdt_llog_lock
* cdt_agent_lock
* cdt_counter_lock
- * cdt_restore_lock
* cdt_request_lock
*/
struct coordinator {
struct rw_semaphore cdt_agent_lock; /**< protect agent list */
struct rw_semaphore cdt_request_lock; /**< protect request
* list */
- struct mutex cdt_restore_lock; /**< protect restore
- * list */
timeout_t cdt_loop_period; /**< llog scan period */
timeout_t cdt_grace_delay; /**< request grace
* delay */
struct list_head cdt_request_list;
struct list_head cdt_agents; /**< list of register
* agents */
- struct list_head cdt_restore_handle_list;
+ struct rhashtable cdt_restore_hash; /* rhashtable for
+ * restore requests
+ */
/* Hash of cookies to locations of record locations in agent
* request log. */
};
struct cdt_restore_handle {
- struct list_head crh_list; /**< to chain the handle */
+ struct rhash_head crh_hash; /**< link to cdt_restore_hash */
struct lu_fid crh_fid; /**< fid of the object */
struct ldlm_extent crh_extent; /**< extent of the restore */
struct mdt_lock_handle crh_lh; /**< lock handle */
+ atomic_t crh_refc; /**< crh ref counter */
+ struct rcu_head crh_rcu; /**< crh rcu head */
};
extern struct kmem_cache *mdt_hsm_cdt_kmem; /** restore handle slab cache */
int cdt_restore_handle_add(struct mdt_thread_info *mti, struct coordinator *cdt,
const struct lu_fid *fid,
const struct hsm_extent *he);
-struct cdt_restore_handle *cdt_restore_handle_find(struct coordinator *cdt,
+bool cdt_restore_handle_exists(struct coordinator *cdt,
const struct lu_fid *fid);
void cdt_restore_handle_del(struct mdt_thread_info *mti,
struct coordinator *cdt, const struct lu_fid *fid);