+/**
+ * Handle one orphan under the backend /lost+found directory
+ *
+ * Insert the orphan FID into the namespace LFSCK trace file for further
+ * processing (via the subsequent namespace LFSCK second-stage scanning).
+ * At the same time, remove the orphan name entry from backend /lost+found
+ * directory. There is an interval between the orphan name entry removed
+ * from the backend /lost+found directory and the orphan FID in the LFSCK
+ * trace file handled. In such interval, the LFSCK can be reset, then
+ * all the FIDs recorded in the namespace LFSCK trace file will be dropped.
+ * To guarantee that the orphans can be found when LFSCK run next time
+ * without e2fsck again, when remove the orphan name entry, the LFSCK
+ * will set the orphan's ctime attribute as 1. Since normal applications
+ * cannot change the object's ctime attribute as 1. Then when LFSCK run
+ * next time, it can record the object (that ctime is 1) in the namespace
+ * LFSCK trace file during the first-stage scanning.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] com pointer to the lfsck component
+ * \param[in] parent pointer to the object for the backend /lost+found
+ * \param[in] ent pointer to the name entry for the target under the
+ * backend /lost+found
+ *
+ * \retval positive for repaired
+ * \retval 0 if needs to repair nothing
+ * \retval negative error number on failure
+ */
+static int lfsck_namespace_scan_local_lpf_one(const struct lu_env *env,
+ struct lfsck_component *com,
+ struct dt_object *parent,
+ struct lu_dirent *ent)
+{
+ struct lfsck_thread_info *info = lfsck_env_info(env);
+ struct lu_fid *key = &info->lti_fid;
+ struct lu_attr *la = &info->lti_la;
+ struct lfsck_instance *lfsck = com->lc_lfsck;
+ struct dt_object *obj = com->lc_obj;
+ struct dt_device *dev = lfsck->li_bottom;
+ struct dt_object *child = NULL;
+ struct thandle *th = NULL;
+ int rc = 0;
+ __u8 flags = 0;
+ bool exist = false;
+ ENTRY;
+
+ child = lfsck_object_find_by_dev(env, dev, &ent->lde_fid);
+ if (IS_ERR(child))
+ RETURN(PTR_ERR(child));
+
+ LASSERT(dt_object_exists(child));
+ LASSERT(!dt_object_remote(child));
+
+ fid_cpu_to_be(key, &ent->lde_fid);
+ rc = dt_lookup(env, obj, (struct dt_rec *)&flags,
+ (const struct dt_key *)key, BYPASS_CAPA);
+ if (rc == 0) {
+ exist = true;
+ flags |= LNTF_CHECK_ORPHAN;
+ } else if (rc == -ENOENT) {
+ flags = LNTF_CHECK_ORPHAN;
+ } else {
+ GOTO(out, rc);
+ }
+
+ th = dt_trans_create(env, dev);
+ if (IS_ERR(th))
+ GOTO(out, rc = PTR_ERR(th));
+
+ /* a1. remove name entry from backend /lost+found */
+ rc = dt_declare_delete(env, parent,
+ (const struct dt_key *)ent->lde_name, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ if (S_ISDIR(lfsck_object_type(child))) {
+ /* a2. decrease parent's nlink */
+ rc = dt_declare_ref_del(env, parent, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+ }
+
+ if (exist) {
+ /* a3. remove child's FID from the LFSCK trace file. */
+ rc = dt_declare_delete(env, obj,
+ (const struct dt_key *)key, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+ } else {
+ /* a4. set child's ctime as 1 */
+ memset(la, 0, sizeof(*la));
+ la->la_ctime = 1;
+ la->la_valid = LA_CTIME;
+ rc = dt_declare_attr_set(env, child, la, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+ }
+
+ /* a5. insert child's FID into the LFSCK trace file. */
+ rc = dt_declare_insert(env, obj, (const struct dt_rec *)&flags,
+ (const struct dt_key *)key, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ rc = dt_trans_start_local(env, dev, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ /* b1. remove name entry from backend /lost+found */
+ rc = dt_delete(env, parent, (const struct dt_key *)ent->lde_name, th,
+ BYPASS_CAPA);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ if (S_ISDIR(lfsck_object_type(child))) {
+ /* b2. decrease parent's nlink */
+ dt_write_lock(env, parent, 0);
+ rc = dt_ref_del(env, parent, th);
+ dt_write_unlock(env, parent);
+ if (rc != 0)
+ GOTO(stop, rc);
+ }
+
+ if (exist) {
+ /* a3. remove child's FID from the LFSCK trace file. */
+ rc = dt_delete(env, obj, (const struct dt_key *)key, th,
+ BYPASS_CAPA);
+ if (rc != 0)
+ GOTO(stop, rc);
+ } else {
+ /* b4. set child's ctime as 1 */
+ rc = dt_attr_set(env, child, la, th, BYPASS_CAPA);
+ if (rc != 0)
+ GOTO(stop, rc);
+ }
+
+ /* b5. insert child's FID into the LFSCK trace file. */
+ rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
+ (const struct dt_key *)key, th, BYPASS_CAPA, 1);
+
+ GOTO(stop, rc = (rc == 0 ? 1 : rc));
+
+stop:
+ dt_trans_stop(env, dev, th);
+
+out:
+ lu_object_put(env, &child->do_lu);
+
+ return rc;
+}
+
+/**
+ * Handle orphans under the backend /lost+found directory
+ *
+ * Some backend checker, such as e2fsck for ldiskfs may find some orphans
+ * and put them under the backend /lost+found directory that is invisible
+ * to client. The LFSCK will scan such directory, for the original client
+ * visible orphans, add their fids into the namespace LFSCK trace file,
+ * then the subsenquent namespace LFSCK second-stage scanning can handle
+ * them as other objects to be double scanned: either move back to normal
+ * namespace, or to the global visible orphan directory:
+ * /ROOT/.lustre/lost+found/MDTxxxx/
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] com pointer to the lfsck component
+ */
+static void lfsck_namespace_scan_local_lpf(const struct lu_env *env,
+ struct lfsck_component *com)
+{
+ struct lfsck_thread_info *info = lfsck_env_info(env);
+ struct lu_dirent *ent =
+ (struct lu_dirent *)info->lti_key;
+ struct lu_seq_range *range = &info->lti_range;
+ struct lfsck_instance *lfsck = com->lc_lfsck;
+ struct ptlrpc_thread *thread = &lfsck->li_thread;
+ struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
+ struct dt_device *dev = lfsck->li_bottom;
+ struct lfsck_namespace *ns = com->lc_file_ram;
+ struct dt_object *parent;
+ const struct dt_it_ops *iops;
+ struct dt_it *di;
+ struct seq_server_site *ss =
+ lu_site2seq(dev->dd_lu_dev.ld_site);
+ __u64 cookie;
+ int rc = 0;
+ __u16 type;
+ ENTRY;
+
+ parent = lfsck_object_find_by_dev(env, dev, &LU_BACKEND_LPF_FID);
+ if (IS_ERR(parent)) {
+ CERROR("%s: fail to find backend /lost+found: rc = %ld\n",
+ lfsck_lfsck2name(lfsck), PTR_ERR(parent));
+ RETURN_EXIT;
+ }
+
+ /* It is normal that the /lost+found does not exist for ZFS backend. */
+ if (!dt_object_exists(parent))
+ GOTO(out, rc = 0);
+
+ if (unlikely(!dt_try_as_dir(env, parent)))
+ GOTO(out, rc = -ENOTDIR);
+
+ CDEBUG(D_LFSCK, "%s: start to scan backend /lost+found\n",
+ lfsck_lfsck2name(lfsck));
+
+ com->lc_new_scanned = 0;
+ iops = &parent->do_index_ops->dio_it;
+ di = iops->init(env, parent, LUDA_64BITHASH | LUDA_TYPE, BYPASS_CAPA);
+ if (IS_ERR(di))
+ GOTO(out, rc = PTR_ERR(di));
+
+ rc = iops->load(env, di, 0);
+ if (rc == 0)
+ rc = iops->next(env, di);
+ else if (rc > 0)
+ rc = 0;
+
+ while (rc == 0) {
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
+ cfs_fail_val > 0) {
+ struct l_wait_info lwi;
+
+ lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
+ NULL, NULL);
+ l_wait_event(thread->t_ctl_waitq,
+ !thread_is_running(thread),
+ &lwi);
+
+ if (unlikely(!thread_is_running(thread)))
+ break;
+ }
+
+ rc = iops->rec(env, di, (struct dt_rec *)ent,
+ LUDA_64BITHASH | LUDA_TYPE);
+ if (rc == 0)
+ rc = lfsck_unpack_ent(ent, &cookie, &type);
+
+ if (unlikely(rc != 0)) {
+ CDEBUG(D_LFSCK, "%s: fail to iterate backend "
+ "/lost+found: rc = %d\n",
+ lfsck_lfsck2name(lfsck), rc);
+
+ goto skip;
+ }
+
+ /* skip dot and dotdot entries */
+ if (ent->lde_name[0] == '.' &&
+ (ent->lde_namelen == 1 ||
+ (ent->lde_namelen == 2 && ent->lde_name[1] == '.')))
+ goto next;
+
+ if (!fid_seq_in_fldb(fid_seq(&ent->lde_fid)))
+ goto skip;
+
+ if (fid_is_norm(&ent->lde_fid)) {
+ fld_range_set_mdt(range);
+ rc = fld_local_lookup(env, ss->ss_server_fld,
+ fid_seq(&ent->lde_fid), range);
+ if (rc != 0)
+ goto skip;
+ } else if (lfsck_dev_idx(dev) != 0) {
+ /* If the returned FID is IGIF, then there are three
+ * possible cases:
+ *
+ * 1) The object is upgraded from old Lustre-1.8 with
+ * IGIF assigned to such object.
+ * 2) The object is a backend local object and is
+ * invisible to client.
+ * 3) The object lost its LMV EA, and since there is
+ * no FID-in-dirent for the orphan in the backend
+ * /lost+found directory, then the low layer will
+ * return IGIF for such object.
+ *
+ * For MDTx (x != 0), it is either case 2) or case 3),
+ * but from the LFSCK view, they are indistinguishable.
+ * To be safe, the LFSCK will keep it there and report
+ * some message, then the adminstrator can handle that
+ * furtherly.
+ *
+ * For MDT0, it is more possible the case 1). The LFSCK
+ * will handle the orphan as an upgraded object. */
+ CDEBUG(D_LFSCK, "%s: the orphan %.*s with IGIF "DFID
+ "in the backend /lost+found on the MDT %04x, "
+ "to be safe, skip it.\n",
+ lfsck_lfsck2name(lfsck), ent->lde_namelen,
+ ent->lde_name, PFID(&ent->lde_fid),
+ lfsck_dev_idx(dev));
+ goto skip;
+ }
+
+ rc = lfsck_namespace_scan_local_lpf_one(env, com, parent, ent);
+
+skip:
+ down_write(&com->lc_sem);
+ com->lc_new_scanned++;
+ ns->ln_local_lpf_scanned++;
+ if (rc > 0)
+ ns->ln_local_lpf_moved++;
+ else if (rc == 0)
+ ns->ln_local_lpf_skipped++;
+ else
+ ns->ln_local_lpf_failed++;
+ up_write(&com->lc_sem);
+
+ if (rc < 0 && bk->lb_param & LPF_FAILOUT)
+ break;
+
+next:
+ lfsck_control_speed_by_self(com);
+ if (unlikely(!thread_is_running(thread))) {
+ rc = 0;
+ break;
+ }
+
+ rc = iops->next(env, di);
+ }
+
+ iops->put(env, di);
+ iops->fini(env, di);
+
+ EXIT;
+
+out:
+ CDEBUG(D_LFSCK, "%s: stop to scan backend /lost+found: rc = %d\n",
+ lfsck_lfsck2name(lfsck), rc);
+
+ lu_object_put(env, &parent->do_lu);
+}
+