*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*/
#define DEBUG_SUBSYSTEM S_OSC
#include <linux/workqueue.h>
+#include <libcfs/libcfs.h>
+#include <linux/falloc.h>
#include <lprocfs_status.h>
-#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
+#include <lustre_ioctl_old.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
+#include <lustre_osc.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
-#include <lustre_osc.h>
#include "osc_internal.h"
+#include <lnet/lnet_rdma.h>
atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
void *data, int rc);
-void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
+static void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
struct ost_body *body;
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
struct obdo *oa)
{
- struct ptlrpc_request *req;
- struct ost_body *body;
- int rc;
+ struct ptlrpc_request *req;
+ struct ost_body *body;
+ int rc;
ENTRY;
req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
EXIT;
out:
- ptlrpc_req_finished(req);
+ ptlrpc_req_put(req);
return rc;
}
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
struct obdo *oa)
{
- struct ptlrpc_request *req;
- struct ost_body *body;
- int rc;
+ struct ptlrpc_request *req;
+ struct ost_body *body;
+ int rc;
ENTRY;
LASSERT(oa->o_valid & OBD_MD_FLGROUP);
EXIT;
out:
- ptlrpc_req_finished(req);
+ ptlrpc_req_put(req);
RETURN(rc);
}
static int osc_setattr_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *args, int rc)
{
- struct osc_setattr_args *sa = args;
+ struct osc_setattr_args *sa = args;
struct ost_body *body;
ENTRY;
obd_enqueue_update_f upcall, void *cookie,
struct ptlrpc_request_set *rqset)
{
- struct ptlrpc_request *req;
- struct osc_setattr_args *sa;
- int rc;
+ struct ptlrpc_request *req;
+ struct osc_setattr_args *sa;
+ int rc;
ENTRY;
sa->sa_upcall = upcall;
sa->sa_cookie = cookie;
- if (rqset == PTLRPCD_SET)
- ptlrpcd_add_req(req);
- else
- ptlrpc_set_add_req(rqset, req);
+ ptlrpc_set_add_req(rqset, req);
}
RETURN(0);
{
struct osc_ladvise_args *la = arg;
struct ost_body *body;
- ENTRY;
+ ENTRY;
if (rc != 0)
GOTO(out, rc);
obd_enqueue_update_f upcall, void *cookie,
struct ptlrpc_request_set *rqset)
{
- struct ptlrpc_request *req;
- struct ost_body *body;
- struct osc_ladvise_args *la;
- int rc;
- struct lu_ladvise *req_ladvise;
- struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
- int num_advise = ladvise_hdr->lah_count;
- struct ladvise_hdr *req_ladvise_hdr;
- ENTRY;
+ struct ptlrpc_request *req;
+ struct ost_body *body;
+ struct osc_ladvise_args *la;
+ struct lu_ladvise *req_ladvise;
+ struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
+ int num_advise = ladvise_hdr->lah_count;
+ struct ladvise_hdr *req_ladvise_hdr;
+ int rc;
+ ENTRY;
req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
if (req == NULL)
RETURN(-ENOMEM);
la->la_upcall = upcall;
la->la_cookie = cookie;
- if (rqset == PTLRPCD_SET)
- ptlrpcd_add_req(req);
- else
- ptlrpc_set_add_req(rqset, req);
+ ptlrpc_set_add_req(rqset, req);
RETURN(0);
}
static int osc_create(const struct lu_env *env, struct obd_export *exp,
struct obdo *oa)
{
- struct ptlrpc_request *req;
- struct ost_body *body;
- int rc;
- ENTRY;
+ struct ptlrpc_request *req;
+ struct ost_body *body;
+ int rc;
+ ENTRY;
LASSERT(oa != NULL);
LASSERT(oa->o_valid & OBD_MD_FLGROUP);
LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
- req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
- if (req == NULL)
- GOTO(out, rc = -ENOMEM);
+ req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
+ if (req == NULL)
+ GOTO(out, rc = -ENOMEM);
- rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
- if (rc) {
- ptlrpc_request_free(req);
- GOTO(out, rc);
- }
+ rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
+ if (rc) {
+ ptlrpc_request_free(req);
+ GOTO(out, rc);
+ }
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- LASSERT(body);
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+ LASSERT(body);
lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
- ptlrpc_request_set_replen(req);
+ ptlrpc_request_set_replen(req);
- rc = ptlrpc_queue_wait(req);
- if (rc)
- GOTO(out_req, rc);
+ rc = ptlrpc_queue_wait(req);
+ if (rc)
+ GOTO(out_req, rc);
- body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- GOTO(out_req, rc = -EPROTO);
+ body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+ if (body == NULL)
+ GOTO(out_req, rc = -EPROTO);
CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
CDEBUG(D_HA, "transno: %lld\n",
lustre_msg_get_transno(req->rq_repmsg));
out_req:
- ptlrpc_req_finished(req);
+ ptlrpc_req_put(req);
out:
RETURN(rc);
}
}
EXPORT_SYMBOL(osc_punch_send);
+/**
+ * osc_fallocate_base() - Handles fallocate requests.
+ *
+ * @exp: Export structure
+ * @oa: Attributes passed to OSS from client (obdo structure)
+ * @upcall: Completion callback invoked when the reply is interpreted
+ * @cookie: Caller-private data passed back to @upcall
+ * @mode: Operation done on given range.
+ *
+ * Handles fallocate requests only. Only block allocation or standard
+ * preallocate operation is supported currently. Other mode flags are
+ * not supported yet. ftruncate(2) or truncate(2) is supported via
+ * SETATTR request.
+ *
+ * Return: Non-zero on failure and 0 on success.
+ */
+int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
+		       obd_enqueue_update_f upcall, void *cookie, int mode)
+{
+	struct ptlrpc_request *req;
+	struct osc_setattr_args *sa;
+	struct ost_body *body;
+	struct obd_import *imp = class_exp2cliimp(exp);
+	int rc;
+
+	ENTRY;
+	/* The fallocate mode travels to the OST inside the obdo */
+	oa->o_falloc_mode = mode;
+	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+				   &RQF_OST_FALLOCATE);
+	if (req == NULL)
+		RETURN(-ENOMEM);
+
+	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
+	if (rc != 0) {
+		/* request was never sent; free it rather than putting it */
+		ptlrpc_request_free(req);
+		RETURN(rc);
+	}
+
+	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+	LASSERT(body);
+
+	lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
+
+	ptlrpc_request_set_replen(req);
+
+	/* Reply handling is shared with setattr: osc_setattr_interpret()
+	 * unpacks the ost_body and then invokes sa_upcall(sa_cookie, rc).
+	 */
+	req->rq_interpret_reply = osc_setattr_interpret;
+	BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
+	sa = ptlrpc_req_async_args(sa, req);
+	sa->sa_oa = oa;
+	sa->sa_upcall = upcall;
+	sa->sa_cookie = cookie;
+
+	/* Queue to ptlrpcd: the RPC completes asynchronously, so success
+	 * here only means the request was handed off, not that the OST
+	 * performed the allocation.
+	 */
+	ptlrpcd_add_req(req);
+
+	RETURN(0);
+}
+
static int osc_sync_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *args, int rc)
{
+ const char *obd_name = req->rq_import->imp_obd->obd_name;
struct osc_fsync_args *fa = args;
struct ost_body *body;
struct cl_attr *attr = &osc_env_info(env)->oti_attr;
unsigned long valid = 0;
struct cl_object *obj;
- ENTRY;
+ ENTRY;
if (rc != 0)
GOTO(out, rc);
body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
if (body == NULL) {
- CERROR("can't unpack ost_body\n");
- GOTO(out, rc = -EPROTO);
+ rc = -EPROTO;
+ CERROR("%s: Failed to unpack ost_body: rc = %d\n", obd_name,
+ rc);
+ GOTO(out, rc);
}
*fa->fa_oa = body->oa;
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
obd_enqueue_update_f upcall, void *cookie,
- struct ptlrpc_request_set *rqset)
+ struct ptlrpc_request_set *rqset)
{
- struct obd_export *exp = osc_export(obj);
+ struct obd_export *exp = osc_export(obj);
struct ptlrpc_request *req;
- struct ost_body *body;
+ struct ost_body *body;
struct osc_fsync_args *fa;
- int rc;
- ENTRY;
+ int rc;
- req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
- if (req == NULL)
- RETURN(-ENOMEM);
+ ENTRY;
+ req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
+ if (req == NULL)
+ RETURN(-ENOMEM);
- rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
- if (rc) {
- ptlrpc_request_free(req);
- RETURN(rc);
- }
+ rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
+ if (rc) {
+ ptlrpc_request_free(req);
+ RETURN(rc);
+ }
/* overload the size and blocks fields in the oa with start/end */
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
fa->fa_upcall = upcall;
fa->fa_cookie = cookie;
- if (rqset == PTLRPCD_SET)
- ptlrpcd_add_req(req);
- else
- ptlrpc_set_add_req(rqset, req);
+ ptlrpc_set_add_req(rqset, req);
- RETURN (0);
+ RETURN(0);
}
/* Find and cancel locally locks matched by @mode in the resource found by
* @objid. Found locks are added into @cancel list. Returns the amount of
- * locks added to @cancels list. */
+ * locks added to @cancels list.
+ */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
struct list_head *cancels,
enum ldlm_mode mode, __u64 lock_flags)
struct ldlm_res_id res_id;
struct ldlm_resource *res;
int count;
- ENTRY;
+ ENTRY;
/* Return, i.e. cancel nothing, only if ELC is supported (flag in
* export) but disabled through procfs (flag in NS).
*
* This distinguishes from a case when ELC is not supported originally,
* when we still want to cancel locks in advance and just cancel them
- * locally, without sending any RPC. */
+ * locally, without sending any RPC.
+ */
if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
RETURN(0);
ostid_build_res_name(&oa->o_oi, &res_id);
- res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
+ res = ldlm_resource_get(ns, &res_id, 0, 0);
if (IS_ERR(res))
RETURN(0);
- LDLM_RESOURCE_ADDREF(res);
- count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
- lock_flags, 0, NULL);
- LDLM_RESOURCE_DELREF(res);
- ldlm_resource_putref(res);
- RETURN(count);
+ count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
+ lock_flags, 0, NULL);
+ ldlm_resource_putref(res);
+ RETURN(count);
}
static int osc_destroy_interpret(const struct lu_env *env,
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
struct obdo *oa)
{
- struct client_obd *cli = &exp->exp_obd->u.cli;
- struct ptlrpc_request *req;
- struct ost_body *body;
- struct list_head cancels = LIST_HEAD_INIT(cancels);
- int rc, count;
- ENTRY;
-
- if (!oa) {
- CDEBUG(D_INFO, "oa NULL\n");
- RETURN(-EINVAL);
- }
-
- count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
- LDLM_FL_DISCARD_DATA);
-
- req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
- if (req == NULL) {
- ldlm_lock_list_put(&cancels, l_bl_ast, count);
- RETURN(-ENOMEM);
- }
-
- rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
- 0, &cancels, count);
- if (rc) {
- ptlrpc_request_free(req);
- RETURN(rc);
- }
-
- req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
- ptlrpc_at_set_req_timeout(req);
+ struct client_obd *cli = &exp->exp_obd->u.cli;
+ struct ptlrpc_request *req;
+ struct ost_body *body;
+ LIST_HEAD(cancels);
+ int rc, count;
+
+ ENTRY;
+ if (!oa) {
+ CDEBUG(D_INFO, "oa NULL\n");
+ RETURN(-EINVAL);
+ }
+
+ count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
+ LDLM_FL_DISCARD_DATA);
+
+ req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
+ if (req == NULL) {
+ ldlm_lock_list_put(&cancels, l_bl_ast, count);
+ RETURN(-ENOMEM);
+ }
+
+ rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
+ 0, &cancels, count);
+ if (rc) {
+ ptlrpc_request_free(req);
+ RETURN(rc);
+ }
+
+ req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
+ ptlrpc_at_set_req_timeout(req);
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
LASSERT(body);
lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
- ptlrpc_request_set_replen(req);
+ ptlrpc_request_set_replen(req);
req->rq_interpret_reply = osc_destroy_interpret;
if (!osc_can_send_destroy(cli)) {
- struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
-
/*
* Wait until the number of on-going destroy RPCs drops
* under max_rpc_in_flight
*/
- rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
- osc_can_send_destroy(cli), &lwi);
+ rc = l_wait_event_abortable_exclusive(
+ cli->cl_destroy_waitq,
+ osc_can_send_destroy(cli));
if (rc) {
- ptlrpc_req_finished(req);
- RETURN(rc);
+ ptlrpc_req_put(req);
+ RETURN(-EINTR);
}
}
}
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
- long writing_bytes)
+ long writing_bytes)
{
u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
oa->o_valid |= bits;
spin_lock(&cli->cl_loi_list_lock);
- if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
+ if (cli->cl_ocd_grant_param)
oa->o_dirty = cli->cl_dirty_grant;
else
oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
- if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
- cli->cl_dirty_max_pages)) {
- CERROR("dirty %lu - %lu > dirty_max %lu\n",
- cli->cl_dirty_pages, cli->cl_dirty_transit,
+ if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
+ CERROR("%s: dirty %lu > dirty_max %lu\n", cli_name(cli),
+ cli->cl_dirty_pages,
cli->cl_dirty_max_pages);
oa->o_undirty = 0;
- } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
- atomic_long_read(&obd_dirty_transit_pages) >
+ } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
(long)(obd_max_dirty_pages + 1))) {
/* The atomic_read() allowing the atomic_inc() are
* not covered by a lock thus they may safely race and trip
- * this CERROR() unless we add in a small fudge factor (+1). */
- CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
+ * this CERROR() unless we add in a small fudge factor (+1)
+ */
+ CERROR("%s: dirty %ld > system dirty_max %ld\n",
cli_name(cli), atomic_long_read(&obd_dirty_pages),
- atomic_long_read(&obd_dirty_transit_pages),
obd_max_dirty_pages);
oa->o_undirty = 0;
} else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
0x7fffffff)) {
- CERROR("dirty %lu - dirty_max %lu too big???\n",
- cli->cl_dirty_pages, cli->cl_dirty_max_pages);
+ CERROR("%s: dirty %lu - dirty_max %lu too big???\n",
+ cli_name(cli), cli->cl_dirty_pages,
+ cli->cl_dirty_max_pages);
oa->o_undirty = 0;
} else {
unsigned long nrpages;
nrpages *= cli->cl_max_rpcs_in_flight + 1;
nrpages = max(nrpages, cli->cl_dirty_max_pages);
undirty = nrpages << PAGE_SHIFT;
- if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
- GRANT_PARAM)) {
+ if (cli->cl_ocd_grant_param) {
int nrextents;
/* take extent tax into account when asking for more
- * grant space */
- nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
+ * grant space
+ */
+ nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
cli->cl_max_extent_pages;
undirty += nrextents * cli->cl_grant_extent_tax;
}
*/
oa->o_undirty = min(undirty, OBD_MAX_GRANT &
~(PTLRPC_MAX_BRW_SIZE * 4UL));
- }
+ }
oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
- oa->o_dropped = cli->cl_lost_grant;
- cli->cl_lost_grant = 0;
+ /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
+ if (cli->cl_lost_grant > INT_MAX) {
+ CDEBUG(D_CACHE,
+ "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
+ cli_name(cli), cli->cl_lost_grant);
+ oa->o_dropped = INT_MAX;
+ } else {
+ oa->o_dropped = cli->cl_lost_grant;
+ }
+ cli->cl_lost_grant -= oa->o_dropped;
spin_unlock(&cli->cl_loi_list_lock);
- CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
- oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
+ CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu cl_lost_grant %lu\n",
+ cli_name(cli), oa->o_dirty, oa->o_undirty, oa->o_dropped,
+ oa->o_grant, cli->cl_lost_grant);
}
void osc_update_next_shrink(struct client_obd *cli)
CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
cli->cl_next_shrink_grant);
}
+EXPORT_SYMBOL(osc_update_next_shrink);
static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
- if (body->oa.o_valid & OBD_MD_FLGRANT) {
+ if (body->oa.o_valid & OBD_MD_FLGRANT) {
CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
- __osc_update_grant(cli, body->oa.o_grant);
- }
+ __osc_update_grant(cli, body->oa.o_grant);
+ }
}
/**
oa->o_grant = cli->cl_avail_grant / 4;
cli->cl_avail_grant -= oa->o_grant;
spin_unlock(&cli->cl_loi_list_lock);
- if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
- oa->o_valid |= OBD_MD_FLFLAGS;
- oa->o_flags = 0;
- }
- oa->o_flags |= OBD_FL_SHRINK_GRANT;
- osc_update_next_shrink(cli);
+ if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
+ oa->o_valid |= OBD_MD_FLFLAGS;
+ oa->o_flags = 0;
+ }
+ oa->o_flags |= OBD_FL_SHRINK_GRANT;
+ osc_update_next_shrink(cli);
}
/* Shrink the current grant, either from some large amount to enough for a
* full set of in-flight RPCs, or if we have already shrunk to that limit
* then to enough for a single RPC. This avoids keeping more grant than
- * needed, and avoids shrinking the grant piecemeal. */
+ * needed, and avoids shrinking the grant piecemeal.
+ */
static int osc_shrink_grant(struct client_obd *cli)
{
__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
- int rc = 0;
- struct ost_body *body;
- ENTRY;
+ int rc = 0;
+ struct ost_body *body;
+ ENTRY;
spin_lock(&cli->cl_loi_list_lock);
/* Don't shrink if we are already above or below the desired limit
* We don't want to shrink below a single RPC, as that will negatively
- * impact block allocation and long-term performance. */
+ * impact block allocation and long-term performance.
+ */
if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
body->oa.o_grant = cli->cl_avail_grant - target_bytes;
cli->cl_avail_grant = target_bytes;
spin_unlock(&cli->cl_loi_list_lock);
- if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
- body->oa.o_valid |= OBD_MD_FLFLAGS;
- body->oa.o_flags = 0;
- }
- body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
- osc_update_next_shrink(cli);
-
- rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
- sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
- sizeof(*body), body, NULL);
- if (rc != 0)
- __osc_update_grant(cli, body->oa.o_grant);
+ if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
+ body->oa.o_valid |= OBD_MD_FLFLAGS;
+ body->oa.o_flags = 0;
+ }
+ body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
+ osc_update_next_shrink(cli);
+
+ rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
+ sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
+ sizeof(*body), body, NULL);
+ if (rc != 0)
+ __osc_update_grant(cli, body->oa.o_grant);
out_free:
- OBD_FREE_PTR(body);
- RETURN(rc);
+ OBD_FREE_PTR(body);
+ RETURN(rc);
}
static int osc_should_shrink_grant(struct client_obd *client)
if (client->cl_import == NULL)
return 0;
- if ((client->cl_import->imp_connect_data.ocd_connect_flags &
- OBD_CONNECT_GRANT_SHRINK) == 0)
- return 0;
+ if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
+ client->cl_import->imp_grant_shrink_disabled) {
+ osc_update_next_shrink(client);
+ return 0;
+ }
if (ktime_get_seconds() >= next_shrink - 5) {
/* Get the current RPC size directly, instead of going via:
* cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
- * Keep comment here so that it can be found by searching. */
+ * Keep comment here so that it can be found by searching.
+ */
int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
client->cl_avail_grant > brw_size)
return 1;
- else
- osc_update_next_shrink(client);
+ osc_update_next_shrink(client);
}
- return 0;
+ return 0;
}
#define GRANT_SHRINK_RPC_BATCH 100
cancel_delayed_work_sync(&work);
schedule_work(&work.work);
}
+EXPORT_SYMBOL(osc_schedule_grant_work);
/**
* Start grant thread for returing grant to server for idle clients.
spin_lock(&cli->cl_loi_list_lock);
cli->cl_avail_grant = ocd->ocd_grant;
if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
- cli->cl_avail_grant -= cli->cl_reserved_grant;
+ unsigned long consumed = cli->cl_reserved_grant;
+
if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
- cli->cl_avail_grant -= cli->cl_dirty_grant;
+ consumed += cli->cl_dirty_grant;
else
- cli->cl_avail_grant -=
- cli->cl_dirty_pages << PAGE_SHIFT;
+ consumed += cli->cl_dirty_pages << PAGE_SHIFT;
+ if (cli->cl_avail_grant < consumed) {
+ CERROR("%s: granted %ld but already consumed %ld\n",
+ cli_name(cli), cli->cl_avail_grant, consumed);
+ cli->cl_avail_grant = 0;
+ } else {
+ cli->cl_avail_grant -= consumed;
+ }
}
if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
~chunk_mask) & chunk_mask;
/* determine maximum extent size, in #pages */
size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
- cli->cl_max_extent_pages = size >> PAGE_SHIFT;
- if (cli->cl_max_extent_pages == 0)
- cli->cl_max_extent_pages = 1;
+ cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
+ cli->cl_ocd_grant_param = 1;
} else {
+ cli->cl_ocd_grant_param = 0;
cli->cl_grant_extent_tax = 0;
cli->cl_chunkbits = PAGE_SHIFT;
cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
}
spin_unlock(&cli->cl_loi_list_lock);
- CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
- "chunk bits: %d cl_max_extent_pages: %d\n",
- cli_name(cli),
- cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
- cli->cl_max_extent_pages);
+ CDEBUG(D_CACHE,
+ "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
+ cli_name(cli),
+ cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
+ cli->cl_max_extent_pages);
if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
osc_add_grant_list(cli);
/* We assume that the reason this OSC got a short read is because it read
* beyond the end of a stripe file; i.e. lustre is reading a sparse file
* via the LOV, and it _knows_ it's reading inside the file, it's just that
- * this stripe never got written at or beyond this stripe offset yet. */
+ * this stripe never got written at or beyond this stripe offset yet.
+ */
static void handle_short_read(int nob_read, size_t page_count,
- struct brw_page **pga)
+ struct brw_page **pga)
{
- char *ptr;
- int i = 0;
+ char *ptr;
+ int i = 0;
- /* skip bytes read OK */
- while (nob_read > 0) {
- LASSERT (page_count > 0);
+ /* skip bytes read OK */
+ while (nob_read > 0) {
+ LASSERT(page_count > 0);
- if (pga[i]->count > nob_read) {
+ if (pga[i]->bp_count > nob_read) {
/* EOF inside this page */
- ptr = kmap(pga[i]->pg) +
- (pga[i]->off & ~PAGE_MASK);
- memset(ptr + nob_read, 0, pga[i]->count - nob_read);
- kunmap(pga[i]->pg);
+ ptr = kmap(pga[i]->bp_page) +
+ (pga[i]->bp_off & ~PAGE_MASK);
+ memset(ptr + nob_read, 0, pga[i]->bp_count - nob_read);
+ kunmap(pga[i]->bp_page);
page_count--;
i++;
break;
}
- nob_read -= pga[i]->count;
- page_count--;
- i++;
- }
+ nob_read -= pga[i]->bp_count;
+ page_count--;
+ i++;
+ }
/* zero remaining pages */
while (page_count-- > 0) {
- ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
- memset(ptr, 0, pga[i]->count);
- kunmap(pga[i]->pg);
+ ptr = kmap(pga[i]->bp_page) + (pga[i]->bp_off & ~PAGE_MASK);
+ memset(ptr, 0, pga[i]->bp_count);
+ kunmap(pga[i]->bp_page);
i++;
}
}
int requested_nob, int niocount,
size_t page_count, struct brw_page **pga)
{
- int i;
- __u32 *remote_rcs;
-
- remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
- sizeof(*remote_rcs) *
- niocount);
- if (remote_rcs == NULL) {
- CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
- return(-EPROTO);
- }
+ const char *obd_name = req->rq_import->imp_obd->obd_name;
+ __u32 *remote_rcs;
+ int i;
+
+ remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
+ sizeof(*remote_rcs) *
+ niocount);
+ if (remote_rcs == NULL) {
+ CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
+ return(-EPROTO);
+ }
- /* return error if any niobuf was in error */
- for (i = 0; i < niocount; i++) {
+ /* return error if any niobuf was in error */
+ for (i = 0; i < niocount; i++) {
if ((int)remote_rcs[i] < 0) {
CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
i, remote_rcs[i], req);
return remote_rcs[i];
}
- if (remote_rcs[i] != 0) {
- CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
- i, remote_rcs[i], req);
- return(-EPROTO);
- }
- }
+ if (remote_rcs[i] != 0) {
+ CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
+ i, remote_rcs[i], req);
+ return(-EPROTO);
+ }
+ }
if (req->rq_bulk != NULL &&
req->rq_bulk->bd_nob_transferred != requested_nob) {
- CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
- req->rq_bulk->bd_nob_transferred, requested_nob);
- return(-EPROTO);
- }
+ CERROR("%s: Unexpected # bytes transferred: %d (requested %d)\n",
+ obd_name, req->rq_bulk->bd_nob_transferred,
+ requested_nob);
+ return(-EPROTO);
+ }
- return (0);
+ return (0);
}
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
- if (p1->flag != p2->flag) {
- unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
- OBD_BRW_SYNC | OBD_BRW_ASYNC |
- OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
-
- /* warn if we try to combine flags that we don't know to be
- * safe to combine */
- if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
- CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
- "report this at https://jira.whamcloud.com/\n",
- p1->flag, p2->flag);
- }
- return 0;
- }
+ if (p1->bp_flag != p2->bp_flag) {
+ unsigned int mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
+ OBD_BRW_SYNC | OBD_BRW_ASYNC |
+ OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC |
+ OBD_BRW_SYS_RESOURCE);
+
+ /* warn if combine flags that we don't know to be safe */
+ if (unlikely((p1->bp_flag & mask) != (p2->bp_flag & mask))) {
+ CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at https://jira.whamcloud.com/\n",
+ p1->bp_flag, p2->bp_flag);
+ }
+ return 0;
+ }
- return (p1->off + p1->count == p2->off);
+ return (p1->bp_off + p1->bp_count == p2->bp_off);
}
#if IS_ENABLED(CONFIG_CRC_T10DIF)
size_t pg_count, struct brw_page **pga,
int opc, obd_dif_csum_fn *fn,
int sector_size,
- u32 *check_sum)
+ u32 *check_sum, bool resend)
{
struct ahash_request *req;
/* Used Adler as the default checksum type on top of DIF tags */
unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
struct page *__page;
unsigned char *buffer;
- __u16 *guard_start;
- unsigned int bufsize;
+ __be16 *guard_start;
int guard_number;
int used_number = 0;
int used;
u32 cksum;
- int rc = 0;
+ unsigned int bufsize = sizeof(cksum);
+ int rc = 0, rc2;
int i = 0;
LASSERT(pg_count > 0);
}
buffer = kmap(__page);
- guard_start = (__u16 *)buffer;
+ guard_start = (__be16 *)buffer;
guard_number = PAGE_SIZE / sizeof(*guard_start);
+ CDEBUG(D_PAGE | (resend ? D_HA : 0),
+ "GRD tags per page=%u, resend=%u, bytes=%u, pages=%zu\n",
+ guard_number, resend, nob, pg_count);
+
while (nob > 0 && pg_count > 0) {
- unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
+ int off = pga[i]->bp_off & ~PAGE_MASK;
+ unsigned int count =
+ pga[i]->bp_count > nob ? nob : pga[i]->bp_count;
+ int guards_needed = DIV_ROUND_UP(off + count, sector_size) -
+ (off / sector_size);
+
+ if (guards_needed > guard_number - used_number) {
+ cfs_crypto_hash_update_page(req, __page, 0,
+ used_number * sizeof(*guard_start));
+ used_number = 0;
+ }
/* corrupt the data before we compute the checksum, to
- * simulate an OST->client data error */
+ * simulate an OST->client data error
+ */
if (unlikely(i == 0 && opc == OST_READ &&
- OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
- unsigned char *ptr = kmap(pga[i]->pg);
- int off = pga[i]->off & ~PAGE_MASK;
+ CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
+ unsigned char *ptr = kmap(pga[i]->bp_page);
memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
- kunmap(pga[i]->pg);
+ kunmap(pga[i]->bp_page);
}
/*
* The left guard number should be able to hold checksums of a
* whole page
*/
- rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
- pga[i]->off & ~PAGE_MASK,
+ rc = obd_page_dif_generate_buffer(obd_name, pga[i]->bp_page,
+ pga[i]->bp_off & ~PAGE_MASK,
count,
guard_start + used_number,
guard_number - used_number,
&used, sector_size,
fn);
+ if (unlikely(resend))
+ CDEBUG(D_PAGE | D_HA,
+ "pga[%u]: used %u off %llu+%u gen checksum: %*phN\n",
+ i, used, pga[i]->bp_off & ~PAGE_MASK, count,
+ (int)(used * sizeof(*guard_start)),
+ guard_start + used_number);
if (rc)
break;
used_number += used;
- if (used_number == guard_number) {
- cfs_crypto_hash_update_page(req, __page, 0,
- used_number * sizeof(*guard_start));
- used_number = 0;
- }
-
- nob -= pga[i]->count;
+ nob -= pga[i]->bp_count;
pg_count--;
i++;
}
kunmap(__page);
if (rc)
- GOTO(out, rc);
+ GOTO(out_hash, rc);
if (used_number != 0)
cfs_crypto_hash_update_page(req, __page, 0,
used_number * sizeof(*guard_start));
- bufsize = sizeof(cksum);
- cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
-
- /* For sending we only compute the wrong checksum instead
- * of corrupting the data so it is still correct on a redo */
- if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
- cksum++;
+out_hash:
+ rc2 = cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
+ if (!rc)
+ rc = rc2;
+ if (rc == 0) {
+ /* For sending we only compute the wrong checksum instead
+ * of corrupting the data so it is still correct on a redo
+ */
+ if (opc == OST_WRITE &&
+ CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
+ cksum++;
- *check_sum = cksum;
+ *check_sum = cksum;
+ }
out:
__free_page(__page);
return rc;
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
-#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum) \
+#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum, re) \
-EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */
enum cksum_types cksum_type,
u32 *cksum)
{
- int i = 0;
- struct ahash_request *req;
- unsigned int bufsize;
- unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
+ int i = 0;
+ struct ahash_request *req;
+ unsigned int bufsize;
+ unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
LASSERT(pg_count > 0);
}
while (nob > 0 && pg_count > 0) {
- unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
+ unsigned int count =
+ pga[i]->bp_count > nob ? nob : pga[i]->bp_count;
/* corrupt the data before we compute the checksum, to
- * simulate an OST->client data error */
+ * simulate an OST->client data error
+ */
if (i == 0 && opc == OST_READ &&
- OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
- unsigned char *ptr = kmap(pga[i]->pg);
- int off = pga[i]->off & ~PAGE_MASK;
+ CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
+ unsigned char *ptr = kmap(pga[i]->bp_page);
+ int off = pga[i]->bp_off & ~PAGE_MASK;
memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
- kunmap(pga[i]->pg);
+ kunmap(pga[i]->bp_page);
}
- cfs_crypto_hash_update_page(req, pga[i]->pg,
- pga[i]->off & ~PAGE_MASK,
+ cfs_crypto_hash_update_page(req, pga[i]->bp_page,
+ pga[i]->bp_off & ~PAGE_MASK,
count);
- LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
- (int)(pga[i]->off & ~PAGE_MASK));
+ LL_CDEBUG_PAGE(D_PAGE, pga[i]->bp_page, "off %d\n",
+ (int)(pga[i]->bp_off & ~PAGE_MASK));
- nob -= pga[i]->count;
+ nob -= pga[i]->bp_count;
pg_count--;
i++;
}
cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
/* For sending we only compute the wrong checksum instead
- * of corrupting the data so it is still correct on a redo */
- if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
+ * of corrupting the data so it is still correct on a redo
+ */
+ if (opc == OST_WRITE && CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
(*cksum)++;
return 0;
enum cksum_types cksum_type,
int nob, size_t pg_count,
struct brw_page **pga, int opc,
- u32 *check_sum)
+ u32 *check_sum, bool resend)
{
obd_dif_csum_fn *fn = NULL;
int sector_size = 0;
if (fn)
rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
- opc, fn, sector_size, check_sum);
+ opc, fn, sector_size, check_sum,
+ resend);
else
rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
check_sum);
RETURN(rc);
}
+#ifdef CONFIG_LL_ENCRYPTION
+/**
+ * osc_encrypt_pagecache_blocks() - overlay to llcrypt_encrypt_pagecache_blocks
+ * @srcpage: The locked pagecache page containing the block(s) to encrypt
+ * @dstpage: The page to put the encryption result in
+ * @len: Total size of the block(s) to encrypt. Must be a nonzero
+ * multiple of the filesystem's block size.
+ * @offs: Byte offset within @srcpage of the first block to encrypt.
+ * Must be a multiple of the filesystem's block size.
+ * @gfp_flags: Memory allocation flags
+ *
+ * This overlay function is necessary to be able to provide our own bounce page.
+ */
+static struct page *osc_encrypt_pagecache_blocks(struct page *srcpage,
+ struct page *dstpage,
+ unsigned int len,
+ unsigned int offs,
+ gfp_t gfp_flags)
+
+{
+ const struct inode *inode = srcpage->mapping->host;
+ const unsigned int blockbits = inode->i_blkbits;
+ const unsigned int blocksize = 1 << blockbits;
+ /* logical block number of the first block to encrypt, derived from
+ * the page index and the byte offset within the page
+ */
+ u64 lblk_num = ((u64)srcpage->index << (PAGE_SHIFT - blockbits)) +
+ (offs >> blockbits);
+ unsigned int i;
+ int err;
+
+ /* no caller-provided bounce page: fall back to llcrypt, which
+ * allocates the cipher page itself
+ */
+ if (unlikely(!dstpage))
+ return llcrypt_encrypt_pagecache_blocks(srcpage, len, offs,
+ gfp_flags);
+
+ if (WARN_ON_ONCE(!PageLocked(srcpage)))
+ return ERR_PTR(-EINVAL);
+
+ /* len is unsigned, so "<= 0" only rejects len == 0; presumably kept
+ * in this form to mirror llcrypt/fscrypt upstream - do not "fix"
+ */
+ if (WARN_ON_ONCE(len <= 0 || !IS_ALIGNED(len | offs, blocksize)))
+ return ERR_PTR(-EINVAL);
+
+ /* Set PagePrivate2 for disambiguation in
+ * osc_finalize_bounce_page().
+ * It means cipher page was not allocated by llcrypt.
+ */
+ SetPagePrivate2(dstpage);
+
+ /* encrypt each filesystem block of the range in place into dstpage */
+ for (i = offs; i < offs + len; i += blocksize, lblk_num++) {
+ err = llcrypt_encrypt_block(inode, srcpage, dstpage, blocksize,
+ i, lblk_num, gfp_flags);
+ if (err)
+ /* NOTE(review): PagePrivate2 stays set on dstpage on
+ * this error path; the caller returns the page to the
+ * pool - confirm the flag cannot leak to a reuser
+ */
+ return ERR_PTR(err);
+ }
+ SetPagePrivate(dstpage);
+ /* stash the clear-text page so it can be recovered at completion */
+ set_page_private(dstpage, (unsigned long)srcpage);
+ return dstpage;
+}
+
+/**
+ * osc_finalize_bounce_page() - overlay to llcrypt_finalize_bounce_page
+ *
+ * This overlay function is necessary to handle bounce pages
+ * allocated by ourselves.
+ */
+static inline void osc_finalize_bounce_page(struct page **pagep)
+{
+ struct page *page = *pagep;
+
+ /* NOTE(review): page is dereferenced here before the !page check
+ * below; callers only invoke this for pages with PageChecked set,
+ * so page is presumably never NULL - confirm, or move this after
+ * the check
+ */
+ ClearPageChecked(page);
+ /* PagePrivate2 was set in osc_encrypt_pagecache_blocks
+ * to indicate the cipher page was allocated by ourselves.
+ * So we must not free it via llcrypt.
+ */
+ if (unlikely(!page || !PagePrivate2(page)))
+ return llcrypt_finalize_bounce_page(pagep);
+
+ /* self-allocated bounce page: restore *pagep to the clear-text
+ * pagecache page and strip our private state, but do not free it
+ */
+ if (llcrypt_is_bounce_page(page)) {
+ *pagep = llcrypt_pagecache_page(page);
+ ClearPagePrivate2(page);
+ set_page_private(page, (unsigned long)NULL);
+ ClearPagePrivate(page);
+ }
+}
+#else /* !CONFIG_LL_ENCRYPTION */
+#define osc_encrypt_pagecache_blocks(srcpage, dstpage, len, offs, gfp_flags) \
+ llcrypt_encrypt_pagecache_blocks(srcpage, len, offs, gfp_flags)
+#define osc_finalize_bounce_page(page) llcrypt_finalize_bounce_page(page)
+#endif
+
+static inline void osc_release_bounce_pages(struct brw_page **pga,
+ u32 page_count)
+{
+#ifdef HAVE_LUSTRE_CRYPTO
+ struct page **pa = NULL;
+ int i, j = 0;
+
+ /* nothing to do for an empty page array (e.g. short-io request) */
+ if (!pga[0])
+ return;
+
+#ifdef CONFIG_LL_ENCRYPTION
+ /* first page being a bounce page implies the whole array holds
+ * bounce pages taken from the enc pool; collect them for batched
+ * return below
+ */
+ if (PageChecked(pga[0]->bp_page)) {
+ OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count);
+ /* NOTE(review): on allocation failure we return without
+ * restoring bp_count/bp_off or finalizing the bounce pages -
+ * presumably tolerated under memory pressure; confirm
+ */
+ if (!pa)
+ return;
+ }
+#endif
+
+ for (i = 0; i < page_count; i++) {
+ /* Bounce pages used by osc_encrypt_pagecache_blocks()
+ * called from osc_brw_prep_request()
+ * are identified thanks to the PageChecked flag.
+ */
+ if (PageChecked(pga[i]->bp_page)) {
+ if (pa)
+ pa[j++] = pga[i]->bp_page;
+ osc_finalize_bounce_page(&pga[i]->bp_page);
+ }
+ /* undo the cipher-size adjustments made at prep time so the
+ * brw_page describes the clear text again
+ */
+ pga[i]->bp_count -= pga[i]->bp_count_diff;
+ pga[i]->bp_off += pga[i]->bp_off_diff;
+ }
+
+ /* return the collected bounce pages to the enc pool in one call */
+ if (pa) {
+ obd_pool_put_pages_array(pa, j);
+ OBD_FREE_PTR_ARRAY_LARGE(pa, page_count);
+ }
+#endif
+}
+
+static inline bool is_interop_required(u64 foffset, u32 off0, u32 npgs,
+ struct brw_page **pga)
+{
+ struct brw_page *pg0 = pga[0];
+ struct brw_page *pgN = pga[npgs - 1];
+ /* total transfer size: first and last page may be partial, all
+ * middle pages are assumed full PAGE_SIZE.
+ * NOTE(review): the (npgs - 2) term requires npgs >= 2; the only
+ * caller (interop_pages) guarantees npgs >= 15
+ */
+ const u32 nob = ((npgs - 2) << PAGE_SHIFT) + pg0->bp_count +
+ pgN->bp_count;
+
+ /* interop handling is needed only when the bulk exceeds one LNet MTU
+ * and the min/max interop page sizes would split it differently
+ */
+ return ((nob + off0) >= LNET_MTU &&
+ cl_io_nob_aligned(foffset, nob, MD_MAX_INTEROP_PAGE_SIZE) !=
+ cl_io_nob_aligned(foffset, nob, MD_MIN_INTEROP_PAGE_SIZE));
+}
+
+static inline u32 interop_pages(u64 foffset, u32 npgs, struct brw_page **pga)
+{
+ u32 off0;
+
+ /* aligned start offset or a small transfer never needs the interop
+ * offset.
+ * NOTE(review): the 15-page threshold presumably ensures the bulk
+ * can reach LNET_MTU - confirm against is_interop_required()
+ */
+ if (foffset == 0 || npgs < 15)
+ return 0;
+
+ /* offset of the file position within a max-interop-size page */
+ off0 = (foffset & (MD_MAX_INTEROP_PAGE_SIZE - 1));
+ if (is_interop_required(foffset, off0, npgs, pga))
+ /* express the offset in min-interop-size page units */
+ return off0 >> MD_MIN_INTEROP_PAGE_SHIFT;
+
+ return 0;
+}
+
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
u32 page_count, struct brw_page **pga,
struct ptlrpc_request **reqp, int resend)
{
- struct ptlrpc_request *req;
- struct ptlrpc_bulk_desc *desc;
- struct ost_body *body;
- struct obd_ioobj *ioobj;
- struct niobuf_remote *niobuf;
+ struct ptlrpc_request *req;
+ struct ptlrpc_bulk_desc *desc;
+ struct ost_body *body;
+ struct obd_ioobj *ioobj;
+ struct niobuf_remote *niobuf;
int niocount, i, requested_nob, opc, rc, short_io_size = 0;
- struct osc_brw_async_args *aa;
- struct req_capsule *pill;
- struct brw_page *pg_prev;
+ struct osc_brw_async_args *aa;
+ struct req_capsule *pill;
+ struct brw_page *pg_prev;
void *short_io_buf;
const char *obd_name = cli->cl_import->imp_obd->obd_name;
+ struct inode *inode = NULL;
+ bool directio = false;
+ bool gpu = 0;
+ bool enable_checksum = true;
+ struct cl_page *clpage;
+ u64 foffset = 0;
- ENTRY;
- if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
- RETURN(-ENOMEM); /* Recoverable */
- if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
- RETURN(-EINVAL); /* Fatal */
+ ENTRY;
+ if (pga[0]->bp_page) {
+ clpage = oap2cl_page(brw_page2oap(pga[0]));
+ inode = clpage->cp_inode;
+ if (clpage->cp_type == CPT_TRANSIENT) {
+ directio = true;
+ /* When page size interop logic is not supported by the
+ * remote server use the old logic.
+ */
+ if (imp_connect_unaligned_dio(cli->cl_import))
+ foffset = pga[0]->bp_off;
+ }
+ }
+ if (CFS_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
+ RETURN(-ENOMEM); /* Recoverable */
+ if (CFS_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
+ RETURN(-EINVAL); /* Fatal */
if ((cmd & OBD_BRW_WRITE) != 0) {
opc = OST_WRITE;
opc = OST_READ;
req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
}
- if (req == NULL)
- RETURN(-ENOMEM);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode) &&
+ llcrypt_has_encryption_key(inode)) {
+ struct page **pa = NULL;
+
+#ifdef CONFIG_LL_ENCRYPTION
+ OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count);
+ if (pa == NULL) {
+ ptlrpc_request_free(req);
+ RETURN(-ENOMEM);
+ }
+
+ rc = obd_pool_get_pages_array(pa, page_count);
+ if (rc) {
+ CDEBUG(D_SEC, "failed to allocate from enc pool: %d\n",
+ rc);
+ ptlrpc_request_free(req);
+ RETURN(rc);
+ }
+#endif
+
+ for (i = 0; i < page_count; i++) {
+ struct brw_page *brwpg = pga[i];
+ struct page *data_page = NULL;
+ bool retried = false;
+ bool lockedbymyself;
+ u32 nunits =
+ (brwpg->bp_off & ~PAGE_MASK) + brwpg->bp_count;
+ struct address_space *map_orig = NULL;
+ pgoff_t index_orig;
+
+retry_encrypt:
+ nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
+ /* The page can already be locked when we arrive here.
+ * This is possible when cl_page_assume/vvp_page_assume
+ * is stuck on wait_on_page_writeback with page lock
+ * held. In this case there is no risk for the lock to
+ * be released while we are doing our encryption
+ * processing, because writeback against that page will
+ * end in vvp_page_completion_write/cl_page_completion,
+ * which means only once the page is fully processed.
+ */
+ lockedbymyself = trylock_page(brwpg->bp_page);
+ if (directio) {
+ map_orig = brwpg->bp_page->mapping;
+ brwpg->bp_page->mapping = inode->i_mapping;
+ index_orig = brwpg->bp_page->index;
+ clpage = oap2cl_page(brw_page2oap(brwpg));
+ brwpg->bp_page->index = clpage->cp_page_index;
+ }
+ data_page =
+ osc_encrypt_pagecache_blocks(brwpg->bp_page,
+ pa ? pa[i] : NULL,
+ nunits, 0,
+ GFP_NOFS);
+ if (directio) {
+ brwpg->bp_page->mapping = map_orig;
+ brwpg->bp_page->index = index_orig;
+ }
+ if (lockedbymyself)
+ unlock_page(brwpg->bp_page);
+ if (IS_ERR(data_page)) {
+ rc = PTR_ERR(data_page);
+ if (rc == -ENOMEM && !retried) {
+ retried = true;
+ rc = 0;
+ goto retry_encrypt;
+ }
+ if (pa) {
+ obd_pool_put_pages_array(pa + i,
+ page_count - i);
+ OBD_FREE_PTR_ARRAY_LARGE(pa,
+ page_count);
+ }
+ ptlrpc_request_free(req);
+ RETURN(rc);
+ }
+ /* Set PageChecked flag on bounce page for
+ * disambiguation in osc_release_bounce_pages().
+ */
+ SetPageChecked(data_page);
+ brwpg->bp_page = data_page;
+ /* there should be no gap in the middle of page array */
+ if (i == page_count - 1) {
+ struct osc_async_page *oap =
+ brw_page2oap(brwpg);
+
+ oa->o_size = oap->oap_count +
+ oap->oap_obj_off + oap->oap_page_off;
+ }
+ /* len is forced to nunits, and relative offset to 0
+ * so store the old, clear text info
+ */
+ brwpg->bp_count_diff = nunits - brwpg->bp_count;
+ brwpg->bp_count = nunits;
+ brwpg->bp_off_diff = brwpg->bp_off & ~PAGE_MASK;
+ brwpg->bp_off = brwpg->bp_off & PAGE_MASK;
+ }
+
+ if (pa)
+ OBD_FREE_PTR_ARRAY_LARGE(pa, page_count);
+ } else if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
+ struct osc_async_page *oap = brw_page2oap(pga[0]);
+ struct cl_page *clpage = oap2cl_page(oap);
+ struct cl_object *clobj = clpage->cp_obj;
+ struct cl_attr attr = { 0 };
+ struct lu_env *env;
+ __u16 refcheck;
+
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env)) {
+ rc = PTR_ERR(env);
+ ptlrpc_request_free(req);
+ RETURN(rc);
+ }
+
+ cl_object_attr_lock(clobj);
+ rc = cl_object_attr_get(env, clobj, &attr);
+ cl_object_attr_unlock(clobj);
+ cl_env_put(env, &refcheck);
+ if (rc != 0) {
+ ptlrpc_request_free(req);
+ RETURN(rc);
+ }
+ if (attr.cat_size)
+ oa->o_size = attr.cat_size;
+ } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode) &&
+ llcrypt_has_encryption_key(inode)) {
+ for (i = 0; i < page_count; i++) {
+ struct brw_page *pg = pga[i];
+ u32 nunits = (pg->bp_off & ~PAGE_MASK) + pg->bp_count;
+
+ nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
+ /* count/off are forced to cover the whole encryption
+ * unit size so that all encrypted data is stored on the
+ * OST, so adjust bp_{count,off}_diff for the size of
+ * the clear text.
+ */
+ pg->bp_count_diff = nunits - pg->bp_count;
+ pg->bp_count = nunits;
+ pg->bp_off_diff = pg->bp_off & ~PAGE_MASK;
+ pg->bp_off = pg->bp_off & PAGE_MASK;
+ }
+ }
+
+ for (niocount = i = 1; i < page_count; i++) {
+ if (!can_merge_pages(pga[i - 1], pga[i]))
+ niocount++;
+ }
- for (niocount = i = 1; i < page_count; i++) {
- if (!can_merge_pages(pga[i - 1], pga[i]))
- niocount++;
- }
+ pill = &req->rq_pill;
+ req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT, sizeof(*ioobj));
+ req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
+ niocount * sizeof(*niobuf));
- pill = &req->rq_pill;
- req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
- sizeof(*ioobj));
- req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
- niocount * sizeof(*niobuf));
+ for (i = 0; i < page_count; i++) {
+ short_io_size += pga[i]->bp_count;
+ if (!inode || !IS_ENCRYPTED(inode) ||
+ !llcrypt_has_encryption_key(inode)) {
+ pga[i]->bp_count_diff = 0;
+ pga[i]->bp_off_diff = 0;
+ }
+ }
- for (i = 0; i < page_count; i++)
- short_io_size += pga[i]->count;
+ if (brw_page2oap(pga[0])->oap_brw_flags & OBD_BRW_RDMA_ONLY) {
+ enable_checksum = false;
+ short_io_size = 0;
+ gpu = 1;
+ }
/* Check if read/write is small enough to be a short io. */
if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
!imp_connect_shortio(cli->cl_import))
short_io_size = 0;
+ /* If this is an empty RPC to old server, just ignore it */
+ if (!short_io_size && !pga[0]->bp_page) {
+ ptlrpc_request_free(req);
+ RETURN(-ENODATA);
+ }
+
req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
opc == OST_READ ? 0 : short_io_size);
if (opc == OST_READ)
req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
short_io_size);
- rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
- if (rc) {
- ptlrpc_request_free(req);
- RETURN(rc);
- }
+ rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
+ if (rc) {
+ ptlrpc_request_free(req);
+ RETURN(rc);
+ }
osc_set_io_portal(req);
ptlrpc_at_set_req_timeout(req);
/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
- * retry logic */
+ * retry logic
+ */
req->rq_no_retry_einprogress = 1;
if (short_io_size != 0) {
desc = ptlrpc_prep_bulk_imp(req, page_count,
cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
(opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
- PTLRPC_BULK_PUT_SINK) |
- PTLRPC_BULK_BUF_KIOV,
+ PTLRPC_BULK_PUT_SINK),
OST_BULK_PORTAL,
&ptlrpc_bulk_kiov_pin_ops);
- if (desc == NULL)
- GOTO(out, rc = -ENOMEM);
- /* NB request now owns desc and will free it when it gets freed */
+ if (desc == NULL)
+ GOTO(out, rc = -ENOMEM);
+ /* NB request now owns desc and will free it when it gets freed */
+ desc->bd_is_rdma = gpu;
+ if (directio && foffset)
+ desc->bd_md_offset = interop_pages(foffset, page_count, pga);
+
no_bulk:
- body = req_capsule_client_get(pill, &RMF_OST_BODY);
- ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
- niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
- LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
+ body = req_capsule_client_get(pill, &RMF_OST_BODY);
+ ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
+ niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
+ LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
* oa contains valid o_uid and o_gid in these two operations.
* Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
* OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
- * other process logic */
+ * other process logic
+ */
body->oa.o_uid = oa->o_uid;
body->oa.o_gid = oa->o_gid;
- obdo_to_ioobj(oa, ioobj);
- ioobj->ioo_bufcnt = niocount;
- /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
- * that might be send for this request. The actual number is decided
- * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
- * "max - 1" for old client compatibility sending "0", and also so the
- * the actual maximum is a power-of-two number, not one less. LU-1431 */
- if (desc != NULL)
- ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
- else /* short io */
- ioobj_max_brw_set(ioobj, 0);
+ if (inode && IS_ENCRYPTED(inode) &&
+ llcrypt_has_encryption_key(inode) &&
+ !CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NO_ENCFLAG)) {
+ if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+ body->oa.o_valid |= OBD_MD_FLFLAGS;
+ body->oa.o_flags = 0;
+ }
+ body->oa.o_flags |= LUSTRE_ENCRYPT_FL;
+ }
if (short_io_size != 0) {
if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
LASSERT(page_count > 0);
pg_prev = pga[0];
- for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
- struct brw_page *pg = pga[i];
- int poff = pg->off & ~PAGE_MASK;
+ for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
+ struct brw_page *pg = pga[i];
+ int poff = pg->bp_off & ~PAGE_MASK;
- LASSERT(pg->count > 0);
- /* make sure there is no gap in the middle of page array */
+ LASSERT(pg->bp_count > 0);
+ /* make sure there is no gap in the middle of page array */
LASSERTF(page_count == 1 ||
- (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
+ (ergo(i == 0, poff + pg->bp_count == PAGE_SIZE) &&
ergo(i > 0 && i < page_count - 1,
- poff == 0 && pg->count == PAGE_SIZE) &&
+ poff == 0 && pg->bp_count == PAGE_SIZE) &&
ergo(i == page_count - 1, poff == 0)),
- "i: %d/%d pg: %p off: %llu, count: %u\n",
- i, page_count, pg, pg->off, pg->count);
- LASSERTF(i == 0 || pg->off > pg_prev->off,
- "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
- " prev_pg %p [pri %lu ind %lu] off %llu\n",
- i, page_count,
- pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
- pg_prev->pg, page_private(pg_prev->pg),
- pg_prev->pg->index, pg_prev->off);
- LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
- (pg->flag & OBD_BRW_SRVLOCK));
+ "i: %d/%d pg: %px off: %llu, count: %u\n",
+ i, page_count, pg, pg->bp_off, pg->bp_count);
+ LASSERTF(i == 0 || pg->bp_off > pg_prev->bp_off,
+ "i %d p_c %u pg %px [pri %lu ind %lu] off %llu prev_pg %px [pri %lu ind %lu] off %llu\n",
+ i, page_count,
+ pg->bp_page, page_private(pg->bp_page),
+ pg->bp_page->index, pg->bp_off,
+ pg_prev->bp_page, page_private(pg_prev->bp_page),
+ pg_prev->bp_page->index, pg_prev->bp_off);
+ LASSERT((pga[0]->bp_flag & OBD_BRW_SRVLOCK) ==
+ (pg->bp_flag & OBD_BRW_SRVLOCK));
if (short_io_size != 0 && opc == OST_WRITE) {
- unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
+ unsigned char *ptr = kmap_atomic(pg->bp_page);
- LASSERT(short_io_size >= requested_nob + pg->count);
+ LASSERT(short_io_size >= requested_nob + pg->bp_count);
memcpy(short_io_buf + requested_nob,
ptr + poff,
- pg->count);
- ll_kunmap_atomic(ptr, KM_USER0);
+ pg->bp_count);
+ kunmap_atomic(ptr);
} else if (short_io_size == 0) {
- desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
- pg->count);
+ desc->bd_frag_ops->add_kiov_frag(desc, pg->bp_page,
+ poff, pg->bp_count);
+ }
+ requested_nob += pg->bp_count;
+
+ if (i > 0 && can_merge_pages(pg_prev, pg)) {
+ niobuf--;
+ niobuf->rnb_len += pg->bp_count;
+ } else {
+ niobuf->rnb_offset = pg->bp_off;
+ niobuf->rnb_len = pg->bp_count;
+ niobuf->rnb_flags = pg->bp_flag;
+ }
+ pg_prev = pg;
+ if (CFS_FAIL_CHECK(OBD_FAIL_OSC_MARK_COMPRESSED))
+ niobuf->rnb_flags |= OBD_BRW_COMPRESSED;
+ }
+
+ obdo_to_ioobj(oa, ioobj);
+ ioobj->ioo_bufcnt = niocount;
+
+ /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
+ * that might be send for this request. The actual number is decided
+ * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
+ * "max - 1" for old client compatibility sending "0", and also so the
+ * actual maximum is a power-of-two number, not one less. LU-1431
+ *
+ * The low bits are reserved for md flags used for interoperability, Ex:
+ * - OBD_IOOBJ_INTEROP_PAGE_ALIGNMENT
+ */
+ if (desc)
+ ioobj_max_brw_set(ioobj, desc->bd_md_max_brw,
+ desc->bd_md_offset);
+ else
+ ioobj_max_brw_set(ioobj, 0, 0); /* short io */
+
+ LASSERTF((void *)(niobuf - niocount) ==
+ req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
+ "want %px - real %px\n",
+ req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
+ (void *)(niobuf - niocount));
+
+ osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
+ if (resend) {
+ if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+ body->oa.o_valid |= OBD_MD_FLFLAGS;
+ body->oa.o_flags = 0;
}
- requested_nob += pg->count;
+ body->oa.o_flags |= OBD_FL_RECOV_RESEND;
+ }
- if (i > 0 && can_merge_pages(pg_prev, pg)) {
- niobuf--;
- niobuf->rnb_len += pg->count;
- } else {
- niobuf->rnb_offset = pg->off;
- niobuf->rnb_len = pg->count;
- niobuf->rnb_flags = pg->flag;
- }
- pg_prev = pg;
- }
-
- LASSERTF((void *)(niobuf - niocount) ==
- req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
- "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
- &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
-
- osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
- if (resend) {
- if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
- body->oa.o_valid |= OBD_MD_FLFLAGS;
- body->oa.o_flags = 0;
- }
- body->oa.o_flags |= OBD_FL_RECOV_RESEND;
- }
-
- if (osc_should_shrink_grant(cli))
- osc_shrink_grant_local(cli, &body->oa);
-
- /* size[REQ_REC_OFF] still sizeof (*body) */
- if (opc == OST_WRITE) {
- if (cli->cl_checksum &&
- !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
- /* store cl_cksum_type in a local variable since
- * it can be changed via lprocfs */
+ if (osc_should_shrink_grant(cli))
+ osc_shrink_grant_local(cli, &body->oa);
+
+ if (!cli->cl_checksum || sptlrpc_flavor_has_bulk(&req->rq_flvr))
+ enable_checksum = false;
+
+ /* size[REQ_REC_OFF] still sizeof (*body) */
+ if (opc == OST_WRITE) {
+ if (enable_checksum) {
+ /* store cl_cksum_type in a local variable since
+ * it can be changed via lprocfs
+ */
enum cksum_types cksum_type = cli->cl_cksum_type;
- if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
- body->oa.o_flags = 0;
+ if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
+ body->oa.o_flags = 0;
body->oa.o_flags |= obd_cksum_type_pack(obd_name,
cksum_type);
- body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
+ body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
rc = osc_checksum_bulk_rw(obd_name, cksum_type,
requested_nob, page_count,
pga, OST_WRITE,
- &body->oa.o_cksum);
+ &body->oa.o_cksum, resend);
if (rc < 0) {
- CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
+ CDEBUG(D_PAGE, "failed to checksum: rc = %d\n",
rc);
GOTO(out, rc);
}
- CDEBUG(D_PAGE, "checksum at write origin: %x\n",
- body->oa.o_cksum);
+ CDEBUG(D_PAGE | (resend ? D_HA : 0),
+ "checksum at write origin: %x (%x)\n",
+ body->oa.o_cksum, cksum_type);
- /* save this in 'oa', too, for later checking */
- oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
+ /* save this in 'oa', too, for later checking */
+ oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
oa->o_flags |= obd_cksum_type_pack(obd_name,
cksum_type);
- } else {
- /* clear out the checksum flag, in case this is a
- * resend but cl_checksum is no longer set. b=11238 */
- oa->o_valid &= ~OBD_MD_FLCKSUM;
- }
- oa->o_cksum = body->oa.o_cksum;
- /* 1 RC per niobuf */
- req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
- sizeof(__u32) * niocount);
- } else {
- if (cli->cl_checksum &&
- !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
- if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
- body->oa.o_flags = 0;
+ } else {
+ /* clear out the checksum flag, in case this is a
+ * resend but cl_checksum is no longer set. b=11238
+ */
+ oa->o_valid &= ~OBD_MD_FLCKSUM;
+ }
+ oa->o_cksum = body->oa.o_cksum;
+ /* 1 RC per niobuf */
+ req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
+ sizeof(__u32) * niocount);
+ } else {
+ if (enable_checksum) {
+ if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
+ body->oa.o_flags = 0;
body->oa.o_flags |= obd_cksum_type_pack(obd_name,
cli->cl_cksum_type);
- body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
+ body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
}
/* Client cksum has been already copied to wire obdo in previous
* lustre_set_wire_obdo(), and in the case a bulk-read is being
* resent due to cksum error, this will allow Server to
- * check+dump pages on its side */
+ * check+dump pages on its side
+ */
}
ptlrpc_request_set_replen(req);
CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
- RETURN(0);
+ RETURN(0);
- out:
- ptlrpc_req_finished(req);
- RETURN(rc);
+out:
+ ptlrpc_req_finished(req);
+ RETURN(rc);
}
char dbgcksum_file_name[PATH_MAX];
__u32 client_cksum)
{
struct file *filp;
- int rc, i;
unsigned int len;
+ int rc, i;
char *buf;
/* will only keep dump of pages on first error for the same range in
- * file/fid, not during the resends/retries. */
+ * file/fid, not during the resends/retries.
+ */
snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
"%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
- (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
- libcfs_debug_file_path_arr :
- LIBCFS_DEBUG_FILE_PATH_DEFAULT),
+ (strncmp(libcfs_debug_file_path, "NONE", 4) != 0 ?
+ libcfs_debug_file_path : LIBCFS_DEBUG_FILE_PATH_DEFAULT),
oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
- pga[0]->off,
- pga[page_count-1]->off + pga[page_count-1]->count - 1,
+ pga[0]->bp_off,
+ pga[page_count-1]->bp_off + pga[page_count-1]->bp_count - 1,
client_cksum, server_cksum);
+ CWARN("%s: dumping checksum data\n", dbgcksum_file_name);
filp = filp_open(dbgcksum_file_name,
O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
if (IS_ERR(filp)) {
rc = PTR_ERR(filp);
if (rc == -EEXIST)
- CDEBUG(D_INFO, "%s: can't open to dump pages with "
- "checksum error: rc = %d\n", dbgcksum_file_name,
- rc);
+ CDEBUG(D_INFO,
+ "%s: can't open to dump pages with checksum error: rc = %d\n",
+ dbgcksum_file_name, rc);
else
- CERROR("%s: can't open to dump pages with checksum "
- "error: rc = %d\n", dbgcksum_file_name, rc);
+ CERROR("%s: can't open to dump pages with checksum error: rc = %d\n",
+ dbgcksum_file_name, rc);
return;
}
for (i = 0; i < page_count; i++) {
- len = pga[i]->count;
- buf = kmap(pga[i]->pg);
+ len = pga[i]->bp_count;
+ buf = kmap(pga[i]->bp_page);
while (len != 0) {
rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
if (rc < 0) {
- CERROR("%s: wanted to write %u but got %d "
- "error\n", dbgcksum_file_name, len, rc);
+ CERROR("%s: wanted to write %u but got error: rc = %d\n",
+ dbgcksum_file_name, len, rc);
break;
}
len -= rc;
buf += rc;
- CDEBUG(D_INFO, "%s: wrote %d bytes\n",
- dbgcksum_file_name, rc);
}
- kunmap(pga[i]->pg);
+ kunmap(pga[i]->bp_page);
}
rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
if (rc)
CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
filp_close(filp, NULL);
- return;
+
+ libcfs_debug_dumplog();
}
static int
-check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
+check_write_checksum(struct obdo *oa, const struct lnet_processid *peer,
__u32 client_cksum, __u32 server_cksum,
struct osc_brw_async_args *aa)
{
char *msg;
int rc;
- if (server_cksum == client_cksum) {
- CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
- return 0;
- }
+ if (server_cksum == client_cksum) {
+ CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
+ return 0;
+ }
if (aa->aa_cli->cl_checksum_dump)
dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
aa->aa_page_count, aa->aa_ppga,
OST_WRITE, fn, sector_size,
- &new_cksum);
+ &new_cksum, true);
else
rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
aa->aa_ppga, OST_WRITE, cksum_type,
if (rc < 0)
msg = "failed to calculate the client write checksum";
else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
- msg = "the server did not use the checksum type specified in "
- "the original request - likely a protocol problem";
- else if (new_cksum == server_cksum)
- msg = "changed on the client after we checksummed it - "
- "likely false positive due to mmap IO (bug 11742)";
- else if (new_cksum == client_cksum)
- msg = "changed in transit before arrival at OST";
- else
- msg = "changed in transit AND doesn't match the original - "
- "likely false positive due to mmap IO (bug 11742)";
-
- LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
- DFID " object "DOSTID" extent [%llu-%llu], original "
- "client csum %x (type %x), server csum %x (type %x),"
- " client csum now %x\n",
- obd_name, msg, libcfs_nid2str(peer->nid),
- oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
- oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
- oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
- POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
- aa->aa_ppga[aa->aa_page_count - 1]->off +
- aa->aa_ppga[aa->aa_page_count-1]->count - 1,
- client_cksum,
- obd_cksum_type_unpack(aa->aa_oa->o_flags),
- server_cksum, cksum_type, new_cksum);
+ msg = "the server did not use the checksum type specified in the original request - likely a protocol problem";
+ else if (new_cksum == server_cksum)
+ msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)";
+ else if (new_cksum == client_cksum)
+ msg = "changed in transit before arrival at OST";
+ else
+ msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)";
+
+ LCONSOLE_ERROR("%s: BAD WRITE CHECKSUM: %s: from %s inode " DFID " object " DOSTID " extent [%llu-%llu], original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
+ obd_name, msg, libcfs_nidstr(&peer->nid),
+ oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
+ oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
+ oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
+ POSTID(&oa->o_oi), aa->aa_ppga[0]->bp_off,
+ aa->aa_ppga[aa->aa_page_count - 1]->bp_off +
+ aa->aa_ppga[aa->aa_page_count-1]->bp_count - 1,
+ client_cksum,
+ obd_cksum_type_unpack(aa->aa_oa->o_flags),
+ server_cksum, cksum_type, new_cksum);
return 1;
}
struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
struct client_obd *cli = aa->aa_cli;
const char *obd_name = cli->cl_import->imp_obd->obd_name;
- const struct lnet_process_id *peer =
+ const struct lnet_processid *peer =
&req->rq_import->imp_connection->c_peer;
struct ost_body *body;
u32 client_cksum = 0;
+ struct inode *inode = NULL;
+ unsigned int blockbits = 0, blocksize = 0;
+ struct cl_page *clpage;
ENTRY;
/* set/clear over quota flag for a uid/gid/projid */
if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
- unsigned qid[LL_MAXQUOTAS] = {
+ unsigned int qid[LL_MAXQUOTAS] = {
body->oa.o_uid, body->oa.o_gid,
body->oa.o_projid };
CDEBUG(D_QUOTA,
"setdq for [%u %u %u] with valid %#llx, flags %x\n",
body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
body->oa.o_valid, body->oa.o_flags);
- osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
- body->oa.o_flags);
+ osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
+ body->oa.o_flags);
}
osc_update_grant(cli, body);
nob = rc;
while (nob > 0 && pg_count > 0) {
unsigned char *ptr;
- int count = aa->aa_ppga[i]->count > nob ?
- nob : aa->aa_ppga[i]->count;
+ int count = aa->aa_ppga[i]->bp_count > nob ?
+ nob : aa->aa_ppga[i]->bp_count;
CDEBUG(D_CACHE, "page %p count %d\n",
- aa->aa_ppga[i]->pg, count);
- ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
- memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
+ aa->aa_ppga[i]->bp_page, count);
+ ptr = kmap_atomic(aa->aa_ppga[i]->bp_page);
+ memcpy(ptr + (aa->aa_ppga[i]->bp_off & ~PAGE_MASK), buf,
count);
- ll_kunmap_atomic((void *) ptr, KM_USER0);
+ kunmap_atomic((void *) ptr);
buf += count;
nob -= count;
}
}
- if (rc < aa->aa_requested_nob)
- handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
+ if (rc < aa->aa_requested_nob)
+ handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
- if (body->oa.o_valid & OBD_MD_FLCKSUM) {
- static int cksum_counter;
- u32 server_cksum = body->oa.o_cksum;
- char *via = "";
- char *router = "";
+ if (body->oa.o_valid & OBD_MD_FLCKSUM) {
+ static int cksum_counter;
+ u32 server_cksum = body->oa.o_cksum;
+ int nob = rc;
+ char *via = "";
+ char *router = "";
enum cksum_types cksum_type;
u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
body->oa.o_flags : 0;
cksum_type = obd_cksum_type_unpack(o_flags);
- rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
+ rc = osc_checksum_bulk_rw(obd_name, cksum_type, nob,
aa->aa_page_count, aa->aa_ppga,
- OST_READ, &client_cksum);
+ OST_READ, &client_cksum, false);
if (rc < 0)
GOTO(out, rc);
if (req->rq_bulk != NULL &&
- peer->nid != req->rq_bulk->bd_sender) {
+ !nid_same(&peer->nid, &req->rq_bulk->bd_sender)) {
via = " via ";
- router = libcfs_nid2str(req->rq_bulk->bd_sender);
+ router = libcfs_nidstr(&req->rq_bulk->bd_sender);
}
if (server_cksum != client_cksum) {
struct ost_body *clbody;
+ __u32 client_cksum2;
u32 page_count = aa->aa_page_count;
+ osc_checksum_bulk_rw(obd_name, cksum_type, nob,
+ page_count, aa->aa_ppga,
+ OST_READ, &client_cksum2, true);
clbody = req_capsule_client_get(&req->rq_pill,
&RMF_OST_BODY);
if (cli->cl_checksum_dump)
aa->aa_ppga, server_cksum,
client_cksum);
- LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
- "%s%s%s inode "DFID" object "DOSTID
- " extent [%llu-%llu], client %x, "
- "server %x, cksum_type %x\n",
- obd_name,
- libcfs_nid2str(peer->nid),
- via, router,
- clbody->oa.o_valid & OBD_MD_FLFID ?
- clbody->oa.o_parent_seq : 0ULL,
- clbody->oa.o_valid & OBD_MD_FLFID ?
- clbody->oa.o_parent_oid : 0,
- clbody->oa.o_valid & OBD_MD_FLFID ?
- clbody->oa.o_parent_ver : 0,
- POSTID(&body->oa.o_oi),
- aa->aa_ppga[0]->off,
- aa->aa_ppga[page_count-1]->off +
- aa->aa_ppga[page_count-1]->count - 1,
- client_cksum, server_cksum,
- cksum_type);
+ LCONSOLE_ERROR("%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu], client %x/%x, server %x, cksum_type %x\n",
+ obd_name, libcfs_nidstr(&peer->nid),
+ via, router,
+ clbody->oa.o_valid & OBD_MD_FLFID ?
+ clbody->oa.o_parent_seq : 0ULL,
+ clbody->oa.o_valid & OBD_MD_FLFID ?
+ clbody->oa.o_parent_oid : 0,
+ clbody->oa.o_valid & OBD_MD_FLFID ?
+ clbody->oa.o_parent_ver : 0,
+ POSTID(&body->oa.o_oi),
+ aa->aa_ppga[0]->bp_off,
+ aa->aa_ppga[page_count-1]->bp_off +
+ aa->aa_ppga[page_count-1]->bp_count - 1,
+ client_cksum, client_cksum2,
+ server_cksum, cksum_type);
cksum_counter = 0;
aa->aa_oa->o_cksum = client_cksum;
rc = -EAGAIN;
if ((cksum_missed & (-cksum_missed)) == cksum_missed)
CERROR("%s: checksum %u requested from %s but not sent\n",
obd_name, cksum_missed,
- libcfs_nid2str(peer->nid));
+ libcfs_nidstr(&peer->nid));
} else {
rc = 0;
}
+
+ /* get the inode from the first cl_page */
+ clpage = oap2cl_page(brw_page2oap(aa->aa_ppga[0]));
+ inode = clpage->cp_inode;
+ if (clpage->cp_type == CPT_TRANSIENT && inode) {
+ blockbits = inode->i_blkbits;
+ blocksize = 1 << blockbits;
+ }
+ if (inode && IS_ENCRYPTED(inode)) {
+ int idx;
+
+ if (!llcrypt_has_encryption_key(inode)) {
+ CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
+ GOTO(out, rc);
+ }
+ for (idx = 0; idx < aa->aa_page_count; idx++) {
+ struct brw_page *brwpg = aa->aa_ppga[idx];
+ unsigned int offs = 0;
+
+ while (offs < PAGE_SIZE) {
+ /* do not decrypt if page is all 0s */
+ if (memchr_inv(page_address(brwpg->bp_page) +
+ offs, 0,
+ LUSTRE_ENCRYPTION_UNIT_SIZE) ==
+ NULL) {
+ /* if page is empty forward info to
+ * upper layers (ll_io_zero_page) by
+ * clearing PagePrivate2
+ */
+ if (!offs)
+ ClearPagePrivate2(brwpg->bp_page);
+ break;
+ }
+
+ if (blockbits) {
+ /* This is direct IO case. Directly call
+ * decrypt function that takes inode as
+ * input parameter. Page does not need
+ * to be locked.
+ */
+ u64 lblk_num;
+ unsigned int i;
+
+ clpage =
+ oap2cl_page(brw_page2oap(brwpg));
+ lblk_num =
+ ((u64)(clpage->cp_page_index) <<
+ (PAGE_SHIFT - blockbits)) +
+ (offs >> blockbits);
+ for (i = offs; i < offs +
+ LUSTRE_ENCRYPTION_UNIT_SIZE;
+ i += blocksize, lblk_num++) {
+ rc =
+ llcrypt_decrypt_block_inplace(
+ inode, brwpg->bp_page,
+ blocksize, i,
+ lblk_num);
+ if (rc)
+ break;
+ }
+ } else {
+ rc = llcrypt_decrypt_pagecache_blocks(
+ brwpg->bp_page,
+ LUSTRE_ENCRYPTION_UNIT_SIZE,
+ offs);
+ }
+ if (rc)
+ GOTO(out, rc);
+
+ offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
+ }
+ }
+ }
+
out:
if (rc >= 0)
lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
{
struct ptlrpc_request *new_req;
struct osc_brw_async_args *new_aa;
- struct osc_async_page *oap;
- ENTRY;
- /* The below message is checked in replay-ost-single.sh test_8ae*/
+ ENTRY;
+ /* The below message is checked in replay-ost-single.sh test_8ae */
DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
"redo for recoverable error %d", rc);
OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
aa->aa_cli, aa->aa_oa, aa->aa_page_count,
aa->aa_ppga, &new_req, 1);
- if (rc)
- RETURN(rc);
-
- list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
- if (oap->oap_request != NULL) {
- LASSERTF(request == oap->oap_request,
- "request %p != oap_request %p\n",
- request, oap->oap_request);
- if (oap->oap_interrupted) {
- ptlrpc_req_finished(new_req);
- RETURN(-EINTR);
- }
- }
- }
- /*
- * New request takes over pga and oaps from old request.
+ if (rc)
+ RETURN(rc);
+
+
+ LASSERTF(request == aa->aa_request,
+ "request %p != aa_request %p\n",
+ request, aa->aa_request);
+ /* New request takes over pga and oaps from old request.
* Note that copying a list_head doesn't work, need to move it...
*/
aa->aa_resends++;
new_req->rq_async_args = request->rq_async_args;
new_req->rq_commit_cb = request->rq_commit_cb;
/* cap resend delay to the current request timeout, this is similar to
- * what ptlrpc does (see after_reply()) */
+ * what ptlrpc does (see after_reply())
+ */
if (aa->aa_resends > new_req->rq_timeout)
- new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
+ new_req->rq_sent = ktime_get_real_seconds() +
+ new_req->rq_timeout;
else
new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
- new_req->rq_generation_set = 1;
- new_req->rq_import_generation = request->rq_import_generation;
+ new_req->rq_generation_set = 1;
+ new_req->rq_import_generation = request->rq_import_generation;
new_aa = ptlrpc_req_async_args(new_aa, new_req);
list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
new_aa->aa_resends = aa->aa_resends;
- list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
- if (oap->oap_request) {
- ptlrpc_req_finished(oap->oap_request);
- oap->oap_request = ptlrpc_request_addref(new_req);
- }
- }
+ if (aa->aa_request) {
+ ptlrpc_req_put(aa->aa_request);
+ new_aa->aa_request = ptlrpc_request_addref(new_req);
+ }
/* XXX: This code will run into problem if we're going to support
* to add a series of BRW RPCs into a self-defined ptlrpc_request_set
* and wait for all of them to be finished. We should inherit request
- * set from old request. */
+ * set from old request.
+ */
ptlrpcd_add_req(new_req);
DEBUG_REQ(D_INFO, new_req, "new request");
RETURN(0);
}
-/*
- * ugh, we want disk allocation on the target to happen in offset order. we'll
+/* ugh, we want disk allocation on the target to happen in offset order. we'll
* follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
* fine for our small page arrays and doesn't require allocation. its an
* insertion sort that swaps elements that are strides apart, shrinking the
*/
static void sort_brw_pages(struct brw_page **array, int num)
{
- int stride, i, j;
- struct brw_page *tmp;
-
- if (num == 1)
- return;
- for (stride = 1; stride < num ; stride = (stride * 3) + 1)
- ;
+ int stride, i, j;
+ struct brw_page *tmp;
- do {
- stride /= 3;
- for (i = stride ; i < num ; i++) {
- tmp = array[i];
- j = i;
- while (j >= stride && array[j - stride]->off > tmp->off) {
- array[j] = array[j - stride];
- j -= stride;
- }
- array[j] = tmp;
- }
- } while (stride > 1);
+ if (num == 1)
+ return;
+ for (stride = 1; stride < num ; stride = (stride * 3) + 1)
+ ;
+
+ do {
+ stride /= 3;
+ for (i = stride ; i < num ; i++) {
+ tmp = array[i];
+ j = i;
+ while (j >= stride &&
+ array[j - stride]->bp_off > tmp->bp_off) {
+ array[j] = array[j - stride];
+ j -= stride;
+ }
+ array[j] = tmp;
+ }
+ } while (stride > 1);
}
static void osc_release_ppga(struct brw_page **ppga, size_t count)
{
- LASSERT(ppga != NULL);
- OBD_FREE(ppga, sizeof(*ppga) * count);
+ LASSERT(ppga != NULL);
+ OBD_FREE_PTR_ARRAY_LARGE(ppga, count);
+}
+
+/* This is trying to propagate async writeback errors back up to the
+ * application. As an async write fails we record the error code for later if
+ * the app does an fsync. As long as errors persist we force future rpcs to be
+ * sync so that the app can get a sync error and break the cycle of queueing
+ * pages for which writeback will fail.
+ */
+static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
+ int rc)
+{
+ if (rc) {
+ if (!ar->ar_rc)
+ ar->ar_rc = rc;
+
+ ar->ar_force_sync = 1;
+ ar->ar_min_xid = ptlrpc_sample_next_xid();
+ return;
+
+ }
+
+ if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
+ ar->ar_force_sync = 0;
}
static int brw_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *args, int rc)
{
struct osc_brw_async_args *aa = args;
- struct osc_extent *ext;
- struct osc_extent *tmp;
struct client_obd *cli = aa->aa_cli;
unsigned long transferred = 0;
+ struct cl_object *obj = NULL;
+ struct osc_async_page *last;
+ struct osc_extent *ext;
+ struct osc_extent *tmp;
+ struct lov_oinfo *loi;
ENTRY;
+ ext = list_first_entry(&aa->aa_exts, struct osc_extent, oe_link);
+
rc = osc_brw_fini_request(req, rc);
CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
+
+ /* restore clear text pages */
+ osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
+
/*
* When server returns -EINPROGRESS, client should always retry
* regardless of the number of times the bulk was resent already.
if (osc_recoverable_error(rc) && !req->rq_no_delay) {
if (req->rq_import_generation !=
req->rq_import->imp_generation) {
- CDEBUG(D_HA, "%s: resend cross eviction for object: "
- ""DOSTID", rc = %d.\n",
+ CDEBUG(D_HA,
+ "%s: resend cross eviction for object: "DOSTID": rc = %d.\n",
req->rq_import->imp_obd->obd_name,
POSTID(&aa->aa_oa->o_oi), rc);
} else if (rc == -EINPROGRESS ||
- client_should_resend(aa->aa_resends, aa->aa_cli)) {
+ client_should_resend(aa->aa_resends, aa->aa_cli)) {
rc = osc_brw_redo_request(req, aa, rc);
} else {
- CERROR("%s: too many resent retries for object: "
- "%llu:%llu, rc = %d.\n",
+ CERROR("%s: too many resent retries for object: %llu:%llu: rc = %d\n",
req->rq_import->imp_obd->obd_name,
POSTID(&aa->aa_oa->o_oi), rc);
}
rc = -EIO;
}
+ last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
+ obj = osc2cl(ext->oe_obj);
+ loi = cl2osc(obj)->oo_oinfo;
+
if (rc == 0) {
struct obdo *oa = aa->aa_oa;
struct cl_attr *attr = &osc_env_info(env)->oti_attr;
unsigned long valid = 0;
- struct cl_object *obj;
- struct osc_async_page *last;
-
- last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
- obj = osc2cl(last->oap_obj);
cl_object_attr_lock(obj);
if (oa->o_valid & OBD_MD_FLBLOCKS) {
}
if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
- struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
loff_t last_off = last->oap_count + last->oap_obj_off +
last->oap_page_off;
/* Change file size if this is an out of quota or
- * direct IO write and it extends the file size */
+ * direct IO write and it extends the file size
+ */
if (loi->loi_lvb.lvb_size < last_off) {
attr->cat_size = last_off;
valid |= CAT_SIZE;
OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
aa->aa_oa = NULL;
- if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
+ if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0) {
osc_inc_unstable_pages(req);
+ /*
+ * If req->rq_committed is set, it means that the dirty pages
+ * have already committed into the stable storage on OSTs
+ * (i.e. Direct I/O).
+ */
+ if (!req->rq_committed)
+ cl_object_dirty_for_sync(env, cl_object_top(obj));
+ }
+
+ if (aa->aa_request) {
+ __u64 xid = ptlrpc_req_xid(req);
+ ptlrpc_req_put(req);
+ if (xid && lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
+ spin_lock(&cli->cl_loi_list_lock);
+ osc_process_ar(&cli->cl_ar, xid, rc);
+ osc_process_ar(&loi->loi_ar, xid, rc);
+ spin_unlock(&cli->cl_loi_list_lock);
+ }
+ }
list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
list_del_init(&ext->oe_link);
osc_extent_finish(env, ext, 1,
- rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
+ rc && req->rq_no_delay ? -EAGAIN : rc);
}
LASSERT(list_empty(&aa->aa_exts));
LASSERT(list_empty(&aa->aa_oaps));
spin_lock(&cli->cl_loi_list_lock);
/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
* is called so we know whether to go to sync BRWs or wait for more
- * RPCs to complete */
+ * RPCs to complete
+ */
if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
cli->cl_w_in_flight--;
else
/* If osc_inc_unstable_pages (via osc_extent_finish) races with
* this called via the rq_commit_cb, I need to ensure
* osc_dec_unstable_pages is still called. Otherwise unstable
- * pages may be leaked. */
+ * pages may be leaked.
+ */
spin_lock(&req->rq_lock);
if (likely(req->rq_unstable)) {
req->rq_unstable = 0;
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
struct list_head *ext_list, int cmd)
{
- struct ptlrpc_request *req = NULL;
- struct osc_extent *ext;
- struct brw_page **pga = NULL;
- struct osc_brw_async_args *aa = NULL;
- struct obdo *oa = NULL;
- struct osc_async_page *oap;
- struct osc_object *obj = NULL;
- struct cl_req_attr *crattr = NULL;
- loff_t starting_offset = OBD_OBJECT_EOF;
- loff_t ending_offset = 0;
- int mpflag = 0;
- int mem_tight = 0;
- int page_count = 0;
- bool soft_sync = false;
- bool interrupted = false;
- bool ndelay = false;
- int i;
- int grant = 0;
- int rc;
- __u32 layout_version = 0;
- struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
- struct ost_body *body;
+ struct ptlrpc_request *req = NULL;
+ struct osc_extent *ext;
+ struct brw_page **pga = NULL;
+ struct osc_brw_async_args *aa = NULL;
+ struct obdo *oa = NULL;
+ struct osc_async_page *oap;
+ struct osc_object *obj = NULL;
+ struct cl_req_attr *crattr = NULL;
+ loff_t starting_offset = OBD_OBJECT_EOF;
+ loff_t ending_offset = 0;
+ /* '1' for consistency with code that checks !mpflag to restore */
+ int mpflag = 1;
+ int mem_tight = 0;
+ int page_count = 0;
+ bool soft_sync = false;
+ bool ndelay = false;
+ int grant = 0;
+ int i, rc;
+ __u32 layout_version = 0;
+ LIST_HEAD(rpc_list);
+ struct ost_body *body;
+
ENTRY;
LASSERT(!list_empty(ext_list));
/* add pages into rpc_list to build BRW rpc */
list_for_each_entry(ext, ext_list, oe_link) {
+ struct cl_sub_dio *sdio = ext->oe_csd;
+
LASSERT(ext->oe_state == OES_RPC);
mem_tight |= ext->oe_memalloc;
grant += ext->oe_grants;
page_count += ext->oe_nr_pages;
- layout_version = MAX(layout_version, ext->oe_layout_version);
+ layout_version = max(layout_version, ext->oe_layout_version);
if (obj == NULL)
obj = ext->oe_obj;
+
+ /* for unaligned writes, we do the data copying here */
+ if (sdio && sdio->csd_unaligned && sdio->csd_write) {
+ rc = ll_dio_user_copy(sdio);
+ if (rc < 0)
+ GOTO(out, rc);
+ /* dio_user_copy has some concurrency handling in it,
+ * so we add this assert to ensure it did its job...
+ */
+ LASSERT(sdio->csd_write_copied);
+ }
}
soft_sync = osc_over_unstable_soft_limit(cli);
if (mem_tight)
- mpflag = cfs_memory_pressure_get_and_set();
+ mpflag = memalloc_noreclaim_save();
- OBD_ALLOC(pga, sizeof(*pga) * page_count);
+ OBD_ALLOC_PTR_ARRAY_LARGE(pga, page_count);
if (pga == NULL)
GOTO(out, rc = -ENOMEM);
if (soft_sync)
oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
pga[i] = &oap->oap_brw_page;
- pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
+ pga[i]->bp_off = oap->oap_obj_off + oap->oap_page_off;
i++;
list_add_tail(&oap->oap_rpc_item, &rpc_list);
if (starting_offset == OBD_OBJECT_EOF ||
- starting_offset > oap->oap_obj_off)
+ starting_offset > oap->oap_obj_off) {
starting_offset = oap->oap_obj_off;
- else
+ } else {
+ CDEBUG(D_CACHE, "page i:%d, oap->oap_obj_off %llu, oap->oap_page_off %u\n",
+ i, oap->oap_obj_off, oap->oap_page_off);
LASSERT(oap->oap_page_off == 0);
- if (ending_offset < oap->oap_obj_off + oap->oap_count)
+ }
+ if (ending_offset < oap->oap_obj_off + oap->oap_count) {
ending_offset = oap->oap_obj_off +
oap->oap_count;
- else
+ } else {
LASSERT(oap->oap_page_off + oap->oap_count ==
PAGE_SIZE);
- if (oap->oap_interrupted)
- interrupted = true;
+ }
}
if (ext->oe_ndelay)
ndelay = true;
}
/* first page in the list */
- oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
+ oap = list_first_entry(&rpc_list, typeof(*oap), oap_rpc_item);
crattr = &osc_env_info(env)->oti_req_attr;
memset(crattr, 0, sizeof(*crattr));
sort_brw_pages(pga, page_count);
rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
if (rc != 0) {
- CERROR("prep_req failed: %d\n", rc);
+ CERROR("%s: prep_req failed: rc = %d\n",
+ cli->cl_import->imp_obd->obd_name, rc);
GOTO(out, rc);
}
req->rq_commit_cb = brw_commit;
req->rq_interpret_reply = brw_interpret;
req->rq_memalloc = mem_tight != 0;
- oap->oap_request = ptlrpc_request_addref(req);
- if (interrupted && !req->rq_intr)
- ptlrpc_mark_interrupted(req);
if (ndelay) {
req->rq_no_resend = req->rq_no_delay = 1;
/* probably set a shorter timeout value.
- * to handle ETIMEDOUT in brw_interpret() correctly. */
- /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
+ * to handle ETIMEDOUT in brw_interpret() correctly.
+ * lustre_msg_set_timeout(req, req->rq_timeout / 2);
+ */
}
/* Need to update the timestamps after the request is built in case
* we race with setattr (locally or in queue at OST). If OST gets
* later setattr before earlier BRW (as determined by the request xid),
* the OST will not use BRW timestamps. Sadly, there is no obvious
- * way to do this in a single call. bug 10150 */
+ * way to do this in a single call. bug 10150
+ */
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
crattr->cra_oa = &body->oa;
crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
cl_req_attr_set(env, osc2cl(obj), crattr);
- lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
+ lustre_msg_set_jobinfo(req->rq_reqmsg, &crattr->cra_jobinfo);
aa = ptlrpc_req_async_args(aa, req);
INIT_LIST_HEAD(&aa->aa_oaps);
list_splice_init(&rpc_list, &aa->aa_oaps);
INIT_LIST_HEAD(&aa->aa_exts);
list_splice_init(ext_list, &aa->aa_exts);
+ aa->aa_request = ptlrpc_request_addref(req);
spin_lock(&cli->cl_loi_list_lock);
starting_offset >>= PAGE_SHIFT;
+ ending_offset >>= PAGE_SHIFT;
if (cmd == OBD_BRW_READ) {
cli->cl_r_in_flight++;
lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
spin_unlock(&cli->cl_loi_list_lock);
DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
- page_count, aa, cli->cl_r_in_flight,
- cli->cl_w_in_flight);
- OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
+ page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
+ if (libcfs_debug & D_IOTRACE) {
+ struct lu_fid fid;
+
+ fid.f_seq = crattr->cra_oa->o_parent_seq;
+ fid.f_oid = crattr->cra_oa->o_parent_oid;
+ fid.f_ver = crattr->cra_oa->o_parent_ver;
+ CDEBUG(D_IOTRACE,
+ DFID": %d %s pages, start %lld, end %lld, now %ur/%uw in flight\n",
+ PFID(&fid), page_count,
+ cmd == OBD_BRW_READ ? "read" : "write", starting_offset,
+ ending_offset, cli->cl_r_in_flight, cli->cl_w_in_flight);
+ }
+ CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
ptlrpcd_add_req(req);
rc = 0;
EXIT;
out:
- if (mem_tight != 0)
- cfs_memory_pressure_restore(mpflag);
+ if (mem_tight)
+ memalloc_noreclaim_restore(mpflag);
if (rc != 0) {
LASSERT(req == NULL);
if (oa)
OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
- if (pga)
- OBD_FREE(pga, sizeof(*pga) * page_count);
+ if (pga) {
+ osc_release_bounce_pages(pga, page_count);
+ osc_release_ppga(pga, page_count);
+ }
/* this should happen rarely and is pretty bad, it makes the
- * pending list not follow the dirty order */
- while (!list_empty(ext_list)) {
- ext = list_entry(ext_list->next, struct osc_extent,
- oe_link);
+ * pending list not follow the dirty order
+ */
+ while ((ext = list_first_entry_or_null(ext_list,
+ struct osc_extent,
+ oe_link)) != NULL) {
list_del_init(&ext->oe_link);
osc_extent_finish(env, ext, 0, rc);
}
RETURN(rc);
}
+/* This is to refresh our lock in face of no RPCs. */
+void osc_send_empty_rpc(struct osc_object *osc, pgoff_t start)
+{
+ struct ptlrpc_request *req;
+ struct obdo oa;
+ struct brw_page bpg = { .bp_off = start, .bp_count = 1};
+ struct brw_page *pga = &bpg;
+ int rc;
+
+ memset(&oa, 0, sizeof(oa));
+ oa.o_oi = osc->oo_oinfo->loi_oi;
+ oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
+ /* For updated servers - don't do a read */
+ oa.o_flags = OBD_FL_NORPC;
+
+ rc = osc_brw_prep_request(OBD_BRW_READ, osc_cli(osc), &oa, 1, &pga,
+ &req, 0);
+
+ /* If we succeeded we ship it off, if not there's no point in doing
+ * anything. Also no resends.
+ * No interpret callback, no commit callback.
+ */
+ if (!rc) {
+ req->rq_no_resend = 1;
+ ptlrpcd_add_req(req);
+ }
+}
+
static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
{
- int set = 0;
+ int set = 0;
- LASSERT(lock != NULL);
+ LASSERT(lock != NULL);
- lock_res_and_lock(lock);
+ lock_res_and_lock(lock);
if (lock->l_ast_data == NULL)
lock->l_ast_data = data;
return set;
}
-int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
- void *cookie, struct lustre_handle *lockh,
- enum ldlm_mode mode, __u64 *flags, bool speculative,
- int errcode)
+static int osc_enqueue_fini(struct ptlrpc_request *req,
+ osc_enqueue_upcall_f upcall,
+ void *cookie, struct lustre_handle *lockh,
+ enum ldlm_mode mode, __u64 *flags,
+ bool speculative, int errcode)
{
bool intent = *flags & LDLM_FL_HAS_INTENT;
int rc;
- ENTRY;
+ ENTRY;
/* The request was created before ldlm_cli_enqueue call. */
if (intent && errcode == ELDLM_LOCK_ABORTED) {
struct ldlm_reply *rep;
*flags |= LDLM_FL_LVB_READY;
}
- /* Call the update callback. */
+ /* Call the update callback. */
rc = (*upcall)(cookie, lockh, errcode);
/* release the reference taken in ldlm_cli_enqueue() */
RETURN(rc);
}
-int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
- void *args, int rc)
+static int osc_enqueue_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
+ void *args, int rc)
{
struct osc_enqueue_args *aa = args;
struct ldlm_lock *lock;
struct ost_lvb *lvb = aa->oa_lvb;
__u32 lvb_len = sizeof(*lvb);
__u64 flags = 0;
+ struct ldlm_enqueue_info einfo = {
+ .ei_type = aa->oa_type,
+ .ei_mode = mode,
+ };
ENTRY;
- /* ldlm_cli_enqueue is holding a reference on the lock, so it must
- * be valid. */
+ /* ldlm_cli_enqueue holds a reference on the lock, it must be valid. */
lock = ldlm_handle2lock(lockh);
LASSERTF(lock != NULL,
- "lockh %#llx, req %p, aa %p - client evicted?\n",
+ "lockh %#llx, req %px, aa %px - client evicted?\n",
lockh->cookie, req, aa);
/* Take an additional reference so that a blocking AST that
* ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
* to arrive after an upcall has been executed by
- * osc_enqueue_fini(). */
+ * osc_enqueue_fini().
+ */
ldlm_lock_addref(lockh, mode);
/* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
- OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
+ CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
/* Let CP AST to grant the lock first. */
- OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
+ CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
if (aa->oa_speculative) {
LASSERT(aa->oa_lvb == NULL);
}
/* Complete obtaining the lock procedure. */
- rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
- aa->oa_mode, aa->oa_flags, lvb, lvb_len,
- lockh, rc);
+ rc = ldlm_cli_enqueue_fini(aa->oa_exp, &req->rq_pill, &einfo, 1,
+ aa->oa_flags, lvb, lvb_len, lockh, rc,
+ false);
/* Complete osc stuff. */
rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
aa->oa_flags, aa->oa_speculative, rc);
- OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
+ CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
ldlm_lock_decref(lockh, mode);
- LDLM_LOCK_PUT(lock);
+ ldlm_lock_put(lock);
RETURN(rc);
}
-struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
-
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
* from the 2nd OSC before a lock from the 1st one. This does not deadlock with
* other synchronous requests, however keeping some locks and trying to obtain
* others may take a considerable amount of time in a case of ost failure; and
* when other sync requests do not get released lock from a client, the client
* is evicted from the cluster -- such scenarious make the life difficult, so
- * release locks just after they are obtained. */
+ * release locks just after they are obtained.
+ */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
__u64 *flags, union ldlm_policy_data *policy,
- struct ost_lvb *lvb, int kms_valid,
- osc_enqueue_upcall_f upcall, void *cookie,
- struct ldlm_enqueue_info *einfo,
+ struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
+ void *cookie, struct ldlm_enqueue_info *einfo,
struct ptlrpc_request_set *rqset, int async,
bool speculative)
{
struct lustre_handle lockh = { 0 };
struct ptlrpc_request *req = NULL;
int intent = *flags & LDLM_FL_HAS_INTENT;
- __u64 match_flags = *flags;
+ __u64 search_flags = *flags;
+ __u64 match_flags = 0;
enum ldlm_mode mode;
int rc;
- ENTRY;
- /* Filesystem lock extents are extended to page boundaries so that
- * dealing with the page cache is a little smoother. */
+ ENTRY;
+ /* Filesystem lock extents are extended to page boundaries so that
+ * dealing with the page cache is a little smoother.
+ */
policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
policy->l_extent.end |= ~PAGE_MASK;
- /*
- * kms is not valid when either object is completely fresh (so that no
- * locks are cached), or object was evicted. In the latter case cached
- * lock cannot be used, because it would prime inode state with
- * potentially stale LVB.
+ /* Next, search for already existing extent locks that will cover us
+ *
+ * If we're trying to read, we also search for an existing PW lock. The
+ * VFS and page cache already protect us locally, so lots of readers/
+ * writers can share a single PW lock.
+ *
+ * There are problems with conversion deadlocks, so instead of
+ * converting a read lock to a write lock, we'll just enqueue a new
+ * one.
+ *
+ * At some point we should cancel the read lock instead of making them
+ * send us a blocking callback, but there are problems with canceling
+ * locks out from other users right now, too.
*/
- if (!kms_valid)
- goto no_match;
-
- /* Next, search for already existing extent locks that will cover us */
- /* If we're trying to read, we also search for an existing PW lock. The
- * VFS and page cache already protect us locally, so lots of readers/
- * writers can share a single PW lock.
- *
- * There are problems with conversion deadlocks, so instead of
- * converting a read lock to a write lock, we'll just enqueue a new
- * one.
- *
- * At some point we should cancel the read lock instead of making them
- * send us a blocking callback, but there are problems with canceling
- * locks out from other users right now, too. */
- mode = einfo->ei_mode;
- if (einfo->ei_mode == LCK_PR)
- mode |= LCK_PW;
+ mode = einfo->ei_mode;
+ if (einfo->ei_mode == LCK_PR)
+ mode |= LCK_PW;
/* Normal lock requests must wait for the LVB to be ready before
* matching a lock; speculative lock requests do not need to,
- * because they will not actually use the lock. */
+ * because they will not actually use the lock.
+ */
if (!speculative)
- match_flags |= LDLM_FL_LVB_READY;
+ search_flags |= LDLM_FL_LVB_READY;
if (intent != 0)
- match_flags |= LDLM_FL_BLOCK_GRANTED;
- mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
- einfo->ei_type, policy, mode, &lockh, 0);
+ search_flags |= LDLM_FL_BLOCK_GRANTED;
+ if (mode == LCK_GROUP)
+ match_flags = LDLM_MATCH_GROUP;
+ mode = ldlm_lock_match_with_skip(obd->obd_namespace, search_flags, 0,
+ res_id, einfo->ei_type, policy, mode,
+ &lockh, match_flags);
if (mode) {
struct ldlm_lock *matched;
/* This DLM lock request is speculative, and does not
* have an associated IO request. Therefore if there
* is already a DLM lock, it wll just inform the
- * caller to cancel the request for this stripe.*/
+ * caller to cancel the request for this stripe.
+ */
lock_res_and_lock(matched);
if (ldlm_extent_equal(&policy->l_extent,
&matched->l_policy_data.l_extent))
unlock_res_and_lock(matched);
ldlm_lock_decref(&lockh, mode);
- LDLM_LOCK_PUT(matched);
+ ldlm_lock_put(matched);
RETURN(rc);
} else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
*flags |= LDLM_FL_LVB_READY;
(*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
ldlm_lock_decref(&lockh, mode);
- LDLM_LOCK_PUT(matched);
+ ldlm_lock_put(matched);
RETURN(ELDLM_OK);
} else {
ldlm_lock_decref(&lockh, mode);
- LDLM_LOCK_PUT(matched);
+ ldlm_lock_put(matched);
}
}
-no_match:
if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
RETURN(-ENOLCK);
- if (intent) {
- req = ptlrpc_request_alloc(class_exp2cliimp(exp),
- &RQF_LDLM_ENQUEUE_LVB);
- if (req == NULL)
- RETURN(-ENOMEM);
-
- rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
- if (rc) {
- ptlrpc_request_free(req);
- RETURN(rc);
- }
-
- req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
- sizeof *lvb);
- ptlrpc_request_set_replen(req);
- }
-
- /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
- *flags &= ~LDLM_FL_BLOCK_GRANTED;
+ /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
+ *flags &= ~LDLM_FL_BLOCK_GRANTED;
- rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
+ rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
sizeof(*lvb), LVB_T_OST, &lockh, async);
if (async) {
if (!rc) {
struct osc_enqueue_args *aa;
+
aa = ptlrpc_req_async_args(aa, req);
aa->oa_exp = exp;
aa->oa_mode = einfo->ei_mode;
} else {
/* speculative locks are essentially to enqueue
* a DLM lock in advance, so we don't care
- * about the result of the enqueue. */
+ * about the result of the enqueue.
+ */
aa->oa_lvb = NULL;
aa->oa_flags = NULL;
}
req->rq_interpret_reply = osc_enqueue_interpret;
- if (rqset == PTLRPCD_SET)
- ptlrpcd_add_req(req);
- else
- ptlrpc_set_add_req(rqset, req);
- } else if (intent) {
- ptlrpc_req_finished(req);
+ ptlrpc_set_add_req(rqset, req);
}
RETURN(rc);
}
rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
flags, speculative, rc);
- if (intent)
- ptlrpc_req_finished(req);
RETURN(rc);
}
-int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
- enum ldlm_type type, union ldlm_policy_data *policy,
- enum ldlm_mode mode, __u64 *flags, void *data,
- struct lustre_handle *lockh, int unref)
+int osc_match_base(const struct lu_env *env, struct obd_export *exp,
+ struct ldlm_res_id *res_id, enum ldlm_type type,
+ union ldlm_policy_data *policy, enum ldlm_mode mode,
+ __u64 *flags, struct osc_object *obj,
+ struct lustre_handle *lockh,
+ enum ldlm_match_flags match_flags)
{
struct obd_device *obd = exp->exp_obd;
__u64 lflags = *flags;
enum ldlm_mode rc;
- ENTRY;
- if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
+ ENTRY;
+ if (CFS_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
RETURN(-EIO);
/* Filesystem lock extents are extended to page boundaries so that
- * dealing with the page cache is a little smoother */
+ * dealing with the page cache is a little smoother
+ */
policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
policy->l_extent.end |= ~PAGE_MASK;
- /* Next, search for already existing extent locks that will cover us */
- /* If we're trying to read, we also search for an existing PW lock. The
- * VFS and page cache already protect us locally, so lots of readers/
- * writers can share a single PW lock. */
- rc = mode;
- if (mode == LCK_PR)
- rc |= LCK_PW;
- rc = ldlm_lock_match(obd->obd_namespace, lflags,
- res_id, type, policy, rc, lockh, unref);
+ /* Next, search for already existing extent locks that will cover us */
+ rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
+ res_id, type, policy, mode, lockh,
+ match_flags);
if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
RETURN(rc);
- if (data != NULL) {
+ if (obj != NULL) {
struct ldlm_lock *lock = ldlm_handle2lock(lockh);
LASSERT(lock != NULL);
- if (!osc_set_lock_data(lock, data)) {
+ if (osc_set_lock_data(lock, obj)) {
+ lock_res_and_lock(lock);
+ if (!ldlm_is_lvb_cached(lock)) {
+ LASSERT(lock->l_ast_data == obj);
+ osc_lock_lvb_update(env, obj, lock, NULL);
+ ldlm_set_lvb_cached(lock);
+ }
+ unlock_res_and_lock(lock);
+ } else {
ldlm_lock_decref(lockh, rc);
rc = 0;
}
- LDLM_LOCK_PUT(lock);
+ ldlm_lock_put(lock);
}
RETURN(rc);
}
msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
if (msfs == NULL)
- GOTO(out, rc = -EPROTO);
+ GOTO(out, rc = -EPROTO);
*aa->aa_oi->oi_osfs = *msfs;
out:
static int osc_statfs_async(struct obd_export *exp,
struct obd_info *oinfo, time64_t max_age,
- struct ptlrpc_request_set *rqset)
+ struct ptlrpc_request_set *rqset)
{
- struct obd_device *obd = class_exp2obd(exp);
- struct ptlrpc_request *req;
- struct osc_async_args *aa;
+ struct obd_device *obd = class_exp2obd(exp);
+ struct ptlrpc_request *req;
+ struct osc_async_args *aa;
int rc;
- ENTRY;
+ ENTRY;
if (obd->obd_osfs_age >= max_age) {
CDEBUG(D_SUPER,
"%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
RETURN(0);
}
- /* We could possibly pass max_age in the request (as an absolute
- * timestamp or a "seconds.usec ago") so the target can avoid doing
- * extra calls into the filesystem if that isn't necessary (e.g.
- * during mount that would help a bit). Having relative timestamps
- * is not so great if request processing is slow, while absolute
- * timestamps are not ideal because they need time synchronization. */
- req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
- if (req == NULL)
- RETURN(-ENOMEM);
-
- rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
- if (rc) {
- ptlrpc_request_free(req);
- RETURN(rc);
- }
+ /* We could possibly pass max_age in the request (as an absolute
+ * timestamp or a "seconds.usec ago") so the target can avoid doing
+ * extra calls into the filesystem if that isn't necessary (e.g.
+ * during mount that would help a bit). Having relative timestamps
+ * is not so great if request processing is slow, while absolute
+ * timestamps are not ideal because they need time synchronization.
+ */
+ req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
+ if (rc) {
+ ptlrpc_request_free(req);
+ RETURN(rc);
+ }
ptlrpc_request_set_replen(req);
req->rq_request_portal = OST_CREATE_PORTAL;
ptlrpc_at_set_req_timeout(req);
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
struct obd_statfs *osfs, time64_t max_age, __u32 flags)
{
- struct obd_device *obd = class_exp2obd(exp);
- struct obd_statfs *msfs;
+ struct obd_device *obd = class_exp2obd(exp);
+ struct obd_statfs *msfs;
struct ptlrpc_request *req;
- struct obd_import *imp = NULL;
+ struct obd_import *imp, *imp0;
int rc;
- ENTRY;
-
- /*Since the request might also come from lprocfs, so we need
- *sync this with client_disconnect_export Bug15684*/
- down_read(&obd->u.cli.cl_sem);
- if (obd->u.cli.cl_import)
- imp = class_import_get(obd->u.cli.cl_import);
- up_read(&obd->u.cli.cl_sem);
- if (!imp)
- RETURN(-ENODEV);
+ ENTRY;
+ /* Since the request might also come from lprocfs, we need to
+ * sync this with client_disconnect_export (see bug 15684)
+ */
+ with_imp_locked(obd, imp0, rc)
+ imp = class_import_get(imp0);
+ if (rc)
+ RETURN(rc);
/* We could possibly pass max_age in the request (as an absolute
* timestamp or a "seconds.usec ago") so the target can avoid doing
* extra calls into the filesystem if that isn't necessary (e.g.
* during mount that would help a bit). Having relative timestamps
* is not so great if request processing is slow, while absolute
- * timestamps are not ideal because they need time synchronization. */
+ * timestamps are not ideal because they need time synchronization.
+ */
req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
class_import_put(imp);
EXIT;
out:
- ptlrpc_req_finished(req);
+ ptlrpc_req_put(req);
return rc;
}
void *karg, void __user *uarg)
{
struct obd_device *obd = exp->exp_obd;
- struct obd_ioctl_data *data = karg;
- int rc = 0;
+ struct obd_ioctl_data *data;
+ int rc;
ENTRY;
+ CDEBUG(D_IOCTL, "%s: cmd=%x len=%u karg=%pK uarg=%pK\n",
+ obd->obd_name, cmd, len, karg, uarg);
+
if (!try_module_get(THIS_MODULE)) {
CERROR("%s: cannot get module '%s'\n", obd->obd_name,
module_name(THIS_MODULE));
- return -EINVAL;
+ RETURN(-EINVAL);
}
+
switch (cmd) {
case OBD_IOC_CLIENT_RECOVER:
+ if (unlikely(karg == NULL)) {
+ OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL",
+ rc = -EINVAL);
+ break;
+ }
+ data = karg;
rc = ptlrpc_recover_import(obd->u.cli.cl_import,
data->ioc_inlbuf1, 0);
if (rc > 0)
rc = 0;
break;
- case IOC_OSC_SET_ACTIVE:
+ case OBD_IOC_GETATTR:
+ if (unlikely(karg == NULL)) {
+ OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL",
+ rc = -EINVAL);
+ break;
+ }
+ data = karg;
+ rc = obd_getattr(NULL, exp, &data->ioc_obdo1);
+ break;
+#ifdef IOC_OSC_SET_ACTIVE
+ case_OBD_IOC_DEPRECATED_FT(IOC_OSC_SET_ACTIVE, obd->obd_name, 2, 17);
+#endif
+ case OBD_IOC_SET_ACTIVE:
+ if (unlikely(karg == NULL)) {
+ OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL",
+ rc = -EINVAL);
+ break;
+ }
+ data = karg;
rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
data->ioc_offset);
break;
default:
- rc = -ENOTTY;
- CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
- obd->obd_name, cmd, current_comm(), rc);
+ rc = OBD_IOC_DEBUG(D_IOCTL, obd->obd_name, cmd, "unrecognized",
+ -ENOTTY);
break;
}
u32 keylen, void *key, u32 vallen, void *val,
struct ptlrpc_request_set *set)
{
- struct ptlrpc_request *req;
- struct obd_device *obd = exp->exp_obd;
- struct obd_import *imp = class_exp2cliimp(exp);
- char *tmp;
- int rc;
- ENTRY;
+ struct ptlrpc_request *req;
+ struct obd_device *obd = exp->exp_obd;
+ struct obd_import *imp = class_exp2cliimp(exp);
+ char *tmp;
+ int rc;
- OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
+ ENTRY;
+ CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
- if (KEY_IS(KEY_CHECKSUM)) {
- if (vallen != sizeof(int))
- RETURN(-EINVAL);
- exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
- RETURN(0);
- }
+ if (KEY_IS(KEY_CHECKSUM)) {
+ if (vallen != sizeof(int))
+ RETURN(-EINVAL);
+ exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
+ RETURN(0);
+ }
- if (KEY_IS(KEY_SPTLRPC_CONF)) {
- sptlrpc_conf_client_adapt(obd);
- RETURN(0);
- }
+ if (KEY_IS(KEY_SPTLRPC_CONF)) {
+ sptlrpc_conf_client_adapt(obd);
+ RETURN(0);
+ }
- if (KEY_IS(KEY_FLUSH_CTX)) {
- sptlrpc_import_flush_my_ctx(imp);
- RETURN(0);
- }
+ if (KEY_IS(KEY_FLUSH_CTX)) {
+ sptlrpc_import_flush_my_ctx(imp);
+ RETURN(0);
+ }
if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
struct client_obd *cli = &obd->u.cli;
RETURN(0);
}
- if (!set && !KEY_IS(KEY_GRANT_SHRINK))
- RETURN(-EINVAL);
+ if (KEY_IS(KEY_UNEVICT_CACHE_SHRINK)) {
+ struct client_obd *cli = &obd->u.cli;
+ long ret;
+
+ ret = osc_unevict_cache_shrink(env, cli);
+ if (ret > 0)
+ ret = 0;
+
+ /*
+ * Clear unused cache pages and move mlock()ed pages from
+ * the normal LRU list into the unevictable LRU list.
+ */
+ ret = osc_lru_shrink(env, cli,
+ atomic_long_read(&cli->cl_lru_in_list),
+ true);
+ if (ret > 0)
+ ret = 0;
+
+ RETURN(ret);
+ }
+
+ if (!set && !KEY_IS(KEY_GRANT_SHRINK))
+ RETURN(-EINVAL);
- /* We pass all other commands directly to OST. Since nobody calls osc
- methods directly and everybody is supposed to go through LOV, we
- assume lov checked invalid values for us.
- The only recognised values so far are evict_by_nid and mds_conn.
- Even if something bad goes through, we'd get a -EINVAL from OST
- anyway. */
+ /*
+ * We pass all other commands directly to OST. Since nobody calls osc
+ * methods directly and everybody is supposed to go through LOV, we
+ * assume lov checked invalid values for us.
+ * The only recognised values so far are evict_by_nid and mds_conn.
+ * Even if something bad goes through, we'd get a -EINVAL from OST
+ * anyway.
+ */
req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
&RQF_OST_SET_GRANT_INFO :
aa = ptlrpc_req_async_args(aa, req);
OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
if (!oa) {
- ptlrpc_req_finished(req);
+ ptlrpc_req_put(req);
RETURN(-ENOMEM);
}
*oa = ((struct ost_body *)val)->oa;
cli->cl_lost_grant = 0;
spin_unlock(&cli->cl_loi_list_lock);
- CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
- " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
- data->ocd_version, data->ocd_grant, lost_grant);
+ CDEBUG(D_RPCTRACE,
+ "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
+ data->ocd_connect_flags, data->ocd_version,
+ data->ocd_grant, lost_grant);
}
RETURN(0);
struct ldlm_resource *res = cfs_hash_object(hs, hnode);
struct ldlm_lock *lock;
struct osc_object *osc = NULL;
- ENTRY;
+ ENTRY;
lock_res(res);
list_for_each_entry(lock, &res->lr_granted, l_res_link) {
if (lock->l_ast_data != NULL && osc == NULL) {
/* clear LDLM_FL_CLEANED flag to make sure it will be canceled
* by the 2nd round of ldlm_namespace_clean() call in
- * osc_import_event(). */
+ * osc_import_event().
+ */
ldlm_clear_cleaned(lock);
}
unlock_res(res);
}
EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
-static int osc_import_event(struct obd_device *obd,
- struct obd_import *imp,
- enum obd_import_event event)
+static int osc_import_event(struct obd_device *obd, struct obd_import *imp,
+ enum obd_import_event event)
{
- struct client_obd *cli;
- int rc = 0;
-
- ENTRY;
- LASSERT(imp->imp_obd == obd);
+ struct client_obd *cli;
+ int rc = 0;
- switch (event) {
- case IMP_EVENT_DISCON: {
- cli = &obd->u.cli;
+ ENTRY;
+ if (WARN_ON_ONCE(!obd || !imp || imp->imp_obd != obd))
+ RETURN(-ENODEV);
+
+ switch (event) {
+ case IMP_EVENT_DISCON: {
+ cli = &obd->u.cli;
+ if (!cli)
+ RETURN(-ENODEV);
spin_lock(&cli->cl_loi_list_lock);
cli->cl_avail_grant = 0;
cli->cl_lost_grant = 0;
spin_unlock(&cli->cl_loi_list_lock);
- break;
- }
- case IMP_EVENT_INACTIVE: {
+ break;
+ }
+ case IMP_EVENT_INACTIVE: {
rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
- break;
- }
- case IMP_EVENT_INVALIDATE: {
- struct ldlm_namespace *ns = obd->obd_namespace;
- struct lu_env *env;
- __u16 refcheck;
+ break;
+ }
+ case IMP_EVENT_INVALIDATE: {
+ struct ldlm_namespace *ns = obd->obd_namespace;
+ struct lu_env *env;
+ __u16 refcheck;
ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
- env = cl_env_get(&refcheck);
- if (!IS_ERR(env)) {
+ env = cl_env_get(&refcheck);
+ if (!IS_ERR(env)) {
osc_io_unplug(env, &obd->u.cli, NULL);
cfs_hash_for_each_nolock(ns->ns_rs_hash,
cl_env_put(env, &refcheck);
ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
- } else
- rc = PTR_ERR(env);
- break;
- }
- case IMP_EVENT_ACTIVE: {
+ } else {
+ rc = PTR_ERR(env);
+ }
+ break;
+ }
+ case IMP_EVENT_ACTIVE: {
rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
- break;
- }
- case IMP_EVENT_OCD: {
- struct obd_connect_data *ocd = &imp->imp_connect_data;
+ break;
+ }
+ case IMP_EVENT_OCD: {
+ struct obd_connect_data *ocd = &imp->imp_connect_data;
- if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
- osc_init_grant(&obd->u.cli, ocd);
+ if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
+ osc_init_grant(&obd->u.cli, ocd);
- /* See bug 7198 */
- if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
- imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
+ /* See bug 7198 */
+ if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
+ imp->imp_client->cli_request_portal =
+ OST_REQUEST_PORTAL;
rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
- break;
- }
- case IMP_EVENT_DEACTIVATE: {
+ break;
+ }
+ case IMP_EVENT_DEACTIVATE: {
rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
- break;
- }
- case IMP_EVENT_ACTIVATE: {
+ break;
+ }
+ case IMP_EVENT_ACTIVATE: {
rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
- break;
- }
- default:
- CERROR("Unknown import event %d\n", event);
- LBUG();
- }
- RETURN(rc);
+ break;
+ }
+ default:
+ CERROR("%s: Unknown import event %d: rc = %d\n",
+ obd->obd_name, event, -EINVAL);
+ LBUG();
+ }
+ RETURN(rc);
}
/**
GOTO(out_ptlrpcd_work, rc);
cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
+ cli->cl_root_squash = 0;
osc_update_next_shrink(cli);
RETURN(rc);
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
struct client_obd *cli = &obd->u.cli;
- int adding;
- int added;
- int req_count;
- int rc;
+ int adding;
+ int added;
+ int req_count;
+ int rc;
ENTRY;
int osc_precleanup_common(struct obd_device *obd)
{
struct client_obd *cli = &obd->u.cli;
- ENTRY;
+ ENTRY;
/* LU-464
* for echo client, export may be on zombie list, wait for
* zombie thread to cull it, because cli.cl_import will be
/* lru cleanup */
if (cli->cl_cache != NULL) {
- LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
+ LASSERT(refcount_read(&cli->cl_cache->ccc_users) > 0);
spin_lock(&cli->cl_cache->ccc_lru_lock);
list_del_init(&cli->cl_lru_osc);
spin_unlock(&cli->cl_cache->ccc_lru_lock);
}
EXPORT_SYMBOL(osc_cleanup_common);
-static struct obd_ops osc_obd_ops = {
- .o_owner = THIS_MODULE,
- .o_setup = osc_setup,
- .o_precleanup = osc_precleanup,
+static const struct obd_ops osc_obd_ops = {
+ .o_owner = THIS_MODULE,
+ .o_setup = osc_setup,
+ .o_precleanup = osc_precleanup,
.o_cleanup = osc_cleanup_common,
- .o_add_conn = client_import_add_conn,
- .o_del_conn = client_import_del_conn,
+ .o_add_conn = client_import_add_conn,
+ .o_del_conn = client_import_del_conn,
.o_connect = client_connect_import,
- .o_reconnect = osc_reconnect,
- .o_disconnect = osc_disconnect,
- .o_statfs = osc_statfs,
- .o_statfs_async = osc_statfs_async,
- .o_create = osc_create,
- .o_destroy = osc_destroy,
- .o_getattr = osc_getattr,
- .o_setattr = osc_setattr,
- .o_iocontrol = osc_iocontrol,
- .o_set_info_async = osc_set_info_async,
- .o_import_event = osc_import_event,
- .o_quotactl = osc_quotactl,
+ .o_reconnect = osc_reconnect,
+ .o_disconnect = osc_disconnect,
+ .o_statfs = osc_statfs,
+ .o_statfs_async = osc_statfs_async,
+ .o_create = osc_create,
+ .o_destroy = osc_destroy,
+ .o_getattr = osc_getattr,
+ .o_setattr = osc_setattr,
+ .o_iocontrol = osc_iocontrol,
+ .o_set_info_async = osc_set_info_async,
+ .o_import_event = osc_import_event,
+ .o_quotactl = osc_quotactl,
};
-static struct shrinker *osc_cache_shrinker;
-struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
+LIST_HEAD(osc_shrink_list);
DEFINE_SPINLOCK(osc_shrink_lock);
+bool osc_page_cache_shrink_enabled = true;
-#ifndef HAVE_SHRINKER_COUNT
-static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
+#ifdef HAVE_SHRINKER_COUNT
+static struct ll_shrinker_ops osc_cache_sh_ops = {
+ .count_objects = osc_cache_shrink_count,
+ .scan_objects = osc_cache_shrink_scan,
+ .seeks = DEFAULT_SEEKS,
+};
+#else
+static int osc_cache_shrink(struct shrinker *shrinker,
+ struct shrink_control *sc)
{
- struct shrink_control scv = {
- .nr_to_scan = shrink_param(sc, nr_to_scan),
- .gfp_mask = shrink_param(sc, gfp_mask)
- };
-#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
- struct shrinker *shrinker = NULL;
-#endif
+ if (!osc_page_cache_shrink_enabled)
+ return 0;
- (void)osc_cache_shrink_scan(shrinker, &scv);
+ (void)osc_cache_shrink_scan(shrinker, sc);
- return osc_cache_shrink_count(shrinker, &scv);
+ return osc_cache_shrink_count(shrinker, sc);
}
+
+static struct ll_shrinker_ops osc_cache_sh_ops = {
+ .shrink = osc_cache_shrink,
+ .seeks = DEFAULT_SEEKS,
+};
#endif
+static struct shrinker *osc_cache_shrinker;
+
static int __init osc_init(void)
{
unsigned int reqpool_size;
unsigned int reqsize;
int rc;
- DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
- osc_cache_shrink_count, osc_cache_shrink_scan);
- ENTRY;
+ ENTRY;
/* print an address of _any_ initialized kernel symbol from this
* module, to allow debugging with gdb that doesn't support data
- * symbols from modules.*/
+ * symbols from modules.
+ */
CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
- rc = lu_kmem_init(osc_caches);
+ rc = libcfs_setup();
if (rc)
- RETURN(rc);
+ return rc;
- rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
- LUSTRE_OSC_NAME, &osc_device_type);
+ rc = lu_kmem_init(osc_caches);
if (rc)
- GOTO(out_kmem, rc);
+ RETURN(rc);
- osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
+ osc_cache_shrinker = ll_shrinker_create(&osc_cache_sh_ops, 0,
+ "osc_cache");
+ if (IS_ERR(osc_cache_shrinker))
+ GOTO(out_kmem, rc = PTR_ERR(osc_cache_shrinker));
/* This is obviously too much memory, only prevent overflow here */
if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
- GOTO(out_type, rc = -EINVAL);
+ GOTO(out_shrinker, rc = -EINVAL);
reqpool_size = osc_reqpool_mem_max << 20;
ptlrpc_add_rqs_to_pool);
if (osc_rq_pool == NULL)
- GOTO(out_type, rc = -ENOMEM);
+ GOTO(out_shrinker, rc = -ENOMEM);
rc = osc_start_grant_work();
if (rc != 0)
GOTO(out_req_pool, rc);
+ rc = class_register_type(&osc_obd_ops, NULL, true,
+ LUSTRE_OSC_NAME, &osc_device_type);
+ if (rc < 0)
+ GOTO(out_stop_grant, rc);
+
RETURN(rc);
+out_stop_grant:
+ osc_stop_grant_work();
out_req_pool:
ptlrpc_free_rq_pool(osc_rq_pool);
-out_type:
- class_unregister_type(LUSTRE_OSC_NAME);
+out_shrinker:
+ shrinker_free(osc_cache_shrinker);
out_kmem:
lu_kmem_fini(osc_caches);
static void __exit osc_exit(void)
{
- osc_stop_grant_work();
- remove_shrinker(osc_cache_shrinker);
class_unregister_type(LUSTRE_OSC_NAME);
- lu_kmem_fini(osc_caches);
ptlrpc_free_rq_pool(osc_rq_pool);
+ osc_stop_grant_work();
+ shrinker_free(osc_cache_shrinker);
+ lu_kmem_fini(osc_caches);
}
MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");