Whamcloud - gitweb
Land b_head_quota onto HEAD (20081116_0105)
[fs/lustre-release.git] / lustre / ost / ost_handler.c
index abb4f66..8b2b7fb 100644 (file)
@@ -69,6 +69,18 @@ static int oss_num_create_threads;
 CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
                 "number of OSS create threads to start");
 
+/**
+ * Do not return server-side uid/gid to remote client
+ */
+static void ost_drop_id(struct obd_export *exp, struct  obdo *oa)
+{
+        if (exp_connect_rmtclient(exp)) {
+                oa->o_uid = -1;
+                oa->o_gid = -1;
+                oa->o_valid &= ~(OBD_MD_FLUID | OBD_MD_FLGID);
+        }
+}
+
 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
 {
         struct oti_req_ack_lock *ack_lock;
@@ -95,6 +107,7 @@ static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
 {
         struct ost_body *body, *repbody;
         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        struct lustre_capa *capa = NULL;
         int rc;
         ENTRY;
 
@@ -115,6 +128,9 @@ static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
                 ldlm_request_cancel(req, dlm, 0);
         }
 
+        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
+                capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 2);
+
         rc = lustre_pack_reply(req, 2, size, NULL);
         if (rc)
                 RETURN(rc);
@@ -124,7 +140,7 @@ static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
                                  sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-        req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL);
+        req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL, capa);
         RETURN(0);
 }
 
@@ -154,6 +170,7 @@ static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
                 oinfo.oi_capa = lustre_unpack_capa(req->rq_reqmsg,
                                                    REQ_REC_OFF + 1);
         req->rq_status = obd_getattr(exp, &oinfo);
+        ost_drop_id(exp, &repbody->oa);
         RETURN(0);
 }
 
@@ -320,6 +337,7 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
                 ost_punch_lock_put(exp, oinfo.oi_oa, &lh);
         }
         repbody->oa = *oinfo.oi_oa;
+        ost_drop_id(exp, &repbody->oa);
         RETURN(rc);
 }
 
@@ -348,6 +366,7 @@ static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
         req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
                                   repbody->oa.o_blocks, capa);
+        ost_drop_id(exp, &repbody->oa);
         RETURN(0);
 }
 
@@ -378,6 +397,7 @@ static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
                 oinfo.oi_capa = lustre_unpack_capa(req->rq_reqmsg,
                                                    REQ_REC_OFF + 1);
         req->rq_status = obd_setattr(exp, &oinfo, oti);
+        ost_drop_id(exp, &repbody->oa);
         RETURN(0);
 }
 
@@ -792,6 +812,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
                                          sizeof(*repbody));
                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
+                ost_drop_id(exp, &repbody->oa);
         }
 
 out_lock:
@@ -843,6 +864,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         obd_count                client_cksum = 0, server_cksum = 0;
         cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
         int                      no_reply = 0;
+        __u32                    o_uid = 0, o_gid = 0;
         ENTRY;
 
         req->rq_bulk_write = 1;
@@ -970,6 +992,10 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 body->oa.o_valid &= ~OBD_MD_FLGRANT;
         }
 
+        if (exp_connect_rmtclient(exp)) {
+                o_uid = body->oa.o_uid;
+                o_gid = body->oa.o_gid;
+        }
         npages = OST_THREAD_POOL_SIZE;
         rc = obd_preprw(OBD_BRW_WRITE, exp, &body->oa, objcount,
                         ioo, remote_nb, &npages, local_nb, oti, capa);
@@ -1065,6 +1091,10 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         /* Must commit after prep above in all cases */
         rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa, objcount, ioo,
                           remote_nb, npages, local_nb, oti, rc);
+        if (exp_connect_rmtclient(exp)) {
+                repbody->oa.o_uid = o_uid;
+                repbody->oa.o_gid = o_gid;
+        }
 
         if (unlikely(client_cksum != server_cksum && rc == 0)) {
                 int  new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
@@ -1230,26 +1260,25 @@ static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
         RETURN(rc);
 }
 
+#ifdef HAVE_QUOTA_SUPPORT
 static int ost_handle_quotactl(struct ptlrpc_request *req)
 {
         struct obd_quotactl *oqctl, *repoqc;
-        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqc) };
         int rc;
         ENTRY;
 
