Whamcloud - gitweb
merge b_devel into HEAD, which will become 0.7.3
[fs/lustre-release.git] / lustre / lov / lov_obd.c
index 2974b2a..9562a4f 100644 (file)
 #include <linux/seq_file.h>
 #include <linux/lprocfs_status.h>
 
+#include "lov_internal.h"
+
+static int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
+                             int stripeno, obd_off *obd_off);
+
 struct lov_file_handles {
         struct portals_handle lfh_handle;
         atomic_t lfh_refcount;
@@ -68,7 +73,7 @@ static void lov_lfh_addref(void *lfhp)
         struct lov_file_handles *lfh = lfhp;
 
         atomic_inc(&lfh->lfh_refcount);
-        CDEBUG(D_INFO, "GETting lfh %p : new refcount %d\n", lfh,
+        CDEBUG(D_MALLOC, "GETting lfh %p : new refcount %d\n", lfh,
                atomic_read(&lfh->lfh_refcount));
 }
 
@@ -99,7 +104,7 @@ static struct lov_file_handles *lov_handle2lfh(struct lustre_handle *handle)
 
 static void lov_lfh_put(struct lov_file_handles *lfh)
 {
-        CDEBUG(D_INFO, "PUTting lfh %p : new refcount %d\n", lfh,
+        CDEBUG(D_MALLOC, "PUTting lfh %p : new refcount %d\n", lfh,
                atomic_read(&lfh->lfh_refcount) - 1);
         LASSERT(atomic_read(&lfh->lfh_refcount) > 0 &&
                 atomic_read(&lfh->lfh_refcount) < 0x5a5a);
@@ -174,19 +179,18 @@ int lov_attach(struct obd_device *dev, obd_count len, void *data)
         struct proc_dir_entry *entry;
         int rc;
 
-        lprocfs_init_vars(&lvars);
+        lprocfs_init_vars(lov, &lvars);
         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
-        if (rc) 
+        if (rc)
                 return rc;
 
         entry = create_proc_entry("target_obd", 0444, dev->obd_proc_entry);
-        if (entry == NULL) 
+        if (entry == NULL)
                 RETURN(-ENOMEM);
-        entry->proc_fops = &ll_proc_target_fops;
+        entry->proc_fops = &lov_proc_target_fops;
         entry->data = dev;
-        
+
         return rc;
-        
 }
 
 int lov_detach(struct obd_device *dev)
@@ -214,15 +218,17 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         if (rc)
                 RETURN(rc);
 
+        exp = class_conn2export(conn);
+        spin_lock_init(&exp->exp_lov_data.led_lock);
+        INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
+
         /* We don't want to actually do the underlying connections more than
          * once, so keep track. */
         lov->refcount++;
-        if (lov->refcount > 1)
+        if (lov->refcount > 1) {
+                class_export_put(exp);
                 RETURN(0);
-
-        exp = class_conn2export(conn);
-        spin_lock_init(&exp->exp_lov_data.led_lock);
-        INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
+        }
 
         /* retrieve LOV metadata from MDS */
         rc = obd_connect(&mdc_conn, lov->mdcobd, &lov_mds_uuid);
@@ -248,9 +254,9 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
          * array fits in LOV_MAX_UUID_BUFFER_SIZE and all uuids are
          * terminated), but I still need to verify it makes overall
          * sense */
-        mdesc = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*mdesc));
-        LASSERT (mdesc != NULL);
-        LASSERT_REPSWABBED (req, 0);
+        mdesc = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*mdesc));
+        LASSERT(mdesc != NULL);
+        LASSERT_REPSWABBED(req, 0);
 
         *desc = *mdesc;
 
@@ -279,15 +285,15 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
          * demands on memory here. */
         lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
         OBD_ALLOC(lov->tgts, lov->bufsize);
-        if (!lov->tgts) {
+        if (lov->tgts == NULL) {
                 CERROR("Out of memory\n");
                 GOTO(out_req, rc = -ENOMEM);
         }
 
         uuids = lustre_msg_buf(req->rq_repmsg, 1,
                                sizeof(*uuids) * desc->ld_tgt_count);
-        LASSERT (uuids != NULL);
-        LASSERT_REPSWABBED (req, 1);
+        LASSERT(uuids != NULL);
+        LASSERT_REPSWABBED(req, 1);
 
         for (i = 0, tgts = lov->tgts; i < desc->ld_tgt_count; i++, tgts++) {
                 struct obd_uuid *uuid = &tgts->uuid;
@@ -330,7 +336,9 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         }
 
         mdc->cl_max_mds_easize = obd_size_diskmd(conn, NULL);
-        ptlrpc_req_finished (req);
+        mdc->cl_max_mds_cookiesize = desc->ld_tgt_count *
+                sizeof(struct llog_cookie);
+        ptlrpc_req_finished(req);
         class_export_put(exp);
         RETURN (0);
 
@@ -356,7 +364,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         RETURN (rc);
 }
 
-static int lov_disconnect(struct lustre_handle *conn, int failover)
+static int lov_disconnect(struct lustre_handle *conn, int flags)
 {
         struct obd_device *obd = class_conn2obd(conn);
         struct lov_obd *lov = &obd->u.lov;
@@ -383,7 +391,7 @@ static int lov_disconnect(struct lustre_handle *conn, int failover)
                                 class_conn2obd(&lov->tgts[i].conn);
                         osc_obd->obd_no_recov = 1;
                 }
