+struct ofd_inconsistency_item {
+ struct list_head oii_list;
+ struct ofd_object *oii_obj;
+ struct lu_fid oii_pfid;
+};
+
+/**
+ * Verify single object for parent FID consistency.
+ *
+ * Part of LFSCK processing which checks single object PFID stored in extended
+ * attribute (XATTR) against real FID of MDT parent object received by LFSCK.
+ * This verifies that the OST object is being referenced by only a single MDT
+ * object.
+ *
+ * \param[in] env execution environment
+ * \param[in] ofd OFD device
+ * \param[in] oii object-related local data
+ * \param[in] lr LFSCK request data
+ */
+static void ofd_inconsistency_verify_one(const struct lu_env *env,
+ struct ofd_device *ofd,
+ struct ofd_inconsistency_item *oii,
+ struct lfsck_request *lr)
+{
+ struct ofd_object *fo = oii->oii_obj;
+ struct lu_fid *pfid = &fo->ofo_pfid;
+ int rc;
+
+ LASSERT(fo->ofo_pfid_checking);
+ LASSERT(!fo->ofo_pfid_verified);
+
+ lr->lr_fid = fo->ofo_header.loh_fid; /* OST-object itself FID. */
+ lr->lr_fid2 = oii->oii_pfid; /* client given PFID. */
+ lr->lr_fid3 = *pfid; /* OST local stored PFID. */
+
+ rc = lfsck_in_notify(env, ofd->ofd_osd, lr, NULL);
+ ofd_write_lock(env, fo);
+ switch (lr->lr_status) {
+ case LPVS_INIT:
+ LASSERT(rc <= 0);
+
+ if (rc < 0)
+ CDEBUG(D_LFSCK, "%s: fail to verify OST local stored "
+ "PFID xattr for "DFID", the client given PFID "
+ DFID", OST local stored PFID "DFID": rc = %d\n",
+ ofd_name(ofd), PFID(&fo->ofo_header.loh_fid),
+ PFID(&oii->oii_pfid), PFID(pfid), rc);
+ else
+ fo->ofo_pfid_verified = 1;
+ break;
+ case LPVS_INCONSISTENT:
+ LASSERT(rc != 0);
+
+ ofd->ofd_inconsistency_self_detected++;
+ if (rc < 0)
+ CDEBUG(D_LFSCK, "%s: fail to verify the client given "
+ "PFID for "DFID", the client given PFID "DFID
+ ", local stored PFID "DFID": rc = %d\n",
+ ofd_name(ofd), PFID(&fo->ofo_header.loh_fid),
+ PFID(&oii->oii_pfid), PFID(pfid), rc);
+ else
+ CDEBUG(D_LFSCK, "%s: both the client given PFID and "
+ "the OST local stored PFID are stale for the "
+ "OST-object "DFID", client given PFID is "DFID
+ ", local stored PFID is "DFID"\n",
+ ofd_name(ofd), PFID(&fo->ofo_header.loh_fid),
+ PFID(&oii->oii_pfid), PFID(pfid));
+ break;
+ case LPVS_INCONSISTENT_TOFIX:
+ ofd->ofd_inconsistency_self_detected++;
+ if (rc == 0) {
+ ofd->ofd_inconsistency_self_repaired++;
+ CDEBUG(D_LFSCK, "%s: fixed the staled OST PFID xattr "
+ "for "DFID", with the client given PFID "DFID
+ ", the old stored PFID "DFID"\n",
+ ofd_name(ofd), PFID(&fo->ofo_header.loh_fid),
+ PFID(&oii->oii_pfid), PFID(pfid));
+ } else if (rc < 0) {
+ CDEBUG(D_LFSCK, "%s: fail to fix the OST PFID xattr "
+ "for "DFID", client given PFID "DFID", local "
+ "stored PFID "DFID": rc = %d\n",
+ ofd_name(ofd), PFID(&fo->ofo_header.loh_fid),
+ PFID(&oii->oii_pfid), PFID(pfid), rc);
+ }
+ *pfid = oii->oii_pfid;
+ fo->ofo_pfid_verified = 1;
+ break;
+ default:
+ break;
+ }
+ fo->ofo_pfid_checking = 0;
+ ofd_write_unlock(env, fo);
+
+ lu_object_put(env, &fo->ofo_obj.do_lu);
+ OBD_FREE_PTR(oii);
+}
+
+/**
+ * Verification thread to check parent FID consistency.
+ *
+ * Kernel thread to check consistency of parent FID for any
+ * new item added for checking by ofd_add_inconsistency_item().
+ *
+ * \param[in] args OFD device
+ *
+ * \retval 0 on successful thread termination
+ * \retval negative value if thread can't start
+ */
+static int ofd_inconsistency_verification_main(void *args)
+{
+ struct lu_env env;
+ struct ofd_device *ofd = args;
+ struct ptlrpc_thread *thread = &ofd->ofd_inconsistency_thread;
+ struct ofd_inconsistency_item *oii;
+ struct lfsck_request *lr = NULL;
+ struct l_wait_info lwi = { 0 };
+ int rc;
+ ENTRY;
+
+ rc = lu_env_init(&env, LCT_DT_THREAD);
+ spin_lock(&ofd->ofd_inconsistency_lock);
+ thread_set_flags(thread, rc != 0 ? SVC_STOPPED : SVC_RUNNING);
+ wake_up_all(&thread->t_ctl_waitq);
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+ if (rc != 0)
+ RETURN(rc);
+
+ OBD_ALLOC_PTR(lr);
+ if (unlikely(lr == NULL))
+ GOTO(out, rc = -ENOMEM);
+
+ lr->lr_event = LE_PAIRS_VERIFY;
+ lr->lr_active = LFSCK_TYPE_LAYOUT;
+
+ spin_lock(&ofd->ofd_inconsistency_lock);
+ while (1) {
+ if (unlikely(!thread_is_running(thread)))
+ break;
+
+ while (!list_empty(&ofd->ofd_inconsistency_list)) {
+ oii = list_entry(ofd->ofd_inconsistency_list.next,
+ struct ofd_inconsistency_item,
+ oii_list);
+ list_del_init(&oii->oii_list);
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+ ofd_inconsistency_verify_one(&env, ofd, oii, lr);
+ spin_lock(&ofd->ofd_inconsistency_lock);
+ }
+
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+ l_wait_event(thread->t_ctl_waitq,
+ !list_empty(&ofd->ofd_inconsistency_list) ||
+ !thread_is_running(thread),
+ &lwi);
+ spin_lock(&ofd->ofd_inconsistency_lock);
+ }
+
+ while (!list_empty(&ofd->ofd_inconsistency_list)) {
+ struct ofd_object *fo;
+
+ oii = list_entry(ofd->ofd_inconsistency_list.next,
+ struct ofd_inconsistency_item,
+ oii_list);
+ list_del_init(&oii->oii_list);
+ fo = oii->oii_obj;
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+
+ ofd_write_lock(&env, fo);
+ fo->ofo_pfid_checking = 0;
+ ofd_write_unlock(&env, fo);
+
+ lu_object_put(&env, &fo->ofo_obj.do_lu);
+ OBD_FREE_PTR(oii);
+ spin_lock(&ofd->ofd_inconsistency_lock);
+ }
+
+ OBD_FREE_PTR(lr);
+
+ GOTO(out, rc = 0);
+
+out:
+ thread_set_flags(thread, SVC_STOPPED);
+ wake_up_all(&thread->t_ctl_waitq);
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+ lu_env_fini(&env);
+
+ return rc;
+}
+
+/**
+ * Start parent FID verification thread.
+ *
+ * See ofd_inconsistency_verification_main().
+ *
+ * \param[in] ofd OFD device
+ *
+ * \retval 0 on successful start of thread
+ * \retval negative value on error
+ */
+int ofd_start_inconsistency_verification_thread(struct ofd_device *ofd)
+{
+ struct ptlrpc_thread *thread = &ofd->ofd_inconsistency_thread;
+ struct l_wait_info lwi = { 0 };
+ struct task_struct *task;
+ int rc;
+
+ spin_lock(&ofd->ofd_inconsistency_lock);
+ if (unlikely(thread_is_running(thread))) {
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+
+ return -EALREADY;
+ }
+
+ thread_set_flags(thread, 0);
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+ task = kthread_run(ofd_inconsistency_verification_main, ofd,
+ "inconsistency_verification");
+ if (IS_ERR(task)) {
+ rc = PTR_ERR(task);
+ CERROR("%s: cannot start self_repair thread: rc = %d\n",
+ ofd_name(ofd), rc);
+ } else {
+ rc = 0;
+ l_wait_event(thread->t_ctl_waitq,
+ thread_is_running(thread) ||
+ thread_is_stopped(thread),
+ &lwi);
+ }
+
+ return rc;
+}
+
+/**
+ * Stop parent FID verification thread.
+ *
+ * \param[in] ofd OFD device
+ *
+ * \retval 0 on successful start of thread
+ * \retval -EALREADY if thread is already stopped
+ */
+int ofd_stop_inconsistency_verification_thread(struct ofd_device *ofd)
+{
+ struct ptlrpc_thread *thread = &ofd->ofd_inconsistency_thread;
+ struct l_wait_info lwi = { 0 };
+
+ spin_lock(&ofd->ofd_inconsistency_lock);
+ if (thread_is_init(thread) || thread_is_stopped(thread)) {
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+
+ return -EALREADY;
+ }
+
+ thread_set_flags(thread, SVC_STOPPING);
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+ wake_up_all(&thread->t_ctl_waitq);
+ l_wait_event(thread->t_ctl_waitq,
+ thread_is_stopped(thread),
+ &lwi);
+
+ return 0;
+}
+
+/**
+ * Add new item for parent FID verification.
+ *
+ * Prepare new verification item and pass it to the dedicated
+ * verification thread for further processing.
+ *
+ * \param[in] env execution environment
+ * \param[in] fo OFD object
+ * \param[in] oa OBDO structure with PFID
+ */
+static void ofd_add_inconsistency_item(const struct lu_env *env,
+ struct ofd_object *fo, struct obdo *oa)
+{
+ struct ofd_device *ofd = ofd_obj2dev(fo);
+ struct ofd_inconsistency_item *oii;
+ bool wakeup = false;
+
+ OBD_ALLOC_PTR(oii);
+ if (oii == NULL)
+ return;
+
+ INIT_LIST_HEAD(&oii->oii_list);
+ lu_object_get(&fo->ofo_obj.do_lu);
+ oii->oii_obj = fo;
+ oii->oii_pfid.f_seq = oa->o_parent_seq;
+ oii->oii_pfid.f_oid = oa->o_parent_oid;
+ oii->oii_pfid.f_stripe_idx = oa->o_stripe_idx;
+
+ spin_lock(&ofd->ofd_inconsistency_lock);
+ if (fo->ofo_pfid_checking || fo->ofo_pfid_verified) {
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+ OBD_FREE_PTR(oii);
+
+ return;
+ }
+
+ fo->ofo_pfid_checking = 1;
+ if (list_empty(&ofd->ofd_inconsistency_list))
+ wakeup = true;
+ list_add_tail(&oii->oii_list, &ofd->ofd_inconsistency_list);
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+ if (wakeup)
+ wake_up_all(&ofd->ofd_inconsistency_thread.t_ctl_waitq);
+
+ /* XXX: When the found inconsistency exceeds some threshold,
+ * we can trigger the LFSCK to scan part of the system
+ * or the whole system, which depends on how to define
+ * the threshold, a simple way maybe like that: define
+ * the absolute value of how many inconsisteny allowed
+ * to be repaired via self detect/repair mechanism, if
+ * exceeded, then trigger the LFSCK to scan the layout
+ * inconsistency within the whole system. */
+}
+
+/**
+ * Verify parent FID of an object.
+ *
+ * Check the parent FID is sane and start extended
+ * verification procedure otherwise.
+ *
+ * \param[in] env execution environment
+ * \param[in] fo OFD object
+ * \param[in] oa OBDO structure with PFID
+ *
+ * \retval 0 on successful verification
+ * \retval -EINPROGRESS if PFID is being repaired
+ * \retval -EPERM if PFID was verified but still insane
+ */
+int ofd_verify_ff(const struct lu_env *env, struct ofd_object *fo,
+ struct obdo *oa)
+{
+ struct lu_fid *pfid = &fo->ofo_pfid;
+ int rc = 0;
+ ENTRY;
+
+ if (fid_is_sane(pfid)) {
+ if (likely(oa->o_parent_seq == pfid->f_seq &&
+ oa->o_parent_oid == pfid->f_oid &&
+ oa->o_stripe_idx == pfid->f_stripe_idx))
+ RETURN(0);
+
+ if (fo->ofo_pfid_verified)
+ RETURN(-EPERM);
+ }
+
+ /* The OST-object may be inconsistent, and we need further verification.
+ * To avoid block the RPC service thread, return -EINPROGRESS to client
+ * and make it retry later. */
+ if (fo->ofo_pfid_checking)
+ RETURN(-EINPROGRESS);
+
+ rc = ofd_object_ff_load(env, fo);
+ if (rc == -ENODATA)
+ RETURN(0);
+
+ if (rc < 0)
+ RETURN(rc);
+
+ if (likely(oa->o_parent_seq == pfid->f_seq &&
+ oa->o_parent_oid == pfid->f_oid &&
+ oa->o_stripe_idx == pfid->f_stripe_idx))
+ RETURN(0);
+
+ /* Push it to the dedicated thread for further verification. */
+ ofd_add_inconsistency_item(env, fo, oa);
+
+ RETURN(-EINPROGRESS);
+}
+
+/**
+ * Prepare buffers for read request processing.
+ *
+ * This function converts remote buffers from client to local buffers
+ * and prepares the latter.
+ *
+ * \param[in] env execution environment
+ * \param[in] exp OBD export of client
+ * \param[in] ofd OFD device
+ * \param[in] fid FID of object
+ * \param[in] la object attributes
+ * \param[in] oa OBDO structure from client
+ * \param[in] niocount number of remote buffers
+ * \param[in] rnb remote buffers
+ * \param[in] nr_local number of local buffers
+ * \param[in] lnb local buffers
+ * \param[in] jobid job ID name
+ *
+ * \retval 0 on successful prepare
+ * \retval negative value on error
+ */