#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
+#include <lustre_cache.h>
#include "osc_internal.h"
static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
+int osc_cleanup(struct obd_device *obd);
/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
struct lov_oinfo *loi, cfs_page_t *page,
obd_off offset, struct obd_async_page_ops *ops,
- void *data, void **res)
+ void *data, void **res, int nocache,
+ struct lustre_handle *lockh)
{
struct osc_async_page *oap;
+ struct ldlm_res_id oid = {{0}};
+ int rc = 0;
ENTRY;
if (!page)
CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
+ CFS_INIT_LIST_HEAD(&oap->oap_page_list);
oap->oap_occ.occ_interrupted = osc_occ_interrupted;
+ spin_lock_init(&oap->oap_lock);
+
+ /* If the page was marked as notcacheable - don't add to any locks */
+ if (!nocache) {
+ oid.name[0] = loi->loi_id;
+ oid.name[2] = loi->loi_gr;
+ /* This is the only place where we can call cache_add_extent
+ without oap_lock, because this page is locked now, and
+ the lock we are adding it to is referenced, so cannot lose
+ any pages either. */
+ rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
+ if (rc)
+ RETURN(rc);
+ }
+
CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
RETURN(0);
}
lop_update_pending(cli, lop, oap->oap_cmd, -1);
}
loi_list_maint(cli, loi);
+ cache_remove_extent(cli->cl_cache, oap);
LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
RETURN(rc);
}
/**
 * Blocking/cancel callback for OSC extent DLM locks.
 *
 * \param lock  the lock the callback fires for
 * \param new   descriptor of the conflicting lock request (blocking case)
 * \param data  opaque cookie stored in the lock at enqueue time
 * \param flag  LDLM_CB_BLOCKING or LDLM_CB_CANCELING
 *
 * Returns 0; LBUG()s on any other \a flag value.
 */
int osc_extent_blocking_cb(struct ldlm_lock *lock,
                           struct ldlm_lock_desc *new, void *data,
                           int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        /* A cookie that looks like a small integer cast to a pointer is
         * almost certainly corruption - crash loudly rather than chase it. */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                /* Someone else wants this extent: cancel our lock so the
                 * server can grant theirs. */
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {

                ldlm_lock2handle(lock, &lockh);
                /* This lock wasn't granted, don't try to do anything */
                if (lock->l_req_mode != lock->l_granted_mode)
                        RETURN(0);

                /* Detach the cached pages covered by this lock before the
                 * lock itself goes away. Must precede the upper-layer
                 * callback, which may assume the cache no longer pins it. */
                cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
                                  &lockh);

                /* Give the layer above (if any) a chance to clean up too. */
                if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
                        lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
                                lock, new, data,flag);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}
EXPORT_SYMBOL(osc_extent_blocking_cb);
+
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
int flags)
{
return 0;
}
-static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
- int intent, int rc)
+static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
+ struct obd_info *oinfo, int intent, int rc)
{
ENTRY;
oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
}
+ if (!rc)
+ cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
+
/* Call the update callback. */
rc = oinfo->oi_cb_up(oinfo, rc);
RETURN(rc);
aa->oa_oi->oi_lockh, rc);
/* Complete osc stuff. */
- rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
+ rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
/* Release the lock for async request. */
if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
RETURN(rc);
}
- rc = osc_enqueue_fini(req, oinfo, intent, rc);
+ rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
if (intent)
ptlrpc_req_finished(req);
ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
OST_MAXREQSIZE,
ptlrpc_add_rqs_to_pool);
+ cli->cl_cache = cache_create(obd);
+ if (!cli->cl_cache) {
+ osc_cleanup(obd);
+ rc = -ENOMEM;
+ }
}
RETURN(rc);
/* free memory of osc quota cache */
lquota_cleanup(quota_interface, obd);
+ cache_destroy(obd->u.cli.cl_cache);
rc = client_obd_cleanup(obd);
ptlrpcd_decref();
RETURN(rc);
}
+static int osc_register_page_removal_cb(struct obd_export *exp,
+ obd_page_removal_cb_t func,
+ obd_pin_extent_cb pin_cb)
+{
+ return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
+ pin_cb);
+}
+
+static int osc_unregister_page_removal_cb(struct obd_export *exp,
+ obd_page_removal_cb_t func)
+{
+ return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
+}
+
+static int osc_register_lock_cancel_cb(struct obd_export *exp,
+ obd_lock_cancel_cb cb)
+{
+ LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
+
+ exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
+ return 0;
+}
+
+static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
+ obd_lock_cancel_cb cb)
+{
+ if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
+ CERROR("Unregistering cancel cb %p, while only %p was "
+ "registered\n", cb,
+ exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
+ RETURN(-EINVAL);
+ }
+
+ exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
+ return 0;
+}
+
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
struct lustre_cfg *lcfg = buf;
.o_llog_init = osc_llog_init,
.o_llog_finish = osc_llog_finish,
.o_process_config = osc_process_config,
+ .o_register_page_removal_cb = osc_register_page_removal_cb,
+ .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
+ .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
+ .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
};
int __init osc_init(void)
{