Whamcloud - gitweb
Directly associate cached pages to lock that protect those pages,
[fs/lustre-release.git] / lustre / osc / osc_request.c
index 3a8b7ab..679e78f 100644 (file)
 #include <lustre_log.h>
 #include <lustre_debug.h>
 #include <lustre_param.h>
+#include <lustre_cache.h>
 #include "osc_internal.h"
 
 static quota_interface_t *quota_interface = NULL;
 extern quota_interface_t osc_quota_interface;
 
 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
+int osc_cleanup(struct obd_device *obd);
 
 /* Pack OSC object metadata for disk storage (LE byte order). */
 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
@@ -2560,9 +2562,12 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                         struct lov_oinfo *loi, cfs_page_t *page,
                         obd_off offset, struct obd_async_page_ops *ops,
-                        void *data, void **res)
+                        void *data, void **res, int nocache,
+                        struct lustre_handle *lockh)
 {
         struct osc_async_page *oap;
+        struct ldlm_res_id oid = {{0}};
+        int rc = 0;
         ENTRY;
 
         if (!page)
@@ -2582,9 +2587,25 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
+        CFS_INIT_LIST_HEAD(&oap->oap_page_list);
 
         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
 
+        spin_lock_init(&oap->oap_lock);
+
+        /* If the page was marked as notcacheable - don't add to any locks */ 
+        if (!nocache) {
+                oid.name[0] = loi->loi_id;
+                oid.name[2] = loi->loi_gr;
+                /* This is the only place where we can call cache_add_extent
+                   without oap_lock, because this page is locked now, and
+                   the lock we are adding it to is referenced, so cannot lose
+                   any pages either. */
+                rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
+                if (rc)
+                        RETURN(rc);
+        }
+
         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
         RETURN(0);
 }
@@ -2869,6 +2890,7 @@ static int osc_teardown_async_page(struct obd_export *exp,
                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
         }
         loi_list_maint(cli, loi);
+        cache_remove_extent(cli->cl_cache, oap);
 
         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
 out:
@@ -2876,6 +2898,49 @@ out:
         RETURN(rc);
 }
 
+int osc_extent_blocking_cb(struct ldlm_lock *lock,
+                           struct ldlm_lock_desc *new, void *data,
+                           int flag)
+{
+        struct lustre_handle lockh = { 0 };
+        int rc;
+        ENTRY;  
+                
+        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
+                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
+                LBUG(); 
+        }       
+
+        switch (flag) {
+        case LDLM_CB_BLOCKING:
+                ldlm_lock2handle(lock, &lockh);
+                rc = ldlm_cli_cancel(&lockh);
+                if (rc != ELDLM_OK)
+                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
+                break;
+        case LDLM_CB_CANCELING: {
+
+                ldlm_lock2handle(lock, &lockh);
+                /* This lock wasn't granted, don't try to do anything */
+                if (lock->l_req_mode != lock->l_granted_mode)
+                        RETURN(0);
+
+                cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
+                                  &lockh);
+
+                if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
+                        lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
+                                                          lock, new, data,flag);
+                break;
+        }
+        default:
+                LBUG();
+        }
+
+        RETURN(0);
+}
+EXPORT_SYMBOL(osc_extent_blocking_cb);
+
 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                     int flags)
 {
@@ -2920,8 +2985,8 @@ static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
         return 0;
 }
 
-static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
-                            int intent, int rc)
+static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
+                            struct obd_info *oinfo, int intent, int rc)
 {
         ENTRY;
 
@@ -2945,6 +3010,9 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
         }
 
+        if (!rc)
+                cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
+
         /* Call the update callback. */
         rc = oinfo->oi_cb_up(oinfo, rc);
         RETURN(rc);
@@ -2971,7 +3039,7 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                    aa->oa_oi->oi_lockh, rc);
 
         /* Complete osc stuff. */
-        rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
+        rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
 
         /* Release the lock for async request. */
         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
@@ -3101,7 +3169,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 RETURN(rc);
         }
 
-        rc = osc_enqueue_fini(req, oinfo, intent, rc);
+        rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
         if (intent)
                 ptlrpc_req_finished(req);
 
@@ -3836,6 +3904,11 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                             OST_MAXREQSIZE,
                                             ptlrpc_add_rqs_to_pool);
+                cli->cl_cache = cache_create(obd);
+                if (!cli->cl_cache) {
+                        osc_cleanup(obd);
+                        rc = -ENOMEM;
+                }
         }
 
         RETURN(rc);
@@ -3901,12 +3974,50 @@ int osc_cleanup(struct obd_device *obd)
         /* free memory of osc quota cache */
         lquota_cleanup(quota_interface, obd);
 
+        cache_destroy(obd->u.cli.cl_cache);
         rc = client_obd_cleanup(obd);
 
         ptlrpcd_decref();
         RETURN(rc);
 }
 
+static int osc_register_page_removal_cb(struct obd_export *exp,
+                                        obd_page_removal_cb_t func,
+                                        obd_pin_extent_cb pin_cb)
+{
+        return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
+                                           pin_cb);
+}
+
+static int osc_unregister_page_removal_cb(struct obd_export *exp,
+                                          obd_page_removal_cb_t func)
+{
+        return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
+}
+
+static int osc_register_lock_cancel_cb(struct obd_export *exp,
+                                       obd_lock_cancel_cb cb)
+{
+        LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
+
+        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
+        return 0;
+}
+
+static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
+                                         obd_lock_cancel_cb cb)
+{
+        if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
+                CERROR("Unregistering cancel cb %p, while only %p was "
+                       "registered\n", cb,
+                       exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
+                RETURN(-EINVAL);
+        }
+
+        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
+        return 0;
+}
+
 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
 {
         struct lustre_cfg *lcfg = buf;
@@ -3972,6 +4083,10 @@ struct obd_ops osc_obd_ops = {
         .o_llog_init            = osc_llog_init,
         .o_llog_finish          = osc_llog_finish,
         .o_process_config       = osc_process_config,
+        .o_register_page_removal_cb = osc_register_page_removal_cb,
+        .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
+        .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
+        .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
 };
 int __init osc_init(void)
 {