From: Fan Yong Date: Wed, 12 Feb 2014 09:21:32 +0000 (+0800) Subject: LU-3336 lfsck: orphan OST-objects iteration X-Git-Tag: 2.5.57~71 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=77eea1985bb1655e58c8b7df00703b4f08b58ec7;p=fs%2Flustre-release.git LU-3336 lfsck: orphan OST-objects iteration During the second stage scanning, the LFSCK on the MDT(s) will scan the orphan OST-objects via OSP level iteration which fetches remote orphan OST-objects information via OBD_IDX_READ RPC, and shares the existing framework/functions with others, such as quota. Implement the sponsor (the master LFSCK engine on the MDT) logic for the orphan OST-objects iteration. Implement LFSCK layout rbtree iteration - lfsck_orphan_index_ops, for slave LFSCK on OST. The lfsck_orphan_index_ops is registered onto the rbtree object. The incoming OBD_IDX_READ RPC for orphan OST-object scanning will iterate the rbtree via dt_index_read to call the registered lfsck_orphan_index_ops. Others: 1) Speed control during the second-phase scanning. 2) The LFSCK layout trace file (on the MDT) flags should be set with LF_INCOMPLETE if LFSCK slave on OST restart or failed. 3) Some code cleanup. Signed-off-by: Fan Yong Change-Id: I67d5d870dbf9b80530f4d61ed1a3e5b5df70b1a0 Reviewed-on: http://review.whamcloud.com/8303 Reviewed-by: Alex Zhuravlev Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Oleg Drokin --- diff --git a/lustre/fld/fld_handler.c b/lustre/fld/fld_handler.c index bf0e827..8bef3d1 100644 --- a/lustre/fld/fld_handler.c +++ b/lustre/fld/fld_handler.c @@ -250,10 +250,8 @@ int fld_server_lookup(const struct lu_env *env, struct lu_server_fld *fld, rc = fld_name_to_index(fld->lsf_name, &index); if (rc < 0) RETURN(rc); - else - rc = 0; - if (index == 0) { + if (index == 0 && rc == LDD_F_SV_TYPE_MDT) { /* On server side, all entries should be in cache. 
* If we can not find it in cache, just return error */ CERROR("%s: Cannot find sequence "LPX64": rc = %d\n", diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index 6d6cecc..acd0fd0 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -212,6 +212,7 @@ enum dt_index_flags { */ extern const struct dt_index_features dt_directory_features; extern const struct dt_index_features dt_otable_features; +extern const struct dt_index_features dt_lfsck_orphan_features; extern const struct dt_index_features dt_lfsck_features; /* index features supported by the accounting objects */ diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index bc288d3..15dc9b8 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -141,6 +141,7 @@ #define SEQ_DATA_PORTAL 31 #define SEQ_CONTROLLER_PORTAL 32 #define MGS_BULK_PORTAL 33 +#define OST_IDX_PORTAL 34 /* Portal 63 is reserved for the Cray Inc DVS - nic@cray.com, roe@cray.com, n8851@cray.com */ @@ -606,6 +607,11 @@ static inline int fid_is_norm(const struct lu_fid *fid) return fid_seq_is_norm(fid_seq(fid)); } +static inline int fid_is_layout_rbtree(const struct lu_fid *fid) +{ + return fid_seq(fid) == FID_SEQ_LAYOUT_RBTREE; +} + /* convert an OST objid into an IDIF FID SEQ number */ static inline obd_seq fid_idif_seq(obd_id id, __u32 ost_idx) { @@ -953,6 +959,20 @@ static inline void ostid_le_to_cpu(const struct ost_id *src_oi, } } +struct lu_orphan_rec { + /* The MDT-object's FID referenced by the orphan OST-object */ + struct lu_fid lor_fid; + __u32 lor_uid; + __u32 lor_gid; +}; + +struct lu_orphan_ent { + /* The orphan OST-object's FID */ + struct lu_fid loe_key; + struct lu_orphan_rec loe_rec; +}; +void lustre_swab_orphan_ent(struct lu_orphan_ent *ent); + /** @} lu_fid */ /** \defgroup lu_dir lu_dir diff --git a/lustre/include/obd_target.h b/lustre/include/obd_target.h index cbe415c..cef0bd1 100644 --- 
a/lustre/include/obd_target.h +++ b/lustre/include/obd_target.h @@ -78,6 +78,7 @@ struct ost_obd { struct ptlrpc_service *ost_io_service; struct ptlrpc_service *ost_seq_service; struct ptlrpc_service *ost_out_service; + struct ptlrpc_service *ost_idx_service; struct mutex ost_health_mutex; }; diff --git a/lustre/lfsck/lfsck_internal.h b/lustre/lfsck/lfsck_internal.h index 59215b8..9f20729 100644 --- a/lustre/lfsck/lfsck_internal.h +++ b/lustre/lfsck/lfsck_internal.h @@ -388,6 +388,7 @@ struct lfsck_component { void *lc_file_ram; void *lc_file_disk; void *lc_data; + struct lu_fid lc_fid_latest_scanned_phase2; /* The time for last checkpoint, jiffies */ cfs_time_t lc_time_last_checkpoint; @@ -557,9 +558,14 @@ struct lfsck_thread_info { struct filter_fid lti_new_pfid; }; struct dt_allocation_hint lti_hint; + struct lu_orphan_rec lti_rec; }; /* lfsck_lib.c */ +struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref, + bool unlink); +struct lfsck_component *lfsck_component_find(struct lfsck_instance *lfsck, + __u16 type); const char *lfsck_status2names(enum lfsck_status status); void lfsck_component_cleanup(const struct lu_env *env, struct lfsck_component *com); diff --git a/lustre/lfsck/lfsck_layout.c b/lustre/lfsck/lfsck_layout.c index 251088a..f77ee87 100644 --- a/lustre/lfsck/lfsck_layout.c +++ b/lustre/lfsck/lfsck_layout.c @@ -65,6 +65,10 @@ struct lfsck_layout_seq { struct lfsck_layout_slave_target { /* link into lfsck_layout_slave_data::llsd_master_list. */ struct list_head llst_list; + /* The position for next record in the rbtree for iteration. */ + struct lu_fid llst_fid; + /* Dummy hash for iteration against the rbtree. 
*/ + __u64 llst_hash; __u64 llst_gen; atomic_t llst_ref; __u32 llst_index; @@ -228,14 +232,17 @@ lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd, static inline struct lfsck_layout_slave_target * lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd, - __u32 index) + __u32 index, bool unlink) { struct lfsck_layout_slave_target *llst; spin_lock(&llsd->llsd_lock); list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) { if (llst->llst_index == index) { - list_del_init(&llst->llst_list); + if (unlink) + list_del_init(&llst->llst_list); + else + atomic_inc(&llst->llst_ref); spin_unlock(&llsd->llsd_lock); return llst; @@ -391,16 +398,21 @@ static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn, return 0; } -/* The caller should hold lock. */ +/* The caller should hold llsd->llsd_rb_lock. */ static struct lfsck_rbtree_node * lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd, - const struct lu_fid *fid) + const struct lu_fid *fid, bool *exact) { - struct rb_node *node = llsd->llsd_rb_root.rb_node; - struct lfsck_rbtree_node *lrn; - int rc; + struct rb_node *node = llsd->llsd_rb_root.rb_node; + struct rb_node *prev = NULL; + struct lfsck_rbtree_node *lrn = NULL; + int rc = 0; + + if (exact != NULL) + *exact = true; while (node != NULL) { + prev = node; lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node); rc = lfsck_rbtree_cmp(lrn, fid_seq(fid), fid_oid(fid)); if (rc < 0) @@ -411,7 +423,28 @@ lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd, return lrn; } - return NULL; + if (exact == NULL) + return NULL; + + /* If there is no exactly matched one, then to the next valid one. */ + *exact = false; + + /* The rbtree is empty. */ + if (rc == 0) + return NULL; + + if (rc < 0) + return lrn; + + node = rb_next(prev); + + /* The end of the rbtree. 
*/ + if (node == NULL) + return NULL; + + lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node); + + return lrn; } static struct lfsck_rbtree_node *lfsck_rbtree_new(const struct lu_env *env, @@ -482,6 +515,8 @@ lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd, return lrn; } +extern const struct dt_index_operations lfsck_orphan_index_ops; + static int lfsck_rbtree_setup(const struct lu_env *env, struct lfsck_component *com) { @@ -498,13 +533,14 @@ static int lfsck_rbtree_setup(const struct lu_env *env, if (IS_ERR(obj)) RETURN(PTR_ERR(obj)); - /* XXX: Generate an in-RAM object to stand for the layout rbtree. - * Scanning the layout rbtree will be via the iteration over - * the object. In the future, the rbtree may be written onto - * disk with the object. + /* Generate an in-RAM object to stand for the layout rbtree. + * Scanning the layout rbtree will be via the iteration over + * the object. In the future, the rbtree may be written onto + * disk with the object. * - * Mark the object to be as exist. */ + * Mark the object to be as exist. */ obj->do_lu.lo_header->loh_attr |= LOHA_EXISTS; + obj->do_index_ops = &lfsck_orphan_index_ops; llsd->llsd_rb_obj = obj; llsd->llsd_rbtree_valid = 1; dev->dd_record_fid_accessed = 1; @@ -566,7 +602,7 @@ static void lfsck_rbtree_update_bitmap(const struct lu_env *env, if (!llsd->llsd_rbtree_valid) GOTO(unlock, rc = 0); - lrn = lfsck_rbtree_search(llsd, fid); + lrn = lfsck_rbtree_search(llsd, fid, NULL); if (lrn == NULL) { struct lfsck_rbtree_node *tmp; @@ -593,10 +629,8 @@ static void lfsck_rbtree_update_bitmap(const struct lu_env *env, /* Any accessed object must be a known object. 
*/ if (!test_and_set_bit(idx, lrn->lrn_known_bitmap)) atomic_inc(&lrn->lrn_known_count); - if (accessed) { - if (!test_and_set_bit(idx, lrn->lrn_accessed_bitmap)) - atomic_inc(&lrn->lrn_accessed_count); - } + if (accessed && !test_and_set_bit(idx, lrn->lrn_accessed_bitmap)) + atomic_inc(&lrn->lrn_accessed_count); GOTO(unlock, rc = 0); @@ -1621,13 +1655,116 @@ static int lfsck_layout_trans_stop(const struct lu_env *env, return rc; } +static int lfsck_layout_scan_orphan_one(const struct lu_env *env, + struct lfsck_component *com, + struct lfsck_tgt_desc *ltd, + struct lu_orphan_rec *rec, + struct lu_fid *cfid) +{ + struct lfsck_layout *lo = com->lc_file_ram; + int rc = 0; + + /* XXX: To be extended in other patch. */ + + down_write(&com->lc_sem); + com->lc_new_scanned++; + com->lc_new_checked++; + if (rc > 0) { + lo->ll_objs_repaired[LLIT_ORPHAN - 1]++; + rc = 0; + } else if (rc < 0) { + lo->ll_objs_failed_phase2++; + } + up_write(&com->lc_sem); + + return rc; +} + static int lfsck_layout_scan_orphan(const struct lu_env *env, struct lfsck_component *com, struct lfsck_tgt_desc *ltd) { - /* XXX: To be extended in other patch. 
*/ + struct lfsck_layout *lo = com->lc_file_ram; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; + struct lfsck_thread_info *info = lfsck_env_info(env); + struct ost_id *oi = &info->lti_oi; + struct lu_fid *fid = &info->lti_fid; + struct dt_object *obj; + const struct dt_it_ops *iops; + struct dt_it *di; + int rc = 0; + ENTRY; - return 0; + CDEBUG(D_LFSCK, "%s: start the orphan scanning for OST%04x\n", + lfsck_lfsck2name(lfsck), ltd->ltd_index); + + ostid_set_seq(oi, FID_SEQ_IDIF); + ostid_set_id(oi, 0); + ostid_to_fid(fid, oi, ltd->ltd_index); + obj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, fid); + if (unlikely(IS_ERR(obj))) + RETURN(PTR_ERR(obj)); + + rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_orphan_features); + if (rc != 0) + GOTO(put, rc); + + iops = &obj->do_index_ops->dio_it; + di = iops->init(env, obj, 0, BYPASS_CAPA); + if (IS_ERR(di)) + GOTO(put, rc = PTR_ERR(di)); + + rc = iops->load(env, di, 0); + if (rc == -ESRCH) { + /* -ESRCH means that the orphan OST-objects rbtree has been + * cleanup because of the OSS server restart or other errors. 
*/ + lo->ll_flags |= LF_INCOMPLETE; + GOTO(fini, rc); + } + + if (rc == 0) + rc = iops->next(env, di); + else if (rc > 0) + rc = 0; + + if (rc < 0) + GOTO(fini, rc); + + if (rc > 0) + GOTO(fini, rc = 0); + + do { + struct dt_key *key; + struct lu_orphan_rec *rec = &info->lti_rec; + + key = iops->key(env, di); + com->lc_fid_latest_scanned_phase2 = *(struct lu_fid *)key; + rc = iops->rec(env, di, (struct dt_rec *)rec, 0); + if (rc == 0) + rc = lfsck_layout_scan_orphan_one(env, com, ltd, rec, + &com->lc_fid_latest_scanned_phase2); + if (rc != 0 && bk->lb_param & LPF_FAILOUT) + GOTO(fini, rc); + + lfsck_control_speed_by_self(com); + do { + rc = iops->next(env, di); + } while (rc < 0 && !(bk->lb_param & LPF_FAILOUT)); + } while (rc == 0); + + GOTO(fini, rc); + +fini: + iops->put(env, di); + iops->fini(env, di); +put: + lu_object_put(env, &obj->do_lu); + + CDEBUG(D_LFSCK, "%s: finish the orphan scanning for OST%04x, rc = %d\n", + lfsck_lfsck2name(lfsck), ltd->ltd_index, rc); + + return rc > 0 ? 0 : rc; } /* For the MDT-object with dangling reference, we need to re-create @@ -2223,7 +2360,7 @@ out: * mark the LFSCK as INCOMPLETE. 
*/ if (rc == -ENOTCONN || rc == -ESHUTDOWN || rc == -ETIMEDOUT || rc == -EHOSTDOWN || rc == -EHOSTUNREACH) { - CERROR("%s: Fail to take with OST %x: rc = %d.\n", + CERROR("%s: Fail to talk with OST %x: rc = %d.\n", lfsck_lfsck2name(lfsck), llr->llr_ost_idx, rc); lo->ll_flags |= LF_INCOMPLETE; lo->ll_objs_skipped++; @@ -2356,6 +2493,13 @@ static int lfsck_layout_assistant(void *args) llmd->llmd_in_double_scan = 1; wake_up_all(&mthread->t_ctl_waitq); + com->lc_new_checked = 0; + com->lc_new_scanned = 0; + com->lc_time_last_checkpoint = cfs_time_current(); + com->lc_time_next_checkpoint = + com->lc_time_last_checkpoint + + cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL); + while (llmd->llmd_in_double_scan) { struct lfsck_tgt_descs *ltds = &lfsck->li_ost_descs; @@ -2401,11 +2545,13 @@ orphan: &ltd->ltd_layout_phase_list); spin_unlock(&ltds->ltd_lock); - rc = lfsck_layout_scan_orphan(env, com, - ltd); - if (rc != 0 && - bk->lb_param & LPF_FAILOUT) - GOTO(cleanup2, rc); + if (bk->lb_param & LPF_ALL_TGT) { + rc = lfsck_layout_scan_orphan( + env, com, ltd); + if (rc != 0 && + bk->lb_param & LPF_FAILOUT) + GOTO(cleanup2, rc); + } if (unlikely(llmd->llmd_exit || !thread_is_running(mthread))) @@ -2908,6 +3054,7 @@ static int lfsck_layout_prep(const struct lu_env *env, lo->ll_objs_repaired[i] = 0; pos->lp_oit_cookie = lo->ll_pos_first_inconsistent; + fid_zero(&com->lc_fid_latest_scanned_phase2); } } else { lo->ll_status = LS_SCANNING_PHASE1; @@ -3062,7 +3209,7 @@ static int lfsck_layout_scan_stripes(const struct lu_env *env, ostid_to_fid(fid, oi, index); tgt = lfsck_tgt_get(ltds, index); if (unlikely(tgt == NULL)) { - CERROR("%s: Cannot talk with OST %x which is not join " + CERROR("%s: Cannot talk with OST %x which did not join " "the layout LFSCK.\n", lfsck_lfsck2name(lfsck), index); lo->ll_flags |= LF_INCOMPLETE; @@ -3651,8 +3798,42 @@ static int lfsck_layout_dump(const struct lu_env *env, buf += rc; len -= rc; + } else if (lo->ll_status == LS_SCANNING_PHASE2) { + cfs_duration_t 
duration = cfs_time_current() - + lfsck->li_time_last_checkpoint; + __u64 checked = lo->ll_objs_checked_phase1 + com->lc_new_checked; + __u64 speed = checked; + __u64 new_checked = com->lc_new_checked * HZ; + __u32 rtime = lo->ll_run_time_phase1 + + cfs_duration_sec(duration + HALF_SEC); + + if (duration != 0) + do_div(new_checked, duration); + if (rtime != 0) + do_div(speed, rtime); + rc = snprintf(buf, len, + "checked_phase1: "LPU64"\n" + "checked_phase2: "LPU64"\n" + "run_time_phase1: %u seconds\n" + "run_time_phase2: %u seconds\n" + "average_speed_phase1: "LPU64" items/sec\n" + "average_speed_phase2: N/A\n" + "real-time_speed_phase1: "LPU64" items/sec\n" + "real-time_speed_phase2: N/A\n" + "current_position: "DFID"\n", + checked, + lo->ll_objs_checked_phase2, + rtime, + lo->ll_run_time_phase2, + speed, + new_checked, + PFID(&com->lc_fid_latest_scanned_phase2)); + if (rc <= 0) + goto out; + + buf += rc; + len -= rc; } else { - /* XXX: LS_SCANNING_PHASE2 will be handled in the future. 
*/ __u64 speed1 = lo->ll_objs_checked_phase1; __u64 speed2 = lo->ll_objs_checked_phase2; @@ -3988,7 +4169,7 @@ static int lfsck_layout_slave_in_notify(const struct lu_env *env, if (lr->lr_event != LE_PHASE2_DONE && lr->lr_event != LE_PEER_EXIT) RETURN(-EINVAL); - llst = lfsck_layout_llst_find_and_del(llsd, lr->lr_index); + llst = lfsck_layout_llst_find_and_del(llsd, lr->lr_index, true); if (llst == NULL) RETURN(-ENODEV); @@ -4092,7 +4273,8 @@ static int lfsck_layout_slave_join(const struct lu_env *env, spin_lock(&lfsck->li_lock); if (rc == 0 && !thread_is_running(&lfsck->li_thread)) { spin_unlock(&lfsck->li_lock); - llst = lfsck_layout_llst_find_and_del(llsd, lsp->lsp_index); + llst = lfsck_layout_llst_find_and_del(llsd, lsp->lsp_index, + true); if (llst != NULL) lfsck_layout_llst_put(llst); spin_lock(&lfsck->li_lock); @@ -4273,3 +4455,574 @@ out: return rc; } + +struct lfsck_orphan_it { + struct lfsck_component *loi_com; + struct lfsck_rbtree_node *loi_lrn; + struct lfsck_layout_slave_target *loi_llst; + struct lu_fid loi_key; + struct lu_orphan_rec loi_rec; + __u64 loi_hash; + unsigned int loi_over:1; +}; + +static int lfsck_fid_match_idx(const struct lu_env *env, + struct lfsck_instance *lfsck, + const struct lu_fid *fid, int idx) +{ + struct seq_server_site *ss; + struct lu_server_fld *sf; + struct lu_seq_range range = { 0 }; + int rc; + + /* All abnormal cases will be returned to MDT0. 
*/ + if (!fid_is_norm(fid)) { + if (idx == 0) + return 1; + + return 0; + } + + ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site); + if (unlikely(ss == NULL)) + return -ENOTCONN; + + sf = ss->ss_server_fld; + LASSERT(sf != NULL); + + fld_range_set_any(&range); + rc = fld_server_lookup(env, sf, fid_seq(fid), &range); + if (rc != 0) + return rc; + + if (!fld_range_is_mdt(&range)) + return -EINVAL; + + if (range.lsr_index == idx) + return 1; + + return 0; +} + +static void lfsck_layout_destroy_orphan(const struct lu_env *env, + struct dt_device *dev, + struct dt_object *obj) +{ + struct thandle *handle; + int rc; + ENTRY; + + handle = dt_trans_create(env, dev); + if (IS_ERR(handle)) + RETURN_EXIT; + + rc = dt_declare_ref_del(env, obj, handle); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_declare_destroy(env, obj, handle); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_trans_start_local(env, dev, handle); + if (rc != 0) + GOTO(stop, rc); + + dt_write_lock(env, obj, 0); + rc = dt_ref_del(env, obj, handle); + if (rc == 0) + rc = dt_destroy(env, obj, handle); + dt_write_unlock(env, obj); + + GOTO(stop, rc); + +stop: + dt_trans_stop(env, dev, handle); + + RETURN_EXIT; +} + +static int lfsck_orphan_index_lookup(const struct lu_env *env, + struct dt_object *dt, + struct dt_rec *rec, + const struct dt_key *key, + struct lustre_capa *capa) +{ + return -EOPNOTSUPP; +} + +static int lfsck_orphan_index_declare_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle) +{ + return -EOPNOTSUPP; +} + +static int lfsck_orphan_index_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa, + int ignore_quota) +{ + return -EOPNOTSUPP; +} + +static int lfsck_orphan_index_declare_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *handle) +{ + return 
-EOPNOTSUPP; +} + +static int lfsck_orphan_index_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa) +{ + return -EOPNOTSUPP; +} + +static struct dt_it *lfsck_orphan_it_init(const struct lu_env *env, + struct dt_object *dt, + __u32 attr, + struct lustre_capa *capa) +{ + struct dt_device *dev = lu2dt_dev(dt->do_lu.lo_dev); + struct lfsck_instance *lfsck; + struct lfsck_component *com = NULL; + struct lfsck_layout_slave_data *llsd; + struct lfsck_orphan_it *it = NULL; + int rc = 0; + ENTRY; + + lfsck = lfsck_instance_find(dev, true, false); + if (unlikely(lfsck == NULL)) + RETURN(ERR_PTR(-ENODEV)); + + com = lfsck_component_find(lfsck, LT_LAYOUT); + if (unlikely(com == NULL)) + GOTO(out, rc = -ENOENT); + + llsd = com->lc_data; + if (!llsd->llsd_rbtree_valid) + GOTO(out, rc = -ESRCH); + + OBD_ALLOC_PTR(it); + if (it == NULL) + GOTO(out, rc = -ENOMEM); + + it->loi_llst = lfsck_layout_llst_find_and_del(llsd, attr, false); + if (it->loi_llst == NULL) + GOTO(out, rc = -ENODEV); + + if (dev->dd_record_fid_accessed) { + /* The first iteration against the rbtree, scan the whole rbtree + * to remove the nodes which do NOT need to be handled. */ + write_lock(&llsd->llsd_rb_lock); + if (dev->dd_record_fid_accessed) { + struct rb_node *node; + struct rb_node *next; + struct lfsck_rbtree_node *lrn; + + /* No need to record the fid accessing anymore. 
*/ + dev->dd_record_fid_accessed = 0; + + node = rb_first(&llsd->llsd_rb_root); + while (node != NULL) { + next = rb_next(node); + lrn = rb_entry(node, struct lfsck_rbtree_node, + lrn_node); + if (atomic_read(&lrn->lrn_known_count) <= + atomic_read(&lrn->lrn_accessed_count)) { + rb_erase(node, &llsd->llsd_rb_root); + lfsck_rbtree_free(lrn); + } + node = next; + } + } + write_unlock(&llsd->llsd_rb_lock); + } + + /* read lock the rbtree when init, and unlock when fini */ + read_lock(&llsd->llsd_rb_lock); + it->loi_com = com; + com = NULL; + + GOTO(out, rc = 0); + +out: + if (com != NULL) + lfsck_component_put(env, com); + lfsck_instance_put(env, lfsck); + if (rc != 0) { + if (it != NULL) + OBD_FREE_PTR(it); + + it = (struct lfsck_orphan_it *)ERR_PTR(rc); + } + + return (struct dt_it *)it; +} + +static void lfsck_orphan_it_fini(const struct lu_env *env, + struct dt_it *di) +{ + struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di; + struct lfsck_component *com = it->loi_com; + struct lfsck_layout_slave_data *llsd; + struct lfsck_layout_slave_target *llst; + + if (com != NULL) { + llsd = com->lc_data; + read_unlock(&llsd->llsd_rb_lock); + llst = it->loi_llst; + LASSERT(llst != NULL); + + /* Save the key and hash for iterate next. 
*/ + llst->llst_fid = it->loi_key; + llst->llst_hash = it->loi_hash; + lfsck_layout_llst_put(llst); + lfsck_component_put(env, com); + } + OBD_FREE_PTR(it); +} + +/** + * \retval +1: the iteration finished + * \retval 0: on success, not finished + * \retval -ve: on error + */ +static int lfsck_orphan_it_next(const struct lu_env *env, + struct dt_it *di) +{ + struct lfsck_thread_info *info = lfsck_env_info(env); + struct filter_fid_old *pfid = &info->lti_old_pfid; + struct lu_attr *la = &info->lti_la; + struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di; + struct lu_fid *key = &it->loi_key; + struct lu_orphan_rec *rec = &it->loi_rec; + struct lfsck_component *com = it->loi_com; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_layout_slave_data *llsd = com->lc_data; + struct dt_object *obj; + struct lfsck_rbtree_node *lrn; + int pos; + int rc; + __u32 save; + __u32 idx = it->loi_llst->llst_index; + bool exact = false; + ENTRY; + + if (it->loi_over) + RETURN(1); + +again0: + lrn = it->loi_lrn; + if (lrn == NULL) { + lrn = lfsck_rbtree_search(llsd, key, &exact); + if (lrn == NULL) { + it->loi_over = 1; + RETURN(1); + } + + it->loi_lrn = lrn; + if (!exact) { + key->f_seq = lrn->lrn_seq; + key->f_oid = lrn->lrn_first_oid; + key->f_ver = 0; + } + } else { + key->f_oid++; + if (unlikely(key->f_oid == 0)) { + key->f_seq++; + it->loi_lrn = NULL; + goto again0; + } + + if (key->f_oid >= + lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH) { + it->loi_lrn = NULL; + goto again0; + } + } + + if (unlikely(atomic_read(&lrn->lrn_known_count) <= + atomic_read(&lrn->lrn_accessed_count))) { + struct rb_node *next = rb_next(&lrn->lrn_node); + + while (next != NULL) { + lrn = rb_entry(next, struct lfsck_rbtree_node, + lrn_node); + if (atomic_read(&lrn->lrn_known_count) > + atomic_read(&lrn->lrn_accessed_count)) + break; + next = rb_next(next); + } + + if (next == NULL) { + it->loi_over = 1; + RETURN(1); + } + + it->loi_lrn = lrn; + key->f_seq = lrn->lrn_seq; + 
key->f_oid = lrn->lrn_first_oid; + key->f_ver = 0; + } + + pos = key->f_oid - lrn->lrn_first_oid; + +again1: + pos = find_next_bit(lrn->lrn_known_bitmap, + LFSCK_RBTREE_BITMAP_WIDTH, pos); + if (pos >= LFSCK_RBTREE_BITMAP_WIDTH) { + key->f_oid = lrn->lrn_first_oid + pos; + if (unlikely(key->f_oid < lrn->lrn_first_oid)) { + key->f_seq++; + key->f_oid = 0; + } + it->loi_lrn = NULL; + goto again0; + } + + if (test_bit(pos, lrn->lrn_accessed_bitmap)) { + pos++; + goto again1; + } + + key->f_oid = lrn->lrn_first_oid + pos; + obj = lfsck_object_find(env, lfsck, key); + if (IS_ERR(obj)) { + rc = PTR_ERR(obj); + if (rc == -ENOENT) { + pos++; + goto again1; + } + RETURN(rc); + } + + dt_read_lock(env, obj, 0); + if (!dt_object_exists(obj)) { + dt_read_unlock(env, obj); + lfsck_object_put(env, obj); + pos++; + goto again1; + } + + rc = dt_attr_get(env, obj, la, BYPASS_CAPA); + if (rc != 0) + GOTO(out, rc); + + rc = dt_xattr_get(env, obj, lfsck_buf_get(env, pfid, sizeof(*pfid)), + XATTR_NAME_FID, BYPASS_CAPA); + if (rc == -ENODATA) { + /* For the pre-created OST-object, update the bitmap to avoid + * others LFSCK (second phase) iteration to touch it again. */ + if (la->la_ctime == 0) { + if (!test_and_set_bit(pos, lrn->lrn_accessed_bitmap)) + atomic_inc(&lrn->lrn_accessed_count); + + /* For the race between repairing dangling referenced + * MDT-object and unlink the file, it may left orphan + * OST-object there. Destroy it now! */ + if (unlikely(!(la->la_mode & S_ISUID))) { + dt_read_unlock(env, obj); + lfsck_layout_destroy_orphan(env, + lfsck->li_bottom, + obj); + lfsck_object_put(env, obj); + pos++; + goto again1; + } + } else if (idx == 0) { + /* If the orphan OST-object has no parent information, + * regard it as referenced by the MDT-object on MDT0. 
*/ + fid_zero(&rec->lor_fid); + rec->lor_uid = la->la_uid; + rec->lor_gid = la->la_gid; + GOTO(out, rc = 0); + } + + dt_read_unlock(env, obj); + lfsck_object_put(env, obj); + pos++; + goto again1; + } + + if (rc < 0) + GOTO(out, rc); + + if (rc != sizeof(struct filter_fid) && + rc != sizeof(struct filter_fid_old)) + GOTO(out, rc = -EINVAL); + + fid_le_to_cpu(&rec->lor_fid, &pfid->ff_parent); + /* In fact, the ff_parent::f_ver is not the real parent FID::f_ver, + * instead, it is the OST-object index in its parent MDT-object + * layout EA. */ + save = rec->lor_fid.f_ver; + rec->lor_fid.f_ver = 0; + rc = lfsck_fid_match_idx(env, lfsck, &rec->lor_fid, idx); + /* If the orphan OST-object does not claim the MDT, then next. + * + * If we do not know whether it matches or not, then return it + * to the MDT for further check. */ + if (rc == 0) { + dt_read_unlock(env, obj); + lfsck_object_put(env, obj); + pos++; + goto again1; + } + + rec->lor_fid.f_ver = save; + rec->lor_uid = la->la_uid; + rec->lor_gid = la->la_gid; + + CDEBUG(D_LFSCK, "%s: return orphan "DFID", PFID "DFID", owner %u:%u\n", + lfsck_lfsck2name(com->lc_lfsck), PFID(key), PFID(&rec->lor_fid), + rec->lor_uid, rec->lor_gid); + + GOTO(out, rc = 0); + +out: + dt_read_unlock(env, obj); + lfsck_object_put(env, obj); + if (rc == 0) + it->loi_hash++; + + return rc; +} + +/** + * \retval +1: locate to the exactly position + * \retval 0: cannot locate to the exactly position, + * call next() to move to a valid position. 
+ * \retval -ve: on error + */ +static int lfsck_orphan_it_get(const struct lu_env *env, + struct dt_it *di, + const struct dt_key *key) +{ + struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di; + int rc; + + it->loi_key = *(struct lu_fid *)key; + rc = lfsck_orphan_it_next(env, di); + if (rc == 1) + return 0; + + if (rc == 0) + return 1; + + return rc; +} + +static void lfsck_orphan_it_put(const struct lu_env *env, + struct dt_it *di) +{ +} + +static struct dt_key *lfsck_orphan_it_key(const struct lu_env *env, + const struct dt_it *di) +{ + struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di; + + return (struct dt_key *)&it->loi_key; +} + +static int lfsck_orphan_it_key_size(const struct lu_env *env, + const struct dt_it *di) +{ + return sizeof(struct lu_fid); +} + +static int lfsck_orphan_it_rec(const struct lu_env *env, + const struct dt_it *di, + struct dt_rec *rec, + __u32 attr) +{ + struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di; + + *(struct lu_orphan_rec *)rec = it->loi_rec; + + return 0; +} + +static __u64 lfsck_orphan_it_store(const struct lu_env *env, + const struct dt_it *di) +{ + struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di; + + return it->loi_hash; +} + +/** + * \retval +1: locate to the exactly position + * \retval 0: cannot locate to the exactly position, + * call next() to move to a valid position. 
+ * \retval -ve: on error + */ +static int lfsck_orphan_it_load(const struct lu_env *env, + const struct dt_it *di, + __u64 hash) +{ + struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di; + struct lfsck_layout_slave_target *llst = it->loi_llst; + int rc; + + LASSERT(llst != NULL); + + if (hash != llst->llst_hash) { + CWARN("%s: the given hash "LPU64" for orphan iteration does " + "not match the one when fini "LPU64", to be reset.\n", + lfsck_lfsck2name(it->loi_com->lc_lfsck), hash, + llst->llst_hash); + fid_zero(&llst->llst_fid); + llst->llst_hash = 0; + } + + it->loi_key = llst->llst_fid; + it->loi_hash = llst->llst_hash; + rc = lfsck_orphan_it_next(env, (struct dt_it *)di); + if (rc == 1) + return 0; + + if (rc == 0) + return 1; + + return rc; +} + +static int lfsck_orphan_it_key_rec(const struct lu_env *env, + const struct dt_it *di, + void *key_rec) +{ + return 0; +} + +const struct dt_index_operations lfsck_orphan_index_ops = { + .dio_lookup = lfsck_orphan_index_lookup, + .dio_declare_insert = lfsck_orphan_index_declare_insert, + .dio_insert = lfsck_orphan_index_insert, + .dio_declare_delete = lfsck_orphan_index_declare_delete, + .dio_delete = lfsck_orphan_index_delete, + .dio_it = { + .init = lfsck_orphan_it_init, + .fini = lfsck_orphan_it_fini, + .get = lfsck_orphan_it_get, + .put = lfsck_orphan_it_put, + .next = lfsck_orphan_it_next, + .key = lfsck_orphan_it_key, + .key_size = lfsck_orphan_it_key_size, + .rec = lfsck_orphan_it_rec, + .store = lfsck_orphan_it_store, + .load = lfsck_orphan_it_load, + .key_rec = lfsck_orphan_it_key_rec, + } +}; diff --git a/lustre/lfsck/lfsck_lib.c b/lustre/lfsck/lfsck_lib.c index c2f2b50..de7b849 100644 --- a/lustre/lfsck/lfsck_lib.c +++ b/lustre/lfsck/lfsck_lib.c @@ -287,7 +287,7 @@ __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *lis return NULL; } -static struct lfsck_component * +struct lfsck_component * lfsck_component_find(struct lfsck_instance *lfsck, __u16 type) { struct 
lfsck_component *com; @@ -397,8 +397,8 @@ __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink) return NULL; } -static inline struct lfsck_instance *lfsck_instance_find(struct dt_device *key, - bool ref, bool unlink) +struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref, + bool unlink) { struct lfsck_instance *lfsck; diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 4e7c17e..17b7605 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -4752,6 +4752,8 @@ static int mdt_prepare(const struct lu_env *env, RETURN(rc); rc = lfsck_register_namespace(env, mdt->mdt_bottom, mdt->mdt_namespace); + /* The LFSCK instance is registered just now, so it must be there when + * register the namespace to such instance. */ LASSERTF(rc == 0, "register namespace failed: rc = %d\n", rc); lsp.lsp_start = NULL; diff --git a/lustre/obdclass/dt_object.c b/lustre/obdclass/dt_object.c index 5f31e12..b1eba33 100644 --- a/lustre/obdclass/dt_object.c +++ b/lustre/obdclass/dt_object.c @@ -567,6 +567,17 @@ EXPORT_SYMBOL(dt_directory_features); const struct dt_index_features dt_otable_features; EXPORT_SYMBOL(dt_otable_features); +/* lfsck orphan */ +const struct dt_index_features dt_lfsck_orphan_features = { + .dif_flags = 0, + .dif_keysize_min = sizeof(struct lu_fid), + .dif_keysize_max = sizeof(struct lu_fid), + .dif_recsize_min = sizeof(struct lu_orphan_rec), + .dif_recsize_max = sizeof(struct lu_orphan_rec), + .dif_ptrsize = 4 +}; +EXPORT_SYMBOL(dt_lfsck_orphan_features); + /* lfsck */ const struct dt_index_features dt_lfsck_features = { .dif_flags = DT_IND_UPDATE, @@ -630,6 +641,8 @@ static inline const struct dt_index_features *dt_index_feat_select(__u64 seq, /* slave index should be a regular file */ return ERR_PTR(-ENOENT); return &dt_quota_slv_features; + } else if (seq == FID_SEQ_LAYOUT_RBTREE){ + return &dt_lfsck_orphan_features; } else if (seq >= FID_SEQ_NORMAL) { /* object is part of the namespace, verify that 
it is a * directory */ @@ -870,8 +883,9 @@ int dt_index_read(const struct lu_env *env, struct dt_device *dev, * time being */ RETURN(-EOPNOTSUPP); - if (!fid_is_quota(&ii->ii_fid)) - /* block access to all local files except quota files */ + if (!fid_is_quota(&ii->ii_fid) && !fid_is_layout_rbtree(&ii->ii_fid)) + /* Block access to all local files except quota files and + * the layout rbtree. */ RETURN(-EPERM); /* lookup index object subject to the transfer */ @@ -917,13 +931,16 @@ int dt_index_read(const struct lu_env *env, struct dt_device *dev, /* key isn't necessarily unique */ ii->ii_flags |= II_FL_NONUNQ; - dt_read_lock(env, obj, 0); - /* fetch object version before walking the index */ - ii->ii_version = dt_version_get(env, obj); + if (!fid_is_layout_rbtree(&ii->ii_fid)) { + dt_read_lock(env, obj, 0); + /* fetch object version before walking the index */ + ii->ii_version = dt_version_get(env, obj); + } /* walk the index and fill lu_idxpages with key/record pairs */ rc = dt_index_walk(env, obj, rdpg, dt_index_page_build ,ii); - dt_read_unlock(env, obj); + if (!fid_is_layout_rbtree(&ii->ii_fid)) + dt_read_unlock(env, obj); if (rc == 0) { /* index is empty */ diff --git a/lustre/ofd/ofd_dev.c b/lustre/ofd/ofd_dev.c index 6c5439c..9900bed 100644 --- a/lustre/ofd/ofd_dev.c +++ b/lustre/ofd/ofd_dev.c @@ -443,6 +443,8 @@ static int ofd_prepare(const struct lu_env *env, struct lu_device *pdev, } rc = lfsck_register_namespace(env, ofd->ofd_osd, ofd->ofd_namespace); + /* The LFSCK instance is registered just now, so it must be there when + * register the namespace to such instance.
*/ LASSERTF(rc == 0, "register namespace failed: rc = %d\n", rc); lsp.lsp_start = NULL; diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index aaf7546..fae592f 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -1075,6 +1075,418 @@ int osp_object_destroy(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } +struct osp_orphan_it { /* iteration state for the orphan index read through OSP */ + int ooi_pos0; /* index of the current page in ooi_pages */ + int ooi_pos1; /* index of the current lu_page inside the current page */ + int ooi_pos2; /* index of the current entry inside the current lu_page; -1 before the first entry */ + int ooi_total_npages; /* number of slots allocated in ooi_pages */ + int ooi_valid_npages; /* number of pages carrying data from the last fetch */ + unsigned int ooi_swab:1; /* set when the reply needs byte-swapping */ + __u64 ooi_next; /* hash at which the next fetch starts */ + struct dt_object *ooi_obj; /* object being iterated */ + struct lu_orphan_ent *ooi_ent; /* entry the iterator is positioned on, or NULL */ + struct page *ooi_cur_page; /* NOTE(review): assigned the kmap() return value and later passed to kunmap() -- confirm this is intended */ + struct lu_idxpage *ooi_cur_idxpage; /* current lu_page, or NULL */ + struct page **ooi_pages; /* bulk pages allocated by osp_orphan_it_fetch() */ +}; +/* Lookup is not supported on this index: always returns -EOPNOTSUPP. */ +static int osp_orphan_index_lookup(const struct lu_env *env, + struct dt_object *dt, + struct dt_rec *rec, + const struct dt_key *key, + struct lustre_capa *capa) +{ + return -EOPNOTSUPP; +} +/* Declaring an insert is not supported: always returns -EOPNOTSUPP. */ +static int osp_orphan_index_declare_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle) +{ + return -EOPNOTSUPP; +} +/* Insert is not supported: always returns -EOPNOTSUPP. */ +static int osp_orphan_index_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa, + int ignore_quota) +{ + return -EOPNOTSUPP; +} +/* Declaring a delete is not supported: always returns -EOPNOTSUPP. */ +static int osp_orphan_index_declare_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *handle) +{ + return -EOPNOTSUPP; +} +/* Delete is not supported: always returns -EOPNOTSUPP. */ +static int osp_orphan_index_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa) +{ + return -EOPNOTSUPP; +} +/* Allocate an iterator bound to dt with ooi_pos2 set to -1; returns ERR_PTR(-ENOMEM) when the allocation fails. The attr and capa arguments are not used. */ +static struct dt_it *osp_orphan_it_init(const struct lu_env *env, + struct dt_object *dt, + __u32 attr, + struct lustre_capa *capa) +{ + struct osp_orphan_it *it; + + OBD_ALLOC_PTR(it); + if (it == NULL) + return ERR_PTR(-ENOMEM); + + it->ooi_pos2 = -1; + it->ooi_obj = dt; + + return (struct dt_it
*)it; +} + +static void osp_orphan_it_fini(const struct lu_env *env, + struct dt_it *di) +{ + struct osp_orphan_it *it = (struct osp_orphan_it *)di; + struct page **pages = it->ooi_pages; + int npages = it->ooi_total_npages; + int i; + + if (pages != NULL) { + for (i = 0; i < npages; i++) { + if (pages[i] != NULL) { + if (pages[i] == it->ooi_cur_page) { + kunmap(pages[i]); + it->ooi_cur_page = NULL; + } + __free_page(pages[i]); + } + } + OBD_FREE(pages, npages * sizeof(*pages)); + } + OBD_FREE_PTR(it); +} + +static int osp_orphan_it_fetch(const struct lu_env *env, + struct osp_orphan_it *it) +{ + struct lu_device *dev = it->ooi_obj->do_lu.lo_dev; + struct osp_device *osp = lu2osp_dev(dev); + struct page **pages; + struct ptlrpc_request *req = NULL; + struct ptlrpc_bulk_desc *desc; + struct idx_info *ii; + int npages; + int rc; + int i; + ENTRY; + + /* 1MB bulk */ + npages = min_t(unsigned int, OFD_MAX_BRW_SIZE, 1 << 20); + npages /= PAGE_CACHE_SIZE; + + OBD_ALLOC(pages, npages * sizeof(*pages)); + if (pages == NULL) + RETURN(-ENOMEM); + + it->ooi_pages = pages; + it->ooi_total_npages = npages; + for (i = 0; i < npages; i++) { + pages[i] = alloc_page(GFP_IOFS); + if (pages[i] == NULL) + RETURN(-ENOMEM); + } + + req = ptlrpc_request_alloc(osp->opd_obd->u.cli.cl_import, + &RQF_OBD_IDX_READ); + if (req == NULL) + RETURN(-ENOMEM); + + rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, OBD_IDX_READ); + if (rc != 0) { + ptlrpc_request_free(req); + RETURN(rc); + } + + req->rq_request_portal = OST_IDX_PORTAL; + ptlrpc_at_set_req_timeout(req); + + desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK, + MDS_BULK_PORTAL); + if (desc == NULL) { + ptlrpc_request_free(req); + RETURN(-ENOMEM); + } + + for (i = 0; i < npages; i++) + ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE); + + ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO); + memset(ii, 0, sizeof(*ii)); + ii->ii_fid.f_seq = FID_SEQ_LAYOUT_RBTREE; + ii->ii_fid.f_oid = osp->opd_index; + 
ii->ii_fid.f_ver = 0; + ii->ii_magic = IDX_INFO_MAGIC; + ii->ii_flags = II_FL_NOHASH; + ii->ii_count = npages * LU_PAGE_COUNT; + ii->ii_hash_start = it->ooi_next; + ii->ii_attrs = + osp->opd_storage->dd_lu_dev.ld_site->ld_seq_site->ss_node_id; + /* Send the request and wait for the reply. */ + ptlrpc_request_set_replen(req); + rc = ptlrpc_queue_wait(req); + if (rc != 0) + GOTO(out, rc); + + rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, + req->rq_bulk->bd_nob_transferred); + if (rc < 0) + GOTO(out, rc); + /* NOTE(review): the pointer returned by req_capsule_server_get() is dereferenced without a NULL check -- confirm it cannot be NULL here. */ + ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO); + if (ii->ii_magic != IDX_INFO_MAGIC) + GOTO(out, rc = -EPROTO); + /* Derive the page count from the returned ii_count and reject a reply that claims more pages than were posted. */ + npages = (ii->ii_count + LU_PAGE_COUNT - 1) >> + (PAGE_CACHE_SHIFT - LU_PAGE_SHIFT); + if (npages > it->ooi_total_npages) { + CERROR("%s: returned more pages than expected, %u > %u\n", + osp->opd_obd->obd_name, npages, it->ooi_total_npages); + GOTO(out, rc = -EINVAL); + } + + it->ooi_valid_npages = npages; + if (ptlrpc_rep_need_swab(req)) + it->ooi_swab = 1; + /* Remember where the next fetch has to start. */ + it->ooi_next = ii->ii_hash_end; + + GOTO(out, rc = 0); + +out: + ptlrpc_req_finished(req); + + return rc; +} +/* Advance to the next orphan entry. Returns 0 when positioned on an entry, 1 when there is nothing more to return (an empty lu_page, or ooi_next == II_END_OFF), or a negative errno. The labels walk entries within a lu_page (again2), lu_pages within a page (again1) and pages within the fetched batch (again0); a new batch is fetched when the current one is used up. */ +static int osp_orphan_it_next(const struct lu_env *env, + struct dt_it *di) +{ + struct osp_orphan_it *it = (struct osp_orphan_it *)di; + struct lu_idxpage *idxpage; + struct page **pages; + int rc; + int i; + ENTRY; + +again2: + idxpage = it->ooi_cur_idxpage; + if (idxpage != NULL) { + if (idxpage->lip_nr == 0) + RETURN(1); + + it->ooi_pos2++; + if (it->ooi_pos2 < idxpage->lip_nr) { + it->ooi_ent = + (struct lu_orphan_ent *)idxpage->lip_entries + + it->ooi_pos2; + if (it->ooi_swab) + lustre_swab_orphan_ent(it->ooi_ent); + RETURN(0); + } + /* This lu_page is used up: move on to the next lu_page of the current page. */ + it->ooi_cur_idxpage = NULL; + it->ooi_pos1++; + +again1: + if (it->ooi_pos1 < LU_PAGE_COUNT) { + it->ooi_cur_idxpage = (void *)it->ooi_cur_page + + LU_PAGE_SIZE * it->ooi_pos1; + if (it->ooi_swab) + lustre_swab_lip_header(it->ooi_cur_idxpage); + if (it->ooi_cur_idxpage->lip_magic != LIP_MAGIC) { + struct osp_device *osp = + lu2osp_dev(it->ooi_obj->do_lu.lo_dev); + +
CERROR("%s: invalid magic (%x != %x) for page " + "%d/%d while read layout orphan index\n", + osp->opd_obd->obd_name, + it->ooi_cur_idxpage->lip_magic, + LIP_MAGIC, it->ooi_pos0, it->ooi_pos1); + /* Skip this lu_page next time. Use the lu_page just loaded: the local idxpage is NULL or stale when this point is reached through again0/again1. */ + it->ooi_pos2 = it->ooi_cur_idxpage->lip_nr - 1; + RETURN(-EINVAL); + } + it->ooi_pos2 = -1; + goto again2; + } + /* Every lu_page of this page is used up: unmap it and move to the next page. */ + kunmap(it->ooi_cur_page); + it->ooi_cur_page = NULL; + it->ooi_pos0++; + +again0: + pages = it->ooi_pages; + if (it->ooi_pos0 < it->ooi_valid_npages) { + it->ooi_cur_page = kmap(pages[it->ooi_pos0]); + it->ooi_pos1 = 0; + goto again1; + } + /* The whole batch is used up: free the pages and reset the iterator state before fetching again. */ + for (i = 0; i < it->ooi_total_npages; i++) { + if (pages[i] != NULL) + __free_page(pages[i]); + } + OBD_FREE(pages, it->ooi_total_npages * sizeof(*pages)); + + it->ooi_pos0 = 0; + it->ooi_total_npages = 0; + it->ooi_valid_npages = 0; + it->ooi_swab = 0; + it->ooi_ent = NULL; + it->ooi_cur_page = NULL; + it->ooi_cur_idxpage = NULL; + it->ooi_pages = NULL; + } + /* No further fetch is attempted once ooi_next equals II_END_OFF. */ + if (it->ooi_next == II_END_OFF) + RETURN(1); + + rc = osp_orphan_it_fetch(env, it); + if (rc == 0) + goto again0; + + RETURN(rc); +} +/* Positioning by key is not implemented: always returns -ENOSYS. */ +static int osp_orphan_it_get(const struct lu_env *env, + struct dt_it *di, + const struct dt_key *key) +{ + return -ENOSYS; +} +/* Nothing to release: the body is empty. */ +static void osp_orphan_it_put(const struct lu_env *env, + struct dt_it *di) +{ +} +/* Return a pointer to the key (loe_key) of the current entry, or NULL when the iterator is not positioned on an entry. */ +static struct dt_key *osp_orphan_it_key(const struct lu_env *env, + const struct dt_it *di) +{ + struct osp_orphan_it *it = (struct osp_orphan_it *)di; + struct lu_orphan_ent *ent = it->ooi_ent; + + if (likely(ent != NULL)) + return (struct dt_key *)(&ent->loe_key); + + return NULL; +} +/* Keys are always sizeof(struct lu_fid) bytes. */ +static int osp_orphan_it_key_size(const struct lu_env *env, + const struct dt_it *di) +{ + return sizeof(struct lu_fid); +} +/* Copy the record (loe_rec) of the current entry into rec; returns -EINVAL when there is no current entry. The attr argument is not used. */ +static int osp_orphan_it_rec(const struct lu_env *env, + const struct dt_it *di, + struct dt_rec *rec, + __u32 attr) +{ + struct osp_orphan_it *it = (struct osp_orphan_it *)di; + struct lu_orphan_ent *ent = it->ooi_ent; + + if (likely(ent != NULL)) { + *(struct lu_orphan_rec *)rec =
ent->loe_rec; + return 0; + } + + return -EINVAL; +} +/* Return the hash at which the next fetch would start (ooi_next). */ +static __u64 osp_orphan_it_store(const struct lu_env *env, + const struct dt_it *di) +{ + struct osp_orphan_it *it = (struct osp_orphan_it *)di; + + return it->ooi_next; +} + +/** + * \retval +1: located at the exact position + * \retval 0: cannot locate the exact position, + * call next() to move to a valid position. + * \retval -ve: on error + */ +static int osp_orphan_it_load(const struct lu_env *env, + const struct dt_it *di, + __u64 hash) +{ + struct osp_orphan_it *it = (struct osp_orphan_it *)di; + int rc; + /* Restart from the given hash; the result of next() is remapped below: 1 becomes 0, 0 becomes 1, anything else is returned as is. */ + it->ooi_next = hash; + rc = osp_orphan_it_next(env, (struct dt_it *)di); + if (rc == 1) + return 0; + + if (rc == 0) + return 1; + + return rc; +} +/* key_rec iterator callback: performs no work and always returns 0. */ +static int osp_orphan_it_key_rec(const struct lu_env *env, + const struct dt_it *di, + void *key_rec) +{ + return 0; +} +/* Index operations installed by osp_index_try(): lookup, insert and delete return -EOPNOTSUPP; iteration goes through the osp_orphan_it_* callbacks. */ +static const struct dt_index_operations osp_orphan_index_ops = { + .dio_lookup = osp_orphan_index_lookup, + .dio_declare_insert = osp_orphan_index_declare_insert, + .dio_insert = osp_orphan_index_insert, + .dio_declare_delete = osp_orphan_index_declare_delete, + .dio_delete = osp_orphan_index_delete, + .dio_it = { + .init = osp_orphan_it_init, + .fini = osp_orphan_it_fini, + .next = osp_orphan_it_next, + .get = osp_orphan_it_get, + .put = osp_orphan_it_put, + .key = osp_orphan_it_key, + .key_size = osp_orphan_it_key_size, + .rec = osp_orphan_it_rec, + .store = osp_orphan_it_store, + .load = osp_orphan_it_load, + .key_rec = osp_orphan_it_key_rec, + } +}; +/* Install osp_orphan_index_ops when the object's FID satisfies fid_is_last_id(); any other object gets -EINVAL. The feat argument is not used. */ +static int osp_index_try(const struct lu_env *env, + struct dt_object *dt, + const struct dt_index_features *feat) +{ + if (fid_is_last_id(lu_object_fid(&dt->do_lu))) { + dt->do_index_ops = &osp_orphan_index_ops; + + return 0; + } + + return -EINVAL; +} + struct dt_object_operations osp_obj_ops = { .do_declare_attr_get = osp_declare_attr_get, .do_attr_get = osp_attr_get, @@ -1088,6 +1500,7 @@ struct dt_object_operations osp_obj_ops = { .do_create = osp_object_create,
.do_declare_destroy = osp_declare_object_destroy, .do_destroy = osp_object_destroy, + .do_index_try = osp_index_try, }; static int osp_object_init(const struct lu_env *env, struct lu_object *o, diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index d4da00d..a39381f 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -331,9 +331,53 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg) GOTO(out_seq, rc); } + /* Index read service */ + memset(&svc_conf, 0, sizeof(svc_conf)); + svc_conf = (typeof(svc_conf)) { + .psc_name = "ost_idx_read", + .psc_watchdog_factor = OSS_SERVICE_WATCHDOG_FACTOR, + .psc_buf = { + .bc_nbufs = OST_NBUFS, + .bc_buf_size = OST_BUFSIZE, + .bc_req_max_size = OST_MAXREQSIZE, + .bc_rep_max_size = OST_MAXREPSIZE, + .bc_req_portal = OST_IDX_PORTAL, + .bc_rep_portal = OSC_REPLY_PORTAL, + }, + .psc_thr = { + .tc_thr_name = "ll_ost_idx", + .tc_thr_factor = OSS_CR_THR_FACTOR, + .tc_nthrs_init = OSS_CR_NTHRS_INIT, + .tc_nthrs_base = OSS_CR_NTHRS_BASE, + .tc_nthrs_max = OSS_CR_NTHRS_MAX, + .tc_nthrs_user = oss_num_create_threads, + .tc_cpu_affinity = 1, + .tc_ctx_tags = LCT_DT_THREAD, + }, + .psc_cpt = { + .cc_pattern = oss_cpts, + }, + .psc_ops = { + .so_req_handler = tgt_request_handle, + .so_req_printer = target_print_req, + }, + }; + ost->ost_idx_service = ptlrpc_register_service(&svc_conf, + obd->obd_proc_entry); + if (IS_ERR(ost->ost_idx_service)) { + rc = PTR_ERR(ost->ost_idx_service); + CERROR("failed to start OST index read service: rc = %d\n", rc); + ost->ost_idx_service = NULL; + GOTO(out_out, rc); + } + ping_evictor_start(); RETURN(0); + +out_out: + ptlrpc_unregister_service(ost->ost_out_service); + ost->ost_out_service = NULL; out_seq: ptlrpc_unregister_service(ost->ost_seq_service); ost->ost_seq_service = NULL; @@ -368,12 +412,14 @@ static int ost_cleanup(struct obd_device *obd) ptlrpc_unregister_service(ost->ost_io_service); ptlrpc_unregister_service(ost->ost_seq_service); 
ptlrpc_unregister_service(ost->ost_out_service); + ptlrpc_unregister_service(ost->ost_idx_service); ost->ost_service = NULL; ost->ost_create_service = NULL; ost->ost_io_service = NULL; ost->ost_seq_service = NULL; ost->ost_out_service = NULL; + ost->ost_idx_service = NULL; mutex_unlock(&ost->ost_health_mutex); diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 97390de..8f2b24c 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -2628,3 +2628,12 @@ void lustre_swab_lfsck_reply(struct lfsck_reply *lr) CLASSERT(offsetof(typeof(*lr), lr_padding_2) != 0); } EXPORT_SYMBOL(lustre_swab_lfsck_reply); +/* Byte-swap, in place, the key FID, the referenced FID and the uid/gid of one orphan index entry. */ +void lustre_swab_orphan_ent(struct lu_orphan_ent *ent) +{ + lustre_swab_lu_fid(&ent->loe_key); + lustre_swab_lu_fid(&ent->loe_rec.lor_fid); + __swab32s(&ent->loe_rec.lor_uid); + __swab32s(&ent->loe_rec.lor_gid); +} +EXPORT_SYMBOL(lustre_swab_orphan_ent); diff --git a/lustre/tests/sanity-lfsck.sh b/lustre/tests/sanity-lfsck.sh index 65b5e7c..b59bf55 100644 --- a/lustre/tests/sanity-lfsck.sh +++ b/lustre/tests/sanity-lfsck.sh @@ -1170,6 +1170,8 @@ test_12() { echo "setupall" setupall > /dev/null + mkdir -p $DIR/$tdir + echo "All the LFSCK targets should be in 'init' status." for k in $(seq $MDSCOUNT); do local STATUS=$(do_facet mds${k} $LCTL get_param -n \ @@ -1178,13 +1180,13 @@ test_12() { [ "$STATUS" == "init" ] || error "(1) MDS${k} Expect 'init', but got '$STATUS'" - $LFS mkdir -i $((k - 1)) $DIR/${k} - createmany -o $DIR/${k}/f 100 + $LFS mkdir -i $((k - 1)) $DIR/$tdir/${k} + createmany -o $DIR/$tdir/${k}/f 100 done - echo "Start namespace LFSCK on all targets by single command (-s 10)." + echo "Start namespace LFSCK on all targets by single command (-s 1)." do_facet mds1 $LCTL lfsck_start -M ${FSNAME}-MDT0000 -t namespace -A \ - -s 10 || error "(2) Fail to start LFSCK on all devices!" + -s 1 || error "(2) Fail to start LFSCK on all devices!" echo "All the LFSCK targets should be in 'scanning-phase1' status."
for k in $(seq $MDSCOUNT); do @@ -1220,9 +1222,9 @@ test_12() { error "(7) MDS${k} is not the expected 'completed'" done - echo "Start layout LFSCK on all targets by single command (-s 10)." + echo "Start layout LFSCK on all targets by single command (-s 1)." do_facet mds1 $LCTL lfsck_start -M ${FSNAME}-MDT0000 -t layout -A \ - -s 10 || error "(8) Fail to start LFSCK on all devices!" + -s 1 || error "(8) Fail to start LFSCK on all devices!" echo "All the LFSCK targets should be in 'scanning-phase1' status." for k in $(seq $MDSCOUNT); do