Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
index 22f3dcd..a00185f 100644 (file)
@@ -68,7 +68,8 @@ static quota_interface_t *quota_interface = NULL;
 extern quota_interface_t osc_quota_interface;
 
 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
-static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
+static int brw_interpret(const struct lu_env *env,
+                         struct ptlrpc_request *req, void *data, int rc);
 int osc_cleanup(struct obd_device *obd);
 
 /* Pack OSC object metadata for disk storage (LE byte order). */
@@ -200,7 +201,8 @@ static inline void osc_set_capa_size(struct ptlrpc_request *req,
                 ;
 }
 
-static int osc_getattr_interpret(struct ptlrpc_request *req,
+static int osc_getattr_interpret(const struct lu_env *env,
+                                 struct ptlrpc_request *req,
                                  struct osc_async_args *aa, int rc)
 {
         struct ost_body *body;
@@ -250,7 +252,7 @@ static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
         osc_pack_req_body(req, oinfo);
 
         ptlrpc_request_set_replen(req);
-        req->rq_interpret_reply = osc_getattr_interpret;
+        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
 
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
         aa = ptlrpc_req_async_args(req);
@@ -345,7 +347,8 @@ out:
         RETURN(rc);
 }
 
-static int osc_setattr_interpret(struct ptlrpc_request *req,
+static int osc_setattr_interpret(const struct lu_env *env,
+                                 struct ptlrpc_request *req,
                                  struct osc_async_args *aa, int rc)
 {
         struct ost_body *body;
@@ -390,15 +393,16 @@ static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
 
         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                 LASSERT(oti);
-                *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
+                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
         }
 
-        /* do mds to ost setattr asynchronouly */
+        /* do mds to ost setattr asynchronously */
         if (!rqset) {
                 /* Do not wait for response. */
                 ptlrpcd_add_req(req);
         } else {
-                req->rq_interpret_reply = osc_setattr_interpret;
+                req->rq_interpret_reply =
+                        (ptlrpc_interpterer_t)osc_setattr_interpret;
 
                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                 aa = ptlrpc_req_async_args(req);
@@ -445,9 +449,8 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
 
         ptlrpc_request_set_replen(req);
 
-        if (oa->o_valid & OBD_MD_FLINLINE) {
-                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
-                        oa->o_flags == OBD_FL_DELORPHAN);
+        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+            oa->o_flags == OBD_FL_DELORPHAN) {
                 DEBUG_REQ(D_HA, req,
                           "delorphan from OST integration");
                 /* Don't resend the delorphan req */
@@ -482,7 +485,7 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
                         if (!oti->oti_logcookies)
                                 oti_alloc_cookies(oti, 1);
-                        *oti->oti_logcookies = *obdo_logcookie(oa);
+                        *oti->oti_logcookies = oa->o_lcookie;
                 }
         }
 
@@ -496,7 +499,8 @@ out:
         RETURN(rc);
 }
 
-static int osc_punch_interpret(struct ptlrpc_request *req,
+static int osc_punch_interpret(const struct lu_env *env,
+                               struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
 {
         struct ost_body *body;
@@ -553,7 +557,7 @@ static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
         ptlrpc_request_set_replen(req);
 
 
-        req->rq_interpret_reply = osc_punch_interpret;
+        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
         aa = ptlrpc_req_async_args(req);
         aa->aa_oi = oinfo;
@@ -632,13 +636,16 @@ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
         if (res == NULL)
                 RETURN(0);
 
+        LDLM_RESOURCE_ADDREF(res);
         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                            lock_flags, 0, NULL);
+        LDLM_RESOURCE_DELREF(res);
         ldlm_resource_putref(res);
         RETURN(count);
 }
 
-static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
+static int osc_destroy_interpret(const struct lu_env *env,
+                                 struct ptlrpc_request *req, void *data,
                                  int rc)
 {
         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
@@ -713,8 +720,7 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
         ptlrpc_at_set_req_timeout(req);
 
         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
-                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
-                       sizeof(*oti->oti_logcookies));
+                oa->o_lcookie = *oti->oti_logcookies;
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
         body->oa = *oa;
@@ -961,7 +967,7 @@ static int check_write_rcs(struct ptlrpc_request *req,
 
         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
-                       requested_nob, req->rq_bulk->bd_nob_transferred);
+                       req->rq_bulk->bd_nob_transferred, requested_nob);
                 return(-EPROTO);
         }
 
@@ -2038,7 +2044,8 @@ static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
         EXIT;
 }
 
-static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
+static int brw_interpret(const struct lu_env *env,
+                         struct ptlrpc_request *req, void *data, int rc)
 {
         struct osc_brw_async_args *aa = data;
         struct client_obd *cli;
@@ -2098,6 +2105,7 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
         void *caller_data = NULL;
         struct obd_capa *ocapa;
         struct osc_async_page *oap;
+        struct ldlm_lock *lock = NULL;
         int i, rc;
 
         ENTRY;
@@ -2116,6 +2124,7 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                 if (ops == NULL) {
                         ops = oap->oap_caller_ops;
                         caller_data = oap->oap_caller_data;
+                        lock = oap->oap_ldlm_lock;
                 }
                 pga[i] = &oap->oap_brw_page;
                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
@@ -2128,6 +2137,10 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
         LASSERT(ops != NULL);
         ops->ap_fill_obdo(caller_data, cmd, oa);
         ocapa = ops->ap_lookup_capa(caller_data, cmd);
+        if (lock) {
+                oa->o_handle = lock->l_remote_handle;
+                oa->o_valid |= OBD_MD_FLHANDLE;
+        }
 
         sort_brw_pages(pga, page_count);
         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
@@ -3040,7 +3053,6 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
         }
 #endif
         lock->l_ast_data = data;
