Whamcloud - gitweb
land 0.5.20.3 b_devel onto HEAD (b_devel will remain)
[fs/lustre-release.git] / lustre / lov / lov_obd.c
index 3e6b2d2..0e7ad82 100644 (file)
 
 #define EXPORT_SYMTAB
 #define DEBUG_SUBSYSTEM S_LOV
-
+#ifdef __KERNEL__
 #include <linux/slab.h>
 #include <linux/module.h>
+#include <linux/init.h>
+#include <linux/random.h>
+#include <linux/slab.h>
+#include <asm/div64.h>
+#else
+#include <liblustre.h>
+#endif
+
 #include <linux/obd_support.h>
 #include <linux/lustre_lib.h>
 #include <linux/lustre_net.h>
 #include <linux/lustre_mds.h>
 #include <linux/obd_class.h>
 #include <linux/obd_lov.h>
-#include <linux/init.h>
-#include <linux/random.h>
-#include <linux/slab.h>
-#include <asm/div64.h>
 #include <linux/lprocfs_status.h>
 
-
 static kmem_cache_t *lov_file_cache;
 
 struct lov_file_handles {
         struct list_head lfh_list;
         __u64 lfh_cookie;
         int lfh_count;
-        struct lustre_handle *lfh_handles;
+        char *lfh_data; /* an array of opaque data saved on behalf of
+                        * each osc, FD_OSTDATA_SIZE bytes for each */
 };
 
 struct lov_lock_handles {
@@ -87,10 +91,10 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         struct lov_obd *lov = &obd->u.lov;
         struct client_obd *mdc = &lov->mdcobd->u.cli;
         struct lov_desc *desc = &lov->desc;
+        struct lov_tgt_desc *tgts;
         struct obd_export *exp;
         struct lustre_handle mdc_conn;
         struct obd_uuid lov_mds_uuid = {"LOV_MDS_UUID"};
-        struct obd_uuid uuid;
         char *tmp;
         int rc, rc2, i;
         ENTRY;
@@ -138,7 +142,8 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
         lov_unpackdesc(desc);
 
-        if (req->rq_repmsg->buflens[1] < sizeof(uuid.uuid)*desc->ld_tgt_count){
+        if (req->rq_repmsg->buflens[1] <
+            sizeof(desc->ld_uuid.uuid) * desc->ld_tgt_count){
                 CERROR("LOV desc: invalid uuid array returned\n");
                 GOTO(out_conn, rc = -EINVAL);
         }
@@ -178,44 +183,44 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         }
 
         tmp = lustre_msg_buf(req->rq_repmsg, 1);
-        for (i = 0; i < desc->ld_tgt_count; i++) {
-                struct obd_device *tgt;
+        for (i = 0, tgts = lov->tgts; i < desc->ld_tgt_count; i++, tgts++) {
+                struct obd_uuid *uuid = &tgts->uuid;
+                struct obd_device *tgt_obd;
                 struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
 
-                strncpy(uuid.uuid, tmp, sizeof(uuid.uuid));
-                memcpy(&lov->tgts[i].uuid, &uuid, sizeof(uuid));
-                tgt = client_tgtuuid2obd(&uuid);
-                tmp += sizeof(uuid.uuid);
+                obd_str2uuid(uuid, tmp);
+                tgt_obd = client_tgtuuid2obd(uuid);
+                tmp += sizeof(uuid->uuid);
 
-                if (!tgt) {
-                        CERROR("Target %s not attached\n", uuid.uuid);
+                if (!tgt_obd) {
+                        CERROR("Target %s not attached\n", uuid->uuid);
                         GOTO(out_disc, rc = -EINVAL);
                 }
 
-                if (!(tgt->obd_flags & OBD_SET_UP)) {
-                        CERROR("Target %s not set up\n", uuid.uuid);
+                if (!(tgt_obd->obd_flags & OBD_SET_UP)) {
+                        CERROR("Target %s not set up\n", uuid->uuid);
                         GOTO(out_disc, rc = -EINVAL);
                 }
 
-                rc = obd_connect(&lov->tgts[i].conn, tgt, &lov_osc_uuid, recovd,
+                rc = obd_connect(&tgts->conn, tgt_obd, &lov_osc_uuid, recovd,
                                  recover);
 
                 if (rc) {
-                        CERROR("Target %s connect error %d\n", uuid.uuid,
-                               rc);
+                        CERROR("Target %s connect error %d\n", uuid->uuid, rc);
                         GOTO(out_disc, rc);
                 }
 
-                rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn,
-                                    sizeof(struct obd_device *), obd, NULL);
+                rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &tgts->conn,
+                                   sizeof(struct obd_device *), obd, NULL);
                 if (rc) {
                         CERROR("Target %s REGISTER_LOV error %d\n",
-                               uuid.uuid, rc);
+                               uuid->uuid, rc);
+                        obd_disconnect(&tgts->conn);
                         GOTO(out_disc, rc);
                 }
 
                 desc->ld_active_tgt_count++;
-                lov->tgts[i].active = 1;
+                tgts->active = 1;
         }
 
         mdc->cl_max_mds_easize = obd_size_wiremd(conn, NULL);
@@ -225,12 +230,13 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         RETURN(rc);
 
  out_disc:
-        i--; /* skip failed-connect OSC */
         while (i-- > 0) {
-                desc->ld_active_tgt_count--;
-                lov->tgts[i].active = 0;
-                memcpy(&uuid, &lov->tgts[i].uuid, sizeof(uuid));
-                rc2 = obd_disconnect(&lov->tgts[i].conn);
+                struct obd_uuid uuid;
+                --tgts;
+                --desc->ld_active_tgt_count;
+                tgts->active = 0;
+                obd_str2uuid(&uuid, tgts->uuid.uuid);
+                rc2 = obd_disconnect(&tgts->conn);
                 if (rc2)
                         CERROR("error: LOV target %s disconnect on OST idx %d: "
                                "rc = %d\n", uuid.uuid, i, rc2);
@@ -284,9 +290,8 @@ static int lov_disconnect(struct lustre_handle *conn)
                 CERROR("discarding open LOV handle %p:"LPX64"\n",
                        lfh, lfh->lfh_cookie);
                 list_del(&lfh->lfh_list);
-                OBD_FREE(lfh->lfh_handles,
-                         lfh->lfh_count * sizeof(*lfh->lfh_handles));
-                kmem_cache_free(lov_file_cache, lfh);
+                OBD_FREE(lfh->lfh_data, lfh->lfh_count * FD_OSTDATA_SIZE);
+                PORTAL_SLAB_FREE(lfh, lov_file_cache, sizeof(*lfh));
         }
         spin_unlock(&exp->exp_lov_data.led_lock);
 
@@ -538,6 +543,8 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa,
                 if (!lsm_new)
                         GOTO(out_cleanup, rc = -ENOMEM);
                 memcpy(lsm_new, lsm, size);
+                lsm_new->lsm_stripe_count = obj_alloc;
+
                 /* XXX LOV STACKING call into osc for sizes */
                 OBD_FREE(lsm, lov_stripe_md_size(lsm->lsm_stripe_count));
                 lsm = lsm_new;
@@ -609,8 +616,9 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
                 memcpy(&tmp, oa, sizeof(tmp));
                 tmp.o_id = loi->loi_id;
                 if (lfh)
-                        memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
-                               sizeof(lfh->lfh_handles[i]));
+                        memcpy(obdo_handle(&tmp),
+                               lfh->lfh_data + i * FD_OSTDATA_SIZE,
+                               FD_OSTDATA_SIZE);
                 else
                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
@@ -722,8 +730,9 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
                 memcpy(&tmp, oa, sizeof(tmp));
                 tmp.o_id = loi->loi_id;
                 if (lfh)
-                        memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
-                               sizeof(lfh->lfh_handles[i]));
+                        memcpy(obdo_handle(&tmp),
+                               lfh->lfh_data + i * FD_OSTDATA_SIZE,
+                               FD_OSTDATA_SIZE);
                 else
                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
 