-                rc = obd_disconnect(&lov->tgts[i].conn, failover);
+                rc = obd_disconnect(&lov->tgts[i].conn, flags);
                 if (rc) {
                         if (lov->tgts[i].active) {
                                 CERROR("Target %s disconnect error %d\n",
@@ -400,6 +408,7 @@ static int lov_disconnect(struct lustre_handle *conn, int failover)
         lov->bufsize = 0;
         lov->tgts = NULL;
 
+ out_local:
         exp = class_conn2export(conn);
         if (exp == NULL) {
                 CERROR("export handle "LPU64" invalid!  If you can reproduce, "
@@ -421,7 +430,6 @@ static int lov_disconnect(struct lustre_handle *conn, int failover)
         spin_unlock(&exp->exp_lov_data.led_lock);
         class_export_put(exp);
 
- out_local:
         rc = class_disconnect(conn, 0);
         RETURN(rc);
 }
@@ -548,6 +556,8 @@ static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
 static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
                             struct lov_stripe_md *lsm, int stripeno, int *set)
 {
+        valid &= src->o_valid;
+
         if (*set) {
                 if (valid & OBD_MD_FLSIZE) {
                         /* this handles sparse files properly */
@@ -566,68 +576,102 @@ static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
                 if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
                         tgt->o_mtime = src->o_mtime;
         } else {
-                obdo_cpy_md(tgt, src, valid);
+                memcpy(tgt, src, sizeof(*tgt));
+                tgt->o_id = lsm->lsm_object_id;
                 if (valid & OBD_MD_FLSIZE)
                         tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
                 *set = 1;
         }
 }
 
+#ifndef log2
+#define log2(n) ffz(~(n))
+#endif
+
 /* the LOV expects oa->o_id to be set to the LOV object id */
-static int lov_create(struct lustre_handle *conn, struct obdo *oa,
+static int lov_create(struct lustre_handle *conn, struct obdo *src_oa,
                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
 {
         struct obd_export *export = class_conn2export(conn);
         struct lov_obd *lov;
         struct lov_stripe_md *lsm;
-        struct lov_oinfo *loi;
-        struct obdo *tmp;
+        struct lov_oinfo *loi = NULL;
+        struct obdo *tmp_oa, *ret_oa;
+        struct llog_cookie *cookies = NULL;
         unsigned ost_count, ost_idx;
-        int set = 0, obj_alloc = 0;
-        int rc = 0, i;
+        int set = 0, obj_alloc = 0, cookie_sent = 0, rc = 0, i;
         ENTRY;
 
         LASSERT(ea);
 
         if (!export)
-                GOTO(out_exp, rc = -EINVAL);
+                RETURN(-EINVAL);
 
         lov = &export->exp_obd->u.lov;
 
         if (!lov->desc.ld_active_tgt_count)
                 GOTO(out_exp, rc = -EIO);
 
-        tmp = obdo_alloc();
-        if (!tmp)
+        ret_oa = obdo_alloc();
+        if (!ret_oa)
                 GOTO(out_exp, rc = -ENOMEM);
 
+        tmp_oa = obdo_alloc();
+        if (!tmp_oa)
+                GOTO(out_oa, rc = -ENOMEM);
+
         lsm = *ea;
 
         if (!lsm) {
-                rc = obd_alloc_memmd(conn, &lsm);
+                int stripes;
+                ost_count = lov_get_stripecnt(lov, 0);
+
+                /* If the MDS file was truncated up to some size, stripe over
+                 * enough OSTs to allow the file to be created at that size.
+                 */
+                if (src_oa->o_valid & OBD_MD_FLSIZE) {
+                        stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
+                        do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
+
+                        if (stripes > lov->desc.ld_active_tgt_count)
+                                GOTO(out_exp, rc = -EFBIG);
+                        if (stripes < ost_count)
+                                stripes = ost_count;
+                } else
+                        stripes = ost_count;
+
+                rc = lov_alloc_memmd(&lsm, stripes);
                 if (rc < 0)
                         GOTO(out_tmp, rc);
 
                 rc = 0;
-                lsm->lsm_magic = LOV_MAGIC;
         }
 
         ost_count = lov->desc.ld_tgt_count;
 
-        LASSERT(oa->o_valid & OBD_MD_FLID);
-        lsm->lsm_object_id = oa->o_id;
+        LASSERT(src_oa->o_valid & OBD_MD_FLID);
+        lsm->lsm_object_id = src_oa->o_id;
         if (!lsm->lsm_stripe_size)
                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
 
         if (!*ea || lsm->lsm_stripe_offset >= ost_count) {
                 get_random_bytes(&ost_idx, 2);
                 ost_idx %= ost_count;
-        } else
+        } else {
                 ost_idx = lsm->lsm_stripe_offset;
+        }
 
         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
                lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
 
+        /* XXX LOV STACKING: need to figure out how many real OSCs */
+        if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
+                oti_alloc_cookies(oti, lsm->lsm_stripe_count);
+                if (!oti->oti_logcookies)
+                        GOTO(out_cleanup, rc = -ENOMEM);
+                cookies = oti->oti_logcookies;
+        }
+
         loi = lsm->lsm_oinfo;
         for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
                 struct lov_stripe_md obj_md;
@@ -640,14 +684,30 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa,
                 }
 
                 /* create data objects with "parent" OA */
-                memcpy(tmp, oa, sizeof(*tmp));
+                memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
+
+                /* XXX When we start creating objects on demand, we need to
+                 *     make sure that we always create the object on the
+                 *     stripe which holds the existing file size.
+                 */
+                if (src_oa->o_valid & OBD_MD_FLSIZE) {
+                        if (lov_stripe_offset(lsm, src_oa->o_size, i,
+                                              &tmp_oa->o_size) < 0 &&
+                            tmp_oa->o_size)
+                                tmp_oa->o_size--;
+
+                        CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
+                               i, tmp_oa->o_size, src_oa->o_size);
+                }
+
                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
-                err = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp, oti);
+                err = obd_create(&lov->tgts[ost_idx].conn, tmp_oa,&obj_mdp,oti);
                 if (err) {
                         if (lov->tgts[ost_idx].active) {
                                 CERROR("error creating objid "LPX64" sub-object"
-                                       " on OST idx %d/%d: rc = %d\n", oa->o_id,
-                                       ost_idx, lsm->lsm_stripe_count, err);
+                                       " on OST idx %d/%d: rc = %d\n",
+                                       src_oa->o_id, ost_idx,
+                                       lsm->lsm_stripe_count, err);
                                 if (err > 0) {
                                         CERROR("obd_create returned invalid "
                                                "err %d\n", err);
@@ -658,17 +718,22 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa,
                                 rc = err;
                         continue;
                 }
-                loi->loi_id = tmp->o_id;
+                loi->loi_id = tmp_oa->o_id;
                 loi->loi_ost_idx = ost_idx;
                 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
                        lsm->lsm_object_id, loi->loi_id, ost_idx);
 
                 if (set == 0)
                         lsm->lsm_stripe_offset = ost_idx;
-                lov_merge_attrs(oa, tmp, OBD_MD_FLBLKSZ, lsm, obj_alloc, &set);
-                ot_init(&loi->loi_dirty_ot_inline);
+                lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm,
+                                obj_alloc, &set);
                 loi->loi_dirty_ot = &loi->loi_dirty_ot_inline;
+                ot_init(loi->loi_dirty_ot);
 
+                if (cookies)
+                        ++oti->oti_logcookies;
+                if (tmp_oa->o_valid & OBD_MD_FLCOOKIE)
+                        ++cookie_sent;
                 ++obj_alloc;
                 ++loi;
 
@@ -677,6 +742,12 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa,
                         GOTO(out_done, rc = 0);
         }
 
