Whamcloud - gitweb
land b_hd_sec on HEAD: lctl flush stuff, plus various client gss
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
index 0a39316..bd74795 100644 (file)
@@ -96,7 +96,7 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
                 CDEBUG(D_INFO, "lmv idx %d is %s conn "LPX64"\n",
                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
 
-                if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof uuid->uuid) == 0)
+                if (obd_uuid_equals(uuid, &tgt->uuid))
                         break;
         }
 
@@ -552,27 +552,27 @@ static int lmv_setup(struct obd_device *obd, obd_count len, void *buf)
         struct lmv_obd *lmv = &obd->u.lmv;
         ENTRY;
 
-        if (lcfg->lcfg_inllen1 < 1) {
+        if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
                 CERROR("LMV setup requires a descriptor\n");
                 RETURN(-EINVAL);
         }
 
-        if (lcfg->lcfg_inllen2 < 1) {
+        if (LUSTRE_CFG_BUFLEN(lcfg, 2) < 1) {
                 CERROR("LMV setup requires an MDT UUID list\n");
                 RETURN(-EINVAL);
         }
 
-        desc = (struct lmv_desc *)lcfg->lcfg_inlbuf1;
-        if (sizeof(*desc) > lcfg->lcfg_inllen1) {
+        desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
+        if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
                 CERROR("descriptor size wrong: %d > %d\n",
-                       (int)sizeof(*desc), lcfg->lcfg_inllen1);
+                       (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
                 RETURN(-EINVAL);
         }
 
-        uuids = (struct obd_uuid *)lcfg->lcfg_inlbuf2;
-        if (sizeof(*uuids) * desc->ld_tgt_count != lcfg->lcfg_inllen2) {
+        uuids = (struct obd_uuid *)lustre_cfg_buf(lcfg, 2);
+        if (sizeof(*uuids) * desc->ld_tgt_count != LUSTRE_CFG_BUFLEN(lcfg, 2)) {
                 CERROR("UUID array size wrong: %u * %u != %u\n",
-                       sizeof(*uuids), desc->ld_tgt_count, lcfg->lcfg_inllen2);
+                       sizeof(*uuids), desc->ld_tgt_count, LUSTRE_CFG_BUFLEN(lcfg, 2));
                 RETURN(-EINVAL);
         }
 
@@ -639,10 +639,8 @@ static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                 RETURN(-ENOMEM);
                 
         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
-                if (lmv->tgts[i].ltd_exp == NULL) {
-                        CWARN("%s: NULL export for %d\n", obd->obd_name, i);
+                if (lmv->tgts[i].ltd_exp == NULL)
                         continue;
-                }
 
                 rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, temp, max_age);
                 if (rc) {
@@ -1032,6 +1030,53 @@ cleanup:
         return rc;
 }
 
+int lmv_enqueue_remote(struct obd_export *exp, int lock_type,
+                       struct lookup_intent *it, int lock_mode,
+                       struct mdc_op_data *data, struct lustre_handle *lockh,
+                       void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
+                       ldlm_blocking_callback cb_blocking, void *cb_data)
+{
+        struct ptlrpc_request *req = LUSTRE_IT(it)->it_data;
+        struct obd_device *obd = exp->exp_obd;
+        struct lmv_obd *lmv = &obd->u.lmv;
+        struct lustre_handle plock;
+        struct mdc_op_data rdata;
+        struct mds_body *body = NULL;
+        int rc = 0, pmode;
+        ENTRY;
+
+        body = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body));
+        LASSERT(body != NULL);
+
+        if (!(body->valid & OBD_MD_MDS))
+                RETURN(0);
+
+        CDEBUG(D_OTHER, "ENQUEUE '%s' on "DLID4" -> "DLID4"\n",
+               LL_IT2STR(it), OLID4(&data->id1), OLID4(&body->id1));
+
+        /* we got LOOKUP lock, but we really need attrs */
+        pmode = LUSTRE_IT(it)->it_lock_mode;
+        LASSERT(pmode != 0);
+        memcpy(&plock, lockh, sizeof(plock));
+        LUSTRE_IT(it)->it_lock_mode = 0;
+        LUSTRE_IT(it)->it_data = NULL;
+        LASSERT((body->valid & OBD_MD_FID) != 0);
+
+        memcpy(&rdata, data, sizeof(rdata));
+        rdata.id1 = body->id1;
+        rdata.name = NULL;
+        rdata.namelen = 0;
+
+        LUSTRE_IT(it)->it_disposition &= ~DISP_ENQ_COMPLETE;
+        ptlrpc_req_finished(req);
+
+        rc = md_enqueue(lmv->tgts[id_group(&rdata.id1)].ltd_exp, 
+                        lock_type, it, lock_mode, &rdata, lockh, lmm, 
+                        lmmsize, cb_compl, cb_blocking, cb_data);
+        ldlm_lock_decref(&plock, pmode);
+        RETURN(rc);
+}
+
 int lmv_enqueue(struct obd_export *exp, int lock_type,
                 struct lookup_intent *it, int lock_mode,
                 struct mdc_op_data *data, struct lustre_handle *lockh,
@@ -1072,6 +1117,10 @@ int lmv_enqueue(struct obd_export *exp, int lock_type,
         rc = md_enqueue(lmv->tgts[id_group(&data->id1)].ltd_exp, 
                         lock_type, it, lock_mode, data, lockh, lmm, 
                         lmmsize, cb_compl, cb_blocking, cb_data);
+        if (rc == 0 && it->it_op == IT_OPEN)
+                rc = lmv_enqueue_remote(exp, lock_type, it, lock_mode,
+                                        data, lockh, lmm, lmmsize,
+                                        cb_compl, cb_blocking, cb_data);
         RETURN(rc);
 }
 
@@ -1104,8 +1153,9 @@ repeat:
         CDEBUG(D_OTHER, "getattr_lock for %*s on "DLID4" -> "DLID4"\n",
                namelen, filename, OLID4(id), OLID4(&rid));
 
-        rc = md_getattr_lock(lmv->tgts[id_group(&rid)].ltd_exp, &rid,
-                             filename, namelen, (valid | OBD_MD_FID),
+        rc = md_getattr_lock(lmv->tgts[id_group(&rid)].ltd_exp,
+                             &rid, filename, namelen,
+                             valid == OBD_MD_FLID ? valid : valid | OBD_MD_FID,
                              ea_size, request);
         if (rc == 0) {
                 /*
@@ -1115,7 +1165,8 @@ repeat:
                  */
                 body = lustre_msg_buf((*request)->rq_repmsg, 0, sizeof(*body));
                 LASSERT(body != NULL);
-                LASSERT((body->valid & OBD_MD_FID) != 0);
+                LASSERT((body->valid & OBD_MD_FID) != 0
+                                || body->valid == OBD_MD_FLID);
 
                 if (body->valid & OBD_MD_MDS) {
                         struct ptlrpc_request *req = NULL;
@@ -1150,7 +1201,7 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data,
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
         struct lmv_obj *obj;
-        int rc;
+        int rc, mds;
         ENTRY;
         
         rc = lmv_check_connect(obd);
@@ -1159,25 +1210,31 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data,
 
         if (data->namelen != 0) {
                 /* usual link request */
-                obj = lmv_grab_obj(obd, &data->id1);
+                obj = lmv_grab_obj(obd, &data->id2);
                 if (obj) {
                         rc = raw_name2idx(obj->hashtype, obj->objcount, 
                                           data->name, data->namelen);
-                        data->id1 = obj->objs[rc].id;
+                        data->id2 = obj->objs[rc].id;
                         lmv_put_obj(obj);
                 }
+
+                mds = id_group(&data->id2);
                 
                 CDEBUG(D_OTHER,"link "DLID4":%*s to "DLID4"\n",
                        OLID4(&data->id2), data->namelen, data->name,
                        OLID4(&data->id1));
         } else {
+                mds = id_group(&data->id1);
+                
                 /* request from MDS to acquire i_links for inode by id1 */
                 CDEBUG(D_OTHER, "inc i_nlinks for "DLID4"\n",
                        OLID4(&data->id1));
         }
-                        
-        rc = md_link(lmv->tgts[id_group(&data->id1)].ltd_exp, 
-                     data, request);
+
+        CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n",
+               mds, OLID4(&data->id1));
+        rc = md_link(lmv->tgts[mds].ltd_exp, data, request);
+        
         RETURN(rc);
 }
 
@@ -1597,6 +1654,7 @@ int lmv_init_ea_size(struct obd_export *exp, int easize,
 }
 
 int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa,
+                          void *acl, int acl_size,
                           struct lov_stripe_md **ea, struct obd_trans_info *oti)
 {
         struct obd_device *obd = exp->exp_obd;
@@ -1609,8 +1667,8 @@ int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa,
         LASSERT(ea == NULL);
         LASSERT(oa->o_mds < lmv->desc.ld_tgt_count);
 
-        rc = obd_create(lmv->tgts[oa->o_mds].ltd_exp,
-                        oa, &obj_mdp, oti);
+        rc = obd_create(lmv->tgts[oa->o_mds].ltd_exp, oa,
+                        acl, acl_size, &obj_mdp, oti);
 
         RETURN(rc);
 }
@@ -1630,6 +1688,7 @@ int lmv_getready(struct obd_export *exp)
  * values for "master" object, as it will be used.
  */
 int lmv_obd_create(struct obd_export *exp, struct obdo *oa,
+                   void *acl, int acl_size,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
 {
         struct obd_device *obd = exp->exp_obd;
@@ -1646,12 +1705,15 @@ int lmv_obd_create(struct obd_export *exp, struct obdo *oa,
         LASSERT(oa != NULL);
         
         if (ea == NULL) {
-                rc = lmv_obd_create_single(exp, oa, NULL, oti);
+                rc = lmv_obd_create_single(exp, oa, acl, acl_size, NULL, oti);
                 if (rc)
                         CERROR("Can't create object, rc = %d\n", rc);
                 RETURN(rc);
         }
 
+        /* acl is only suppied when mds create single remote obj */
+        LASSERT(acl == NULL && acl_size == 0);
+
         if (*ea == NULL) {
                 rc = obd_alloc_diskmd(exp, (struct lov_mds_md **)ea);
                 if (rc < 0) {
@@ -1704,7 +1766,8 @@ int lmv_obd_create(struct obd_export *exp, struct obdo *oa,
                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLTYPE | OBD_MD_FLMODE |
                         OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLID;
 
-                rc = obd_create(lmv->tgts[c].ltd_exp, oa, &obj_mdp, oti);
+                rc = obd_create(lmv->tgts[c].ltd_exp, oa, NULL, 0,
+                                &obj_mdp, oti);
                 if (rc) {
                         CERROR("obd_create() failed on MDT target %d, "
                                "error %d\n", c, rc);
@@ -1841,8 +1904,9 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen,
 int lmv_set_info(struct obd_export *exp, obd_count keylen,
                  void *key, obd_count vallen, void *val)
 {
-        struct obd_device *obd;
-        struct lmv_obd *lmv;
+        struct lmv_tgt_desc    *tgt;
+        struct obd_device      *obd;
+        struct lmv_obd         *lmv;
         ENTRY;
 
         obd = class_exp2obd(exp);
@@ -1862,7 +1926,6 @@ int lmv_set_info(struct obd_export *exp, obd_count keylen,
         /* maybe this could be default */
         if ((keylen == strlen("sec") && strcmp(key, "sec") == 0) ||
             (keylen == strlen("nllu") && strcmp(key, "nllu") == 0)) {
-                struct lmv_tgt_desc *tgt;
                 struct obd_export *exp;
                 int rc = 0, err, i;
 
@@ -1898,6 +1961,23 @@ int lmv_set_info(struct obd_export *exp, obd_count keylen,
                 RETURN(rc);
         }
 
+        if ((keylen == strlen("flush_cred") &&
+             strcmp(key, "flush_cred") == 0)) {
+                int rc = 0, i;
+
+                for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count;
+                     i++, tgt++) {
+                        if (!tgt->ltd_exp)
+                                continue;
+                        rc = obd_set_info(tgt->ltd_exp,
+                                          keylen, key, vallen, val);
+                        if (rc)
+                                RETURN(rc);
+                }
+
+                RETURN(0);
+        }
+
         RETURN(-EINVAL);
 }
 
@@ -2037,6 +2117,29 @@ int lmv_brw(int rw, struct obd_export *exp, struct obdo *oa,
         RETURN(err);
 }
 
+static int lmv_cancel_unused(struct obd_export *exp,
+                             struct lov_stripe_md *lsm, 
+                            int flags, void *opaque)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct lmv_obd *lmv = &obd->u.lmv;
+        int rc = 0, err, i;
+        ENTRY;
+
+        LASSERT(lsm == NULL);
+        
+        for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+                if (!lmv->tgts[i].ltd_exp || !lmv->tgts[i].active)
+                        continue;
+                
+                err = obd_cancel_unused(lmv->tgts[i].ltd_exp,
+                                        NULL, flags, opaque);
+                if (!rc)
+                        rc = err;
+        }
+        RETURN(rc);
+}
+
 struct obd_ops lmv_obd_ops = {
         .o_owner                = THIS_MODULE,
         .o_attach               = lmv_attach,
@@ -2058,6 +2161,7 @@ struct obd_ops lmv_obd_ops = {
         .o_notify               = lmv_notify,
         .o_iocontrol            = lmv_iocontrol,
         .o_getready             = lmv_getready,
+        .o_cancel_unused        = lmv_cancel_unused,
 };
 
 struct md_ops lmv_md_ops = {