-        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
         unlock_res_and_lock(lock);
         LDLM_LOCK_PUT(lock);
 }
@@ -3089,7 +3101,8 @@ static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
         RETURN(rc);
 }
 
-static int osc_enqueue_interpret(struct ptlrpc_request *req,
+static int osc_enqueue_interpret(const struct lu_env *env,
+                                 struct ptlrpc_request *req,
                                  struct osc_enqueue_args *aa, int rc)
 {
         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
@@ -3232,7 +3245,8 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                         aa->oa_ei = einfo;
                         aa->oa_exp = exp;
 
-                        req->rq_interpret_reply = osc_enqueue_interpret;
+                        req->rq_interpret_reply =
+                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                         ptlrpc_set_add_req(rqset, req);
                 } else if (intent) {
                         ptlrpc_req_finished(req);
@@ -3315,21 +3329,8 @@ static int osc_cancel_unused(struct obd_export *exp,
         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
 }
 
-static int osc_join_lru(struct obd_export *exp,
-                        struct lov_stripe_md *lsm, int join)
-{
-        struct obd_device *obd = class_exp2obd(exp);
-        struct ldlm_res_id res_id, *resp = NULL;
-
-        if (lsm != NULL) {
-                resp = osc_build_res_name(lsm->lsm_object_id,
-                                          lsm->lsm_object_gr, &res_id);
-        }
-
-        return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
-}
-
-static int osc_statfs_interpret(struct ptlrpc_request *req,
+static int osc_statfs_interpret(const struct lu_env *env,
+                                struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
 {
         struct obd_statfs *msfs;
@@ -3382,7 +3383,7 @@ static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                 req->rq_no_delay = 1;
         }
 
-        req->rq_interpret_reply = osc_statfs_interpret;
+        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
         aa = ptlrpc_req_async_args(req);
         aa->aa_oi = oinfo;
@@ -3462,29 +3463,45 @@ static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
  */
 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
 {
-        struct lov_user_md lum, *lumk;
+        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
+        struct lov_user_md_v3 lum, *lumk;
+        struct lov_user_ost_data_v1 *lmm_objects;
         int rc = 0, lum_size;
         ENTRY;
 
         if (!lsm)
                 RETURN(-ENODATA);
 
-        if (copy_from_user(&lum, lump, sizeof(lum)))
+        /* we only need the header part from user space to get lmm_magic and
+         * lmm_stripe_count, (the header part is common to v1 and v3) */
+        lum_size = sizeof(struct lov_user_md_v1);
+        if (copy_from_user(&lum, lump, lum_size))
                 RETURN(-EFAULT);
 
-        if (lum.lmm_magic != LOV_USER_MAGIC)
+        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
+            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                 RETURN(-EINVAL);
 
+        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
+        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
+        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
+        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
+
+        /* we can use lov_mds_md_size() to compute lum_size
+         * because lov_user_md_vX and lov_mds_md_vX have the same size */
         if (lum.lmm_stripe_count > 0) {
-                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
+                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                 OBD_ALLOC(lumk, lum_size);
                 if (!lumk)
                         RETURN(-ENOMEM);
 
-                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
-                lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
+                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
+                        lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
+                else
+                        lmm_objects = &(lumk->lmm_objects[0]);
+                lmm_objects->l_object_id = lsm->lsm_object_id;
         } else {
-                lum_size = sizeof(lum);
+                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                 lumk = &lum;
         }
 
@@ -3682,7 +3699,8 @@ static int osc_get_info(struct obd_export *exp, obd_count keylen,
         RETURN(-EINVAL);
 }
 
-static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
+static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
+                                          struct ptlrpc_request *req,
                                           void *aa, int rc)
 {
         struct llog_ctxt *ctxt;
@@ -3810,7 +3828,7 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
 
         ptlrpc_request_set_replen(req);
         ptlrpc_set_add_req(set, req);
-        ptlrpc_check_set(set);
+        ptlrpc_check_set(NULL, set);
 
         RETURN(0);
 }
@@ -3848,8 +3866,14 @@ static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
 
         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
                         NULL, &osc_size_repl_logops);
-        if (rc)
+        if (rc) {
+                struct llog_ctxt *ctxt =
+                        llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
+                if (ctxt)
+                        llog_cleanup(ctxt);
                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
+        }
+        GOTO(out, rc);
 out:
         if (rc) {
                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
@@ -3857,7 +3881,7 @@ out:
                 CERROR("logid "LPX64":0x%x\n",
                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
         }
-        RETURN(rc);
+        return rc;
 }
 
 static int osc_llog_finish(struct obd_device *obd, int count)
@@ -3882,7 +3906,8 @@ static int osc_llog_finish(struct obd_device *obd, int count)
 static int osc_reconnect(const struct lu_env *env,
                          struct obd_export *exp, struct obd_device *obd,
                          struct obd_uuid *cluuid,
-                         struct obd_connect_data *data)
+                         struct obd_connect_data *data,
+                         void *localdata)
 {
         struct client_obd *cli = &obd->u.cli;
 
@@ -4203,7 +4228,6 @@ struct obd_ops osc_obd_ops = {
         .o_change_cbdata        = osc_change_cbdata,
         .o_cancel               = osc_cancel,
         .o_cancel_unused        = osc_cancel_unused,
-        .o_join_lru             = osc_join_lru,
         .o_iocontrol            = osc_iocontrol,
         .o_get_info             = osc_get_info,
         .o_set_info_async       = osc_set_info_async,
@@ -4216,6 +4240,7 @@ struct obd_ops osc_obd_ops = {
         .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
         .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
 };
+
 int __init osc_init(void)
 {
         struct lprocfs_static_vars lvars = { 0 };