X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fofd%2Fofd_fs.c;h=e8d4cb2f22f5ddd58cd56dd4f9c8989d040a109b;hb=5707f919e41dffe024d5852321e0216eb105e206;hp=e85ba4d9fa1037a97823e72fac871d6a361d109f;hpb=3269ac0f5186c3c319d10e8e642ea56e5dbbbdf2;p=fs%2Flustre-release.git diff --git a/lustre/ofd/ofd_fs.c b/lustre/ofd/ofd_fs.c index e85ba4d..e8d4cb2 100644 --- a/lustre/ofd/ofd_fs.c +++ b/lustre/ofd/ofd_fs.c @@ -27,7 +27,7 @@ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, Intel Corporation. + * Copyright (c) 2012, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -85,7 +85,7 @@ struct ofd_seq *ofd_seq_get(struct ofd_device *ofd, obd_seq seq) read_lock(&ofd->ofd_seq_list_lock); cfs_list_for_each_entry(oseq, &ofd->ofd_seq_list, os_list) { - if (oseq->os_seq == seq) { + if (ostid_seq(&oseq->os_oi) == seq) { cfs_atomic_inc(&oseq->os_refc); read_unlock(&ofd->ofd_seq_list_lock); return oseq; @@ -132,7 +132,7 @@ static struct ofd_seq *ofd_seq_add(const struct lu_env *env, write_lock(&ofd->ofd_seq_list_lock); cfs_list_for_each_entry(os, &ofd->ofd_seq_list, os_list) { - if (os->os_seq == new_seq->os_seq) { + if (ostid_seq(&os->os_oi) == ostid_seq(&new_seq->os_oi)) { cfs_atomic_inc(&os->os_refc); write_unlock(&ofd->ofd_seq_list_lock); /* The seq has not been added to the list */ @@ -152,7 +152,7 @@ obd_id ofd_seq_last_oid(struct ofd_seq *oseq) obd_id id; spin_lock(&oseq->os_last_oid_lock); - id = oseq->os_last_oid; + id = ostid_id(&oseq->os_oi); spin_unlock(&oseq->os_last_oid_lock); return id; @@ -161,8 +161,8 @@ obd_id ofd_seq_last_oid(struct ofd_seq *oseq) void ofd_seq_last_oid_set(struct ofd_seq *oseq, obd_id id) { spin_lock(&oseq->os_last_oid_lock); - if (likely(oseq->os_last_oid < id)) - oseq->os_last_oid = id; + if (likely(ostid_id(&oseq->os_oi) < id)) + ostid_set_id(&oseq->os_oi, id); spin_unlock(&oseq->os_last_oid_lock); } @@ -175,25 +175,67 @@ int ofd_seq_last_oid_write(const struct lu_env *env, struct ofd_device *ofd, ENTRY; + tmp = cpu_to_le64(ofd_seq_last_oid(oseq)); + info->fti_buf.lb_buf = &tmp; info->fti_buf.lb_len = sizeof(tmp); info->fti_off = 0; - CDEBUG(D_INODE, "%s: write last_objid for seq "LPX64" : "LPX64"\n", - ofd_name(ofd), oseq->os_seq, ofd_seq_last_oid(oseq)); - - tmp = cpu_to_le64(ofd_seq_last_oid(oseq)); - rc = ofd_record_write(env, ofd, oseq->os_lastid_obj, &info->fti_buf, &info->fti_off); + + CDEBUG(D_INODE, "%s: write last_objid "DOSTID": rc = %d\n", + ofd_name(ofd), POSTID(&oseq->os_oi), rc); + RETURN(rc); } +static void ofd_deregister_seq_exp(struct ofd_device *ofd) +{ + struct seq_server_site *ss = &ofd->ofd_seq_site; + + if (ss->ss_client_seq != NULL) { + lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp); + ss->ss_client_seq->lcs_exp = NULL; + } + + if (ss->ss_server_fld != NULL) { + lustre_deregister_lwp_item(&ss->ss_server_fld->lsf_control_exp); + ss->ss_server_fld->lsf_control_exp = NULL; + } +} + +static int ofd_fld_fini(const struct lu_env *env, + struct ofd_device *ofd) +{ + struct seq_server_site *ss = &ofd->ofd_seq_site; + ENTRY; + + if (ss && ss->ss_server_fld) { + fld_server_fini(env, ss->ss_server_fld); + OBD_FREE_PTR(ss->ss_server_fld); + ss->ss_server_fld = NULL; + } + + RETURN(0); +} + void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd) { struct ofd_seq *oseq; struct ofd_seq *tmp; cfs_list_t dispose; + int rc; + + ofd_deregister_seq_exp(ofd); + + rc = ofd_fid_fini(env, ofd); + if (rc != 0) + CERROR("%s: fid fini error: rc = %d\n", ofd_name(ofd), rc); + + rc = ofd_fld_fini(env, ofd); + if (rc != 0) + CERROR("%s: fld fini error: rc = %d\n", ofd_name(ofd), rc); CFS_INIT_LIST_HEAD(&dispose); write_lock(&ofd->ofd_seq_list_lock); @@ -211,6 +253,10 @@ void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd) return; } +/** + * + * \retval the seq with seq number or errno (never NULL) + */ struct ofd_seq *ofd_seq_load(const struct lu_env *env, struct ofd_device *ofd, obd_seq seq) { @@ -251,7 +297,7 @@ struct ofd_seq *ofd_seq_load(const struct lu_env *env, struct ofd_device *ofd, CFS_INIT_LIST_HEAD(&oseq->os_list); mutex_init(&oseq->os_create_lock); spin_lock_init(&oseq->os_last_oid_lock); - oseq->os_seq = seq; + ostid_set_seq(&oseq->os_oi, seq); cfs_atomic_set(&oseq->os_refc, 1); @@ -261,7 +307,7 @@ struct ofd_seq *ofd_seq_load(const struct lu_env *env, struct ofd_device *ofd, if (info->fti_attr.la_size == 0) { /* object is just created, initialize last id */ - oseq->os_last_oid = OFD_INIT_OBJID; + ofd_seq_last_oid_set(oseq, OFD_INIT_OBJID); ofd_seq_last_oid_write(env, ofd, oseq); } else if (info->fti_attr.la_size == sizeof(lastid)) { info->fti_off = 0; @@ -274,7 +320,7 @@ struct ofd_seq *ofd_seq_load(const struct lu_env *env, struct ofd_device *ofd, ofd_name(ofd), rc); GOTO(cleanup, rc); } - oseq->os_last_oid = le64_to_cpu(lastid); + ofd_seq_last_oid_set(oseq, le64_to_cpu(lastid)); } else { CERROR("%s: corrupted size "LPU64" LAST_ID of seq "LPX64"\n", ofd_name(ofd), (__u64)info->fti_attr.la_size, seq); @@ -282,19 +328,94 @@ struct ofd_seq *ofd_seq_load(const struct lu_env *env, struct ofd_device *ofd, } oseq = ofd_seq_add(env, ofd, oseq); - RETURN(oseq); + RETURN((oseq != NULL) ? oseq : ERR_PTR(-ENOENT)); cleanup: ofd_seq_put(env, oseq); return ERR_PTR(rc); } +static int ofd_fld_init(const struct lu_env *env, const char *uuid, + struct ofd_device *ofd) +{ + struct seq_server_site *ss = &ofd->ofd_seq_site; + int rc; + ENTRY; + + OBD_ALLOC_PTR(ss->ss_server_fld); + if (ss->ss_server_fld == NULL) + RETURN(rc = -ENOMEM); + + rc = fld_server_init(env, ss->ss_server_fld, ofd->ofd_osd, uuid, + ss->ss_node_id, LU_SEQ_RANGE_OST); + if (rc) { + OBD_FREE_PTR(ss->ss_server_fld); + ss->ss_server_fld = NULL; + RETURN(rc); + } + RETURN(0); +} + +static int ofd_register_seq_exp(struct ofd_device *ofd) +{ + struct seq_server_site *ss = &ofd->ofd_seq_site; + char *lwp_name = NULL; + int rc; + + OBD_ALLOC(lwp_name, MAX_OBD_NAME); + if (lwp_name == NULL) + GOTO(out_free, rc = -ENOMEM); + + rc = tgt_name2lwpname(ofd_name(ofd), lwp_name); + if (rc != 0) + GOTO(out_free, rc); + + rc = lustre_register_lwp_item(lwp_name, &ss->ss_client_seq->lcs_exp, + NULL, NULL); + if (rc != 0) + GOTO(out_free, rc); + + rc = lustre_register_lwp_item(lwp_name, + &ss->ss_server_fld->lsf_control_exp, + NULL, NULL); + if (rc != 0) { + lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp); + ss->ss_client_seq->lcs_exp = NULL; + GOTO(out_free, rc); + } +out_free: + if (lwp_name != NULL) + OBD_FREE(lwp_name, MAX_OBD_NAME); + + return rc; +} + /* object sequence management */ int ofd_seqs_init(const struct lu_env *env, struct ofd_device *ofd) { + int rc; + + rc = ofd_fid_init(env, ofd); + if (rc != 0) { + CERROR("%s: fid init error: rc = %d\n", ofd_name(ofd), rc); + return rc; + } + + rc = ofd_fld_init(env, ofd_name(ofd), ofd); + if (rc) { + CERROR("%s: Can't init fld, rc %d\n", ofd_name(ofd), rc); + return rc; + } + + rc = ofd_register_seq_exp(ofd); + if (rc) { + CERROR("%s: Can't init seq exp, rc %d\n", ofd_name(ofd), rc); + return rc; + } + rwlock_init(&ofd->ofd_seq_list_lock); CFS_INIT_LIST_HEAD(&ofd->ofd_seq_list); ofd->ofd_seq_count = 0; - return 0; + return rc; } int ofd_clients_data_init(const struct lu_env *env, struct ofd_device *ofd, @@ -320,7 +441,7 @@ int ofd_clients_data_init(const struct lu_env *env, struct ofd_device *ofd, __u64 last_rcvd; /* Don't assume off is incremented properly by - * fsfilt_read_record(), in case sizeof(*lcd) + * read_record(), in case sizeof(*lcd) * isn't the same as fsd->lsd_client_size. */ off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size; rc = tgt_client_data_read(env, &ofd->ofd_lut, lcd, &off, cl_idx);