+        /* If we were passed specific striping params, then a failure to
+         * meet those requirements is an error, since we can't reallocate
+         * that memory (it might be part of a larger array or something).
+         *
+         * We can only get here if lsm_stripe_count was originally > 1.
+         */
         if (*ea != NULL) {
                 CERROR("can't lstripe objid "LPX64": have %u want %u, rc %d\n",
                        lsm->lsm_object_id, obj_alloc, lsm->lsm_stripe_count,rc);
@@ -686,27 +757,61 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa,
         } else {
                 struct lov_stripe_md *lsm_new;
                 /* XXX LOV STACKING call into osc for sizes */
-                unsigned size = lov_stripe_md_size(obj_alloc);
+                unsigned oldsize, newsize;
+
+                if (oti && cookies && cookie_sent) {
+                        oldsize = lsm->lsm_stripe_count * sizeof(*cookies);
+                        newsize = obj_alloc * sizeof(*cookies);
+
+                        oti_alloc_cookies(oti, obj_alloc);
+                        if (oti->oti_logcookies) {
+                                memcpy(oti->oti_logcookies, cookies, newsize);
+                                OBD_FREE(cookies, oldsize);
+                                cookies = oti->oti_logcookies;
+                        } else {
+                                CWARN("'leaking' %d bytes\n", oldsize-newsize);
+                        }
+                }
 
                 CERROR("reallocating LSM for objid "LPX64": old %u new %u\n",
                        lsm->lsm_object_id, obj_alloc, lsm->lsm_stripe_count);
-                OBD_ALLOC(lsm_new, size);
-                if (!lsm_new)
-                        GOTO(out_cleanup, rc = -ENOMEM);
-                memcpy(lsm_new, lsm, size);
-                lsm_new->lsm_stripe_count = obj_alloc;
-
-                /* XXX LOV STACKING call into osc for sizes */
-                OBD_FREE(lsm, lov_stripe_md_size(lsm->lsm_stripe_count));
-                lsm = lsm_new;
-
+                oldsize = lov_stripe_md_size(lsm->lsm_stripe_count);
+                newsize = lov_stripe_md_size(obj_alloc);
+                OBD_ALLOC(lsm_new, newsize);
+                if (lsm_new != NULL) {
+                        memcpy(lsm_new, lsm, newsize);
+                        lsm_new->lsm_stripe_count = obj_alloc;
+                        OBD_FREE(lsm, newsize);
+                        lsm = lsm_new;
+                } else {
+                        CWARN("'leaking' %d bytes\n", oldsize - newsize);
+                }
                 rc = 0;
         }
  out_done:
         *ea = lsm;
+        if (src_oa->o_valid & OBD_MD_FLSIZE &&
+            ret_oa->o_size != src_oa->o_size) {
+                CERROR("original size "LPU64" isn't new object size "LPU64"\n",
+                       src_oa->o_size, ret_oa->o_size);
+                LBUG();
+        }
+        ret_oa->o_id = src_oa->o_id;
+        memcpy(src_oa, ret_oa, sizeof(*src_oa));
 
  out_tmp:
-        obdo_free(tmp);
+        obdo_free(tmp_oa);
+ out_oa:
+        obdo_free(ret_oa);
+        if (oti && cookies) {
+                oti->oti_logcookies = cookies;
+                if (!cookie_sent) {
+                        oti_free_cookies(oti);
+                        src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
+                } else {
+                        src_oa->o_valid |= OBD_MD_FLCOOKIE;
+                }
+        }
  out_exp:
         class_export_put(export);
         return rc;
@@ -717,15 +822,26 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa,
 
                 --loi;
                 /* destroy already created objects here */
-                memcpy(tmp, oa, sizeof(*tmp));
-                tmp->o_id = loi->loi_id;
-                err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL,
-                                  NULL);
+                memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
+                tmp_oa->o_id = loi->loi_id;
+
+                if (oti && cookie_sent) {
+                        err = obd_log_cancel(&lov->tgts[loi->loi_ost_idx].conn,
+                                             NULL, 1, --oti->oti_logcookies,
+                                             OBD_LLOG_FL_SENDNOW);
+                        if (err)
+                                CERROR("Failed to cancel objid "LPX64" subobj "
+                                       LPX64" cookie on OST idx %d: rc = %d\n",
+                                       src_oa->o_id, loi->loi_id,
+                                       loi->loi_ost_idx, err);
+                }
+
+                err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp_oa,
+                                  NULL, oti);
                 if (err)
-                        CERROR("Failed to uncreate objid "LPX64" subobj "
-                               LPX64" on OST idx %d: rc = %d\n",
-                               oa->o_id, loi->loi_id, loi->loi_ost_idx,
-                               err);
+                        CERROR("Failed to uncreate objid "LPX64" subobj "LPX64
+                               " on OST idx %d: rc = %d\n", src_oa->o_id,
+                               loi->loi_id, loi->loi_ost_idx, err);
         }
         if (*ea == NULL)
                 obd_free_memmd(conn, &lsm);
@@ -779,12 +895,12 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
                 memcpy(&tmp, oa, sizeof(tmp));
                 tmp.o_id = loi->loi_id;
                 if (lfh)
-                        memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
-                               FD_OSTDATA_SIZE);
+                        memcpy(obdo_handle(&tmp), &lfh->lfh_och[i].och_fh,
+                               sizeof(lfh->lfh_och[i].och_fh));
                 else
                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
-                                  NULL, NULL);
+                                  NULL, oti);
                 if (err && lov->tgts[loi->loi_ost_idx].active) {
                         CERROR("error: destroying objid "LPX64" subobj "
                                LPX64" on OST idx %d: rc = %d\n",
@@ -839,8 +955,8 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
                 memcpy(&tmp, oa, sizeof(tmp));
                 tmp.o_id = loi->loi_id;
                 if (lfh)
-                        memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
-                               FD_OSTDATA_SIZE);
+                        memcpy(obdo_handle(&tmp), &lfh->lfh_och[i].och_fh,
+                               sizeof(lfh->lfh_och[i].och_fh));
                 else
                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
 
@@ -867,12 +983,13 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
         return rc;
 }
 