@@ -794,8 +803,9 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
                 obdo_cpy_md(tmp, oa, oa->o_valid);
 
                 if (lfh)
-                        memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
-                               sizeof(lfh->lfh_handles[i]));
+                        memcpy(obdo_handle(tmp),
+                               lfh->lfh_data + i * FD_OSTDATA_SIZE,
+                               FD_OSTDATA_SIZE);
                 else
                         tmp->o_valid &= ~OBD_MD_FLHANDLE;
 
@@ -852,12 +862,11 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
         if (!tmp)
                 RETURN(-ENOMEM);
 
-        lfh = kmem_cache_alloc(lov_file_cache, GFP_KERNEL);
+        PORTAL_SLAB_ALLOC(lfh, lov_file_cache, sizeof(*lfh));
         if (!lfh)
                 GOTO(out_tmp, rc = -ENOMEM);
-        OBD_ALLOC(lfh->lfh_handles,
-                  lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
-        if (!lfh->lfh_handles)
+        OBD_ALLOC(lfh->lfh_data, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
+        if (!lfh->lfh_data)
                 GOTO(out_lfh, rc = -ENOMEM);
 
         lov = &export->exp_obd->u.lov;
@@ -876,21 +885,20 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
                 rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp,
                               NULL, NULL);
                 if (rc) {
-                        if (lov->tgts[loi->loi_ost_idx].active) {
-                                CERROR("error: open objid "LPX64" subobj "LPX64
-                                       " on OST idx %d: rc = %d\n",
-                                       oa->o_id, lsm->lsm_oinfo[i].loi_id,
-                                       loi->loi_ost_idx, rc);
-                                goto out_handles;
-                        }
-                        continue;
+                        if (!lov->tgts[loi->loi_ost_idx].active)
+                                continue;
+                        CERROR("error: open objid "LPX64" subobj "LPX64
+                               " on OST idx %d: rc = %d\n",
+                               oa->o_id, lsm->lsm_oinfo[i].loi_id,
+                               loi->loi_ost_idx, rc);
+                        goto out_handles;
                 }
 
                 lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &set);
 
                 if (tmp->o_valid & OBD_MD_FLHANDLE)
