Whamcloud - gitweb
Land b_hd_capa onto HEAD (20050809_1942)
[fs/lustre-release.git] / lustre / osc / osc_request.c
index 8421c85..aeae7d0 100644 (file)
 #include <linux/lustre_ha.h>
 #include <linux/lprocfs_status.h>
 #include <linux/lustre_log.h>
+#include <linux/lustre_lite.h>
 #include <linux/lustre_audit.h>
 #include <linux/lustre_gs.h>
+
 #include "osc_internal.h"
 
 /* Pack OSC object metadata for disk storage (LE byte order). */
@@ -387,11 +389,14 @@ out:
 
 static int osc_punch(struct obd_export *exp, struct obdo *oa,
                      struct lov_stripe_md *md, obd_size start,
-                     obd_size end, struct obd_trans_info *oti)
+                     obd_size end, struct obd_trans_info *oti,
+                     struct lustre_capa *capa)
 {
         struct ptlrpc_request *request;
         struct ost_body *body;
-        int rc, size = sizeof(*body);
+        struct lustre_capa *req_capa;
+        int bufcnt = 0;
+        int rc, size[2] = { sizeof(*body), sizeof(*capa) };
         ENTRY;
 
         if (!oa) {
@@ -400,11 +405,12 @@ static int osc_punch(struct obd_export *exp, struct obdo *oa,
         }
 
         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
-                                  OST_PUNCH, 1, &size, NULL);
+                                  OST_PUNCH, capa ? 2 : 1, size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
-        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
+        body = lustre_msg_buf(request->rq_reqmsg, bufcnt++, sizeof (*body));
+
         memcpy(&body->oa, oa, sizeof(*oa));
 
         /* overload the size and blocks fields in the oa with start/end */
@@ -412,7 +418,14 @@ static int osc_punch(struct obd_export *exp, struct obdo *oa,
         body->oa.o_blocks = end;
         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
 
-        request->rq_replen = lustre_msg_size(1, &size);
+        if (capa) {
+                req_capa = lustre_msg_buf(request->rq_reqmsg, bufcnt++,
+                                          sizeof(*capa));
+                capa_dup2(req_capa, capa);
+                body->oa.o_valid |= OBD_MD_CAPA;
+        }
+
+        request->rq_replen = lustre_msg_size(1, size);
 
         rc = ptlrpc_queue_wait(request);
         if (rc)
@@ -764,13 +777,17 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
         struct ptlrpc_bulk_desc *desc;
         struct client_obd       *cli = &imp->imp_obd->u.cli;
         struct ost_body         *body;
+        struct lustre_id        *raw_id = obdo_id(oa);
+        struct obd_capa         *ocapa;
+        struct lustre_capa      *capa;
         struct obd_ioobj        *ioobj;
         struct niobuf_remote    *niobuf;
         int                      niocount;
-        int                      size[3];
-        int                      i;
+        int                      size[4];
+        int                      i, bufcnt = 0;
         int                      requested_nob;
         int                      opc;
+        int                      capa_op;
         int                      rc;
 
         opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
@@ -779,11 +796,27 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
                 if (!can_merge_pages(&pga[i - 1], &pga[i]))
                         niocount++;
 
-        size[0] = sizeof(*body);
-        size[1] = sizeof(*ioobj);
-        size[2] = niocount * sizeof(*niobuf);
+        capa_op = (opc == OST_WRITE) ? MAY_WRITE : MAY_READ;
+get_capa:
+        ocapa = capa_get(oa->o_fsuid, capa_op, raw_id->li_fid.lf_group,
+                         raw_id->li_stc.u.e3s.l3s_ino, CLIENT_CAPA,
+                         NULL, NULL, NULL);
+        if (!ocapa) {
+                if (opc == OST_READ && capa_op == MAY_READ) {
+                        /* partial write might cause read, MAY_WRITE capability
+                         * should be used here */
+                        capa_op = MAY_WRITE;
+                        goto get_capa;
+                }
+        }
+
+        size[bufcnt++] = sizeof(*body);
+        size[bufcnt++] = sizeof(*ioobj);
+        if (ocapa)
+                size[bufcnt++] = sizeof(*capa);
+        size[bufcnt++] = niocount * sizeof(*niobuf);
 
-        req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, opc, 3, size, NULL);
+        req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, opc, bufcnt, size, NULL);
         if (req == NULL)
                 return (-ENOMEM);
 