-static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
-                                 struct lov_getattr_async_args *aa, int rc)
+static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data, 
+                                 int rc)
 {
+        struct lov_getattr_async_args *aa = data;
         struct lov_stripe_md *lsm = aa->aa_lsm;
         struct obdo          *oa = aa->aa_oa;
-        struct obdo          *obdos = aa->aa_stripe_oas;
+        struct obdo          *obdos = aa->aa_obdos;
         struct lov_oinfo     *loi;
         int                   i;
         int                   set = 0;
@@ -881,8 +998,8 @@ static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
         if (rc == 0) {
                 /* NB all stripe requests succeeded to get here */
 
-                for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
-                     i++,loi++) {
+                for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
+                     i++, loi++) {
                         if (obdos[i].o_valid == 0)      /* inactive stripe */
                                 continue;
 
@@ -955,8 +1072,8 @@ static int lov_getattr_async (struct lustre_handle *conn, struct obdo *oa,
                 memcpy(&obdos[i], oa, sizeof(obdos[i]));
                 obdos[i].o_id = loi->loi_id;
                 if (lfh)
-                        memcpy(obdo_handle(&obdos[i]), lfh->lfh_och + i,
-                               FD_OSTDATA_SIZE);
+                        memcpy(obdo_handle(&obdos[i]), &lfh->lfh_och[i].och_fh,
+                               sizeof(lfh->lfh_och[i].och_fh));
                 else
                         obdos[i].o_valid &= ~OBD_MD_FLHANDLE;
 
@@ -980,7 +1097,7 @@ static int lov_getattr_async (struct lustre_handle *conn, struct obdo *oa,
         aa = (struct lov_getattr_async_args *)&rqset->set_args;
         aa->aa_lsm = lsm;
         aa->aa_oa = oa;
-        aa->aa_stripe_oas = obdos;
+        aa->aa_obdos = obdos;
         GOTO (out, rc = 0);
 
  out_obdos:
@@ -992,10 +1109,10 @@ static int lov_getattr_async (struct lustre_handle *conn, struct obdo *oa,
         RETURN (rc);
 }
 
-static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
+static int lov_setattr(struct lustre_handle *conn, struct obdo *src_oa,
                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
 {
-        struct obdo *tmp;
+        struct obdo *tmp_oa, *ret_oa;
         struct obd_export *export = class_conn2export(conn);
         struct lov_obd *lov;
         struct lov_oinfo *loi;
@@ -1009,18 +1126,17 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
         if (!export || !export->exp_obd)
                 GOTO(out, rc = -ENODEV);
 
-        /* size changes should go through punch and not setattr */
-        LASSERT(!(oa->o_valid & OBD_MD_FLSIZE));
-
-        /* for now, we only expect mtime updates here */
-        LASSERT(!(oa->o_valid & ~(OBD_MD_FLID |OBD_MD_FLTYPE |OBD_MD_FLMTIME)));
-
-        tmp = obdo_alloc();
-        if (!tmp)
+        /* for now, we only expect time updates here */
+        LASSERT(!(src_oa->o_valid & ~(OBD_MD_FLID|OBD_MD_FLTYPE|OBD_MD_FLMODE|
+                                      OBD_MD_FLATIME | OBD_MD_FLMTIME |
+                                      OBD_MD_FLCTIME)));
+        ret_oa = obdo_alloc();
+        if (!ret_oa)
                 GOTO(out, rc = -ENOMEM);
 
-        if (oa->o_valid & OBD_MD_FLHANDLE)
-                lfh = lov_handle2lfh(obdo_handle(oa));
+        tmp_oa = obdo_alloc();
+        if (!tmp_oa)
+                GOTO(out_oa, rc = -ENOMEM);
 
         lov = &export->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
@@ -1031,46 +1147,54 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
                         continue;
                 }
 
-                obdo_cpy_md(tmp, oa, oa->o_valid);
+                memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
 
                 if (lfh)
-                        memcpy(obdo_handle(tmp), lfh->lfh_och + i,
-                               FD_OSTDATA_SIZE);
+                        memcpy(obdo_handle(tmp_oa), &lfh->lfh_och[i].och_fh,
+                               sizeof(lfh->lfh_och[i].och_fh));
                 else
-                        tmp->o_valid &= ~OBD_MD_FLHANDLE;
+                        tmp_oa->o_valid &= ~OBD_MD_FLHANDLE;
 
-                tmp->o_id = loi->loi_id;
+                tmp_oa->o_id = loi->loi_id;
 
-                err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp,
+                err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp_oa,
                                   NULL, NULL);
                 if (err) {
                         if (lov->tgts[loi->loi_ost_idx].active) {
                                 CERROR("error: setattr objid "LPX64" subobj "
                                        LPX64" on OST idx %d: rc = %d\n",
-                                       oa->o_id, loi->loi_id, loi->loi_ost_idx,
-                                       err);
+                                       src_oa->o_id, loi->loi_id,
+                                       loi->loi_ost_idx, err);
                                 if (!rc)
                                         rc = err;
                         }
-                } else
-                        set = 1;
+                        continue;
+                }
+
+                lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm, i, &set);
         }
-        obdo_free(tmp);
         if (!set && !rc)
                 rc = -EIO;
         if (lfh != NULL)
                 lov_lfh_put(lfh);
-        GOTO(out, rc);
- out:
+
+        ret_oa->o_id = src_oa->o_id;
+        memcpy(src_oa, ret_oa, sizeof(*src_oa));
+        GOTO(out_tmp, rc);
+out_tmp:
+        obdo_free(tmp_oa);
+out_oa:
+        obdo_free(ret_oa);
+out:
         class_export_put(export);
         return rc;
 }
 
-static int lov_open(struct lustre_handle *conn, struct obdo *oa,
+static int lov_open(struct lustre_handle *conn, struct obdo *src_oa,
                     struct lov_stripe_md *lsm, struct obd_trans_info *oti,
                     struct obd_client_handle *och)
 {
-        struct obdo *tmp; /* on the heap here, on the stack in lov_close? */
+        struct obdo *tmp_oa, *ret_oa;
         struct obd_export *export = class_conn2export(conn);
         struct lov_obd *lov;
         struct lov_oinfo *loi;
@@ -1085,20 +1209,24 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
         if (!export || !export->exp_obd)
                 GOTO(out_exp, rc = -ENODEV);
 
-        tmp = obdo_alloc();
-        if (!tmp)
+        ret_oa = obdo_alloc();
+        if (!ret_oa)
                 GOTO(out_exp, rc = -ENOMEM);
 
+        tmp_oa = obdo_alloc();
+        if (!tmp_oa)
+                GOTO(out_oa, rc = -ENOMEM);
+
         lfh = lov_lfh_new();
         if (lfh == NULL)
                 GOTO(out_tmp, rc = -ENOMEM);
-        OBD_ALLOC(lfh->lfh_och, lsm->lsm_stripe_count * sizeof *och);
+        OBD_ALLOC(lfh->lfh_och, lsm->lsm_stripe_count * sizeof(*och));
         if (!lfh->lfh_och)
                 GOTO(out_lfh, rc = -ENOMEM);
 
         lov = &export->exp_obd->u.lov;
-        oa->o_size = 0;
-        oa->o_blocks = 0;
+        src_oa->o_size = 0;
+        src_oa->o_blocks = 0;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
@@ -1106,11 +1234,11 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
                 }
 
                 /* create data objects with "parent" OA */
-                memcpy(tmp, oa, sizeof(*tmp));
-                tmp->o_id = loi->loi_id;
+                memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
+                tmp_oa->o_id = loi->loi_id;
 
-                rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp,
-                              NULL, NULL, lfh->lfh_och + i);
+                rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp_oa,
+                              NULL, NULL, &lfh->lfh_och[i]);
                 if (rc) {
                         if (!lov->tgts[loi->loi_ost_idx].active) {
                                 rc = 0;
@@ -1118,27 +1246,31 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
                         }
                         CERROR("error: open objid "LPX64" subobj "LPX64
                                " on OST idx %d: rc = %d\n",
-                               oa->o_id, lsm->lsm_oinfo[i].loi_id,
+                               src_oa->o_id, lsm->lsm_oinfo[i].loi_id,
                                loi->loi_ost_idx, rc);
                         goto out_handles;
                 }
 
