If the cross-MDTs update failed, the OSP transcation callback
will invalidate related object(s) cache. Originally, it uses
lu_object_find() to locate the object by FID. But it may be
blocked if related object is marked as dying (for some reason,
such as purged when umount).
To avoid such trouble, we make the "osp_update_request" to
reference related object(s) directly, then when needs to
invalidate, we can access the object(s) directly without
locating again.
Test-Parameters: alwaysuploadlogs mdsfilesystemtype=ldiskfs mdtfilesystemtype=ldiskfs ostfilesystemtype=ldiskfs mdscount=2 mdtcount=4 testlist=sanity-scrub,sanity-scrub,sanity-scrub
Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I043182b349b38153cce4e20b2675556ead94c6c1
Reviewed-on: http://review.whamcloud.com/21330
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: wangdi <di.wang@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
LASSERT(our->our_th != NULL);
osp_trans_callback(env, our->our_th, -EIO);
/* our will be destroyed in osp_thandle_put() */
LASSERT(our->our_th != NULL);
osp_trans_callback(env, our->our_th, -EIO);
/* our will be destroyed in osp_thandle_put() */
- osp_thandle_put(our->our_th);
+ osp_thandle_put(env, our->our_th);
}
spin_unlock(&ou->ou_lock);
}
spin_unlock(&ou->ou_lock);
ENTRY;
if (osp->opd_async_requests != NULL) {
ENTRY;
if (osp->opd_async_requests != NULL) {
- osp_update_request_destroy(osp->opd_async_requests);
+ osp_update_request_destroy(env, osp->opd_async_requests);
osp->opd_async_requests = NULL;
}
osp->opd_async_requests = NULL;
}
int our_update_nr;
struct list_head our_cb_items;
int our_update_nr;
struct list_head our_cb_items;
+ struct list_head our_invalidate_cb_list;
/* points to thandle if this update request belongs to one */
struct osp_thandle *our_th;
/* points to thandle if this update request belongs to one */
struct osp_thandle *our_th;
const struct lu_env *opo_owner;
struct lu_attr opo_attr;
struct list_head opo_xattr_list;
const struct lu_env *opo_owner;
struct lu_attr opo_attr;
struct list_head opo_xattr_list;
+ struct list_head opo_invalidate_cb_list;
/* Protect opo_ooa. */
spinlock_t opo_lock;
};
/* Protect opo_ooa. */
spinlock_t opo_lock;
};
osp_update_interpreter_t interpreter);
struct osp_update_request *osp_update_request_create(struct dt_device *dt);
osp_update_interpreter_t interpreter);
struct osp_update_request *osp_update_request_create(struct dt_device *dt);
-void osp_update_request_destroy(struct osp_update_request *update);
+void osp_update_request_destroy(const struct lu_env *env,
+ struct osp_update_request *update);
int osp_send_update_thread(void *arg);
int osp_check_and_set_rpc_version(struct osp_thandle *oth,
struct osp_object *obj);
int osp_send_update_thread(void *arg);
int osp_check_and_set_rpc_version(struct osp_thandle *oth,
struct osp_object *obj);
-void osp_thandle_destroy(struct osp_thandle *oth);
+void osp_thandle_destroy(const struct lu_env *env, struct osp_thandle *oth);
static inline void osp_thandle_get(struct osp_thandle *oth)
{
atomic_inc(&oth->ot_refcount);
}
static inline void osp_thandle_get(struct osp_thandle *oth)
{
atomic_inc(&oth->ot_refcount);
}
-static inline void osp_thandle_put(struct osp_thandle *oth)
+static inline void osp_thandle_put(const struct lu_env *env,
+ struct osp_thandle *oth)
{
if (atomic_dec_and_test(&oth->ot_refcount))
{
if (atomic_dec_and_test(&oth->ot_refcount))
- osp_thandle_destroy(oth);
+ osp_thandle_destroy(env, oth);
}
int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
}
int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
if (req != NULL)
ptlrpc_req_finished(req);
if (req != NULL)
ptlrpc_req_finished(req);
- osp_update_request_destroy(update);
+ osp_update_request_destroy(env, update);
if (obj->opo_attr.la_valid & LA_SIZE && obj->opo_attr.la_size < *pos)
obj->opo_attr.la_size = *pos;
if (obj->opo_attr.la_valid & LA_SIZE && obj->opo_attr.la_size < *pos)
obj->opo_attr.la_size = *pos;
+ spin_lock(&obj->opo_lock);
+ if (list_empty(&obj->opo_invalidate_cb_list)) {
+ lu_object_get(&obj->opo_obj.do_lu);
+
+ list_add_tail(&obj->opo_invalidate_cb_list,
+ &update->our_invalidate_cb_list);
+ }
+ spin_unlock(&obj->opo_lock);
+
ptlrpc_req_finished(req);
out_update:
ptlrpc_req_finished(req);
out_update:
- osp_update_request_destroy(update);
+ osp_update_request_destroy(env, update);
if (req != NULL)
ptlrpc_req_finished(req);
if (req != NULL)
ptlrpc_req_finished(req);
- osp_update_request_destroy(update);
+ osp_update_request_destroy(env, update);
ptlrpc_req_finished(req);
if (update != NULL && !IS_ERR(update))
ptlrpc_req_finished(req);
if (update != NULL && !IS_ERR(update))
- osp_update_request_destroy(update);
+ osp_update_request_destroy(env, update);
if (oxe != NULL)
osp_oac_xattr_put(oxe);
if (oxe != NULL)
osp_oac_xattr_put(oxe);
spin_lock_init(&po->opo_lock);
o->lo_header->loh_attr |= LOHA_REMOTE;
INIT_LIST_HEAD(&po->opo_xattr_list);
spin_lock_init(&po->opo_lock);
o->lo_header->loh_attr |= LOHA_REMOTE;
INIT_LIST_HEAD(&po->opo_xattr_list);
+ INIT_LIST_HEAD(&po->opo_invalidate_cb_list);
if (is_ost_obj(o)) {
po->opo_obj.do_ops = &osp_obj_ops;
if (is_ost_obj(o)) {
po->opo_obj.do_ops = &osp_obj_ops;
INIT_LIST_HEAD(&our->our_req_list);
INIT_LIST_HEAD(&our->our_cb_items);
INIT_LIST_HEAD(&our->our_list);
INIT_LIST_HEAD(&our->our_req_list);
INIT_LIST_HEAD(&our->our_cb_items);
INIT_LIST_HEAD(&our->our_list);
+ INIT_LIST_HEAD(&our->our_invalidate_cb_list);
spin_lock_init(&our->our_list_lock);
rc = osp_object_update_request_create(our, OUT_UPDATE_INIT_BUFFER_SIZE);
spin_lock_init(&our->our_list_lock);
rc = osp_object_update_request_create(our, OUT_UPDATE_INIT_BUFFER_SIZE);
-void osp_update_request_destroy(struct osp_update_request *our)
+void osp_update_request_destroy(const struct lu_env *env,
+ struct osp_update_request *our)
{
struct osp_update_request_sub *ours;
struct osp_update_request_sub *tmp;
{
struct osp_update_request_sub *ours;
struct osp_update_request_sub *tmp;
OBD_FREE_LARGE(ours->ours_req, ours->ours_req_size);
OBD_FREE_PTR(ours);
}
OBD_FREE_LARGE(ours->ours_req, ours->ours_req_size);
OBD_FREE_PTR(ours);
}
+
+ if (!list_empty(&our->our_invalidate_cb_list)) {
+ struct lu_env lenv;
+ struct osp_object *obj;
+ struct osp_object *next;
+
+ if (env == NULL) {
+ lu_env_init(&lenv, LCT_MD_THREAD | LCT_DT_THREAD);
+ env = &lenv;
+ }
+
+ list_for_each_entry_safe(obj, next,
+ &our->our_invalidate_cb_list,
+ opo_invalidate_cb_list) {
+ spin_lock(&obj->opo_lock);
+ list_del_init(&obj->opo_invalidate_cb_list);
+ spin_unlock(&obj->opo_lock);
+
+ lu_object_put(env, &obj->opo_obj.do_lu);
+ }
+
+ if (env == &lenv)
+ lu_env_fini(&lenv);
+ }
+
* \param[in] oth osp thandle.
*/
static void osp_thandle_invalidate_object(const struct lu_env *env,
* \param[in] oth osp thandle.
*/
static void osp_thandle_invalidate_object(const struct lu_env *env,
- struct osp_thandle *oth)
+ struct osp_thandle *oth,
+ int result)
{
struct osp_update_request *our = oth->ot_our;
{
struct osp_update_request *our = oth->ot_our;
- struct osp_update_request_sub *ours;
+ struct osp_object *obj;
+ struct osp_object *next;
- list_for_each_entry(ours, &our->our_req_list, ours_list) {
- struct object_update_request *our_req = ours->ours_req;
- unsigned int i;
- struct lu_object *obj;
-
- for (i = 0; i < our_req->ourq_count; i++) {
- struct object_update *update;
+ list_for_each_entry_safe(obj, next, &our->our_invalidate_cb_list,
+ opo_invalidate_cb_list) {
+ if (result < 0)
+ osp_invalidate(env, &obj->opo_obj);
- update = object_update_request_get(our_req, i, NULL);
- if (update == NULL)
- break;
+ spin_lock(&obj->opo_lock);
+ list_del_init(&obj->opo_invalidate_cb_list);
+ spin_unlock(&obj->opo_lock);
- if (update->ou_type != OUT_WRITE)
- continue;
-
- if (!fid_is_sane(&update->ou_fid))
- continue;
-
- obj = lu_object_find_slice(env,
- &oth->ot_super.th_dev->dd_lu_dev,
- &update->ou_fid, NULL);
- if (IS_ERR(obj))
- break;
-
- osp_invalidate(env, lu2dt_obj(obj));
- lu_object_put(env, obj);
- }
+ lu_object_put(env, &obj->opo_obj.do_lu);
dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
}
dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
}
- if (result < 0)
- osp_thandle_invalidate_object(env, oth);
+ osp_thandle_invalidate_object(env, oth, result);
/* oth and osp_update_requests will be destoryed in
* osp_thandle_put */
osp_trans_stop_cb(env, oth, rc);
/* oth and osp_update_requests will be destoryed in
* osp_thandle_put */
osp_trans_stop_cb(env, oth, rc);
+ osp_thandle_put(env, oth);
- osp_update_request_destroy(our);
+ osp_update_request_destroy(env, our);
ouc->ouc_data, 0, rc);
osp_update_callback_fini(env, ouc);
}
ouc->ouc_data, 0, rc);
osp_update_callback_fini(env, ouc);
}
- osp_update_request_destroy(our);
+ osp_update_request_destroy(env, our);
} else {
args = ptlrpc_req_async_args(req);
args->oaua_update = our;
} else {
args = ptlrpc_req_async_args(req);
args->oaua_update = our;
-void osp_thandle_destroy(struct osp_thandle *oth)
+void osp_thandle_destroy(const struct lu_env *env,
+ struct osp_thandle *oth)
{
LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC);
LASSERT(list_empty(&oth->ot_commit_dcb_list));
LASSERT(list_empty(&oth->ot_stop_dcb_list));
if (oth->ot_our != NULL)
{
LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC);
LASSERT(list_empty(&oth->ot_commit_dcb_list));
LASSERT(list_empty(&oth->ot_stop_dcb_list));
if (oth->ot_our != NULL)
- osp_update_request_destroy(oth->ot_our);
+ osp_update_request_destroy(env, oth->ot_our);
osp_trans_commit_cb(oth, result);
req->rq_committed = 1;
osp_trans_commit_cb(oth, result);
req->rq_committed = 1;
+ osp_thandle_put(NULL, oth);
if (!oth->ot_super.th_wait_submit && !oth->ot_super.th_sync) {
if (!osp->opd_imp_active || !osp->opd_imp_connected) {
osp_trans_callback(env, oth, rc);
if (!oth->ot_super.th_wait_submit && !oth->ot_super.th_sync) {
if (!osp->opd_imp_active || !osp->opd_imp_connected) {
osp_trans_callback(env, oth, rc);
+ osp_thandle_put(env, oth);
GOTO(out, rc = -ENOTCONN);
}
rc = obd_get_request_slot(&osp->opd_obd->u.cli);
if (rc != 0) {
osp_trans_callback(env, oth, rc);
GOTO(out, rc = -ENOTCONN);
}
rc = obd_get_request_slot(&osp->opd_obd->u.cli);
if (rc != 0) {
osp_trans_callback(env, oth, rc);
+ osp_thandle_put(env, oth);
GOTO(out, rc = -ENOTCONN);
}
args->oaua_flow_control = true;
GOTO(out, rc = -ENOTCONN);
}
args->oaua_flow_control = true;
/* If osp_update_interpret is not being called,
* release the osp_thandle */
args->oaua_update = NULL;
/* If osp_update_interpret is not being called,
* release the osp_thandle */
args->oaua_update = NULL;
+ osp_thandle_put(env, oth);
}
req->rq_cb_data = NULL;
rc = rc == 0 ? req->rq_status : rc;
osp_trans_callback(env, oth, rc);
}
req->rq_cb_data = NULL;
rc = rc == 0 ? req->rq_status : rc;
osp_trans_callback(env, oth, rc);
+ osp_thandle_put(env, oth);
spin_unlock(&our->our_list_lock);
osp_trans_callback(&env, our->our_th,
our->our_th->ot_super.th_result);
spin_unlock(&our->our_list_lock);
osp_trans_callback(&env, our->our_th,
our->our_th->ot_super.th_result);
- osp_thandle_put(our->our_th);
+ osp_thandle_put(&env, our->our_th);
if (!osp_send_update_thread_running(osp)) {
if (our != NULL) {
osp_trans_callback(&env, our->our_th, -EINTR);
if (!osp_send_update_thread_running(osp)) {
if (our != NULL) {
osp_trans_callback(&env, our->our_th, -EINTR);
- osp_thandle_put(our->our_th);
+ osp_thandle_put(&env, our->our_th);
osp_invalidate_request(osp);
/* Balanced for thandle_get in osp_check_and_set_rpc_version */
osp_invalidate_request(osp);
/* Balanced for thandle_get in osp_check_and_set_rpc_version */
- osp_thandle_put(our->our_th);
+ osp_thandle_put(&env, our->our_th);
}
thread->t_flags = SVC_STOPPED;
}
thread->t_flags = SVC_STOPPED;
osp_trans_callback(env, oth, th->th_result);
}
out:
osp_trans_callback(env, oth, th->th_result);
}
out:
+ osp_thandle_put(env, oth);