@@ -797,15 +830,25 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
                 GOTO(out, rc = -ENOMEM);
         /* NB request now owns desc and will free it when it gets freed */
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
-        ioobj = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*ioobj));
-        niobuf = lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf));
+        bufcnt = 0;
+        body = lustre_msg_buf(req->rq_reqmsg, bufcnt++, sizeof(*body));
+        ioobj = lustre_msg_buf(req->rq_reqmsg, bufcnt++, sizeof(*ioobj));
+        if (ocapa)
+                capa = lustre_msg_buf(req->rq_reqmsg, bufcnt++, sizeof(*capa));
+        niobuf = lustre_msg_buf(req->rq_reqmsg, bufcnt++,
+                                niocount * sizeof(*niobuf));
 
         memcpy(&body->oa, oa, sizeof(*oa));
 
         obdo_to_ioobj(oa, ioobj);
         ioobj->ioo_bufcnt = niocount;
 
+        if (ocapa) {
+                capa_dup(capa, ocapa);
+                body->oa.o_valid |= OBD_MD_CAPA;
+                capa_put(ocapa, CLIENT_CAPA);
+        }
+
         LASSERT (page_count > 0);
 
         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
@@ -843,7 +886,8 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
         }
 
         LASSERT((void *)(niobuf - niocount) ==
-                lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
+                lustre_msg_buf(req->rq_reqmsg, bufcnt - 1,
+                               niocount * sizeof(*niobuf)));
         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
 
         /* size[0] still sizeof (*body) */
@@ -2917,6 +2961,7 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen,
                 spin_unlock(&oscc->oscc_lock);
                 RETURN(0);
         }
+
         if (keylen == strlen("unrecovery") &&
             memcmp(key, "unrecovery", keylen) == 0) {
                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
@@ -2925,9 +2970,10 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen,
                 spin_unlock(&oscc->oscc_lock);
                 RETURN(0);
         }
+
         if (keylen == strlen("initial_recov") &&
             memcmp(key, "initial_recov", strlen("initial_recov")) == 0) {
-                struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
+                struct obd_import *imp = class_exp2cliimp(exp);
                 if (vallen != sizeof(int))
                         RETURN(-EINVAL);
                 imp->imp_initial_recov = *(int *)val;
@@ -3005,8 +3051,7 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen,
                 RETURN(rc);
         }
 
-        if (keylen == strlen("sec") &&
-            memcmp(key, "sec", keylen) == 0) {
+        if (keylen == strlen("sec") && memcmp(key, "sec", keylen) == 0) {
                 struct client_obd *cli = &exp->exp_obd->u.cli;
 
                 cli->cl_sec_flavor = ptlrpcs_name2flavor(val);
@@ -3041,6 +3086,29 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen,
                 RETURN(0);
         }
 
+        if (keylen == 8 && memcmp(key, "capa_key", 8) == 0) {
+                struct ptlrpc_request *req;
+                char *bufs[2] = {key, val};
+                unsigned long irqflags;
+                int rc, size[2] = {keylen, vallen};
+
+                LASSERT(vallen == sizeof(struct lustre_capa_key));
+
+                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
+                                      OST_SET_INFO, 2, size, bufs);
+                if (req == NULL)
+                        RETURN(-ENOMEM);
+
+                spin_lock_irqsave (&req->rq_lock, irqflags);
+                req->rq_replay = 1;
+                spin_unlock_irqrestore (&req->rq_lock, irqflags);
+
+                req->rq_replen = lustre_msg_size(0, NULL);
+                rc = ptlrpc_queue_wait(req);
+                ptlrpc_req_finished(req);
+                RETURN(rc);
+        }
+
         if (keylen == strlen("setext") &&
             memcmp(key, "setext", keylen) == 0) {
                 struct client_obd *cli = &exp->exp_obd->u.cli;
@@ -3270,6 +3338,7 @@ static int osc_cleanup(struct obd_device *obd, int flags)
         RETURN(rc);
 }
 
+       
 struct obd_ops osc_obd_ops = {
         .o_owner                = THIS_MODULE,
         .o_attach               = osc_attach,