+#include <lustre_nodemap.h>
+
+struct ofd_inconsistency_item {
+ struct list_head oii_list;
+ struct ofd_object *oii_obj;
+ struct filter_fid oii_ff;
+};
+
+/**
+ * Verify single object for parent FID consistency.
+ *
+ * Part of LFSCK processing which checks single object PFID stored in extended
+ * attribute (XATTR) against real FID of MDT parent object received by LFSCK.
+ * This verifies that the OST object is being referenced by only a single MDT
+ * object.
+ *
+ * \param[in] env execution environment
+ * \param[in] ofd OFD device
+ * \param[in] oii object-related local data
+ * \param[in] lrl LFSCK request data
+ */
+static void ofd_inconsistency_verify_one(const struct lu_env *env,
+ struct ofd_device *ofd,
+ struct ofd_inconsistency_item *oii,
+ struct lfsck_req_local *lrl)
+{
+ struct ofd_object *fo = oii->oii_obj;
+ struct filter_fid *client_ff = &oii->oii_ff;
+ struct filter_fid *local_ff = &fo->ofo_ff;
+ int rc;
+
+ LASSERT(fo->ofo_pfid_checking);
+ LASSERT(!fo->ofo_pfid_verified);
+
+ lrl->lrl_fid = fo->ofo_header.loh_fid; /* OST-object itself FID. */
+ lrl->lrl_ff_client = *client_ff; /* client given PFID. */
+ lrl->lrl_ff_local = *local_ff; /* OST local stored PFID. */
+
+ rc = lfsck_in_notify_local(env, ofd->ofd_osd, lrl, NULL);
+ ofd_write_lock(env, fo);
+ switch (lrl->lrl_status) {
+ case LPVS_INIT:
+ LASSERT(rc <= 0);
+
+ if (rc < 0)
+ CDEBUG(D_LFSCK, "%s: fail to verify OST local stored "
+ "PFID xattr for "DFID", the client given PFID "
+ DFID", OST local stored PFID "DFID": rc = %d\n",
+ ofd_name(ofd), PFID(&fo->ofo_header.loh_fid),
+ PFID(&client_ff->ff_parent),
+ PFID(&local_ff->ff_parent), rc);
+ else
+ fo->ofo_pfid_verified = 1;
+ break;
+ case LPVS_INCONSISTENT:
+ LASSERT(rc != 0);
+
+ ofd->ofd_inconsistency_self_detected++;
+ if (rc < 0)
+ CDEBUG(D_LFSCK, "%s: fail to verify the client given "
+ "PFID for "DFID", the client given PFID "DFID
+ ", local stored PFID "DFID": rc = %d\n",
+ ofd_name(ofd), PFID(&fo->ofo_header.loh_fid),
+ PFID(&client_ff->ff_parent),
+ PFID(&local_ff->ff_parent), rc);
+ else
+ CDEBUG(D_LFSCK, "%s: both the client given PFID and "
+ "the OST local stored PFID are stale for the "
+ "OST-object "DFID", client given PFID is "DFID
+ ", local stored PFID is "DFID"\n",
+ ofd_name(ofd), PFID(&fo->ofo_header.loh_fid),
+ PFID(&client_ff->ff_parent),
+ PFID(&local_ff->ff_parent));
+ break;
+ case LPVS_INCONSISTENT_TOFIX:
+ ofd->ofd_inconsistency_self_detected++;
+ if (rc == 0) {
+ ofd->ofd_inconsistency_self_repaired++;
+ CDEBUG(D_LFSCK, "%s: fixed the staled OST PFID xattr "
+ "for "DFID", with the client given PFID "DFID
+ ", the old stored PFID "DFID"\n",
+ ofd_name(ofd), PFID(&fo->ofo_header.loh_fid),
+ PFID(&client_ff->ff_parent),
+ PFID(&local_ff->ff_parent));
+ } else if (rc < 0) {
+ CDEBUG(D_LFSCK, "%s: fail to fix the OST PFID xattr "
+ "for "DFID", client given PFID "DFID", local "
+ "stored PFID "DFID": rc = %d\n",
+ ofd_name(ofd), PFID(&fo->ofo_header.loh_fid),
+ PFID(&client_ff->ff_parent),
+ PFID(&local_ff->ff_parent), rc);
+ }
+ local_ff->ff_parent = client_ff->ff_parent;
+ fo->ofo_pfid_verified = 1;
+ break;
+ default:
+ break;
+ }
+ fo->ofo_pfid_checking = 0;
+ ofd_write_unlock(env, fo);
+
+ ofd_object_put(env, fo);
+ OBD_FREE_PTR(oii);
+}
+
+/**
+ * Verification thread to check parent FID consistency.
+ *
+ * Kernel thread to check consistency of parent FID for any
+ * new item added for checking by ofd_add_inconsistency_item().
+ *
+ * \param[in] args OFD device
+ *
+ * \retval 0 on successful thread termination
+ * \retval negative value if thread can't start
+ */
+static int ofd_inconsistency_verification_main(void *args)
+{
+ struct lu_env env;
+ struct ofd_device *ofd = args;
+ struct ptlrpc_thread *thread = &ofd->ofd_inconsistency_thread;
+ struct ofd_inconsistency_item *oii;
+ struct lfsck_req_local *lrl = NULL;
+ int rc;
+ ENTRY;
+
+ rc = lu_env_init(&env, LCT_DT_THREAD);
+ spin_lock(&ofd->ofd_inconsistency_lock);
+ thread_set_flags(thread, rc ? SVC_STOPPED : SVC_RUNNING);
+ wake_up_all(&thread->t_ctl_waitq);
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+ if (rc)
+ RETURN(rc);
+
+ OBD_ALLOC_PTR(lrl);
+ if (unlikely(!lrl))
+ GOTO(out_unlocked, rc = -ENOMEM);
+
+ lrl->lrl_event = LEL_PAIRS_VERIFY_LOCAL;
+ lrl->lrl_active = LFSCK_TYPE_LAYOUT;
+
+ spin_lock(&ofd->ofd_inconsistency_lock);
+ while (1) {
+ if (unlikely(!thread_is_running(thread)))
+ break;
+
+ while (!list_empty(&ofd->ofd_inconsistency_list)) {
+ oii = list_entry(ofd->ofd_inconsistency_list.next,
+ struct ofd_inconsistency_item,
+ oii_list);
+ list_del_init(&oii->oii_list);
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+ ofd_inconsistency_verify_one(&env, ofd, oii, lrl);
+ spin_lock(&ofd->ofd_inconsistency_lock);
+ }
+
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+ wait_event_idle(thread->t_ctl_waitq,
+ !list_empty(&ofd->ofd_inconsistency_list) ||
+ !thread_is_running(thread));
+ spin_lock(&ofd->ofd_inconsistency_lock);
+ }
+
+ while (!list_empty(&ofd->ofd_inconsistency_list)) {
+ struct ofd_object *fo;
+
+ oii = list_entry(ofd->ofd_inconsistency_list.next,
+ struct ofd_inconsistency_item,
+ oii_list);
+ list_del_init(&oii->oii_list);
+ fo = oii->oii_obj;
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+
+ ofd_write_lock(&env, fo);
+ fo->ofo_pfid_checking = 0;
+ ofd_write_unlock(&env, fo);
+
+ ofd_object_put(&env, fo);
+ OBD_FREE_PTR(oii);
+ spin_lock(&ofd->ofd_inconsistency_lock);
+ }
+
+ OBD_FREE_PTR(lrl);
+
+ GOTO(out, rc = 0);
+
+out_unlocked:
+ spin_lock(&ofd->ofd_inconsistency_lock);
+out:
+ thread_set_flags(thread, SVC_STOPPED);
+ wake_up_all(&thread->t_ctl_waitq);
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+ lu_env_fini(&env);
+
+ return rc;
+}
+
+/**
+ * Start parent FID verification thread.
+ *
+ * See ofd_inconsistency_verification_main().
+ *
+ * \param[in] ofd OFD device
+ *
+ * \retval 0 on successful start of thread
+ * \retval negative value on error
+ */
+int ofd_start_inconsistency_verification_thread(struct ofd_device *ofd)
+{
+ struct ptlrpc_thread *thread = &ofd->ofd_inconsistency_thread;
+ struct task_struct *task;
+ int rc;
+
+ spin_lock(&ofd->ofd_inconsistency_lock);
+ if (unlikely(thread_is_running(thread))) {
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+
+ return -EALREADY;
+ }
+
+ thread_set_flags(thread, 0);
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+ task = kthread_run(ofd_inconsistency_verification_main, ofd,
+ "inconsistency_verification");
+ if (IS_ERR(task)) {
+ rc = PTR_ERR(task);
+ CERROR("%s: cannot start self_repair thread: rc = %d\n",
+ ofd_name(ofd), rc);
+ } else {
+ rc = 0;
+ wait_event_idle(thread->t_ctl_waitq,
+ thread_is_running(thread) ||
+ thread_is_stopped(thread));
+ }
+
+ return rc;
+}
+
+/**
+ * Stop parent FID verification thread.
+ *
+ * \param[in] ofd OFD device
+ *
+ * \retval 0 on successful start of thread
+ * \retval -EALREADY if thread is already stopped
+ */
+int ofd_stop_inconsistency_verification_thread(struct ofd_device *ofd)
+{
+ struct ptlrpc_thread *thread = &ofd->ofd_inconsistency_thread;
+
+ spin_lock(&ofd->ofd_inconsistency_lock);
+ if (thread_is_init(thread) || thread_is_stopped(thread)) {
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+
+ return -EALREADY;
+ }
+
+ thread_set_flags(thread, SVC_STOPPING);
+ spin_unlock(&ofd->ofd_inconsistency_lock);
+ wake_up_all(&thread->t_ctl_waitq);
+ wait_event_idle(thread->t_ctl_waitq,
+ thread_is_stopped(thread));
+
+ return 0;
+}