-        oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
-                                   lustre_swab_obd_quotactl);
+        oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
         if (oqctl == NULL)
                 GOTO(out, rc = -EPROTO);
 
-        rc = lustre_pack_reply(req, 2, size, NULL);
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
                 GOTO(out, rc);
 
-        repoqc = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqc));
-
+        repoqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
         req->rq_status = obd_quotactl(req->rq_export, oqctl);
         *repoqc = *oqctl;
+
 out:
         RETURN(rc);
 }
@@ -1265,15 +1294,38 @@ static int ost_handle_quotacheck(struct ptlrpc_request *req)
                 RETURN(-EPROTO);
 
         rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc) {
-                CERROR("ost: out of memory while packing quotacheck reply\n");
+        if (rc)
                 RETURN(-ENOMEM);
-        }
 
         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
         RETURN(0);
 }
 
+static int ost_handle_quota_adjust_qunit(struct ptlrpc_request *req)
+{
+        struct quota_adjust_qunit *oqaq, *repoqa;
+        struct lustre_quota_ctxt *qctxt;
+        int rc;
+        ENTRY;
+
+        qctxt = &req->rq_export->exp_obd->u.obt.obt_qctxt;
+        oqaq = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
+        if (oqaq == NULL)
+                GOTO(out, rc = -EPROTO);
+
+        rc = req_capsule_server_pack(&req->rq_pill);
+        if (rc)
+                GOTO(out, rc);
+
+        repoqa = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
+        req->rq_status = obd_quota_adjust_qunit(req->rq_export, oqaq, qctxt);
+        *repoqa = *oqaq;
+
+ out:
+        RETURN(rc);
+}
+#endif
+
 static int ost_llog_handle_connect(struct obd_export *exp,
                                    struct ptlrpc_request *req)
 {
@@ -1286,6 +1338,122 @@ static int ost_llog_handle_connect(struct obd_export *exp,
         RETURN(rc);
 }
 
+#define ost_init_sec_none(reply, exp)                                   \
+do {                                                                    \
+        reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |          \
+                                      OBD_CONNECT_RMT_CLIENT_FORCE |    \
+                                      OBD_CONNECT_OSS_CAPA);            \
+        spin_lock(&exp->exp_lock);                                      \
+        exp->exp_connect_flags = reply->ocd_connect_flags;              \
+        spin_unlock(&exp->exp_lock);                                    \
+} while (0)
+
+static int ost_init_sec_level(struct ptlrpc_request *req)
+{
+        struct obd_export *exp = req->rq_export;
+        struct req_capsule *pill = &req->rq_pill;
+        struct obd_device *obd = exp->exp_obd;
+        struct filter_obd *filter = &obd->u.filter;
+        char *client = libcfs_nid2str(req->rq_peer.nid);
+        struct obd_connect_data *data, *reply;
+        int rc = 0, remote;
+        ENTRY;
+
+        data = req_capsule_client_get(pill, &RMF_CONNECT_DATA);
+        reply = req_capsule_server_get(pill, &RMF_CONNECT_DATA);
+        if (data == NULL || reply == NULL)
+                RETURN(-EFAULT);
+
+        /* connection from MDT is always trusted */
+        if (req->rq_auth_usr_mdt) {
+                ost_init_sec_none(reply, exp);
+                RETURN(0);
+        }
+
+        /* no GSS support case */
+        if (!req->rq_auth_gss) {
+                if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
+                        CWARN("client %s -> target %s does not user GSS, "
+                              "can not run under security level %d.\n",
+                              client, obd->obd_name, filter->fo_sec_level);
+                        RETURN(-EACCES);
+                } else {
+                        ost_init_sec_none(reply, exp);
+                        RETURN(0);
+                }
+        }
+
+        /* old version case */
+        if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
+                     !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
+                if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
+                        CWARN("client %s -> target %s uses old version, "
+                              "can not run under security level %d.\n",
+                              client, obd->obd_name, filter->fo_sec_level);
+                        RETURN(-EACCES);
+                } else {
+                        CWARN("client %s -> target %s uses old version, "
+                              "run under security level %d.\n",
+                              client, obd->obd_name, filter->fo_sec_level);
+                        ost_init_sec_none(reply, exp);
+                        RETURN(0);
+                }
+        }
+
+        remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
+        if (remote) {
+                if (!req->rq_auth_remote)
+                        CDEBUG(D_SEC, "client (local realm) %s -> target %s "
+                               "asked to be remote.\n", client, obd->obd_name);
+        } else if (req->rq_auth_remote) {
+                remote = 1;
+                CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
+                       "as remote by default.\n", client, obd->obd_name);
+        }
+
+        if (remote) {
+                if (!filter->fo_fl_oss_capa) {
+                        CDEBUG(D_SEC, "client %s -> target %s is set as remote,"
+                               " but OSS capabilities are not enabled: %d.\n",
+                               client, obd->obd_name, filter->fo_fl_oss_capa);
+                        RETURN(-EACCES);
+                }
+        }
+
+        switch (filter->fo_sec_level) {
+        case LUSTRE_SEC_NONE:
+                if (!remote) {
+                        ost_init_sec_none(reply, exp);
+                        break;
+                } else {
+                        CDEBUG(D_SEC, "client %s -> target %s is set as remote, "
+                               "can not run under security level %d.\n",
+                               client, obd->obd_name, filter->fo_sec_level);
+                        RETURN(-EACCES);
+                }
+        case LUSTRE_SEC_REMOTE:
+                if (!remote)
+                        ost_init_sec_none(reply, exp);
+                break;
+        case LUSTRE_SEC_ALL:
+                if (!remote) {
+                        reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
+                                                      OBD_CONNECT_RMT_CLIENT_FORCE);
+                        if (!filter->fo_fl_oss_capa)
+                                reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
+
+                        spin_lock(&exp->exp_lock);
+                        exp->exp_connect_flags = reply->ocd_connect_flags;
+                        spin_unlock(&exp->exp_lock);
+                }
+                break;
+        default:
+                RETURN(-EINVAL);
+        }
+
+        RETURN(rc);
+}
+
 static int filter_export_check_flavor(struct filter_obd *filter,
                                       struct obd_export *exp,
                                       struct ptlrpc_request *req)
