X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosp%2Fosp_dev.c;h=9a66737bc033d838435b22fa0f9863c16df0b315;hp=9c533a9e14cd552fb170bb0d5954003dd3720e10;hb=f4547f0569774eb794fb143362e201f658415f4c;hpb=5165cdd4b063d523e5ae261f47818b5ba2bbc7cc diff --git a/lustre/osp/osp_dev.c b/lustre/osp/osp_dev.c index 9c533a9..9a66737 100644 --- a/lustre/osp/osp_dev.c +++ b/lustre/osp/osp_dev.c @@ -27,7 +27,7 @@ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Intel, Inc. + * Copyright (c) 2012, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -39,6 +39,7 @@ * * Author: Alex Zhuravlev * Author: Mikhail Pershin + * Author: Di Wang */ #ifndef EXPORT_SYMTAB @@ -48,6 +49,8 @@ #include #include +#include +#include #include "osp_internal.h" @@ -69,20 +72,28 @@ struct lu_object *osp_object_alloc(const struct lu_env *env, const struct lu_object_header *hdr, struct lu_device *d) { - struct lu_object_header *h; + struct lu_object_header *h = NULL; struct osp_object *o; struct lu_object *l; - LASSERT(hdr == NULL); - OBD_SLAB_ALLOC_PTR_GFP(o, osp_object_kmem, CFS_ALLOC_IO); if (o != NULL) { l = &o->opo_obj.do_lu; - h = &o->opo_header; - lu_object_header_init(h); - dt_object_init(&o->opo_obj, h, d); - lu_object_add_top(h, l); + /* For data object, OSP obj would always be the top + * object, i.e. hdr is always NULL, see lu_object_alloc. + * But for metadata object, we always build the object + * stack from MDT. i.e. mdt_object will be the top object + * i.e. hdr != NULL */ + if (hdr == NULL) { + /* object for OST */ + h = &o->opo_header; + lu_object_header_init(h); + dt_object_init(&o->opo_obj, h, d); + lu_object_add_top(h, l); + } else { + dt_object_init(&o->opo_obj, h, d); + } l->lo_ops = &osp_lu_obj_ops; @@ -92,118 +103,228 @@ struct lu_object *osp_object_alloc(const struct lu_env *env, } } -/* Update opd_last_used_id along with checking for gap in objid sequence */ -void osp_update_last_id(struct osp_device *d, obd_id objid) +static struct dt_object +*osp_find_or_create(const struct lu_env *env, struct osp_device *osp, + struct lu_attr *attr, __u32 reg_id) { - /* - * we might have lost precreated objects due to VBR and precreate - * orphans, the gap in objid can be calculated properly only here - */ - if (objid > le64_to_cpu(d->opd_last_used_id)) { - if (objid - le64_to_cpu(d->opd_last_used_id) > 1) { - d->opd_gap_start = le64_to_cpu(d->opd_last_used_id) + 1; - d->opd_gap_count = objid - d->opd_gap_start; - CDEBUG(D_HA, "Gap in objids: %d, start = %llu\n", - d->opd_gap_count, d->opd_gap_start); - } - d->opd_last_used_id = cpu_to_le64(objid); + struct osp_thread_info *osi = osp_env_info(env); + struct dt_object_format dof = { 0 }; + struct dt_object *dto; + int rc; + ENTRY; + + lu_local_obj_fid(&osi->osi_fid, reg_id); + attr->la_valid = LA_MODE; + attr->la_mode = S_IFREG | 0644; + dof.dof_type = DFT_REGULAR; + dto = dt_find_or_create(env, osp->opd_storage, &osi->osi_fid, + &dof, attr); + if (IS_ERR(dto)) + RETURN(dto); + + rc = dt_attr_get(env, dto, attr, NULL); + if (rc) { + CERROR("%s: can't be initialized: rc = %d\n", + osp->opd_obd->obd_name, rc); + lu_object_put(env, &dto->do_lu); + RETURN(ERR_PTR(rc)); } + RETURN(dto); } -static int osp_last_used_init(const struct lu_env *env, struct osp_device *m) +static int osp_write_local_file(const struct lu_env *env, + struct osp_device *osp, + struct dt_object *dt_obj, + struct lu_buf *buf, + loff_t offset) { - struct osp_thread_info *osi = osp_env_info(env); - struct dt_object_format dof = { 0 }; - struct dt_object *o; - int rc; + struct thandle *th; + int rc; - ENTRY; - - osi->osi_attr.la_valid = LA_MODE; - osi->osi_attr.la_mode = S_IFREG | 0644; - lu_local_obj_fid(&osi->osi_fid, MDD_LOV_OBJ_OID); - dof.dof_type = DFT_REGULAR; - o = dt_find_or_create(env, m->opd_storage, &osi->osi_fid, &dof, - &osi->osi_attr); - if (IS_ERR(o)) - RETURN(PTR_ERR(o)); + th = dt_trans_create(env, osp->opd_storage); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); - rc = dt_attr_get(env, o, &osi->osi_attr, NULL); + rc = dt_declare_record_write(env, dt_obj, buf->lb_len, offset, th); + if (rc) + GOTO(out, rc); + rc = dt_trans_start_local(env, osp->opd_storage, th); if (rc) GOTO(out, rc); - /* object will be released in device cleanup path */ - m->opd_last_used_file = o; + rc = dt_record_write(env, dt_obj, buf, &offset, th); +out: + dt_trans_stop(env, osp->opd_storage, th); + RETURN(rc); +} + +static int osp_init_last_objid(const struct lu_env *env, struct osp_device *osp) +{ + struct osp_thread_info *osi = osp_env_info(env); + struct lu_fid *fid = &osp->opd_last_used_fid; + struct dt_object *dto; + int rc; + ENTRY; - if (osi->osi_attr.la_size >= sizeof(osi->osi_id) * - (m->opd_index + 1)) { - osp_objid_buf_prep(osi, m, m->opd_index); - rc = dt_record_read(env, o, &osi->osi_lb, &osi->osi_off); + dto = osp_find_or_create(env, osp, &osi->osi_attr, MDD_LOV_OBJ_OID); + if (IS_ERR(dto)) + RETURN(PTR_ERR(dto)); + /* object will be released in device cleanup path */ + if (osi->osi_attr.la_size >= + sizeof(osi->osi_id) * (osp->opd_index + 1)) { + osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_oid, + osp->opd_index); + rc = dt_record_read(env, dto, &osi->osi_lb, &osi->osi_off); if (rc != 0) GOTO(out, rc); } else { - /* reset value to 0, just to make sure and change file's size */ - struct thandle *th; + fid->f_oid = 0; + osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_oid, + osp->opd_index); + rc = osp_write_local_file(env, osp, dto, &osi->osi_lb, + osi->osi_off); + } + osp->opd_last_used_oid_file = dto; + RETURN(0); +out: + /* object will be released in device cleanup path */ + CERROR("%s: can't initialize lov_objid: rc = %d\n", + osp->opd_obd->obd_name, rc); + lu_object_put(env, &dto->do_lu); + osp->opd_last_used_oid_file = NULL; + RETURN(rc); +} - m->opd_last_used_id = 0; - osp_objid_buf_prep(osi, m, m->opd_index); +static int osp_init_last_seq(const struct lu_env *env, struct osp_device *osp) +{ + struct osp_thread_info *osi = osp_env_info(env); + struct lu_fid *fid = &osp->opd_last_used_fid; + struct dt_object *dto; + int rc; + ENTRY; - th = dt_trans_create(env, m->opd_storage); - if (IS_ERR(th)) - GOTO(out, rc = PTR_ERR(th)); + dto = osp_find_or_create(env, osp, &osi->osi_attr, MDD_LOV_OBJ_OSEQ); + if (IS_ERR(dto)) + RETURN(PTR_ERR(dto)); - rc = dt_declare_record_write(env, m->opd_last_used_file, - osi->osi_lb.lb_len, osi->osi_off, - th); - if (rc) { - dt_trans_stop(env, m->opd_storage, th); + /* object will be released in device cleanup path */ + if (osi->osi_attr.la_size >= + sizeof(osi->osi_id) * (osp->opd_index + 1)) { + osp_objseq_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_seq, + osp->opd_index); + rc = dt_record_read(env, dto, &osi->osi_lb, &osi->osi_off); + if (rc != 0) GOTO(out, rc); - } + } else { + fid->f_seq = 0; + osp_objseq_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_seq, + osp->opd_index); + rc = osp_write_local_file(env, osp, dto, &osi->osi_lb, + osi->osi_off); + } + osp->opd_last_used_seq_file = dto; + RETURN(0); +out: + /* object will be released in device cleanup path */ + CERROR("%s: can't initialize lov_seq: rc = %d\n", + osp->opd_obd->obd_name, rc); + lu_object_put(env, &dto->do_lu); + osp->opd_last_used_seq_file = NULL; + RETURN(rc); +} - rc = dt_trans_start_local(env, m->opd_storage, th); +static int osp_last_used_init(const struct lu_env *env, struct osp_device *osp) +{ + struct osp_thread_info *osi = osp_env_info(env); + int rc; + ENTRY; + + fid_zero(&osp->opd_last_used_fid); + rc = osp_init_last_objid(env, osp); + if (rc < 0) { + CERROR("%s: Can not get ids %d from old objid!\n", + osp->opd_obd->obd_name, rc); + RETURN(rc); + } + + rc = osp_init_last_seq(env, osp); + if (rc < 0) { + CERROR("%s: Can not get ids %d from old objid!\n", + osp->opd_obd->obd_name, rc); + GOTO(out, rc); + } + + if (fid_oid(&osp->opd_last_used_fid) != 0 && + fid_seq(&osp->opd_last_used_fid) == 0) { + /* Just upgrade from the old version, + * set the seq to be IDIF */ + osp->opd_last_used_fid.f_seq = + fid_idif_seq(fid_oid(&osp->opd_last_used_fid), + osp->opd_index); + osp_objseq_buf_prep(&osi->osi_lb, &osi->osi_off, + &osp->opd_last_used_fid.f_seq, + osp->opd_index); + rc = osp_write_local_file(env, osp, osp->opd_last_used_seq_file, + &osi->osi_lb, osi->osi_off); if (rc) { - dt_trans_stop(env, m->opd_storage, th); + CERROR("%s : Can not write seq file: rc = %d\n", + osp->opd_obd->obd_name, rc); GOTO(out, rc); } + } - rc = dt_record_write(env, m->opd_last_used_file, &osi->osi_lb, - &osi->osi_off, th); - dt_trans_stop(env, m->opd_storage, th); - if (rc) - GOTO(out, rc); + if (!fid_is_zero(&osp->opd_last_used_fid) && + !fid_is_sane(&osp->opd_last_used_fid)) { + CERROR("%s: Got invalid FID "DFID"\n", osp->opd_obd->obd_name, + PFID(&osp->opd_last_used_fid)); + GOTO(out, rc = -EINVAL); } - RETURN(0); + + CDEBUG(D_INFO, "%s: Init last used fid "DFID"\n", + osp->opd_obd->obd_name, PFID(&osp->opd_last_used_fid)); out: - CERROR("%s: can't initialize lov_objid: %d\n", - m->opd_obd->obd_name, rc); - lu_object_put(env, &o->do_lu); - m->opd_last_used_file = NULL; - return rc; + if (rc != 0) { + if (osp->opd_last_used_oid_file != NULL) { + lu_object_put(env, &osp->opd_last_used_oid_file->do_lu); + osp->opd_last_used_oid_file = NULL; + } + if (osp->opd_last_used_seq_file != NULL) { + lu_object_put(env, &osp->opd_last_used_seq_file->do_lu); + osp->opd_last_used_seq_file = NULL; + } + } + + RETURN(rc); } static void osp_last_used_fini(const struct lu_env *env, struct osp_device *d) { - lu_object_put(env, &d->opd_last_used_file->do_lu); - d->opd_last_used_file = NULL; + /* release last_used file */ + if (d->opd_last_used_oid_file != NULL) { + lu_object_put(env, &d->opd_last_used_oid_file->do_lu); + d->opd_last_used_oid_file = NULL; + } + + if (d->opd_last_used_seq_file != NULL) { + lu_object_put(env, &d->opd_last_used_seq_file->do_lu); + d->opd_last_used_seq_file = NULL; + } } -static int osp_shutdown(const struct lu_env *env, struct osp_device *d) +static int osp_disconnect(struct osp_device *d) { - struct obd_import *imp; - int rc = 0; - ENTRY; - - /* release last_used file */ - osp_last_used_fini(env, d); + struct obd_import *imp; + int rc = 0; imp = d->opd_obd->u.cli.cl_import; /* Mark import deactivated now, so we don't try to reconnect if any * of the cleanup RPCs fails (e.g. ldlm cancel, etc). We don't * fully deactivate the import, or that would drop all requests. */ - cfs_spin_lock(&imp->imp_lock); + LASSERT(imp != NULL); + spin_lock(&imp->imp_lock); imp->imp_deactive = 1; - cfs_spin_unlock(&imp->imp_lock); + spin_unlock(&imp->imp_lock); ptlrpc_deactivate_import(imp); @@ -213,17 +334,38 @@ static int osp_shutdown(const struct lu_env *env, struct osp_device *d) (void)ptlrpc_pinger_del_import(imp); rc = ptlrpc_disconnect_import(imp, 0); + if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ESHUTDOWN) + rc = 0; if (rc) CERROR("%s: can't disconnect: rc = %d\n", d->opd_obd->obd_name, rc); ptlrpc_invalidate_import(imp); - /* stop precreate thread */ - osp_precreate_fini(d); + RETURN(rc); +} + +static int osp_shutdown(const struct lu_env *env, struct osp_device *d) +{ + int rc = 0; + ENTRY; - /* stop sync thread */ - osp_sync_fini(d); + LASSERT(env); + /* release last_used file */ + if (!d->opd_connect_mdt) + osp_last_used_fini(env, d); + + rc = osp_disconnect(d); + + if (!d->opd_connect_mdt) { + /* stop precreate thread */ + osp_precreate_fini(d); + + /* stop sync thread */ + osp_sync_fini(d); + } + + obd_fid_fini(d->opd_obd); RETURN(rc); } @@ -238,6 +380,9 @@ static int osp_process_config(const struct lu_env *env, ENTRY; switch (lcfg->lcfg_command) { + case LCFG_PRE_CLEANUP: + rc = osp_disconnect(d); + break; case LCFG_CLEANUP: lu_dev_del_linkage(dev->ld_site, dev); rc = osp_shutdown(env, d); @@ -279,7 +424,8 @@ static int osp_recovery_complete(const struct lu_env *env, ENTRY; osp->opd_recovery_completed = 1; - cfs_waitq_signal(&osp->opd_pre_waitq); + if (!osp->opd_connect_mdt) + cfs_waitq_signal(&osp->opd_pre_waitq); RETURN(rc); } @@ -300,19 +446,8 @@ static int osp_statfs(const struct lu_env *env, struct dt_device *dev, ENTRY; - if (unlikely(d->opd_imp_active == 0)) { - /* - * in case of inactive OST we return nulls - * so that caller can understand this device - * is unusable for new objects - * - * XXX: shouldn't we take normal statfs and fill - * just few specific fields with zeroes? - */ - memset(sfs, 0, sizeof(*sfs)); - sfs->os_bsize = 4096; - RETURN(0); - } + if (unlikely(d->opd_imp_active == 0)) + RETURN(-ENOTCONN); /* return recently updated data */ *sfs = d->opd_statfs; @@ -321,9 +456,19 @@ static int osp_statfs(const struct lu_env *env, struct dt_device *dev, * layer above osp (usually lod) can use ffree to estimate * how many objects are available for immediate creation */ - cfs_spin_lock(&d->opd_pre_lock); - sfs->os_ffree = d->opd_pre_last_created - d->opd_pre_next; - cfs_spin_unlock(&d->opd_pre_lock); + + spin_lock(&d->opd_pre_lock); + LASSERTF(fid_seq(&d->opd_pre_last_created_fid) == + fid_seq(&d->opd_pre_used_fid), + "last_created "DFID", next_fid "DFID"\n", + PFID(&d->opd_pre_last_created_fid), + PFID(&d->opd_pre_used_fid)); + sfs->os_fprecreated = fid_oid(&d->opd_pre_last_created_fid) - + fid_oid(&d->opd_pre_used_fid); + sfs->os_fprecreated -= d->opd_pre_reserved; + spin_unlock(&d->opd_pre_lock); + + LASSERT(sfs->os_fprecreated <= OST_MAX_PRECREATE * 2); CDEBUG(D_OTHER, "%s: "LPU64" blocks, "LPU64" free, "LPU64" avail, " LPU64" files, "LPU64" free files\n", d->opd_obd->obd_name, @@ -343,9 +488,11 @@ static int osp_sync(const struct lu_env *env, struct dt_device *dev) RETURN(0); } -static const struct dt_device_operations osp_dt_ops = { +const struct dt_device_operations osp_dt_ops = { .dt_statfs = osp_statfs, .dt_sync = osp_sync, + .dt_trans_start = osp_trans_start, + .dt_trans_stop = osp_trans_stop, }; static int osp_connect_to_osd(const struct lu_env *env, struct osp_device *m, @@ -391,21 +538,21 @@ out: static int osp_init0(const struct lu_env *env, struct osp_device *m, struct lu_device_type *ldt, struct lustre_cfg *cfg) { - struct lprocfs_static_vars lvars = { 0 }; - struct proc_dir_entry *osc_proc_dir; - struct obd_import *imp; - class_uuid_t uuid; - char *src, *ost, *mdt, *osdname = NULL; - int rc, idx; + struct obd_device *obd; + struct obd_import *imp; + class_uuid_t uuid; + char *src, *tgt, *mdt, *osdname = NULL; + int rc, idx; ENTRY; - m->opd_obd = class_name2obd(lustre_cfg_string(cfg, 0)); - if (m->opd_obd == NULL) { + obd = class_name2obd(lustre_cfg_string(cfg, 0)); + if (obd == NULL) { CERROR("Cannot find obd with name %s\n", lustre_cfg_string(cfg, 0)); RETURN(-ENODEV); } + m->opd_obd = obd; /* There is no record in the MDT configuration for the local disk * device, so we have to extract this from elsewhere in the profile. @@ -421,29 +568,75 @@ static int osp_init0(const struct lu_env *env, struct osp_device *m, if (src == NULL) RETURN(-EINVAL); - ost = strstr(src, "-OST"); - if (ost == NULL) + tgt = strrchr(src, '-'); + if (tgt == NULL) { + CERROR("%s: invalid target name %s\n", + m->opd_obd->obd_name, lustre_cfg_string(cfg, 0)); RETURN(-EINVAL); - - idx = simple_strtol(ost + 4, &mdt, 16); - if (mdt[0] != '-' || idx > INT_MAX || idx < 0) { - CERROR("%s: invalid OST index in '%s'\n", - m->opd_obd->obd_name, src); - GOTO(out_fini, rc = -EINVAL); } - m->opd_index = idx; - idx = ost - src; + if (strncmp(tgt, "-osc", 4) == 0) { + /* Old OSC name fsname-OSTXXXX-osc */ + for (tgt--; tgt > src && *tgt != '-'; tgt--) + ; + if (tgt == src) { + CERROR("%s: invalid target name %s\n", + m->opd_obd->obd_name, lustre_cfg_string(cfg, 0)); + RETURN(-EINVAL); + } + + if (strncmp(tgt, "-OST", 4) != 0) { + CERROR("%s: invalid target name %s\n", + m->opd_obd->obd_name, lustre_cfg_string(cfg, 0)); + RETURN(-EINVAL); + } + + idx = simple_strtol(tgt + 4, &mdt, 16); + if (mdt[0] != '-' || idx > INT_MAX || idx < 0) { + CERROR("%s: invalid OST index in '%s'\n", + m->opd_obd->obd_name, src); + RETURN(-EINVAL); + } + m->opd_index = idx; + idx = tgt - src; + } else { + /* New OSC name fsname-OSTXXXX-osc-MDTXXXX */ + if (strncmp(tgt, "-MDT", 4) != 0 && + strncmp(tgt, "-OST", 4) != 0) { + CERROR("%s: invalid target name %s\n", + m->opd_obd->obd_name, lustre_cfg_string(cfg, 0)); + RETURN(-EINVAL); + } + + if (tgt - src <= 12) { + CERROR("%s: invalid target name %s\n", + m->opd_obd->obd_name, lustre_cfg_string(cfg, 0)); + RETURN(-EINVAL); + } + + if (strncmp(tgt - 12, "-MDT", 4) == 0) + m->opd_connect_mdt = 1; + + idx = simple_strtol(tgt - 8, &mdt, 16); + if (mdt[0] != '-' || idx > INT_MAX || idx < 0) { + CERROR("%s: invalid OST index in '%s'\n", + m->opd_obd->obd_name, src); + RETURN(-EINVAL); + } + + m->opd_index = idx; + idx = tgt - src - 12; + } /* check the fsname length, and after this everything else will fit */ if (idx > MTI_NAME_MAXLEN) { CERROR("%s: fsname too long in '%s'\n", m->opd_obd->obd_name, src); - GOTO(out_fini, rc = -EINVAL); + RETURN(-EINVAL); } OBD_ALLOC(osdname, MAX_OBD_NAME); if (osdname == NULL) - GOTO(out_fini, rc = -ENOMEM); + RETURN(-ENOMEM); memcpy(osdname, src, idx); /* copy just the fsname part */ osdname[idx] = '\0'; @@ -454,12 +647,20 @@ static int osp_init0(const struct lu_env *env, struct osp_device *m, else strcat(osdname, mdt); strcat(osdname, "-osd"); - CDEBUG(D_HA, "%s: connect to %s (%s)\n", - m->opd_obd->obd_name, osdname, src); + CDEBUG(D_HA, "%s: connect to %s (%s)\n", obd->obd_name, osdname, src); + + if (m->opd_connect_mdt) { + struct client_obd *cli = &m->opd_obd->u.cli; + + OBD_ALLOC(cli->cl_rpc_lock, sizeof(*cli->cl_rpc_lock)); + if (!cli->cl_rpc_lock) + RETURN(-ENOMEM); + osp_init_rpc_lock(cli->cl_rpc_lock); + } m->opd_dt_dev.dd_lu_dev.ld_ops = &osp_lu_ops; m->opd_dt_dev.dd_ops = &osp_dt_ops; - m->opd_obd->obd_lu_dev = &m->opd_dt_dev.dd_lu_dev; + obd->obd_lu_dev = &m->opd_dt_dev.dd_lu_dev; rc = osp_connect_to_osd(env, m, osdname); if (rc) @@ -469,64 +670,48 @@ static int osp_init0(const struct lu_env *env, struct osp_device *m, if (rc) GOTO(out_disconnect, rc); - rc = client_obd_setup(m->opd_obd, cfg); + rc = client_obd_setup(obd, cfg); if (rc) { CERROR("%s: can't setup obd: %d\n", m->opd_obd->obd_name, rc); GOTO(out_ref, rc); } - lprocfs_osp_init_vars(&lvars); - if (lprocfs_obd_setup(m->opd_obd, lvars.obd_vars) == 0) - ptlrpc_lprocfs_register_obd(m->opd_obd); - - /* for compatibility we link old procfs's OSC entries to osp ones */ - osc_proc_dir = lprocfs_srch(proc_lustre_root, "osc"); - if (osc_proc_dir) { - cfs_proc_dir_entry_t *symlink = NULL; - char *name; + osp_lprocfs_init(m); - OBD_ALLOC(name, strlen(m->opd_obd->obd_name) + 1); - if (name == NULL) - GOTO(out, rc = -ENOMEM); + if (!m->opd_connect_mdt) { + /* Initialize last id from the storage - will be + * used in orphan cleanup. */ + rc = osp_last_used_init(env, m); + if (rc) + GOTO(out_proc, rc); + /* Initialize precreation thread, it handles new + * connections as well. */ + rc = osp_init_precreate(m); + if (rc) + GOTO(out_last_used, rc); + /* + * Initialize synhronization mechanism taking + * care of propogating changes to OST in near + * transactional manner. + */ + rc = osp_sync_init(env, m); + if (rc) + GOTO(out_precreat, rc); - strcpy(name, m->opd_obd->obd_name); - if (strstr(name, "osc")) - symlink = lprocfs_add_symlink(name, osc_proc_dir, - "../osp/%s", - m->opd_obd->obd_name); - OBD_FREE(name, strlen(m->opd_obd->obd_name) + 1); - m->opd_symlink = symlink; + rc = obd_fid_init(m->opd_obd, NULL, LUSTRE_SEQ_DATA); + if (rc) { + CERROR("%s: fid init error: rc = %d\n", + m->opd_obd->obd_name, rc); + GOTO(out, rc); + } } - - /* - * Initialize last id from the storage - will be used in orphan cleanup - */ - rc = osp_last_used_init(env, m); - if (rc) - GOTO(out_proc, rc); - - /* - * Initialize precreation thread, it handles new connections as well - */ - rc = osp_init_precreate(m); - if (rc) - GOTO(out_last_used, rc); - - /* - * Initialize synhronization mechanism taking care of propogating - * changes to OST in near transactional manner - */ - rc = osp_sync_init(env, m); - if (rc) - GOTO(out_precreat, rc); - /* * Initiate connect to OST */ ll_generate_random_uuid(uuid); class_uuid_unparse(uuid, &m->opd_cluuid); - imp = m->opd_obd->u.cli.cl_import; + imp = obd->u.cli.cl_import; rc = ptlrpc_init_import(imp); if (rc) @@ -536,21 +721,30 @@ static int osp_init0(const struct lu_env *env, struct osp_device *m, RETURN(0); out: - /* stop sync thread */ - osp_sync_fini(m); + if (!m->opd_connect_mdt) + /* stop sync thread */ + osp_sync_fini(m); out_precreat: /* stop precreate thread */ - osp_precreate_fini(m); + if (!m->opd_connect_mdt) + osp_precreate_fini(m); out_last_used: osp_last_used_fini(env, m); out_proc: - ptlrpc_lprocfs_unregister_obd(m->opd_obd); - lprocfs_obd_cleanup(m->opd_obd); - class_destroy_import(m->opd_obd->u.cli.cl_import); - client_obd_cleanup(m->opd_obd); + ptlrpc_lprocfs_unregister_obd(obd); + lprocfs_obd_cleanup(obd); + obd_cleanup_client_import(obd); + client_obd_cleanup(obd); out_ref: ptlrpcd_decref(); out_disconnect: + if (m->opd_connect_mdt) { + struct client_obd *cli = &m->opd_obd->u.cli; + if (cli->cl_rpc_lock != NULL) { + OBD_FREE_PTR(cli->cl_rpc_lock); + cli->cl_rpc_lock = NULL; + } + } obd_disconnect(m->opd_storage_exp); out_fini: if (osdname) @@ -565,6 +759,10 @@ static struct lu_device *osp_device_free(const struct lu_env *env, ENTRY; + if (cfs_atomic_read(&lu->ld_ref) && lu->ld_site) { + LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL); + lu_site_print(env, lu->ld_site, &msgdata, lu_cdebug_printer); + } dt_device_fini(&m->opd_dt_dev); OBD_FREE_PTR(m); RETURN(NULL); @@ -603,8 +801,8 @@ static struct lu_device *osp_device_fini(const struct lu_env *env, ENTRY; - LASSERT(m->opd_storage_exp); - obd_disconnect(m->opd_storage_exp); + if (m->opd_storage_exp) + obd_disconnect(m->opd_storage_exp); imp = m->opd_obd->u.cli.cl_import; @@ -622,6 +820,14 @@ static struct lu_device *osp_device_fini(const struct lu_env *env, ptlrpc_lprocfs_unregister_obd(m->opd_obd); lprocfs_obd_cleanup(m->opd_obd); + if (m->opd_connect_mdt) { + struct client_obd *cli = &m->opd_obd->u.cli; + if (cli->cl_rpc_lock != NULL) { + OBD_FREE_PTR(cli->cl_rpc_lock); + cli->cl_rpc_lock = NULL; + } + } + rc = client_obd_cleanup(m->opd_obd); LASSERTF(rc == 0, "error %d\n", rc); @@ -661,28 +867,23 @@ static int osp_obd_connect(const struct lu_env *env, struct obd_export **exp, RETURN(rc); *exp = class_conn2export(&conn); - /* Why should there ever be more than 1 connect? */ osp->opd_connects++; LASSERT(osp->opd_connects == 1); + osp->opd_exp = *exp; + imp = osp->opd_obd->u.cli.cl_import; imp->imp_dlm_handle = conn; + LASSERT(data != NULL); + LASSERT(data->ocd_connect_flags & OBD_CONNECT_INDEX); ocd = &imp->imp_connect_data; - ocd->ocd_connect_flags = OBD_CONNECT_AT | - OBD_CONNECT_FULL20 | - OBD_CONNECT_INDEX | -#ifdef HAVE_LRU_RESIZE_SUPPORT - OBD_CONNECT_LRU_RESIZE | -#endif - OBD_CONNECT_MDS | - OBD_CONNECT_OSS_CAPA | - OBD_CONNECT_REQPORTAL | - OBD_CONNECT_SKIP_ORPHAN | - OBD_CONNECT_VERSION; + *ocd = *data; + + imp->imp_connect_flags_orig = ocd->ocd_connect_flags; + ocd->ocd_version = LUSTRE_VERSION_CODE; - LASSERT(data->ocd_connect_flags & OBD_CONNECT_INDEX); ocd->ocd_index = data->ocd_index; imp->imp_connect_flags_orig = ocd->ocd_connect_flags; @@ -694,6 +895,14 @@ static int osp_obd_connect(const struct lu_env *env, struct obd_export **exp, ptlrpc_pinger_add_import(imp); + if (osp->opd_connect_mdt && data->ocd_index == 0) { + /* set seq controller export for MDC0 if exists */ + struct seq_server_site *ss; + + ss = lu_site2seq(osp2lu_dev(osp)->ld_site); + ss->ss_control_exp = class_export_get(*exp); + ss->ss_server_fld->lsf_control_exp = *exp; + } out: RETURN(rc); } @@ -707,7 +916,6 @@ static int osp_obd_disconnect(struct obd_export *exp) struct obd_device *obd = exp->exp_obd; struct osp_device *osp = lu2osp_dev(obd->obd_lu_dev); int rc; - ENTRY; /* Only disconnect the underlying layers on the final disconnect. */ @@ -715,10 +923,14 @@ static int osp_obd_disconnect(struct obd_export *exp) osp->opd_connects--; rc = class_disconnect(exp); + if (rc) { + CERROR("%s: class disconnect error: rc = %d\n", + obd->obd_name, rc); + RETURN(rc); + } /* destroy the device */ - if (rc == 0) - class_manual_cleanup(obd); + class_manual_cleanup(obd); RETURN(rc); } @@ -738,10 +950,10 @@ static int osp_obd_statfs(const struct lu_env *env, struct obd_export *exp, /* Since the request might also come from lprocfs, so we need * sync this with client_disconnect_export Bug15684 */ - cfs_down_read(&exp->exp_obd->u.cli.cl_sem); + down_read(&exp->exp_obd->u.cli.cl_sem); if (exp->exp_obd->u.cli.cl_import) imp = class_import_get(exp->exp_obd->u.cli.cl_import); - cfs_up_read(&exp->exp_obd->u.cli.cl_sem); + up_read(&exp->exp_obd->u.cli.cl_sem); if (!imp) RETURN(-ENODEV); @@ -798,12 +1010,16 @@ static int osp_import_event(struct obd_device *obd, struct obd_import *imp, case IMP_EVENT_DISCON: d->opd_got_disconnected = 1; d->opd_imp_connected = 0; + if (d->opd_connect_mdt) + break; osp_pre_update_status(d, -ENODEV); cfs_waitq_signal(&d->opd_pre_waitq); CDEBUG(D_HA, "got disconnected\n"); break; case IMP_EVENT_INACTIVE: d->opd_imp_active = 0; + if (d->opd_connect_mdt) + break; osp_pre_update_status(d, -ENODEV); cfs_waitq_signal(&d->opd_pre_waitq); CDEBUG(D_HA, "got inactive\n"); @@ -814,10 +1030,21 @@ static int osp_import_event(struct obd_device *obd, struct obd_import *imp, d->opd_new_connection = 1; d->opd_imp_connected = 1; d->opd_imp_seen_connected = 1; + if (d->opd_connect_mdt) + break; cfs_waitq_signal(&d->opd_pre_waitq); __osp_sync_check_for_work(d); CDEBUG(D_HA, "got connected\n"); break; + case IMP_EVENT_INVALIDATE: + if (obd->obd_namespace == NULL) + break; + ldlm_namespace_cleanup(obd->obd_namespace, LDLM_FL_LOCAL_ONLY); + break; + case IMP_EVENT_OCD: + case IMP_EVENT_DEACTIVATE: + case IMP_EVENT_ACTIVATE: + break; default: CERROR("%s: unsupported import event: %#x\n", obd->obd_name, event); @@ -941,8 +1168,12 @@ static struct obd_ops osp_obd_device_ops = { .o_import_event = osp_import_event, .o_iocontrol = osp_iocontrol, .o_statfs = osp_obd_statfs, + .o_fid_init = client_fid_init, + .o_fid_fini = client_fid_fini, }; +struct llog_operations osp_mds_ost_orig_logops; + static int __init osp_mod_init(void) { struct lprocfs_static_vars lvars; @@ -964,6 +1195,21 @@ static int __init osp_mod_init(void) return rc; } + lprocfs_lwp_init_vars(&lvars); + + rc = class_register_type(&lwp_obd_device_ops, NULL, lvars.module_vars, + LUSTRE_LWP_NAME, &lwp_device_type); + if (rc != 0) { + class_unregister_type(LUSTRE_OSP_NAME); + lu_kmem_fini(osp_caches); + return rc; + } + + /* Note: add_rec/delcare_add_rec will be only used by catalogs */ + osp_mds_ost_orig_logops = llog_osd_ops; + osp_mds_ost_orig_logops.lop_add = llog_cat_add_rec; + osp_mds_ost_orig_logops.lop_declare_add = llog_cat_declare_add_rec; + osc_proc_dir = lprocfs_srch(proc_lustre_root, "osc"); if (osc_proc_dir == NULL) { osc_proc_dir = lprocfs_register("osc", proc_lustre_root, NULL, @@ -979,6 +1225,7 @@ static void __exit osp_mod_exit(void) { lprocfs_try_remove_proc_entry("osc", proc_lustre_root); + class_unregister_type(LUSTRE_LWP_NAME); class_unregister_type(LUSTRE_OSP_NAME); lu_kmem_fini(osp_caches); } @@ -988,4 +1235,3 @@ MODULE_DESCRIPTION("Lustre OST Proxy Device ("LUSTRE_OSP_NAME")"); MODULE_LICENSE("GPL"); cfs_module(osp, LUSTRE_VERSION_STRING, osp_mod_init, osp_mod_exit); -