-                lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &set);
+                lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm, i, &set);
         }
 
         lfh->lfh_count = lsm->lsm_stripe_count;
         och->och_fh.cookie = lfh->lfh_handle.h_cookie;
-        obdo_handle(oa)->cookie = lfh->lfh_handle.h_cookie;
-        oa->o_valid |= OBD_MD_FLHANDLE;
+        obdo_handle(ret_oa)->cookie = lfh->lfh_handle.h_cookie;
+        ret_oa->o_valid |= OBD_MD_FLHANDLE;
+        ret_oa->o_id = src_oa->o_id;
+        memcpy(src_oa, ret_oa, sizeof(*src_oa));
 
-        /* llfh refcount transfers to list */
+        /* lfh refcount transfers to list */
         spin_lock(&export->exp_lov_data.led_lock);
         list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
         spin_unlock(&export->exp_lov_data.led_lock);
 
         GOTO(out_tmp, rc);
  out_tmp:
-        obdo_free(tmp);
+        obdo_free(tmp_oa);
+ out_oa:
+        obdo_free(ret_oa);
  out_exp:
         class_export_put(export);
         return rc;
@@ -1150,16 +1282,16 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
                 if (lov->tgts[loi->loi_ost_idx].active == 0)
                         continue;
 
-                memcpy(tmp, oa, sizeof(*tmp));
-                tmp->o_id = loi->loi_id;
-                memcpy(obdo_handle(tmp), lfh->lfh_och + i, FD_OSTDATA_SIZE);
+                memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
+                tmp_oa->o_id = loi->loi_id;
+                memcpy(obdo_handle(tmp_oa), &lfh->lfh_och[i], FD_OSTDATA_SIZE);
 
-                err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp,
+                err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp_oa,
                                 NULL, NULL);
                 if (err && lov->tgts[loi->loi_ost_idx].active) {
                         CERROR("error: closing objid "LPX64" subobj "LPX64
                                " on OST idx %d after open error: rc=%d\n",
-                               oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
+                               src_oa->o_id, loi->loi_id, loi->loi_ost_idx,err);
                 }
         }
 
@@ -1189,6 +1321,8 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa,
 
         if (oa->o_valid & OBD_MD_FLHANDLE)
                 lfh = lov_handle2lfh(obdo_handle(oa));
+        if (!lfh)
+                LBUG();
 
         lov = &export->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
@@ -1198,7 +1332,7 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa,
                 memcpy(&tmp, oa, sizeof(tmp));
                 tmp.o_id = loi->loi_id;
                 if (lfh)
-                        memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
+                        memcpy(obdo_handle(&tmp), &lfh->lfh_och[i],
                                FD_OSTDATA_SIZE);
                 else
                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
@@ -1223,18 +1357,16 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa,
 
                 OBD_FREE(lfh->lfh_och, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
                 lov_lfh_destroy(lfh);
+                LASSERT(atomic_read(&lfh->lfh_refcount) == 1);
                 lov_lfh_put(lfh); /* balance handle2lfh above */
-        }
+        } else
+                LBUG();
         GOTO(out, rc);
  out:
         class_export_put(export);
         return rc;
 }
 