@@ -1382,8 +1550,11 @@ int ost_msg_check_version(struct lustre_msg *msg)
         case OST_SYNC:
         case OST_SET_INFO:
         case OST_GET_INFO:
+#ifdef HAVE_QUOTA_SUPPORT
         case OST_QUOTACHECK:
         case OST_QUOTACTL:
+        case OST_QUOTA_ADJUST_QUNIT:
+#endif
                 rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
                 if (rc)
                         CERROR("bad opc %u version %08x, expecting %08x\n",
@@ -1487,12 +1658,14 @@ int ost_handle(struct ptlrpc_request *req)
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2))
                         RETURN(0);
                 if (!rc) {
-                        struct obd_export *exp = req->rq_export;
+                        rc = ost_init_sec_level(req);
+                        if (!rc) {
+                                struct obd_export *exp = req->rq_export;
 
-                        obd = exp->exp_obd;
-
-                        rc = filter_export_check_flavor(&obd->u.filter,
-                                                        exp, req);
+                                obd = exp->exp_obd;
+                                rc = filter_export_check_flavor(&obd->u.filter,
+                                                                exp, req);
+                        }
                 }
                 break;
         }
@@ -1598,6 +1771,7 @@ int ost_handle(struct ptlrpc_request *req)
                 DEBUG_REQ(D_INODE, req, "get_info");
                 rc = ost_get_info(req->rq_export, req);
                 break;
+#ifdef HAVE_QUOTA_SUPPORT
         case OST_QUOTACHECK:
                 CDEBUG(D_INODE, "quotacheck\n");
                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACHECK);
@@ -1612,6 +1786,12 @@ int ost_handle(struct ptlrpc_request *req)
                         RETURN(0);
                 rc = ost_handle_quotactl(req);
                 break;
+        case OST_QUOTA_ADJUST_QUNIT:
+                CDEBUG(D_INODE, "quota_adjust_qunit\n");
+                req_capsule_set(&req->rq_pill, &RQF_OST_QUOTA_ADJUST_QUNIT);
+                rc = ost_handle_quota_adjust_qunit(req);
+                break;
+#endif
         case OBD_PING:
                 DEBUG_REQ(D_INODE, req, "ping");
                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);