struct lu_orphan_rec lti_rec;
struct lov_user_md lti_lum;
struct dt_insert_rec lti_dt_rec;
+ struct lu_object_conf lti_conf;
};
/* lfsck_lib.c */
}
static inline struct dt_object *
+lfsck_object_find_by_dev_nowait(const struct lu_env *env, struct dt_device *dev,
+ const struct lu_fid *fid)
+{
+ struct lu_object_conf *conf = &lfsck_env_info(env)->lti_conf;
+ struct dt_object *obj;
+
+ conf->loc_flags = LOC_F_NOWAIT;
+ obj = lu2dt(lu_object_find_slice(env, dt2lu_dev(dev), fid, conf));
+ if (unlikely(obj == NULL))
+ return ERR_PTR(-ENOENT);
+
+ return obj;
+}
+
+static inline struct dt_object *
lfsck_object_find_by_dev(const struct lu_env *env, struct dt_device *dev,
const struct lu_fid *fid)
{
GOTO(stop, rc);
dt_read_lock(env, parent, 0);
- if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
+ if (unlikely(lfsck_is_dead_obj(parent)))
GOTO(unlock2, rc = 1);
rc = dt_create(env, child, cla, hint, NULL, handle);
GOTO(stop, rc);
dt_write_lock(env, parent, 0);
- if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
+ if (unlikely(lfsck_is_dead_obj(parent)))
GOTO(unlock2, rc = 1);
rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
GOTO(stop, rc);
dt_write_lock(env, parent, 0);
- if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
+ if (unlikely(lfsck_is_dead_obj(parent)))
GOTO(unlock2, rc = 0);
rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
/* Use the dt_object lock to serialize with destroy and attr_set. */
dt_read_lock(env, parent, 0);
- if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
+ if (unlikely(lfsck_is_dead_obj(parent)))
GOTO(unlock, rc = 1);
/* Get the latest parent's owner. */
int rc;
ENTRY;
- rc = dt_attr_get(env, parent, pla, BYPASS_CAPA);
- if (rc != 0) {
- if (lu_object_is_dying(parent->do_lu.lo_header))
- RETURN(0);
+ if (unlikely(lfsck_is_dead_obj(parent)))
+ RETURN(0);
+ rc = dt_attr_get(env, parent, pla, BYPASS_CAPA);
+ if (rc != 0)
GOTO(out, rc);
- }
rc = dt_attr_get(env, child, cla, BYPASS_CAPA);
if (rc == -ENOENT) {
- if (lu_object_is_dying(parent->do_lu.lo_header))
+ if (unlikely(lfsck_is_dead_obj(parent)))
RETURN(0);
type = LLIT_DANGLING;
thread_is_stopped(athread))
GOTO(out, rc = 0);
+ if (unlikely(lfsck_is_dead_obj(parent)))
+ GOTO(out, rc = 0);
+
ostid_le_to_cpu(&objs->l_ost_oi, oi);
index = le32_to_cpu(objs->l_ost_idx);
rc = ostid_to_fid(fid, oi, index);
goto next;
}
- cobj = lfsck_object_find_by_dev(env, tgt->ltd_tgt, fid);
+ /* There is potential deadlock race condition between object
+ * destroy and layout LFSCK. Consider the following scenario:
+ *
+ * 1) The LFSCK thread obtained the parent object firstly, at
+ * that time, the parent object has not been destroyed yet.
+ *
+ * 2) One RPC service thread destroyed the parent and all its
+ * children objects. Because the LFSCK is referencing the
+ * parent object, then the parent object will be marked as
+ * dying in RAM. On the other hand, the parent object is
+ * referencing all its children objects, then all children
+ * objects will be marked as dying in RAM also.
+ *
+ * 3) The LFSCK thread tries to find some child object with
+ * the parent object referenced. Then it will find that the
+ * child object is dying. According to the object visibility
+ * rules: the object with dying flag cannot be returned to
+ * others. So the LFSCK thread has to wait until the dying
+ * object has been purged from RAM, then it can allocate a
+ * new object (with the same FID) in RAM. Unfortunately, the
+ * LFSCK thread itself is referencing the parent object, and
+ * cause the parent object cannot be purged, then cause the
+ * child object cannot be purged also. So the LFSCK thread
+ * will fall into deadlock.
+ *
+ * We introduce non-blocked version lu_object_find() to allow
+ * the LFSCK thread to return failure immediately (instead of
+ * wait) when it finds dying (child) object, then the LFSCK
+ * thread can check whether the parent object is dying or not.
+ * So avoid above deadlock. LU-5395 */
+ cobj = lfsck_object_find_by_dev_nowait(env, tgt->ltd_tgt, fid);
if (IS_ERR(cobj)) {
+ if (lfsck_is_dead_obj(parent)) {
+ lfsck_tgt_put(tgt);
+
+ GOTO(out, rc = 0);
+ }
+
rc = PTR_ERR(cobj);
goto next;
}
* drained), and moreover, lookup has to wait until object is freed.
*/
- init_waitqueue_entry_current(waiter);
- add_wait_queue(&bkt->lsb_marche_funebre, waiter);
- set_current_state(TASK_UNINTERRUPTIBLE);
- lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
+ if (likely(waiter != NULL)) {
+ init_waitqueue_entry_current(waiter);
+ add_wait_queue(&bkt->lsb_marche_funebre, waiter);
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
+ }
+
return ERR_PTR(-EAGAIN);
}
wait_queue_t wait;
while (1) {
+ if (conf != NULL && conf->loc_flags & LOC_F_NOWAIT) {
+ obj = lu_object_find_try(env, dev, f, conf, NULL);
+
+ return obj;
+ }
+
obj = lu_object_find_try(env, dev, f, conf, &wait);
if (obj != ERR_PTR(-EAGAIN))
return obj;