X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fofd%2Fofd_obd.c;h=99de9eb5a74698f66b264482c8fa14fe7031becf;hb=691fffacfd3b9e9020c615281b409b34eef2dd90;hp=820cfffe7920b58a6be8b8b456c2050d0db60c28;hpb=7c7d754dbe42180ee45eb8a41f36f51447c7abfc;p=fs%2Flustre-release.git diff --git a/lustre/ofd/ofd_obd.c b/lustre/ofd/ofd_obd.c index 820cfff..99de9eb 100644 --- a/lustre/ofd/ofd_obd.c +++ b/lustre/ofd/ofd_obd.c @@ -27,7 +27,7 @@ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2012, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -44,7 +44,7 @@ #include "ofd_internal.h" #include -#include +#include static int ofd_export_stats_init(struct ofd_device *ofd, struct obd_export *exp, void *client_nid) @@ -80,7 +80,7 @@ static int ofd_export_stats_init(struct ofd_device *ofd, GOTO(clean, rc = -ENOMEM); for (i = 0; i < BRW_LAST; i++) - cfs_spin_lock_init(&stats->nid_brw_stats->hist[i].oh_lock); + spin_lock_init(&stats->nid_brw_stats->hist[i].oh_lock); rc = lprocfs_seq_create(stats->nid_proc, "brw_stats", 0644, &ofd_per_nid_stats_fops, stats); @@ -128,10 +128,11 @@ static int ofd_parse_connect_data(const struct lu_env *env, RETURN(0); CDEBUG(D_RPCTRACE, "%s: cli %s/%p ocd_connect_flags: "LPX64 - " ocd_version: %x ocd_grant: %d ocd_index: %u\n", + " ocd_version: %x ocd_grant: %d ocd_index: %u" + " ocd_group %u\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, data->ocd_connect_flags, data->ocd_version, - data->ocd_grant, data->ocd_index); + data->ocd_grant, data->ocd_index, data->ocd_group); if (fed->fed_group != 0 && fed->fed_group != data->ocd_group) { CWARN("!!! This export (nid %s) used object group %d " @@ -145,7 +146,7 @@ static int ofd_parse_connect_data(const struct lu_env *env, fed->fed_group = data->ocd_group; data->ocd_connect_flags &= OST_CONNECT_SUPPORTED; - exp->exp_connect_flags = data->ocd_connect_flags; + exp->exp_connect_data = *data; data->ocd_version = LUSTRE_VERSION_CODE; /* Kindly make sure the SKIP_ORPHAN flag is from MDS. */ @@ -164,21 +165,14 @@ static int ofd_parse_connect_data(const struct lu_env *env, data->ocd_grant_extent = ofd->ofd_dt_conf.ddp_grant_frag >> 10; } - if (exp->exp_connect_flags & OBD_CONNECT_GRANT) + if (exp_connect_flags(exp) & OBD_CONNECT_GRANT) data->ocd_grant = ofd_grant_connect(env, exp, data->ocd_grant); if (data->ocd_connect_flags & OBD_CONNECT_INDEX) { struct lr_server_data *lsd = &ofd->ofd_lut.lut_lsd; - int index = lsd->lsd_ost_index; + int index = lsd->lsd_osd_index; - if (!(lsd->lsd_feature_compat & OBD_COMPAT_OST)) { - /* this will only happen on the first connect */ - lsd->lsd_ost_index = data->ocd_index; - lsd->lsd_feature_compat |= OBD_COMPAT_OST; - /* sync is not needed here as lut_client_add will - * set exp_need_sync flag */ - lut_server_data_update(env, &ofd->ofd_lut, 0); - } else if (index != data->ocd_index) { + if (index != data->ocd_index) { LCONSOLE_ERROR_MSG(0x136, "Connection from %s to index" " %u doesn't match actual OST index" " %u in last_rcvd file, bad " @@ -187,13 +181,19 @@ static int ofd_parse_connect_data(const struct lu_env *env, data->ocd_index); RETURN(-EBADF); } + if (!(lsd->lsd_feature_compat & OBD_COMPAT_OST)) { + /* this will only happen on the first connect */ + lsd->lsd_feature_compat |= OBD_COMPAT_OST; + /* sync is not needed here as lut_client_add will + * set exp_need_sync flag */ + tgt_server_data_update(env, &ofd->ofd_lut, 0); + } } - if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_SIZE)) { data->ocd_brw_size = 65536; } else if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) { data->ocd_brw_size = min(data->ocd_brw_size, - (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT)); + (__u32)DT_MAX_BRW_SIZE); if (data->ocd_brw_size == 0) { CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64 " ocd_version: %x ocd_grant: %d ocd_index: %u " @@ -237,14 +237,24 @@ static int ofd_parse_connect_data(const struct lu_env *env, if (data->ocd_connect_flags & OBD_CONNECT_MAXBYTES) data->ocd_maxbytes = ofd->ofd_dt_conf.ddp_maxbytes; - RETURN(0); + if (data->ocd_connect_flags & OBD_CONNECT_PINGLESS) { + if (suppress_pings) { + spin_lock(&exp->exp_obd->obd_dev_lock); + list_del_init(&exp->exp_obd_chain_timed); + spin_unlock(&exp->exp_obd->obd_dev_lock); + } else { + data->ocd_connect_flags &= ~OBD_CONNECT_PINGLESS; + } + } + + RETURN(0); } static int ofd_obd_reconnect(const struct lu_env *env, struct obd_export *exp, struct obd_device *obd, struct obd_uuid *cluuid, struct obd_connect_data *data, void *localdata) { - struct ofd_device *ofd = ofd_dev(obd->obd_lu_dev); + struct ofd_device *ofd; int rc; ENTRY; @@ -252,6 +262,8 @@ static int ofd_obd_reconnect(const struct lu_env *env, struct obd_export *exp, if (exp == NULL || obd == NULL || cluuid == NULL) RETURN(-EINVAL); + ofd = ofd_dev(obd->obd_lu_dev); + rc = lu_env_refill((struct lu_env *)env); if (rc != 0) { CERROR("Failure to refill session: '%d'\n", rc); @@ -273,8 +285,7 @@ static int ofd_obd_connect(const struct lu_env *env, struct obd_export **_exp, struct obd_export *exp; struct ofd_device *ofd; struct lustre_handle conn = { 0 }; - int rc, group; - + int rc; ENTRY; if (_exp == NULL || obd == NULL || cluuid == NULL) @@ -301,25 +312,20 @@ static int ofd_obd_connect(const struct lu_env *env, struct obd_export **_exp, if (rc) GOTO(out, rc); - group = data->ocd_group; if (obd->obd_replayable) { struct tg_export_data *ted = &exp->exp_target_data; memcpy(ted->ted_lcd->lcd_uuid, cluuid, sizeof(ted->ted_lcd->lcd_uuid)); - rc = lut_client_new(env, exp); + rc = tgt_client_new(env, exp); if (rc != 0) GOTO(out, rc); ofd_export_stats_init(ofd, exp, localdata); } - if (group == 0) - GOTO(out, rc = 0); - /* init new group */ - if (group > ofd->ofd_max_group) { - ofd->ofd_max_group = group; - rc = ofd_group_load(env, ofd, group); - } + CDEBUG(D_HA, "%s: get connection from MDS %d\n", obd->obd_name, + data->ocd_group); + out: if (rc != 0) { class_disconnect(exp); @@ -355,7 +361,7 @@ static int ofd_obd_disconnect(struct obd_export *exp) /* Do not erase record for recoverable client. */ if (exp->exp_obd->obd_replayable && (!exp->exp_obd->obd_fail || exp->exp_failed)) - lut_client_del(&env, exp); + tgt_client_del(&env, exp); lu_env_fini(&env); class_export_put(exp); @@ -366,18 +372,18 @@ static int ofd_init_export(struct obd_export *exp) { int rc; - cfs_spin_lock_init(&exp->exp_filter_data.fed_lock); + spin_lock_init(&exp->exp_filter_data.fed_lock); CFS_INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list); - cfs_spin_lock(&exp->exp_lock); + spin_lock(&exp->exp_lock); exp->exp_connecting = 1; - cfs_spin_unlock(&exp->exp_lock); + spin_unlock(&exp->exp_lock); /* self-export doesn't need client data and ldlm initialization */ if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, &exp->exp_client_uuid))) return 0; - rc = lut_client_alloc(exp); + rc = tgt_client_alloc(exp); if (rc == 0) ldlm_init_export(exp); if (rc) @@ -402,7 +408,7 @@ static int ofd_destroy_export(struct obd_export *exp) return 0; ldlm_destroy_export(exp); - lut_client_free(exp); + tgt_client_free(exp); ofd_fmd_cleanup(exp); @@ -413,7 +419,7 @@ static int ofd_destroy_export(struct obd_export *exp) ofd_grant_discard(exp); ofd_fmd_cleanup(exp); - if (exp->exp_connect_flags & OBD_CONNECT_GRANT_SHRINK) { + if (exp_connect_flags(exp) & OBD_CONNECT_GRANT_SHRINK) { if (ofd->ofd_tot_granted_clients > 0) ofd->ofd_tot_granted_clients --; } @@ -425,6 +431,14 @@ static int ofd_destroy_export(struct obd_export *exp) return 0; } +int ofd_postrecov(const struct lu_env *env, struct ofd_device *ofd) +{ + struct lu_device *ldev = &ofd->ofd_dt_dev.dd_lu_dev; + + CDEBUG(D_HA, "%s: recovery is over\n", ofd_obd(ofd)->obd_name); + return ldev->ld_ops->ldo_recovery_complete(env, ldev); +} + int ofd_obd_postrecov(struct obd_device *obd) { struct lu_env env; @@ -438,7 +452,8 @@ int ofd_obd_postrecov(struct obd_device *obd) RETURN(rc); ofd_info_init(&env, obd->obd_self_export); - rc = ldev->ld_ops->ldo_recovery_complete(&env, ldev); + rc = ofd_postrecov(&env, ofd_dev(ldev)); + lu_env_fini(&env); RETURN(rc); } @@ -460,10 +475,10 @@ static int ofd_adapt_sptlrpc_conf(const struct lu_env *env, sptlrpc_target_update_exp_flavor(obd, &tmp_rset); - cfs_write_lock(&fo->fo_sptlrpc_lock); + write_lock(&fo->fo_sptlrpc_lock); sptlrpc_rule_set_free(&fo->fo_sptlrpc_rset); fo->fo_sptlrpc_rset = tmp_rset; - cfs_write_unlock(&fo->fo_sptlrpc_lock); + write_unlock(&fo->fo_sptlrpc_lock); return 0; } @@ -483,7 +498,7 @@ static int ofd_set_info_async(const struct lu_env *env, struct obd_export *exp, __u32 keylen, void *key, __u32 vallen, void *val, struct ptlrpc_request_set *set) { - struct ofd_device *ofd = ofd_exp(exp); + struct ofd_device *ofd; int rc = 0; ENTRY; @@ -493,6 +508,8 @@ static int ofd_set_info_async(const struct lu_env *env, struct obd_export *exp, RETURN(-EINVAL); } + ofd = ofd_exp(exp); + if (KEY_IS(KEY_CAPA_KEY)) { rc = ofd_update_capa_key(ofd, val); if (rc) @@ -505,6 +522,7 @@ static int ofd_set_info_async(const struct lu_env *env, struct obd_export *exp, } else if (KEY_IS(KEY_GRANT_SHRINK)) { struct ost_body *body = val; + ofd_info_init(env, exp); /** handle grant shrink, similar to a read request */ ofd_grant_prepare_read(env, exp, &body->oa); } else { @@ -520,7 +538,7 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp, __u32 keylen, void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm) { - struct ofd_device *ofd = ofd_exp(exp); + struct ofd_device *ofd; int rc = 0; ENTRY; @@ -530,6 +548,8 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp, RETURN(-EINVAL); } + ofd = ofd_exp(exp); + if (KEY_IS(KEY_BLOCKSIZE)) { __u32 *blocksize = val; if (blocksize) { @@ -548,12 +568,24 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp, *vallen = sizeof(*blocksize_bits); } else if (KEY_IS(KEY_LAST_ID)) { obd_id *last_id = val; + struct ofd_seq *oseq; + + if (val == NULL) { + *vallen = sizeof(obd_id); + RETURN(0); + } + ofd_info_init(env, exp); + oseq = ofd_seq_load(env, ofd, + (obd_seq)exp->exp_filter_data.fed_group); + LASSERT(!IS_ERR(oseq)); if (last_id) { - if (*vallen < sizeof(*last_id)) + if (*vallen < sizeof(*last_id)) { + ofd_seq_put(env, oseq); RETURN(-EOVERFLOW); - *last_id = ofd_last_id(ofd, - exp->exp_filter_data.fed_group); + } + *last_id = ofd_seq_last_oid(oseq); } + ofd_seq_put(env, oseq); *vallen = sizeof(*last_id); } else if (KEY_IS(KEY_FIEMAP)) { struct ofd_thread_info *info; @@ -597,6 +629,41 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp, } else if (KEY_IS(KEY_SYNC_LOCK_CANCEL)) { *((__u32 *) val) = ofd->ofd_sync_lock_cancel; *vallen = sizeof(__u32); + } else if (KEY_IS(KEY_LAST_FID)) { + struct lu_env env; + struct ofd_device *ofd = ofd_exp(exp); + struct ofd_seq *oseq; + struct ost_id *oid = val; + int rc; + + if (oid == NULL) { + *vallen = sizeof(struct ost_id); + RETURN(0); + } + + if (*vallen < sizeof(*oid)) + RETURN(-EOVERFLOW); + + rc = lu_env_init(&env, LCT_DT_THREAD); + if (rc != 0) + RETURN(rc); + ofd_info_init(&env, exp); + + ostid_le_to_cpu(oid, oid); + CDEBUG(D_HA, "Get LAST FID for seq "LPX64"\n", oid->oi_seq); + + oseq = ofd_seq_load(&env, ofd, oid->oi_seq); + if (IS_ERR(oseq)) + GOTO(out_fini, rc = PTR_ERR(oseq)); + + CDEBUG(D_HA, "LAST FID is "POSTID"\n", oseq->os_last_oid, + oseq->os_seq); + + *oid = oseq->os_oi; + *vallen = sizeof(*oid); + ofd_seq_put(&env, oseq); +out_fini: + lu_env_fini(&env); } else { CERROR("Not supported key %s\n", (char*)key); rc = -EOPNOTSUPP; @@ -611,7 +678,7 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd, { int rc; - cfs_spin_lock(&ofd->ofd_osfs_lock); + spin_lock(&ofd->ofd_osfs_lock); if (cfs_time_before_64(ofd->ofd_osfs_age, max_age) || max_age == 0) { obd_size unstable; @@ -630,7 +697,7 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd, /* record value of inflight counter before running statfs to * compute the diff once statfs is completed */ unstable = ofd->ofd_osfs_inflight; - cfs_spin_unlock(&ofd->ofd_osfs_lock); + spin_unlock(&ofd->ofd_osfs_lock); /* statfs can sleep ... hopefully not for too long since we can * call it fairly often as space fills up */ @@ -638,8 +705,8 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd, if (unlikely(rc)) return rc; - cfs_spin_lock(&ofd->ofd_grant_lock); - cfs_spin_lock(&ofd->ofd_osfs_lock); + spin_lock(&ofd->ofd_grant_lock); + spin_lock(&ofd->ofd_osfs_lock); /* calculate how much space was written while we released the * ofd_osfs_lock */ unstable = ofd->ofd_osfs_inflight - unstable; @@ -663,7 +730,7 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd, /* similarly, there is some uncertainty on write requests * between prepare & commit */ ofd->ofd_osfs_unstable += ofd->ofd_tot_pending; - cfs_spin_unlock(&ofd->ofd_grant_lock); + spin_unlock(&ofd->ofd_grant_lock); /* finally udpate cached statfs data */ ofd->ofd_osfs = *osfs; @@ -672,14 +739,14 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd, ofd->ofd_statfs_inflight--; /* stop tracking */ if (ofd->ofd_statfs_inflight == 0) ofd->ofd_osfs_inflight = 0; - cfs_spin_unlock(&ofd->ofd_osfs_lock); + spin_unlock(&ofd->ofd_osfs_lock); if (from_cache) *from_cache = 0; } else { /* use cached statfs data */ *osfs = ofd->ofd_osfs; - cfs_spin_unlock(&ofd->ofd_osfs_lock); + spin_unlock(&ofd->ofd_osfs_lock); if (from_cache) *from_cache = 1; } @@ -715,7 +782,7 @@ static int ofd_statfs(const struct lu_env *env, struct obd_export *exp, /* The QoS code on the MDS does not care about space reserved for * precreate, so take it out. */ - if (exp->exp_connect_flags & OBD_CONNECT_MDS) { + if (exp_connect_flags(exp) & OBD_CONNECT_MDS) { struct filter_export_data *fed; fed = &obd->obd_self_export->exp_filter_data; @@ -729,12 +796,8 @@ static int ofd_statfs(const struct lu_env *env, struct obd_export *exp, osfs->os_blocks, osfs->os_bfree, osfs->os_bavail, osfs->os_files, osfs->os_ffree, osfs->os_state); - if (OBD_FAIL_CHECK_VALUE(OBD_FAIL_OST_ENOSPC, - ofd->ofd_lut.lut_lsd.lsd_ost_index)) - osfs->os_bfree = osfs->os_bavail = 2; - if (OBD_FAIL_CHECK_VALUE(OBD_FAIL_OST_ENOINO, - ofd->ofd_lut.lut_lsd.lsd_ost_index)) + ofd->ofd_lut.lut_lsd.lsd_osd_index)) osfs->os_ffree = 0; /* OS_STATE_READONLY can be set by OSD already */ @@ -753,6 +816,10 @@ static int ofd_statfs(const struct lu_env *env, struct obd_export *exp, osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT; } + if (OBD_FAIL_CHECK_VALUE(OBD_FAIL_OST_ENOSPC, + ofd->ofd_lut.lut_lsd.lsd_osd_index)) + osfs->os_bfree = osfs->os_bavail = 2; + EXIT; out: return rc; @@ -835,10 +902,13 @@ int ofd_setattr(const struct lu_env *env, struct obd_export *exp, obdo_from_la(oinfo->oi_oa, &info->fti_attr, OFD_VALID_FLAGS | LA_UID | LA_GID); ofd_info2oti(info, oti); + + ofd_counter_incr(exp, LPROC_OFD_STATS_SETATTR, oti->oti_jobid, 1); + EXIT; out_unlock: ofd_object_put(env, fo); out: - RETURN(rc); + return rc; } static int ofd_punch(const struct lu_env *env, struct obd_export *exp, @@ -916,10 +986,13 @@ static int ofd_punch(const struct lu_env *env, struct obd_export *exp, obdo_from_la(oinfo->oi_oa, &info->fti_attr, OFD_VALID_FLAGS | LA_UID | LA_GID); ofd_info2oti(info, oti); + + ofd_counter_incr(exp, LPROC_OFD_STATS_PUNCH, oti->oti_jobid, 1); + EXIT; out: ofd_object_put(env, fo); out_env: - RETURN(rc); + return rc; } static int ofd_destroy_by_fid(const struct lu_env *env, @@ -928,7 +1001,7 @@ static int ofd_destroy_by_fid(const struct lu_env *env, { struct ofd_thread_info *info = ofd_info(env); struct lustre_handle lockh; - int flags = LDLM_AST_DISCARD_DATA, rc = 0; + __u64 flags = LDLM_AST_DISCARD_DATA, rc = 0; ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } }; @@ -936,21 +1009,22 @@ static int ofd_destroy_by_fid(const struct lu_env *env, ENTRY; + fo = ofd_object_find(env, ofd, fid); + if (IS_ERR(fo)) + RETURN(PTR_ERR(fo)); + /* Tell the clients that the object is gone now and that they should * throw away any cached pages. */ ofd_build_resid(fid, &info->fti_resid); rc = ldlm_cli_enqueue_local(ofd->ofd_namespace, &info->fti_resid, LDLM_EXTENT, &policy, LCK_PW, &flags, ldlm_blocking_ast, ldlm_completion_ast, - NULL, NULL, 0, NULL, &lockh); + NULL, NULL, 0, LVB_T_NONE, NULL, &lockh); /* We only care about the side-effects, just drop the lock. */ if (rc == ELDLM_OK) ldlm_lock_decref(&lockh, LCK_PW); - fo = ofd_object_find(env, ofd, fid); - if (IS_ERR(fo)) - RETURN(PTR_ERR(fo)); LASSERT(fo != NULL); rc = ofd_object_destroy(env, fo, orphan); @@ -998,14 +1072,15 @@ int ofd_destroy(const struct lu_env *env, struct obd_export *exp, lrc = ofd_destroy_by_fid(env, ofd, &info->fti_fid, 0); if (lrc == -ENOENT) { CDEBUG(D_INODE, - "destroying non-existent object "LPU64"\n", - oa->o_id); + "%s: destroying non-existent object "DFID"\n", + ofd_obd(ofd)->obd_name, PFID(&info->fti_fid)); /* rewrite rc with -ENOENT only if it is 0 */ if (rc == 0) rc = lrc; } else if (lrc != 0) { - CEMERG("error destroying object "LPU64": %d\n", - oa->o_id, rc); + CERROR("%s: error destroying object "DFID": %d\n", + ofd_obd(ofd)->obd_name, PFID(&info->fti_fid), + rc); rc = lrc; } count--; @@ -1041,15 +1116,24 @@ static int ofd_orphans_destroy(const struct lu_env *env, int skip_orphan; int rc = 0; struct ost_id oi = oa->o_oi; + struct ofd_seq *oseq; ENTRY; + oseq = ofd_seq_get(ofd, oa->o_seq); + if (oseq == NULL) { + CERROR("%s: Can not find seq for "LPU64":"LPU64"\n", + ofd_name(ofd), oa->o_seq, oa->o_id); + RETURN(-EINVAL); + } + LASSERT(exp != NULL); - skip_orphan = !!(exp->exp_connect_flags & OBD_CONNECT_SKIP_ORPHAN); + skip_orphan = !!(exp_connect_flags(exp) & OBD_CONNECT_SKIP_ORPHAN); - last = ofd_last_id(ofd, oa->o_seq); - CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n", - ofd_obd(ofd)->obd_name, oa->o_id + 1, last); + last = ofd_seq_last_oid(oseq); + LCONSOLE_INFO("%s: deleting orphan objects from "LPX64":"LPU64 + " to "LPU64"\n", ofd_name(ofd), oa->o_seq, + oa->o_id + 1, last); for (oi.oi_id = last; oi.oi_id > oa->o_id; oi.oi_id--) { fid_ostid_unpack(&info->fti_fid, &oi, 0); @@ -1058,23 +1142,24 @@ static int ofd_orphans_destroy(const struct lu_env *env, CEMERG("error destroying precreated id "LPU64": %d\n", oi.oi_id, rc); if (!skip_orphan) { - ofd_last_id_set(ofd, oi.oi_id - 1, oa->o_seq); + ofd_seq_last_oid_set(oseq, oi.oi_id - 1); /* update last_id on disk periodically so that if we * restart * we don't need to re-scan all of the just * deleted objects. */ if ((oi.oi_id & 511) == 0) - ofd_last_id_write(env, ofd, oa->o_seq); + ofd_seq_last_oid_write(env, ofd, oseq); } } CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n", ofd_obd(ofd)->obd_name, oa->o_seq, oa->o_id); if (!skip_orphan) { - rc = ofd_last_id_write(env, ofd, oa->o_seq); + rc = ofd_seq_last_oid_write(env, ofd, oseq); } else { /* don't reuse orphan object, return last used objid */ oa->o_id = last; rc = 0; } + ofd_seq_put(env, oseq); RETURN(rc); } @@ -1084,7 +1169,10 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, { struct ofd_device *ofd = ofd_exp(exp); struct ofd_thread_info *info; - int rc = 0, diff; + obd_seq seq = oa->o_seq; + struct ofd_seq *oseq; + int rc = 0, diff; + int sync_trans = 0; ENTRY; @@ -1095,18 +1183,25 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, LASSERT(oa->o_valid & OBD_MD_FLGROUP); CDEBUG(D_INFO, "ofd_create(oa->o_seq="LPU64",oa->o_id="LPU64")\n", - oa->o_seq, oa->o_id); + seq, oa->o_id); + + oseq = ofd_seq_load(env, ofd, seq); + if (IS_ERR(oseq)) { + CERROR("%s: Can't find FID Sequence "LPX64": rc = %ld\n", + ofd_name(ofd), seq, PTR_ERR(oseq)); + RETURN(-EINVAL); + } if ((oa->o_valid & OBD_MD_FLFLAGS) && (oa->o_flags & OBD_FL_RECREATE_OBJS)) { if (!ofd_obd(ofd)->obd_recovering || - oa->o_id > ofd_last_id(ofd, oa->o_seq)) { + oa->o_id > ofd_seq_last_oid(oseq)) { CERROR("recreate objid "LPU64" > last id "LPU64"\n", - oa->o_id, ofd_last_id(ofd, oa->o_seq)); - GOTO(out, rc = -EINVAL); + oa->o_id, ofd_seq_last_oid(oseq)); + GOTO(out_nolock, rc = -EINVAL); } /* do nothing because we create objects during first write */ - GOTO(out, rc = 0); + GOTO(out_nolock, rc = 0); } /* former ofd_handle_precreate */ if ((oa->o_valid & OBD_MD_FLFLAGS) && @@ -1114,47 +1209,66 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, /* destroy orphans */ if (oti->oti_conn_cnt < exp->exp_conn_cnt) { CERROR("%s: dropping old orphan cleanup request\n", - ofd_obd(ofd)->obd_name); - GOTO(out, rc = 0); + ofd_name(ofd)); + GOTO(out_nolock, rc = 0); } /* This causes inflight precreates to abort and drop lock */ - cfs_set_bit(oa->o_seq, &ofd->ofd_destroys_in_progress); - cfs_mutex_lock(&ofd->ofd_create_locks[oa->o_seq]); - if (!cfs_test_bit(oa->o_seq, &ofd->ofd_destroys_in_progress)) { + oseq->os_destroys_in_progress = 1; + mutex_lock(&oseq->os_create_lock); + if (!oseq->os_destroys_in_progress) { CERROR("%s:["LPU64"] destroys_in_progress already cleared\n", exp->exp_obd->obd_name, oa->o_seq); GOTO(out, rc = 0); } - diff = oa->o_id - ofd_last_id(ofd, oa->o_seq); + diff = oa->o_id - ofd_seq_last_oid(oseq); CDEBUG(D_HA, "ofd_last_id() = "LPU64" -> diff = %d\n", - ofd_last_id(ofd, oa->o_seq), diff); + ofd_seq_last_oid(oseq), diff); if (-diff > OST_MAX_PRECREATE) { /* FIXME: should reset precreate_next_id on MDS */ rc = 0; } else if (diff < 0) { rc = ofd_orphans_destroy(env, exp, ofd, oa); - cfs_clear_bit(oa->o_seq, &ofd->ofd_destroys_in_progress); + oseq->os_destroys_in_progress = 0; } else { /* XXX: Used by MDS for the first time! */ - cfs_clear_bit(oa->o_seq, &ofd->ofd_destroys_in_progress); + oseq->os_destroys_in_progress = 0; } } else { - cfs_mutex_lock(&ofd->ofd_create_locks[oa->o_seq]); + mutex_lock(&oseq->os_create_lock); if (oti->oti_conn_cnt < exp->exp_conn_cnt) { CERROR("%s: dropping old precreate request\n", - ofd_obd(ofd)->obd_name); + ofd_obd(ofd)->obd_name); GOTO(out, rc = 0); } - /* only precreate if group == 0 and o_id is specfied */ - if (!fid_seq_is_mdt(oa->o_seq) || oa->o_id == 0) { + /* only precreate if seq is 0, IDIF or normal and also o_id + * must be specfied */ + if ((!fid_seq_is_mdt(oa->o_seq) && + !fid_seq_is_norm(oa->o_seq) && + !fid_seq_is_idif(oa->o_seq)) || oa->o_id == 0) { diff = 1; /* shouldn't we create this right now? */ } else { - diff = oa->o_id - ofd_last_id(ofd, oa->o_seq); + diff = oa->o_id - ofd_seq_last_oid(oseq); + /* Do sync create if the seq is about to used up */ + if (fid_seq_is_idif(oa->o_seq) || + fid_seq_is_mdt0(oa->o_seq)) { + if (unlikely(oa->o_id >= IDIF_MAX_OID - 1)) + sync_trans = 1; + } else if (fid_seq_is_norm(oa->o_seq)) { + if (unlikely(oa->o_id >= + LUSTRE_DATA_SEQ_MAX_WIDTH - 1)) + sync_trans = 1; + } else { + CERROR("%s : invalid o_seq "LPX64": rc = %d\n", + ofd_name(ofd), oa->o_seq, -EINVAL); + GOTO(out, rc = -EINVAL); + } } } if (diff > 0) { - obd_id next_id = ofd_last_id(ofd, oa->o_seq) + 1; - int i; + cfs_time_t enough_time = cfs_time_shift(DISK_TIMEOUT); + obd_id next_id; + int created = 0; + int count; if (!(oa->o_valid & OBD_MD_FLFLAGS) || !(oa->o_flags & OBD_FL_DELORPHAN)) { @@ -1163,32 +1277,48 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, ofd_obd(ofd)->obd_self_export, &diff); if (rc) { - CDEBUG(D_HA, "%s: failed to acquire grant space" - "for precreate (%d)\n", - ofd_obd(ofd)->obd_name, diff); + CDEBUG(D_HA, "%s: failed to acquire grant " + "space for precreate (%d): rc = %d\n", + ofd_name(ofd), diff, rc); diff = 0; } } - CDEBUG(D_HA, - "%s: reserve %d objects in group "LPU64" at "LPU64"\n", - ofd_obd(ofd)->obd_name, diff, oa->o_seq, next_id); - for (i = 0; i < diff; i++) { - rc = ofd_precreate_object(env, ofd, next_id + i, - oa->o_seq); - if (rc) + while (diff > 0) { + next_id = ofd_seq_last_oid(oseq) + 1; + count = ofd_precreate_batch(ofd, diff); + + CDEBUG(D_HA, "%s: reserve %d objects in group "LPU64 + " at "LPU64"\n", ofd_obd(ofd)->obd_name, + count, oa->o_seq, next_id); + + if (cfs_time_after(jiffies, enough_time)) { + LCONSOLE_WARN("%s: Slow creates, %d/%d objects" + " created at a rate of %d/s\n", + ofd_obd(ofd)->obd_name, + created, diff + created, + created / DISK_TIMEOUT); + break; + } + + rc = ofd_precreate_objects(env, ofd, next_id, + oseq, count, sync_trans); + if (rc > 0) { + created += rc; + diff -= rc; + } else if (rc < 0) { break; + } } - if (i > 0) { + if (created > 0) /* some objects got created, we can return * them, even if last creation failed */ - oa->o_id = ofd_last_id(ofd, oa->o_seq); rc = 0; - } else { - CERROR("unable to precreate: %d\n", rc); - oa->o_id = ofd_last_id(ofd, oa->o_seq); - } + else + CERROR("%s: unable to precreate: rc = %d\n", + ofd_name(ofd), rc); + oa->o_id = ofd_seq_last_oid(oseq); oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP; if (!(oa->o_valid & OBD_MD_FLFLAGS) || @@ -1199,13 +1329,15 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp, ofd_info2oti(info, oti); out: - cfs_mutex_unlock(&ofd->ofd_create_locks[oa->o_seq]); + mutex_unlock(&oseq->os_create_lock); +out_nolock: if (rc == 0 && ea != NULL) { struct lov_stripe_md *lsm = *ea; lsm->lsm_object_id = oa->o_id; } - return rc; + ofd_seq_put(env, oseq); + RETURN(rc); } int ofd_getattr(const struct lu_env *env, struct obd_export *exp, @@ -1281,20 +1413,23 @@ static int ofd_sync(const struct lu_env *env, struct obd_export *exp, GOTO(out, rc = PTR_ERR(fo)); } - ofd_write_lock(env, fo); if (!ofd_object_exists(fo)) - GOTO(unlock, rc = -ENOENT); + GOTO(put, rc = -ENOENT); - rc = dt_object_sync(env, ofd_object_child(fo)); - if (rc) - GOTO(unlock, rc); + if (dt_version_get(env, ofd_object_child(fo)) > + ofd_obd(ofd)->obd_last_committed) { + rc = dt_object_sync(env, ofd_object_child(fo)); + if (rc) + GOTO(put, rc); + } oinfo->oi_oa->o_valid = OBD_MD_FLID; rc = ofd_attr_get(env, fo, &info->fti_attr); obdo_from_la(oinfo->oi_oa, &info->fti_attr, OFD_VALID_FLAGS); + + ofd_counter_incr(exp, LPROC_OFD_STATS_SYNC, oinfo->oi_jobid, 1); EXIT; -unlock: - ofd_write_unlock(env, fo); +put: ofd_object_put(env, fo); out: return rc; @@ -1360,17 +1495,23 @@ static int ofd_ping(const struct lu_env *env, struct obd_export *exp) return 0; } -static int ofd_health_check(const struct lu_env *env, struct obd_device *obd) +static int ofd_health_check(const struct lu_env *nul, struct obd_device *obd) { struct ofd_device *ofd = ofd_dev(obd->obd_lu_dev); struct ofd_thread_info *info; + struct lu_env env; #ifdef USE_HEALTH_CHECK_WRITE struct thandle *th; #endif int rc = 0; - info = ofd_info_init(env, NULL); - rc = dt_statfs(env, ofd->ofd_osd, &info->fti_u.osfs); + /* obd_proc_read_health pass NULL env, we need real one */ + rc = lu_env_init(&env, LCT_DT_THREAD); + if (rc) + RETURN(rc); + + info = ofd_info_init(&env, NULL); + rc = dt_statfs(&env, ofd->ofd_osd, &info->fti_u.osfs); if (unlikely(rc)) GOTO(out, rc); @@ -1385,47 +1526,31 @@ static int ofd_health_check(const struct lu_env *env, struct obd_device *obd) info->fti_buf.lb_len = CFS_PAGE_SIZE; info->fti_off = 0; - th = dt_trans_create(env, ofd->ofd_osd); + th = dt_trans_create(&env, ofd->ofd_osd); if (IS_ERR(th)) GOTO(out, rc = PTR_ERR(th)); - rc = dt_declare_record_write(env, ofd->ofd_health_check_file, + rc = dt_declare_record_write(&env, ofd->ofd_health_check_file, info->fti_buf.lb_len, info->fti_off, th); if (rc == 0) { th->th_sync = 1; /* sync IO is needed */ - rc = dt_trans_start_local(env, ofd->ofd_osd, th); + rc = dt_trans_start_local(&env, ofd->ofd_osd, th); if (rc == 0) - rc = dt_record_write(env, ofd->ofd_health_check_file, + rc = dt_record_write(&env, ofd->ofd_health_check_file, &info->fti_buf, &info->fti_off, th); } - dt_trans_stop(env, ofd->ofd_osd, th); + dt_trans_stop(&env, ofd->ofd_osd, th); OBD_FREE(info->fti_buf.lb_buf, CFS_PAGE_SIZE); CDEBUG(D_INFO, "write 1 page synchronously for checking io rc %d\n",rc); #endif out: + lu_env_fini(&env); return !!rc; } -static int ofd_obd_notify(struct obd_device *obd, struct obd_device *unused, - enum obd_notify_event ev, void *data) -{ - switch (ev) { - case OBD_NOTIFY_CONFIG: - LASSERT(obd->obd_no_conn); - cfs_spin_lock(&obd->obd_dev_lock); - obd->obd_no_conn = 0; - cfs_spin_unlock(&obd->obd_dev_lock); - break; - default: - CDEBUG(D_INFO, "%s: Unhandled notification %#x\n", - obd->obd_name, ev); - } - return 0; -} - /* * Handle quota control requests to consult current usage/limit. * @@ -1479,6 +1604,5 @@ struct obd_ops ofd_obd_ops = { .o_precleanup = ofd_precleanup, .o_ping = ofd_ping, .o_health_check = ofd_health_check, - .o_notify = ofd_obd_notify, .o_quotactl = ofd_quotactl, };