X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flov%2Flov_obd.c;h=0e7ad8207039ea6274dd68940a23eb9fff3838b8;hp=3e6b2d2437c4cff5de0634efe7f6908e9750cc08;hb=040033cef24c5aca2967daf2da7a862abcd074cf;hpb=5c0b2855f617d17e3c0749244edeca1b706f8a7e diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 3e6b2d2..0e7ad82 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -24,9 +24,17 @@ #define EXPORT_SYMTAB #define DEBUG_SUBSYSTEM S_LOV - +#ifdef __KERNEL__ #include #include +#include +#include +#include +#include +#else +#include +#endif + #include #include #include @@ -35,20 +43,16 @@ #include #include #include -#include -#include -#include -#include #include - static kmem_cache_t *lov_file_cache; struct lov_file_handles { struct list_head lfh_list; __u64 lfh_cookie; int lfh_count; - struct lustre_handle *lfh_handles; + char *lfh_data; /* an array of opaque data saved on behalf of + * each osc, FD_OSTDATA_SIZE bytes for each */ }; struct lov_lock_handles { @@ -87,10 +91,10 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, struct lov_obd *lov = &obd->u.lov; struct client_obd *mdc = &lov->mdcobd->u.cli; struct lov_desc *desc = &lov->desc; + struct lov_tgt_desc *tgts; struct obd_export *exp; struct lustre_handle mdc_conn; struct obd_uuid lov_mds_uuid = {"LOV_MDS_UUID"}; - struct obd_uuid uuid; char *tmp; int rc, rc2, i; ENTRY; @@ -138,7 +142,8 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc)); lov_unpackdesc(desc); - if (req->rq_repmsg->buflens[1] < sizeof(uuid.uuid)*desc->ld_tgt_count){ + if (req->rq_repmsg->buflens[1] < + sizeof(desc->ld_uuid.uuid) * desc->ld_tgt_count){ CERROR("LOV desc: invalid uuid array returned\n"); GOTO(out_conn, rc = -EINVAL); } @@ -178,44 +183,44 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, } tmp = lustre_msg_buf(req->rq_repmsg, 1); - for (i = 0; i < desc->ld_tgt_count; i++) { - struct obd_device *tgt; + for (i = 0, tgts = lov->tgts; i < desc->ld_tgt_count; i++, tgts++) { + struct obd_uuid *uuid = &tgts->uuid; + struct obd_device *tgt_obd; struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" }; - strncpy(uuid.uuid, tmp, sizeof(uuid.uuid)); - memcpy(&lov->tgts[i].uuid, &uuid, sizeof(uuid)); - tgt = client_tgtuuid2obd(&uuid); - tmp += sizeof(uuid.uuid); + obd_str2uuid(uuid, tmp); + tgt_obd = client_tgtuuid2obd(uuid); + tmp += sizeof(uuid->uuid); - if (!tgt) { - CERROR("Target %s not attached\n", uuid.uuid); + if (!tgt_obd) { + CERROR("Target %s not attached\n", uuid->uuid); GOTO(out_disc, rc = -EINVAL); } - if (!(tgt->obd_flags & OBD_SET_UP)) { - CERROR("Target %s not set up\n", uuid.uuid); + if (!(tgt_obd->obd_flags & OBD_SET_UP)) { + CERROR("Target %s not set up\n", uuid->uuid); GOTO(out_disc, rc = -EINVAL); } - rc = obd_connect(&lov->tgts[i].conn, tgt, &lov_osc_uuid, recovd, + rc = obd_connect(&tgts->conn, tgt_obd, &lov_osc_uuid, recovd, recover); if (rc) { - CERROR("Target %s connect error %d\n", uuid.uuid, - rc); + CERROR("Target %s connect error %d\n", uuid->uuid, rc); GOTO(out_disc, rc); } - rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn, - sizeof(struct obd_device *), obd, NULL); + rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &tgts->conn, + sizeof(struct obd_device *), obd, NULL); if (rc) { CERROR("Target %s REGISTER_LOV error %d\n", - uuid.uuid, rc); + uuid->uuid, rc); + obd_disconnect(&tgts->conn); GOTO(out_disc, rc); } desc->ld_active_tgt_count++; - lov->tgts[i].active = 1; + tgts->active = 1; } mdc->cl_max_mds_easize = obd_size_wiremd(conn, NULL); @@ -225,12 +230,13 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, RETURN(rc); out_disc: - i--; /* skip failed-connect OSC */ while (i-- > 0) { - desc->ld_active_tgt_count--; - lov->tgts[i].active = 0; - memcpy(&uuid, &lov->tgts[i].uuid, sizeof(uuid)); - rc2 = obd_disconnect(&lov->tgts[i].conn); + struct obd_uuid uuid; + --tgts; + --desc->ld_active_tgt_count; + tgts->active = 0; + obd_str2uuid(&uuid, tgts->uuid.uuid); + rc2 = obd_disconnect(&tgts->conn); if (rc2) CERROR("error: LOV target %s disconnect on OST idx %d: " "rc = %d\n", uuid.uuid, i, rc2); @@ -284,9 +290,8 @@ static int lov_disconnect(struct lustre_handle *conn) CERROR("discarding open LOV handle %p:"LPX64"\n", lfh, lfh->lfh_cookie); list_del(&lfh->lfh_list); - OBD_FREE(lfh->lfh_handles, - lfh->lfh_count * sizeof(*lfh->lfh_handles)); - kmem_cache_free(lov_file_cache, lfh); + OBD_FREE(lfh->lfh_data, lfh->lfh_count * FD_OSTDATA_SIZE); + PORTAL_SLAB_FREE(lfh, lov_file_cache, sizeof(*lfh)); } spin_unlock(&exp->exp_lov_data.led_lock); @@ -538,6 +543,8 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa, if (!lsm_new) GOTO(out_cleanup, rc = -ENOMEM); memcpy(lsm_new, lsm, size); + lsm_new->lsm_stripe_count = obj_alloc; + /* XXX LOV STACKING call into osc for sizes */ OBD_FREE(lsm, lov_stripe_md_size(lsm->lsm_stripe_count)); lsm = lsm_new; @@ -609,8 +616,9 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, memcpy(&tmp, oa, sizeof(tmp)); tmp.o_id = loi->loi_id; if (lfh) - memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i], - sizeof(lfh->lfh_handles[i])); + memcpy(obdo_handle(&tmp), + lfh->lfh_data + i * FD_OSTDATA_SIZE, + FD_OSTDATA_SIZE); else tmp.o_valid &= ~OBD_MD_FLHANDLE; err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp, @@ -722,8 +730,9 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, memcpy(&tmp, oa, sizeof(tmp)); tmp.o_id = loi->loi_id; if (lfh) - memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i], - sizeof(lfh->lfh_handles[i])); + memcpy(obdo_handle(&tmp), + lfh->lfh_data + i * FD_OSTDATA_SIZE, + FD_OSTDATA_SIZE); else tmp.o_valid &= ~OBD_MD_FLHANDLE; @@ -794,8 +803,9 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa, obdo_cpy_md(tmp, oa, oa->o_valid); if (lfh) - memcpy(obdo_handle(tmp), &lfh->lfh_handles[i], - sizeof(lfh->lfh_handles[i])); + memcpy(obdo_handle(tmp), + lfh->lfh_data + i * FD_OSTDATA_SIZE, + FD_OSTDATA_SIZE); else tmp->o_valid &= ~OBD_MD_FLHANDLE; @@ -852,12 +862,11 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa, if (!tmp) RETURN(-ENOMEM); - lfh = kmem_cache_alloc(lov_file_cache, GFP_KERNEL); + PORTAL_SLAB_ALLOC(lfh, lov_file_cache, sizeof(*lfh)); if (!lfh) GOTO(out_tmp, rc = -ENOMEM); - OBD_ALLOC(lfh->lfh_handles, - lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles)); - if (!lfh->lfh_handles) + OBD_ALLOC(lfh->lfh_data, lsm->lsm_stripe_count * FD_OSTDATA_SIZE); + if (!lfh->lfh_data) GOTO(out_lfh, rc = -ENOMEM); lov = &export->exp_obd->u.lov; @@ -876,21 +885,20 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa, rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL, NULL); if (rc) { - if (lov->tgts[loi->loi_ost_idx].active) { - CERROR("error: open objid "LPX64" subobj "LPX64 - " on OST idx %d: rc = %d\n", - oa->o_id, lsm->lsm_oinfo[i].loi_id, - loi->loi_ost_idx, rc); - goto out_handles; - } - continue; + if (!lov->tgts[loi->loi_ost_idx].active) + continue; + CERROR("error: open objid "LPX64" subobj "LPX64 + " on OST idx %d: rc = %d\n", + oa->o_id, lsm->lsm_oinfo[i].loi_id, + loi->loi_ost_idx, rc); + goto out_handles; } lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &set); if (tmp->o_valid & OBD_MD_FLHANDLE) - memcpy(&lfh->lfh_handles[i], obdo_handle(tmp), - sizeof(lfh->lfh_handles[i])); + memcpy(lfh->lfh_data + i * FD_OSTDATA_SIZE, + obdo_handle(tmp), FD_OSTDATA_SIZE); } handle = obdo_handle(oa); @@ -920,8 +928,8 @@ out_handles: memcpy(tmp, oa, sizeof(*tmp)); tmp->o_id = loi->loi_id; - memcpy(obdo_handle(tmp), &lfh->lfh_handles[i], - sizeof(lfh->lfh_handles[i])); + memcpy(obdo_handle(tmp), lfh->lfh_data + i * FD_OSTDATA_SIZE, + FD_OSTDATA_SIZE); err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL, NULL); @@ -932,11 +940,9 @@ out_handles: } } - OBD_FREE(lfh->lfh_handles, - lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles)); + OBD_FREE(lfh->lfh_data, lsm->lsm_stripe_count * FD_OSTDATA_SIZE); out_lfh: - lfh->lfh_cookie = DEAD_HANDLE_MAGIC; - kmem_cache_free(lov_file_cache, lfh); + PORTAL_SLAB_FREE(lfh, lov_file_cache, sizeof(*lfh)); goto out_tmp; } @@ -981,8 +987,9 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa, memcpy(&tmp, oa, sizeof(tmp)); tmp.o_id = loi->loi_id; if (lfh) - memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i], - sizeof(lfh->lfh_handles[i])); + memcpy(obdo_handle(&tmp), + lfh->lfh_data + i * FD_OSTDATA_SIZE, + FD_OSTDATA_SIZE); else tmp.o_valid &= ~OBD_MD_FLHANDLE; @@ -999,11 +1006,12 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa, } } if (lfh) { + spin_lock(&export->exp_lov_data.led_lock); list_del(&lfh->lfh_list); - OBD_FREE(lfh->lfh_handles, - lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles)); - lfh->lfh_cookie = DEAD_HANDLE_MAGIC; - kmem_cache_free(lov_file_cache, lfh); + spin_unlock(&export->exp_lov_data.led_lock); + + OBD_FREE(lfh->lfh_data, lsm->lsm_stripe_count*FD_OSTDATA_SIZE); + PORTAL_SLAB_FREE(lfh, lov_file_cache, sizeof(*lfh)); } RETURN(rc); @@ -1101,8 +1109,9 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa, memcpy(&tmp, oa, sizeof(tmp)); tmp.o_id = loi->loi_id; if (lfh) - memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i], - sizeof(lfh->lfh_handles[i])); + memcpy(obdo_handle(&tmp), + lfh->lfh_data + i * FD_OSTDATA_SIZE, + FD_OSTDATA_SIZE); else tmp.o_valid &= ~OBD_MD_FLHANDLE; @@ -1287,8 +1296,9 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm, lockh->addr = (__u64)(unsigned long)lov_lockh; lockh->cookie = lov_lockh->llh_cookie; lov_lockhp = lov_lockh->llh_handles; - } else + } else { lov_lockhp = lockh; + } lov = &export->exp_obd->u.lov; for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; @@ -1332,7 +1342,7 @@ out_locks: struct lov_stripe_md submd; int err; - if (lov_lockhp->addr == 0 || + if (lov_lockhp->cookie == 0 || lov->tgts[loi->loi_ost_idx].active == 0) continue; @@ -1354,7 +1364,6 @@ out_locks: sizeof(*lov_lockh->llh_handles) * lsm->lsm_stripe_count); } - lockh->addr = 0; lockh->cookie = DEAD_HANDLE_MAGIC; RETURN(rc); @@ -1403,7 +1412,7 @@ static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm, struct lov_stripe_md submd; int err; - if (lov_lockhp->addr == 0) { + if (lov_lockhp->cookie == 0) { CDEBUG(D_HA, "lov idx %d no lock?\n", loi->loi_ost_idx); continue; } @@ -1431,7 +1440,6 @@ static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm, sizeof(*lov_lockh->llh_handles) * lsm->lsm_stripe_count); } - lockh->addr = 0; lockh->cookie = DEAD_HANDLE_MAGIC; RETURN(rc); @@ -1647,7 +1655,7 @@ struct obd_ops lov_obd_ops = { o_iocontrol: lov_iocontrol }; -static int __init lov_init(void) +int __init lov_init(void) { struct lprocfs_static_vars lvars; int rc; @@ -1673,9 +1681,11 @@ static void __exit lov_exit(void) class_unregister_type(OBD_LOV_DEVICENAME); } +#ifdef __KERNEL__ MODULE_AUTHOR("Cluster File Systems, Inc. "); MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver"); MODULE_LICENSE("GPL"); module_init(lov_init); module_exit(lov_exit); +#endif