X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fofd%2Fofd_obd.c;h=dbb7fc9decf1f4379d485a49b62d4602a3e4bd25;hp=1f8e9174c4bef679c66f7090fae7315a88567189;hb=6d33530cc018602a665fc5e724f9f64dcba421c9;hpb=234ce16dba60f2e2c2177e5cde21efd75285e4b4 diff --git a/lustre/ofd/ofd_obd.c b/lustre/ofd/ofd_obd.c index 1f8e917..dbb7fc9 100644 --- a/lustre/ofd/ofd_obd.c +++ b/lustre/ofd/ofd_obd.c @@ -27,7 +27,7 @@ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2012, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -44,14 +44,14 @@ #include "ofd_internal.h" #include -#include +#include static int ofd_export_stats_init(struct ofd_device *ofd, struct obd_export *exp, void *client_nid) { struct obd_device *obd = ofd_obd(ofd); struct nid_stat *stats; - int num_stats, i; + int num_stats; int rc, newnid = 0; ENTRY; @@ -75,18 +75,6 @@ static int ofd_export_stats_init(struct ofd_device *ofd, stats = exp->exp_nid_stats; LASSERT(stats != NULL); - OBD_ALLOC(stats->nid_brw_stats, sizeof(struct brw_stats)); - if (stats->nid_brw_stats == NULL) - GOTO(clean, rc = -ENOMEM); - - for (i = 0; i < BRW_LAST; i++) - cfs_spin_lock_init(&stats->nid_brw_stats->hist[i].oh_lock); - - rc = lprocfs_seq_create(stats->nid_proc, "brw_stats", 0644, - &ofd_per_nid_stats_fops, stats); - if (rc) - CWARN("Error adding the brw_stats file\n"); - num_stats = (sizeof(*obd->obd_type->typ_dt_ops) / sizeof(void *)) + LPROC_OFD_LAST - 1; @@ -128,10 +116,11 @@ static int ofd_parse_connect_data(const struct lu_env *env, RETURN(0); CDEBUG(D_RPCTRACE, "%s: cli %s/%p ocd_connect_flags: "LPX64 - " ocd_version: %x ocd_grant: %d ocd_index: %u\n", + " ocd_version: %x ocd_grant: %d ocd_index: %u" + " ocd_group %u\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, data->ocd_connect_flags, data->ocd_version, - data->ocd_grant, data->ocd_index); + data->ocd_grant, data->ocd_index, data->ocd_group); if (fed->fed_group != 0 && fed->fed_group != data->ocd_group) { CWARN("!!! This export (nid %s) used object group %d " @@ -145,7 +134,7 @@ static int ofd_parse_connect_data(const struct lu_env *env, fed->fed_group = data->ocd_group; data->ocd_connect_flags &= OST_CONNECT_SUPPORTED; - exp->exp_connect_flags = data->ocd_connect_flags; + exp->exp_connect_data = *data; data->ocd_version = LUSTRE_VERSION_CODE; /* Kindly make sure the SKIP_ORPHAN flag is from MDS. */ @@ -164,21 +153,14 @@ static int ofd_parse_connect_data(const struct lu_env *env, data->ocd_grant_extent = ofd->ofd_dt_conf.ddp_grant_frag >> 10; } - if (exp->exp_connect_flags & OBD_CONNECT_GRANT) + if (exp_connect_flags(exp) & OBD_CONNECT_GRANT) data->ocd_grant = ofd_grant_connect(env, exp, data->ocd_grant); if (data->ocd_connect_flags & OBD_CONNECT_INDEX) { struct lr_server_data *lsd = &ofd->ofd_lut.lut_lsd; - int index = lsd->lsd_ost_index; + int index = lsd->lsd_osd_index; - if (!(lsd->lsd_feature_compat & OBD_COMPAT_OST)) { - /* this will only happen on the first connect */ - lsd->lsd_ost_index = data->ocd_index; - lsd->lsd_feature_compat |= OBD_COMPAT_OST; - /* sync is not needed here as lut_client_add will - * set exp_need_sync flag */ - lut_server_data_update(env, &ofd->ofd_lut, 0); - } else if (index != data->ocd_index) { + if (index != data->ocd_index) { LCONSOLE_ERROR_MSG(0x136, "Connection from %s to index" " %u doesn't match actual OST index" " %u in last_rcvd file, bad " @@ -187,13 +169,19 @@ static int ofd_parse_connect_data(const struct lu_env *env, data->ocd_index); RETURN(-EBADF); } + if (!(lsd->lsd_feature_compat & OBD_COMPAT_OST)) { + /* this will only happen on the first connect */ + lsd->lsd_feature_compat |= OBD_COMPAT_OST; + /* sync is not needed here as lut_client_add will + * set exp_need_sync flag */ + tgt_server_data_update(env, &ofd->ofd_lut, 0); + } } - if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_SIZE)) { data->ocd_brw_size = 65536; } else if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) { data->ocd_brw_size = min(data->ocd_brw_size, - (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT)); + (__u32)DT_MAX_BRW_SIZE); if (data->ocd_brw_size == 0) { CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64 " ocd_version: %x ocd_grant: %d ocd_index: %u " @@ -237,14 +225,24 @@ static int ofd_parse_connect_data(const struct lu_env *env, if (data->ocd_connect_flags & OBD_CONNECT_MAXBYTES) data->ocd_maxbytes = ofd->ofd_dt_conf.ddp_maxbytes; - RETURN(0); + if (data->ocd_connect_flags & OBD_CONNECT_PINGLESS) { + if (suppress_pings) { + spin_lock(&exp->exp_obd->obd_dev_lock); + list_del_init(&exp->exp_obd_chain_timed); + spin_unlock(&exp->exp_obd->obd_dev_lock); + } else { + data->ocd_connect_flags &= ~OBD_CONNECT_PINGLESS; + } + } + + RETURN(0); } static int ofd_obd_reconnect(const struct lu_env *env, struct obd_export *exp, struct obd_device *obd, struct obd_uuid *cluuid, struct obd_connect_data *data, void *localdata) { - struct ofd_device *ofd = ofd_dev(obd->obd_lu_dev); + struct ofd_device *ofd; int rc; ENTRY; @@ -252,6 +250,8 @@ static int ofd_obd_reconnect(const struct lu_env *env, struct obd_export *exp, if (exp == NULL || obd == NULL || cluuid == NULL) RETURN(-EINVAL); + ofd = ofd_dev(obd->obd_lu_dev); + rc = lu_env_refill((struct lu_env *)env); if (rc != 0) { CERROR("Failure to refill session: '%d'\n", rc); @@ -273,8 +273,7 @@ static int ofd_obd_connect(const struct lu_env *env, struct obd_export **_exp, struct obd_export *exp; struct ofd_device *ofd; struct lustre_handle conn = { 0 }; - int rc, group; - + int rc; ENTRY; if (_exp == NULL || obd == NULL || cluuid == NULL) @@ -301,25 +300,20 @@ static int ofd_obd_connect(const struct lu_env *env, struct obd_export **_exp, if (rc) GOTO(out, rc); - group = data->ocd_group; if (obd->obd_replayable) { struct tg_export_data *ted = &exp->exp_target_data; memcpy(ted->ted_lcd->lcd_uuid, cluuid, sizeof(ted->ted_lcd->lcd_uuid)); - rc = lut_client_new(env, exp); + rc = tgt_client_new(env, exp); if (rc != 0) GOTO(out, rc); ofd_export_stats_init(ofd, exp, localdata); } - if (group == 0) - GOTO(out, rc = 0); - /* init new group */ - if (group > ofd->ofd_max_group) { - ofd->ofd_max_group = group; - rc = ofd_group_load(env, ofd, group); - } + CDEBUG(D_HA, "%s: get connection from MDS %d\n", obd->obd_name, + data->ocd_group); + out: if (rc != 0) { class_disconnect(exp); @@ -355,7 +349,7 @@ static int ofd_obd_disconnect(struct obd_export *exp) /* Do not erase record for recoverable client. */ if (exp->exp_obd->obd_replayable && (!exp->exp_obd->obd_fail || exp->exp_failed)) - lut_client_del(&env, exp); + tgt_client_del(&env, exp); lu_env_fini(&env); class_export_put(exp); @@ -366,18 +360,18 @@ static int ofd_init_export(struct obd_export *exp) { int rc; - cfs_spin_lock_init(&exp->exp_filter_data.fed_lock); + spin_lock_init(&exp->exp_filter_data.fed_lock); CFS_INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list); - cfs_spin_lock(&exp->exp_lock); + spin_lock(&exp->exp_lock); exp->exp_connecting = 1; - cfs_spin_unlock(&exp->exp_lock); + spin_unlock(&exp->exp_lock); /* self-export doesn't need client data and ldlm initialization */ if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, &exp->exp_client_uuid))) return 0; - rc = lut_client_alloc(exp); + rc = tgt_client_alloc(exp); if (rc == 0) ldlm_init_export(exp); if (rc) @@ -402,7 +396,7 @@ static int ofd_destroy_export(struct obd_export *exp) return 0; ldlm_destroy_export(exp); - lut_client_free(exp); + tgt_client_free(exp); ofd_fmd_cleanup(exp); @@ -413,7 +407,7 @@ static int ofd_destroy_export(struct obd_export *exp) ofd_grant_discard(exp); ofd_fmd_cleanup(exp); - if (exp->exp_connect_flags & OBD_CONNECT_GRANT_SHRINK) { + if (exp_connect_flags(exp) & OBD_CONNECT_GRANT_SHRINK) { if (ofd->ofd_tot_granted_clients > 0) ofd->ofd_tot_granted_clients --; } @@ -425,6 +419,14 @@ static int ofd_destroy_export(struct obd_export *exp) return 0; } +int ofd_postrecov(const struct lu_env *env, struct ofd_device *ofd) +{ + struct lu_device *ldev = &ofd->ofd_dt_dev.dd_lu_dev; + + CDEBUG(D_HA, "%s: recovery is over\n", ofd_obd(ofd)->obd_name); + return ldev->ld_ops->ldo_recovery_complete(env, ldev); +} + int ofd_obd_postrecov(struct obd_device *obd) { struct lu_env env; @@ -438,7 +440,8 @@ int ofd_obd_postrecov(struct obd_device *obd) RETURN(rc); ofd_info_init(&env, obd->obd_self_export); - rc = ldev->ld_ops->ldo_recovery_complete(&env, ldev); + rc = ofd_postrecov(&env, ofd_dev(ldev)); + lu_env_fini(&env); RETURN(rc); } @@ -460,10 +463,10 @@ static int ofd_adapt_sptlrpc_conf(const struct lu_env *env, sptlrpc_target_update_exp_flavor(obd, &tmp_rset); - cfs_write_lock(&fo->fo_sptlrpc_lock); + write_lock(&fo->fo_sptlrpc_lock); sptlrpc_rule_set_free(&fo->fo_sptlrpc_rset); fo->fo_sptlrpc_rset = tmp_rset; - cfs_write_unlock(&fo->fo_sptlrpc_lock); + write_unlock(&fo->fo_sptlrpc_lock); return 0; } @@ -483,7 +486,7 @@ static int ofd_set_info_async(const struct lu_env *env, struct obd_export *exp, __u32 keylen, void *key, __u32 vallen, void *val, struct ptlrpc_request_set *set) { - struct ofd_device *ofd = ofd_exp(exp); + struct ofd_device *ofd; int rc = 0; ENTRY; @@ -493,6 +496,8 @@ static int ofd_set_info_async(const struct lu_env *env, struct obd_export *exp, RETURN(-EINVAL); } + ofd = ofd_exp(exp); + if (KEY_IS(KEY_CAPA_KEY)) { rc = ofd_update_capa_key(ofd, val); if (rc) @@ -505,6 +510,7 @@ static int ofd_set_info_async(const struct lu_env *env, struct obd_export *exp, } else if (KEY_IS(KEY_GRANT_SHRINK)) { struct ost_body *body = val; + ofd_info_init(env, exp); /** handle grant shrink, similar to a read request */ ofd_grant_prepare_read(env, exp, &body->oa); } else { @@ -520,7 +526,7 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp, __u32 keylen, void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm) { - struct ofd_device *ofd = ofd_exp(exp); + struct ofd_device *ofd; int rc = 0; ENTRY; @@ -530,6 +536,8 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp, RETURN(-EINVAL); } + ofd = ofd_exp(exp); + if (KEY_IS(KEY_BLOCKSIZE)) { __u32 *blocksize = val; if (blocksize) { @@ -548,12 +556,24 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp, *vallen = sizeof(*blocksize_bits); } else if (KEY_IS(KEY_LAST_ID)) { obd_id *last_id = val; + struct ofd_seq *oseq; + + if (val == NULL) { + *vallen = sizeof(obd_id); + RETURN(0); + } + ofd_info_init(env, exp); + oseq = ofd_seq_load(env, ofd, + (obd_seq)exp->exp_filter_data.fed_group); + LASSERT(!IS_ERR(oseq)); if (last_id) { - if (*vallen < sizeof(*last_id)) + if (*vallen < sizeof(*last_id)) { + ofd_seq_put(env, oseq); RETURN(-EOVERFLOW); - *last_id = ofd_last_id(ofd, - exp->exp_filter_data.fed_group); + } + *last_id = ofd_seq_last_oid(oseq); } + ofd_seq_put(env, oseq); *vallen = sizeof(*last_id); } else if (KEY_IS(KEY_FIEMAP)) { struct ofd_thread_info *info; @@ -597,6 +617,41 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp, } else if (KEY_IS(KEY_SYNC_LOCK_CANCEL)) { *((__u32 *) val) = ofd->ofd_sync_lock_cancel; *vallen = sizeof(__u32); + } else if (KEY_IS(KEY_LAST_FID)) { + struct lu_env env; + struct ofd_device *ofd = ofd_exp(exp); + struct ofd_seq *oseq; + struct ost_id *oid = val; + int rc; + + if (oid == NULL) { + *vallen = sizeof(struct ost_id); + RETURN(0); + } + + if (*vallen < sizeof(*oid)) + RETURN(-EOVERFLOW); + + rc = lu_env_init(&env, LCT_DT_THREAD); + if (rc != 0) + RETURN(rc); + ofd_info_init(&env, exp); + + ostid_le_to_cpu(oid, oid); + CDEBUG(D_HA, "Get LAST FID for seq "LPX64"\n", oid->oi_seq); + + oseq = ofd_seq_load(&env, ofd, oid->oi_seq); + if (IS_ERR(oseq)) + GOTO(out_fini, rc = PTR_ERR(oseq)); + + CDEBUG(D_HA, "LAST FID is "POSTID"\n", oseq->os_last_oid, + oseq->os_seq); + + *oid = oseq->os_oi; + *vallen = sizeof(*oid); + ofd_seq_put(&env, oseq); +out_fini: + lu_env_fini(&env); } else { CERROR("Not supported key %s\n", (char*)key); rc = -EOPNOTSUPP; @@ -611,7 +666,7 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd, { int rc; - cfs_spin_lock(&ofd->ofd_osfs_lock); + spin_lock(&ofd->ofd_osfs_lock); if (cfs_time_before_64(ofd->ofd_osfs_age, max_age) || max_age == 0) { obd_size unstable; @@ -630,7 +685,7 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd, /* record value of inflight counter before running statfs to * compute the diff once statfs is completed */ unstable = ofd->ofd_osfs_inflight; - cfs_spin_unlock(&ofd->ofd_osfs_lock); + spin_unlock(&ofd->ofd_osfs_lock); /* statfs can sleep ... hopefully not for too long since we can * call it fairly often as space fills up */ @@ -638,8 +693,8 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd, if (unlikely(rc)) return rc; - cfs_spin_lock(&ofd->ofd_grant_lock); - cfs_spin_lock(&ofd->ofd_osfs_lock); + spin_lock(&ofd->ofd_grant_lock); + spin_lock(&ofd->ofd_osfs_lock); /* calculate how much space was written while we released the * ofd_osfs_lock */ unstable = ofd->ofd_osfs_inflight - unstable; @@ -663,7 +718,7 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd, /* similarly, there is some uncertainty on write requests * between prepare & commit */ ofd->ofd_osfs_unstable += ofd->ofd_tot_pending; - cfs_spin_unlock(&ofd->ofd_grant_lock); + spin_unlock(&ofd->ofd_grant_lock); /* finally udpate cached statfs data */ ofd->ofd_osfs = *osfs; @@ -672,14 +727,14 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd, ofd->ofd_statfs_inflight--; /* stop tracking */ if (ofd->ofd_statfs_inflight == 0) ofd->ofd_osfs_inflight = 0; - cfs_spin_unlock(&ofd->ofd_osfs_lock); + spin_unlock(&ofd->ofd_osfs_lock); if (from_cache) *from_cache = 0; } else { /* use cached statfs data */ *osfs = ofd->ofd_osfs; - cfs_spin_unlock(&ofd->ofd_osfs_lock); + spin_unlock(&ofd->ofd_osfs_lock); if (from_cache) *from_cache = 1; } @@ -715,7 +770,7 @@ static int ofd_statfs(const struct lu_env *env, struct obd_export *exp, /* The QoS code on the MDS does not care about space reserved for * precreate, so take it out. */ - if (exp->exp_connect_flags & OBD_CONNECT_MDS) { + if (exp_connect_flags(exp) & OBD_CONNECT_MDS) { struct filter_export_data *fed; fed = &obd->obd_self_export->exp_filter_data; @@ -730,7 +785,7 @@ static int ofd_statfs(const struct lu_env *env, struct obd_export *exp, osfs->os_files, osfs->os_ffree, osfs->os_state); if (OBD_FAIL_CHECK_VALUE(OBD_FAIL_OST_ENOINO, - ofd->ofd_lut.lut_lsd.lsd_ost_index)) + ofd->ofd_lut.lut_lsd.lsd_osd_index)) osfs->os_ffree = 0; /* OS_STATE_READONLY can be set by OSD already */ @@ -750,7 +805,7 @@ static int ofd_statfs(const struct lu_env *env, struct obd_export *exp, } if (OBD_FAIL_CHECK_VALUE(OBD_FAIL_OST_ENOSPC, - ofd->ofd_lut.lut_lsd.lsd_ost_index)) + ofd->ofd_lut.lut_lsd.lsd_osd_index)) osfs->os_bfree = osfs->os_bavail = 2; EXIT; @@ -934,7 +989,7 @@ static int ofd_destroy_by_fid(const struct lu_env *env, { struct ofd_thread_info *info = ofd_info(env); struct lustre_handle lockh; - int flags = LDLM_AST_DISCARD_DATA, rc = 0; + __u64 flags = LDLM_AST_DISCARD_DATA, rc = 0; ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } }; @@ -942,21 +997,22 @@ static int ofd_destroy_by_fid(const struct lu_env *env, ENTRY; + fo = ofd_object_find(env, ofd, fid); + if (IS_ERR(fo)) + RETURN(PTR_ERR(fo)); + /* Tell the clients that the object is gone now and that they should * throw away any cached pages. */ ofd_build_resid(fid, &info->fti_resid); rc = ldlm_cli_enqueue_local(ofd->ofd_namespace, &info->fti_resid, LDLM_EXTENT, &policy, LCK_PW, &flags, ldlm_blocking_ast, ldlm_completion_ast, - NULL, NULL, 0, NULL, &lockh); + NULL, NULL, 0, LVB_T_NONE, NULL, &lockh); /* We only care about the side-effects, just drop the lock. */ if (rc == ELDLM_OK) ldlm_lock_decref(&lockh, LCK_PW); - fo = ofd_object_find(env, ofd, fid); - if (IS_ERR(fo)) - RETURN(PTR_ERR(fo)); LASSERT(fo != NULL); rc = ofd_object_destroy(env, fo, orphan); @@ -1004,14 +1060,15 @@ int ofd_destroy(const struct lu_env *env, struct obd_export *exp, lrc = ofd_destroy_by_fid(env, ofd, &info->fti_fid, 0); if (lrc == -ENOENT) { CDEBUG(D_INODE, - "destroying non-existent object "LPU64"\n", - oa->o_id); + "%s: destroying non-existent object "DFID"\n", + ofd_obd(ofd)->obd_name, PFID(&info->fti_fid)); /* rewrite rc with -ENOENT only if it is 0 */ if (rc == 0) rc = lrc; } else if (lrc != 0) { - CEMERG("error destroying object "LPU64": %d\n", - oa->o_id, rc); + CERROR("%s: error destroying object "DFID": %d\n", + ofd_obd(ofd)->obd_name, PFID(&info->fti_fid), + rc); rc = lrc; } count--; @@ -1047,15 +1104,24 @@ static int ofd_orphans_destroy(const struct lu_env *env, int skip_orphan; int rc = 0; struct ost_id oi = oa->o_oi; + struct ofd_seq *oseq; ENTRY; + oseq = ofd_seq_get(ofd, oa->o_seq); + if (oseq == NULL) { + CERROR("%s: Can not find seq for "LPU64":"LPU64"\n", + ofd_name(ofd), oa->o_seq, oa->o_id); + RETURN(-EINVAL); + } + LASSERT(exp != NULL); - skip_orphan = !!(exp->exp_connect_flags & OBD_CONNECT_SKIP_ORPHAN); + skip_orphan = !!(exp_connect_flags(exp) & OBD_CONNECT_SKIP_ORPHAN); - last = ofd_last_id(ofd, oa->o_seq); - CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n", - ofd_obd(ofd)->obd_name, oa->o_id + 1, last); + last = ofd_seq_last_oid(oseq); + LCONSOLE_INFO("%s: deleting orphan objects from "LPX64":"LPU64 + " to "LPU64"\n", ofd_name(ofd), oa->o_seq, + oa->o_id + 1, last); for (oi.oi_id = last; oi.oi_id > oa->o_id; oi.oi_id--) { fid_ostid_unpack(&info->fti_fid, &oi, 0); @@ -1064,23 +1130,24 @@ static int ofd_orphans_destroy(const struct lu_env *env, CEMERG("error destroying precreated id "LPU64": %d\n", oi.oi_id, rc); if (!skip_orphan) { - ofd_last_id_set(ofd, oi.oi_id - 1, oa->o_seq); + ofd_seq_last_oid_set(oseq, oi.oi_id - 1); /* update last_id on disk periodically so that if we * restart * we don't need to re-scan all of the just * deleted objects. */ if ((oi.oi_id & 511) == 0) - ofd_last_id_write(env, ofd, oa->o_seq); + ofd_seq_last_oid_write(env, ofd, oseq); } } CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n", ofd_obd(ofd)->obd_name, oa->o_seq, oa->o_id); if (!skip_orphan) { - rc = ofd_last_id_write(env, ofd, oa->o_seq); + rc = ofd_seq_last_oid_write(env, ofd, oseq); } else { /* don't reuse orphan object, return last used objid */ oa->o_id = last; rc = 0; } + ofd_seq_put(env, oseq); RETURN(rc); } @@ -1090,7 +1157,10 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, { struct ofd_device *ofd = ofd_exp(exp); struct ofd_thread_info *info; - int rc = 0, diff; + obd_seq seq = oa->o_seq; + struct ofd_seq *oseq; + int rc = 0, diff; + int sync_trans = 0; ENTRY; @@ -1101,14 +1171,21 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, LASSERT(oa->o_valid & OBD_MD_FLGROUP); CDEBUG(D_INFO, "ofd_create(oa->o_seq="LPU64",oa->o_id="LPU64")\n", - oa->o_seq, oa->o_id); + seq, oa->o_id); + + oseq = ofd_seq_load(env, ofd, seq); + if (IS_ERR(oseq)) { + CERROR("%s: Can't find FID Sequence "LPX64": rc = %ld\n", + ofd_name(ofd), seq, PTR_ERR(oseq)); + RETURN(-EINVAL); + } if ((oa->o_valid & OBD_MD_FLFLAGS) && (oa->o_flags & OBD_FL_RECREATE_OBJS)) { if (!ofd_obd(ofd)->obd_recovering || - oa->o_id > ofd_last_id(ofd, oa->o_seq)) { + oa->o_id > ofd_seq_last_oid(oseq)) { CERROR("recreate objid "LPU64" > last id "LPU64"\n", - oa->o_id, ofd_last_id(ofd, oa->o_seq)); + oa->o_id, ofd_seq_last_oid(oseq)); GOTO(out_nolock, rc = -EINVAL); } /* do nothing because we create objects during first write */ @@ -1120,42 +1197,59 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, /* destroy orphans */ if (oti->oti_conn_cnt < exp->exp_conn_cnt) { CERROR("%s: dropping old orphan cleanup request\n", - ofd_obd(ofd)->obd_name); + ofd_name(ofd)); GOTO(out_nolock, rc = 0); } /* This causes inflight precreates to abort and drop lock */ - cfs_set_bit(oa->o_seq, &ofd->ofd_destroys_in_progress); - cfs_mutex_lock(&ofd->ofd_create_locks[oa->o_seq]); - if (!cfs_test_bit(oa->o_seq, &ofd->ofd_destroys_in_progress)) { + oseq->os_destroys_in_progress = 1; + mutex_lock(&oseq->os_create_lock); + if (!oseq->os_destroys_in_progress) { CERROR("%s:["LPU64"] destroys_in_progress already cleared\n", exp->exp_obd->obd_name, oa->o_seq); GOTO(out, rc = 0); } - diff = oa->o_id - ofd_last_id(ofd, oa->o_seq); + diff = oa->o_id - ofd_seq_last_oid(oseq); CDEBUG(D_HA, "ofd_last_id() = "LPU64" -> diff = %d\n", - ofd_last_id(ofd, oa->o_seq), diff); + ofd_seq_last_oid(oseq), diff); if (-diff > OST_MAX_PRECREATE) { /* FIXME: should reset precreate_next_id on MDS */ rc = 0; } else if (diff < 0) { rc = ofd_orphans_destroy(env, exp, ofd, oa); - cfs_clear_bit(oa->o_seq, &ofd->ofd_destroys_in_progress); + oseq->os_destroys_in_progress = 0; } else { /* XXX: Used by MDS for the first time! */ - cfs_clear_bit(oa->o_seq, &ofd->ofd_destroys_in_progress); + oseq->os_destroys_in_progress = 0; } } else { - cfs_mutex_lock(&ofd->ofd_create_locks[oa->o_seq]); + mutex_lock(&oseq->os_create_lock); if (oti->oti_conn_cnt < exp->exp_conn_cnt) { CERROR("%s: dropping old precreate request\n", - ofd_obd(ofd)->obd_name); + ofd_obd(ofd)->obd_name); GOTO(out, rc = 0); } - /* only precreate if group == 0 and o_id is specfied */ - if (!fid_seq_is_mdt(oa->o_seq) || oa->o_id == 0) { + /* only precreate if seq is 0, IDIF or normal and also o_id + * must be specfied */ + if ((!fid_seq_is_mdt(oa->o_seq) && + !fid_seq_is_norm(oa->o_seq) && + !fid_seq_is_idif(oa->o_seq)) || oa->o_id == 0) { diff = 1; /* shouldn't we create this right now? */ } else { - diff = oa->o_id - ofd_last_id(ofd, oa->o_seq); + diff = oa->o_id - ofd_seq_last_oid(oseq); + /* Do sync create if the seq is about to used up */ + if (fid_seq_is_idif(oa->o_seq) || + fid_seq_is_mdt0(oa->o_seq)) { + if (unlikely(oa->o_id >= IDIF_MAX_OID - 1)) + sync_trans = 1; + } else if (fid_seq_is_norm(oa->o_seq)) { + if (unlikely(oa->o_id >= + LUSTRE_DATA_SEQ_MAX_WIDTH - 1)) + sync_trans = 1; + } else { + CERROR("%s : invalid o_seq "LPX64": rc = %d\n", + ofd_name(ofd), oa->o_seq, -EINVAL); + GOTO(out, rc = -EINVAL); + } } } if (diff > 0) { @@ -1171,15 +1265,15 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, ofd_obd(ofd)->obd_self_export, &diff); if (rc) { - CDEBUG(D_HA, "%s: failed to acquire grant space" - "for precreate (%d)\n", - ofd_obd(ofd)->obd_name, diff); + CDEBUG(D_HA, "%s: failed to acquire grant " + "space for precreate (%d): rc = %d\n", + ofd_name(ofd), diff, rc); diff = 0; } } while (diff > 0) { - next_id = ofd_last_id(ofd, oa->o_seq) + 1; + next_id = ofd_seq_last_oid(oseq) + 1; count = ofd_precreate_batch(ofd, diff); CDEBUG(D_HA, "%s: reserve %d objects in group "LPU64 @@ -1193,10 +1287,10 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, created, diff + created, created / DISK_TIMEOUT); break; - } + } rc = ofd_precreate_objects(env, ofd, next_id, - oa->o_seq, count); + oseq, count, sync_trans); if (rc > 0) { created += rc; diff -= rc; @@ -1204,16 +1298,15 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, break; } } - if (created > 0) { + if (created > 0) /* some objects got created, we can return * them, even if last creation failed */ - oa->o_id = ofd_last_id(ofd, oa->o_seq); rc = 0; - } else { - CERROR("unable to precreate: %d\n", rc); - oa->o_id = ofd_last_id(ofd, oa->o_seq); - } + else + CERROR("%s: unable to precreate: rc = %d\n", + ofd_name(ofd), rc); + oa->o_id = ofd_seq_last_oid(oseq); oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP; if (!(oa->o_valid & OBD_MD_FLFLAGS) || @@ -1224,14 +1317,15 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, ofd_info2oti(info, oti); out: - cfs_mutex_unlock(&ofd->ofd_create_locks[oa->o_seq]); + mutex_unlock(&oseq->os_create_lock); out_nolock: if (rc == 0 && ea != NULL) { struct lov_stripe_md *lsm = *ea; lsm->lsm_object_id = oa->o_id; } - return rc; + ofd_seq_put(env, oseq); + RETURN(rc); } int ofd_getattr(const struct lu_env *env, struct obd_export *exp, @@ -1307,15 +1401,14 @@ static int ofd_sync(const struct lu_env *env, struct obd_export *exp, GOTO(out, rc = PTR_ERR(fo)); } - ofd_write_lock(env, fo); if (!ofd_object_exists(fo)) - GOTO(unlock, rc = -ENOENT); + GOTO(put, rc = -ENOENT); if (dt_version_get(env, ofd_object_child(fo)) > ofd_obd(ofd)->obd_last_committed) { rc = dt_object_sync(env, ofd_object_child(fo)); if (rc) - GOTO(unlock, rc); + GOTO(put, rc); } oinfo->oi_oa->o_valid = OBD_MD_FLID; @@ -1324,8 +1417,64 @@ static int ofd_sync(const struct lu_env *env, struct obd_export *exp, ofd_counter_incr(exp, LPROC_OFD_STATS_SYNC, oinfo->oi_jobid, 1); EXIT; -unlock: - ofd_write_unlock(env, fo); +put: + ofd_object_put(env, fo); +out: + return rc; +} + +static int ofd_ioc_get_obj_version(const struct lu_env *env, + struct ofd_device *ofd, void *karg) +{ + struct obd_ioctl_data *data = karg; + struct lu_fid fid; + struct ofd_object *fo; + dt_obj_version_t version; + int rc = 0; + + ENTRY; + + if (data->ioc_inlbuf2 == NULL || data->ioc_inllen2 != sizeof(version)) + GOTO(out, rc = -EINVAL); + + if (data->ioc_inlbuf1 != NULL && data->ioc_inllen1 == sizeof(fid)) { + fid = *(struct lu_fid *)data->ioc_inlbuf1; + } else if (data->ioc_inlbuf3 != NULL && + data->ioc_inllen3 == sizeof(__u64) && + data->ioc_inlbuf4 != NULL && + data->ioc_inllen4 == sizeof(__u64)) { + struct ost_id ostid; + + ostid.oi_id = *(__u64 *)data->ioc_inlbuf3; + ostid.oi_seq = *(__u64 *)data->ioc_inlbuf4; + rc = fid_ostid_unpack(&fid, &ostid, 0); + if (rc != 0) + GOTO(out, rc); + } else { + GOTO(out, rc = -EINVAL); + } + + if (!fid_is_sane(&fid)) + GOTO(out, rc = -EINVAL); + + fo = ofd_object_find(env, ofd, &fid); + if (IS_ERR(fo)) + GOTO(out, rc = PTR_ERR(fo)); + + if (!ofd_object_exists(fo)) + GOTO(out_fo, rc = -ENOENT); + + if (lu_object_remote(&fo->ofo_obj.do_lu)) + GOTO(out_fo, rc = -EREMOTE); + + version = dt_version_get(env, ofd_object_child(fo)); + if (version == 0) + GOTO(out_fo, rc = -EIO); + + *(dt_obj_version_t *)data->ioc_inlbuf2 = version; + + EXIT; +out_fo: ofd_object_put(env, fo); out: return rc; @@ -1342,7 +1491,7 @@ int ofd_iocontrol(unsigned int cmd, struct obd_export *exp, int len, ENTRY; CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd); - rc = lu_env_init(&env, LCT_LOCAL); + rc = lu_env_init(&env, LCT_DT_THREAD); if (rc) RETURN(rc); @@ -1360,6 +1509,9 @@ int ofd_iocontrol(unsigned int cmd, struct obd_export *exp, int len, if (rc == 0) rc = dt_ro(&env, ofd->ofd_osd); break; + case OBD_IOC_GET_OBJ_VERSION: + rc = ofd_ioc_get_obj_version(&env, ofd, karg); + break; default: CERROR("%s: not supported cmd = %d\n", obd->obd_name, cmd); rc = -ENOTTY; @@ -1447,23 +1599,6 @@ out: return !!rc; } -static int ofd_obd_notify(struct obd_device *obd, struct obd_device *unused, - enum obd_notify_event ev, void *data) -{ - switch (ev) { - case OBD_NOTIFY_CONFIG: - LASSERT(obd->obd_no_conn); - cfs_spin_lock(&obd->obd_dev_lock); - obd->obd_no_conn = 0; - cfs_spin_unlock(&obd->obd_dev_lock); - break; - default: - CDEBUG(D_INFO, "%s: Unhandled notification %#x\n", - obd->obd_name, ev); - } - return 0; -} - /* * Handle quota control requests to consult current usage/limit. * @@ -1517,6 +1652,5 @@ struct obd_ops ofd_obd_ops = { .o_precleanup = ofd_precleanup, .o_ping = ofd_ping, .o_health_check = ofd_health_check, - .o_notify = ofd_obd_notify, .o_quotactl = ofd_quotactl, };