-#ifndef log2
-#define log2(n) ffz(~(n))
-#endif
-
 /* we have an offset in file backed by an lov and want to find out where
  * that offset lands in our given stripe of the file.  for the easy
  * case where the offset is within the stripe, we just have to scale the
@@ -1404,8 +1536,8 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
                 memcpy(&tmp, oa, sizeof(tmp));
                 tmp.o_id = loi->loi_id;
                 if (lfh)
-                        memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
-                               FD_OSTDATA_SIZE);
+                        memcpy(obdo_handle(&tmp), &lfh->lfh_och[i].och_fh,
+                               sizeof(lfh->lfh_och[i].och_fh));
                 else
                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
 
@@ -1455,7 +1587,7 @@ static int lov_brw_check(struct lov_obd *lov, struct lov_stripe_md *lsm,
         return 0;
 }
 
-static int lov_brw(int cmd, struct lustre_handle *conn,
+static int lov_brw(int cmd, struct lustre_handle *conn, struct obdo *src_oa,
                    struct lov_stripe_md *lsm, obd_count oa_bufs,
                    struct brw_page *pga, struct obd_trans_info *oti)
 {
@@ -1467,10 +1599,12 @@ static int lov_brw(int cmd, struct lustre_handle *conn,
                 int ost_idx;
         } *stripeinfo, *si, *si_last;
         struct obd_export *export = class_conn2export(conn);
+        struct obdo *ret_oa = NULL, *tmp_oa = NULL;
+        struct lov_file_handles *lfh = NULL;
         struct lov_obd *lov;
         struct brw_page *ioarr;
         struct lov_oinfo *loi;
-        int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
+        int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count, set = 0;
         ENTRY;
 
         if (lsm_bad_magic(lsm))
@@ -1495,6 +1629,21 @@ static int lov_brw(int cmd, struct lustre_handle *conn,
         if (!ioarr)
                 GOTO(out_where, rc = -ENOMEM);
 
+        if (src_oa) {
+                ret_oa = obdo_alloc();
+                if (!ret_oa)
+                        GOTO(out_ioarr, rc = -ENOMEM);
+
+                tmp_oa = obdo_alloc();
+                if (!tmp_oa)
+                        GOTO(out_oa, rc = -ENOMEM);
+
+                if (src_oa->o_valid & OBD_MD_FLHANDLE)
+                        lfh = lov_handle2lfh(obdo_handle(src_oa));
+                else
+                        src_oa->o_valid &= ~OBD_MD_FLHANDLE;
+        }
+
         for (i = 0; i < oa_bufs; i++) {
                 where[i] = lov_stripe_number(lsm, pga[i].off);
                 stripeinfo[where[i]].bufct++;
@@ -1524,23 +1673,46 @@ static int lov_brw(int cmd, struct lustre_handle *conn,
 
                 if (lov->tgts[si->ost_idx].active == 0) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx);
-                        GOTO(out_ioarr, rc = -EIO);
+                        GOTO(out_oa, rc = -EIO);
                 }
 
                 if (si->bufct) {
                         LASSERT(shift < oa_bufs);
-                        rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
+                        if (src_oa) {
+                                memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
+                                if (lfh)
+                                        memcpy(obdo_handle(tmp_oa),
+                                               &lfh->lfh_och[i].och_fh,
+                                               sizeof(lfh->lfh_och[i].och_fh));
+                        }
+
+                        tmp_oa->o_id = si->lsm.lsm_object_id;
+                        rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn, tmp_oa,
                                      &si->lsm, si->bufct, &ioarr[shift],
                                      oti);
                         if (rc)
                                 GOTO(out_ioarr, rc);
+
+                        lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm,
+                                        i, &set);
                 }
         }
-        GOTO(out_ioarr, rc);
+
+        ret_oa->o_id = src_oa->o_id;
+        memcpy(src_oa, ret_oa, sizeof(*src_oa));
+
+        GOTO(out_oa, rc);
+ out_oa:
+        if (tmp_oa)
+                obdo_free(tmp_oa);
+        if (ret_oa)
+                obdo_free(ret_oa);
  out_ioarr:
         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
  out_where:
         OBD_FREE(where, sizeof(*where) * oa_bufs);
+        if (lfh)
+                lov_lfh_put(lfh);
  out_sinfo:
         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
  out_exp:
@@ -1548,18 +1720,43 @@ static int lov_brw(int cmd, struct lustre_handle *conn,
         return rc;
 }
 
-static int lov_brw_interpret (struct ptlrpc_request_set *set,
-                              struct lov_brw_async_args *aa, int rc)
+static int lov_brw_interpret(struct ptlrpc_request_set *rqset,
+                             struct lov_brw_async_args *aa, int rc)
 {
-        obd_count        oa_bufs = aa->aa_oa_bufs;
-        struct brw_page *ioarr = aa->aa_ioarr;
+        struct lov_stripe_md *lsm = aa->aa_lsm;
+        obd_count             oa_bufs = aa->aa_oa_bufs;
+        struct obdo          *oa = aa->aa_oa;
+        struct obdo          *obdos = aa->aa_obdos;
+        struct brw_page      *ioarr = aa->aa_ioarr;
+        struct lov_oinfo     *loi;
+        int i, set = 0;
         ENTRY;
 
-        OBD_FREE (ioarr, sizeof (*ioarr) * oa_bufs);
-        RETURN (rc);
+        if (rc == 0) {
+                /* NB all stripe requests succeeded to get here */
+
+                for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
+                     i++, loi++) {
+                        if (obdos[i].o_valid == 0)      /* inactive stripe */
+                                continue;
+
+                        lov_merge_attrs(oa, &obdos[i], obdos[i].o_valid, lsm,
+                                        i, &set);
+                }
+
+                if (!set) {
+                        CERROR("No stripes had valid attrs\n");
+                        rc = -EIO;
+                }
+        }
+        oa->o_id = lsm->lsm_object_id;
+
+        OBD_FREE(obdos, lsm->lsm_stripe_count * sizeof(*obdos));
+        OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
+        RETURN(rc);
 }
 
