#define EXPORT_SYMTAB
#define DEBUG_SUBSYSTEM S_LOV
-
+#ifdef __KERNEL__
#include <linux/slab.h>
#include <linux/module.h>
+#include <linux/init.h>
+#include <linux/random.h>
+#include <linux/slab.h>
+#include <asm/div64.h>
+#else
+#include <liblustre.h>
+#endif
+
#include <linux/obd_support.h>
#include <linux/lustre_lib.h>
#include <linux/lustre_net.h>
#include <linux/lustre_mds.h>
#include <linux/obd_class.h>
#include <linux/obd_lov.h>
-#include <linux/init.h>
-#include <linux/random.h>
-#include <linux/slab.h>
-#include <asm/div64.h>
#include <linux/lprocfs_status.h>
-
static kmem_cache_t *lov_file_cache;
struct lov_file_handles {
struct list_head lfh_list;
__u64 lfh_cookie;
int lfh_count;
- struct lustre_handle *lfh_handles;
+ char *lfh_data; /* an array of opaque data saved on behalf of
+ * each osc, FD_OSTDATA_SIZE bytes for each */
};
struct lov_lock_handles {
struct lov_obd *lov = &obd->u.lov;
struct client_obd *mdc = &lov->mdcobd->u.cli;
struct lov_desc *desc = &lov->desc;
+ struct lov_tgt_desc *tgts;
struct obd_export *exp;
struct lustre_handle mdc_conn;
struct obd_uuid lov_mds_uuid = {"LOV_MDS_UUID"};
- struct obd_uuid uuid;
char *tmp;
int rc, rc2, i;
ENTRY;
memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
lov_unpackdesc(desc);
- if (req->rq_repmsg->buflens[1] < sizeof(uuid.uuid)*desc->ld_tgt_count){
+ if (req->rq_repmsg->buflens[1] <
+ sizeof(desc->ld_uuid.uuid) * desc->ld_tgt_count){
CERROR("LOV desc: invalid uuid array returned\n");
GOTO(out_conn, rc = -EINVAL);
}
}
tmp = lustre_msg_buf(req->rq_repmsg, 1);
- for (i = 0; i < desc->ld_tgt_count; i++) {
- struct obd_device *tgt;
+ for (i = 0, tgts = lov->tgts; i < desc->ld_tgt_count; i++, tgts++) {
+ struct obd_uuid *uuid = &tgts->uuid;
+ struct obd_device *tgt_obd;
struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
- strncpy(uuid.uuid, tmp, sizeof(uuid.uuid));
- memcpy(&lov->tgts[i].uuid, &uuid, sizeof(uuid));
- tgt = client_tgtuuid2obd(&uuid);
- tmp += sizeof(uuid.uuid);
+ obd_str2uuid(uuid, tmp);
+ tgt_obd = client_tgtuuid2obd(uuid);
+ tmp += sizeof(uuid->uuid);
- if (!tgt) {
- CERROR("Target %s not attached\n", uuid.uuid);
+ if (!tgt_obd) {
+ CERROR("Target %s not attached\n", uuid->uuid);
GOTO(out_disc, rc = -EINVAL);
}
- if (!(tgt->obd_flags & OBD_SET_UP)) {
- CERROR("Target %s not set up\n", uuid.uuid);
+ if (!(tgt_obd->obd_flags & OBD_SET_UP)) {
+ CERROR("Target %s not set up\n", uuid->uuid);
GOTO(out_disc, rc = -EINVAL);
}
- rc = obd_connect(&lov->tgts[i].conn, tgt, &lov_osc_uuid, recovd,
+ rc = obd_connect(&tgts->conn, tgt_obd, &lov_osc_uuid, recovd,
recover);
if (rc) {
- CERROR("Target %s connect error %d\n", uuid.uuid,
- rc);
+ CERROR("Target %s connect error %d\n", uuid->uuid, rc);
GOTO(out_disc, rc);
}
- rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn,
- sizeof(struct obd_device *), obd, NULL);
+ rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &tgts->conn,
+ sizeof(struct obd_device *), obd, NULL);
if (rc) {
CERROR("Target %s REGISTER_LOV error %d\n",
- uuid.uuid, rc);
+ uuid->uuid, rc);
+ obd_disconnect(&tgts->conn);
GOTO(out_disc, rc);
}
desc->ld_active_tgt_count++;
- lov->tgts[i].active = 1;
+ tgts->active = 1;
}
mdc->cl_max_mds_easize = obd_size_wiremd(conn, NULL);
RETURN(rc);
out_disc:
- i--; /* skip failed-connect OSC */
while (i-- > 0) {
- desc->ld_active_tgt_count--;
- lov->tgts[i].active = 0;
- memcpy(&uuid, &lov->tgts[i].uuid, sizeof(uuid));
- rc2 = obd_disconnect(&lov->tgts[i].conn);
+ struct obd_uuid uuid;
+ --tgts;
+ --desc->ld_active_tgt_count;
+ tgts->active = 0;
+ obd_str2uuid(&uuid, tgts->uuid.uuid);
+ rc2 = obd_disconnect(&tgts->conn);
if (rc2)
CERROR("error: LOV target %s disconnect on OST idx %d: "
"rc = %d\n", uuid.uuid, i, rc2);
CERROR("discarding open LOV handle %p:"LPX64"\n",
lfh, lfh->lfh_cookie);
list_del(&lfh->lfh_list);
- OBD_FREE(lfh->lfh_handles,
- lfh->lfh_count * sizeof(*lfh->lfh_handles));
- kmem_cache_free(lov_file_cache, lfh);
+ OBD_FREE(lfh->lfh_data, lfh->lfh_count * FD_OSTDATA_SIZE);
+ PORTAL_SLAB_FREE(lfh, lov_file_cache, sizeof(*lfh));
}
spin_unlock(&exp->exp_lov_data.led_lock);
if (!lsm_new)
GOTO(out_cleanup, rc = -ENOMEM);
memcpy(lsm_new, lsm, size);
+ lsm_new->lsm_stripe_count = obj_alloc;
+
/* XXX LOV STACKING call into osc for sizes */
OBD_FREE(lsm, lov_stripe_md_size(lsm->lsm_stripe_count));
lsm = lsm_new;
memcpy(&tmp, oa, sizeof(tmp));
tmp.o_id = loi->loi_id;
if (lfh)
- memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
- sizeof(lfh->lfh_handles[i]));
+ memcpy(obdo_handle(&tmp),
+ lfh->lfh_data + i * FD_OSTDATA_SIZE,
+ FD_OSTDATA_SIZE);
else
tmp.o_valid &= ~OBD_MD_FLHANDLE;
err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
memcpy(&tmp, oa, sizeof(tmp));
tmp.o_id = loi->loi_id;
if (lfh)
- memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
- sizeof(lfh->lfh_handles[i]));
+ memcpy(obdo_handle(&tmp),
+ lfh->lfh_data + i * FD_OSTDATA_SIZE,
+ FD_OSTDATA_SIZE);
else
tmp.o_valid &= ~OBD_MD_FLHANDLE;
obdo_cpy_md(tmp, oa, oa->o_valid);
if (lfh)
- memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
- sizeof(lfh->lfh_handles[i]));
+ memcpy(obdo_handle(tmp),
+ lfh->lfh_data + i * FD_OSTDATA_SIZE,
+ FD_OSTDATA_SIZE);
else
tmp->o_valid &= ~OBD_MD_FLHANDLE;
if (!tmp)
RETURN(-ENOMEM);
- lfh = kmem_cache_alloc(lov_file_cache, GFP_KERNEL);
+ PORTAL_SLAB_ALLOC(lfh, lov_file_cache, sizeof(*lfh));
if (!lfh)
GOTO(out_tmp, rc = -ENOMEM);
- OBD_ALLOC(lfh->lfh_handles,
- lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
- if (!lfh->lfh_handles)
+ OBD_ALLOC(lfh->lfh_data, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
+ if (!lfh->lfh_data)
GOTO(out_lfh, rc = -ENOMEM);
lov = &export->exp_obd->u.lov;
rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp,
NULL, NULL);
if (rc) {
- if (lov->tgts[loi->loi_ost_idx].active) {
- CERROR("error: open objid "LPX64" subobj "LPX64
- " on OST idx %d: rc = %d\n",
- oa->o_id, lsm->lsm_oinfo[i].loi_id,
- loi->loi_ost_idx, rc);
- goto out_handles;
- }
- continue;
+ if (!lov->tgts[loi->loi_ost_idx].active)
+ continue;
+ CERROR("error: open objid "LPX64" subobj "LPX64
+ " on OST idx %d: rc = %d\n",
+ oa->o_id, lsm->lsm_oinfo[i].loi_id,
+ loi->loi_ost_idx, rc);
+ goto out_handles;
}
lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &set);
if (tmp->o_valid & OBD_MD_FLHANDLE)
- memcpy(&lfh->lfh_handles[i], obdo_handle(tmp),
- sizeof(lfh->lfh_handles[i]));
+ memcpy(lfh->lfh_data + i * FD_OSTDATA_SIZE,
+ obdo_handle(tmp), FD_OSTDATA_SIZE);
}
handle = obdo_handle(oa);
memcpy(tmp, oa, sizeof(*tmp));
tmp->o_id = loi->loi_id;
- memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
- sizeof(lfh->lfh_handles[i]));
+ memcpy(obdo_handle(tmp), lfh->lfh_data + i * FD_OSTDATA_SIZE,
+ FD_OSTDATA_SIZE);
err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp,
NULL, NULL);
}
}
- OBD_FREE(lfh->lfh_handles,
- lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
+ OBD_FREE(lfh->lfh_data, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
out_lfh:
- lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
- kmem_cache_free(lov_file_cache, lfh);
+ PORTAL_SLAB_FREE(lfh, lov_file_cache, sizeof(*lfh));
goto out_tmp;
}
memcpy(&tmp, oa, sizeof(tmp));
tmp.o_id = loi->loi_id;
if (lfh)
- memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
- sizeof(lfh->lfh_handles[i]));
+ memcpy(obdo_handle(&tmp),
+ lfh->lfh_data + i * FD_OSTDATA_SIZE,
+ FD_OSTDATA_SIZE);
else
tmp.o_valid &= ~OBD_MD_FLHANDLE;
}
}
if (lfh) {
+ spin_lock(&export->exp_lov_data.led_lock);
list_del(&lfh->lfh_list);
- OBD_FREE(lfh->lfh_handles,
- lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
- lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
- kmem_cache_free(lov_file_cache, lfh);
+ spin_unlock(&export->exp_lov_data.led_lock);
+
+ OBD_FREE(lfh->lfh_data, lsm->lsm_stripe_count*FD_OSTDATA_SIZE);
+ PORTAL_SLAB_FREE(lfh, lov_file_cache, sizeof(*lfh));
}
RETURN(rc);
memcpy(&tmp, oa, sizeof(tmp));
tmp.o_id = loi->loi_id;
if (lfh)
- memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
- sizeof(lfh->lfh_handles[i]));
+ memcpy(obdo_handle(&tmp),
+ lfh->lfh_data + i * FD_OSTDATA_SIZE,
+ FD_OSTDATA_SIZE);
else
tmp.o_valid &= ~OBD_MD_FLHANDLE;
lockh->addr = (__u64)(unsigned long)lov_lockh;
lockh->cookie = lov_lockh->llh_cookie;
lov_lockhp = lov_lockh->llh_handles;
- } else
+ } else {
lov_lockhp = lockh;
+ }
lov = &export->exp_obd->u.lov;
for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
struct lov_stripe_md submd;
int err;
- if (lov_lockhp->addr == 0 ||
+ if (lov_lockhp->cookie == 0 ||
lov->tgts[loi->loi_ost_idx].active == 0)
continue;
sizeof(*lov_lockh->llh_handles) *
lsm->lsm_stripe_count);
}
- lockh->addr = 0;
lockh->cookie = DEAD_HANDLE_MAGIC;
RETURN(rc);
struct lov_stripe_md submd;
int err;
- if (lov_lockhp->addr == 0) {
+ if (lov_lockhp->cookie == 0) {
CDEBUG(D_HA, "lov idx %d no lock?\n", loi->loi_ost_idx);
continue;
}
sizeof(*lov_lockh->llh_handles) *
lsm->lsm_stripe_count);
}
- lockh->addr = 0;
lockh->cookie = DEAD_HANDLE_MAGIC;
RETURN(rc);
o_iocontrol: lov_iocontrol
};
-static int __init lov_init(void)
+int __init lov_init(void)
{
struct lprocfs_static_vars lvars;
int rc;
class_unregister_type(OBD_LOV_DEVICENAME);
}
+#ifdef __KERNEL__
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
MODULE_LICENSE("GPL");
module_init(lov_init);
module_exit(lov_exit);
+#endif