-                        memcpy(&lfh->lfh_handles[i], obdo_handle(tmp),
-                               sizeof(lfh->lfh_handles[i]));
+                        memcpy(lfh->lfh_data + i * FD_OSTDATA_SIZE,
+                               obdo_handle(tmp), FD_OSTDATA_SIZE);
         }
 
         handle = obdo_handle(oa);
@@ -920,8 +928,8 @@ out_handles:
 
                 memcpy(tmp, oa, sizeof(*tmp));
                 tmp->o_id = loi->loi_id;
-                memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
-                       sizeof(lfh->lfh_handles[i]));
+                memcpy(obdo_handle(tmp), lfh->lfh_data + i * FD_OSTDATA_SIZE,
+                       FD_OSTDATA_SIZE);
 
                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp,
                                 NULL, NULL);
@@ -932,11 +940,9 @@ out_handles:
                 }
         }
 
-        OBD_FREE(lfh->lfh_handles,
-                 lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
+        OBD_FREE(lfh->lfh_data, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
 out_lfh:
-        lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
-        kmem_cache_free(lov_file_cache, lfh);
+        PORTAL_SLAB_FREE(lfh, lov_file_cache, sizeof(*lfh));
         goto out_tmp;
 }
 
@@ -981,8 +987,9 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa,
                 memcpy(&tmp, oa, sizeof(tmp));
                 tmp.o_id = loi->loi_id;
                 if (lfh)
-                        memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
-                               sizeof(lfh->lfh_handles[i]));
+                        memcpy(obdo_handle(&tmp),
+                               lfh->lfh_data + i * FD_OSTDATA_SIZE,
+                               FD_OSTDATA_SIZE);
                 else
                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
 
@@ -999,11 +1006,12 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa,
                 }
         }
         if (lfh) {
+                spin_lock(&export->exp_lov_data.led_lock);
                 list_del(&lfh->lfh_list);
-                OBD_FREE(lfh->lfh_handles,
-                         lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
-                lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
-                kmem_cache_free(lov_file_cache, lfh);
+                spin_unlock(&export->exp_lov_data.led_lock);
+
+                OBD_FREE(lfh->lfh_data, lsm->lsm_stripe_count*FD_OSTDATA_SIZE);
+                PORTAL_SLAB_FREE(lfh, lov_file_cache, sizeof(*lfh));
         }
 
         RETURN(rc);
@@ -1101,8 +1109,9 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
                 memcpy(&tmp, oa, sizeof(tmp));
                 tmp.o_id = loi->loi_id;
                 if (lfh)
-                        memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
-                               sizeof(lfh->lfh_handles[i]));
+                        memcpy(obdo_handle(&tmp),
+                               lfh->lfh_data + i * FD_OSTDATA_SIZE,
+                               FD_OSTDATA_SIZE);
                 else
                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
 
@@ -1287,8 +1296,9 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                 lockh->addr = (__u64)(unsigned long)lov_lockh;
                 lockh->cookie = lov_lockh->llh_cookie;
                 lov_lockhp = lov_lockh->llh_handles;
-        } else
+        } else {
                 lov_lockhp = lockh;
+        }
 
         lov = &export->exp_obd->u.lov;
         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
@@ -1332,7 +1342,7 @@ out_locks:
                 struct lov_stripe_md submd;
                 int err;
 
-                if (lov_lockhp->addr == 0 ||
+                if (lov_lockhp->cookie == 0 ||
                     lov->tgts[loi->loi_ost_idx].active == 0)
                         continue;
 
@@ -1354,7 +1364,6 @@ out_locks:
                           sizeof(*lov_lockh->llh_handles) *
                           lsm->lsm_stripe_count);
         }
-        lockh->addr = 0;
         lockh->cookie = DEAD_HANDLE_MAGIC;
 
         RETURN(rc);
@@ -1403,7 +1412,7 @@ static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                 struct lov_stripe_md submd;
                 int err;
 
-                if (lov_lockhp->addr == 0) {
+                if (lov_lockhp->cookie == 0) {
                         CDEBUG(D_HA, "lov idx %d no lock?\n", loi->loi_ost_idx);
                         continue;
                 }
@@ -1431,7 +1440,6 @@ static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                           sizeof(*lov_lockh->llh_handles) *
                           lsm->lsm_stripe_count);
         }
-        lockh->addr = 0;
         lockh->cookie = DEAD_HANDLE_MAGIC;
 
         RETURN(rc);
@@ -1647,7 +1655,7 @@ struct obd_ops lov_obd_ops = {
         o_iocontrol:   lov_iocontrol
 };
 
-static int __init lov_init(void)
+int __init lov_init(void)
 {
         struct lprocfs_static_vars lvars;
         int rc;
@@ -1673,9 +1681,11 @@ static void __exit lov_exit(void)
         class_unregister_type(OBD_LOV_DEVICENAME);
 }
 
+#ifdef __KERNEL__
 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
 MODULE_LICENSE("GPL");
 
 module_init(lov_init);
 module_exit(lov_exit);
+#endif