-static int lov_brw_async(int cmd, struct lustre_handle *conn,
+static int lov_brw_async(int cmd, struct lustre_handle *conn, struct obdo *oa,
                          struct lov_stripe_md *lsm, obd_count oa_bufs,
                          struct brw_page *pga, struct ptlrpc_request_set *set,
                          struct obd_trans_info *oti)
@@ -1573,7 +1770,9 @@ static int lov_brw_async(int cmd, struct lustre_handle *conn,
         } *stripeinfo, *si, *si_last;
         struct obd_export *export = class_conn2export(conn);
         struct lov_obd *lov;
+        struct lov_file_handles *lfh = NULL;
         struct brw_page *ioarr;
+        struct obdo *obdos = NULL;
         struct lov_oinfo *loi;
         struct lov_brw_async_args *aa;
         int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
@@ -1597,9 +1796,20 @@ static int lov_brw_async(int cmd, struct lustre_handle *conn,
         if (!where)
                 GOTO(out_sinfo, rc = -ENOMEM);
 
+        if (oa) {
+                OBD_ALLOC(obdos, sizeof(*obdos) * stripe_count);
+                if (!obdos)
+                        GOTO(out_where, rc = -ENOMEM);
+
+                if (oa->o_valid & OBD_MD_FLHANDLE)
+                        lfh = lov_handle2lfh(obdo_handle(oa));
+                else
+                        oa->o_valid &= ~OBD_MD_FLHANDLE;
+        }
+
         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
         if (!ioarr)
-                GOTO(out_where, rc = -ENOMEM);
+                GOTO(out_obdos, rc = -ENOMEM);
 
         for (i = 0; i < oa_bufs; i++) {
                 where[i] = lov_stripe_number(lsm, pga[i].off);
@@ -1612,6 +1822,15 @@ static int lov_brw_async(int cmd, struct lustre_handle *conn,
                         si->index = si_last->index + si_last->bufct;
                 si->lsm.lsm_object_id = loi->loi_id;
                 si->ost_idx = loi->loi_ost_idx;
+
+                if (oa) {
+                        memcpy(&obdos[i], oa, sizeof(*obdos));
+                        obdos[i].o_id = si->lsm.lsm_object_id;
+                        if (lfh)
+                                memcpy(obdo_handle(&obdos[i]),
+                                       &lfh->lfh_och[i].och_fh,
+                                       sizeof(lfh->lfh_och[i].och_fh));
+                }
         }
 
         for (i = 0; i < oa_bufs; i++) {
@@ -1637,24 +1856,35 @@ static int lov_brw_async(int cmd, struct lustre_handle *conn,
                 }
 
                 LASSERT(shift < oa_bufs);
+
                 rc = obd_brw_async(cmd, &lov->tgts[si->ost_idx].conn,
-                                   &si->lsm, si->bufct, &ioarr[shift],
-                                   set, oti);
+                                   &obdos[i], &si->lsm, si->bufct,
+                                   &ioarr[shift], set, oti);
                 if (rc)
                         GOTO(out_ioarr, rc);
         }
-        LASSERT (rc == 0);
-        LASSERT (set->set_interpret == NULL);
-        set->set_interpret = lov_brw_interpret;
-        LASSERT (sizeof (set->set_args) >= sizeof (struct lov_brw_async_args));
+        LASSERT(rc == 0);
+        LASSERT(set->set_interpret == NULL);
+        set->set_interpret = (set_interpreter_func)lov_brw_interpret;
+        LASSERT(sizeof(set->set_args) >= sizeof(struct lov_brw_async_args));
         aa = (struct lov_brw_async_args *)&set->set_args;
-        aa->aa_oa_bufs = oa_bufs;
+        aa->aa_lsm = lsm;
+        aa->aa_obdos = obdos;
+        aa->aa_oa = oa;
         aa->aa_ioarr = ioarr;
+        aa->aa_oa_bufs = oa_bufs;
+
+        /* Don't free ioarr or obdos - that's done in lov_brw_interpret */
         GOTO(out_where, rc);
+
  out_ioarr:
         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
+ out_obdos:
+        OBD_FREE(obdos, stripe_count * sizeof(*obdos));
  out_where:
         OBD_FREE(where, sizeof(*where) * oa_bufs);
+        if (lfh)
+                lov_lfh_put(lfh);
  out_sinfo:
         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
  out_exp:
@@ -1980,20 +2210,16 @@ static int lov_cancel_unused(struct lustre_handle *conn,
                         (tot) += (add);                                 \
         } while(0)
 
-static int lov_statfs(struct obd_export *export, struct obd_statfs *osfs)
+static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
+                      unsigned long max_age)
 {
-        struct obd_export *tgt_export;
-        struct lov_obd *lov;
+        struct lov_obd *lov = &obd->u.lov;
         struct obd_statfs lov_sfs;
         int set = 0;
         int rc = 0;
         int i;
         ENTRY;
 
-        if (!export || !export->exp_obd)
-                RETURN(-ENODEV);
-
-        lov = &export->exp_obd->u.lov;
 
         /* We only get block data from the OBD */
         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
@@ -2004,14 +2230,8 @@ static int lov_statfs(struct obd_export *export, struct obd_statfs *osfs)
                         continue;
                 }
 
-                tgt_export = class_conn2export(&lov->tgts[i].conn);
-                if (!tgt_export) {
-                        CDEBUG(D_HA, "lov idx %d NULL export\n", i);
-                        continue;
-                }
-
-                err = obd_statfs(tgt_export, &lov_sfs);
-                class_export_put(tgt_export);
+                err = obd_statfs(class_conn2obd(&lov->tgts[i].conn), &lov_sfs,
+                                 max_age);
                 if (err) {
                         if (lov->tgts[i].active) {
                                 CERROR("error: statfs OSC %s on OST idx %d: "
@@ -2022,6 +2242,7 @@ static int lov_statfs(struct obd_export *export, struct obd_statfs *osfs)
                         }
                         continue;
                 }
+
                 if (!set) {
                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
                         set = 1;
@@ -2044,6 +2265,7 @@ static int lov_statfs(struct obd_export *export, struct obd_statfs *osfs)
                         LOV_SUM_MAX(osfs->os_ffree, lov_sfs.os_ffree);
                 }
         }
+
         if (set) {
                 __u32 expected_stripes = lov->desc.ld_default_stripe_count ?
                                          lov->desc.ld_default_stripe_count :
@@ -2055,6 +2277,7 @@ static int lov_statfs(struct obd_export *export, struct obd_statfs *osfs)
                         do_div(osfs->os_ffree, expected_stripes);
         } else if (!rc)
                 rc = -EIO;
+
         RETURN(rc);
 }
 
@@ -2191,7 +2414,28 @@ static int lov_get_info(struct lustre_handle *conn, __u32 keylen,
         RETURN(-EINVAL);
 }
 
