X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fofd%2Fofd_obd.c;h=8bcbd06b09ea8ca9c0c08038e6cb07efdd079086;hp=4895d319d1709b8190f72cbbe9cc28d075d4efa8;hb=a4e8e2f71fb64e293cbf8db07f4608646fd7acd4;hpb=a0a847fee6e30542bffb4e6860499b68cf515998 diff --git a/lustre/ofd/ofd_obd.c b/lustre/ofd/ofd_obd.c index 4895d31..8bcbd06 100644 --- a/lustre/ofd/ofd_obd.c +++ b/lustre/ofd/ofd_obd.c @@ -27,7 +27,7 @@ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2012, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -42,20 +42,24 @@ #define DEBUG_SUBSYSTEM S_FILTER +#include #include "ofd_internal.h" #include -#include +#include +#include static int ofd_export_stats_init(struct ofd_device *ofd, struct obd_export *exp, void *client_nid) { struct obd_device *obd = ofd_obd(ofd); struct nid_stat *stats; - int num_stats, i; + int num_stats; int rc, newnid = 0; ENTRY; + LASSERT(obd->obd_uses_nid_stats); + if (obd_uuid_equals(&exp->exp_client_uuid, &obd->obd_uuid)) /* Self-export gets no proc entry */ RETURN(0); @@ -75,21 +79,7 @@ static int ofd_export_stats_init(struct ofd_device *ofd, stats = exp->exp_nid_stats; LASSERT(stats != NULL); - OBD_ALLOC(stats->nid_brw_stats, sizeof(struct brw_stats)); - if (stats->nid_brw_stats == NULL) - GOTO(clean, rc = -ENOMEM); - - for (i = 0; i < BRW_LAST; i++) - cfs_spin_lock_init(&stats->nid_brw_stats->hist[i].oh_lock); - - rc = lprocfs_seq_create(stats->nid_proc, "brw_stats", 0644, - &ofd_per_nid_stats_fops, stats); - if (rc) - CWARN("Error adding the brw_stats file\n"); - - num_stats = (sizeof(*obd->obd_type->typ_dt_ops) / sizeof(void *)) + - LPROC_OFD_LAST - 1; - + num_stats = NUM_OBD_STATS + LPROC_OFD_LAST; stats->nid_stats = lprocfs_alloc_stats(num_stats, LPROCFS_STATS_FLAG_NOPERCPU); if (stats->nid_stats == NULL) @@ -119,7 +109,8 @@ clean: static int ofd_parse_connect_data(const struct lu_env *env, struct obd_export *exp, - struct obd_connect_data *data) + struct obd_connect_data *data, + bool new_connection) { struct ofd_device *ofd = ofd_exp(exp); struct filter_export_data *fed = &exp->exp_filter_data; @@ -128,10 +119,11 @@ static int ofd_parse_connect_data(const struct lu_env *env, RETURN(0); CDEBUG(D_RPCTRACE, "%s: cli %s/%p ocd_connect_flags: "LPX64 - " ocd_version: %x ocd_grant: %d ocd_index: %u\n", + " ocd_version: %x ocd_grant: %d ocd_index: %u" + " ocd_group %u\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, data->ocd_connect_flags, data->ocd_version, - data->ocd_grant, data->ocd_index); + data->ocd_grant, data->ocd_index, data->ocd_group); if (fed->fed_group != 0 && fed->fed_group != data->ocd_group) { CWARN("!!! This export (nid %s) used object group %d " @@ -145,7 +137,6 @@ static int ofd_parse_connect_data(const struct lu_env *env, fed->fed_group = data->ocd_group; data->ocd_connect_flags &= OST_CONNECT_SUPPORTED; - exp->exp_connect_flags = data->ocd_connect_flags; data->ocd_version = LUSTRE_VERSION_CODE; /* Kindly make sure the SKIP_ORPHAN flag is from MDS. */ @@ -164,21 +155,15 @@ static int ofd_parse_connect_data(const struct lu_env *env, data->ocd_grant_extent = ofd->ofd_dt_conf.ddp_grant_frag >> 10; } - if (exp->exp_connect_flags & OBD_CONNECT_GRANT) - data->ocd_grant = ofd_grant_connect(env, exp, data->ocd_grant); + if (data->ocd_connect_flags & OBD_CONNECT_GRANT) + data->ocd_grant = ofd_grant_connect(env, exp, data->ocd_grant, + new_connection); if (data->ocd_connect_flags & OBD_CONNECT_INDEX) { struct lr_server_data *lsd = &ofd->ofd_lut.lut_lsd; - int index = lsd->lsd_ost_index; + int index = lsd->lsd_osd_index; - if (!(lsd->lsd_feature_compat & OBD_COMPAT_OST)) { - /* this will only happen on the first connect */ - lsd->lsd_ost_index = data->ocd_index; - lsd->lsd_feature_compat |= OBD_COMPAT_OST; - /* sync is not needed here as lut_client_add will - * set exp_need_sync flag */ - lut_server_data_update(env, &ofd->ofd_lut, 0); - } else if (index != data->ocd_index) { + if (index != data->ocd_index) { LCONSOLE_ERROR_MSG(0x136, "Connection from %s to index" " %u doesn't match actual OST index" " %u in last_rcvd file, bad " @@ -187,13 +172,19 @@ static int ofd_parse_connect_data(const struct lu_env *env, data->ocd_index); RETURN(-EBADF); } + if (!(lsd->lsd_feature_compat & OBD_COMPAT_OST)) { + /* this will only happen on the first connect */ + lsd->lsd_feature_compat |= OBD_COMPAT_OST; + /* sync is not needed here as lut_client_add will + * set exp_need_sync flag */ + tgt_server_data_update(env, &ofd->ofd_lut, 0); + } } - if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_SIZE)) { data->ocd_brw_size = 65536; } else if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) { data->ocd_brw_size = min(data->ocd_brw_size, - (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT)); + (__u32)DT_MAX_BRW_SIZE); if (data->ocd_brw_size == 0) { CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64 " ocd_version: %x ocd_grant: %d ocd_index: %u " @@ -237,14 +228,24 @@ static int ofd_parse_connect_data(const struct lu_env *env, if (data->ocd_connect_flags & OBD_CONNECT_MAXBYTES) data->ocd_maxbytes = ofd->ofd_dt_conf.ddp_maxbytes; - RETURN(0); + if (OCD_HAS_FLAG(data, PINGLESS)) { + if (ptlrpc_pinger_suppress_pings()) { + spin_lock(&exp->exp_obd->obd_dev_lock); + list_del_init(&exp->exp_obd_chain_timed); + spin_unlock(&exp->exp_obd->obd_dev_lock); + } else { + data->ocd_connect_flags &= ~OBD_CONNECT_PINGLESS; + } + } + + RETURN(0); } static int ofd_obd_reconnect(const struct lu_env *env, struct obd_export *exp, struct obd_device *obd, struct obd_uuid *cluuid, struct obd_connect_data *data, void *localdata) { - struct ofd_device *ofd = ofd_dev(obd->obd_lu_dev); + struct ofd_device *ofd; int rc; ENTRY; @@ -252,6 +253,8 @@ static int ofd_obd_reconnect(const struct lu_env *env, struct obd_export *exp, if (exp == NULL || obd == NULL || cluuid == NULL) RETURN(-EINVAL); + ofd = ofd_dev(obd->obd_lu_dev); + rc = lu_env_refill((struct lu_env *)env); if (rc != 0) { CERROR("Failure to refill session: '%d'\n", rc); @@ -259,7 +262,7 @@ static int ofd_obd_reconnect(const struct lu_env *env, struct obd_export *exp, } ofd_info_init(env, exp); - rc = ofd_parse_connect_data(env, exp, data); + rc = ofd_parse_connect_data(env, exp, data, false); if (rc == 0) ofd_export_stats_init(ofd, exp, localdata); @@ -273,8 +276,7 @@ static int ofd_obd_connect(const struct lu_env *env, struct obd_export **_exp, struct obd_export *exp; struct ofd_device *ofd; struct lustre_handle conn = { 0 }; - int rc, group; - + int rc; ENTRY; if (_exp == NULL || obd == NULL || cluuid == NULL) @@ -297,29 +299,24 @@ static int ofd_obd_connect(const struct lu_env *env, struct obd_export **_exp, ofd_info_init(env, exp); - rc = ofd_parse_connect_data(env, exp, data); + rc = ofd_parse_connect_data(env, exp, data, true); if (rc) GOTO(out, rc); - group = data->ocd_group; if (obd->obd_replayable) { struct tg_export_data *ted = &exp->exp_target_data; memcpy(ted->ted_lcd->lcd_uuid, cluuid, sizeof(ted->ted_lcd->lcd_uuid)); - rc = lut_client_new(env, exp); + rc = tgt_client_new(env, exp); if (rc != 0) GOTO(out, rc); ofd_export_stats_init(ofd, exp, localdata); } - if (group == 0) - GOTO(out, rc = 0); - /* init new group */ - if (group > ofd->ofd_max_group) { - ofd->ofd_max_group = group; - rc = ofd_group_load(env, ofd, group); - } + CDEBUG(D_HA, "%s: get connection from MDS %d\n", obd->obd_name, + data ? data->ocd_group : -1); + out: if (rc != 0) { class_disconnect(exp); @@ -355,7 +352,7 @@ static int ofd_obd_disconnect(struct obd_export *exp) /* Do not erase record for recoverable client. */ if (exp->exp_obd->obd_replayable && (!exp->exp_obd->obd_fail || exp->exp_failed)) - lut_client_del(&env, exp); + tgt_client_del(&env, exp); lu_env_fini(&env); class_export_put(exp); @@ -366,18 +363,18 @@ static int ofd_init_export(struct obd_export *exp) { int rc; - cfs_spin_lock_init(&exp->exp_filter_data.fed_lock); + spin_lock_init(&exp->exp_filter_data.fed_lock); CFS_INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list); - cfs_spin_lock(&exp->exp_lock); + spin_lock(&exp->exp_lock); exp->exp_connecting = 1; - cfs_spin_unlock(&exp->exp_lock); + spin_unlock(&exp->exp_lock); /* self-export doesn't need client data and ldlm initialization */ if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, &exp->exp_client_uuid))) return 0; - rc = lut_client_alloc(exp); + rc = tgt_client_alloc(exp); if (rc == 0) ldlm_init_export(exp); if (rc) @@ -402,7 +399,7 @@ static int ofd_destroy_export(struct obd_export *exp) return 0; ldlm_destroy_export(exp); - lut_client_free(exp); + tgt_client_free(exp); ofd_fmd_cleanup(exp); @@ -413,7 +410,7 @@ static int ofd_destroy_export(struct obd_export *exp) ofd_grant_discard(exp); ofd_fmd_cleanup(exp); - if (exp->exp_connect_flags & OBD_CONNECT_GRANT_SHRINK) { + if (exp_connect_flags(exp) & OBD_CONNECT_GRANT_SHRINK) { if (ofd->ofd_tot_granted_clients > 0) ofd->ofd_tot_granted_clients --; } @@ -425,6 +422,14 @@ static int ofd_destroy_export(struct obd_export *exp) return 0; } +int ofd_postrecov(const struct lu_env *env, struct ofd_device *ofd) +{ + struct lu_device *ldev = &ofd->ofd_dt_dev.dd_lu_dev; + + CDEBUG(D_HA, "%s: recovery is over\n", ofd_obd(ofd)->obd_name); + return ldev->ld_ops->ldo_recovery_complete(env, ldev); +} + int ofd_obd_postrecov(struct obd_device *obd) { struct lu_env env; @@ -438,7 +443,8 @@ int ofd_obd_postrecov(struct obd_device *obd) RETURN(rc); ofd_info_init(&env, obd->obd_self_export); - rc = ldev->ld_ops->ldo_recovery_complete(&env, ldev); + rc = ofd_postrecov(&env, ofd_dev(ldev)); + lu_env_fini(&env); RETURN(rc); } @@ -460,10 +466,10 @@ static int ofd_adapt_sptlrpc_conf(const struct lu_env *env, sptlrpc_target_update_exp_flavor(obd, &tmp_rset); - cfs_write_lock(&fo->fo_sptlrpc_lock); + write_lock(&fo->fo_sptlrpc_lock); sptlrpc_rule_set_free(&fo->fo_sptlrpc_rset); fo->fo_sptlrpc_rset = tmp_rset; - cfs_write_unlock(&fo->fo_sptlrpc_lock); + write_unlock(&fo->fo_sptlrpc_lock); return 0; } @@ -483,7 +489,7 @@ static int ofd_set_info_async(const struct lu_env *env, struct obd_export *exp, __u32 keylen, void *key, __u32 vallen, void *val, struct ptlrpc_request_set *set) { - struct ofd_device *ofd = ofd_exp(exp); + struct ofd_device *ofd; int rc = 0; ENTRY; @@ -493,6 +499,8 @@ static int ofd_set_info_async(const struct lu_env *env, struct obd_export *exp, RETURN(-EINVAL); } + ofd = ofd_exp(exp); + if (KEY_IS(KEY_CAPA_KEY)) { rc = ofd_update_capa_key(ofd, val); if (rc) @@ -505,6 +513,7 @@ static int ofd_set_info_async(const struct lu_env *env, struct obd_export *exp, } else if (KEY_IS(KEY_GRANT_SHRINK)) { struct ost_body *body = val; + ofd_info_init(env, exp); /** handle grant shrink, similar to a read request */ ofd_grant_prepare_read(env, exp, &body->oa); } else { @@ -520,8 +529,8 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp, __u32 keylen, void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm) { - struct ofd_device *ofd = ofd_exp(exp); - int rc = 0; + struct ofd_device *ofd; + int rc = 0; ENTRY; @@ -530,11 +539,21 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp, RETURN(-EINVAL); } + /* Because ofd_get_info might be called from + * handle_request_in as well(see LU-3239), where env might + * not be initilaized correctly, and le_ses might be in + * an un-initialized state, so only refill le_ctx here */ + rc = lu_env_refill((struct lu_env *)env); + if (rc != 0) + RETURN(rc); + + ofd = ofd_exp(exp); + if (KEY_IS(KEY_BLOCKSIZE)) { __u32 *blocksize = val; if (blocksize) { if (*vallen < sizeof(*blocksize)) - RETURN(-EOVERFLOW); + GOTO(out, rc = -EOVERFLOW); *blocksize = 1 << ofd->ofd_dt_conf.ddp_block_shift; } *vallen = sizeof(*blocksize); @@ -542,42 +561,53 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp, __u32 *blocksize_bits = val; if (blocksize_bits) { if (*vallen < sizeof(*blocksize_bits)) - RETURN(-EOVERFLOW); + GOTO(out, rc = -EOVERFLOW); *blocksize_bits = ofd->ofd_dt_conf.ddp_block_shift; } *vallen = sizeof(*blocksize_bits); } else if (KEY_IS(KEY_LAST_ID)) { obd_id *last_id = val; + struct ofd_seq *oseq; + + if (val == NULL) { + *vallen = sizeof(obd_id); + GOTO(out, rc = 0); + } + ofd_info_init(env, exp); + oseq = ofd_seq_load(env, ofd, + (obd_seq)exp->exp_filter_data.fed_group); + LASSERT(!IS_ERR(oseq)); if (last_id) { - if (*vallen < sizeof(*last_id)) - RETURN(-EOVERFLOW); - *last_id = ofd_last_id(ofd, - exp->exp_filter_data.fed_group); + if (*vallen < sizeof(*last_id)) { + ofd_seq_put(env, oseq); + GOTO(out, rc = -EOVERFLOW); + } + *last_id = ofd_seq_last_oid(oseq); } + ofd_seq_put(env, oseq); *vallen = sizeof(*last_id); } else if (KEY_IS(KEY_FIEMAP)) { - struct ofd_thread_info *info; struct ofd_device *ofd = ofd_exp(exp); struct ofd_object *fo; struct ll_fiemap_info_key *fm_key = key; + struct lu_fid fid; if (val == NULL) { *vallen = fiemap_count_to_size( fm_key->fiemap.fm_extent_count); - RETURN(0); + GOTO(out, rc = 0); } - info = ofd_info_init(env, exp); - - fid_ostid_unpack(&info->fti_fid, &fm_key->oa.o_oi, 0); - + rc = ostid_to_fid(&fid, &fm_key->oa.o_oi, 0); + if (rc != 0) + GOTO(out, rc); CDEBUG(D_INODE, "get FIEMAP of object "DFID"\n", - PFID(&info->fti_fid)); + PFID(&fid)); - fo = ofd_object_find(env, ofd, &info->fti_fid); + fo = ofd_object_find(env, ofd, &fid); if (IS_ERR(fo)) { CERROR("%s: error finding object "DFID"\n", - exp->exp_obd->obd_name, PFID(&info->fti_fid)); + exp->exp_obd->obd_name, PFID(&fid)); rc = PTR_ERR(fo); } else { struct ll_user_fiemap *fiemap = val; @@ -597,11 +627,44 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp, } else if (KEY_IS(KEY_SYNC_LOCK_CANCEL)) { *((__u32 *) val) = ofd->ofd_sync_lock_cancel; *vallen = sizeof(__u32); + } else if (KEY_IS(KEY_LAST_FID)) { + struct ofd_device *ofd = ofd_exp(exp); + struct ofd_seq *oseq; + struct lu_fid *fid = val; + int rc; + + if (fid == NULL) { + *vallen = sizeof(struct lu_fid); + GOTO(out, rc = 0); + } + + if (*vallen < sizeof(*fid)) + GOTO(out, rc = -EOVERFLOW); + + ofd_info_init(env, exp); + + fid_le_to_cpu(fid, fid); + + oseq = ofd_seq_load(env, ofd, + ostid_seq((struct ost_id *)fid)); + if (IS_ERR(oseq)) + GOTO(out, rc = PTR_ERR(oseq)); + + rc = ostid_to_fid(fid, &oseq->os_oi, + ofd->ofd_lut.lut_lsd.lsd_osd_index); + if (rc != 0) + GOTO(out_put, rc); + + CDEBUG(D_HA, "%s: LAST FID is "DFID"\n", ofd_name(ofd), + PFID(fid)); + *vallen = sizeof(*fid); +out_put: + ofd_seq_put(env, oseq); } else { CERROR("Not supported key %s\n", (char*)key); rc = -EOPNOTSUPP; } - +out: RETURN(rc); } @@ -611,7 +674,7 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd, { int rc; - cfs_spin_lock(&ofd->ofd_osfs_lock); + spin_lock(&ofd->ofd_osfs_lock); if (cfs_time_before_64(ofd->ofd_osfs_age, max_age) || max_age == 0) { obd_size unstable; @@ -630,7 +693,7 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd, /* record value of inflight counter before running statfs to * compute the diff once statfs is completed */ unstable = ofd->ofd_osfs_inflight; - cfs_spin_unlock(&ofd->ofd_osfs_lock); + spin_unlock(&ofd->ofd_osfs_lock); /* statfs can sleep ... hopefully not for too long since we can * call it fairly often as space fills up */ @@ -638,8 +701,8 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd, if (unlikely(rc)) return rc; - cfs_spin_lock(&ofd->ofd_grant_lock); - cfs_spin_lock(&ofd->ofd_osfs_lock); + spin_lock(&ofd->ofd_grant_lock); + spin_lock(&ofd->ofd_osfs_lock); /* calculate how much space was written while we released the * ofd_osfs_lock */ unstable = ofd->ofd_osfs_inflight - unstable; @@ -663,7 +726,7 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd, /* similarly, there is some uncertainty on write requests * between prepare & commit */ ofd->ofd_osfs_unstable += ofd->ofd_tot_pending; - cfs_spin_unlock(&ofd->ofd_grant_lock); + spin_unlock(&ofd->ofd_grant_lock); /* finally udpate cached statfs data */ ofd->ofd_osfs = *osfs; @@ -672,14 +735,14 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd, ofd->ofd_statfs_inflight--; /* stop tracking */ if (ofd->ofd_statfs_inflight == 0) ofd->ofd_osfs_inflight = 0; - cfs_spin_unlock(&ofd->ofd_osfs_lock); + spin_unlock(&ofd->ofd_osfs_lock); if (from_cache) *from_cache = 0; } else { /* use cached statfs data */ *osfs = ofd->ofd_osfs; - cfs_spin_unlock(&ofd->ofd_osfs_lock); + spin_unlock(&ofd->ofd_osfs_lock); if (from_cache) *from_cache = 1; } @@ -715,7 +778,7 @@ static int ofd_statfs(const struct lu_env *env, struct obd_export *exp, /* The QoS code on the MDS does not care about space reserved for * precreate, so take it out. */ - if (exp->exp_connect_flags & OBD_CONNECT_MDS) { + if (exp_connect_flags(exp) & OBD_CONNECT_MDS) { struct filter_export_data *fed; fed = &obd->obd_self_export->exp_filter_data; @@ -729,12 +792,8 @@ static int ofd_statfs(const struct lu_env *env, struct obd_export *exp, osfs->os_blocks, osfs->os_bfree, osfs->os_bavail, osfs->os_files, osfs->os_ffree, osfs->os_state); - if (OBD_FAIL_CHECK_VALUE(OBD_FAIL_OST_ENOSPC, - ofd->ofd_lut.lut_lsd.lsd_ost_index)) - osfs->os_bfree = osfs->os_bavail = 2; - if (OBD_FAIL_CHECK_VALUE(OBD_FAIL_OST_ENOINO, - ofd->ofd_lut.lut_lsd.lsd_ost_index)) + ofd->ofd_lut.lut_lsd.lsd_osd_index)) osfs->os_ffree = 0; /* OS_STATE_READONLY can be set by OSD already */ @@ -753,6 +812,10 @@ static int ofd_statfs(const struct lu_env *env, struct obd_export *exp, osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT; } + if (OBD_FAIL_CHECK_VALUE(OBD_FAIL_OST_ENOSPC, + ofd->ofd_lut.lut_lsd.lsd_osd_index)) + osfs->os_bfree = osfs->os_bavail = 2; + EXIT; out: return rc; @@ -775,10 +838,12 @@ int ofd_setattr(const struct lu_env *env, struct obd_export *exp, info = ofd_info_init(env, exp); ofd_oti2info(info, oti); - fid_ostid_unpack(&info->fti_fid, &oinfo->oi_oa->o_oi, 0); - ofd_build_resid(&info->fti_fid, &info->fti_resid); + rc = ostid_to_fid(&info->fti_fid, &oinfo->oi_oa->o_oi, 0); + if (rc != 0) + RETURN(rc); - rc = ofd_auth_capa(exp, &info->fti_fid, oa->o_seq, + ost_fid_build_resid(&info->fti_fid, &info->fti_resid); + rc = ofd_auth_capa(exp, &info->fti_fid, ostid_seq(&oa->o_oi), oinfo_capa(oinfo), CAPA_OPC_META_WRITE); if (rc) GOTO(out, rc); @@ -822,18 +887,6 @@ int ofd_setattr(const struct lu_env *env, struct obd_export *exp, if (rc) GOTO(out_unlock, rc); - res = ldlm_resource_get(ns, NULL, &info->fti_resid, LDLM_EXTENT, 0); - if (res != NULL) { - ldlm_res_lvbo_update(res, NULL, 0); - ldlm_resource_putref(res); - } - - oinfo->oi_oa->o_valid = OBD_MD_FLID; - - /* Quota release needs uid/gid info */ - rc = ofd_attr_get(env, fo, &info->fti_attr); - obdo_from_la(oinfo->oi_oa, &info->fti_attr, - OFD_VALID_FLAGS | LA_UID | LA_GID); ofd_info2oti(info, oti); ofd_counter_incr(exp, LPROC_OFD_STATS_SETATTR, oti->oti_jobid, 1); @@ -841,6 +894,19 @@ int ofd_setattr(const struct lu_env *env, struct obd_export *exp, out_unlock: ofd_object_put(env, fo); out: + if (rc == 0) { + /* we do not call this before to avoid lu_object_find() in + * ->lvbo_update() holding another reference on the object. + * otherwise concurrent destroy can make the object unavailable + * for 2nd lu_object_find() waiting for the first reference + * to go... deadlock! */ + res = ldlm_resource_get(ns, NULL, &info->fti_resid, LDLM_EXTENT, 0); + if (res != NULL) { + ldlm_res_lvbo_update(res, NULL, 0); + ldlm_resource_putref(res); + } + } + return rc; } @@ -861,15 +927,17 @@ static int ofd_punch(const struct lu_env *env, struct obd_export *exp, info = ofd_info_init(env, exp); ofd_oti2info(info, oti); - fid_ostid_unpack(&info->fti_fid, &oinfo->oi_oa->o_oi, 0); - ofd_build_resid(&info->fti_fid, &info->fti_resid); + rc = ostid_to_fid(&info->fti_fid, &oinfo->oi_oa->o_oi, 0); + if (rc != 0) + RETURN(rc); + ost_fid_build_resid(&info->fti_fid, &info->fti_resid); CDEBUG(D_INODE, "calling punch for object "DFID", valid = "LPX64 ", start = "LPD64", end = "LPD64"\n", PFID(&info->fti_fid), oinfo->oi_oa->o_valid, oinfo->oi_policy.l_extent.start, oinfo->oi_policy.l_extent.end); - rc = ofd_auth_capa(exp, &info->fti_fid, oinfo->oi_oa->o_seq, + rc = ofd_auth_capa(exp, &info->fti_fid, ostid_seq(&oinfo->oi_oa->o_oi), oinfo_capa(oinfo), CAPA_OPC_OSS_TRUNC); if (rc) GOTO(out_env, rc); @@ -934,7 +1002,8 @@ static int ofd_destroy_by_fid(const struct lu_env *env, { struct ofd_thread_info *info = ofd_info(env); struct lustre_handle lockh; - int flags = LDLM_AST_DISCARD_DATA, rc = 0; + __u64 flags = LDLM_FL_AST_DISCARD_DATA; + __u64 rc = 0; ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } }; @@ -942,25 +1011,29 @@ static int ofd_destroy_by_fid(const struct lu_env *env, ENTRY; + fo = ofd_object_find(env, ofd, fid); + if (IS_ERR(fo)) + RETURN(PTR_ERR(fo)); + if (!ofd_object_exists(fo)) + GOTO(out, rc = -ENOENT); + /* Tell the clients that the object is gone now and that they should * throw away any cached pages. */ - ofd_build_resid(fid, &info->fti_resid); + ost_fid_build_resid(fid, &info->fti_resid); rc = ldlm_cli_enqueue_local(ofd->ofd_namespace, &info->fti_resid, LDLM_EXTENT, &policy, LCK_PW, &flags, ldlm_blocking_ast, ldlm_completion_ast, - NULL, NULL, 0, NULL, &lockh); + NULL, NULL, 0, LVB_T_NONE, NULL, &lockh); /* We only care about the side-effects, just drop the lock. */ if (rc == ELDLM_OK) ldlm_lock_decref(&lockh, LCK_PW); - fo = ofd_object_find(env, ofd, fid); - if (IS_ERR(fo)) - RETURN(PTR_ERR(fo)); LASSERT(fo != NULL); rc = ofd_object_destroy(env, fo, orphan); - + EXIT; +out: ofd_object_put(env, fo); RETURN(rc); } @@ -981,7 +1054,7 @@ int ofd_destroy(const struct lu_env *env, struct obd_export *exp, ofd_oti2info(info, oti); if (!(oa->o_valid & OBD_MD_FLGROUP)) - oa->o_seq = 0; + ostid_set_seq_mdt0(&oa->o_oi); /* check that o_misc makes sense */ if (oa->o_valid & OBD_MD_FLOBJCOUNT) @@ -997,25 +1070,34 @@ int ofd_destroy(const struct lu_env *env, struct obd_export *exp, */ if (info->fti_transno == 0) /* not replay */ info->fti_mult_trans = 1; + + CDEBUG(D_HA, "%s: Destroy object "DOSTID" count %d\n", ofd_name(ofd), + POSTID(&oa->o_oi), count); while (count > 0) { int lrc; - fid_ostid_unpack(&info->fti_fid, &oa->o_oi, 0); + lrc = ostid_to_fid(&info->fti_fid, &oa->o_oi, 0); + if (lrc != 0) { + if (rc == 0) + rc = lrc; + GOTO(out, rc); + } lrc = ofd_destroy_by_fid(env, ofd, &info->fti_fid, 0); if (lrc == -ENOENT) { CDEBUG(D_INODE, - "destroying non-existent object "LPU64"\n", - oa->o_id); + "%s: destroying non-existent object "DFID"\n", + ofd_obd(ofd)->obd_name, PFID(&info->fti_fid)); /* rewrite rc with -ENOENT only if it is 0 */ if (rc == 0) rc = lrc; } else if (lrc != 0) { - CEMERG("error destroying object "LPU64": %d\n", - oa->o_id, rc); + CERROR("%s: error destroying object "DFID": %d\n", + ofd_obd(ofd)->obd_name, PFID(&info->fti_fid), + rc); rc = lrc; } count--; - oa->o_id++; + ostid_inc_id(&oa->o_oi); } /* if we have transaction then there were some deletions, we don't @@ -1035,6 +1117,7 @@ int ofd_destroy(const struct lu_env *env, struct obd_export *exp, info->fti_transno = 0; } ofd_info2oti(info, oti); +out: RETURN(rc); } @@ -1047,40 +1130,55 @@ static int ofd_orphans_destroy(const struct lu_env *env, int skip_orphan; int rc = 0; struct ost_id oi = oa->o_oi; + __u64 end_id = ostid_id(&oa->o_oi); + struct ofd_seq *oseq; ENTRY; + oseq = ofd_seq_get(ofd, ostid_seq(&oa->o_oi)); + if (oseq == NULL) { + CERROR("%s: Can not find seq for "DOSTID"\n", + ofd_name(ofd), POSTID(&oa->o_oi)); + RETURN(-EINVAL); + } + LASSERT(exp != NULL); - skip_orphan = !!(exp->exp_connect_flags & OBD_CONNECT_SKIP_ORPHAN); + skip_orphan = !!(exp_connect_flags(exp) & OBD_CONNECT_SKIP_ORPHAN); - last = ofd_last_id(ofd, oa->o_seq); - CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n", - ofd_obd(ofd)->obd_name, oa->o_id + 1, last); + last = ofd_seq_last_oid(oseq); + LCONSOLE(D_INFO, "%s: deleting orphan objects from "DOSTID + " to "DOSTID"\n", ofd_name(ofd), ostid_seq(&oa->o_oi), + end_id + 1, ostid_seq(&oa->o_oi), last); - for (oi.oi_id = last; oi.oi_id > oa->o_id; oi.oi_id--) { - fid_ostid_unpack(&info->fti_fid, &oi, 0); + for (ostid_set_id(&oi, last); ostid_id(&oi) > end_id; + ostid_dec_id(&oi)) { + rc = ostid_to_fid(&info->fti_fid, &oi, 0); + if (rc != 0) + GOTO(out_put, rc); rc = ofd_destroy_by_fid(env, ofd, &info->fti_fid, 1); if (rc && rc != -ENOENT) /* this is pretty fatal... */ - CEMERG("error destroying precreated id "LPU64": %d\n", - oi.oi_id, rc); + CEMERG("%s: error destroying precreated id "DOSTID + ": rc = %d\n", ofd_name(ofd), POSTID(&oi), rc); if (!skip_orphan) { - ofd_last_id_set(ofd, oi.oi_id - 1, oa->o_seq); + ofd_seq_last_oid_set(oseq, ostid_id(&oi) - 1); /* update last_id on disk periodically so that if we * restart * we don't need to re-scan all of the just * deleted objects. */ - if ((oi.oi_id & 511) == 0) - ofd_last_id_write(env, ofd, oa->o_seq); + if ((ostid_id(&oi) & 511) == 0) + ofd_seq_last_oid_write(env, ofd, oseq); } } - CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n", - ofd_obd(ofd)->obd_name, oa->o_seq, oa->o_id); + CDEBUG(D_HA, "%s: after destroy: set last_id to "DOSTID"\n", + ofd_obd(ofd)->obd_name, POSTID(&oa->o_oi)); if (!skip_orphan) { - rc = ofd_last_id_write(env, ofd, oa->o_seq); + rc = ofd_seq_last_oid_write(env, ofd, oseq); } else { /* don't reuse orphan object, return last used objid */ - oa->o_id = last; + ostid_set_id(&oa->o_oi, last); rc = 0; } +out_put: + ofd_seq_put(env, oseq); RETURN(rc); } @@ -1090,25 +1188,34 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, { struct ofd_device *ofd = ofd_exp(exp); struct ofd_thread_info *info; - int rc = 0, diff; + obd_seq seq = ostid_seq(&oa->o_oi); + struct ofd_seq *oseq; + int rc = 0, diff; + int sync_trans = 0; ENTRY; info = ofd_info_init(env, exp); ofd_oti2info(info, oti); - LASSERT(oa->o_seq >= FID_SEQ_OST_MDT0); + LASSERT(ostid_seq(&oa->o_oi) >= FID_SEQ_OST_MDT0); LASSERT(oa->o_valid & OBD_MD_FLGROUP); - CDEBUG(D_INFO, "ofd_create(oa->o_seq="LPU64",oa->o_id="LPU64")\n", - oa->o_seq, oa->o_id); + CDEBUG(D_INFO, "ofd_create("DOSTID")\n", POSTID(&oa->o_oi)); + + oseq = ofd_seq_load(env, ofd, seq); + if (IS_ERR(oseq)) { + CERROR("%s: Can't find FID Sequence "LPX64": rc = %ld\n", + ofd_name(ofd), seq, PTR_ERR(oseq)); + RETURN(-EINVAL); + } if ((oa->o_valid & OBD_MD_FLFLAGS) && (oa->o_flags & OBD_FL_RECREATE_OBJS)) { if (!ofd_obd(ofd)->obd_recovering || - oa->o_id > ofd_last_id(ofd, oa->o_seq)) { - CERROR("recreate objid "LPU64" > last id "LPU64"\n", - oa->o_id, ofd_last_id(ofd, oa->o_seq)); + ostid_id(&oa->o_oi) > ofd_seq_last_oid(oseq)) { + CERROR("recreate objid "DOSTID" > last id "LPU64"\n", + POSTID(&oa->o_oi), ofd_seq_last_oid(oseq)); GOTO(out_nolock, rc = -EINVAL); } /* do nothing because we create objects during first write */ @@ -1120,42 +1227,62 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, /* destroy orphans */ if (oti->oti_conn_cnt < exp->exp_conn_cnt) { CERROR("%s: dropping old orphan cleanup request\n", - ofd_obd(ofd)->obd_name); + ofd_name(ofd)); GOTO(out_nolock, rc = 0); } /* This causes inflight precreates to abort and drop lock */ - cfs_set_bit(oa->o_seq, &ofd->ofd_destroys_in_progress); - cfs_mutex_lock(&ofd->ofd_create_locks[oa->o_seq]); - if (!cfs_test_bit(oa->o_seq, &ofd->ofd_destroys_in_progress)) { - CERROR("%s:["LPU64"] destroys_in_progress already cleared\n", - exp->exp_obd->obd_name, oa->o_seq); + oseq->os_destroys_in_progress = 1; + mutex_lock(&oseq->os_create_lock); + if (!oseq->os_destroys_in_progress) { + CERROR("%s:["LPU64"] destroys_in_progress already" + " cleared\n", exp->exp_obd->obd_name, + ostid_seq(&oa->o_oi)); + ostid_set_id(&oa->o_oi, ofd_seq_last_oid(oseq)); GOTO(out, rc = 0); } - diff = oa->o_id - ofd_last_id(ofd, oa->o_seq); + diff = ostid_id(&oa->o_oi) - ofd_seq_last_oid(oseq); CDEBUG(D_HA, "ofd_last_id() = "LPU64" -> diff = %d\n", - ofd_last_id(ofd, oa->o_seq), diff); + ofd_seq_last_oid(oseq), diff); if (-diff > OST_MAX_PRECREATE) { /* FIXME: should reset precreate_next_id on MDS */ rc = 0; } else if (diff < 0) { rc = ofd_orphans_destroy(env, exp, ofd, oa); - cfs_clear_bit(oa->o_seq, &ofd->ofd_destroys_in_progress); + oseq->os_destroys_in_progress = 0; } else { /* XXX: Used by MDS for the first time! */ - cfs_clear_bit(oa->o_seq, &ofd->ofd_destroys_in_progress); + oseq->os_destroys_in_progress = 0; } } else { - cfs_mutex_lock(&ofd->ofd_create_locks[oa->o_seq]); + mutex_lock(&oseq->os_create_lock); if (oti->oti_conn_cnt < exp->exp_conn_cnt) { CERROR("%s: dropping old precreate request\n", - ofd_obd(ofd)->obd_name); + ofd_obd(ofd)->obd_name); GOTO(out, rc = 0); } - /* only precreate if group == 0 and o_id is specfied */ - if (!fid_seq_is_mdt(oa->o_seq) || oa->o_id == 0) { + /* only precreate if seq is 0, IDIF or normal and also o_id + * must be specfied */ + if ((!fid_seq_is_mdt(ostid_seq(&oa->o_oi)) && + !fid_seq_is_norm(ostid_seq(&oa->o_oi)) && + !fid_seq_is_idif(ostid_seq(&oa->o_oi))) || + ostid_id(&oa->o_oi) == 0) { diff = 1; /* shouldn't we create this right now? */ } else { - diff = oa->o_id - ofd_last_id(ofd, oa->o_seq); + diff = ostid_id(&oa->o_oi) - ofd_seq_last_oid(oseq); + /* Do sync create if the seq is about to used up */ + if (fid_seq_is_idif(ostid_seq(&oa->o_oi)) || + fid_seq_is_mdt0(ostid_seq(&oa->o_oi))) { + if (unlikely(ostid_id(&oa->o_oi) >= IDIF_MAX_OID - 1)) + sync_trans = 1; + } else if (fid_seq_is_norm(ostid_seq(&oa->o_oi))) { + if (unlikely(ostid_id(&oa->o_oi) >= + LUSTRE_DATA_SEQ_MAX_WIDTH - 1)) + sync_trans = 1; + } else { + CERROR("%s : invalid o_seq "DOSTID": rc = %d\n", + ofd_name(ofd), POSTID(&oa->o_oi), -EINVAL); + GOTO(out, rc = -EINVAL); + } } } if (diff > 0) { @@ -1171,20 +1298,36 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, ofd_obd(ofd)->obd_self_export, &diff); if (rc) { - CDEBUG(D_HA, "%s: failed to acquire grant space" - "for precreate (%d)\n", - ofd_obd(ofd)->obd_name, diff); + CDEBUG(D_HA, "%s: failed to acquire grant " + "space for precreate (%d): rc = %d\n", + ofd_name(ofd), diff, rc); diff = 0; } } + /* This can happen if a new OST is formatted and installed + * in place of an old one at the same index. Instead of + * precreating potentially millions of deleted old objects + * (possibly filling the OST), only precreate the last batch. + * LFSCK will eventually clean up any orphans. LU-14 */ + if (diff > 5 * OST_MAX_PRECREATE) { + diff = OST_MAX_PRECREATE / 2; + LCONSOLE_WARN("%s: precreate FID "DOSTID" is over %u " + "larger than the LAST_ID "DOSTID", only " + "precreating the last %u objects.\n", + ofd_name(ofd), POSTID(&oa->o_oi), + 5 * OST_MAX_PRECREATE, + POSTID(&oseq->os_oi), diff); + ofd_seq_last_oid_set(oseq, ostid_id(&oa->o_oi) - diff); + } + while (diff > 0) { - next_id = ofd_last_id(ofd, oa->o_seq) + 1; + next_id = ofd_seq_last_oid(oseq) + 1; count = ofd_precreate_batch(ofd, diff); - CDEBUG(D_HA, "%s: reserve %d objects in group "LPU64 + CDEBUG(D_HA, "%s: reserve %d objects in group "LPX64 " at "LPU64"\n", ofd_obd(ofd)->obd_name, - count, oa->o_seq, next_id); + count, ostid_seq(&oa->o_oi), next_id); if (cfs_time_after(jiffies, enough_time)) { LCONSOLE_WARN("%s: Slow creates, %d/%d objects" @@ -1193,10 +1336,10 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, created, diff + created, created / DISK_TIMEOUT); break; - } + } rc = ofd_precreate_objects(env, ofd, next_id, - oa->o_seq, count); + oseq, count, sync_trans); if (rc > 0) { created += rc; diff -= rc; @@ -1204,16 +1347,15 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, break; } } - if (created > 0) { + if (created > 0) /* some objects got created, we can return * them, even if last creation failed */ - oa->o_id = ofd_last_id(ofd, oa->o_seq); rc = 0; - } else { - CERROR("unable to precreate: %d\n", rc); - oa->o_id = ofd_last_id(ofd, oa->o_seq); - } + else + CERROR("%s: unable to precreate: rc = %d\n", + ofd_name(ofd), rc); + ostid_set_id(&oa->o_oi, ofd_seq_last_oid(oseq)); oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP; if (!(oa->o_valid & OBD_MD_FLFLAGS) || @@ -1224,14 +1366,15 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, ofd_info2oti(info, oti); out: - cfs_mutex_unlock(&ofd->ofd_create_locks[oa->o_seq]); + mutex_unlock(&oseq->os_create_lock); out_nolock: if (rc == 0 && ea != NULL) { struct lov_stripe_md *lsm = *ea; - lsm->lsm_object_id = oa->o_id; + lsm->lsm_oi = oa->o_oi; } - return rc; + ofd_seq_put(env, oseq); + RETURN(rc); } int ofd_getattr(const struct lu_env *env, struct obd_export *exp, @@ -1240,15 +1383,16 @@ int ofd_getattr(const struct lu_env *env, struct obd_export *exp, struct ofd_device *ofd = ofd_exp(exp); struct ofd_thread_info *info; struct ofd_object *fo; - __u64 curr_version; int rc = 0; ENTRY; info = ofd_info_init(env, exp); - fid_ostid_unpack(&info->fti_fid, &oinfo->oi_oa->o_oi, 0); - rc = ofd_auth_capa(exp, &info->fti_fid, oinfo->oi_oa->o_seq, + rc = ostid_to_fid(&info->fti_fid, &oinfo->oi_oa->o_oi, 0); + if (rc != 0) + GOTO(out, rc); + rc = ofd_auth_capa(exp, &info->fti_fid, ostid_seq(&oinfo->oi_oa->o_oi), oinfo_capa(oinfo), CAPA_OPC_META_READ); if (rc) GOTO(out, rc); @@ -1256,19 +1400,27 @@ int ofd_getattr(const struct lu_env *env, struct obd_export *exp, fo = ofd_object_find(env, ofd, &info->fti_fid); if (IS_ERR(fo)) GOTO(out, rc = PTR_ERR(fo)); + if (!ofd_object_exists(fo)) + GOTO(out_put, rc = -ENOENT); + LASSERT(fo != NULL); rc = ofd_attr_get(env, fo, &info->fti_attr); oinfo->oi_oa->o_valid = OBD_MD_FLID; - if (rc == 0) + if (rc == 0) { + __u64 curr_version; + obdo_from_la(oinfo->oi_oa, &info->fti_attr, OFD_VALID_FLAGS | LA_UID | LA_GID); - /* Store object version in reply */ - curr_version = dt_version_get(env, ofd_object_child(fo)); - if ((__s64)curr_version != -EOPNOTSUPP) { - oinfo->oi_oa->o_valid |= OBD_MD_FLDATAVERSION; - oinfo->oi_oa->o_data_version = curr_version; + /* Store object version in reply */ + curr_version = dt_version_get(env, ofd_object_child(fo)); + if ((__s64)curr_version != -EOPNOTSUPP) { + oinfo->oi_oa->o_valid |= OBD_MD_FLDATAVERSION; + oinfo->oi_oa->o_data_version = curr_version; + } } + +out_put: ofd_object_put(env, fo); out: RETURN(rc); @@ -1292,9 +1444,11 @@ static int ofd_sync(const struct lu_env *env, struct obd_export *exp, } info = ofd_info_init(env, exp); - fid_ostid_unpack(&info->fti_fid, &oinfo->oi_oa->o_oi, 0); + rc = ostid_to_fid(&info->fti_fid, &oinfo->oi_oa->o_oi, 0); + if (rc != 0) + GOTO(out, rc); - rc = ofd_auth_capa(exp, &info->fti_fid, oinfo->oi_oa->o_seq, + rc = ofd_auth_capa(exp, &info->fti_fid, ostid_seq(&oinfo->oi_oa->o_oi), oinfo_capa(oinfo), CAPA_OPC_OSS_TRUNC); if (rc) GOTO(out, rc); @@ -1307,15 +1461,14 @@ static int ofd_sync(const struct lu_env *env, struct obd_export *exp, GOTO(out, rc = PTR_ERR(fo)); } - ofd_write_lock(env, fo); if (!ofd_object_exists(fo)) - GOTO(unlock, rc = -ENOENT); + GOTO(put, rc = -ENOENT); if (dt_version_get(env, ofd_object_child(fo)) > ofd_obd(ofd)->obd_last_committed) { rc = dt_object_sync(env, ofd_object_child(fo)); if (rc) - GOTO(unlock, rc); + GOTO(put, rc); } oinfo->oi_oa->o_valid = OBD_MD_FLID; @@ -1324,8 +1477,64 @@ static int ofd_sync(const struct lu_env *env, struct obd_export *exp, ofd_counter_incr(exp, LPROC_OFD_STATS_SYNC, oinfo->oi_jobid, 1); EXIT; -unlock: - ofd_write_unlock(env, fo); +put: + ofd_object_put(env, fo); +out: + return rc; +} + +static int ofd_ioc_get_obj_version(const struct lu_env *env, + struct ofd_device *ofd, void *karg) +{ + struct obd_ioctl_data *data = karg; + struct lu_fid fid; + struct ofd_object *fo; + dt_obj_version_t version; + int rc = 0; + + ENTRY; + + if (data->ioc_inlbuf2 == NULL || data->ioc_inllen2 != sizeof(version)) + GOTO(out, rc = -EINVAL); + + if (data->ioc_inlbuf1 != NULL && data->ioc_inllen1 == sizeof(fid)) { + fid = *(struct lu_fid *)data->ioc_inlbuf1; + } else if (data->ioc_inlbuf3 != NULL && + data->ioc_inllen3 == sizeof(__u64) && + data->ioc_inlbuf4 != NULL && + data->ioc_inllen4 == sizeof(__u64)) { + struct ost_id ostid; + + ostid_set_seq(&ostid, *(__u64 *)data->ioc_inlbuf4); + ostid_set_id(&ostid, *(__u64 *)data->ioc_inlbuf3); + rc = ostid_to_fid(&fid, &ostid, 0); + if (rc != 0) + GOTO(out, rc); + } else { + GOTO(out, rc = -EINVAL); + } + + if (!fid_is_sane(&fid)) + GOTO(out, rc = -EINVAL); + + fo = ofd_object_find(env, ofd, &fid); + if (IS_ERR(fo)) + GOTO(out, rc = PTR_ERR(fo)); + + if (!ofd_object_exists(fo)) + GOTO(out_fo, rc = -ENOENT); + + if (lu_object_remote(&fo->ofo_obj.do_lu)) + GOTO(out_fo, rc = -EREMOTE); + + version = dt_version_get(env, ofd_object_child(fo)); + if (version == 0) + GOTO(out_fo, rc = -EIO); + + *(dt_obj_version_t *)data->ioc_inlbuf2 = version; + + EXIT; +out_fo: ofd_object_put(env, fo); out: return rc; @@ -1342,7 +1551,7 @@ int ofd_iocontrol(unsigned int cmd, struct obd_export *exp, int len, ENTRY; CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd); - rc = lu_env_init(&env, LCT_LOCAL); + rc = lu_env_init(&env, LCT_DT_THREAD); if (rc) RETURN(rc); @@ -1360,6 +1569,27 @@ int ofd_iocontrol(unsigned int cmd, struct obd_export *exp, int len, if (rc == 0) rc = dt_ro(&env, ofd->ofd_osd); break; + case OBD_IOC_START_LFSCK: { + struct obd_ioctl_data *data = karg; + struct lfsck_start_param lsp; + + if (unlikely(data == NULL)) { + rc = -EINVAL; + break; + } + + lsp.lsp_start = (struct lfsck_start *)(data->ioc_inlbuf1); + lsp.lsp_namespace = ofd->ofd_namespace; + rc = lfsck_start(&env, ofd->ofd_osd, &lsp); + break; + } + case OBD_IOC_STOP_LFSCK: { + rc = lfsck_stop(&env, ofd->ofd_osd, false); + break; + } + case OBD_IOC_GET_OBJ_VERSION: + rc = ofd_ioc_get_obj_version(&env, ofd, karg); + break; default: CERROR("%s: not supported cmd = %d\n", obd->obd_name, cmd); rc = -ENOTTY; @@ -1415,11 +1645,11 @@ static int ofd_health_check(const struct lu_env *nul, struct obd_device *obd) GOTO(out, rc = -EROFS); #ifdef USE_HEALTH_CHECK_WRITE - OBD_ALLOC(info->fti_buf.lb_buf, CFS_PAGE_SIZE); + OBD_ALLOC(info->fti_buf.lb_buf, PAGE_CACHE_SIZE); if (info->fti_buf.lb_buf == NULL) GOTO(out, rc = -ENOMEM); - info->fti_buf.lb_len = CFS_PAGE_SIZE; + info->fti_buf.lb_len = PAGE_CACHE_SIZE; info->fti_off = 0; th = dt_trans_create(&env, ofd->ofd_osd); @@ -1438,7 +1668,7 @@ static int ofd_health_check(const struct lu_env *nul, struct obd_device *obd) } dt_trans_stop(&env, ofd->ofd_osd, th); - OBD_FREE(info->fti_buf.lb_buf, CFS_PAGE_SIZE); + OBD_FREE(info->fti_buf.lb_buf, PAGE_CACHE_SIZE); CDEBUG(D_INFO, "write 1 page synchronously for checking io rc %d\n",rc); #endif @@ -1447,23 +1677,6 @@ out: return !!rc; } -static int ofd_obd_notify(struct obd_device *obd, struct obd_device *unused, - enum obd_notify_event ev, void *data) -{ - switch (ev) { - case OBD_NOTIFY_CONFIG: - LASSERT(obd->obd_no_conn); - cfs_spin_lock(&obd->obd_dev_lock); - obd->obd_no_conn = 0; - cfs_spin_unlock(&obd->obd_dev_lock); - break; - default: - CDEBUG(D_INFO, "%s: Unhandled notification %#x\n", - obd->obd_name, ev); - } - return 0; -} - /* * Handle quota control requests to consult current usage/limit. * @@ -1517,6 +1730,5 @@ struct obd_ops ofd_obd_ops = { .o_precleanup = ofd_precleanup, .o_ping = ofd_ping, .o_health_check = ofd_health_check, - .o_notify = ofd_obd_notify, .o_quotactl = ofd_quotactl, };