-static int lov_mark_page_dirty(struct lustre_handle *conn, 
+static int lov_set_info(struct lustre_handle *conn, obd_count keylen,
+                        void *key, obd_count vallen, void *val)
+{
+        struct obd_device *obddev = class_conn2obd(conn);
+        struct lov_obd *lov = &obddev->u.lov;
+        int i, rc = 0;
+        ENTRY;
+
+        if (keylen < strlen("mds_conn") ||
+            memcmp(key, "mds_conn", strlen("mds_conn")) != 0)
+                RETURN(-EINVAL);
+
+        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                int er;
+                er = obd_set_info(&lov->tgts[i].conn, keylen, key, vallen, val);
+                if (!rc)
+                        rc = er;
+        }
+        RETURN(rc);
+}
+
+static int lov_mark_page_dirty(struct lustre_handle *conn,
                                struct lov_stripe_md *lsm, unsigned long offset)
 {
         struct lov_obd *lov = &class_conn2obd(conn)->u.lov;
@@ -2209,12 +2453,12 @@ static int lov_mark_page_dirty(struct lustre_handle *conn,
                 RETURN(-ENOMEM);
 
         stripe = lov_stripe_number(lsm, (obd_off)offset << PAGE_CACHE_SHIFT);
-        lov_stripe_offset(lsm, (obd_off)offset << PAGE_CACHE_SHIFT, stripe, 
+        lov_stripe_offset(lsm, (obd_off)offset << PAGE_CACHE_SHIFT, stripe,
                           &off);
         off >>= PAGE_CACHE_SHIFT;
 
         loi = &lsm->lsm_oinfo[stripe];
-        CDEBUG(D_INODE, "off %lu => off %lu on stripe %d\n", offset, 
+        CDEBUG(D_INODE, "off %lu => off %lu on stripe %d\n", offset,
                (unsigned long)off, stripe);
         submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
 
@@ -2223,7 +2467,7 @@ static int lov_mark_page_dirty(struct lustre_handle *conn,
         RETURN(rc);
 }
 
-static int lov_clear_dirty_pages(struct lustre_handle *conn, 
+static int lov_clear_dirty_pages(struct lustre_handle *conn,
                                  struct lov_stripe_md *lsm, unsigned long start,
                                  unsigned long end, unsigned long *cleared)
 
@@ -2267,11 +2511,11 @@ static int lov_clear_dirty_pages(struct lustre_handle *conn,
                 obd_start >>= PAGE_CACHE_SHIFT;
                 obd_end >>= PAGE_CACHE_SHIFT;
 
-                CDEBUG(D_INODE, "offs [%lu,%lu] => offs [%lu,%lu] stripe %d\n", 
-                       start, end, (unsigned long)obd_start, 
+                CDEBUG(D_INODE, "offs [%lu,%lu] => offs [%lu,%lu] stripe %d\n",
+                       start, end, (unsigned long)obd_start,
                        (unsigned long)obd_end, loi->loi_ost_idx);
                 submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
-                rc = obd_clear_dirty_pages(&lov->tgts[loi->loi_ost_idx].conn, 
+                rc = obd_clear_dirty_pages(&lov->tgts[loi->loi_ost_idx].conn,
                                            submd, obd_start, obd_end,
                                            &osc_cleared);
                 if (rc)
@@ -2310,15 +2554,14 @@ static int lov_last_dirty_offset(struct lustre_handle *conn,
         *offset = 0;
         lov = &export->exp_obd->u.lov;
         rc = -ENOENT;
-        for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; 
-                                          i++, loi++) {
 
+        for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++){
                 count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
                 skip = (lsm->lsm_stripe_count - 1) * count;
 
                 submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
 
-                err = obd_last_dirty_offset(&lov->tgts[loi->loi_ost_idx].conn, 
+                err = obd_last_dirty_offset(&lov->tgts[loi->loi_ost_idx].conn,
                                             submd, &tmp);
                 if (err == -ENOENT)
                         continue;
@@ -2326,7 +2569,7 @@ static int lov_last_dirty_offset(struct lustre_handle *conn,
                         GOTO(out_exp, rc = err);
 
                 rc = 0;
-                if (tmp != ~0) 
+                if (tmp != ~0)
                         tmp += (tmp/count * skip) + (i * count);
                 if (tmp > *offset)
                         *offset = tmp;
@@ -2338,6 +2581,100 @@ out_exp:
         RETURN(rc);
 }
 
+/* For LOV catalogs, we "nest" catalogs from the parent catalog.  What this
+ * means is that the parent catalog has a bunch of log cookies that are
+ * pointing at one catalog for each OSC.  The OSC catalogs in turn hold
+ * cookies for actual log files. */
+static int lov_get_catalogs(struct lov_obd *lov, struct llog_handle *cathandle)
+{
+        int i, rc;
+
+        ENTRY;
+        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                lov->tgts[i].ltd_cathandle = llog_new_log(cathandle,
+                                                          &lov->tgts[i].uuid);
+                if (IS_ERR(lov->tgts[i].ltd_cathandle))
+                        continue;
+                rc = llog_init_catalog(cathandle, &lov->tgts[i].uuid);
+                if (rc)
+                        GOTO(err_logs, rc);
+        }
+        lov->lo_catalog_loaded = 1;
+        RETURN(0);
+err_logs:
+        while (i-- > 0) {
+                llog_delete_log(cathandle, lov->tgts[i].ltd_cathandle);
+                llog_close_log(cathandle, lov->tgts[i].ltd_cathandle);
+        }
+        return rc;
+}
+
+/* Add log records for each OSC that this object is striped over, and return
+ * cookies for each one.  We _would_ have nice abstraction here, except that
+ * we need to keep cookies in stripe order, even if some are NULL, so that
+ * the right cookies are passed back to the right OSTs at the client side.
+ * Unset cookies should be all-zero (which will never occur naturally). */
+static int lov_log_add(struct lustre_handle *conn,
+                       struct llog_handle *cathandle,
+                       struct llog_trans_hdr *rec, struct lov_stripe_md *lsm,
+                       struct llog_cookie *logcookies, int numcookies)
+{
+        struct obd_device *obd = class_conn2obd(conn);
+        struct lov_obd *lov = &obd->u.lov;
+        struct lov_oinfo *loi;
+        int i, rc = 0;
+        ENTRY;
+
+        LASSERT(logcookies && numcookies >= lsm->lsm_stripe_count);
+
+        if (unlikely(!lov->lo_catalog_loaded))
+                lov_get_catalogs(lov, cathandle);
+
+        for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
+                rc += obd_log_add(&lov->tgts[loi->loi_ost_idx].conn,
+                                  lov->tgts[loi->loi_ost_idx].ltd_cathandle,
+                                  rec, NULL, logcookies + rc, numcookies - rc);
+        }
+
+        RETURN(rc);
+}
+
+static int lov_log_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
+                          int count, struct llog_cookie *cookies, int flags)
+{
+        struct obd_export *export = class_conn2export(conn);
+        struct lov_obd *lov;
+        struct lov_oinfo *loi;
+        int rc = 0, i;
+        ENTRY;
+
+        LASSERT(lsm != NULL);
+        if (export == NULL || export->exp_obd == NULL)
+                GOTO(out, rc = -ENODEV);
+
+        LASSERT(count == lsm->lsm_stripe_count);
+
+        loi = lsm->lsm_oinfo;
+        lov = &export->exp_obd->u.lov;
+        for (i = 0; i < count; i++, cookies++, loi++) {
+                int err;
+
+                err = obd_log_cancel(&lov->tgts[loi->loi_ost_idx].conn,
+                                     NULL, 1, cookies, flags);
+                if (err && lov->tgts[loi->loi_ost_idx].active) {
+                        CERROR("error: objid "LPX64" subobj "LPX64
+                               " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
+                               loi->loi_id, loi->loi_ost_idx, err);
+                        if (!rc)
+                                rc = err;
+                }
+        }
+        GOTO(out, rc);
+ out:
+        class_export_put(export);
+        return rc;
+}
+
 struct obd_ops lov_obd_ops = {
         o_owner:       THIS_MODULE,
         o_attach:      lov_attach,
@@ -2364,9 +2701,12 @@ struct obd_ops lov_obd_ops = {
         o_cancel_unused: lov_cancel_unused,
         o_iocontrol:   lov_iocontrol,
         o_get_info:    lov_get_info,
-        .o_mark_page_dirty =    lov_mark_page_dirty,
-        .o_clear_dirty_pages =    lov_clear_dirty_pages,
-        .o_last_dirty_offset =    lov_last_dirty_offset,
+        o_set_info:    lov_set_info,
+        o_log_add:     lov_log_add,
+        o_log_cancel:  lov_log_cancel,
+        o_mark_page_dirty:   lov_mark_page_dirty,
+        o_clear_dirty_pages: lov_clear_dirty_pages,
+        o_last_dirty_offset: lov_last_dirty_offset,
 };
 
 int __init lov_init(void)
@@ -2374,15 +2714,13 @@ int __init lov_init(void)
         struct lprocfs_static_vars lvars;
         int rc;
 
-        printk(KERN_INFO "Lustre Logical Object Volume driver; "
-               "info@clusterfs.com\n");
-        lprocfs_init_vars(&lvars);
+        lprocfs_init_vars(lov, &lvars);
         rc = class_register_type(&lov_obd_ops, lvars.module_vars,
                                  OBD_LOV_DEVICENAME);
         RETURN(rc);
 }
 
-static void __exit lov_exit(void)
+static void /*__exit*/ lov_exit(void)
 {
         class_unregister_type(OBD_LOV_DEVICENAME);
 }